Manage Errors in Data Pipelines
When you load data into Ocient, you can manage the way that the system responds to errors. This guide outlines different mechanisms for managing errors through examples.
The guide uses the same setup as Data Pipeline Load of JSON Data from Kafka, but it modifies that pipeline to illustrate how to manage errors that you might encounter when you load data with pipelines.
You learn about:
- Bad data and error handling
- Inspecting record-level processing errors
- Configuring a bad data target
- Controlling pipeline behavior when you encounter errors
- File-level error behavior
- Record-level error behavior
- Understanding pipeline statuses in file loading
See Data Pipeline Load of JSON Data from Kafka and complete the tutorial, then return to this guide for Step 2.
Bad data targets provide a way to capture record-level errors for later troubleshooting and correction. Data pipelines support a bad data target that sends any failing records to a designated Kafka topic. From this topic, you can easily replay those records in a separate recovery pipeline or inspect them directly.
For details about bad data targets, see the Bad Data Targets Reference.
Stop the orders_pipeline data pipeline using the STOP PIPELINE SQL statement.
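For example, assuming the pipeline name from the earlier tutorial:

```sql
-- Stop the pipeline before modifying it.
STOP PIPELINE orders_pipeline;
```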
Inspect the pipeline status with a SELECT SQL statement against the pipeline_status information schema view.
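The check might look like the following sketch; the pipeline_name and status column names are assumptions for illustration, so adjust them to the view's actual columns.

```sql
-- Confirm that orders_pipeline has reached the STOPPED status.
SELECT pipeline_name, status
FROM information_schema.pipeline_status
WHERE pipeline_name = 'orders_pipeline';
```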
While the pipeline is stopping, the status might appear as STOPPING while the pipeline wraps up in-progress work and commits offsets back to Kafka. After the status changes to STOPPED, you can proceed to the next step. A pipeline must not be in a running state when you modify it.
In this example, use the CREATE OR REPLACE PIPELINE SQL statement to update the data pipeline. This statement maintains the same offsets with the Kafka topic and ensures that data is deduplicated correctly when the pipeline resumes.
Add the BAD_DATA_TARGET option and specify the Kafka target details. In this example, use the same Kafka broker for both the source and the bad data target.
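The following sketch shows only the general shape of the change; the keyword layout, broker port, and clause ordering are assumptions for illustration. Reuse the full pipeline definition from the Kafka tutorial and see the Bad Data Targets Reference for the exact option syntax.

```sql
-- Illustrative shape only; keep the original source, format, and target table
-- clauses from the tutorial pipeline, and check the Bad Data Targets Reference
-- for the exact BAD_DATA_TARGET syntax.
CREATE OR REPLACE PIPELINE orders_pipeline
  -- ... original Kafka source, format, and target table clauses ...
  BAD_DATA_TARGET KAFKA
    BROKERS '192.0.2.1:9092'   -- assumed: the same broker as the source
    TOPIC 'orders_errors'      -- topic that receives failing records
  -- ... any remaining clauses from the original definition ...
;
```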
Start the orders_pipeline data pipeline. In this case, set a maximum error limit so that the pipeline fails if 10 errors occur. You might not choose an ERROR LIMIT for a continuously streaming pipeline when it is critical to keep up with newly arriving data, because the pipeline can stop suddenly and data freshness can fall behind. However, this example shows how error limits behave and how error states appear in pipelines.
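A sketch of the start statement follows; the exact error limit option name and placement are assumptions, so check the Start Pipeline Command Reference for the supported form.

```sql
-- Illustrative only: fail the pipeline after 10 record-level errors.
START PIPELINE orders_pipeline ERROR LIMIT 10;
```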
The pipeline status changes to STARTED to indicate that it has begun processing.
For file-based loads, the START PIPELINE SQL statement includes FILE_ERRORS options that control error handling for different file failure modes. Options include failing on error, skipping on file error, and tolerating file-level errors. See the Start Pipeline Command Reference for details.
With the data pipeline in place and running, data immediately begins to load from the Kafka topics configured in the pipeline, starting from the last offsets committed before the pipeline stopped.
For test purposes, kafkacat is a helpful utility that makes it easy to produce records in a topic. For example, if you have a file of sample data orders_with_errors.jsonl in JSONL format (newline-delimited JSON records), you can run a command like the following to send those records to your Kafka broker. <broker_ip_address> is the IP address of the Kafka broker and <topic_name> is the name of the Kafka topic.
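A typical invocation looks like the following sketch; kafkacat uses the default Kafka port, 9092, if you do not append a port to the broker address.

```sh
# Produce each line of the JSONL file as a separate Kafka message.
kafkacat -P -b <broker_ip_address> -t <topic_name> -l ~/orders_with_errors.jsonl
```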
Save an example document to your file system to use for this test. For this example, you can download an example file from https://ocient-examples.s3.amazonaws.com/metabase_samples/jsonl/orders_with_errors.jsonl and save it to ~/orders_with_errors.jsonl.
Assuming the broker is running at 192.0.2.1, run a command like the following to send data into the orders topic defined in the pipeline.
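For example, assuming the broker listens on the default port:

```sh
# Send the sample records into the orders topic.
kafkacat -P -b 192.0.2.1 -t orders -l ~/orders_with_errors.jsonl
```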
This command pushes the entire JSONL file of messages into Kafka with one record per line. As these records are produced into Kafka, the running pipeline begins to load them into Ocient.
With your pipeline running, data immediately begins to load from the Kafka topic. The pipeline creates parallel Kafka consumers, one for each partition. If there are more partitions than available processing cores, the pipeline automatically spreads consumers across the available processing cores and Loader Nodes.
You can check the pipeline status and progress by querying the information_schema.pipeline_status view or running the command SHOW PIPELINE_STATUS. Use the view to query the name of the pipeline, its status, the number of failed records, and the status message.
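A sketch of that query follows; the column names are assumptions chosen to match the fields described above and might differ from the actual view columns.

```sql
-- Check progress and error counts for the orders_pipeline pipeline.
SELECT pipeline_name, status, failed_records, status_message
FROM information_schema.pipeline_status
WHERE pipeline_name = 'orders_pipeline';
```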
Examine the pipeline errors directly using the sys.pipeline_errors system catalog table.
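For example, you can start with a query over the whole table:

```sql
-- Review every record-level error that the pipelines have recorded.
SELECT *
FROM sys.pipeline_errors;
```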
Using this table, you can see detailed error messages, the source of the error, the column that encountered the error, and other details that help you troubleshoot why the record did not load.
For a more abbreviated view, query a few columns in the table. This example queries the error type, error code, error message, and name of the column.
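A sketch of this query follows; the column names are assumptions for illustration and might not match the actual sys.pipeline_errors columns.

```sql
-- Abbreviated troubleshooting view (assumed column names).
SELECT error_type, error_code, error_message, column_name
FROM sys.pipeline_errors;
```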
From this result set, you can quickly inspect the error messages. For example, the first error message occurs for the user_id column, which tells you that there is some bad data in the data set.
In other errors, you can see the error code OLS00 and a message that indicates an issue with NULL handling. The table has a NOT NULL setting on a few columns, but the values are missing in the source records. Fix this by updating the pipeline to supply a default value, or modify the table schema if you expect NULL values and do not have a reasonable default. For a continuous pipeline, you might expect to see an error like this if something changes in the data source.
To observe the original data, you can return to the original source file from this example, or you can consume the specific message from the configured bad data topic and inspect it.
Using the kafkacat utility, you can consume the messages on the topic. The following command consumes one message on the orders_errors topic.
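This sketch assumes the broker at 192.0.2.1 listens on the default Kafka port, 9092.

```sh
# Consume a single message from the beginning of the orders_errors topic.
kafkacat -C -b 192.0.2.1 -t orders_errors -o beginning -c 1
```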
A quick inspection of the source data reveals that this record is missing the product_id field, which is a required column in the table.
If you run this command again with the -J flag, you can inspect the headers that the data pipeline attaches to each record in the bad data target. These headers capture important metadata from the data pipeline execution that you can use to troubleshoot the failing data, including the error_message.
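The same sketch with the -J flag wraps each message in a JSON envelope so that the headers are visible.

```sh
# Emit one message as a JSON envelope, including its headers.
kafkacat -C -b 192.0.2.1 -t orders_errors -o beginning -c 1 -J
```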
Now that you have loaded data into tables and managed pipeline errors, you can explore a few more advanced topics, including loading from other data sources, loading different data formats, and transforming data.