# Load Data

## Load Data with a User-Defined Data Pipeline Function
The data pipeline functionality in the {{ocient}} System enables you to transform data during loading. You can create your own data pipeline function to perform a custom transformation or calculation.

This example uses a user-defined data pipeline function to load weather data for Chicago. The function converts temperatures from Fahrenheit to Celsius; the formula for the conversion is °C = (°F - 32) × 5/9. The precipitation type data is an array with zero or more values in lowercase, so the data pipeline also uses a lambda function to convert the first character of each precipitation type to uppercase. The pipeline uses the metadata filename in the data load as an audit trail.

### Retrieve the Weather Data from the Source

Retrieve year-to-date weather data for Chicago in JSON format from the Weather Query Builder at https://www.visualcrossing.com/weather-query-builder/. Here is a sample row of data. The data contains:

- queryCost — cost of the query
- latitude — latitude of the location
- longitude — longitude of the location
- resolvedAddress — address of the location with the country
- address — city and state of the address
- timezone — time zone of the location
- tzoffset — time zone offset
- name — name of the location
- days — daily weather data in an array with these fields:
  - datetime — date of the weather data
  - datetimeEpoch — date epoch of the weather data
  - tempmax — maximum temperature of the day
  - tempmin — minimum temperature of the day
  - temp — temperature of the day
  - feelslike — "feels like" temperature of the day
  - humidity — humidity for the day
  - precip — precipitation amount for the day
  - precipprob — probability of precipitation for the day
  - preciptype — type of precipitation for the day (array of strings for types such as rain, snow, and so on)
  - windspeed — wind speed for the day

```json
{
  "queryCost": 212,
  "latitude": 41.8843,
  "longitude": -87.6324,
  "resolvedAddress": "Chicago, IL, United States",
  "address": "Chicago, IL",
  "timezone": "America/Chicago",
  "tzoffset": -6.0,
  "name": "Chicago, IL",
  "days": [
    {
      "datetime": "2025-01-01",
      "datetimeEpoch": 1735711200,
      "tempmax": 35.3,
      "tempmin": 24.8,
      "temp": 30.0,
      "feelslike": 19.5,
      "humidity": 63.9,
      "precip": 0.037,
      "precipprob": 100.0,
      "preciptype": [ "rain", "snow" ],
      "windspeed": 17.6
    }
  ]
}
```

### Create the chicago_weather_ytd Table

Create the chicago_weather_ytd table with these columns:

- latitude — latitude of the location
- longitude — longitude of the location
- location_name — name of the location
- log_date — date of the weather event
- temp — temperature of the day
- preciptype — an array of character strings for multiple types of weather events
- file_name — name of the file that contains the weather data

```sql
CREATE TABLE chicago_weather_ytd(
  "latitude" DOUBLE,
  "longitude" DOUBLE,
  "location_name" VARCHAR(20),
  "log_date" DATE,
  "temp" DOUBLE,
  "preciptype" VARCHAR(100)[],
  "file_name" VARCHAR(500));
```

### Create a User-Defined Function for Temperature Conversion

Create a user-defined function that converts a degree value from Fahrenheit to Celsius and rounds the returned value to one decimal place. Use {{groovy}} code to specify the calculation and rounding.

```sql
CREATE PIPELINE FUNCTION f_to_c(f_value DOUBLE NOT NULL)
LANGUAGE GROOVY
RETURNS DOUBLE NOT NULL
IMPORTS []
AS $$
return (10 * ((f_value - 32.0) * 5.0 / 9.0)).toInteger() / 10
$$;
```

### Create a Data Pipeline to Load the Weather Data

Build a pipeline by using a preview of the data to load. Use the PREVIEW PIPELINE SQL statement to see how a limited number of rows loads. In this case, preview four rows and specify the JSON format.

- Transform the latitude and longitude to the DOUBLE type using the DOUBLE cast function.
- For the date of the weather event log_date, expand the input array of days into its elements using the EXPLODE_OUTER function, and convert the date to a DATE type using the DATE cast function.
- For the temperature of the day, expand the input array of days into its elements using the EXPLODE_OUTER function, and convert the temperature to the DOUBLE type using the DOUBLE cast function. Then, execute the f_to_c user-defined function to convert the temperature from Fahrenheit to Celsius.
- For the type of precipitation, expand the input array of days into its elements using the EXPLODE_OUTER function. Then, transform each element of the input array using a lambda function that converts the first character of the precipitation type to uppercase.
- Use the METADATA function to capture the filename.
```sql
PREVIEW PIPELINE chicago_weather_ytd_pipeline
SOURCE FILESYSTEM FILTER '/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json'
LIMIT 4
EXTRACT FORMAT JSON
INTO chicago_weather_ytd SELECT
  DOUBLE($latitude) AS latitude,
  DOUBLE($longitude) AS longitude,
  $address AS location_name,
  DATE(EXPLODE_OUTER($days[].datetime)) AS log_date,
  f_to_c(DOUBLE(EXPLODE_OUTER($days[].temp))) AS temp,
  EXPLODE_OUTER(TRANSFORM($days[].preciptype[],
    (x VARCHAR) -> CONCAT(UPPER(SUBSTRING(x, 1, 1)), SUBSTRING(x, 2, 20)))) AS preciptype,
  METADATA('filename') AS file_name;
```

Output:

```
latitude|longitude|location_name|log_date  |temp|preciptype  |file_name                                                      |
--------+---------+-------------+----------+----+------------+---------------------------------------------------------------+
 41.8843| -87.6324|Chicago, IL  |2025-01-01|-1.1|[Rain,Snow] |/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json|
 41.8843| -87.6324|Chicago, IL  |2025-01-02|-2.2|[Rain,Snow] |/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json|
 41.8843| -87.6324|Chicago, IL  |2025-01-03|-4.6|[Snow]      |/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json|
 41.8843| -87.6324|Chicago, IL  |2025-01-04|-7.2|NULL        |/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json|
```

Create the data pipeline by replacing PREVIEW with the CREATE keyword in the SQL statement.

```sql
CREATE PIPELINE chicago_weather_ytd_pipeline
SOURCE FILESYSTEM FILTER '/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json'
EXTRACT FORMAT JSON
INTO chicago_weather_ytd SELECT
  DOUBLE($latitude) AS latitude,
  DOUBLE($longitude) AS longitude,
  $address AS location_name,
  DATE(EXPLODE_OUTER($days[].datetime)) AS log_date,
  f_to_c(DOUBLE(EXPLODE_OUTER($days[].temp))) AS temp,
  EXPLODE_OUTER(TRANSFORM($days[].preciptype[],
    (x VARCHAR) -> CONCAT(UPPER(SUBSTRING(x, 1, 1)), SUBSTRING(x, 2, 20)))) AS preciptype,
  METADATA('filename') AS file_name;
```
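The row expansion that EXPLODE_OUTER performs can also be sketched in plain Python (illustrative only; the record and field values here are sample data in the shape of the JSON above):

```python
# Sketch of EXPLODE_OUTER-style flattening: one output row per element of
# the nested "days" array, keeping the top-level fields on every row.

record = {
    "latitude": 41.8843,
    "longitude": -87.6324,
    "address": "Chicago, IL",
    "days": [
        {"datetime": "2025-01-01", "temp": 30.0, "preciptype": ["rain", "snow"]},
        {"datetime": "2025-01-04", "temp": 19.0, "preciptype": None},
    ],
}

def explode_outer(rec):
    """Yield one flat row per element of rec["days"].

    Like EXPLODE_OUTER, a missing or empty array still yields one row
    (with None placeholders) instead of dropping the record.
    """
    days = rec.get("days") or [{}]
    for day in days:
        yield {
            "latitude": rec["latitude"],
            "longitude": rec["longitude"],
            "location_name": rec["address"],
            "log_date": day.get("datetime"),
            "temp": day.get("temp"),
            "preciptype": day.get("preciptype"),
        }

rows = list(explode_outer(record))  # two flat rows from one nested record
```

Each element of `days` becomes its own row, which is why one JSON document loads as many records in the table.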
### Execute the Data Pipeline Load

Start the data pipeline, allowing 100 errors before the pipeline fails. The data pipeline runs to completion.

```sql
START PIPELINE chicago_weather_ytd_pipeline ERROR LIMIT 100;
```

Use the SHOW PIPELINE STATUS command to view the status of the data pipeline.

```sql
SHOW PIPELINE STATUS;
```

Output:

```
database_name|pipeline_name      |table_names                         |status   |status_message                                    |duration_seconds|files_processed|files_failed|files_remaining|files_total|fraction_complete|records_processed|records_loaded|records_failed|
-------------+-------------------+------------------------------------+---------+--------------------------------------------------+----------------+---------------+------------+---------------+-----------+-----------------+-----------------+--------------+--------------+
training     |chicago_weather_ytd|{'admin@system.chicago_weather_ytd'}|COMPLETED|Completed processing pipeline chicago_weather_ytd |              11|              1|           0|              0|          1|              1.0|              212|           212|             0|
training     |city_weather       |{'admin@system.city_weather'}       |COMPLETED|Completed processing pipeline city_weather        |              10|              1|           0|              0|          1|              1.0|               48|            48|             0|
training     |counties_load      |{'admin@system.us_counties'}        |COMPLETED|Completed processing pipeline counties_load       |              11|              1|           0|              0|          1|              1.0|             3429|          3429|             0|
```

Display five rows of the loaded data, sorted by the date of the weather event.

```sql
SELECT * FROM chicago_weather_ytd ORDER BY log_date LIMIT 5;
```

Output:

```
latitude|longitude|location_name|log_date  |temp|preciptype     |file_name                                                      |
--------+---------+-------------+----------+----+---------------+---------------------------------------------------------------+
 41.8843| -87.6324|Chicago, IL  |2025-01-01|-1.1|{'Rain','Snow'}|/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json|
 41.8843| -87.6324|Chicago, IL  |2025-01-02|-2.1|{'Rain','Snow'}|/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json|
 41.8843| -87.6324|Chicago, IL  |2025-01-03|-4.6|{'Snow'}       |/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json|
 41.8843| -87.6324|Chicago, IL  |2025-01-04|-7.2|NULL           |/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json|
 41.8843| -87.6324|Chicago, IL  |2025-01-05|-6.4|{'Snow'}       |/tmp/data/json_files/nested_json/chicago_weather_daily_ytd.json|
```

Related Links

- docid\ pbyszqvu5wonpgoso qto
- docid\ ti3mdibvgmuudmlqu9xpl
- docid\ vqvrmdyk8josxmkfsyprc