Load Data
Manage Errors in Data Pipelines
When you load data into {{ocient}}, you can manage the way that the system responds to errors. This guide outlines different mechanisms for managing errors through examples. The guide uses the same setup as docid 2ua4kqafcqplu gi6oorz, but it modifies that pipeline to illustrate how to manage errors that you might encounter in data pipeline functionality.

You learn about:
- Bad data and error handling
- Inspecting record-level processing errors
- Configuring a bad data target
- Controlling pipeline behavior when you encounter errors
  - File-level error behavior
  - Record-level error behavior
- Understanding pipeline statuses in file loading

Step 1: Complete the Load JSON Data from Kafka Tutorial

See docid 2ua4kqafcqplu gi6oorz and complete the tutorial, then return to this guide for Step 2.

Step 2: Modify the Pipeline with a Bad Data Target

Bad data targets are the way to capture record-level errors for later troubleshooting and correction. Data pipelines support a {{kafka}} bad data target that allows you to send any failing records to a designated Kafka topic. This topic supports easy replay of those records in a separate recovery pipeline or inspection of the failing records. For details about bad data targets, see the docid\ l8tdfpfzzvzeyabc2h7bq.

Stop the Data Pipeline

Stop the orders_pipeline data pipeline using the STOP PIPELINE SQL statement.

```sql
STOP PIPELINE orders_pipeline;
```

Inspect the pipeline status using this SELECT SQL statement and the pipeline_status Information Schema view.

```sql
SELECT status FROM information_schema.pipeline_status WHERE pipeline_name = 'orders_pipeline';
```

While the pipeline is stopping, the status might appear as STOPPING while the pipeline wraps up work in process and commits offsets back to Kafka. After the status changes to STOPPED, you can proceed to the next step.

A pipeline must not be in a running state when you modify it.

Modify the Data Pipeline

In this example, use the CREATE PIPELINE OR REPLACE SQL statement to update the data pipeline. This statement maintains the same offsets with the Kafka topic and ensures that data is deduplicated correctly when the pipeline resumes. Add the BAD_DATA_TARGET option and specify the Kafka target and details. This example uses the same Kafka broker for the source and the bad data target.

```sql
CREATE PIPELINE OR REPLACE orders_pipeline
BAD_DATA_TARGET KAFKA
  BOOTSTRAP_SERVERS '192.0.2.1:9092'
  TOPIC 'orders_errors'
SOURCE KAFKA
  BOOTSTRAP_SERVERS '192.0.2.1:9092'
  TOPIC 'orders'
EXTRACT
  FORMAT JSON
INTO public.orders SELECT
  $id AS id,
  $user_id AS user_id,
  $product_id AS product_id,
  $subtotal AS subtotal,
  $tax AS tax,
  $total AS total,
  $discount AS discount,
  $created_at AS created_at,
  $quantity AS quantity;
```

Step 3: Start the Data Pipeline with an Error Limit

Start the orders_pipeline data pipeline. In this case, set a maximum error limit so that the pipeline fails if 10 errors occur. You might not choose an error limit for a continuously streaming pipeline where it is critical to keep up with newly arriving data, because the pipeline can stop suddenly and cause data freshness to fall behind. However, this example shows how error limits behave and how error states appear in pipelines.

```sql
START PIPELINE orders_pipeline ERROR LIMIT 10;
```

The pipeline status changes to STARTED to indicate that it has begun processing.

For file-based loads, the START PIPELINE SQL statement includes options for file errors that are useful for controlling error handling in different file failure modes. Options include failing on error, skipping on file error, and tolerating file-level errors. See the docid\ l8tdfpfzzvzeyabc2h7bq for details.
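Before producing test data, you can confirm that the pipeline restarted with the error limit applied by checking the status view again. This is a minimal sketch that reuses only the information_schema.pipeline_status columns used elsewhere in this guide; the exact status values your system reports might differ.

```sql
-- Confirm that the pipeline is processing again after the restart.
SELECT pipeline_name, status, status_message
FROM information_schema.pipeline_status
WHERE pipeline_name = 'orders_pipeline';
```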
Step 4: Produce Data with Errors into Kafka

With the data pipeline in place and running, data immediately begins to load from the Kafka topics that are configured in the pipeline, starting at the last offsets committed before the pipeline stopped.

For test purposes, https://github.com/edenhill/kcat is a helpful utility that makes it easy to produce records in a topic. For example, if you have a file of sample data orders_with_errors.jsonl in JSONL format (newline-delimited JSON records), you can run this command to send those records into your Kafka broker. <broker_ip_address> is the IP address of the Kafka broker and <topic_name> is the name of the Kafka topic.

```shell
kafkacat -b <broker_ip_address>:9092 -t <topic_name> -T -P -l orders_with_errors.jsonl
```

Save an example document to your file system to use for this test. For this example, you can download an example file from https://ocient-examples.s3.amazonaws.com/metabase_samples/jsonl/orders.jsonl and save it to /orders_with_errors.jsonl.

Assume the broker is running at 192.0.2.1. To send data into the orders topic defined in the pipeline definition, run this command.

```shell
kafkacat -b 192.0.2.1:9092 -t orders -T -P -l orders_with_errors.jsonl
```

This command pushes the entire JSONL file of messages into Kafka with one record per line. As these records are produced into Kafka, the running pipeline begins to load them into {{ocient}}.

Step 5: Observe Loading Progress

With your pipeline running, data immediately begins to load from the Kafka topic. The pipeline creates parallel Kafka consumers for each partition. If there are more partitions than processing cores available, the pipeline automatically spreads consumers across available processing cores and Loader Nodes.

You can check the pipeline status and progress by querying the information_schema.pipeline_status view or running the SHOW PIPELINE STATUS command. Use the view to query the name of the pipeline, status, number of failed records, and the status message.

```sql
SELECT pipeline_name, status, records_failed, status_message
FROM information_schema.pipeline_status;
```

Output:

```
pipeline_name   | status | records_failed | status_message
----------------+--------+----------------+---------------------------------------------
orders_pipeline | FAILED | 11             | Started processing pipeline orders_pipeline
```

Inspect Errors

Examine the pipeline errors directly using the sys.pipeline_errors system catalog table.

```sql
SELECT count(*) FROM sys.pipeline_errors;
```

Output:

```
count(*)
--------
11
```
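Because each error row carries a created_at timestamp, you can also look at the newest failures first. The following query is a sketch that assumes only the sys.pipeline_errors columns shown in the detailed output that follows; adjust the column list as needed.

```sql
-- Show the five most recent pipeline errors, newest first.
SELECT created_at, error_code, column_name, error_message
FROM sys.pipeline_errors
ORDER BY created_at DESC
LIMIT 5;
```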
Using this table, you can see detailed error messages, the source of the error, the column that encountered the error, and other helpful details to troubleshoot why the record did not load.

```sql
SELECT * FROM sys.pipeline_errors;
```

Output:

```
pipeline_id | extractor_task_id | error_index | error_type | error_code | source_name | error_message | partition_id | record_number | record_offset | field_index | column_name | created_at
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 1  | transformation | OLT01 | orders-0 | Failed to transform value 'not int' using AsciiIntTransform. Details: Invalid ASCII digit 'n'. Expected ASCII digit in [0, 9] | 0 | 22324401 | 0 | 1 | user_id | 2024-04-18 23:17:07.022000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 2  | transformation | OLT01 | orders-0 | Failed to transform value '2019-08-22T:30:42.392Z' using ByteBufferAsciiTimestampWithVariableLengthFractionTransform. Details: Invalid ASCII digit ':'. Expected ASCII digit in [0, 9] | 0 | 22324403 | 0 | 7 | created_at | 2024-04-18 23:17:07.391000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 3  | transformation | OLT01 | orders-0 | Failed to transform value 'not decimal' using StringDoubleTransform. Details: Error parsing string 'not decimal' as a double. Cause: For input string: "not decimal" | 0 | 22324404 | 0 | 4 | tax | 2024-04-18 23:17:07.397000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 4  | transformation | OLT01 | orders-0 | Failed to transform value '55.11' using AsciiIntTransform. Details: Invalid ASCII digit '.'. Expected ASCII digit in [0, 9] | 0 | 22324406 | 0 | 2 | product_id | 2024-04-18 23:17:07.399000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 5  | transformation | OLT01 | orders-0 | Failed to transform value 'not int' using AsciiIntTransform. Details: Invalid ASCII digit 'n'. Expected ASCII digit in [0, 9] | 0 | 22324408 | 0 | 1 | user_id | 2024-04-18 23:17:07.400000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 6  | transformation | OLS00 | orders-0 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '1' and name = 'user_id', but this column disallows nulls | 0 | 22324410 | 0 | 1 | user_id | 2024-04-18 23:17:07.402000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 7  | transformation | OLS00 | orders-0 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '1' and name = 'user_id', but this column disallows nulls | 0 | 22324411 | 0 | 1 | user_id | 2024-04-18 23:17:07.404000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 8  | transformation | OLS00 | orders-0 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '2' and name = 'product_id', but this column disallows nulls | 0 | 22324412 | 0 | 2 | product_id | 2024-04-18 23:17:07.405000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 9  | transformation | OLS00 | orders-0 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '1' and name = 'user_id', but this column disallows nulls | 0 | 22324413 | 0 | 1 | user_id | 2024-04-18 23:17:07.409000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 10 | transformation | OLT03 | orders-0 | Failed to transform value '2018-06-26' using ByteBufferAsciiTimestampWithVariableLengthFractionTransform because the value is too short. Expected a timestamp composed of the | 0 | 22324414 | 0 | 7 | created_at | 2024-04-18 23:17:07.410000000
f3ca5a1c-1bf4-40a4-8d01-b8c028fdfb58 | 1ecc5c4a-613f-48a5-8234-2257317a4a85 | 11 | transformation | OLS00 | orders-0 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '2' and name = 'product_id', but this column disallows nulls | 0 | 22324415 | 0 | 2 | product_id | 2024-04-18 23:17:07.411000000
```

For a more abbreviated view, query a few columns in the table. This example queries the error type, error code, error message, and name of the column.

```sql
SELECT error_type, error_code, error_message, column_name FROM sys.pipeline_errors;
```

Output:

```
error_type     | error_code | error_message | column_name
transformation | OLT01 | Failed to transform value 'not int' using AsciiIntTransform. Details: Invalid ASCII digit 'n'. Expected ASCII digit in [0, 9] | user_id
transformation | OLT01 | Failed to transform value '2019-08-22T:30:42.392Z' using ByteBufferAsciiTimestampWithVariableLengthFractionTransform. Details: Invalid ASCII digit ':'. Expected ASCII digit in [0, 9] | created_at
transformation | OLT01 | Failed to transform value 'not decimal' using StringDoubleTransform. Details: Error parsing string 'not decimal' as a double. Cause: For input string: "not decimal" | tax
transformation | OLT01 | Failed to transform value '55.11' using AsciiIntTransform. Details: Invalid ASCII digit '.'. Expected ASCII digit in [0, 9] | product_id
transformation | OLT01 | Failed to transform value 'not int' using AsciiIntTransform. Details: Invalid ASCII digit 'n'. Expected ASCII digit in [0, 9] | user_id
transformation | OLS00 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '1' and name = 'user_id', but this column disallows nulls | user_id
transformation | OLS00 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '1' and name = 'user_id', but this column disallows nulls | user_id
transformation | OLS00 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '2' and name = 'product_id', but this column disallows nulls | product_id
transformation | OLS00 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '1' and name = 'user_id', but this column disallows nulls | user_id
transformation | OLT03 | Failed to transform value '2018-06-26' using ByteBufferAsciiTimestampWithVariableLengthFractionTransform because the value is too short. Expected a timestamp composed of the | created_at
transformation | OLS00 | Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '2' and name = 'product_id', but this column disallows nulls | product_id
```

From this result set, you can quickly inspect error messages, such as the first one: Failed to transform value 'not int' using AsciiIntTransform. Details: Invalid ASCII digit 'n'. Expected ASCII digit in [0, 9]. This message occurs for the user_id column. From this information, you can see that you have some bad data in the data set.

In other errors, you can see the error code OLS00 and a message that indicates some issues with null handling. The table has a NOT NULL setting on a few columns, but the data is missing on the source records. Fix this by updating the pipeline to supply a default, or modify the table schema if you expect nulls to occur and you do not have a reasonable default. For a continuous pipeline, you might expect to see an error like this if something changes in the data source.
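When a pipeline accumulates many errors, it can help to summarize them before digging into individual records. This aggregate query is a minimal sketch built only from the sys.pipeline_errors columns already used in this guide; it shows which columns and error codes account for the most failures.

```sql
-- Summarize errors by failing column and error code.
SELECT column_name, error_code, count(*) AS error_count
FROM sys.pipeline_errors
GROUP BY column_name, error_code
ORDER BY error_count DESC;
```

In this example, such a summary would show that user_id and product_id account for most of the failures, which points at the null-handling issues described above.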
Inspect Bad Data

To observe the original data, you can go to the original source file from this example, or you can load the specific message from the configured bad data topic to inspect it. Using the kafkacat utility, you can consume the messages on the topic. The following command consumes one message on the orders_errors topic.

```shell
kafkacat -C -b 192.0.2.1:9092 -t orders_errors -o -1 -c 1
```

Output:

```json
{"id": 1011, "user_id": 3, "subtotal": 76.83, "tax": 5.28, "total": 82.11, "discount": null, "created_at": "2017-12-14T11:28:30.031Z", "quantity": 1}
```

A quick inspection of the source data reveals that this record is missing the product_id field, but this is a required column in our table.

If you run this command again with the -J flag, you can inspect the headers that are associated with the pipeline. The headers in the bad data target capture important metadata from the data pipeline execution, including the error message, that you can use to troubleshoot the failing data.

```shell
kafkacat -C -b 192.0.2.1:9092 -t orders_errors -o -1 -c 1 -J
```

Output:

```json
{"topic": "orders_errors", "partition": 0, "offset": 89, "tstype": "create", "ts": 1713566346480, "broker": 1, "headers": ["error_index", "66", "error_code", "OLS00", "error_message", "Failed to load transformed value 'null' to sink after ZCNullIfEmptyTransform. Details: Provided null for column with index = '2' and name = 'product_id', but this column disallows nulls.", "source_name", "orders-0", "record_number", "22361935", "record_offset", "0", "field_index", "2", "column_name", "product_id"], "key": null, "payload": "{\"id\": 1011, \"user_id\": 3, \"subtotal\": 76.83, \"tax\": 5.28, \"total\": 82.11, \"discount\": null, \"created_at\": \"2017-12-14T11:28:30.031Z\", \"quantity\": 1}"}
```
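The record_number value in these headers appears to correspond to the record_number column in sys.pipeline_errors, so you can cross-reference a message captured on the bad data topic with its catalog entry. This is a hedged sketch; the record number 22361935 is taken from the header output above, and only columns already shown in this guide are referenced.

```sql
-- Look up the catalog entry for the record captured on the bad data topic.
SELECT error_code, error_message, column_name, record_offset
FROM sys.pipeline_errors
WHERE record_number = 22361935;
```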
"orders errors","partition" 0,"offset" 89,"tstype" "create","ts" 1713566346480,"broker" 1,"headers" \["error index","66","error code","ols00","error message","failed to load transformed value 'null' to sink after zcnullifemptytransform details provided null for column with index = '2' and name = 'product id', but this column disallows nulls ","source name","orders 0","record number","22361935","record offset","0","field index","2","column name","product id"],"key"\ null,"payload" "{\\"id\\" 1011, \\"user id\\" 3, \\"subtotal\\" 76 83, \\"tax\\" 5 28, \\"total\\" 82 11, \\"discount\\" null, \\"created at\\" \\"2017 12 14t11 28 30 031z\\", \\"quantity\\" 1}"} next steps now that you have loaded data into tables, you can explore a few advanced topics including handling pipeline errors, loading from other data sources, loading different data formats, and transforming data related links docid 1 jhzrhblgnqucl6skiaq docid 1 jhzrhblgnqucl6skiaq docid 7s5nztl8lolpwt2pcnjyz docid\ aimcafoydn2xf fgqssys docid\ l8tdfpfzzvzeyabc2h7bq docid\ f9wusuqndrusdbzuwr g9