LAT Reference
...
Loading by Data Source
LAT Load JSON Data from Kafka
data pipelines are now the preferred method for loading data into the ocient system for details, see docid\ xq0tg7yph vn62uwufibu a common setup for streaming data into {{ocient}} is to send json documents to {{kafka}} and then transform each document into rows in one or more different tables ocient’s loading and transformation capabilities use a simple sql like syntax for transforming data this tutorial will guide users through a simple example load using a small set of data in json format the data in this example is created from a test set for the {{metabase}} business intelligence tool prerequisites this tutorial assumes that a kafka cluster is operational and can be reached by the ocient loader nodes an ocient system is installed and configured with an active storage cluster (see the docid\ nneedy7yn8g1pennmamng guide) the ocient loader nodes are running the latest loading and transformation version which is configured to connect to kafka for stream loading a default "sink" for the ocient loader nodes is configured on the system the docid\ xpvlz0ewuxmgynxvxz jb is installed the test data for this tutorial can be found at the following s3 addresses you must be logged into amazon aws to download these files https //ocient examples s3 amazonaws com/metabase samples/jsonl/orders jsonl https //ocient examples s3 amazonaws com/metabase samples/jsonl/products jsonl https //ocient examples s3 amazonaws com/metabase samples/jsonl/people jsonl https //ocient examples s3 amazonaws com/metabase samples/jsonl/reviews jsonl step 1 create a new database to begin, you are going to load four example tables in a database first, connect to a sql node using the docid 7uosju7ajx4yd61vqbhqu then run the following ddl command create database metabase; step 2 create tables to create tables in the new database, first connect to that database (e g , connect to jdbc\ ocient //sql node 4050/metabase ), then run the following ddl commands create table public orders ( created at timestamp time key bucket(1, day) not null, id int not null, user id int not null, product id int not null, subtotal double, tax double, total double, discount double, quantity int, clustering index idx01 (user id, product id) ); create table public people ( created at timestamp time key bucket(1, day) not null, id int not null, address varchar(255), email varchar(255), password varchar(255), name varchar(255), city varchar(255), longitude double, state varchar(255), source varchar(255), birth date date, zip varchar(255), latitude double, clustering index idx01 (id) ); create table public products( created at timestamp time key bucket(1, day) not null, id int not null, ean varchar(255), title varchar(255), category varchar(255) compression gdc(2) not null, vendor varchar(255), price double, rating double, clustering index idx01 (category) ); create table public reviews ( created at timestamp time key bucket(1, day) not null, id int not null, product id int not null, reviewer varchar(255), rating int, body varchar(255), clustering index idx01 (product id) ); now, the database tables are created, and you can begin loading data step 3 create a data pipeline data pipelines are created using a simple loading configuration that is submitted to the transformation nodes to start loading each kafka topic is routed to one or more ocient tables, and each column is the result of a transformation applied to the source document first, let’s inspect the data that is going to be loaded each document has a format similar to the following example / orders / {"id" 1, "user id" 1, "product id" 14, "subtotal" 37 65, "tax" 2 07, "total" 39 72, "discount" null, "created at" "2019 02 11t21 40 27 892z", "quantity" 2} {"id" 2, "user id" 1, "product id" 123, "subtotal" 110 93, "tax" 6 1, "total" 117 03, "discount" null, "created at" "2018 05 15t08 04 04 580z", "quantity" 3} / products / {"id" 1, "ean" "1018947080336", "title" "rustic paper wallet", "category" "gizmo", "vendor" "swaniawski, casper and hilll", "price" 29 46, "rating" 4 6, "created at" "2017 07 19t19 44 56 582z"} {"id" 2, "ean" "7663515285824", "title" "small marble shoes", "category" "doohickey", "vendor" "balistreri ankunding", "price" 70 08, "rating" 0, "created at" "2019 04 11t08 49 35 932z"} {"id" 3, "ean" "4966277046676", "title" "synergistic granite chair", "category" "doohickey", "vendor" "murray, watsica and wunsch", "price" 35 39, "rating" 4, "created at" "2018 09 08t22 03 20 239z"} / people / {"id" 1, "address" "9611 9809 west rosedale road", "email" "borer hudson\@yahoo com", "password" "ccca881f 3e4b 4e5c 8336 354103604af6", "name" "hudson borer", "city" "wood river", "longitude" 98 5259864, "state" "ne", "source" "twitter", "birth date" "1986 12 12", "zip" "68883", "latitude" 40 71314890000001, "created at" "2017 10 07t01 34 35 462z"} {"id" 2, "address" "101 4th street", "email" "williamson domenica\@yahoo com", "password" "eafc45bf cf8e 4c96 ab35 ce44d0021597", "name" "domenica williamson", "city" "searsboro", "longitude" 92 6991321, "state" "ia", "source" "affiliate", "birth date" "1967 06 10", "zip" "50242", "latitude" 41 5813224, "created at" "2018 04 09t12 10 05 167z"} {"id" 3, "address" "29494 anderson drive", "email" "lina heaney\@yahoo com", "password" "36f67891 34e5 4439 a8a4 2d9246775ff8", "name" "lina heaney", "city" "sandstone", "longitude" 92 8416108, "state" "mn", "source" "facebook", "birth date" "1961 12 18", "zip" "55072", "latitude" 46 11973039999999, "created at" "2017 06 27t06 06 20 625z"} / reviews / {"id" 1, "product id" 1, "reviewer" "christ", "rating" 5, "body" "ad perspiciatis quis et consectetur laboriosam fuga voluptas ut et modi ipsum odio et eum numquam eos nisi assumenda aut magnam libero maiores nobis vel beatae officia ", "created at" "2018 05 15t20 25 48 517z"} {"id" 2, "product id" 1, "reviewer" "xavier", "rating" 4, "body" "reprehenderit non error architecto consequatur tempore temporibus voluptate ut accusantium quae est aut sit quidem nihil maxime dolores molestias enim vel optio est fugiat vitae cumque ut maiores laborum rerum quidem voluptate rerum ", "created at" "2019 08 07t13 50 33 401z"} {"id" 3, "product id" 1, "reviewer" "cameron nitzsche", "rating" 5, "body" "in aut numquam labore fuga et tempora sit et mollitia aut ullam et repellat aliquam sint tenetur culpa eius tenetur molestias ipsa est ut quisquam hic necessitatibus molestias maiores vero nesciunt ", "created at" "2018 03 30t00 28 45 192z"} as you can see, this is similar to the target schema, but will require some transformation most transformations are identical to functions already in ocient’s sql dialect to route data to your tables, you must create a pipeline json file that has the following structure { "version" 2, "workers" 4, "pipeline id" "pipeline metabase", "source" { "type" "kafka", "kafka" { "bootstrap servers" "127 0 0 1 9092", "auto offset reset" "earliest" } }, "transform" { "topics" { "orders" { "tables" { "metabase public orders" { "columns" { "id" "id", "user id" "user id", "product id" "product id", "subtotal" "subtotal", "tax" "tax", "total" "total", "discount" "discount", "created at" "to timestamp(created at, 'yyyy mm dd\\\\'t\\\\'hh\ mm\ ss\[ sss]x')", "quantity" "quantity" } } } }, "people" { "tables" { "metabase public people" { "columns" { "id" "id", "address" "address", "email" "email", "password" "password", "name" "name", "city" "city", "longitude" "longitude", "state" "state", "source" "source", "birth date" "birth date", "zip" "zip", "latitude" "latitude", "created at" "to timestamp(created at, 'yyyy mm dd\\\\'t\\\\'hh\ mm\ ss\[ sss]x')" } } } }, "reviews" { "tables" { "metabase public reviews" { "columns" { "id" "id", "product id" "product id", "reviewer" "reviewer", "rating" "rating", "body" "body", "created at" "to timestamp(created at, 'yyyy mm dd\\\\'t\\\\'hh\ mm\ ss\[ sss]zzzzz')" } } } }, "products" { "tables" { "metabase public products" { "columns" { "id" "id", "ean" "ean", "title" "title", "category" "category", "vendor" "vendor", "price" "price", "rating" "rating", "created at" "to timestamp(created at, 'yyyy mm dd\\\\'t\\\\'hh\ mm\ ss\[ sss]x')" } } } } } } } step 4 using the loading and transformation cli with a pipeline json file ready to go, you can test this pipeline to test, use the lat cli for these examples, you can assume that two lats are configured and will set them using an environment variable first, configure the lat cli to use the hosts of your loading and transformation service you can add these to every cli command as a flag, but for simplicity you can also set them as environment variables from a command line, run the following command replacing the ip addresses with the ip addresses of your lat processes export lat hosts="https //10 0 0 1 8443,https //10 0 0 2 8443" if your lat is running without tls configured, replace the port number of your lat hosts with 8080 and the protocol with http // next, check on the status of the lat lat client pipeline status example response 10 0 0 1 8443 stopped 10 0 0 2 8443 stopped success! this confirms that you can reach the lat from your cli if the status is "running" it means a pipeline is already executing a pipeline you are next going to update and start your new pipeline this example uses secure connections if you receive an ssl error when testing, your service cannot be configured to use tls or you might need to use the no verify flag if certificate validation fails step 5 test the transformation the cli supports previewing a transformation with an example document and the pipeline file this makes it easy to test your transformations first, save an example document to your file system to use for this test for this demo, you can download an example file from https //ocient examples s3 amazonaws com/metabase samples/jsonl/orders jsonl and save it to /orders jsonl next, make sure the pipeline json file that you created is stored at /pipeline json now that both files are available, run the cli to preview the results you can pass the preview command the topic name, the pipeline file, and the sample record file the response contains the transformed data tied to the destination table and a list of any error records lat client preview topic orders pipeline /pipeline json records /orders jsonl example response { "tablerecords" { "metabase public orders" \[ { "id" 1, "user id" 1, "product id" 14, "subtotal" 37 65, "tax" 2 07, "total" 39 72, "discount" null, "created at" 1549921227892000000, "quantity" 2 }, { "id" 2, "user id" 1, "product id" 123, "subtotal" 110 93, "tax" 6 1, "total" 117 03, "discount" null, "created at" 1526371444580000000, "quantity" 3 }, { "id" 3, "user id" 1, "product id" 105, "subtotal" 52 72, "tax" 2 9, "total" 49 2, "discount" 6 42, "created at" 1575670968544000000, "quantity" 2 } ] }, "recorderrors" \[] } you can see that the data is transformed and the columns to which each transformed value will be mapped if there are issues in the values, these will appear in the recorderrors object you can quickly update your pipeline json file and preview again now, you can inspect different documents to confirm that various states of data cleanliness like missing columns, null values, and special characters are well handled by your transformations step 6 configure and start the data pipeline with a tested transformation, the next step is to set up and start the data pipeline first, configure the pipeline using the pipeline create command this validates and creates the pipeline, but will not take effect until you start the pipeline lat client pipeline create pipeline /pipeline json example response 10 0 0 1 8443 created 10 0 0 2 8443 created in cases where there is an existing pipeline operating, it is necessary to stop the pipeline and remove the original pipeline before creating and starting the new pipeline now that the pipeline has been created on all lat nodes, you can start the lat by running the pipeline start commands lat client pipeline start example responses 10 0 0 1 8443 running 10 0 0 2 8443 running step 7 confirm that loading is operating correctly with your pipeline in place and running, data will immediately begin loading off of the kafka topics that are configured in the pipeline if you do not have data in the kafka topics yet, now would be a good time to start producing data into the topics producing test data into kafka for test purposes, https //github com/edenhill/kcat is a helpful utility that makes it easy to product records into a topic for example, if you have a file of sample data orders jsonl in a jsonl format (newline delimited json records), you can run the following command to send those records into your kafka broker kafkacat b \<broker ip address> 9092 t \<topic name> t p l orders jsonl assuming your broker is running at 10 0 0 3 and you want to send data into the four topics defined in your pipeline json definition, you can run kafkacat b 10 0 0 3 9092 t orders t p l orders jsonl kafkacat b 10 0 0 3 9092 t products t p l products jsonl kafkacat b 10 0 0 3 9092 t people t p l people jsonl kafkacat b 10 0 0 3 9092 t reviews t p l reviews jsonl each of these commands will push the entire jsonl file of messages into kafka with one record per line as these are produced into kafka, your running pipeline will begin loading them into ocient observing loading progress with data in kafka, our pipeline will begin loading data immediately and streaming any new data into ocient to observe this progress, you can monitor the metrics endpoint of the loading and transformation nodes this can be done manually from a command line or from a tool like {{grafana}} for this example, you can run a curl command against the endpoint and review the result for details on metrics, see the docid\ eumgc9mmid1dzmpahr9on documentation command curl https //127 0 0 1 8443/v2/metrics/lat\ type=pipeline if your lat is running without tls configured, replace the port number of your lat hosts with 8080 and the protocol with http // example response { "request" { "mbean" "lat\ type=pipeline", "type" "read" }, "value" { "partitions" \[ { "offsets durable" 1, "pushes errors" 0, "pushes attempts" 1, "rows pushed" 1, "offsets written" 18759, "records buffered" 0, "records errors column" 0, "records errors deserialization" 0, "source bytes buffered" 0, "records errors transformation" 0, "offsets processed" 18759, "partition" "table orders 0", "records filter accepted" 1, "records errors row" 0, "records filter rejected" 0, "records errors generic" 0, "producer send attempts" 0, "offsets pushed" 18759, "pushes unacknowledged" 0, "invalid state" 0, "bytes pushed" 88, "records errors total" 0, "offsets buffered" 18759, "complete" 0, "offsets end" 1, "producer send errors" 0 }, { "offsets durable" 1, "pushes errors" 0, "pushes attempts" 1, "rows pushed" 1, "offsets written" 2499, "records buffered" 0, "records errors column" 0, "records errors deserialization" 0, "source bytes buffered" 0, "records errors transformation" 0, "offsets processed" 2499, "partition" "table people 0", "records filter accepted" 1, "records errors row" 0, "records filter rejected" 0, "records errors generic" 0, "producer send attempts" 0, "offsets pushed" 2499, "pushes unacknowledged" 0, "invalid state" 0, "bytes pushed" 223, "records errors total" 0, "offsets buffered" 2499, "complete" 0, "offsets end" 1, "producer send errors" 0 }, { "offsets durable" 1, "pushes errors" 0, "pushes attempts" 1, "rows pushed" 1, "offsets written" 199, "records buffered" 0, "records errors column" 0, "records errors deserialization" 0, "source bytes buffered" 0, "records errors transformation" 0, "offsets processed" 199, "partition" "table products 0", "records filter accepted" 1, "records errors row" 0, "records filter rejected" 0, "records errors generic" 0, "producer send attempts" 0, "offsets pushed" 199, "pushes unacknowledged" 0, "invalid state" 0, "bytes pushed" 145, "records errors total" 0, "offsets buffered" 199, "complete" 0, "offsets end" 1, "producer send errors" 0 }, { "offsets durable" 1, "pushes errors" 0, "pushes attempts" 1, "rows pushed" 1, "offsets written" 1111, "records buffered" 0, "records errors column" 0, "records errors deserialization" 0, "source bytes buffered" 0, "records errors transformation" 0, "offsets processed" 1111, "partition" "table reviews 0", "records filter accepted" 1, "records errors row" 0, "records filter rejected" 0, "records errors generic" 0, "producer send attempts" 0, "offsets pushed" 1111, "pushes unacknowledged" 0, "invalid state" 0, "bytes pushed" 184, "records errors total" 0, "offsets buffered" 1111, "complete" 0, "offsets end" 1, "producer send errors" 0 } ], "paused" 1, "bytes buffered" 0, "workers" 20 }, "timestamp" 1626970368, "status" 200 } check row counts in tables to confirm that you are seeing results in the target tables, you can also run some simple queries to check row counts depending on the streamloader role settings, the time for records to become queryable can vary from a few seconds to minutes example queries ocient> select count( ) from public orders; count( ) \ 18760 ocient> select count() from public people; count() \ 2500 ocient> select count( ) from public products; count( ) \ 200 ocient> select count( ) from public reviews; count( ) \ 1112 success! now you can explore the data in these four tables with any ocient sql queries if more data is pushed into these topics, your pipeline is still running and will automatically load all new data check errors in this example, all rows load successfully however, a successful load does not always happen, and you can inspect errors using the lat client whenever the lat process fails to parse a file correctly or fails to transform or load a record, the lat process records an error the lat client includes the lat client pipeline errors command that reports the latest errors on the pipeline a full error log is available on the loader nodes these logs report all bad records and the reason that the load fails when you load a pipeline from kafka, the load might route errors to an error topic on the kafka broker instead of the logs the lat client does not contain the errors sent to the error topic you can inspect these errors with kafka utilities instead this lat client command displays a maximum of 100 error messages lat client pipeline errors max errors 100 only error messages \| | \| exception message | \| | \| column name time1 message failed to evaluate | \| expression cause | \| java time format datetimeparseexception | \| | \| column name time1 message failed to evaluate | \| expression cause | \| java time format datetimeparseexception | \| | the errors indicate that there is an issue parsing the time1 column options exist on the pipeline errors command to return json and to restrict the response to specific components of the error detail that includes a reference to the source location of this record the following command returns json that is delimited with newline characters you can pass the json output to jq or a file the json includes the source topic or file group, the filename where the error occurred, the offset that indicates the line number or kafka offset, and the exception message that aids in troubleshooting and identifying the incorrect record in the source data you can use the log original message pipeline setting to provide direct access to the parsed source record for errors when appropriate lat client pipeline errors max errors 100 json {"time" "2022 05 17t16 53 50 387386+00 00", "topic" "calcs", "partition" 0, "state" "transformation error", "exception message" "column name time1 message failed to evaluate expression cause java time format datetimeparseexception cannot parse time \\"19 36 22\\" with format string \\"value(hourofday,2)offset(+hhmm,'z')' 'value(minuteofhour,2)' 'value(secondofminute,2)\\"\njava time format datetimeparseexception cannot parse time \\"19 36 22\\" with format string \\"value(hourofday,2)offset(+hhmm,'z')' 'value(minuteofhour,2)' 'value(secondofminute,2)\\"\ncannot parse time \\"19 36 22\\" with format string \\"value(hourofday,2)offset(+hhmm,'z')' 'value(minuteofhour,2)' 'value(secondofminute,2)\\"", "offset" 0, "record" null, "metadata" {"size" "3321", "filename" "calcs/csv/calcs 01 csv"}} {"time" "2022 05 17t16 53 50 404684+00 00", "topic" "calcs", "partition" 0, "state" "transformation error", "exception message" "column name time1 message failed to evaluate expression cause java time format datetimeparseexception cannot parse time \\"02 05 25\\" with format string \\"value(hourofday,2)offset(+hhmm,'z')' 'value(minuteofhour,2)' 'value(secondofminute,2)\\"\njava time format datetimeparseexception cannot parse time \\"02 05 25\\" with format string \\"value(hourofday,2)offset(+hhmm,'z')' 'value(minuteofhour,2)' 'value(secondofminute,2)\\"\ncannot parse time \\"02 05 25\\" with format string \\"value(hourofday,2)offset(+hhmm,'z')' 'value(minuteofhour,2)' 'value(secondofminute,2)\\"", "offset" 1, "record" null, "metadata" {"size" "3321", "filename" "calcs/csv/calcs 01 csv"}} related links docid\ cdutjfrhb4hmidwoafvno docid 4wejuau6gpdqyii5qqtqt docid\ elwhwxe8oruff36xf4fom