Load Data
Load by Data Source
Data Pipeline Load of Parquet Data from S3
A common setup for loading files in a batch into the {{ocient}} system is to load from a bucket on {{aws}} S3 with time-partitioned data. Often, you must perform a batch load repeatedly to load new files. Ocient uses data pipelines to transform each document into rows in one or more different tables. The loading and transformation capabilities use a simple SQL-like syntax for transforming data.

This tutorial guides you through a simple example load using a small data set in {{parquet}} format. The data in this example comes from a test set for the {{metabase}} business intelligence tool.

Parquet Loading Recommendations

Follow this set of recommendations for an optimal loading experience of Parquet files.

File Configuration

Files should have row groups of less than 128 MB. Larger row groups can impact memory usage during loading, and row groups of 512 MB can cause loading failures on data sets of 1 TB or more.

Encoding

Encoding fields in a Parquet file reduces the space of the file on disk but can impact memory usage during loading. Enable encoding on fields that you expect to have fewer than 256 unique values and on fields that contain short strings. You do not have to encode other fields.

Multiple Files

You can load row groups of multiple Parquet files in parallel. For large data sets, load the data set as multiple files. Loading files with differing schemas is not supported.

Prerequisites

This tutorial assumes that:

- The Ocient system has network access to S3 from the Loader Nodes.
- An Ocient system is installed and configured with an active storage cluster. For details, see docid 1kyq4 xppwa7pejdm4ysc.

Step 1: Create a New Database

To begin, load two example tables in a database. First, connect to a SQL node using the docid 7uosju7ajx4yd61vqbhqu. Then, execute the CREATE DATABASE SQL statement to create the metabase database.

```sql
CREATE DATABASE metabase;
```

Step 2: Create a New Table in the Database

Create the orders table in the new database. First, connect to that database (e.g., connect to jdbc:ocient://sql-node:4050/metabase), and then execute this CREATE TABLE SQL statement that creates a table with a clustering index based on the user_id and product_id columns and these columns:

- created_at as a {{timekey}} timestamp that is not nullable
- id, user_id, and product_id as integers that are not nullable
- subtotal, tax, total, and discount as floating-point numbers
- quantity as an integer

```sql
CREATE TABLE public.orders (
    created_at TIMESTAMP TIME KEY BUCKET(30, DAY) NOT NULL,
    id INT NOT NULL,
    user_id INT NOT NULL,
    product_id INT NOT NULL,
    subtotal DOUBLE,
    tax DOUBLE,
    total DOUBLE,
    discount DOUBLE,
    quantity INT,
    CLUSTERING INDEX idx01 (user_id, product_id)
);
```

The database creates the orders table, and you can begin loading data.

Step 3: Preview and Create a Data Pipeline

Create data pipelines using the CREATE PIPELINE SQL statement. To load data, you first create a pipeline with the definition of the source, data format, and transformation rules using a SQL-like declarative syntax. Then, you execute the START PIPELINE SQL statement to start the load. You can observe progress and status using system catalog tables and views.

Each Ocient pipeline defines a single data source and the target table or tables into which data loads. A data source includes the location of the source and filters on the source to define the specific data set to load. This tutorial loads data from two data sources, where each source is located in a directory on the same S3 bucket.

First, inspect the data that you plan to load.
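For example, you can inspect a local copy of the file with pandas before you define the pipeline. This is a minimal sketch, not a tutorial step: it assumes you have already downloaded the file to your working directory as orders.parquet and have pandas installed with a Parquet engine such as pyarrow.

```python
import pandas as pd

# Read a local copy of the example Parquet file (assumed to be downloaded already).
df = pd.read_parquet("orders.parquet")

# Show a couple of records plus the column names, types, and non-null counts.
# This is the same information summarized in the schema description below.
print(df.head(2))
df.info()
```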
Each document has a JSON format similar to this example:

```
{"id": 1, "user_id": 1, "product_id": 14, "subtotal": 37.65, "tax": 2.07, "total": 39.72, "discount": null, "created_at": "2019-02-11T21:40:27.892Z", "quantity": 2}
{"id": 2, "user_id": 1, "product_id": 123, "subtotal": 110.93, "tax": 6.1, "total": 117.03, "discount": null, "created_at": "2018-05-15T08:04:04.580Z", "quantity": 3}
```

Inspecting the file with pandas, you can see this schema with details.

Parquet Schema Description

```
 #   Column      Non-Null Count  Dtype
---  ----------  --------------  -------------------
 0   id          18760 non-null  int64
 1   user_id     18760 non-null  int64
 2   product_id  18760 non-null  int64
 3   subtotal    18760 non-null  float64
 4   tax         18760 non-null  float64
 5   total       18760 non-null  float64
 6   discount    1915 non-null   float64
 7   created_at  18760 non-null  datetime64[ns, UTC]
 8   quantity    18760 non-null  int64
```

In this case, Ocient automatically transforms the data to the target columns using some sensible conventions. In other cases, loads require some transformation. Most transformations are identical to functions that already exist in the SQL syntax of the Ocient system.

Prior to creating a pipeline, you can use the PREVIEW PIPELINE SQL statement to create your pipeline iteratively. This statement returns a result set that shows the final values that would be loaded but does not load the data into the target table.

Preview the orders_pipeline pipeline for the orders data set from your database connection prompt. Use the S3 data source with the endpoint https://s3.us-east-1.amazonaws.com, the bucket ocient-examples, and the filter metabase_samples/parquet/orders.parquet. Specify the Parquet format. Load the data into the public.orders table. The SELECT part of the SQL statement maps the fields in the Parquet file to the target columns in the created table. In this example, limit the result set to the first five records in the data source.

```sql
PREVIEW PIPELINE orders_pipeline
SOURCE S3
    ENDPOINT 'https://s3.us-east-1.amazonaws.com'
    BUCKET 'ocient-examples'
    FILTER 'metabase_samples/parquet/orders.parquet'
LIMIT 5
EXTRACT
    FORMAT PARQUET
INTO public.orders
SELECT
    $id AS id,
    $user_id AS user_id,
    $product_id AS product_id,
    $subtotal AS subtotal,
    $tax AS tax,
    $total AS total,
    $discount AS discount,
    $created_at AS created_at,
    $quantity AS quantity;
```

Output:

```
id  user_id  product_id  subtotal  tax   total   discount  created_at                     quantity
1   1        14          37.65     2.07  39.72   NULL      2019-02-11 21:40:27.892000000  2
2   1        123         110.93    6.1   117.03  NULL      2018-05-15 08:04:04.580000000  3
3   1        105         52.72     2.9   49.2    6.42      2019-12-06 22:22:48.544000000  2
4   1        94          109.22    6.01  115.23  NULL      2019-08-22 16:30:42.392000000  6
5   1        132         127.88    7.03  134.91  NULL      2018-10-10 03:34:47.309000000  5

Fetched 5 rows
```

With this preview, you can confirm that the results of the pipeline match your requirements. If there is an issue, you can update the statement and run the PREVIEW PIPELINE statement again until it meets your needs.
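Optionally, before you create the pipeline, you can also check that the source file follows the row group size recommendation given earlier in this tutorial by reading the Parquet metadata with pyarrow. This is a minimal sketch, not a tutorial step: it assumes a local copy of the file named orders.parquet and the pyarrow package.

```python
import pyarrow.parquet as pq

# Read only the Parquet footer metadata from a local copy of the file.
metadata = pq.ParquetFile("orders.parquet").metadata

print(f"row groups: {metadata.num_row_groups}, rows: {metadata.num_rows}")

# Report the uncompressed size of each row group; the recommendation above
# is to keep row groups under 128 MB.
for i in range(metadata.num_row_groups):
    size_mb = metadata.row_group(i).total_byte_size / (1024 * 1024)
    print(f"row group {i}: {size_mb:.1f} MB")
```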
Next, create the pipeline named orders_pipeline for the orders data set from your database connection prompt. The pipeline has three main sections:

- SOURCE — Loads data from S3. Set the S3 endpoint, bucket name, and filter for the Parquet files.
- EXTRACT — Sets the format to PARQUET.
- INTO ... SELECT — Targets the public.orders table and selects the chosen fields from the Parquet records.

In this case, all data is available at the top level of the Parquet records, so the example references the fields by attribute name (e.g., $id, $user_id, etc.). For nested data, reference the nested fields using dot notation (e.g., $order.user.first_name). Each field maps to a target column using the AS syntax.

```sql
CREATE PIPELINE orders_pipeline
SOURCE S3
    ENDPOINT 'https://s3.us-east-1.amazonaws.com'
    BUCKET 'ocient-examples'
    FILTER 'metabase_samples/parquet/orders.parquet'
EXTRACT
    FORMAT PARQUET
INTO public.orders
SELECT
    $id AS id,
    $user_id AS user_id,
    $product_id AS product_id,
    $subtotal AS subtotal,
    $tax AS tax,
    $total AS total,
    $discount AS discount,
    $created_at AS created_at,
    $quantity AS quantity;
```

After you successfully create this pipeline, execute the START PIPELINE SQL statement to start the load.

```sql
START PIPELINE orders_pipeline;
```

Step 4: Observe the Load Progress

With your pipeline running, data begins to load immediately from the S3 files that you defined. If there are many files in each file group, the load process first sorts the files into batches, partitions them for parallel processing, and assigns them to Loader Nodes. You can check the pipeline status and progress by querying the information_schema.pipeline_status system catalog table or by executing the SHOW PIPELINE STATUS SQL statement.

```sql
SHOW PIPELINE STATUS;
```

Output:

```
database_name  pipeline_name    table_names        status   status_message                               percent_complete  duration_seconds
metabase       orders_pipeline  ["public.orders"]  RUNNING  Started processing pipeline orders_pipeline  0.0               2.025824

files_processed  files_failed  files_remaining  records_processed  records_loaded  records_failed
0                0             1                0                  0               0
```

After the status of the pipeline changes to COMPLETED, all data is available in the target table. After a few seconds, the data is available for query in the public.orders table.

```sql
SELECT COUNT(*) FROM public.orders;
```

Output:

```
count(*)
18760
```

You can drop the pipeline with the DROP PIPELINE orders_pipeline; SQL statement. Execution of this statement leaves the data in your target table, but removes metadata about the pipeline execution from the system.

Related Links

- docid l8tdfpfzzvzeyabc2h7bq
- docid 1 jhzrhblgnqucl6skiaq
- docid 7s5nztl8lolpwt2pcnjyz
- docid aimcafoydn2xf fgqssys
- docid ay3earmjsfi wkxd bwmv