
# Load Data with a User-Defined Data Pipeline Function

export const Ocient = "Ocient®";

export const Groovy = "Apache® Groovy™";

The data pipeline functionality in the {Ocient} System enables you to transform data during loading. You can create your own data pipeline function to perform a custom transformation or calculation.

This example uses a user-defined data pipeline function to load weather data for Chicago. The function converts temperatures from Fahrenheit to Celsius using the formula `°C = (°F - 32) × 5/9`. The precipitation type data is an array of zero or more lowercase values, and the data pipeline uses a lambda function to convert the first character of each precipitation type to uppercase. The pipeline also stores the `filename` metadata of the data load as an audit trail.

## Retrieve the Weather Data from the Source

Retrieve year-to-date weather data for Chicago in JSON format from the [Weather Query Builder](https://www.visualcrossing.com/weather-query-builder/). The data contains these fields, followed by a sample row:

* `queryCost` — Cost of the query
* `latitude` — Latitude of the location
* `longitude` — Longitude of the location
* `resolvedAddress` — Address of the location with the country
* `address` — City and state of the address
* `timezone` — Time zone of the location
* `tzoffset` — Time zone offset
* `name` — Name of the location
* `days` — Daily weather data in an array with these fields:
  * `datetime` — Date of the weather data
  * `datetimeEpoch` — Date epoch of the weather data
  * `tempmax` — Maximum temperature of the day
  * `tempmin` — Minimum temperature of the day
  * `temp` — Temperature of the day
  * `feelslike` — Feels like temperature of the day
  * `humidity` — Humidity for the day
  * `precip` — Precipitation amount for the day
  * `precipprob` — Probability of precipitation for the day
  * `preciptype` — Type of precipitation for the day (array of strings for types such as rain, snow, and so on)
  * `windspeed` — Wind speed for the day

```json JSON theme={null}
{ 
    "queryCost" : 212, 
    "latitude" : 41.8843, 
    "longitude" : -87.6324, 
    "resolvedAddress" : "Chicago, IL, United States", 
    "address" : "chicago, il", 
    "timezone" : "America/Chicago", 
    "tzoffset" : -6.0, 
    "name" : "chicago, il", 
    "days" : [ { 
        "datetime" : "2025-01-01", 
        "datetimeEpoch" : 1735711200, 
        "tempmax" : 35.3, 
        "tempmin" : 24.8, 
        "temp" : 30.0, 
        "feelslike" : 19.5, 
        "humidity" : 63.9, 
        "precip" : 0.037,
        "precipprob" : 100.0, 
        "preciptype" : [ "rain", "snow" ], 
        "windspeed" : 17.6 
    } ]
}
```
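Before building a pipeline, it can help to confirm that a downloaded file has the shape described above. The following Python sketch is not part of the {Ocient} workflow. It embeds a trimmed copy of the sample row and lists one entry per element of `days`. The names `SAMPLE` and `summarize` are illustrative, not part of any product API.

```python
import json

# Trimmed copy of the sample row shown above; a real file has one element per day.
SAMPLE = """
{
  "latitude": 41.8843,
  "longitude": -87.6324,
  "address": "chicago, il",
  "days": [
    {"datetime": "2025-01-01", "temp": 30.0, "preciptype": ["rain", "snow"]}
  ]
}
"""


def summarize(doc):
    """Return (address, [(date, temp_f, precip_types), ...]) for one document."""
    days = [
        # .get() yields None when a day has no precipitation type recorded.
        (day["datetime"], day["temp"], day.get("preciptype"))
        for day in doc.get("days", [])
    ]
    return doc["address"], days


address, days = summarize(json.loads(SAMPLE))
print(address, len(days), days[0])
```

The nested `days` array is the part that the data pipeline later expands into one table row per day.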

Create the `chicago_weather_ytd` table with these columns:

* `latitude` — Latitude of the location
* `longitude` — Longitude of the location
* `location_name` — Name of the location
* `log_date` — Date of the weather event
* `temp` — Temperature of the day
* `preciptype` — Array of strings with the types of precipitation for the day
* `file_name` — Name of the file that contains the weather data

```sql SQL theme={null}
CREATE TABLE chicago_weather_ytd(
    "latitude" double, 
    "longitude" double, 
    "location_name" varchar(20), 
    "log_date" date, 
    "temp" double, 
    "preciptype" varchar(100)[],         
    "file_name" varchar(500)); 
```

## Create a User-Defined Function for Temperature Conversion

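Create a user-defined function that converts a temperature from Fahrenheit to Celsius and returns the value with one decimal place. Use {Groovy} code to specify the calculation. The code multiplies the Celsius value by 10, keeps the integer part using `toInteger()`, and divides by 10, so the function truncates the value toward zero at the first decimal place instead of rounding to the nearest tenth.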

```sql SQL theme={null}
CREATE PIPELINE FUNCTION f_to_c(f_value DOUBLE NOT NULL) 
    LANGUAGE GROOVY 
    RETURNS DOUBLE NOT NULL 
    IMPORTS []
    AS $$ 
        return (10 * ((f_value - 32.0) * 5.0 / 9.0)).toInteger() / 10  
    $$; 
```
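To see how truncation differs from rounding, the following Python sketch mirrors the arithmetic of the Groovy expression. It is an illustration only, not code that runs in a data pipeline. The names `f_to_c_truncated` and `f_to_c_rounded` are hypothetical, and the 28.1 °F input is a made-up value chosen to show a case where the two approaches disagree.

```python
def f_to_c_truncated(f_value: float) -> float:
    """Mirror the Groovy expression: scale by 10, drop the fraction, scale back."""
    # int() truncates toward zero, which matches Groovy's Number.toInteger().
    return int(10 * ((f_value - 32.0) * 5.0 / 9.0)) / 10


def f_to_c_rounded(f_value: float) -> float:
    """Alternative that rounds to the nearest tenth instead of truncating."""
    return round((f_value - 32.0) * 5.0 / 9.0, 1)


# 30.0 °F is the `temp` value in the sample row; 28.1 °F is a made-up input.
for f in (30.0, 28.1):
    print(f, f_to_c_truncated(f), f_to_c_rounded(f))
```

The sketch prints `30.0 -1.1 -1.1` and `28.1 -2.1 -2.2`. If your use case needs values rounded to the nearest tenth, adjust the Groovy body accordingly and confirm the result with a pipeline preview.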

## Create a Data Pipeline to Load the Weather Data

Build the pipeline by first running the `PREVIEW PIPELINE` SQL statement to see how a limited number of rows load. This preview reads four rows in the JSON format and applies these transformations:

* For `latitude` and `longitude`, convert the values to the double type using the `DOUBLE` cast function.
* For `location_name`, load the `address` field as is.
* For `log_date`, the date of the weather event, expand the input array of days into its elements using the `EXPLODE_OUTER` function and convert each date to the `date` type using the `DATE` cast function.
* For `temp`, the temperature of the day, expand the input array of days into its elements using the `EXPLODE_OUTER` function and convert each temperature to the double type using the `DOUBLE` cast function. Then, execute the `f_to_c` user-defined function to convert the temperature from Fahrenheit to Celsius.
* For `preciptype`, the type of precipitation, expand the input array of days into its elements using the `EXPLODE_OUTER` function. Within each element, use the `TRANSFORM` function with a lambda function that converts the first character of each precipitation type to uppercase.
* For `file_name`, capture the filename using the `METADATA` function.

```sql SQL theme={null}
PREVIEW PIPELINE chicago_weather_ytd_pipeline 
SOURCE filesystem 
    FILTER '/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json' 
    LIMIT 4
EXTRACT FORMAT json  
INTO chicago_weather_ytd 
SELECT DOUBLE($latitude) AS latitude,   
    DOUBLE($longitude) AS longitude,   
    $address AS location_name,   
    DATE(EXPLODE_OUTER($days[].datetime)) AS log_date, 
    f_to_c(DOUBLE(EXPLODE_OUTER($days[].temp))) AS temp, 
    EXPLODE_OUTER(TRANSFORM($days[].preciptype[], 
        (x VARCHAR) -> CONCAT(UPPER(SUBSTRING(x,1,1)),SUBSTRING(x,2,20)))) AS preciptype, 
    METADATA('filename') AS file_name; 
```

Output

```none Text theme={null}
latitude              longitude             location_name                                log_date   temp                  preciptype                                                                      file_name 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
41.8843               -87.6324              chicago, il                                  2025-01-01 -1.1                  [Rain,Snow]                                                                    /tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json 
41.8843               -87.6324              chicago, il                                  2025-01-02 -2.2                  [Rain,Snow]                                                                    /tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json 
41.8843               -87.6324              chicago, il                                  2025-01-03 -4.6                  [Snow]                                                                         /tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json 
41.8843               -87.6324              chicago, il                                  2025-01-04 -7.2                  NULL                                                                           /tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json 
```
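The transformations in the preview can be sketched outside the database to check expectations for a small input. The following Python code is a rough model under stated assumptions, not a description of how the {Ocient} System executes the pipeline. It assumes `SUBSTRING` uses 1-based positions with a start and a length, it treats a `null` precipitation array as a `NULL` column value, and it does not model how `EXPLODE_OUTER` handles an empty `days` array. The names `capitalize_first`, `f_to_c`, and `flatten` are illustrative, and the second day in the input is hypothetical.

```python
import json

# The first day comes from the sample row above. The second day is hypothetical
# (its Fahrenheit value is made up) and only exercises the null preciptype case.
DOC = json.loads("""
{
  "latitude": 41.8843, "longitude": -87.6324, "address": "chicago, il",
  "days": [
    {"datetime": "2025-01-01", "temp": 30.0, "preciptype": ["rain", "snow"]},
    {"datetime": "2025-01-04", "temp": 19.0, "preciptype": null}
  ]
}
""")


def capitalize_first(x):
    # Models CONCAT(UPPER(SUBSTRING(x,1,1)), SUBSTRING(x,2,20)):
    # uppercase the first character, then append up to 20 following characters.
    return x[:1].upper() + x[1:21]


def f_to_c(f_value):
    # Same arithmetic as the Groovy function: truncate to one decimal place.
    return int(10 * ((f_value - 32.0) * 5.0 / 9.0)) / 10


def flatten(doc, file_name):
    """Produce one row per element of `days`, repeating the top-level fields."""
    rows = []
    for day in doc["days"]:
        types = day.get("preciptype")
        rows.append({
            "latitude": float(doc["latitude"]),
            "longitude": float(doc["longitude"]),
            "location_name": doc["address"],
            "log_date": day["datetime"],
            "temp": f_to_c(float(day["temp"])),
            "preciptype": None if types is None
            else [capitalize_first(t) for t in types],
            "file_name": file_name,
        })
    return rows


rows = flatten(DOC, "/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json")
for row in rows:
    print(row)
```

Each printed dictionary corresponds to one row of the preview: the top-level fields repeat for every day, while the date, temperature, and precipitation types come from the individual elements of `days`.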

Create the data pipeline by replacing the `PREVIEW` keyword with the `CREATE` keyword in the SQL statement. This statement also omits the `LIMIT` clause so that the pipeline loads all rows.

```sql SQL theme={null}
CREATE PIPELINE chicago_weather_ytd_pipeline
SOURCE filesystem 
FILTER '/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json' 
EXTRACT FORMAT json  
INTO chicago_weather_ytd 
SELECT DOUBLE($latitude) AS latitude,   
    DOUBLE($longitude) AS longitude,   
    $address AS location_name,   
    DATE(EXPLODE_OUTER($days[].datetime)) AS log_date, 
    f_to_c(DOUBLE(EXPLODE_OUTER($days[].temp))) AS temp, 
    EXPLODE_OUTER(TRANSFORM($days[].preciptype[], 
        (x VARCHAR) -> CONCAT(UPPER(SUBSTRING(x,1,1)),SUBSTRING(x,2,20)))) AS preciptype, 
    METADATA('filename') AS file_name; 
```

## Execute the Data Pipeline Load

Start the data pipeline with an error limit of 100, which allows up to 100 errors before the pipeline fails. In this example, the data pipeline runs to completion.

```sql SQL theme={null}
START PIPELINE chicago_weather_ytd_pipeline ERROR LIMIT 100; 
```

Use the `SHOW PIPELINE_STATUS` command to view the status of the data pipeline.

```sql SQL theme={null}
SHOW PIPELINE_STATUS;
```

Output

```none Text theme={null}
database_name|pipeline_name      |table_names                         |status   |status_message                                    |duration_seconds|files_processed|files_failed|files_remaining|files_total|fraction_complete|records_processed|records_loaded|records_failed|
-------------+-------------------+------------------------------------+---------+--------------------------------------------------+----------------+---------------+------------+---------------+-----------+-----------------+-----------------+--------------+--------------+
training     |chicago_weather_ytd|{'admin@system.chicago_weather_ytd'}|COMPLETED|Completed processing pipeline chicago_weather_ytd.|              11|              1|           0|              0|          1|              1.0|              212|           212|             0|
training     |city_weather       |{'admin@system.city_weather'}       |COMPLETED|Completed processing pipeline city_weather.       |              10|              1|           0|              0|          1|              1.0|               48|            48|             0|
training     |counties_load      |{'admin@system.us_counties'}        |COMPLETED|Completed processing pipeline counties_load.      |              11|              1|           0|              0|          1|              1.0|             3429|          3429|             0|
```

Display five rows of the loaded data sorted by the date of the weather event.

```sql SQL theme={null}
SELECT *
FROM chicago_weather_ytd
ORDER BY log_date
LIMIT 5;
```

Output

```none Text theme={null}
latitude|longitude|location_name|log_date  |temp|preciptype     |file_name                                                      |
--------+---------+-------------+----------+----+---------------+---------------------------------------------------------------+
 41.8843| -87.6324|chicago, il  |2025-01-01|-1.1|{'Rain','Snow'}|/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
 41.8843| -87.6324|chicago, il  |2025-01-02|-2.1|{'Rain','Snow'}|/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
 41.8843| -87.6324|chicago, il  |2025-01-03|-4.6|{'Snow'}       |/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
 41.8843| -87.6324|chicago, il  |2025-01-04|-7.2|NULL           |/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
 41.8843| -87.6324|chicago, il  |2025-01-05|-6.4|{'Snow'}       |/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
```

## Related Links

[Data Pipelines](/data-pipelines)

[User-Defined Data Pipeline Functions](/transform-data-in-data-pipelines#user-defined-data-pipeline-functions)

[Lambda Functions](/transform-data-in-data-pipelines#lambda-functions)

[Load Metadata and File-Based Partitioned Data in Data Pipelines](/load-metadata-and-file-based-partitioned-data-in-data-pipelines)
