Frequently Asked Questions for Data Pipelines
The questions featured in this topic cover information that you need most often when you start creating and running data pipelines.
Any database user with EXECUTE pipeline privileges can run a data pipeline on their database. This privilege allows database administrators to distinguish the users who can create pipelines from those who can start them.
To learn more about access control for pipelines, see DCL Pipeline Privileges.
No, pipelines have a user-controlled lifecycle that starts with creation. After you create a data pipeline, you determine when to start it. A pipeline either completes successfully or moves to a failed status if the Ocient System reaches an error limit.
For a basic batch file load, you would first use the CREATE PIPELINE SQL statement and then the START PIPELINE statement. After all data is successfully loaded, the pipeline status changes to completed.
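For illustration, a minimal sketch of that flow is shown below. The exact clause names and ordering (SOURCE, FILTER, EXTRACT, INTO, and so on), as well as the bucket, column, and table names, are assumptions for this example; see the CREATE PIPELINE documentation for the syntax supported in your version.

```sql
-- Hypothetical batch file load; clause names and ordering are assumed.
CREATE PIPELINE load_orders
  SOURCE S3
    BUCKET 'example-bucket'
    FILTER 'orders/*.csv'              -- restrict the batch to matching files
  EXTRACT FORMAT DELIMITED
  INTO public.orders
    SELECT $order_id AS order_id,
           $order_ts AS order_ts;

-- Begin the load; the pipeline status changes to completed after all files load.
START PIPELINE load_orders;
```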
Multiple data pipelines can run concurrently in Ocient. To run more than one simultaneously, any user with privileges to start a pipeline can use the START PIPELINE SQL statement, which begins execution of the selected pipeline. The Ocient System uses Loader Nodes in parallel to process all of the running pipelines.
If you need to control the relative priority of one pipeline over another when running concurrently, use advanced pipeline settings to limit the number of cores allocated to a specified pipeline or isolate the pipeline to a particular set of Loader Nodes.
When you execute a START PIPELINE SQL statement on a pipeline that has stopped, either because it reached its ERROR LIMIT or because of user intervention, the load resumes from the last checkpoint in the pipeline. Hence, you can effectively pause a pipeline and resume it later. This feature works for continuous pipelines, such as Kafka pipelines, as well as batch pipelines.
Common uses of stopping and resuming a pipeline are:
- Stopping to update the transformations in a continuous pipeline.
- Stopping to update the table schema and refresh a pipeline.
- Stopping to allow a higher priority workload to proceed.
- Stopping for maintenance.
It is important to exercise caution when resuming a pipeline to ensure that the files in the underlying data source have not changed. Modifying data in the source for an in-process pipeline can lead to unexpected results, such as duplicate data.
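The following is a minimal sketch of a pause-and-resume sequence. The STOP PIPELINE statement name is an assumption here; consult the pipeline SQL reference for the exact statement used to stop a running pipeline.

```sql
-- Pause a running pipeline (statement name assumed for this sketch).
STOP PIPELINE load_orders;

-- ...update transformations, adjust priorities, or perform maintenance...

-- Resume the pipeline; the load continues from the last checkpoint and
-- previously loaded data is not reloaded.
START PIPELINE load_orders;
```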
When you load a batch of files in a data pipeline, the Ocient System determines the batch when the pipeline starts, using the bucket, prefix, and filter parameters that you specify in your pipeline. This batch of files is frozen until the pipeline completes. Even if the batch pipeline stops and restarts, the Ocient System keeps the list of files constant.
You cannot manually modify the file list for a batch pipeline. When a pipeline restarts, it finishes loading the files that were determined when the batch was first established.
To add files that arrive in the file system after a batch pipeline has started, you can create a new batch pipeline that uses filters to load all files created after a specified timestamp.
When you start a file-based pipeline that has been stopped, the pipeline adds any new files with the PENDING status and loads them as execution proceeds. The Ocient System does not reload any files that the pipeline has already loaded.
It is important to exercise caution when resuming a pipeline to ensure that the files in the underlying data source have not changed. If you modify data in the source for an in-process pipeline, the Ocient System can produce unexpected results, such as duplicate data.
The most common performance enhancement for S3 file loads is the PREFIX parameter in your pipeline. This setting restricts the files in the bucket that your pipeline considers, speeding up listing operations.
You can also use other advanced options such as MAX_CONCURRENCY, REQUEST_DEPTH, and REQUEST_RETRIES described in S3 Source Options.
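The following hedged sketch shows where these options might appear in a pipeline definition. The option names are the ones described in S3 Source Options; their placement, the surrounding clauses, and the values shown are assumptions for illustration.

```sql
-- Hypothetical placement of S3 tuning options in a pipeline definition.
CREATE PIPELINE load_events
  SOURCE S3
    BUCKET 'example-bucket'
    PREFIX 'events/2024/'        -- limit listing to keys under this prefix
    MAX_CONCURRENCY 16           -- illustrative value
    REQUEST_DEPTH 8              -- illustrative value
    REQUEST_RETRIES 5            -- illustrative value
  EXTRACT FORMAT JSON
  INTO public.events
    SELECT $event_id AS event_id;
```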
Continuous pipelines for Kafka stream records from Kafka partitions and provide exactly-once delivery semantics. Ocient assigns consumers to different Loader Nodes and commits offsets back to the assigned partitions as the source data loads into Ocient. Critically, this commit occurs only after the data is durable in Ocient, which means the system has replicated the data in such a way that the loss of a configurable number of nodes does not lead to data loss.
All supported data formats are compatible with Kafka pipelines.
When you run a continuous Kafka pipeline, the pipeline never completes, so it remains in a running state. You can observe progress using the standard system catalog tables and the Kafka-specific sys.pipeline_partitions table, which captures individual partition assignments and lag.
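For example, a query along these lines surfaces per-partition progress. The pipeline_name filter column is an assumption for this sketch, so check the catalog table definition for the actual column names.

```sql
-- Inspect partition assignments and lag for a running Kafka pipeline.
SELECT *
FROM sys.pipeline_partitions
WHERE pipeline_name = 'kafka_orders';   -- filter column name assumed
```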
When you stop and restart a Kafka pipeline, it resumes from the last commit checkpoint and continues processing records. The Loader Nodes deduplicate any record that was already sent to Ocient but not yet committed to Kafka.
To update transforms in a Kafka pipeline, first stop the pipeline. After the pipeline stops, you can execute a CREATE OR REPLACE PIPELINE SQL statement. This action allows you to update the transformations in the pipeline while maintaining all of the checkpoint information about its progress. When you execute the START PIPELINE SQL statement, the pipeline resumes from the last committed Kafka offsets. Then, the Loader Nodes deduplicate any record that has already been sent to the Loader Nodes but is not yet committed to Kafka.
When you execute the START PIPELINE SQL statement, the pipeline resumes from the last committed Kafka offsets. The Loader Nodes deduplicate any record that has already been sent to the Loader Nodes but is not yet committed to Kafka.
If you have never started a Kafka pipeline, the pipeline starts from the configured auto.offset.reset value. You can set this value in the CONFIG option of a Kafka pipeline. The Ocient System uses this value when a pipeline first starts to establish the consumer group for the pipeline.
For more details about Kafka configuration, see Kafka Source Options.
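As a sketch, the CONFIG option might be used as follows to set auto.offset.reset. The SOURCE KAFKA clause, the TOPIC option, and the JSON form of the CONFIG value are assumptions for this example; only auto.offset.reset itself is a standard Kafka consumer property.

```sql
-- Hypothetical Kafka pipeline showing where auto.offset.reset is configured.
CREATE PIPELINE kafka_orders
  SOURCE KAFKA
    TOPIC 'orders'
    CONFIG '{"auto.offset.reset": "earliest"}'   -- used only when the consumer group is first established
  EXTRACT FORMAT JSON
  INTO public.orders
    SELECT $order_id AS order_id;

START PIPELINE kafka_orders;
```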
Ocient Loader Nodes deduplicate data that is loaded by the same pipeline. You can think of the deduplication scope of a pipeline as being tied to the life of the pipeline database object.
If you drop a pipeline and create a new one with the same name, the data loaded by the new pipeline is not deduplicated against the data loaded by the original pipeline.
For a specified pipeline, the Ocient System deduplicates data automatically when you stop and resume the pipeline. The system manages watermarks with data sources such as Kafka or S3 to ensure that no duplicate data is loaded when data is reprocessed.
It is important to note that deduplication in Ocient is based on a shared contract between the data source and the Ocient System. Deduplication is not based on a primary key or record identifier that is part of the data itself. The question "What mistakes can accidentally lead to duplicating data?" explains the implications in more detail.
Sometimes, it is useful to load data multiple times for testing, but deduplication can get in the way.
To load the same data set twice in the same table, there are two simple approaches:
- Drop the pipeline from the first load, re-create the same pipeline, and run it to completion.
- Create a new pipeline with the same SQL statement as the original, but with a new name, and run it to completion.
In both cases, the Ocient System does not deduplicate the data and you can load a second copy of your data.
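A sketch of both approaches follows. The DROP PIPELINE statement name, the clause layout, and the object names are assumptions for this example.

```sql
-- Approach 1: drop and re-create the pipeline, which resets its
-- deduplication scope, then run it to completion again.
DROP PIPELINE load_orders;
CREATE PIPELINE load_orders
  SOURCE S3 BUCKET 'example-bucket' FILTER 'orders/*.csv'
  EXTRACT FORMAT DELIMITED
  INTO public.orders SELECT $order_id AS order_id;
START PIPELINE load_orders;

-- Approach 2: create a pipeline with the same definition under a new name.
CREATE PIPELINE load_orders_rerun
  SOURCE S3 BUCKET 'example-bucket' FILTER 'orders/*.csv'
  EXTRACT FORMAT DELIMITED
  INTO public.orders SELECT $order_id AS order_id;
START PIPELINE load_orders_rerun;
```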
The Ocient System can have duplicate data when the deduplication contract between the source and the pipeline is not maintained.
Ocient uses a proprietary approach to deduplication that helps it deliver high throughput when loading while still ensuring exactly-once semantics. If the data in the source changes during the execution of a pipeline, the changes can lead to a mismatch in the way the Ocient System determines the deduplication identifier for a record.
Things to avoid include:
- Modifying a file in the source file system after the pipeline has included it but before the pipeline completes.
- Deleting a file from the source after the pipeline has included it but before the pipeline completes.
You can safely add new files to a pipeline before it completes, but you should avoid modifying files that the pipeline has already registered and that appear in the sys.pipeline_files system catalog table.
If an entire file has failed, you can see this in the sys.pipeline_files system catalog table. Any file with the FAILED status indicates that the file failed to process. This failure can occur due to a file-level error such as corruption.
In addition, if a file has the SKIPPED status, this status indicates that the file was skipped when processing the pipeline. You can control this behavior using the FILE_ERROR setting in the START PIPELINE SQL statement.
If a specified file experiences record-level errors, but the Ocient System processes the file, the file has the LOADED_WITH_ERRORS status.
You can find the individual errors in the sys.pipeline_errors system catalog table.
You can determine whether a pipeline encounters errors by viewing the information_schema.pipeline_status view, which includes counts of records with errors.
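For example, queries along these lines cover file-level status, record-level errors, and the summary view. The pipeline_name filter column is an assumption for this sketch; the table and view names are the ones listed above.

```sql
-- File-level outcomes such as FAILED, SKIPPED, and LOADED_WITH_ERRORS.
SELECT *
FROM sys.pipeline_files
WHERE pipeline_name = 'load_orders';    -- filter column name assumed

-- Individual record-level errors.
SELECT *
FROM sys.pipeline_errors
WHERE pipeline_name = 'load_orders';

-- Per-pipeline summary, including counts of records with errors.
SELECT *
FROM information_schema.pipeline_status;
```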
The Ocient System automatically converts a string representation of an array into an array using the settings defined on your pipeline. The system uses the CLOSE_ARRAY, OPEN_ARRAY, and ARRAY_ELEMENT_DELIMITER settings to convert a string to a target array column type. By default, these settings correspond to a brace-delimited array format (e.g., {1,2,3}).
For details, see the supported options in Extract Options.
Data pipelines do not yet support mapping arbitrary transformation functions over an array. However, the Ocient System supports casting and type conversion. You can apply an array cast to an array, and the system converts each element of the array to the element type specified by the cast.
For details, see Scalar Transformation Functions and Casting.
You can change the character set of your pipeline using the CHARSET_NAME option in a pipeline. For details, see the supported options in Extract Options.
Ocient supports a JSON selector syntax similar to JSON Path. The Ocient System can extract individual keys as well as nested data. A simple example is $order.user.first_name, which extracts the first_name field from the user object inside the order object in the JSON.
For details, see Supported JSON Selectors.
Ocient supports a JSON selector syntax similar to JSON Path. The Ocient System can extract individual keys, nested objects, arrays, and arrays of objects. The system can also project a selector into arrays. A simple example is $data.line_items[].price, which extracts the price field from each element of the line_items array inside the data object, producing a one-dimensional array of prices.
For details, see Supported JSON Selectors.
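A hedged sketch of how these selectors might appear in a pipeline SELECT list follows. The surrounding pipeline clauses, the object names, and the assumption that both selectors apply to the same documents are illustrative; the selectors themselves are the examples above.

```sql
-- Hypothetical pipeline fragment mapping JSON selectors to target columns.
CREATE PIPELINE load_orders_json
  SOURCE S3 BUCKET 'example-bucket' FILTER 'orders/*.json'
  EXTRACT FORMAT JSON
  INTO public.orders
    SELECT $order.user.first_name   AS first_name,   -- nested key
           $data.line_items[].price AS item_prices;  -- projects price out of each array element
```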
Data Pipelines support the creation of user-defined functions that you can use to apply more advanced transformations. For details, see the CREATE PIPELINE FUNCTION SQL statement.
The Ocient System supports many automatic conversions to make it easy to load source data into your target tables. VARCHAR data automatically casts to most target column types, and the system automatically casts many data types to a VARCHAR target column. Some other common automatic conversions, such as BIGINT to TIMESTAMP, are supported, especially where the conversion is lossless.
For details, see Data Types for Data Pipelines.
To cast to an array column, use a casting function. This function converts a string of data in the configured array format to the array type you specify, casting the elements to the inner type. For example, if $my_column represents the data '{123,456,789}' and your pipeline uses the default CLOSE_ARRAY, OPEN_ARRAY, and ARRAY_ELEMENT_DELIMITER settings, a casting expression converts the elements to an array of INT values. Ocient also automatically casts $my_column in this example to an INT[] when that is the type of the target column.
Similar to casting to an array column, you can cast to a tuple column using the tuple casting syntax. For example, if $my_column represents the data '(123,test)' and your pipeline uses the default CLOSE_TUPLE, OPEN_TUPLE, and TUPLE_ELEMENT_DELIMITER settings, a casting expression converts the elements to a tuple of an integer and a string, TUPLE<<int,varchar>>.
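The following is a hypothetical sketch of both casts, assuming a CAST-style syntax; the exact casting function names are documented in Scalar Transformation Functions and Casting, and the column names are illustrative.

```sql
-- Hypothetical casting expressions in a pipeline SELECT list.
-- $my_array_column holds '{123,456,789}' and $my_tuple_column holds '(123,test)'
-- under the default array and tuple delimiter settings.
SELECT CAST($my_array_column AS INT[])                 AS int_values,   -- array cast; syntax assumed
       CAST($my_tuple_column AS TUPLE<<INT, VARCHAR>>) AS id_and_name   -- tuple cast; syntax assumed
```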