The data pipeline functionality in the System enables you to transform data during loading. You can create your own data pipeline function to perform a custom transformation or calculation. This example uses a user-defined data pipeline function to load weather data for Chicago. The function converts temperatures from Fahrenheit to Celsius using the formula °C = (°F - 32) × 5/9. The precipitation type data is an array with zero or more lowercase values. The data pipeline also uses a lambda function to convert the first character of each precipitation type to uppercase, and it records the source filename from the load metadata as an audit trail.

Retrieve the Weather Data from the Source

Retrieve year-to-date weather data for Chicago in JSON format from the Weather Query Builder. The data contains the following fields, and a sample row follows the list.
  • queryCost — Cost of the query
  • latitude — Latitude of the location
  • longitude — Longitude of the location
  • resolvedAddress — Address of the location with the country
  • address — City and state of the address
  • timezone — Time zone of the location
  • tzoffset — Time zone offset
  • name — Name of the location
  • days — Daily weather data in an array with these fields:
    • datetime — Date of the weather data
    • datetimeEpoch — Date epoch of the weather data
    • tempmax — Maximum temperature of the day
    • tempmin — Minimum temperature of the day
    • temp — Temperature of the day
    • feelslike — Feels like temperature of the day
    • humidity — Humidity for the day
    • precip — Precipitation amount for the day
    • precipprob — Probability of precipitation for the day
    • preciptype — Type of precipitation for the day (array of strings for types such as rain, snow, and so on)
    • windspeed — Wind speed for the day
JSON

{
    "queryCost" : 212,
    "latitude" : 41.8843,
    "longitude" : -87.6324,
    "resolvedAddress" : "Chicago, IL, United States",
    "address" : "chicago, il",
    "timezone" : "America/Chicago",
    "tzoffset" : -6.0,
    "name" : "chicago, il",
    "days" : [ {
        "datetime" : "2025-01-01",
        "datetimeEpoch" : 1735711200,
        "tempmax" : 35.3,
        "tempmin" : 24.8,
        "temp" : 30.0,
        "feelslike" : 19.5,
        "humidity" : 63.9,
        "precip" : 0.037,
        "precipprob" : 100.0,
        "preciptype" : [ "rain", "snow" ],
        "windspeed" : 17.6
    } ]
}
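As a quick sanity check on the sample row, the following Python sketch relates datetimeEpoch to datetime. It is not part of the pipeline. It assumes that datetimeEpoch is a Unix timestamp in seconds and that tzoffset is the UTC offset in hours, and the function name local_date is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def local_date(epoch_seconds: int, tz_offset_hours: float) -> str:
    """Return the local calendar date (YYYY-MM-DD) for a Unix timestamp.

    Assumes epoch_seconds is in seconds and tz_offset_hours is the UTC
    offset in hours, as the sample row suggests.
    """
    tz = timezone(timedelta(hours=tz_offset_hours))
    return datetime.fromtimestamp(epoch_seconds, tz).date().isoformat()


# Values copied from the sample row above.
print(local_date(1735711200, -6.0))  # 2025-01-01
```

Under these assumptions, the epoch value is midnight local time on the date in the datetime field.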
Create the chicago_weather_ytd table with these columns:
  • latitude — Latitude of the location
  • longitude — Longitude of the location
  • location_name — Name of the location
  • log_date — Date of the weather event
  • temp — Temperature of the day
  • preciptype — An array of strings for the types of precipitation on that day
  • file_name — Name of the file that contains the weather data
SQL
CREATE TABLE chicago_weather_ytd(
    "latitude" double, 
    "longitude" double, 
    "location_name" varchar(20), 
    "log_date" date, 
    "temp" double, 
    "preciptype" varchar(100)[],         
    "file_name" varchar(500)); 

Create a User-Defined Function for Temperature Conversion

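Create a user-defined function that converts a degree value from Fahrenheit to Celsius and returns the value with one decimal place. The function body is Groovy code that specifies the calculation. The body multiplies the result by 10, calls toInteger(), and divides by 10, so it drops any further decimal places instead of rounding to the nearest tenth.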
SQL
CREATE PIPELINE FUNCTION f_to_c(f_value DOUBLE NOT NULL)
    LANGUAGE GROOVY 
    RETURNS DOUBLE NOT NULL 
    IMPORTS []
    AS $$ 
        return (10 * ((f_value - 32.0) * 5.0 / 9.0)).toInteger() / 10  
    $$; 
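
To see what the function body computes, here is a minimal Python sketch of the same arithmetic, written outside the system for illustration only. It assumes that toInteger() drops the fractional part (truncates toward zero), which int() mirrors in Python. The name f_to_c_sketch is hypothetical.

```python
def f_to_c_sketch(f_value: float) -> float:
    """Convert Fahrenheit to Celsius, keeping one decimal place.

    Mirrors the Groovy body: scale by 10, drop the fractional part,
    then divide by 10. int() truncates toward zero.
    """
    celsius = (f_value - 32.0) * 5.0 / 9.0
    return int(10 * celsius) / 10


print(f_to_c_sketch(30.0))   # -1.1 (30.0 is the temp value in the sample row)
print(f_to_c_sketch(212.0))  # 100.0
```

Because the fractional part is dropped rather than rounded to nearest, an intermediate Celsius value such as -1.19 would come out as -1.1 under this assumption.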

Create a Data Pipeline to Load the Weather Data

Build the pipeline by first previewing the load. The PREVIEW PIPELINE SQL statement shows how a limited number of rows loads. This example previews four rows and specifies the JSON format. The SELECT list performs these transformations:
  • For the latitude and longitude, convert the values to the double type using the DOUBLE cast function.
  • For the location name, load the address value as is.
  • For log_date, the date of the weather event, expand the input array of days into its elements using the EXPLODE_OUTER function and convert the date to a date type using the DATE cast function.
  • For the temperature of the day, expand the input array of days into its elements using the EXPLODE_OUTER function and convert the temperature to the double type using the DOUBLE cast function. Then, execute the f_to_c user-defined function to convert the temperature from Fahrenheit to Celsius.
  • For the type of precipitation, expand the input array of days into its elements using the EXPLODE_OUTER function. Transform each element of the precipitation type array using a lambda function that converts the first character of the precipitation type to uppercase.
  • For the file name, use the METADATA function to capture the filename.
SQL
PREVIEW PIPELINE chicago_weather_ytd_pipeline 
SOURCE filesystem 
    FILTER '/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json' 
    LIMIT 4
EXTRACT FORMAT json  
INTO chicago_weather_ytd 
SELECT DOUBLE($latitude) AS latitude,   
    DOUBLE($longitude) AS longitude,   
    $address AS location_name,   
    DATE(EXPLODE_OUTER($days[].datetime)) AS log_date, 
    f_to_c(DOUBLE(EXPLODE_OUTER($days[].temp))) AS temp, 
    EXPLODE_OUTER(TRANSFORM($days[].preciptype[], 
        (x VARCHAR) -> CONCAT(UPPER(SUBSTRING(x,1,1)),SUBSTRING(x,2,20)))) AS preciptype, 
    METADATA('filename') AS file_name; 
Output
Text
latitude              longitude             location_name                                log_date   temp                  preciptype                                                                      file_name 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
41.8843               -87.6324              chicago, il                                  2025-01-01 -1.1                  [Rain,Snow]                                                                    /tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json 
41.8843               -87.6324              chicago, il                                  2025-01-02 -2.2                  [Rain,Snow]                                                                    /tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json 
41.8843               -87.6324              chicago, il                                  2025-01-03 -4.6                  [Snow]                                                                         /tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json 
41.8843               -87.6324              chicago, il                                  2025-01-04 -7.2                  NULL                                                                           /tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json 
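The SELECT list above can be hard to picture, so here is a rough Python sketch of the row shape it produces: one output row per element of days, with the capitalization lambda applied to each precipitation type. It runs outside the system and only approximates the semantics. It assumes that SUBSTRING takes a 1-based start position and a length, and it does not model what EXPLODE_OUTER does when the days array is empty or missing. The names capitalize_first and flatten and the two-day input document are hypothetical.

```python
from typing import Optional


def capitalize_first(x: str) -> str:
    # Python analogue of CONCAT(UPPER(SUBSTRING(x,1,1)), SUBSTRING(x,2,20)),
    # assuming a 1-based start position and a length as the third argument.
    return x[0:1].upper() + x[1:21]


def flatten(doc: dict, file_name: str) -> list[dict]:
    """Build one row per element of doc["days"] (illustrative only)."""
    rows = []
    for day in doc.get("days") or []:
        types: Optional[list[str]] = day.get("preciptype")
        rows.append({
            "latitude": float(doc["latitude"]),
            "location_name": doc["address"],
            "log_date": day["datetime"],
            # A missing precipitation array stays None (NULL in the table).
            "preciptype": None if types is None
            else [capitalize_first(t) for t in types],
            "file_name": file_name,
        })
    return rows


# Made-up two-day document in the shape of the sample row.
doc = {
    "latitude": 41.8843,
    "address": "chicago, il",
    "days": [
        {"datetime": "2025-01-01", "preciptype": ["rain", "snow"]},
        {"datetime": "2025-01-04", "preciptype": None},
    ],
}
for row in flatten(doc, "Chicago_Weather_Daily_YTD.json"):
    print(row["log_date"], row["preciptype"])
```

Top-level fields such as latitude and address repeat on every row, while the fields under days vary per row, which matches the pattern in the preview output.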
Create the data pipeline by replacing the PREVIEW keyword with CREATE in the SQL statement and removing the LIMIT clause.
SQL
CREATE PIPELINE chicago_weather_ytd_pipeline
SOURCE filesystem 
    FILTER '/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json' 
EXTRACT FORMAT json  
INTO chicago_weather_ytd 
SELECT DOUBLE($latitude) AS latitude,   
    DOUBLE($longitude) AS longitude,   
    $address AS location_name,   
    DATE(EXPLODE_OUTER($days[].datetime)) AS log_date, 
    f_to_c(DOUBLE(EXPLODE_OUTER($days[].temp))) AS temp, 
    EXPLODE_OUTER(TRANSFORM($days[].preciptype[], 
        (x VARCHAR) -> CONCAT(UPPER(SUBSTRING(x,1,1)),SUBSTRING(x,2,20)))) AS preciptype, 
    METADATA('filename') AS file_name; 

Execute the Data Pipeline Load

Start the data pipeline with an error limit of 100, which allows up to 100 errors before the pipeline fails. The data pipeline runs to completion.
SQL
START PIPELINE chicago_weather_ytd_pipeline ERROR LIMIT 100;
Use the SHOW PIPELINE_STATUS command to view the status of the data pipeline.
SQL
SHOW PIPELINE_STATUS;
Output
Text
database_name|pipeline_name      |table_names                         |status   |status_message                                    |duration_seconds|files_processed|files_failed|files_remaining|files_total|fraction_complete|records_processed|records_loaded|records_failed|
-------------+-------------------+------------------------------------+---------+--------------------------------------------------+----------------+---------------+------------+---------------+-----------+-----------------+-----------------+--------------+--------------+
training     |chicago_weather_ytd|{'admin@system.chicago_weather_ytd'}|COMPLETED|Completed processing pipeline chicago_weather_ytd.|              11|              1|           0|              0|          1|              1.0|              212|           212|             0|
training     |city_weather       |{'admin@system.city_weather'}       |COMPLETED|Completed processing pipeline city_weather.       |              10|              1|           0|              0|          1|              1.0|               48|            48|             0|
training     |counties_load      |{'admin@system.us_counties'}        |COMPLETED|Completed processing pipeline counties_load.      |              11|              1|           0|              0|          1|              1.0|             3429|          3429|             0|
Display five rows of the loaded data sorted by the date of the weather event.
SQL
SELECT *
FROM chicago_weather_ytd
ORDER BY log_date
LIMIT 5;
Output
Text
latitude|longitude|location_name|log_date  |temp|preciptype     |file_name                                                      |
--------+---------+-------------+----------+----+---------------+---------------------------------------------------------------+
 41.8843| -87.6324|chicago, il  |2025-01-01|-1.1|{'Rain','Snow'}|/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
 41.8843| -87.6324|chicago, il  |2025-01-02|-2.1|{'Rain','Snow'}|/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
 41.8843| -87.6324|chicago, il  |2025-01-03|-4.6|{'Snow'}       |/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
 41.8843| -87.6324|chicago, il  |2025-01-04|-7.2|NULL           |/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
 41.8843| -87.6324|chicago, il  |2025-01-05|-6.4|{'Snow'}       |/tmp/data/json_files/nested_json/Chicago_Weather_Daily_YTD.json|
Last modified on May 27, 2026