Load Data
Frequently Asked Questions for Data Pipelines
The questions featured in this topic cover the information that you need most often when you start creating and running data pipelines.

Running Pipelines

Who can run a data pipeline?

Any database user with EXECUTE PIPELINE privileges can run a data pipeline on their database. This privilege allows database administrators to distinguish the users who can create pipelines from those who can start them. To learn more about access control for pipelines, see docid\ asr8r6xqiyofgaz5qnbiw.

Do pipelines run automatically after creation?

No. Pipelines have a user-controlled lifecycle that starts with creation. After you create a data pipeline, you determine when to start the pipeline. A pipeline either completes successfully or moves to a failed status if the {{ocient}} system reaches an error limit. For a basic batch file load, you would first use the CREATE PIPELINE SQL statement and then the START PIPELINE statement. After all data is successfully loaded, the pipeline status changes to completed.

How do you run multiple pipelines at the same time?

Multiple data pipelines can run concurrently in Ocient. To run more than one simultaneously, any user with privileges to start a pipeline can use the START PIPELINE SQL statement, which begins execution of the selected pipeline. The Ocient system uses Loader Nodes in parallel to process all of the pipelines that are running. If you need to control the relative priority of one pipeline over another when running concurrently, use advanced pipeline settings to limit the number of cores allocated to a specified pipeline or to isolate the pipeline to a particular set of Loader Nodes.

What happens if I start a pipeline that has been stopped?

When you execute a START PIPELINE SQL statement on a pipeline that has been stopped, either because it reached an error limit or because of user intervention, the load resumes from the last checkpoint in the pipeline. Hence, you can effectively pause a pipeline and resume it from the last checkpoint. This feature works for continuous pipelines, such as {{kafka}} pipelines, as well as batch pipelines. Common uses of stopping and resuming a pipeline are:
- Stopping to update the transformations in a continuous pipeline
- Stopping to update the table schema and refresh a pipeline
- Stopping to allow a higher-priority workload to proceed
- Stopping for maintenance

It is important to exercise caution when resuming a pipeline to ensure that the files in the underlying data source have not changed. Modifying data in the source for an in-process pipeline can result in unexpected results, such as duplicate data.

File Loading

When is the file list determined for a batch file load?

When you load a batch of files in a data pipeline, the Ocient system determines the batch when the pipeline starts, using the bucket, prefix, and filter parameters that you specify in your pipeline. This batch of files is frozen until the pipeline completes. Even if the batch pipeline stops and restarts, the Ocient system keeps the list of files constant.

Can I update the file list after starting a pipeline?

You cannot manually modify the file list for a batch pipeline. When a pipeline restarts, it finishes the files that were determined when the batch was first established. To add files that arrive on the file system after a batch pipeline was first started, you can create a new batch pipeline that uses filters to load all files created after a specified timestamp.
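For example, you can check which files the Ocient system froze into the batch, and confirm that the list stays constant across a stop and restart, by querying the sys.pipeline_files system catalog table described later in this FAQ. This is a minimal sketch only; the pipeline name and the column names (pipeline_name, filename, status) are illustrative assumptions, so confirm the exact catalog schema on your system.

-- Hedged sketch: inspect the frozen batch for a pipeline.
-- The pipeline name 'orders_batch_load' is hypothetical, and the
-- column names are assumptions rather than a confirmed schema.
SELECT filename, status
FROM sys.pipeline_files
WHERE pipeline_name = 'orders_batch_load'
ORDER BY filename;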
What is the behavior if I start a pipeline that has been stopped?

When you start a file-based pipeline that has been stopped, the pipeline adds any new files to the pipeline as files with the pending status, and then loads these files as execution proceeds. The Ocient system does not reload any files that the pipeline has already loaded. It is important to exercise caution when resuming a pipeline to ensure that the files in the underlying data source have not changed. When you modify data in the source for an in-process pipeline, the Ocient system can produce unexpected results, such as duplicate data.

What options exist to maximize performance on S3-type file loads?

The most common performance enhancement for {{aws}} S3 file loads is the prefix parameter in your pipeline. This setting restricts the files in the bucket that your pipeline considers, which speeds up listing operations. You can also use other advanced options, such as max concurrency, request depth, and request retries, described in docid\ l8tdfpfzzvzeyabc2h7bq.

Kafka Loading

How does a continuous Kafka pipeline work?

Continuous pipelines for Kafka stream records from Kafka partitions and ensure exactly-once semantics. Ocient assigns consumers to different Loader Nodes and commits offsets back on the assigned partitions as the source data loads into Ocient. Critically, this operation occurs only after data is durable in Ocient, meaning the system has replicated the data in such a way that the loss of a configurable number of nodes would not lead to data loss. All supported data formats are compatible with Kafka pipelines.

When you run a continuous Kafka pipeline, the pipeline is never complete, so it remains in a running state. You can observe progress using the standard system catalog tables and the Kafka-specific table sys.pipeline_partitions, which captures individual partition assignments and lag. When you stop and restart a Kafka pipeline, it resumes from the last commit checkpoint and continues processing records. The Ocient system deduplicates any record that has already been sent to Ocient but not committed to Kafka in the Loader Nodes.

How do I update the transforms or the table schema in a continuous Kafka pipeline?

To update transforms in a Kafka pipeline, first stop the pipeline. After the pipeline stops, you can execute a CREATE OR REPLACE PIPELINE SQL statement. This action allows you to update the transformations in the pipeline and maintains all of the checkpoint information about progress on the pipeline. When you execute the START PIPELINE SQL statement, the pipeline resumes from the last committed Kafka offsets. Then, the Loader Nodes deduplicate any record that has already been sent to the Loader Nodes but is not yet committed to Kafka.

Where does a Kafka pipeline resume loading after a stop and then start of the pipeline?

When you execute the START PIPELINE SQL statement, the pipeline resumes from the last committed Kafka offsets. The Loader Nodes deduplicate any record that has already been sent to the Loader Nodes but is not yet committed to Kafka. If you have never started a Kafka pipeline, the pipeline starts from the configured auto offset reset value. You can set this value in the config option of a Kafka pipeline. The Ocient system uses this value when a pipeline first starts to establish the consumer group for the pipeline. For more details about Kafka configuration, see docid\ l8tdfpfzzvzeyabc2h7bq.
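As a concrete illustration of the stop, replace, and resume flow described above, the following minimal sketch strings together the statements named in this FAQ. The pipeline name is hypothetical, the exact form of the statement that stops a pipeline is an assumption, and sys.pipeline_partitions column filters are omitted because the schema is not shown here; check the referenced pipeline documentation for the exact syntax on your version.

-- Hedged sketch of updating transforms in a continuous Kafka pipeline.
-- 'web_events_stream' is a hypothetical pipeline name; the statement that
-- stops a pipeline is assumed here and may differ on your version.
STOP PIPELINE web_events_stream;

-- Redefine the pipeline with updated transformations; per the FAQ above,
-- checkpoint information about progress is maintained.
CREATE OR REPLACE PIPELINE web_events_stream ... ;  -- full definition with the updated SELECT transformations

-- Resume from the last committed Kafka offsets.
START PIPELINE web_events_stream;

-- Observe per-partition assignments and lag while the pipeline runs.
SELECT * FROM sys.pipeline_partitions;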
Data Duplication and Deduplication

What records are deduplicated during loading?

Ocient Loader Nodes deduplicate data that is loaded on the same pipeline. You can think of the deduplication scope of a pipeline as being tied to the life of the pipeline database object. If you drop a pipeline and create a new one with the same name, the data is not deduplicated. For a specified pipeline, the Ocient system deduplicates data automatically when you stop and resume the pipeline. The system ensures that no duplicate data is loaded for any data that is reprocessed due to managing watermarks with data sources such as Kafka or S3.

It is important to note that deduplication in Ocient is based on a shared contract between the data source and the Ocient system. Deduplication is not based on a primary key or record identifier that is part of the data itself. The "What mistakes can accidentally lead to duplicating data?" question explains the implications more clearly.

How can I load the same data set twice into the same table?

Sometimes it is useful to load data multiple times for testing, but deduplication can get in the way. To load the same data set twice into the same table, there are two simple approaches:
- Drop the pipeline from the first load, create the same pipeline, and run it to completion.
- Create a new pipeline with the same SQL statement as the original, but with a new name, and run it to completion.

In both cases, the Ocient system does not deduplicate the data, and you can load a second copy of your data.

What mistakes can accidentally lead to duplicating data?

The Ocient system can have duplicate data when the deduplication contract between the source and the pipeline is not maintained. Ocient uses a proprietary approach to deduplication that helps it deliver high throughput when loading while still ensuring exactly-once semantics. If the data in the source changes during the execution of a pipeline, the changes can lead to a mismatch in the way the Ocient system determines the deduplication identifier for a record. Things to avoid include:
- Modifying a file in the source file system after it has been included in a pipeline but before the pipeline has completed.
- Deleting a file in the source that has been included in a pipeline but before the pipeline has completed.

You can safely add new files to a pipeline before it completes, but you should avoid modifying files that are already registered by the pipeline and appear in the sys.pipeline_files system catalog table.

Troubleshooting and Errors

How do I detect when an entire file has failed in a pipeline?

If an entire file has failed, you can see this in the sys.pipeline_files system catalog table. Any file with the failed status indicates that the file failed to process. This failure can occur due to a file-level error such as corruption. In addition, if a file has the skipped status, the file was skipped when processing the pipeline. You can control this behavior using the file error setting in the START PIPELINE SQL statement. If a specified file experiences record-level errors, but the Ocient system processes the file, the file has the loaded with errors status.

How do I find the individual records that failed to load?

You can find the individual errors in the sys.pipeline_errors system catalog table. You can determine whether a pipeline encountered errors by viewing the information schema pipeline status view, which includes counts of records with errors.
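The following hedged sketch shows how you might combine these catalog tables to triage a load. The pipeline name, the column names, and the status literals are illustrative assumptions, so confirm the exact schema of sys.pipeline_files and sys.pipeline_errors on your system before relying on them.

-- Hedged sketch: find files that failed, were skipped, or loaded with errors.
-- The pipeline name, column names, and status values are assumptions.
SELECT filename, status
FROM sys.pipeline_files
WHERE pipeline_name = 'orders_batch_load'
  AND status IN ('FAILED', 'SKIPPED', 'LOADED_WITH_ERRORS');

-- Hedged sketch: list the record-level errors captured for the same pipeline.
SELECT *
FROM sys.pipeline_errors
WHERE pipeline_name = 'orders_batch_load';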
Transformations

How can I convert a string representation of an array of data into an array data type?

The Ocient system automatically converts a string representation of an array into an array using the settings defined on your pipeline. The system uses the close array, open array, and array element delimiter settings to convert a string to a target array column type. These settings default to a {{postgresql}}-style array format (e.g., {1,2,3}). For details, see the supported options in docid\ l8tdfpfzzvzeyabc2h7bq.

How do I apply a transformation function to an array of data?

Data pipelines do not yet support the mapping of arbitrary transformation functions over an array. However, the Ocient system supports casting and type conversion. You can apply an array cast to an array, and the system applies the type conversion to each element of the array. For details, see docid\ aimcafoydn2xf fgqssys.

My data is in a non-UTF-8 encoding. How do I get the load to decode my character set properly?

You can change the character set of your pipeline using the charset name option in a pipeline. For details, see the supported options in docid\ l8tdfpfzzvzeyabc2h7bq.

Can a data pipeline extract nested data in JSON?

Ocient supports a JSON selector syntax similar to JSONPath. The Ocient system can extract individual keys as well as nested data. A simple example is $order.user.first_name, which extracts the first name (first_name) from the user object on the order object in JSON. For details, see docid 1 jhzrhblgnqucl6skiaq.

Can a data pipeline extract attributes of objects in an array in JSON?

Ocient supports a JSON selector syntax similar to JSONPath. The Ocient system can extract individual keys, nested objects, arrays, and arrays of objects. The system can also project a selector into arrays. A simple example is $data.line_items[].price, which extracts a one-dimensional array of prices (price) from the line_items array inside the data object. For details, see docid 1 jhzrhblgnqucl6skiaq.

How do I apply a user-defined transformation to the data?

Data pipelines support the creation of user-defined functions that you can use to apply more advanced transformations. For details, see the docid\ l8tdfpfzzvzeyabc2h7bq SQL statement.

What automatic conversions does Ocient perform from my source data to the target columns?

The Ocient system supports many automatic conversions to make it easy to load source data into your target tables. VARCHAR data automatically casts into most target column types, and many data types automatically cast to a VARCHAR target column. Some other common automatic conversions, such as BIGINT to TIMESTAMP, are supported, especially where the conversion is lossless. For details, see docid 7s5nztl8lolpwt2pcnjyz.

How do I cast data to an array column?

To cast to an array column, use a casting function. This function converts a string of data in the configured array format to the array type you specify, casting the elements to the inner type. For example, if $my_column represents the data '{123,456,789}' and your pipeline is using the default close array, open array, and array element delimiter settings, this expression snippet casts the elements to an array of INT values. Ocient also automatically casts $my_column in this example to an INT[] when that is the type of the target column.

CREATE PIPELINE SELECT INT[]($my_column) AS column_array_of_ints;
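To show how such a snippet might sit inside a fuller pipeline definition, here is a hedged sketch that combines the JSON selectors from the questions above with an explicit array cast. The pipeline name, target table, and the omitted source and format clauses are illustrative assumptions rather than confirmed syntax; refer to docid\ l8tdfpfzzvzeyabc2h7bq for the exact CREATE PIPELINE clauses that your version supports.

-- Hedged sketch only: everything other than CREATE PIPELINE, SELECT, the JSON
-- selectors, and the INT[] cast shown in this FAQ is an assumption.
CREATE PIPELINE orders_json_load                 -- hypothetical pipeline name
  -- source and format clauses omitted; see the data pipeline reference
  INTO orders_table                              -- hypothetical target table
SELECT
  $order.user.first_name   AS first_name,        -- JSON selector from the FAQ above
  $data.line_items[].price AS line_item_prices,  -- projects price across the array
  INT[]($my_column)        AS column_array_of_ints;  -- explicit array cast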
How do I cast data to a tuple column?

Similar to casting to an array column, you can cast to a tuple column using the tuple casting syntax. For example, if $my_column represents the data '(123,test)' and your pipeline is using the default close tuple, open tuple, and tuple element delimiter settings, this expression snippet casts the elements as a tuple of an integer and a string, TUPLE<<INT,VARCHAR>>.

CREATE PIPELINE SELECT TUPLE<<INT,VARCHAR>>($my_column) AS column_tuple;

Related Links

docid\ xq0tg7yph vn62uwufibu
docid\ l8tdfpfzzvzeyabc2h7bq