Monitor Data Pipelines
You can monitor data pipelines through a combination of:
- System catalog tables
- Information schema views
- Operational metrics endpoints for use with operations monitoring tools
This tutorial describes how to use these system catalog objects to monitor your pipelines:
- information_schema.pipelines
- information_schema.pipeline_status
- sys.pipelines
- sys.pipeline_events
- sys.pipeline_errors
- sys.pipeline_files
- sys.pipeline_partitions
- sys.pipeline_metrics
- sys.pipeline_metrics_info
A complete reference of system catalog tables and views is available in the Data Pipelines System Catalog.
After you create a pipeline, two views are especially useful for observing it. At a glance, you can view the configuration of your pipeline in information_schema.pipelines and its status information in information_schema.pipeline_status.
Access these views conveniently with the SHOW command.
For a quick look at the progress of a pipeline, the SHOW PIPELINE_STATUS command provides:
- Name
- Target tables
- Current status
- Last status message received during processing
- Completion percentage, expressed as a fraction from 0.0 to 1.0
- Duration of pipeline execution
- Number of files and records processed
- Error counts
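For example, a minimal sketch of both access paths (exact SHOW output depends on your version; the SELECT statements assume direct read access to the views):

```sql
-- Quick progress summary through the SHOW command.
SHOW PIPELINE_STATUS;

-- Pipeline configuration and status through the information schema views.
SELECT * FROM information_schema.pipelines;
SELECT * FROM information_schema.pipeline_status;
```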
To join these two views to other catalog tables, join to the sys.pipelines table on pipeline_name, and then join the remaining catalog tables on pipeline_id.
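For example, this hedged sketch joins the status view to the event log, assuming pipeline_name and pipeline_id columns as described above:

```sql
-- Join the status view to sys.pipelines on pipeline_name,
-- then join the event log on pipeline_id.
SELECT p.pipeline_id, e.*
FROM information_schema.pipeline_status AS s
JOIN sys.pipelines AS p
  ON s.pipeline_name = p.pipeline_name
JOIN sys.pipeline_events AS e
  ON e.pipeline_id = p.pipeline_id;
```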
Beyond the information schema views, many catalog tables offer details about your pipelines.
Table | Description | Importance |
---|---|---|
sys.pipelines | List of existing pipelines and their configuration information. | High
sys.pipeline_events | Event log generated through the lifecycle of a pipeline. | High |
sys.pipeline_errors | Log of errors created by pipelines during execution for troubleshooting record-level errors or system errors. | High |
sys.pipeline_files | List of files that each file-based pipeline is loading or has loaded. | High |
sys.pipeline_partitions | List of source partitions on each Kafka pipeline with relevant metadata for offsets. | Medium |
sys.pipeline_metrics | Metrics that are regularly collected for each pipeline to capture performance and progress. | Medium |
sys.pipeline_metrics_info | Descriptive details about the available pipeline metrics. Provides information about how to interpret each metric type. | Low |
While your pipeline is running, it generates events in the sys.pipeline_events system catalog table to mark significant checkpoints in its lifecycle. You can use this table to observe progress and look for detailed messages about changes in the pipeline.
For example, the sys.pipeline_events table captures events when a pipeline is created, started, stopped, failed, or completed. This table also captures relevant system events, such as rebalancing Kafka partitions or retrying a process after a transient failure.
Pipeline events include messages from many different tasks. These tasks are the background processes that execute across different Loader Nodes during pipeline operation.
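For example, a minimal sketch that lists recent events for a single pipeline (the pipeline name is hypothetical, and the event_timestamp column name is an assumption; adjust both to your environment):

```sql
-- Most recent lifecycle events for one pipeline.
SELECT e.*
FROM sys.pipeline_events AS e
JOIN sys.pipelines AS p
  ON e.pipeline_id = p.pipeline_id
WHERE p.pipeline_name = 'orders_load'  -- hypothetical pipeline name
ORDER BY e.event_timestamp DESC        -- assumed timestamp column
LIMIT 20;
```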
For file-based loads, the sys.pipeline_files system catalog table contains one row for each file. Each row contains the status of the file and other file metadata, such as the filename, creation and modification timestamps, and the file size. The pipeline updates this list of files when it starts and maintains status information as it processes files. You can use this list to understand which files are loading, which succeeded, and which failed.
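For example, a hedged sketch that finds failed files (the status column name and the 'FAILED' value are assumptions based on the description above):

```sql
-- Files that failed to load, with their metadata.
SELECT *
FROM sys.pipeline_files
WHERE status = 'FAILED';  -- assumed column name and value
```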
For Kafka partition-based loads, the sys.pipeline_partitions system catalog table contains one row for each partition. Each row contains the current offsets and record counts. You can use this information to understand how partitions are distributed across Loader Nodes and to observe lag on each topic and partition.
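For example, a hedged sketch that estimates per-partition lag; all column names are assumptions chosen to mirror the offset metrics documented later in this section:

```sql
-- Per-partition progress; lag is the gap between the maximum offset
-- and the last committed offset (column names are assumed).
SELECT pipeline_id,
       partition_id,
       max_offset - committed_offset AS offset_lag
FROM sys.pipeline_partitions;
```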
Pipeline performance metrics are captured in the sys.pipeline_metrics system catalog table. This table contains samples of the metrics over time, which the Ocient System collects at regular intervals. You can query the samples with standard SQL to inspect the behavior of a single pipeline, a single task on a Loader Node, or all pipelines at once. The sys.pipeline_metrics_info system catalog table provides metadata that explains the individual metric types.
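For example, a minimal sketch that pairs metric samples with their descriptive metadata (the metric_name join key is an assumption for illustration):

```sql
-- Metric samples alongside the metadata that explains them.
SELECT m.*, i.*
FROM sys.pipeline_metrics AS m
JOIN sys.pipeline_metrics_info AS i
  ON m.metric_name = i.metric_name;  -- assumed join key
```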
See Discover Insights from Data Pipeline System Catalog Tables for a detailed description of how to use sys.pipeline_metrics and sys.pipeline_metrics_info to monitor the performance of your pipelines. For the system catalog table definitions, see System Catalog.
In the same way that the Ocient System monitors system performance using Statistics Monitoring endpoints with operational monitoring tools, data pipelines expose an API that allows operators to capture performance metrics over time.
You query the metrics using an HTTP endpoint on each Loader Node located at <loader_ip_address_or_hostname>:8090/metrics/v1.
For example, you can retrieve metrics from a Loader Node by running a request like the following from a shell on the node, or by running a similar request from a monitoring agent on the node. This is a minimal sketch that assumes plain HTTP on the documented port.
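```shell
# Query the metrics endpoint on a Loader Node (plain HTTP assumed).
# Replace <loader_ip_address_or_hostname> with the address of the node.
curl http://<loader_ip_address_or_hostname>:8090/metrics/v1
```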
Each metric is represented as a JSON object that contains:
- Metric name
- Metric value
- Scope information
- Query time
- Value units
- Whether the metric is incremental
Metrics can be incremental or instantaneous. Incremental metrics accumulate value over time, so they never decrease in value. Instantaneous metrics capture the current value, so they can decrease.
Similar to the sys.pipeline_metrics system catalog table, the Ocient System scopes metrics to a Loader Node, a pipeline task, or a partition. You can determine the scope of a metric based on the presence of the pipeline_name, pipeline_name_external, and partition keys or reference the details in Pipeline Metrics Details.
Scope | Description |
---|---|
Loader | Metrics scoped to a loader do not have information about pipelines or partitions. Because metrics are collected on each node, this scope contains all activity on the node where you access the endpoint. |
Pipeline | Metrics scoped to a pipeline have a key named pipeline_name with a value that corresponds to the identifier of the extractor task with hyphens removed. The metrics also have a key named pipeline_name_external with a value that corresponds to the identifier of the pipeline. |
Partition | Metrics scoped to a partition have the same keys as metrics scoped to a pipeline. The metrics also have a key named partition with a value that is the stream source identifier of the partition. |
Examples
These examples are illustrative sketches: only the scope keys (pipeline_name, pipeline_name_external, and partition) are documented above, so the remaining key names and values are representative assumptions. This JSON structure contains a metric scoped to a Loader Node.
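```json
{
  "name": "usage.os.heap",
  "value": 536870912,
  "units": "bytes",
  "incremental": false,
  "queryTime": "2025-01-01T00:00:00Z"
}
```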
This JSON structure contains a metric scoped to a pipeline.
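```json
{
  "name": "count.record.sent",
  "value": 1024000,
  "units": "records",
  "incremental": true,
  "queryTime": "2025-01-01T00:00:00Z",
  "pipeline_name": "0a1b2c3d4e5f60718293a4b5c6d7e8f9",
  "pipeline_name_external": "orders_load"
}
```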
This JSON structure contains a metric scoped to a partition.
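```json
{
  "name": "count.partition.records.processed",
  "value": 250000,
  "units": "records",
  "incremental": true,
  "queryTime": "2025-01-01T00:00:00Z",
  "pipeline_name": "0a1b2c3d4e5f60718293a4b5c6d7e8f9",
  "pipeline_name_external": "orders_load",
  "partition": "orders-topic-0"
}
```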
Name | Scope | Description | Importance |
---|---|---|---|
count.error | pipeline | The count of failures when processing records during pipeline execution for a pipeline. | High |
count.file.fail | pipeline | The number of files that have failures. | High |
count.file.processed | pipeline | The number of files that have been extracted from the source for processing. The Ocient System tracks this number before tokenization, transformation, or sending to the sink. This number does not indicate the durability of the files or whether files loaded with errors. | Medium
count.file.total | pipeline | The total number of files in the static file list. | High |
count.partition.durable.offset.committed | partition | The last committed offset of the Kafka partition for the consumer group. | High |
count.partition.offset.lag | partition | The lag for a specified Kafka partition, computed as count.partition.offset.max - count.partition.durable.offset.committed. | Medium
count.partition.offset.max | partition | The maximum offset of the Kafka partition. | High |
count.partition.records.failed | partition | The count of failures when processing records during pipeline execution for a specified Kafka partition. | High |
count.partition.records.processed | partition | The number of records that have been extracted for processing on a specified Kafka partition. | Medium |
count.record.bytes.maximum | partition | The largest record processed in bytes. | Medium |
count.record.bytes.minimum | partition | The smallest record processed in bytes. | Medium |
count.record.bytes.transformed | pipeline | The number of bytes that have been transformed, excluding delimiters. | Medium |
count.record.sent | pipeline | The number of records that have been fully transformed and sent to the sink. | High |
count.sink.bytes.queuedepth | pipeline | The number of transformed records queued to send to the sink. | Medium |
count.stream.paused | partition | The number of times the processing of the record stream has been paused due to reaching a high watermark. | Medium |
count.stream.unpaused | partition | The number of times the processing of the record stream has been unpaused due to reaching a low watermark. | Medium |
duration.file.listed | pipeline | The number of milliseconds spent listing files from the data source. | High |
duration.record.durable | partition | The duration in milliseconds for making records durable. ℹ️ The database takes time to make records durable, so this duration might be longer than the record processing duration. | Medium |
duration.record.processed | partition | The duration in milliseconds for reading the records from the source, transforming them, and buffering them for sending to the sink. | Medium |
uptime | Loader Node | The number of seconds since the last restart of the extraction process. | High
usage.os.heap | Loader Node | The number of bytes used in the heap. | High
usage.os.percentage.cpu | Loader Node | The percentage of CPU used by the extraction process. | High