# Load Data into Multiple Targets in Data Pipelines
A common setup for loading files in a batch into {{ocient}} is to load from a bucket on {{aws}} S3 with time-partitioned data. Often, you must perform a batch load repeatedly to load new files. Ocient uses data pipelines to transform each document into rows in one or more different tables. The loading and transformation capabilities use a simple SQL-like syntax for transforming data.

This tutorial guides you through a load into multiple target tables using a small data set in CSV format. The data in this example comes from a test set for the {{metabase}} business intelligence tool.

## Prerequisites

This tutorial assumes that:

- The Ocient system has network access to S3 from the Loader Nodes.
- An Ocient system is installed and configured with an active storage cluster (see the docid q3srqnu4dww9on6fldj guide).

## Step 1: Create a New Database

Connect to a SQL Node using the docid\ zyojxip3zkuvr9skpiyxo. Then, execute the CREATE DATABASE SQL statement for a database named metabase.

```sql
CREATE DATABASE metabase;
```

## Step 2: Create Tables

Create two target tables for loading data for two different products.

Create the product129 table in the new database. First, connect to that database (e.g., connect to `jdbc:ocient://sql-node:4050/metabase`), and then execute this CREATE TABLE SQL statement that specifies a table with these columns and a clustering index based on the user_id and product_id columns:

- created_at as a {{timekey}} timestamp that is not nullable
- id, user_id, and product_id as integers that are not nullable
- subtotal, tax, total, and discount as floating-point numbers
- quantity as an integer

```sql
CREATE TABLE public.product129 (
    created_at TIMESTAMP TIME KEY BUCKET(30, DAY) NOT NULL,
    id INT NOT NULL,
    user_id INT NOT NULL,
    product_id INT NOT NULL,
    subtotal DOUBLE,
    tax DOUBLE,
    total DOUBLE,
    discount DOUBLE,
    quantity INT,
    CLUSTERING INDEX idx01 (user_id, product_id)
);
```

Create the second product161 table in the same way.

```sql
CREATE TABLE public.product161 (
    created_at TIMESTAMP TIME KEY BUCKET(30, DAY) NOT NULL,
    id INT NOT NULL,
    user_id INT NOT NULL,
    product_id INT NOT NULL,
    subtotal DOUBLE,
    tax DOUBLE,
    total DOUBLE,
    discount DOUBLE,
    quantity INT,
    CLUSTERING INDEX idx01 (user_id, product_id)
);
```
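The `TIME KEY BUCKET(30, DAY)` clause segments rows into 30-day windows of created_at. As a rough mental model only (an illustrative sketch, not Ocient's internal storage logic), you can think of a timestamp's bucket as its day count since the Unix epoch, integer-divided by the bucket width:

```python
from datetime import datetime, timezone

def bucket_index(ts: datetime, width_days: int = 30) -> int:
    """Conceptual bucket number: days since the Unix epoch // bucket width."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return (ts - epoch).days // width_days

# The two order dates below (taken from the sample data) are about ten
# months apart, so they land in different 30-day buckets; queries that
# filter on created_at can then skip unrelated segments.
a = bucket_index(datetime(2019, 2, 11, tzinfo=timezone.utc))
b = bucket_index(datetime(2019, 12, 6, tzinfo=timezone.utc))
print(a != b)
```

Timestamps that fall within the same 30-day window share a bucket, which is what makes the time key useful for pruning data at query time.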
The database creates the product129 and product161 tables, and you can begin loading data.

## Step 3: Create a Data Pipeline

Create data pipelines using the CREATE PIPELINE SQL statement. To load data, you first create a pipeline with the definition of the source, data format, and transformation rules using a SQL-like declarative syntax. Then, you execute the START PIPELINE SQL statement to start the load. You can observe progress and status using system tables and views.

Each Ocient pipeline defines a single data source and the target table or tables into which data loads. A data source includes the location of the source and filters on the source to define the specific data set to load. This example loads data from a data source located in a directory within the S3 bucket.

First, inspect the data that you plan to load. Each document has a format similar to this example CSV file named orders.csv:

```
id,user_id,product_id,subtotal,tax,total,discount,created_at,quantity
1,1,14,37.65,2.07,39.72,null,2019-02-11T21:40:27.892Z,2
2,1,123,110.93,6.1,117.03,null,2018-05-15T08:04:04.580Z,3
3,1,105,52.72,2.9,49.2,6.42,2019-12-06T22:22:48.544Z,2
```

In this case, Ocient automatically transforms the data to the target columns using some sensible conventions. In other cases, loads require some transformation. Most transformations are identical to functions that already exist in the SQL dialect of the Ocient system.

Create a pipeline named orders_pipeline for the orders data set from your database connection prompt. Use the S3 data source with the endpoint `https://s3.us-east-1.amazonaws.com`, the bucket `ocient-examples`, and the filter `metabase_samples/csv/orders.csv`. Specify the CSV format with one header line. Load the data into the two target tables. The SELECT part of the SQL statement maps the fields in the CSV file to the target columns in the created tables.
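Before writing the pipeline, it can help to see the intended per-row behavior spelled out. This plain-Python sketch approximates the conversions and the product-based routing that the pipeline below performs; it is an illustration of the semantics, not the loader's actual implementation:

```python
import csv
import io
from datetime import datetime

# The three-row excerpt from orders.csv shown above.
SAMPLE = """\
id,user_id,product_id,subtotal,tax,total,discount,created_at,quantity
1,1,14,37.65,2.07,39.72,null,2019-02-11T21:40:27.892Z,2
2,1,123,110.93,6.1,117.03,null,2018-05-15T08:04:04.580Z,3
3,1,105,52.72,2.9,49.2,6.42,2019-12-06T22:22:48.544Z,2
"""

def parse_row(row: dict) -> dict:
    """Apply the 'sensible conventions': ints, floats, NULLs, ISO-8601 timestamps."""
    return {
        "id": int(row["id"]),
        "user_id": int(row["user_id"]),
        "product_id": int(row["product_id"]),
        "subtotal": float(row["subtotal"]),
        "tax": float(row["tax"]),
        "total": float(row["total"]),
        "discount": None if row["discount"] == "null" else float(row["discount"]),
        "created_at": datetime.fromisoformat(row["created_at"].replace("Z", "+00:00")),
        "quantity": int(row["quantity"]),
    }

# Route each parsed row the way the pipeline's WHERE clauses do:
# product 129 rows to one table, product 161 rows to the other.
tables = {129: [], 161: []}
for raw in csv.DictReader(io.StringIO(SAMPLE)):
    row = parse_row(raw)
    if row["product_id"] in tables:   # WHERE $3 = 129  /  WHERE $3 = 161
        tables[row["product_id"]].append(row)

# None of the three excerpt rows is for product 129 or 161, so for this
# excerpt both target tables stay empty -- the WHERE filters drop the rest.
print(len(tables[129]), len(tables[161]))
```

The full orders.csv file contains rows for both products, which is why the counts at the end of this tutorial are non-zero even though this excerpt matches neither filter.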
```sql
CREATE PIPELINE orders_pipeline
SOURCE S3
  ENDPOINT 'https://s3.us-east-1.amazonaws.com'
  BUCKET 'ocient-examples'
  FILTER 'metabase_samples/csv/orders.csv'
EXTRACT
  FORMAT CSV
  NUM_HEADER_LINES 1
INTO public.product129
SELECT
  $1 AS id,
  $2 AS user_id,
  $3 AS product_id,
  $4 AS subtotal,
  $5 AS tax,
  $6 AS total,
  $7 AS discount,
  $8 AS created_at,
  $9 AS quantity
WHERE $3 = 129
INTO public.product161
SELECT
  $1 AS id,
  $2 AS user_id,
  $3 AS product_id,
  $4 AS subtotal,
  $5 AS tax,
  $6 AS total,
  $7 AS discount,
  $8 AS created_at,
  $9 AS quantity
WHERE $3 = 161;
```

The pipeline has three main sections:

- **SOURCE** — In this case, load data from S3. Specify the endpoint and bucket. The FILTER parameter identifies the file or files to load. This load uses a single CSV file; options exist to add wildcards or regular expressions to isolate different file sets.
- **EXTRACT** — Set the format to CSV files and note that there is one header line in the file. This specification skips that row when the Ocient system processes the file. Many other options exist for delimited data, such as a record delimiter and field delimiter.
- **INTO SELECT** — Choose the target tables public.product129 and public.product161 and select the fields from the CSV file. The numeric index identifies each file field. Importantly, similar to other SQL syntax, the first field in the file is $1, not $0. Each field maps to a target column using the AS syntax. The WHERE filter instructs the data pipeline to load order data for only two products, one product for each table.

After you successfully create the orders_pipeline pipeline, execute the START PIPELINE SQL statement.

```sql
START PIPELINE orders_pipeline;
```

## Step 4: Observe the Load Progress

With your pipeline running, data immediately begins to load from the S3 files that you defined. If there are many files in each file group, the load process first sorts the files into batches, partitions them for parallel processing, and assigns them to Loader
Nodes.

You can check the pipeline status and progress by querying information_schema.pipeline_status or by executing SHOW PIPELINE STATUS.

```sql
SHOW PIPELINE STATUS;
```

Output:

```
 database_name | pipeline_name   | table_names                                 | status  | status_message                              | percent_complete | duration_seconds | files_processed | files_failed | files_remaining | records_processed | records_loaded | records_failed
---------------+-----------------+---------------------------------------------+---------+---------------------------------------------+------------------+------------------+-----------------+--------------+-----------------+-------------------+----------------+----------------
 metabase      | orders_pipeline | ["public.product129", "public.product161"]  | RUNNING | Started processing pipeline orders_pipeline | 0.0              | 2.025824         | 0               | 0            | 1               | 0                 | 0              | 0
```

After the status of the pipeline changes to COMPLETED, all data is available in the target tables. After a few seconds, the data is available for query in the public.product129 table.

```sql
SELECT COUNT(*) FROM public.product129;
```

Output:

```
count(*)
--------
      93
```

The data is also available for query in the public.product161 table.

```sql
SELECT COUNT(*) FROM public.product161;
```

Output:

```
count(*)
--------
      92
```

You can drop the pipeline with the `DROP PIPELINE orders_pipeline;` SQL statement. Execution of this statement leaves the data in your target tables, but removes metadata about the pipeline execution from the system.

## Related Links

- docid\ pbyszqvu5wonpgoso qto
- docid\ y731i6lhout2b 7mc1jr8
- docid\ jfqu osagg5enkvmeesnl
- docid\ ti3mdibvgmuudmlqu9xpl
- docid\ xkhtu5p wt1py0q1rq2xv