# Data Pipeline Load of JSON Data from Kafka
A common setup for streaming data into {{ocient}} is to load JSON data from a {{kafka}} topic. Ocient uses data pipelines to transform each document into rows in one or more tables. The loading and transformation capabilities use a simple SQL-like syntax for transforming data.

This tutorial guides you through a simple example load using a small data set in JSON format. The data in this example comes from a test set for the {{metabase}} business intelligence tool.

## Prerequisites

This tutorial assumes that:

- The Ocient system has network access to a Kafka broker from the SQL and Loader Nodes.
- An Ocient system is installed and configured with an active storage cluster (see the docid 1kyq4 xppwa7pejdm4ysc guide).

## Step 1: Create a New Database

To begin, create a database for the example data. First, connect to a SQL Node using the docid 7uosju7ajx4yd61vqbhqu. Then, execute the CREATE DATABASE SQL statement to create the metabase database.

```sql
CREATE DATABASE metabase;
```

## Step 2: Create a Table

Create the orders table in the new database. First, connect to that database (e.g., connect to jdbc:ocient://sql-node:4050/metabase), and then execute this CREATE TABLE SQL statement, which creates a table with a clustering index based on the user_id and product_id columns and these columns:

- created_at as a {{timekey}} timestamp that is not nullable
- id, user_id, and product_id as integers that are not nullable
- subtotal, tax, total, and discount as floating-point numbers
- quantity as an integer

```sql
CREATE TABLE public.orders (
  created_at TIMESTAMP TIME KEY BUCKET(30, DAY) NOT NULL,
  id INT NOT NULL,
  user_id INT NOT NULL,
  product_id INT NOT NULL,
  subtotal DOUBLE,
  tax DOUBLE,
  total DOUBLE,
  discount DOUBLE,
  quantity INT,
  CLUSTERING INDEX idx01 (user_id, product_id)
);
```

The database creates the orders table, and you can begin loading data.
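The TIME KEY bucket on created_at and the clustering index on (user_id, product_id) determine how Ocient organizes the loaded rows, so time-bounded filters and predicates on those columns are the kinds of queries this table is designed for. The block below is an illustrative sketch only, not a tutorial step: run it after data is loaded in the later steps, and note that the date range, the user ID, and the standard SQL timestamp literal syntax are assumptions for illustration.

```sql
-- Illustrative sketch only (run after data is loaded in Step 4):
-- a time-bounded filter on the TIME KEY column combined with a predicate
-- on the clustering index columns. The literal values are arbitrary.
SELECT COUNT(*) AS order_count, SUM(total) AS revenue
FROM public.orders
WHERE created_at >= TIMESTAMP '2019-01-01 00:00:00'
  AND created_at <  TIMESTAMP '2019-07-01 00:00:00'
  AND user_id = 1;
```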
## Step 3: Create a Data Pipeline

Create data pipelines using the CREATE PIPELINE SQL statement. To load data, you first create a pipeline with the definition of the source, data format, and transformation rules using a SQL-like declarative syntax. Then, you execute the START PIPELINE command to start the load. You can observe progress and status using system tables and views.

Each Ocient pipeline defines a single data source and the target table or tables into which data loads. A data source includes the location of the source and filters on the source to define the specific data set to load. This example loads data from a single Kafka topic.

First, inspect the data that you plan to load. Each document has a format similar to this example:

```json
{"id": 1, "user_id": 1, "product_id": 14, "subtotal": 37.65, "tax": 2.07, "total": 39.72, "discount": null, "created_at": "2019-02-11T21:40:27.892Z", "quantity": 2}
{"id": 2, "user_id": 1, "product_id": 123, "subtotal": 110.93, "tax": 6.1, "total": 117.03, "discount": null, "created_at": "2018-05-15T08:04:04.580Z", "quantity": 3}
```

In this case, Ocient automatically transforms the data to the target columns using some sensible conventions. In other cases, loads require some transformation. Most transformations are identical to functions that already exist in the SQL dialect of the Ocient system.

Create a pipeline named orders_pipeline for the orders data set from your database connection prompt. Specify the Kafka source with broker address 192.168.0.1:9092 (replace this example Kafka broker address with your address) and topic orders. Load the data into the public.orders table. The SELECT part of the SQL statement maps the JSON fields to the target columns in the created table.

```sql
CREATE PIPELINE orders_pipeline
SOURCE KAFKA
  BOOTSTRAP_SERVERS '192.168.0.1:9092'
  TOPIC 'orders'
EXTRACT FORMAT JSON
INTO public.orders
SELECT
  $id AS id,
  $user_id AS user_id,
  $product_id AS product_id,
  $subtotal AS subtotal,
  $tax AS tax,
  $total AS total,
  $discount AS discount,
  $created_at AS created_at,
  $quantity AS quantity;
```

The pipeline has three main sections:

- SOURCE — Loads data from Kafka. Specify the address of the bootstrap servers and the topic. Options exist to set Kafka consumer configurations.
- EXTRACT — Sets the format to JSON.
- INTO ... SELECT — Targets the public.orders table and selects the chosen fields from the JSON records.

In this case, all data is available at the top level of the JSON records, so the example references the fields by attribute name (e.g., $id, $user_id, etc.). For nested data, reference the nested fields using dot notation (e.g., $order.user.first_name). Each field maps to a target column using the AS syntax.

After you successfully create this pipeline, execute the START PIPELINE SQL statement.

```sql
START PIPELINE orders_pipeline;
```

## Step 4: Confirm That Loading Is Operating Correctly

With the pipeline in place and running, data immediately begins loading from the Kafka topics that are configured in the pipeline. If you do not have data in the Kafka topics yet, now is a good time to start producing data into the topics.

### Produce Test Data into Kafka

For test purposes, https://github.com/edenhill/kcat is a helpful utility that makes it easy to produce records into a topic. For example, if you have a file of sample data orders.jsonl in JSONL format (newline-delimited JSON records), you can run this command to send those records into your Kafka broker. Specify your broker IP address instead of `<broker_ip_address>` and your topic name instead of `<topic_name>`.

```shell
kafkacat -b <broker_ip_address>:9092 -t <topic_name> -T -P -l orders.jsonl
```

Save an example document to your file system to use for this test. For this example, you can download an example file from https://ocient-examples.s3.amazonaws.com/metabase_samples/jsonl/orders.jsonl and save it to /orders.jsonl.

Assuming the broker is running at the IP address 10.0.0.3, send data into the orders topic defined in the pipeline definition by executing this command:

```shell
kafkacat -b 10.0.0.3:9092 -t orders -T -P -l orders.jsonl
```

This command pushes the entire JSONL file of messages into Kafka with one record per line. As these records are produced into Kafka, the running pipeline begins to load them into Ocient.

## Step 5: Observe the Load Progress

With your pipeline running, data immediately begins to load from the Kafka topic. The pipeline creates parallel Kafka consumers for each partition. If there are more partitions than processing cores available, the pipeline automatically spreads consumers across the available processing cores and Loader Nodes.

You can check the pipeline status and progress by querying information_schema.pipeline_status or executing the SHOW PIPELINE STATUS SQL statement.

```sql
SHOW PIPELINE STATUS;
```

Output:

```
database_name | pipeline_name   | table_names       | status  | status_message                              | percent_complete | duration_seconds | files_processed | files_failed | files_remaining | records_processed | records_loaded | records_failed
--------------+-----------------+-------------------+---------+---------------------------------------------+------------------+------------------+-----------------+--------------+-----------------+-------------------+----------------+---------------
metabase      | orders_pipeline | ["public.orders"] | RUNNING | Started processing pipeline orders_pipeline | 0.0              | 2.025824         | 0               | 0            | 1               | 0                 | 0              | 0
```

After a few seconds, the data is available for query in public.orders.

```sql
SELECT COUNT(*) FROM public.orders;
```

Output:

```
count(*)
--------
18760
```

Unlike a batch file load, Kafka pipelines run continuously, so they never change to a status of COMPLETED.

To examine progress, you can use the Information Schema and system catalog tables. Key details are in information_schema.pipelines, information_schema.pipeline_status, and sys.pipeline_partitions. For example, this statement shows the status and key metrics for duration, loaded records, and failed records:

```sql
SELECT pipeline_name, status, status_message, duration_seconds, records_loaded, records_failed
FROM information_schema.pipeline_status;
```

Output:

```
pipeline_name   | status  | status_message                              | duration_seconds | records_loaded | records_failed
----------------+---------+---------------------------------------------+------------------+----------------+---------------
orders_pipeline | RUNNING | Started processing pipeline orders_pipeline | 10.089936        | 18760          | 0
```

In this example, there are no errors. The sys.pipeline_errors system catalog table captures any errors that occur during the pipeline process.
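If records_failed ever becomes nonzero, you can inspect the captured errors directly. The following is a minimal sketch: sys.pipeline_errors is the table named above, but the pipeline_name filter column is an assumption, so adjust or remove the WHERE clause to match the columns available on your system.

```sql
-- Minimal sketch for inspecting load errors captured by the system.
-- sys.pipeline_errors is referenced in the text above; the pipeline_name
-- filter column is an assumption and may need to be adjusted or removed.
SELECT *
FROM sys.pipeline_errors
WHERE pipeline_name = 'orders_pipeline';
```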
You can drop the pipeline with the `DROP PIPELINE orders_pipeline;` SQL statement. Executing this statement leaves the data in your target table, but removes metadata about the pipeline execution from the system.

## Related Links

- docid l8tdfpfzzvzeyabc2h7bq
- docid 1 jhzrhblgnqucl6skiaq
- docid 7s5nztl8lolpwt2pcnjyz
- docid aimcafoydn2xf fgqssys
- docid ay3earmjsfi wkxd bwmv