Loading and Transformation Ref...

Pipeline Configuration

A pipeline configuration is a JSON file that describes everything needed to run a pipeline, including:

  • Source - the source location from which a pipeline should read records to process
  • Extract - how a pipeline should extract data from the source
  • Transform - how a pipeline should transform incoming records
  • Sink - the destination where a pipeline should write transformed rows

The JSON file is a list of keys with (possibly nested) values. The available key-value pairs are documented in these sections.

Pipeline

Top level configuration for a pipeline.

Required keys: version, source, and transform.

The following is an example of the structure of a pipeline configuration.
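As a rough illustration of that structure, the sketch below builds a minimal top-level configuration in Python and prints it as JSON. Only the top-level key names come from this reference. The pipeline_id value is hypothetical, and the source and transform objects are left as empty placeholders because their contents are documented in their own sections, so this skeleton is not expected to be loadable as-is.

```python
import json

# Minimal top-level skeleton. Key names come from this reference; the nested
# contents of "source" and "transform" are intentionally left empty because
# they are documented in Source Configuration and Transform Configuration.
pipeline_config = {
    "version": 2,                       # required; the value must be 2
    "pipeline_id": "example-pipeline",  # optional; hypothetical value
    "source": {},                       # required; placeholder only
    "transform": {},                    # required; placeholder only
}

# Serialize to the JSON text that would be saved as the configuration file.
config_json = json.dumps(pipeline_config, indent=2)
print(config_json)
```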

Configuration

version

The pipeline’s version. The value must be 2.

Type:

int

Required:

Yes

Default:



pipeline_id

A unique identifier for this pipeline. Allowed characters are a-z, A-Z, 0-9, _, and -.

The pipeline_id serves the following purposes:

  • Deduplication scope. See Understanding Deduplication.
  • For Kafka loads, the consumer group.id is set to ocient-lat-[pipeline_id].

For most loads from File Sources, it is advisable to leave the pipeline_id unset when creating a pipeline using the LAT client. The client will assign the pipeline a random UUID.
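The character rule and the group.id naming described above can be checked before a configuration is submitted. The following is a minimal sketch, not LAT's own validation: the function names are hypothetical, and rejecting an empty string is an assumption, since this reference only lists the allowed characters.

```python
import re

# Allowed characters per this reference: a-z, A-Z, 0-9, _ and -.
# Requiring at least one character ("+") is an assumption of this sketch.
PIPELINE_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]+")


def is_valid_pipeline_id(pipeline_id: str) -> bool:
    """Return True if every character of pipeline_id is in the allowed set."""
    return PIPELINE_ID_PATTERN.fullmatch(pipeline_id) is not None


def consumer_group_id(pipeline_id: str) -> str:
    """Build the consumer group.id in the documented ocient-lat-[pipeline_id] form."""
    if not is_valid_pipeline_id(pipeline_id):
        raise ValueError(f"invalid pipeline_id: {pipeline_id!r}")
    return f"ocient-lat-{pipeline_id}"


print(consumer_group_id("orders_2024-v1"))
```

A check like this is mostly useful when the pipeline_id is set explicitly. When it is left unset, the LAT client assigns a UUID instead.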

Type:

string

Required:

No

Default:

A random UUID generated by the LAT client.

workers

The number of workers this pipeline should use for processing records.

Type:

int

Required:

No

Default:

DEFAULT_NUM_WORKERS in service configuration

log_original_records

Add original records to the error log when errors occur. When this setting is true, LAT writes data extracted from the source to the error log and includes it in some error messages. By default, this setting is false, and source data is neither written to the error log nor included in error messages.

To enable this setting, the LAT_ALLOW_LOG_ORIGINAL_RECORDS service configuration must also be enabled.

This configuration only affects pipelines that do not use an error_topic.
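The three conditions above interact, so it can help to see them as a single predicate. This sketch only models the documented rules and does not show how LAT evaluates them internally. The function name and the service_allows_logging parameter are hypothetical; the parameter stands in for the LAT_ALLOW_LOG_ORIGINAL_RECORDS service configuration.

```python
def logs_original_records(pipeline_config: dict, service_allows_logging: bool) -> bool:
    """Model whether source data would appear in the error log.

    service_allows_logging is a hypothetical stand-in for the
    LAT_ALLOW_LOG_ORIGINAL_RECORDS service configuration.
    """
    # The pipeline must request the behavior; the documented default is false.
    requested = bool(pipeline_config.get("log_original_records", False))
    # The setting only affects pipelines that do not use an error_topic.
    uses_error_topic = pipeline_config.get("error_topic") is not None
    return requested and service_allows_logging and not uses_error_topic


print(logs_original_records({"log_original_records": True}, service_allows_logging=True))
```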

seek_on_rebalance

Whether to seek a newly assigned partition to the latest known durable record prior to resuming processing. Disabling this behavior should typically be reserved for test scenarios and is only supported for Kafka loading.

Type:

boolean

Required:

No

Default:

true

continue_on_unrecoverable_error

Whether to allow workers to continue processing when they encounter an ordinarily unrecoverable error.

Type:

boolean

Required:

No

Default:

false

single_file_mode

Enable single file mode. This mode is designed for loads with only a few files where each file is large. When using this mode, only a single file is processed at a time, so Common File Source Configuration must be equal to 1 and only one Common File Source Configuration can be defined. The single file is processed in parallel by the number of workers defined by the pipeline workers setting. Most loads do not need this mode.

Type:

boolean

Required:

No

Default:

false

A known limitation exists with the LAT metrics when you use the single file mode. Metrics returned from the lat_client pipeline status command might not display the expected count of files processed, processing, and so on. However, the load still displays the correct PROCESSING and COMPLETED statuses.

error_topic

A Kafka topic to which LAT writes records that cannot be processed. If absent, error records are logged to the error log file without additional processing.

This configuration is only available if a Kafka Source is configured for the pipeline. The configuration for that source will apply to the Kafka Producer for this topic.

Type:

string

Required:

No

Default:

null

polling_duration

Maximum duration to block while polling for new records from a Source, in milliseconds.

Type:

int

Required:

No

Default:

1000
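Several of the optional keys described so far have fixed defaults, which makes it possible to preview the effective settings of a sparse configuration. The sketch below is an illustration only: the helper name is hypothetical, and it leaves out workers and pipeline_id because their defaults come from the service configuration and the LAT client rather than from a fixed value.

```python
# Fixed defaults stated in this reference for optional top-level keys.
DOCUMENTED_DEFAULTS = {
    "log_original_records": False,
    "seek_on_rebalance": True,
    "continue_on_unrecoverable_error": False,
    "single_file_mode": False,
    "error_topic": None,
    "polling_duration": 1000,  # milliseconds
}


def with_documented_defaults(pipeline_config: dict) -> dict:
    """Return a copy of the config with documented defaults filled in.

    Keys present in pipeline_config always win, even when set to None.
    """
    return {**DOCUMENTED_DEFAULTS, **pipeline_config}


effective = with_documented_defaults({"version": 2, "polling_duration": 250})
print(effective["polling_duration"], effective["seek_on_rebalance"])
```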

source

Source configuration section. See Source Configuration for nested configuration.

Type:

object

Required:

Yes

Default:



sink

Sink configuration section. See Sink Configuration for inline configuration details. sink cannot be set if sink_name is set. sink can be omitted if a default sink is defined as an External Sink Configuration.

Type:

object

Required:

No

Default:

null

sink_name

The name of an externally configured sink. sink_name cannot be set if sink is set. See External Sink Configuration. sink_name can be omitted if a default sink is defined as an External Sink Configuration.

Type:

string

Required:

No

Default:

null
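Because sink and sink_name are mutually exclusive and both can be omitted when a default external sink exists, a configuration can be checked for a consistent sink choice up front. The following is a minimal sketch under stated assumptions: the function name and the has_default_external_sink flag are hypothetical, and treating "no sink at all" as an error is inferred from the wording above rather than stated by this reference.

```python
def resolve_sink(pipeline_config: dict, has_default_external_sink: bool) -> str:
    """Return which sink a pipeline would use: 'inline', 'named', or 'default'."""
    has_inline = pipeline_config.get("sink") is not None
    has_named = pipeline_config.get("sink_name") is not None

    # sink and sink_name cannot both be set.
    if has_inline and has_named:
        raise ValueError("sink and sink_name cannot both be set")
    if has_inline:
        return "inline"
    if has_named:
        return "named"
    # Both keys may be omitted only when a default external sink is defined.
    if has_default_external_sink:
        return "default"
    # Assumption: with no sink of any kind, the configuration is unusable.
    raise ValueError("no sink, sink_name, or default external sink available")


print(resolve_sink({"sink_name": "warehouse"}, has_default_external_sink=False))
```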

extract

Extract configuration section. See Extract Configuration for nested configuration.

Type:

object

Required:

No

Default:

Defaults to JSON record type with default settings

transform

Transform configuration section. See Transform Configuration for nested configuration.

Type:

object

Required:

Yes

Default:



Related Links

Load Data