# LAT Pipeline Configuration
Data pipelines are now the preferred method for loading data into the system. For details, see Load Data (docid: lk7xyhhwzkwj32rx8p, v2).

A pipeline configuration is a JSON file that fully describes the necessary elements to run a pipeline, including:

- **source** — the source location from which a pipeline should read records to process
- **extract** — how a pipeline should extract data from the source
- **transform** — how a pipeline should transform incoming records
- **sink** — the destination where a pipeline should write transformed rows

The JSON file is a list of keys with (possibly nested) values. The available key-value pairs are documented in the sections below.

## Pipeline

Top-level configuration for a pipeline.

Required keys:

- version
- source
- extract
- transform

The following is an example of the structure of a pipeline configuration:

```json
{
  "version": 2,
  "source": {
    "type": "kafka"
    // kafka source configuration
  },
  "sink": {
    "type": "ocient"
    // ocient sink configuration
  },
  "extract": {
    // extract configuration
  },
  "transform": {
    "topics": {
      "my_topic": {
        "tables": {
          "my_table": {
            "columns": {
              "my_col1": "record_field_1",
              "my_col2": "record_field_2"
            }
          }
        }
      }
    }
  }
}
```

## Configuration

### version

The pipeline's version. The required value is 2.

- Type: int
- Required: yes
- Default: none

### pipeline_id

A unique identifier for this pipeline. Allowed characters are a-z, A-Z, 0-9, `_`, and `-`.

The pipeline ID is used to uniquely identify a pipeline. It is used for a few purposes:

- Deduplication scope. See Understanding Deduplication (docid: lkkxmmze41qivfinetrpu).
- For loads, the consumer group ID is set to `ocient_lat_[pipeline_id]`.
- For most loads from file sources, it is advisable to leave the pipeline ID unset.

When you create a pipeline using the LAT client, the client assigns the pipeline a random UUID.

- Type: string
- Required: no
- Default: LAT client randomly generated UUID

### workers

The number of workers this pipeline should use for processing records.

- Type: int
- Required: no
- Default: `num_workers` in the service configuration

### log_original_records

Add original records to the error log when errors occur. When this setting is set to true, the LAT writes data extracted from the source to the error log and includes it in some error messages. By default, this setting is false, and source data is neither written to the error log nor included in error messages.

To enable this setting, the LAT `allow_log_original_records` service configuration (docid: jnh88ovwoc6zpye1j3oky) must also be enabled.

This configuration only affects pipelines that do not use an error_topic.

### seek_on_rebalance

Whether to seek a newly assigned partition to the latest known durable record prior to resuming processing. Disabling this behavior should typically be reserved for test scenarios, and it is only supported for Kafka loading.

- Type: boolean
- Required: no
- Default: true

### continue_on_unrecoverable_error

Whether to allow workers to continue processing when they encounter an ordinarily unrecoverable error.

- Type: boolean
- Required: no
- Default: false

### single_file_mode

Enable single file mode. This mode is designed for a specific use case where there are few files, but each file is large. When you use this mode, only a single file is processed at a time, so the corresponding setting in the Common File Source Configuration (docid: 4wluw0a mbwdhgim36btt) must be equal to 1, and only one Common File Source Configuration can be defined. The single file is processed in parallel by the number of workers defined by the pipeline workers setting. There is no need to enable this mode in common use cases.

- Type: boolean
- Required: no
- Default: false

A known limitation exists with LAT metrics when you use single file mode. Metrics returned from the LAT client pipeline status command might not display the expected count of files processed, processing, and so on. However, the load still displays the correct processing and completed statuses.

### error_topic

A Kafka topic to which to write records that cannot be processed. If absent, error records are logged to the error log file without additional processing. This configuration is only available if a Kafka source is configured for the pipeline. The configuration for that source applies to the Kafka producer for this topic.

- Type: string
- Required: no
- Default: null

### polling_duration

The maximum duration to block while polling for new records from a source, in milliseconds.

- Type: int
- Required: no
- Default: 1000

### source

Source configuration section. See LAT Source Configuration (docid: 4wluw0a mbwdhgim36btt) for nested configuration.

- Type: object
- Required: yes
- Default: none

### sink

Sink configuration section. See LAT Sink Configuration (docid: g qjt5knws0fzr6cixl q) for inline configuration details.

`sink` cannot be set if `sink_name` is set. `sink` can be omitted if a default sink is defined as an External Sink Configuration (docid: g qjt5knws0fzr6cixl q).

- Type: object
- Required: no
- Default: null

### sink_name

The name of an externally configured sink. `sink_name` cannot be set if `sink` is set. See External Sink Configuration (docid: g qjt5knws0fzr6cixl q).

`sink_name` can be omitted if a default sink is defined as an External Sink Configuration.

- Type: string
- Required: no
- Default: null

### extract

Extract configuration section. See LAT Extract Configuration (docid: sj76ad2ifqhegkehff9ux) for nested configuration.

- Type: object
- Required: no
- Default: JSON record type with default settings

### transform

Transform configuration section. See LAT Transform Configuration (docid: p9n5tcvf pyd gpoxxmat) for nested configuration.

- Type: object
- Required: yes
- Default: none

## Related Links

- Ingest Data with Legacy LAT Reference (docid: ojlxwqbiokfcar alnhuo)
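To illustrate how the optional top-level keys fit together, the following sketch shows a pipeline that sets several of them and references an externally configured sink through `sink_name` instead of an inline `sink` block (the two are mutually exclusive). The pipeline ID, sink name, topic, table, and column names here are hypothetical placeholders, and the nested section contents are elided:

```json
{
  "version": 2,
  "pipeline_id": "example_pipeline",
  "workers": 4,
  "polling_duration": 500,
  "error_topic": "example_error_topic",
  "sink_name": "example_sink",
  "source": {
    "type": "kafka"
    // kafka source configuration
  },
  "extract": {
    // extract configuration; if omitted, defaults to the JSON record type
  },
  "transform": {
    "topics": {
      "example_topic": {
        "tables": {
          "example_table": {
            "columns": {
              "col1": "record_field_1"
            }
          }
        }
      }
    }
  }
}
```

Because `error_topic` is set, records that cannot be processed are written to that Kafka topic rather than only to the error log; this is valid here only because the pipeline uses a Kafka source.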