# Data Pipelines

export const Unix = "Unix®";

export const TimeKey = "TimeKey®";

export const SQS = "Amazon® SQS";

export const Parquet = "Apache® Parquet™";

export const OcientDataIntelligencePlatform = "OcientAIQ™ Unified Data Platform";

export const Ocient = "Ocient®";

export const Kafka = "Apache® Kafka®";

export const JVM = "JVM®";

export const Java = "Java®";

export const HDFS = "Apache® Hadoop® Distributed File System (HDFS)";

export const Groovy = "Apache® Groovy™";

export const AWS = "Amazon® Web Services℠ (AWS℠)";

export const Avro = "Apache® Avro™";

export const Amazon = "Amazon®";

Data pipelines enable you to load data from your chosen source into the {OcientDataIntelligencePlatform}. You can preview a data pipeline using the `PREVIEW PIPELINE` SQL statement to test the creation statement and any required data transformation. Then, you can create a data pipeline using the `CREATE PIPELINE` statement. To manage the load, use the `START PIPELINE` statement to start the load and the `STOP PIPELINE` statement to stop it. You can evolve the schema of a data pipeline using the `ALTER PIPELINE` statement. You can also rename a data pipeline using the `ALTER PIPELINE RENAME` statement. To see the full definition of a created pipeline, use the `EXPORT PIPELINE` statement. When you finish with the load, you can use the `DROP PIPELINE` statement to remove the data pipeline. You can create user-defined data pipeline functions using the `CREATE PIPELINE FUNCTION` statement and remove the function using the `DROP PIPELINE FUNCTION` statement. Also, you can administer privileges for data pipelines and data pipeline functions. For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

## CREATE PIPELINE

`CREATE PIPELINE` defines a data pipeline that you can execute with the `START PIPELINE` SQL statement. Specify the type of load, data source, and data format.

You must have the `ALTER` privilege on the pipeline. For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

**Syntax**

```sql SQL theme={null}
CREATE [ OR REPLACE ] [ BATCH | CONTINUOUS | TRANSACTIONAL ] PIPELINE [ IF NOT EXISTS ] pipeline_name
    [ <advanced_pipeline_options> ]
    [ BAD_DATA_TARGET <kafka_bad_data_target> ]

    SOURCE ( <s3_source> | <filesystem_source> | <hdfs_source> | <kafka_source> )
        [ MONITOR ( <sqs_monitor> | <kafka_monitor> ) ]
    [ LOOKUP lookup_source <lookup_options> ]  [ ... ]

    EXTRACT
        ( <delimited_extract_options> | <asn1_extract_options> |
        <avro_extract_options> | <json_extract_options> |
        <parquet_extract_options> | <binary_extract_options> |
        <xml_extract_options> )
        [ <general_extract_options> ]

    [ INSERT ] INTO destination_table_name

    SELECT
        expression as col_alias_target,
        expression2 as col_alias_target2,
        ...
    [ WHERE filter_expression ]

    [ [ INSERT ] INTO destination_table_name_n
    SELECT
        expression_n as col_alias_target_n,
        expression2_n as col_alias_target2_n,
        ...
    [ WHERE filter_expression_n ] ] [ ... ]

/********************/
/* ADVANCED OPTIONS */
/********************/
advanced_pipeline_options ::=
    [ CORES processing_cores ]
    [ PARTITIONS file_partitions ]
    [ BATCH_SIZE number_of_rows ]
    [ RECORD_NUMBER_FORMAT record_number_format ]

/***************************/
/* BAD_DATA_TARGET OPTIONS */
/***************************/
kafka_bad_data_target ::=
    KAFKA
        BOOTSTRAP_SERVERS bootstrap_servers
        TOPIC topic_name
        [ CONFIG config_option ]

/******************/
/* SOURCE OPTIONS */
/******************/
s3_source ::=
    S3
        <file_based_source_options>
        BUCKET bucket_name
        ( FILTER | FILTER_GLOB | FILTER_REGEX | OBJECT_KEY ) specifiers
        [ REGION region ]
        [ ENDPOINT endpoint ]
        [ ENABLE_PATH_STYLE_ACCESS enable_path_style_access ]
        [ ACCESS_KEY_ID access_key_credentials ]
        [ SECRET_ACCESS_KEY secret_key_credentials ]
        [ SESSION_TOKEN session_token ]
        [ ROLE_ARN role_arn ]
        [ ASSUME_ROLE_CONFIG assume_role_config ]
        [ MAX_CONCURRENCY parallel_connections ]
        [ READ_TIMEOUT num_seconds ]
        [ REQUEST_DEPTH num_requests ]
        [ REQUEST_RETRIES num_retries ]
        [ HEADERS headers ]

filesystem_source ::=
    FILESYSTEM
        <file_based_source_options>
        ( FILTER | FILTER_GLOB | FILTER_REGEX ) specifiers

hdfs_source ::=
    HDFS
        <file_based_source_options>
        ( FILTER | FILTER_GLOB | FILTER_REGEX ) specifiers
        ENDPOINT endpoint
        [ CONFIG hdfs_config ]

kafka_source ::=
    KAFKA
        BOOTSTRAP_SERVERS bootstrap_servers
        TOPIC topic_name
        [ WRITE_OFFSETS write_offsets ]
        [ CONFIG config_option ]
        [ AUTO_OFFSET_RESET ( 'latest' | 'earliest' ) ]

file_based_source_options ::=
    [ PREFIX prefix ]
    [ COMPRESSION_METHOD 'gzip' ]
    [ SORT_BY ( 'filename' | 'created' | 'modified' )
        [ SORT_DIRECTION ( ASC | DESC ) ] ]
    [ SORT_REWRITE sort_rewrite ]
    [ START_FILENAME start_filename ]
    [ END_FILENAME end_filename ]
    [ START_CREATED_TIMESTAMP start_created_timestamp ]
    [ END_CREATED_TIMESTAMP end_created_timestamp ]
    [ START_MODIFIED_TIMESTAMP start_modified_timestamp ]
    [ END_MODIFIED_TIMESTAMP end_modified_timestamp ]

/******************/
/* MONITOR OPTIONS */
/******************/
sqs_monitor ::=
    SQS
        <general_monitor_options>
        QUEUE_URL queue_url
        [ REGION region ]
        [ ENDPOINT endpoint ]
        [ ACCESS_KEY_ID access_key_id ]
        [ SECRET_ACCESS_KEY secret_access_key ]

kafka_monitor ::=
    KAFKA
        <general_monitor_options>
        BOOTSTRAP_SERVERS bootstrap_servers
        TOPIC topic
        AUTO_OFFSET_RESET ( 'latest' | 'earliest' )
        [ GROUP_ID group_id ]
        [ MESSAGE {
            FILENAME file_selector
            TIMESTAMP timestamp_selector
            SIZE size_selector
        } ]
        [ CONSUMER {
            MAX_MESSAGES max_messages
            TIMEOUT consumer_timeout
        } ]

general_monitor_options ::=
    [ POLLING_INTERVAL polling_interval ]
    [ BATCH {
        MAX_FILES max_files
        TIMEOUT batch_timeout
        LOOKBACK lookback
     } ]

/*******************/
/* LOOKUP OPTIONS  */
/*******************/
lookup_options ::=
    CONNECTION_TYPE connection_type
    CONNECTION_STRING connection_string
    LOOKUP_SCHEMA lookup_schema
    LOOKUP_TABLE lookup_table
    [ REFRESH_PERIOD refresh_period ]
    [ CONFIG config_json ]

/*******************/
/* EXTRACT OPTIONS */
/*******************/
general_extract_options ::=
    [ CHARSET_NAME charset_name ]
    [ COLUMN_DEFAULT_IF_NULL column_default_if_null ]
    [ NULL_STRINGS null_strings ]
    [ TRIM_WHITESPACE trim_whitespace ]
    [ VALIDATE_CHARACTERS validate_characters ]
    [ REPLACE_INVALID_CHARACTERS replace_invalid_characters ]
    [ REPLACEMENT_CHARACTER replacement_character ]

delimited_extract_options ::=
    FORMAT ( 'delimited' | 'csv' )
        [ COMMENT_CHAR comment_char ]
        [ EMPTY_FIELD_AS_NULL empty_field_as_null ]
        [ ESCAPE_CHAR escape_char ]
        [ FIELD_DELIMITER field_delimiter ]
        [ FIELD_OPTIONALLY_ENCLOSED_BY enclosure_char ]
        [ HEADERS delimited_headers ]
        [ NUM_HEADER_LINES num_header_lines ]
        [ NUM_FOOTER_LINES num_footer_lines ]
        [ RECORD_DELIMITER record_delimiter ]
        [ SKIP_EMPTY_LINES skip_empty_lines ]
        [ OPEN_ARRAY open_array_char ]
        [ CLOSE_ARRAY close_array_char ]
        [ ARRAY_ELEMENT_DELIMITER array_delimiter ]
        [ OPEN_OBJECT open_object_char ]
        [ CLOSE_OBJECT close_object_char ]
        [ STRIP_ARRAY_ELEMENT_QUOTES strip_array_element_quotes ]
        [ STRIP_FIELD_QUOTES strip_field_quotes ]
        [ TRIM_ARRAY_ELEMENTS trim_array_elements ]

asn1_extract_options ::=
    FORMAT 'asn.1'
        SCHEMA {
            URL url_asn1
            RECORD_TYPE record_type
        }

avro_extract_options ::=
    FORMAT 'avro'
        [ SCHEMA {
            [ INLINE inline_string ]
            [ INFER_FROM 'sample_file' ]
        } ]

binary_extract_options ::=
    FORMAT 'binary'
        RECORD_LENGTH length_in_bytes
        [ ENDIANNESS ( 'big' | 'little' ) ]
        [ AUTO_TRIM_PADDING auto_trim_padding ]
        [ PADDING_CHARACTER padding_character ]

json_extract_options ::=
    FORMAT 'json'

parquet_extract_options ::=
    FORMAT 'parquet'
        SCHEMA {
            INFER_FROM infer_from
        }

xml_extract_options ::=
    FORMAT 'xml'
```

### Pipeline Identity and Naming

The name of a data pipeline is unique in an {Ocient} System. Reference the pipeline in other SQL statements like `START PIPELINE`, `STOP PIPELINE`, and `DROP PIPELINE` using the name of the pipeline. The SQL statement throws an error if a pipeline with the same name already exists unless you specify the `IF NOT EXISTS` option. You can rename a pipeline with the `ALTER PIPELINE RENAME` SQL statement.
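As a minimal sketch of this naming behavior, the statement below creates a pipeline only if no pipeline named `orders_pipeline` exists yet. With `IF NOT EXISTS`, running the same statement a second time is expected to leave the existing pipeline in place instead of returning an error. The broker address, topic, and target table are placeholder values.

```sql SQL theme={null}
-- Placeholder values: broker address, topic, and target table are illustrative.
CREATE PIPELINE IF NOT EXISTS orders_pipeline
    SOURCE KAFKA
        BOOTSTRAP_SERVERS '192.168.0.1:9092'
        TOPIC 'orders'
    EXTRACT
        FORMAT json
INTO public.orders
SELECT
    $id as id,
    $total as total;
```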

### Update a Pipeline

You can update a pipeline with the `OR REPLACE` clause in the `CREATE PIPELINE` SQL statement. Use this clause when you want to continue loading from the current place in a continuous load, but you need to modify transformations or other settings. If you specify the `OR REPLACE` clause and the pipeline already exists, the database replaces the original pipeline object with the options specified in the new `CREATE OR REPLACE PIPELINE` statement. When you replace an existing pipeline, the pipeline retains its current position in the source data so that data is not duplicated when you resume the pipeline with a `START PIPELINE` SQL statement. You must stop the pipeline before you execute the `CREATE OR REPLACE PIPELINE` SQL statement.
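The update flow could look like the following sketch. It assumes that the `STOP PIPELINE` and `START PIPELINE` statements accept the pipeline name as their only required argument, and it uses a hypothetical `orders_pipeline` whose replacement definition adds a `DECIMAL` cast on the `$tax` field. The broker address, topic, and table are placeholder values.

```sql SQL theme={null}
-- Step 1: stop the running pipeline before replacing its definition.
STOP PIPELINE orders_pipeline;

-- Step 2: replace the definition. The pipeline is expected to keep its
-- current position in the source data.
CREATE OR REPLACE PIPELINE orders_pipeline
    SOURCE KAFKA
        BOOTSTRAP_SERVERS '192.168.0.1:9092'
        TOPIC 'orders'
    EXTRACT
        FORMAT json
INTO public.orders
SELECT
    $id as id,
    DECIMAL($tax, 8, 2) as tax,
    $total as total;

-- Step 3: resume loading from the retained position.
START PIPELINE orders_pipeline;
```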

### Data Pipeline Modes

You can define pipelines in batch (`BATCH`), continuous (`CONTINUOUS`), or transactional (`TRANSACTIONAL`) mode.

* File sources (e.g., `s3`, `filesystem`) support `BATCH` and `CONTINUOUS` modes. File-based loads default to `BATCH` mode if you do not specify this keyword.
* {Kafka} only supports `CONTINUOUS` mode. Loads with a Kafka source default to `CONTINUOUS` mode if you do not specify this keyword.
* When you execute the `START PIPELINE` SQL statement using a data pipeline in the `BATCH` mode, the system creates a static list of files with the `PENDING` status in the `sys.pipeline_files` system catalog table. With the `CONTINUOUS` mode, the monitor appends new incoming files to the list of files in the `sys.pipeline_files` system catalog table.
* With the `CONTINUOUS` mode, you can apply filters for the data pipeline. The system only uses files in the consumed messages if the filenames match the filters. For this mode, these options are invalid:
  * `PREFIX`
  * `SORT_BY`
  * `SORT_DIRECTION`
  * `SORT_REWRITE`
  * `START_CREATED_TIMESTAMP`
  * `END_CREATED_TIMESTAMP`
  * `START_MODIFIED_TIMESTAMP`
  * `END_MODIFIED_TIMESTAMP`
* The `TRANSACTIONAL` mode enables the data pipeline to roll back the interim results of the load if an error occurs during load execution. The system makes all data visible only after the load completes successfully. This mode supports all options that the `BATCH` mode does.
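As an illustration of the mode keyword, this sketch defines an explicit `BATCH` pipeline on a `FILESYSTEM` source and uses the `PREFIX` and `SORT_BY` options, which the list above marks as invalid for the `CONTINUOUS` mode. The pipeline name, directory path, and column mapping are hypothetical.

```sql SQL theme={null}
-- Hypothetical names: orders_batch_pipeline and /data/orders/2024/ are illustrative.
CREATE BATCH PIPELINE orders_batch_pipeline
    SOURCE FILESYSTEM
        PREFIX '/data/orders/2024/'
        SORT_BY 'filename'
            SORT_DIRECTION ASC
        FILTER '**/*.csv'
    EXTRACT
        FORMAT csv
        NUM_HEADER_LINES 1
INTO public.orders
SELECT
    $1 as id,
    $2 as user_id;
```

Replacing the `BATCH` keyword with `TRANSACTIONAL` should keep the same options valid, because the `TRANSACTIONAL` mode supports all options that the `BATCH` mode does.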

### Required Options

You must define certain options in every pipeline. These options have no default value. Other options need to be present only if you use specific functionality. A pipeline must contain the `SOURCE`, `EXTRACT FORMAT`, and `INTO table_name ... SELECT` clauses.

* [SOURCE Options](#source-options)
* [EXTRACT Options](#extract-options)
* [Data Formats for Data Pipelines](/data-formats-for-data-pipelines)
* [Transform Data in Data Pipelines](/transform-data-in-data-pipelines)

The `IF NOT EXISTS` and `OR REPLACE` clauses are mutually exclusive. Specify only one of them in a `CREATE PIPELINE` SQL statement.

### SELECT Statement and Data Transformation

Use the `INTO table_name ... SELECT` SQL statement in the `CREATE PIPELINE` SQL statement to specify how to transform the data. The `SELECT` statement includes a set of one or more expressions and a target column name in the form `expression as column_name`. The `expression` part in the statement contains a source field reference (e.g., `$1` or `$my_field.subfield`) and, optionally, the transformation function you want to apply.

If your data is nested in arrays, you can use special transformation functions, such as the [EXPLODE\_OUTER](/special-data-pipeline-transformation-functions) function, to expand the data into individual rows.

For details about data transformation and supported transformation functions, see [Transform Data in Data Pipelines](/transform-data-in-data-pipelines).

For details about data types and casting, see [Data Types for Data Pipelines](/data-types-for-data-pipelines) and [Data Formats for Data Pipelines](/data-formats-for-data-pipelines).

You can also specify metadata values, such as filename, to load in the `SELECT` SQL statement. For details, see [Load Metadata and File-Based Partitioned Data in Data Pipelines](/load-metadata-and-file-based-partitioned-data-in-data-pipelines).

This is an example of a transformation statement snippet.

```sql SQL theme={null}
...
FORMAT json
INTO public.orders
SELECT
    TIMESTAMP(BIGINT($created_timestamp)) as ordertime,
    metadata('filename') as source_filename,
    $order_number as ordernumber,
    $customer.first_name as fname,
    LEFT($customer.middle_initial,1) as minitial,
    $customer.last_name as lname,
    $postal_code as postal_code,
    $promo_code as promo_code,
    $order_total as ordertotal,
    DECIMAL($tax,8,2) as tax,
    CHAR[]($line_items[].product_name) as product_names,
    CHAR[]($line_items[].sku) as skus
```

Optionally, you can filter the load by using the `WHERE` clause in the form `WHERE filter_expression`. This expression should evaluate to a Boolean value or NULL and can include more than one filter expression. The system loads rows that contain data matching the filter criteria when the expression evaluates to `true`. The system does not load the rows when the expression evaluates to `false` or NULL. You can include any transform in the `WHERE` clause as in the `SELECT` clause.

This is an example of a filter snippet that loads order data for customers whose last name starts with the letter `a`. The `COALESCE` function replaces a NULL last name with an empty string, so the filter expression evaluates to `false` instead of NULL for those rows.

```sql SQL theme={null}
...
FORMAT json
INTO public.orders
SELECT
    TIMESTAMP(BIGINT($created_timestamp)) as ordertime,
    metadata('filename') as source_filename,
    $order_number as ordernumber,
    $customer.first_name as fname,
    LEFT($customer.middle_initial,1) as minitial,
    $customer.last_name as lname,
    $postal_code as postal_code,
    $promo_code as promo_code,
    $order_total as ordertotal,
    DECIMAL($tax,8,2) as tax,
    CHAR[]($line_items[].product_name) as product_names,
    CHAR[]($line_items[].sku) as skus
WHERE
    STARTSWITH(COALESCE($customer.last_name, ''),'a')
```

#### NULL and Default Value Handling

The Ocient System has specific ways to handle NULL and default values in the data pipeline. Consider this information when you prepare data to load and write the `CREATE TABLE` and `CREATE PIPELINE` SQL statements.

To load NULL values, insert the NULL value in the data. In this case, if you specify `NOT NULL` for the target column in the `CREATE TABLE` statement, the data pipeline fails to load.

If you omit a column in the `SELECT` SQL statement, the data pipeline loads the default value for the column. If the column has no default value, the pipeline loads a NULL value. If the column has no default value and you specify `NOT NULL` for the target column, the data pipeline fails to load.

You can use the `DEFAULT` keyword to load a default value. In this case, if the column does not have a default value, the pipeline fails to load. If the pipeline loads a NULL into a column with a specified default value, you can use `COALESCE(<value>, DEFAULT)` to insert the default value instead of the NULL value, where `<value>` is the NULL column. You can modify the load of one column at a time in this way.
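The two uses of the `DEFAULT` keyword might look like this snippet. It assumes that the hypothetical `promo_code` and `order_status` columns both have a default value in the `CREATE TABLE` statement, and that the keyword can appear alone as a `SELECT` expression.

```sql SQL theme={null}
...
FORMAT json
INTO public.orders
SELECT
    $order_number as ordernumber,
    -- Load the column default when the source value is NULL.
    COALESCE($promo_code, DEFAULT) as promo_code,
    -- Always load the column default (assumed usage of the DEFAULT keyword).
    DEFAULT as order_status
```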

This table describes the data pipeline behavior for NULL or omitted column values.

| **Value in the Data Pipeline**               | **Nullable Target Column**            | **Default Value in Target Column** | **Resulting Data Pipeline Behavior**                                                                                                                               |
| -------------------------------------------- | ------------------------------------- | ---------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| NULL                                         | No                                    | Value might or might not be set.   | The pipeline uses the default value if the default exists and you specify the `COLUMN_DEFAULT_IF_NULL` option.  <br />Otherwise, the pipeline fails.               |
| NULL                                         | Yes                                   | Value might or might not be set.   | The pipeline uses the default value if the default exists and you specify the `COLUMN_DEFAULT_IF_NULL` option.  <br />Otherwise, the pipeline uses the NULL value. |
| Omitted column in the `SELECT` SQL statement | NULL value might or might not be set. | Yes                                | The pipeline uses the default value.                                                                                                                               |
| Omitted column in the `SELECT` SQL statement | No                                    | No                                 | The pipeline fails.                                                                                                                                                |
| Omitted column in the `SELECT` SQL statement | Yes                                   | No                                 | The pipeline uses the NULL value.                                                                                                                                  |

### Required Privileges

You must have the `CREATE PIPELINE` privilege on the underlying database and the `VIEW` privilege on each table in the pipeline definition to execute the `CREATE PIPELINE` SQL statement. The table must already exist.

See the `START PIPELINE` SQL statement for the required privileges to execute a pipeline.

For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

### Examples

#### Load JSON Data from Kafka

This example loads JSON data from Kafka using the `CREATE PIPELINE` SQL statement. Use the bootstrap server `192.168.0.1:9092` and the Kafka topic `orders`. Load data into the `public.orders` table. Specify the data to load as these JSON selectors:

* Identifier `$id`
* User identifier `$user_id`
* Product identifier `$product_id`
* Subtotal amount `$subtotal`
* Tax `$tax`
* Total amount `$total`
* Discount amount `$discount`
* Created time `$created_at`
* Quantity `$quantity`

```sql SQL theme={null}
CREATE PIPELINE orders_pipeline
    SOURCE KAFKA
        BOOTSTRAP_SERVERS '192.168.0.1:9092'
        TOPIC 'orders'
    EXTRACT
        FORMAT json
INTO public.orders
SELECT
    $id as id,
    $user_id as user_id,
    $product_id as product_id,
    $subtotal as subtotal,
    $tax as tax,
    $total as total,
    $discount as discount,
    $created_at as created_at,
    $quantity as quantity;
```

For a complete tutorial, see [Data Pipeline Load of JSON Data from Kafka](/data-pipeline-load-of-json-data-from-kafka).

#### Load Delimited Data from S3

This example loads delimited data in CSV format from {AWS} S3. Use the [`https://s3.us-east-1.amazonaws.com`](https://s3.us-east-1.amazonaws.com) endpoint with the `ocient-docs` bucket and path `metabase_samples/csv/orders.csv`. Denote path-style access using the `ENABLE_PATH_STYLE_ACCESS` option set to `true`. Specify one header line with the `NUM_HEADER_LINES` option. Load data into the `public.orders` table. Specify the data to load using the column numbers:

* Identifier
* User identifier
* Product identifier
* Subtotal amount
* Tax
* Total amount
* Discount amount
* Created time
* Quantity

```sql SQL theme={null}
CREATE PIPELINE orders_pipeline
    SOURCE S3
        ENDPOINT 'https://s3.us-east-1.amazonaws.com'
        BUCKET 'ocient-docs'
        FILTER 'metabase_samples/csv/orders.csv'
        ENABLE_PATH_STYLE_ACCESS true
    EXTRACT
        FORMAT csv
        NUM_HEADER_LINES 1
INTO public.orders
SELECT
    $1 as id,
    $2 as user_id,
    $3 as product_id,
    $4 as subtotal,
    $5 as tax,
    $6 as total,
    $7 as discount,
    $8 as created_at,
    $9 as quantity;
```

For a complete tutorial, see [Data Pipeline Load of CSV Data from S3](/data-pipeline-load-of-csv-data-from-s3).

#### Continuous File Load of Delimited Data in CSV Files

Create a data pipeline that uses continuous file loading with delimited data in CSV files. Use 32 partitions and 16 cores by specifying the `PARTITIONS` and `CORES` options, respectively. The source is S3 with the `http://endpoint.ocient.com` endpoint and `cs_data` bucket. Specify the filter `'*.csv'` to find all files with a filename that matches a glob pattern without subdirectories, for example, `data.csv`. The system filters out filenames with subdirectories, such as `data/data_sample.csv`.

For continuous file loading, specify the `MONITOR` option to use the Kafka monitor. Specify the `test-broker:9092` bootstrap server, `cfl_kafka_ten_adtech_flat_small` Kafka topic, and reset the offset to the smallest offset using the `earliest` value of the `AUTO_OFFSET_RESET` option. Set the client group identifier to `84079bf1-bdc4-4b10-ba12-41ba6b17dffe`.

The format is `csv` with the record delimiter as the newline character `\n`. Load data in the `public.ad_sessions` table.

The `SELECT` statement identifies the columns to load by number. There are 39 fields in the CSV data. Transform each column using cast functions. For details about each function, see [Scalar Data Conversion Functions](/scalar-data-conversion-functions). See [Date and Time Functions](/date-and-time-functions) for the `TO_TIMESTAMP` function.

```sql SQL theme={null}
CREATE CONTINUOUS PIPELINE test_continuous_pipeline
    PARTITIONS 32
    CORES 16
    SOURCE S3
        ENDPOINT 'http://endpoint.ocient.com/'
        BUCKET 'cs_data'
        FILTER '*.csv'
        MONITOR kafka
            BOOTSTRAP_SERVERS 'test-broker:9092'
            TOPIC 'cfl_kafka_ten_adtech_flat_small'
            AUTO_OFFSET_RESET 'earliest'
            GROUP_ID '84079bf1-bdc4-4b10-ba12-41ba6b17dffe'
    EXTRACT
        FORMAT csv
        RECORD_DELIMITER '\n'
    INTO public.ad_sessions
SELECT
    TO_TIMESTAMP(CHAR($1), 'yyyy-MM-dd HH:mm:ss.SSSSSS', 'java') AS event_date_time,
    CHAR($2)                    AS device_model,
    TINYINT($8)                 AS device_user_age,
    BOOLEAN($10)                AS device_ad_tracking_disabled,
    BINARY($11)                 AS device_mac,
    INT($14)                    AS ip_zip,
    FLOAT($19)                  AS ip_zip_latitude,
    DOUBLE($20)                 AS ip_zip_longitude,
    BIGINT($21)                 AS session_id,
    SMALLINT($32)               AS session_response_latency,
    DECIMAL($34, 10, 1)         AS session_transaction_revenue,
    CHAR($39)                   AS session_app_name
;
```

#### Transactional Load of Delimited Data in CSV Files

Create a data pipeline that uses transactional file loading with delimited data in CSV files. The source is S3 with the `http://endpoint.ocient.com` endpoint and `cs_data` bucket. Specify the filter `'*.csv'` to find all files with a filename that matches a glob pattern without subdirectories, for example, `data.csv`. The system filters out filenames with subdirectories, such as `data/data_sample.csv`.

The format is `csv`. Load data in the `public.ad_sessions` table.

The `SELECT` statement identifies the columns to load by number. Transform each column using cast functions. For details about each function, see [Scalar Data Conversion Functions](/scalar-data-conversion-functions). See [Date and Time Functions](/date-and-time-functions) for the `TO_TIMESTAMP` function.

```sql SQL theme={null}
CREATE TRANSACTIONAL PIPELINE test_transactional_pipeline
    SOURCE S3
        ENDPOINT 'http://endpoint.ocient.com/'
        BUCKET 'cs_data'
        FILTER '*.csv'
    EXTRACT
        FORMAT csv
    INTO public.ad_sessions
SELECT
    TO_TIMESTAMP(CHAR($1), 'yyyy-MM-dd HH:mm:ss.SSSSSS', 'java') AS event_date_time,
    CHAR($2)                    AS device_model,
    TINYINT($8)                 AS device_user_age,
    BOOLEAN($10)                AS device_ad_tracking_disabled,
    BINARY($11)                 AS device_mac,
    INT($14)                    AS ip_zip,
    FLOAT($19)                  AS ip_zip_latitude,
    DOUBLE($20)                 AS ip_zip_longitude,
    BIGINT($21)                 AS session_id,
    SMALLINT($32)               AS session_response_latency,
    DECIMAL($34, 10, 1)         AS session_transaction_revenue,
    CHAR($39)                   AS session_app_name
;
```

### SOURCE Options

#### File-Based Source Options

These options apply to the `S3`, `FILESYSTEM`, and `HDFS` sources.

| **Option Key**                          | **Default** | **Data Type**                            | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            
                                                                                                                                                                                                                                                                                                                                 |
| --------------------------------------- | ----------- | ---------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| FILTER \| FILTER\_GLOB \| FILTER\_REGEX | None        | STRING                                   | The expression for filtering files in the directory or child directories to load. If you do not specify the `PREFIX` option, this pattern applies to the full path to the file, except for the bucket name for S3-compatible file sources. If you specify the `PREFIX` option, this pattern applies to the file path portion after the specified prefix value.  <br />  <br />Paths include a leading forward slash.  <br />  <br />For `FILESYSTEM` sources, this option is required.  <br />  <br />For `S3` sources, one of the `FILTER`, `FILTER_GLOB`, `FILTER_REGEX`, or `OBJECT_KEY` keys is required.  <br />  <br />Specify one of these options:  <br />\* `FILTER_GLOB` or `FILTER` — Regular {Unix} filename patterns. Supported options include:  <br />- `*` matches any number of characters except for the path separator, `/`.  <br />- `**` matches any number of characters, including path separators.  <br />- `?` matches a single character.  <br />\* `FILTER_REGEX` — Regular expression patterns to filter files. Some common patterns include:  <br />- `.` matches any character.  <br />- `*` matches zero or more of the preceding character.  <br />- `+` matches one or more of the preceding character.  <br />- `[135]` matches any one character in the set.  <br />- `[1-5]` matches any one character in the range.  <br />- `(a\|b)` matches a or b.  <br />ℹ️ When you use the FILTER or FILTER\_GLOB options with a continuous data pipeline, the system supports only basic globbing and does not support extended or range globbing.  <br />  <br />**FILTER\_GLOB Examples:**  <br />List all CSV files in subdirectories named `2024` that are in any subdirectory of the `trades` directory, which is in the root of the bucket.  <br />`FILTER_GLOB = '/trades/*/2024/*.csv'`  <br />List all CSV files in the bucket in all subdirectories.  <br />`FILTER_GLOB = '**/*.csv'`  <br />  <br />**FILTER\_REGEX Examples:**  <br />List all `json.gz` files in the bucket that contain the name `orders` anywhere in the path.  <br />`FILTER_REGEX = '.*orders.*\.json\.gz'`  <br />List all files in the bucket in the root path `metrics` or `values` where the year in the filename is 2000 through 2004.  <br />`FILTER_REGEX = '/(metrics\|values)/.*200[0-4].*'` |
| PREFIX                                  | `NULL`      | STRING  <br />or  <br />ARRAY OF STRINGS | Optional.  <br />  <br />Specify a prefix within which to apply the filter. When you specify a list of prefixes, the system applies the filter to each one, and the results are unioned together.  <br />  <br />Prefixes must be paths to directories, meaning they end with a forward slash. For `FILESYSTEM` sources, paths should be absolute, meaning they begin with a forward slash.  <br />  <br />For `S3` sources, use prefixes to reduce the search space when listing objects within the specified `BUCKET`. For maximum performance, use the `PREFIX` option whenever possible, especially when combined with the `FILTER_REGEX` option.  <br />  <br />ℹ️ This option is not available for continuous data pipelines.  <br />  <br />**Examples:**  <br />List all CSV files in the `2024` subdirectory of `files`.  <br />`PREFIX '/files/2024/' FILTER '**/*.csv'`  <br />List all JSON files of subdirectory `2024/09/` in the `orders` directory.  <br />`PREFIX '/data/orders/2024/09/' FILTER_REGEX '.*/orders/.*json'`  <br />List all JSON files in multiple subdirectories.  <br />`PREFIX ['/data/orders/2024/09/', '/data/orders/2024/10/']` |
| COMPRESSION\_METHOD                     | `NULL`      | STRING                                   | Optional.  <br />  <br />The method the Ocient System uses to decompress file data. The only supported option is `gzip`.  <br />To load uncompressed data, omit the COMPRESSION\_METHOD option.  <br />  <br />ℹ️ The COMPRESSION\_METHOD option applies only to file-based sources. Kafka pipelines automatically decompress data based on the Kafka topic configuration. |
| SORT\_BY                                | `filename`  | STRING                                   | Optional.  <br />  <br />The sort criteria for sorting the file list before the load. Supported options are:  <br />\* `filename` — Sort files lexicographically by the filename.  <br />\* `created` — Sort files based on the file created timestamp.  <br />\* `modified` — Sort files based on the file modified timestamp.  <br />Choosing a sort order that loads the data in the files in the order of the {TimeKey} column in the target table can lead to better query performance.  <br />  <br />ℹ️ This option is not available for continuous data pipelines. |
| SORT\_DIRECTION                         | `ASC`       | STRING                                   | Optional.  <br />  <br />The sort direction, either `ASC` (ascending) or `DESC` (descending). If you specify this option, you must also specify the SORT\_BY option.  <br />  <br />ℹ️ This option is not available for continuous data pipelines. |
| SORT\_REWRITE                           | `NULL`      | STRING                                   | Optional.  <br />  <br />This option is a sort comparator that renames files during the sort operation using capture groups from the filter regular expression. If you specify this option, you must also specify the `FILTER_REGEX` option.  <br />  <br />If you do not specify this option, the system uses the sorting specified by the SORT\_BY option.  <br />  <br />ℹ️ This option is not available for continuous data pipelines.  <br />  <br />Example:  <br />  <br />`FILTER_REGEX '(\d+)_(\d+)_(\d+)-orders.json'`  <br />`SORT_REWRITE '\3-\1-\2.json'`  <br />This specification renames the file from `5_6_1999-orders.json` to `1999-5-6.json` by switching the order of the date fields. |
| START\_FILENAME                         | `NULL`      | STRING                                   | Optional.  <br />  <br />The filename string that is the inclusive lower bound for filtering files lexicographically in batch pipelines.  <br />Use the full path of the file, such as `'/dir/load_file.json'`. If the file is located in the top-most directory, start the path with a slash `/`, such as `'/load_file.json'`.  <br />You can use this option without the END\_FILENAME option.  <br />If you specify the END\_FILENAME option, the value of the START\_FILENAME option must be lexicographically smaller than the value of the END\_FILENAME option. |
| END\_FILENAME                           | `NULL`      | STRING                                   | Optional.  <br />  <br />The filename string that is the inclusive upper bound for filtering files lexicographically in batch pipelines.  <br />Use the full path of the file, such as `'/dir/load_file.json'`. If the file is located in the top-most directory, start the path with a slash `/`, such as `'/load_file.json'`.  <br />You can use this option without the START\_FILENAME option.  <br />If you specify the START\_FILENAME option, the value of the START\_FILENAME option must be lexicographically smaller than the value of the END\_FILENAME option. |
| START\_CREATED\_TIMESTAMP               | `NULL`      | TIMESTAMP-formatted STRING               | Optional.  <br />  <br />The ISO-8601-compliant date or date time that is used as the lower bound (inclusive) to filter files by created timestamp for batch pipelines. The time zone should match the file metadata.  <br />  <br />If you specify both the START\_CREATED\_TIMESTAMP and END\_CREATED\_TIMESTAMP options, the value for the START\_CREATED\_TIMESTAMP option must be before the value for the END\_CREATED\_TIMESTAMP option.  <br />  <br />ℹ️ This option is not available for continuous data pipelines. |
| END\_CREATED\_TIMESTAMP                 | `NULL`      | TIMESTAMP-formatted STRING               | Optional.  <br />  <br />The ISO-8601-compliant date or date time that is used as the upper bound (inclusive) to filter files by created timestamp for batch pipelines. The time zone should match the file metadata.  <br />  <br />If you specify both the START\_CREATED\_TIMESTAMP and END\_CREATED\_TIMESTAMP options, the value for the START\_CREATED\_TIMESTAMP option must be before the value for the END\_CREATED\_TIMESTAMP option.  <br />  <br />ℹ️ This option is not available for continuous data pipelines. |
| START\_MODIFIED\_TIMESTAMP              | `NULL`      | TIMESTAMP-formatted STRING               | Optional.  <br />  <br />The ISO-8601-compliant date or date time that is used as the lower bound (inclusive) to filter files by modified timestamp for batch pipelines. The time zone should match the file metadata.  <br />  <br />If you specify both the START\_MODIFIED\_TIMESTAMP and END\_MODIFIED\_TIMESTAMP options, the value for the START\_MODIFIED\_TIMESTAMP option must be before the value for the END\_MODIFIED\_TIMESTAMP option.  <br />  <br />ℹ️ This option is not available for continuous data pipelines. |
| END\_MODIFIED\_TIMESTAMP                | `NULL`      | TIMESTAMP-formatted STRING               | Optional.  <br />  <br />The ISO-8601-compliant date or date time that is used as the upper bound (inclusive) to filter files by modified timestamp for batch pipelines. The time zone should match the file metadata.  <br />  <br />If you specify both the START\_MODIFIED\_TIMESTAMP and END\_MODIFIED\_TIMESTAMP options, the value for the START\_MODIFIED\_TIMESTAMP option must be before the value for the END\_MODIFIED\_TIMESTAMP option.  <br />  <br />ℹ️ This option is not available for continuous data pipelines. |
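
The way the `SORT_REWRITE` option combines with the `FILTER_REGEX` option can be easier to follow with a small simulation. The following sketch is not Ocient code. It uses the Python `re` module as a stand-in for the regular expression engine, and it assumes that the pattern must match the whole filename and that the rewritten names are compared lexicographically. The filenames are hypothetical, and the pattern follows the `SORT_REWRITE` example in the table, with the dot escaped.

```python
import re

# Pattern and rewrite template modeled on the SORT_REWRITE example.
# Assumption: Python's regex syntax stands in for the engine the system uses.
FILTER_REGEX = r"(\d+)_(\d+)_(\d+)-orders\.json"
SORT_REWRITE = r"\3-\1-\2.json"


def sort_key(filename: str) -> str:
    """Return the rewritten name that is used only to compare files."""
    match = re.fullmatch(FILTER_REGEX, filename)
    if match is None:
        raise ValueError(f"{filename!r} does not match the filter")
    # Substitute the capture groups into the template: month_day_year -> year-month-day.
    return match.expand(SORT_REWRITE)


# Hypothetical filenames in month_day_year form.
files = [
    "5_6_1999-orders.json",
    "12_31_1998-orders.json",
    "1_2_2001-orders.json",
]

# Order the original names by their rewritten form.
ordered = sorted(files, key=sort_key)
print(ordered)
```

In this sketch, sorting by the rewritten names orders the files by year first, whereas sorting the original names would order them by the leading month digits. Because the comparison is lexicographic, unpadded month or day values within the same year can still sort out of calendar order.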

#### S3 Source Options

You can apply these options to data sources of the `SOURCE S3` type, which include S3 and S3-compatible services.

| **Option Key**              | **Default**           | **Data Type**                            | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| --------------------------- | --------------------- | ---------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| BUCKET                      | None                  | STRING                                   | The name of the bucket in AWS S3.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| OBJECT\_KEY                 | `NULL`                | STRING  <br />or  <br />ARRAY OF STRINGS | Optional.  <br />  <br />Specify one or more S3 object keys to load. You cannot use this option if you specify the `PREFIX`, `FILTER`, `FILTER_GLOB`, or `FILTER_REGEX` keys. If you do not specify one of the `FILTER`, `FILTER_GLOB`, or `FILTER_REGEX` keys, this option is required.  <br />  <br />Object keys do not include a leading forward slash. Specifying individual objects can be faster than filtering, since the Ocient System avoids listing all the files in a large directory.  <br />  <br />Object keys must not include any of these special characters: `*?{}[]`  <br />  <br />ℹ️ This option is not available for continuous data pipelines.  <br />  <br />**Examples:**  <br />Load a single object from the designated bucket at the specified object key.  <br />`OBJECT_KEY 'order_data/jsonl/orders_20251101.jsonl'`  <br />Load a list of objects from the designated bucket at the specified object keys.  <br />`OBJECT_KEY ['order_data/jsonl/orders_20251101.jsonl', 'order_data/jsonl/orders_20251201.jsonl']` |
| ACCESS\_KEY\_ID             | `''`                  | STRING                                   | Optional.  <br />  <br />The access key identifier for AWS credentials. If you specify this option, you must also specify the `SECRET_ACCESS_KEY` option. The Ocient System uses anonymous credentials when you specify an empty value for this option. |
| SECRET\_ACCESS\_KEY         | `''`                  | STRING                                   | Optional.  <br />  <br />The secret key for AWS credentials. If you specify this option, you must also specify the ACCESS\_KEY\_ID option. The Ocient System uses anonymous credentials when you specify an empty value for this option.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| SESSION\_TOKEN              | `NULL`                | STRING                                   | Optional.  <br />  <br />Temporary credentials can be made with a combination with the existing access key identifier and secret key, and an additional session token. You must specify the ACCESS\_KEY\_ID and SECRET\_ACCESS\_KEY options when you use this option.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| ROLE\_ARN                   | `NULL`                | STRING                                   | Optional.  <br />  <br />The {Amazon} Resource Name (ARN) of the role to retrieve temporary credentials. The STS client used to retrieve the temporary credentials specifies the [region](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-region-selection.html#automatically-determine-the-aws-region-from-the-environment) loaded from `DefaultAwsRegionProviderChain` and [credentials](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html) loaded from `DefaultCredentialsProvider`. You cannot use this option with the ACCESS\_KEY\_ID and SECRET\_ACCESS\_KEY options.                                                                                                                                                                                                                                                                                                                                                                                                                          |
| ASSUME\_ROLE\_CONFIG        | `{}`                  | STRING                                   | Optional.  <br />  <br />Additional configuration used to configure the `StsAssumeRoleCredentialsProvider` or `AssumeRoleRequest` used to retrieve and refresh temporary credentials with the ROLE\_ARN option. This option is a JSON-formatted string that you can only specify at the same time as the ROLE\_ARN option. The supported configuration options are: `"asyncCredentialUpdateEnabled"`, `"prefetchTimeSeconds"`, `"staleTimeSeconds"` for `StsAssumeRoleCredentialsProvider` and `"durationSeconds"`, `"externalId"`, `"policy"`, `"roleSessionName"`, `"serialNumber"`, `"tokenCode"` for `AssumeRoleRequest`.                                                                                                                                                                                                                                                                                                                                                                                                                        |
| REGION                      | `'us-east-1'`         | STRING                                   | Optional.  <br />  <br />The region that the Ocient System uses for AWS access.  <br />If you specify the `ENDPOINT` option, the system ignores this option.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| ENDPOINT                    | `NULL`                | STRING                                   | Optional.  <br />  <br />The endpoint URI for the S3-compatible service API. For example, `https://s3.us-east-2.amazonaws.com`.  <br />  <br />When unspecified, this option defaults to `https://s3.REGION.amazonaws.com`.  <br />  <br />If you provide this option, the Ocient System ignores the `REGION` option. |
| ENABLE\_PATH\_STYLE\_ACCESS | `NULL`                | BOOLEAN                                  | Optional.  <br />  <br />Whether to use path-style access, where the path includes the bucket name in the URL. For example, `https://s3.us-east-1.amazonaws.com/bucket-name/key-name`. The `false` value denotes virtual-hosted style access, where the URL contains the bucket name as part of the domain name. For example, `https://bucket-name.s3.us-east-1.amazonaws.com/key-name`.  <br />  <br />When unspecified, this option defaults to `true` if the `ENDPOINT` option is specified and `false` otherwise.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| HEADERS                     | `NULL`                | STRING                                   | Optional.  <br />  <br />The headers to send with every request. This option is a JSON-formatted string. Represent the chosen header names as keys with corresponding values as scalars or lists of scalars. The system converts scalars that are not strings to strings. During the load, the system maps each element in a list to the header name represented by the corresponding key.  <br />  <br />Examples:  <br />  <br />`HEADERS '{"x-amz-request-payer": "requester"}'` sends this header with each request: `x-amz-request-payer: requester`.  <br />  <br />`HEADERS '{"header-name": ["list", "of", "values"]}'` sends this header with each request: `header-name: list, of, values`. |
| MAX\_CONCURRENCY            | 50                    | INTEGER                                  | Optional.  <br />  <br />The number of parallel connections that the Ocient System uses to communicate with the AWS S3 service.  <br />⚠️ This option does not require modification in most cases. Contact Ocient Support to modify this value. |
| READ\_TIMEOUT               | 0 (unlimited timeout) | TIME INTERVAL                            | Optional.  <br />  <br />The time until a read operation times out.  <br />⚠️ This option does not require modification in most cases. Contact Ocient Support to modify this value. |
| REQUEST\_DEPTH              | 500                   | INTEGER                                  | Optional.  <br />  <br />The upper boundary of requests that the Ocient System handles concurrently.  <br />⚠️ This option does not require modification in most cases. Contact Ocient Support to modify this value. |
| REQUEST\_RETRIES            | 10                    | INTEGER                                  | Optional.  <br />  <br />The number of times the AWS SDK retries failing requests before the Ocient System throws an error.  <br />⚠️ This option does not require modification in most cases. Contact Ocient Support to modify this value. |
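
As an illustration of how several of these options combine, this sketch configures an S3 source that reads from an S3-compatible service. The bucket name, endpoint, prefix, and target table are placeholders. The snippet assumes the `BUCKET` option and the file-based `PREFIX` and `FILTER` options described elsewhere in this reference, and it elides the rest of the statement.

```sql SQL theme={null}
CREATE PIPELINE ...
    SOURCE S3
        BUCKET 'example-bucket'                      -- placeholder bucket name (assumed option)
        ENDPOINT 'https://s3.example.internal:9000'  -- placeholder endpoint; REGION is ignored when ENDPOINT is set
        ENABLE_PATH_STYLE_ACCESS true                -- bucket name appears in the URL path
        HEADERS '{"x-amz-request-payer": "requester"}'
        PREFIX 'orders/'
        FILTER '**/*.csv'
    EXTRACT
        FORMAT delimited
INTO public.orders
SELECT
    $1 as username,
    $2 as subtotal,
    ...
```

Because `ENDPOINT` is specified, `ENABLE_PATH_STYLE_ACCESS` would default to `true` even if you omit it. The snippet sets the option explicitly only to make the behavior visible.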

**S3 Credentials Hierarchy**

The S3 source configuration supports this hierarchy to obtain S3 credentials:

* Level 1: Data pipeline configuration (set by using the `ACCESS_KEY_ID` and `SECRET_ACCESS_KEY` options)
* Level 2: [AWS SDK Default Credential Provider Chain](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html) (set by using the instructions provided on the web page)
* Level 3: Anonymous access (set by default)

If the Ocient System cannot obtain credentials at one level, the system tries the next lower level.

<Info>
  You can choose the level at which to store the credentials. Level 1 is the highest level, and credentials stored at a higher level take precedence.
</Info>
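
The two sketches below contrast Level 1 credentials with role-based temporary credentials. All key values, the role ARN, the session name, and the bucket name are placeholders. The `BUCKET` option is assumed from the S3 options described elsewhere in this reference, and the remainder of each statement is elided.

```sql SQL theme={null}
-- Level 1: credentials set in the data pipeline configuration.
CREATE PIPELINE ...
    SOURCE S3
        BUCKET 'example-bucket'
        REGION 'us-east-2'
        ACCESS_KEY_ID '<access-key-id>'
        SECRET_ACCESS_KEY '<secret-access-key>'
        SESSION_TOKEN '<session-token>'   -- include only for temporary credentials
        FILTER '**/*.csv'
    ...

-- Role-based temporary credentials. Do not combine ROLE_ARN with
-- the ACCESS_KEY_ID and SECRET_ACCESS_KEY options.
CREATE PIPELINE ...
    SOURCE S3
        BUCKET 'example-bucket'
        ROLE_ARN 'arn:aws:iam::123456789012:role/example-loader-role'
        ASSUME_ROLE_CONFIG '{"roleSessionName": "example-session", "durationSeconds": 3600}'
        FILTER '**/*.csv'
    ...
```

If you omit all credential options, the system falls through to the AWS SDK default credential provider chain and then to anonymous access.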

#### FILESYSTEM Source Options

No options exist specific to the file system source (`SOURCE FILESYSTEM`) except for general [file-based source options](#file-based-source-options).

<Info>
  When you load data using `SOURCE FILESYSTEM`, the files must be addressable from all of your Loader Nodes. The Ocient System uses the path specified in the pipeline `PREFIX` and `FILTER` options to select the files to load.

  A shared view of the files you want to load must be available to all Loader Nodes involved in a pipeline. For example, you can use a Network File System (NFS) mount available to all Loader Nodes at a common path on each node.
</Info>

**Example**

This example `CREATE PIPELINE` SQL statement snippet uses a `FILESYSTEM` source and selects all CSV files in the `/tmp/sample-data/` directory on each of the Loader Nodes.

```sql SQL theme={null}
CREATE PIPELINE ...
    SOURCE FILESYSTEM
        PREFIX '/tmp/sample-data/'
        FILTER '**/*.csv'
    EXTRACT
        FORMAT delimited
INTO public.orders
SELECT
    $1 as username,
    $2 as subtotal,
    ...
```

#### HDFS Source Options

You can specify these options for {HDFS} data sources (`SOURCE HDFS`).

For advanced options, see [Data Pipeline Load of JSON Data from HDFS](/data-pipeline-load-of-json-data-from-hdfs).

| **Option Key** | **Default** | **Data Type**         | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| -------------- | ----------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| ENDPOINT       | None        | STRING                | Required.  <br />  <br />The hostname and port number of the namenode server. For example, `'my-hdfs-server:1234'`.  <br />  <br />The namenode server manages the file system and regulates access to data.                                                                                                                                                                                                                                                                                                                               |
| CONFIG         | None        | JSON-formatted STRING | Optional.  <br />  <br />The JSON string that contains HDFS [configuration properties](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml) to use during the load. The properties are primarily for authentication. You can also use these properties to configure advanced properties such as connection timeouts, parallelism, and network chunk sizes.  <br />  <br />**Example:**  <br />`CONFIG '{"dfs.client.use.datanode.hostname": true, "dfs.client.socket-timeout": 300000}'` |
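
For orientation, this sketch places both HDFS options in a `CREATE PIPELINE` snippet, reusing the example values from the table. The directory path and target table are placeholders, and the snippet assumes that the file-based `PREFIX` and `FILTER` options apply to the HDFS source in the same way as in the `FILESYSTEM` example.

```sql SQL theme={null}
CREATE PIPELINE ...
    SOURCE HDFS
        ENDPOINT 'my-hdfs-server:1234'   -- hostname and port of the namenode server
        CONFIG '{"dfs.client.use.datanode.hostname": true, "dfs.client.socket-timeout": 300000}'
        PREFIX '/data/orders/'           -- placeholder directory
        FILTER '**/*.csv'
    EXTRACT
        FORMAT delimited
INTO public.orders
SELECT
    $1 as username,
    $2 as subtotal,
    ...
```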

#### KAFKA Source Options

You can apply these options to Kafka data sources (`SOURCE KAFKA`).

<Info>
  For compression, you do not need to specify a compression option in Kafka-based pipelines, because the Ocient System handles the compression type automatically. Records produced to the Kafka broker with a `compression.type` setting or with the `compression.type` set on the topic automatically decompress when the loading process consumes the records. The loading process uses built-in headers in Kafka to determine the required decompression during extraction.
</Info>

| **Option Key**      | **Default**                                                                        | **Data Type**         | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| ------------------- | ---------------------------------------------------------------------------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| BOOTSTRAP\_SERVERS  | None                                                                               | STRING                | A comma-delimited list of `IP:port` pairs that contain the IP addresses and the associated port numbers of the Kafka Brokers. You can also use a hostname instead of the IP address.  <br />**Example:**  <br />`BOOTSTRAP_SERVERS = '198.51.100.1:9092,198.51.100.2:9092'`                                                                                                                                                                                                                                                                                                                                         |
| TOPIC               | None                                                                               | STRING                | The name of the Kafka topic from which to consume records. |
| WRITE\_OFFSETS      | `true`                                                                             | BOOLEAN               | Optional.  <br />  <br />Indicates whether the Kafka consumer writes the offsets of records that the system has made durable back to the Kafka Broker. |
| CONFIG              | `'{`  <br />`"group.id": "<systemId>__<databaseName>__<pipelineName>"`  <br />`}'` | JSON-formatted STRING | Optional.  <br />  <br />The [consumer configuration](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) that the Kafka consumers should use.  <br />  <br />Frequently specified configurations include `group.id` and `security.protocol`.  <br />  <br />These configurations are fixed and cannot be overridden:  <br />`{`  <br />`"enable.auto.commit": false,`  <br />`"key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",`  <br />`"value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"`  <br />`}` |
| AUTO\_OFFSET\_RESET | `'latest'`                                                                         | STRING                | Optional.  <br />  <br />The action that the Kafka consumer takes when there is no initial offset in the offset store or the specified offset is out of range.  <br />  <br />Supported values are:  <br />  <br />`'earliest'`: Automatically reset the offset value to the smallest value.  <br />  <br />`'latest'`: Automatically reset the offset value to the largest value. |
| GROUP\_ID           | `'<systemId>__<databaseName>__<pipelineName>'`                                     | STRING                | Optional.  <br />  <br />Client group identifier for the Kafka source configuration.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |

<Info>
  For the consumer configuration, to create a secure connection from the Kafka consumer to a Kafka Broker, set the `"security.protocol"` key along with any SSL or SASL keys.

  If a certificate file is required, you must add it to the truststore used by the {JVM} on all Loader Nodes. The truststore path must be identical on all Loader Nodes. The Kafka configuration can reference this truststore path.
</Info>

<Info>
  If you specify the `ssl.certificate.location` or `ssl.ca.location` consumer configuration, you must specify both of these configurations. Otherwise, the system throws an error. For example: `CONFIG '{"auto.offset.reset": "earliest","ssl.certificate.location":"/etc/blab/file1.txt","ssl.ca.location":"/etc/blab/file2.txt"}'`
</Info>
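
As a sketch of how these options fit together, this snippet configures a Kafka source that connects over SSL and starts from the earliest available offset. The topic name and truststore path are placeholders. `ssl.truststore.location` is a standard Kafka consumer property used here only for illustration, and the extract and transformation portions of the statement are elided.

```sql SQL theme={null}
CREATE PIPELINE ...
    SOURCE KAFKA
        BOOTSTRAP_SERVERS '198.51.100.1:9092,198.51.100.2:9092'
        TOPIC 'orders'                 -- placeholder topic name
        WRITE_OFFSETS true
        AUTO_OFFSET_RESET 'earliest'
        -- The truststore path is a placeholder and must be identical on all Loader Nodes.
        CONFIG '{"security.protocol": "SSL", "ssl.truststore.location": "/etc/example/truststore.jks"}'
    EXTRACT
        ...
```

The snippet leaves `GROUP_ID` unset, so the consumer group defaults to `<systemId>__<databaseName>__<pipelineName>`.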

#### Continuous File Loading Source Options

Specify these options for data pipelines that use `FILESYSTEM` or `S3` sources with the `CONTINUOUS` mode. For a Kafka source, do not use these options.

**General File Monitor Options**

| **Option Key**    | **Default** | **Data Type** | **Description**                                                                                                                                        |
| ----------------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| MONITOR           | None        | STRING        | The type of monitor. Valid values are: `sqs` or `kafka`.                                                                                               |
| POLLING\_INTERVAL | 10 SECONDS  | TIME INTERVAL | Optional.  <br />  <br />The time interval between polls of the event topic. Valid value range: 10 to 120 seconds.                                     |
| MAX\_FILES        | 100         | INTEGER       | Optional.  <br />  <br />In the `BATCH` syntax, the maximum number of pending files to collect before starting a batch. Valid value range: 10 to 2000. |
| TIMEOUT           | 1 MINUTE    | TIME INTERVAL | Optional.  <br />  <br />In the `BATCH` syntax, the time to wait before starting a batch. Valid value range: 10 seconds to 30 minutes.                 |
| LOOKBACK          | 1 DAY       | TIME INTERVAL | Optional.  <br />  <br />In the `BATCH` syntax, the length of time to look back for file deduplication. Valid value range: 0 to 48 hours.              |

**SQS Monitor Options**

Use these options when you use `MONITOR sqs` for {SQS}.

| **Option Key**      | **Default** | **Data Type** | **Description**                                                                                                                                                                                                                                                          |
| ------------------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| QUEUE\_URL          | None        | STRING        | The URL of the target queue. For example: `http://localhost:32769/000000000000/queue1`                                                                                                                                                                                   |
| REGION              | `NULL`      | STRING        | Optional.  <br />  <br />The region for the SQS operation for signing requests.  <br />  <br />When unspecified, the region is inferred from `QUEUE_URL` by looking for `sqs.REGION` or `sqs-fips.REGION`. If the region cannot be inferred, it defaults to `us-east-1`. |
| ENDPOINT            | `NULL`      | STRING        | Optional.  <br />  <br />The endpoint URL for the client. For example: `http://localhost:32769`  <br />  <br />When unspecified, defaults to `https://sqs[-fips].REGION.amazonaws.com` if `REGION` was inferred or specified. Otherwise, this option is required.        |
| ACCESS\_KEY\_ID     | `''`        | STRING        | Optional.  <br />  <br />Access key identifier for SQS authentication.  <br />  <br />If you specify this option, you must also specify the `SECRET_ACCESS_KEY` option. The Ocient System uses anonymous credentials when you specify an empty value for this option.    |
| SECRET\_ACCESS\_KEY | `''`        | STRING        | Optional.  <br />  <br />The secret access key for SQS authentication.                                                                                                                                                                                                   |

**Kafka Monitor Options**

Use these options when you use `MONITOR kafka` for Kafka.

| **Option Key**      | **Default**                                                                                                                                                                                                                                  | **Data Type**         | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| ------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| BOOTSTRAP\_SERVERS  | None | STRING                | The bootstrap servers for the Kafka consumer configuration. The string is a comma-separated list of brokers, where each entry is either a broker hostname or a hostname and port number combination in the format `hostname:port`. |
| TOPIC               | None | STRING                | The name of the Kafka topic from which to consume records. |
| AUTO\_OFFSET\_RESET | `'latest'` | STRING                | Optional.  <br />  <br />The action that the Kafka consumer takes when there is no initial offset in the offset store or the chosen offset is out of range:  <br />`'earliest'`: Automatically reset the offset to the smallest offset.  <br />`'latest'`: Automatically reset the offset to the largest offset. |
| GROUP\_ID           | `'<systemId>__<databaseName>__<pipelineName>_monitor'`                                                                                                                                                                                       | STRING                | Optional.  <br />  <br />Client group identifier for the Kafka configuration.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| CONFIG              | `'{ "enable.auto.commit": false,`  <br />`"group.id": "<systemId>__<databaseName>__<pipelineName>_monitor",`  <br />`"max.poll.interval.ms": "600000",`  <br />`"heartbeat.interval.ms": "6000",`  <br />`"session.timeout.ms": "600000" }'` | JSON-formatted STRING | Optional.  <br />  <br />The [consumer configuration](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) that the Kafka consumers should use. This option is a JSON-formatted string.  <br />Certain values within this configuration are fixed, whereas the Ocient System provides other values with a default value that you can modify.                                                                                                                                                                                                                                                                                  |
| MAX\_MESSAGES       | 100                                                                                                                                                                                                                                          | INTEGER               | Optional.  <br />  <br />In the `CONSUMER` syntax, the maximum number of messages to poll each time.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| TIMEOUT             | 1000 MILLISECONDS                                                                                                                                                                                                                            | TIME INTERVAL         | Optional.  <br />  <br />In the `CONSUMER` syntax, the operation timeout that controls how long the consume request waits for the response. The valid range is: 0 milliseconds to 1 hour.                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| FILENAME            | `$"Records"[1].s3.object.key`  <br />corresponds to a `s3:ObjectCreated:Put` event.                                                                                                                                                          | SELECTOR              | Optional.  <br />  <br />In the `MESSAGE` syntax, defines the JSON selector for the filename of an incoming message in a monitor. The default value works for `s3:ObjectCreated:*` events. If you override this default configuration, you can parse custom JSON messages. The format must still adhere to S3 standards. For details, see [Amazon Simple Storage Service Event message structure](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html). The message must be in the JSON format. If you specify this option, then you must specify the other two options in the `MESSAGE` syntax.                                |
| TIMESTAMP           | `$"Records"[1]."eventTime"`  <br />corresponds to a `s3:ObjectCreated:Put` event.                                                                                                                                                            | SELECTOR              | Optional.  <br />  <br />In the `MESSAGE` syntax, defines the JSON selector for the last modification timestamp of the file of an incoming message in a monitor. The default value works for `s3:ObjectCreated:*` events. If you override this default configuration, you can parse custom JSON messages. The format must still adhere to S3 standards. For details, see [Amazon Simple Storage Service Event message structure](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html). The message must be in the JSON format. If you specify this option, then you must specify the other two options in the `MESSAGE` syntax. |
| SIZE                | `$"Records"[1].s3.object.size`  <br />corresponds to a `s3:ObjectCreated:Put` event.                                                                                                                                                         | SELECTOR              | Optional.  <br />  <br />In the `MESSAGE` syntax, defines the JSON selector for the file size of an incoming message in a monitor. The default value works for `s3:ObjectCreated:*` events. If you override this default configuration, you can parse custom JSON messages. The format must still adhere to S3 standards. For details, see [Amazon Simple Storage Service Event message structure](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html). The message must be in the JSON format. If you specify this option, then you must specify the other two options in the `MESSAGE` syntax.                               |

<Info>
  The same Kafka `CONFIG` option override considerations apply. For details, see [KAFKA Source Options](#kafka-source-options).
</Info>

### LOOKUP Options

You can optionally look up data in a table from an external database. You can include this table in the `SELECT` statement of the `CREATE PIPELINE` SQL statement and perform join operations on its columns. To use the `LOOKUP` keyword, specify the external source name `lookup_source`.

You can look up data in multiple external databases. In this case, use the `LOOKUP` keyword with the source name and corresponding options for each database.

You must provide the appropriate JDBC JAR file for the JDBC connection to external databases. For tables in the Ocient System, this JAR file is not needed.

<Warning>
  The Ocient System executes the JDBC code as trusted code. Only provide trusted JAR files to the loading process. The `streamloader.extractorEngineParameters.configurationOption.engine.external.jdbc.jarRootDirectory` configuration parameter specifies the location of the JAR file. For details, see [Configuration Settings for Data Pipelines](/configuration-settings-for-data-pipelines).
</Warning>

| **Option Key**     | **Default** | **Data Type**         | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| ------------------ | ----------- | --------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| CONNECTION\_TYPE   | None        | STRING                | The type of connection. Only `'jdbc'` is supported.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
| CONNECTION\_STRING | None        | STRING                | The string for connection to the external database. For example: `'jdbc:sqlite:/path/to/file'`                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| LOOKUP\_SCHEMA     | None        | STRING                | The schema of the table in the external database.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| LOOKUP\_TABLE      | None        | STRING                | The name of the table in the external database.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| REFRESH\_PERIOD    | 30          | TIME INTERVAL         | Optional.  <br />  <br />The time between lookup cache refreshes.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| CONFIG             | `NULL`      | JSON-formatted STRING | Optional.  <br />  <br />The JSON string that contains additional configuration options. For example: `'{"force_refresh_if_null": false}'`  <br />  <br />Supported JSON keys are:  <br />  <br />`force_refresh_if_null`, specified as a Boolean that defaults to `true`. When you set this value to `true`, the system triggers a lookup cache refresh for each value missing from the lookup table (to account for a concurrent load). This cache refresh can have a noticeable performance impact. As such, if you expect that some values cannot result in successful lookups, then set this key to the `false` value.  <br />  <br />`load_timeout_seconds`, specified as a long integer with no default value. This option is the timeout, in seconds, for a lookup cache refresh.  <br />  <br />`driver_class`, specified as a string with no default value. This option is the fully qualified JDBC driver class name. If you specify this option, the system throws an error when the system cannot find this driver or the driver cannot load into the process. If the JAR file for the connection is a JDBC 4 file, the system automatically loads the driver class. Otherwise, you must specify the fully qualified class name (from `Class::getName` in {Java}) using this option, such as `'{"driver_class": "org.test.example.Driver"}'`. |
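
As a rough illustration of how the options in this table combine, the following fragment sketches a single `LOOKUP` clause. It is only part of a `CREATE PIPELINE` statement. The lookup source name `customer_lookup`, the schema name, and the table name are hypothetical, and the placement of the option keys directly after the source name is an assumption based on the description above, so verify it against the `CREATE PIPELINE` syntax before use.

```sql
-- Fragment only: the SOURCE, EXTRACT, INTO, and SELECT clauses are omitted.
-- customer_lookup, 'main', and 'customers' are hypothetical names.
LOOKUP customer_lookup
    CONNECTION_TYPE 'jdbc'
    CONNECTION_STRING 'jdbc:sqlite:/path/to/file'
    LOOKUP_SCHEMA 'main'
    LOOKUP_TABLE 'customers'
    -- Skip the cache refresh for values missing from the lookup table and
    -- name the driver class explicitly for a JAR file that is not JDBC 4.
    CONFIG '{"force_refresh_if_null": false, "driver_class": "org.test.example.Driver"}'
```

Setting `force_refresh_if_null` to `false` in this sketch reflects the case where some values are expected to have no match in the lookup table, which avoids the cost of repeated cache refreshes.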

### EXTRACT Options

#### General Extract Options

You can specify these options on any of the allowed `FORMAT` types.

| **Option Key**               | **Default**                                                                        | **Data Type**    | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| ---------------------------- | ---------------------------------------------------------------------------------- | ---------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| FORMAT                       | None                                                                               | STRING           | Specifies the format of the files to load. Supported values are:  <br />`delimited`  <br />`csv`  <br />`json`  <br />`binary`  <br />`asn.1`  <br />`parquet`  <br />`xml`                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| CHARSET\_NAME                | Binary format: `ibm1047`  <br />All other formats: `utf-8`                         | STRING           | Optional.  <br />  <br />Specifies the character set for decoding data from the source records into character data. Use this character set when you load data into VARCHAR columns or when you apply `CHAR` transformation functions.  <br />Defaults to `utf-8` for all formats except BINARY, which defaults to `ibm1047`.  <br />You can configure the default value for `BINARY` formatted data using a SQL statement such as:  <br />`ALTER SYSTEM ALTER CONFIG SET 'sql.pipelineParameters.extract.binary.defaultCharset' = 'ibm1047'`                                                                                                                                                                                                                                     |
| COLUMN\_DEFAULT\_IF\_NULL    | `false`                                                                            | BOOLEAN          | Optional.  <br />  <br />Specifies whether pipelines should load the column default value when the result of a series of transforms is NULL.  <br />  <br />If you set this option to `false` (the default), the pipeline loads NULL values into columns.  <br />  <br />If you set this option to `true`, the pipeline loads the defined default value of the column when the result of the execution of the transformation on the column is NULL.                                                                                                                                                                                                                                                                                                                              |
| NULL\_STRINGS                | Delimited or CSV format:  <br />`['null', 'NULL']`  <br />JSON format:  <br />`[]` | ARRAY OF STRINGS | Optional.  <br />  <br />Specifies string values that should represent a NULL value when extracted from the source records.  <br />Use this in `csv`, `delimited`, and `json` formats to convert specific values to NULL instead of requiring individual transform function calls to `NULL_IF` with those values.  <br />This option applies to source data immediately after extraction. If the result of transformations is one of these strings, you must use `NULL_IF` to transform to NULL with the specified string values.                                                                                                                                                                                                                                                |
| TRIM\_WHITESPACE             | `false`                                                                            | BOOLEAN          | Optional.  <br />  <br />Specifies whether to trim whitespace from the beginning and end of each string field during extraction. In this case, whitespace is a space, tab, carriage return, or linefeed character.  <br />  <br />If the trimmed field value matches one of the values specified by the `NULL_STRINGS` option, the system converts the value in the field to `NULL`. If you set this value to `true` and the `EMPTY_FIELD_AS_NULL` option to `true`, the system converts any field that contains only whitespace to `NULL`.  <br />  <br />This option does not trim arrays of strings.  <br />  <br />To trim elements in arrays, you can use the `TRIM` transform function or the `TRANSFORM` function with a Lambda expression that uses the `TRIM` function. |
| VALIDATE\_CHARACTERS         | `false`                                                                            | BOOLEAN          | Optional.  <br />  <br />Specifies whether to detect invalid characters based on the encoding type during extraction. If the system detects invalid characters, the system throws a file-level error for file loads or a record-level error for Kafka loads.  <br />  <br />This option is not supported for ASN.1, {Parquet}, and binary formats.                                                                                                                                                                                                                                                                                                                                                                                                                               |
| REPLACE\_INVALID\_CHARACTERS | `false`                                                                            | BOOLEAN          | Optional.  <br />  <br />Specifies whether to replace invalid characters based on the encoding type with the replacement character `U+FFFD`.  <br />  <br />This option is not supported for ASN.1, Parquet, and binary formats.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| REPLACEMENT\_CHARACTER       | `�`                                                                                | STRING           | Optional.  <br />  <br />Specifies the character that indicates invalid data when you set the REPLACE\_INVALID\_CHARACTERS option to `true`. The default value is the replacement character `U+FFFD`.  <br />  <br />This option is not supported for ASN.1, Parquet, and binary formats.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |

<Info>
  For ASN.1, {Avro}, and Parquet extract options, the `SCHEMA` option is a `MAP` type with a `NULL` default value. Whether you must specify this option depends on the format. The `SCHEMA` option introduces the options for schema retrieval.
</Info>
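
As a rough illustration of where the general extract options sit in a pipeline definition, the following sketch loads JSON records from a Kafka topic. The pipeline name, broker addresses, topic, target table, and JSON field names are hypothetical. The literal forms used for the Boolean and array option values are assumptions based on the defaults shown in the table, so check them against the `CREATE PIPELINE` syntax before use.

```sql
-- Sketch only: all object names, hosts, and field selectors are hypothetical.
CREATE PIPELINE orders_pipeline
SOURCE KAFKA
    TOPIC 'orders'
    BOOTSTRAP_SERVERS 'broker1.example.com:9092,broker2.example.com:9092'
EXTRACT
    FORMAT JSON
    -- Treat these source strings as NULL immediately after extraction.
    NULL_STRINGS ['N/A', 'none']
    -- Trim leading and trailing whitespace from each string field.
    TRIM_WHITESPACE true
    -- Load the column default when a transformation result is NULL.
    COLUMN_DEFAULT_IF_NULL true
INTO public.orders
SELECT
    $id AS id,
    $status AS status;
```

Because `NULL_STRINGS` applies only to source data immediately after extraction, a transformation in the `SELECT` list that produces one of these strings would still need an explicit `NULL_IF` call.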

#### ASN.1 Extract Options

You can specify these options for ASN.1 data record extraction (`FORMAT ASN.1`).

For details about ASN.1-formatted data, see [Load ASN.1 Data](/data-formats-for-data-pipelines#load-asn-1-data).

| **Option Key** | **Default** | **Data Type** | **Description**                                                                                                                                                                                        |
| -------------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| URL            | None        | STRING        | In the `SCHEMA` syntax, specify the URL to the ASN.1 schema file in the `.asn` format.                                                                                                                 |
| RECORD\_TYPE   | None        | STRING        | In the `SCHEMA` syntax, specify the fully qualified name of the record type to parse. The name is the root-level ASN.1 type to extract from each DER-encoded record, for example, `Example.PersonnelRecord`. |

#### Avro Extract Options

You can specify these options for Avro data record extraction. The `SCHEMA` options are all optional for object container files (OCF).

For Kafka loads, the schema configuration must include either the `URL` option or the `INLINE` option, but not both.

For file-based loads, the schema configuration can include:

* Either the `INLINE` option or the `INFER_FROM` option, but not both of these options
* Neither the `INLINE` nor the `INFER_FROM` options

For details about loading Avro-formatted data, see [Load Avro Data](/data-formats-for-data-pipelines#load-avro-data).

| **Option Key** | **Default** | **Data Type** | **Description**                                                                                                                        |
| -------------- | ----------- | ------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| INLINE         | None        | STRING        | The JSON specification of the schema declaration as a string. Specify either a JSON string or object.                                  |
| INFER\_FROM    | None        | STRING        | For file-based loads, specifies that the system infers the schema from files. Supported value is: `'sample_file'` — Use a sample file. |

#### BINARY Extract Options

You can apply these options to binary data record extraction (`FORMAT BINARY`).

For details about BINARY-formatted data, see [Load Binary Data](/data-formats-for-data-pipelines#load-binary-data).

<Info>
  The general option `CHARSET_NAME` has a different default value for `FORMAT BINARY`.
</Info>

| **Option Key**      | **Default** | **Data Type** | **Description**                                                                                                                                                                                                                                                                                                                                   |
| ------------------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| RECORD\_LENGTH      | None        | INTEGER       | Specifies the fixed size in bytes of each record in the source data. The Ocient System splits the binary data into binary chunks according to this length value and processes them individually.                                                                                                                                                  |
| ENDIANNESS          | `big`       | STRING        | Optional.  <br />  <br />Specifies the endianness used to interpret multi-byte sequences in various transforms of binary data. Accepted values are `'big'` and `'little'`. |
| AUTO\_TRIM\_PADDING | `true`      | BOOL          | Optional.  <br />  <br />Determines if padding characters should be trimmed after decoding the binary data into string data. If you set this option to `TRUE`, the Ocient System trims all instances of the `PADDING_CHARACTER` value from the end of a string after the system decodes the string from `BINARY` type.                            |
| PADDING\_CHARACTER  | (space)     | STRING        | Optional.  <br />  <br />The padding character that the system trims from the end of a string after the system decodes the string from `BINARY` type.  <br />You can change the default padding character for `BINARY` formatted data using a SQL statement such as: `ALTER SYSTEM ALTER CONFIG SET 'sql.pipelineParameters.extract.binaryFormat.defaultPaddingCharacter' = '*'` |

<Info>
  The Ocient System trims the default padding character of a space from the end of the text data in binary data.
</Info>
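
**Example**

As a minimal sketch of how these options might appear together, this `CREATE PIPELINE` snippet sets all four BINARY extract options. The record length of `64` bytes and the `*` padding character are illustrative assumptions, and the `option = value` form follows the option examples shown elsewhere on this page. The source, target table, and transformations are elided.

```sql SQL theme={null}
CREATE PIPELINE ...
    SOURCE
        ...
    EXTRACT
        FORMAT BINARY
        -- Assumed layout: every source record is exactly 64 bytes long.
        RECORD_LENGTH = 64
        -- Interpret multi-byte sequences as little-endian (the default is 'big').
        ENDIANNESS = 'little'
        -- Trim trailing padding after decoding binary data into strings.
        AUTO_TRIM_PADDING = TRUE
        -- Assumed padding character; the default is a space.
        PADDING_CHARACTER = '*'
INTO ...
SELECT
    ...
```

With these settings, a decoded string such as `ABC***` would load as `ABC`, because the system trims all trailing instances of the padding character.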

#### **Delimited and CSV** Extract Options

You can specify these options for delimited and CSV format data record extraction (`FORMAT DELIMITED` or `FORMAT CSV` data formats, which are aliases).

For details about working with delimited and CSV data, see [Load Delimited and CSV Data](/data-formats-for-data-pipelines#load-delimited-and-csv-data).

| **Option Key**                  | **Default**      | **Data Type**                            | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
| ------------------------------- | ---------------- | ---------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| FIELD\_DELIMITER                | `,`              | STRING  <br />or  <br />ARRAY OF STRINGS | Optional.  <br />  <br />Specifies a character or list of possible characters for delimiting fields. The default value sets the field delimiter to only a comma. The value must be one byte.  <br />The Ocient System automatically interprets the values you specify as C-style escaped strings. This means you do not need an escape string (`e'some value'`) to specify control characters. This differs from the default string behavior in Ocient. For details, see [String Literals and Escape Sequences](/general-sql-syntax#string-literals-and-escape-sequences).  <br />`FIELD_DELIMITER` and `FIELD_DELIMITERS` are aliases.  <br />**Examples:**  <br />Use a tab character:  <br />`FIELD_DELIMITER = '\t'`  <br />Use a pipe character:  <br />`FIELD_DELIMITER = '\|'`  <br />Use either a pipe or a comma character:  <br />`FIELD_DELIMITER = [',', '\|']` |
| RECORD\_DELIMITER               | `['\r\n', '\n']` | STRING  <br />or  <br />ARRAY OF STRINGS | Optional.  <br />  <br />Specifies the string or an array of strings for delimiting records. The file is split into individual records using this delimiter during processing. Common values include `'\r\n'` and `'\n'`. The value must be one or two bytes.  <br />The Ocient System automatically interprets the values you specify as C-style escaped strings. An escape string (`e'some value'`) is not required to specify control characters. This differs from the default string behavior in Ocient.  <br />The system chooses the first specified delimiter and uses that delimiter for the rest of the data. Data with mixed delimiters is not supported.  <br />For details, see [String Literals and Escape Sequences](/general-sql-syntax).  <br />`RECORD_DELIMITER` and `RECORD_DELIMITERS` are aliases.  <br />**Examples:**  <br />Use a linefeed character:  <br />`RECORD_DELIMITER = '\n'`  <br />Use a carriage return and linefeed character sequence:  <br />`RECORD_DELIMITER = '\r\n'` |
| NUM\_HEADER\_LINES              | `0`              | INTEGER                                  | Optional.  <br />  <br />Specifies the number of header lines, typically 0 or 1. The Ocient System skips this number of lines and does not load them as data during file processing. Use this option when your data includes a row of header values.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| NUM\_FOOTER\_LINES              | `0`              | INTEGER                                  | Optional.  <br />  <br />Specifies the number of footer lines, typically 0 or 1. The Ocient System skips this number of lines starting from the end of the file and does not load them as data during file processing. Use this option when your data includes a row of footer values.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| OPEN\_ARRAY                     | `[`              | STRING                                   | Optional.  <br />  <br />Specifies the character that indicates the start of an array in a `csv` or `delimited` field. Use this option to parse array data types.  <br />Specify the `CLOSE_ARRAY` option also when using this option.  <br />Set this option to NULL or `''` to turn off the detection of these control characters. If you set this option to either of these characters, the system also turns off the detection of these characters for the `CLOSE_ARRAY` option.  <br />**Example:**  <br />Convert source data such as `val1,"[1,2,3]",val2` to an array when referenced as `$2[]`.  <br />`OPEN_ARRAY '['`  <br />`CLOSE_ARRAY ']'`  <br />`ARRAY_ELEMENT_DELIMITER ','`                                                                                                                                                                                                                                                                                                                    |
| CLOSE\_ARRAY                    | `]`              | STRING                                   | Optional.  <br />  <br />Specifies the character that indicates the end of an array in a `csv` or `delimited` field. Use this option to parse array data types.  <br />Specify the `OPEN_ARRAY` option also when using this option.  <br />Set this option to NULL or `''` to turn off the detection of these control characters. If you set this option to either of these characters, the system also turns off the detection of these characters for the `OPEN_ARRAY` option.  <br />**Example:**  <br />Convert source data such as `val1,"{1,2,3}",val2` to an array when referenced as `$2[]`.  <br />`CLOSE_ARRAY '}'`  <br />`OPEN_ARRAY '{'`  <br />`ARRAY_ELEMENT_DELIMITER ','`                                                                                                                                                                                                                                                                                                                        |
| ARRAY\_ELEMENT\_DELIMITER       | `,`              | STRING                                   | Optional.  <br />  <br />Specifies the character that separates values in an array within a `csv` or `delimited` field. Use this option to parse array data types.  <br />Set this option to NULL or `''` to turn off the detection of these control characters.  <br />**Example:**  <br />Convert source data such as `val1,"[1;2;3]",val2` to an array when referenced as `$2[]`.  <br />`ARRAY_ELEMENT_DELIMITER = ';'`                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| OPEN\_OBJECT                    | `{`              | STRING                                   | Optional.  <br />  <br />Specifies the character that indicates the start of a tuple in a field. Use this option to parse tuple data types.  <br />Specify the `CLOSE_OBJECT` option also when using this option.  <br />Set this option to NULL or `''` to turn off the detection of these control characters. If you set this option to either of these characters, the system also turns off the detection of these characters for the `CLOSE_OBJECT` option.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| CLOSE\_OBJECT                   | `}`              | STRING                                   | Optional.  <br />  <br />Specifies the character that indicates the end of a tuple in a field. Use this option to parse tuple data types.  <br />Specify the `OPEN_OBJECT` option also when using this option.  <br />Set this option to NULL or `''` to turn off the detection of these control characters. If you set this option to either of these characters, the system also turns off the detection of these characters for the `OPEN_OBJECT` option.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| EMPTY\_FIELD\_AS\_NULL          | `true`           | BOOLEAN                                  | Optional.  <br />  <br />Specifies whether the Ocient System should extract an empty source field as NULL or a missing value. When this option is set to `true`, the Ocient System treats empty fields as NULL. Otherwise, the system treats empty fields as missing values. For string-type fields, a missing value is equivalent to an empty string.  <br />Define an empty field as two consecutive delimiters (e.g., the second field is empty in `abc,,xyz`). If a field is explicitly an empty string as indicated by quote characters, the Ocient System treats the field as an empty string, not an empty field (e.g., the second field is an empty string in `abc,"",xyz`).  <br />The `CHAR($1)` transformation function handles both NULLs and empty strings and passes them through.  <br />⚠️ Beware that the `NULL_IF($1, '')` transformation function directly loads a NULL for both NULLs *and* missing values. This transformation can override the behavior of `EMPTY_FIELD_AS_NULL`. |
| FIELD\_OPTIONALLY\_ENCLOSED\_BY | `"`              | STRING                                   | Optional.  <br />  <br />Also known as the “quote character,” this option specifies the character for optionally enclosing fields. Fields enclosed by this character can include delimiters, the enclosure character, or the escape character.  <br />Set this option to NULL or `''` to turn off the detection of these control characters.  <br />**Examples:**  <br />Use a double quote character:  <br />`FIELD_OPTIONALLY_ENCLOSED_BY = '"'`  <br />Use a single quote character:  <br />`FIELD_OPTIONALLY_ENCLOSED_BY = ''''`                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| ESCAPE\_CHAR                    | `"`              | STRING                                   | Optional.  <br />  <br />Specifies the escape character within fields enclosed by the `FIELD_OPTIONALLY_ENCLOSED_BY` option. Use this option to escape the enclosure character or escape character.  <br />Set this option to NULL or `''` to turn off the detection of these control characters.  <br />**Examples:**  <br />Use a double quote as the escape character.  <br />`ESCAPE_CHAR = '"'`  <br />Use a single quote as the escape character. When you specify the escape character, you often have to use an escape sequence. This action follows standard SQL rules.  <br />`ESCAPE_CHAR = ''''`                                                                                                                                                                                                                                                                                                                                                                                                      |
| SKIP\_EMPTY\_LINES              | `false`          | BOOLEAN                                  | Optional.  <br />  <br />Specifies whether or not to skip empty lines.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| COMMENT\_CHAR                   | `NULL`           | STRING                                   | Optional.  <br />  <br />Specifies the character used to comment out a record in the source file. The load skips records where the first character of a record is equal to this character.  <br />Set this option to NULL or `''` to turn off the detection of these control characters.  <br />Example: `COMMENT_CHAR '#'`                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| HEADERS                         | `NULL`           | ARRAY OF STRINGS                         | Optional.  <br />  <br />Specifies the header labels associated with each column in a delimited file. This array of values corresponds to the columns in order from left to right. Use these labels in the `CREATE PIPELINE SELECT` SQL statement to refer to column values. For example, if you specify `HEADERS ['col1', 'col2', 'col3']`, you can refer to the first column as `$"col1"` instead of `$1` in the `SELECT` statement. |
| STRIP\_ARRAY\_ELEMENT\_QUOTES   | `false`          | BOOLEAN                                  | Optional.  <br />  <br />When you set this option to `true`, the system removes quote characters from array elements, so that, for example, `["str"]` becomes `[str]`.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| STRIP\_FIELD\_QUOTES            | `true`           | BOOLEAN                                  | Optional.  <br />  <br />When you set this option to `true`, the system removes quote characters from the string, so that, for example, `"str"` becomes `str`.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| TRIM\_ARRAY\_ELEMENTS           | `false`          | BOOLEAN                                  | Optional.  <br />  <br />When you set this option to `true`, the system removes whitespace characters from the array elements, so that, for example, `" str"` becomes `"str"`.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
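
**Example**

As a hedged sketch of how several of these options might combine, this `CREATE PIPELINE` snippet describes a pipe-delimited file with one header row and `#` comment records. The column labels (`order_id`, `username`, `subtotal`) are hypothetical, and the source definition is elided. Each option is written in the form used by its example in the table, so some options appear with `=` and some without. The `NUM_HEADER_LINES = 1` form is an assumption because the table gives no example for that option.

```sql SQL theme={null}
CREATE PIPELINE ...
    SOURCE
        ...
    EXTRACT
        FORMAT DELIMITED
        -- Fields are separated by a pipe character instead of the default comma.
        FIELD_DELIMITER = '|'
        -- Skip the single row of header values at the top of each file.
        NUM_HEADER_LINES = 1
        -- Skip any record whose first character is '#'.
        COMMENT_CHAR '#'
        -- Label the columns from left to right (hypothetical names).
        HEADERS ['order_id', 'username', 'subtotal']
INTO public.orders
SELECT
    -- Refer to columns by label instead of by position ($1, $2, $3).
    $"order_id" AS order_id,
    $"username" AS username,
    $"subtotal" AS subtotal
```

Because `EMPTY_FIELD_AS_NULL` defaults to `true`, a record such as `1001||25.00` would load a NULL for `username` in this sketch.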

#### JSON Extract Options

No options exist for JSON data record extraction (`FORMAT JSON`).

For details about JSON-formatted data, see [Load JSON Data](/data-formats-for-data-pipelines#load-json-data).

#### PARQUET Extract Options

You can specify these options for Parquet data record extraction (`FORMAT PARQUET`).

For details about Parquet-formatted data, see [Load Parquet Data](/data-formats-for-data-pipelines#load-parquet-data).

<Warning>
  When you use the `FORMAT PARQUET` option with an AWS S3 Source, the `ENDPOINT` option is required.
</Warning>

| **Option Key**     | **Default**   | **Data Type** | **Description**                                                                                                                                                                                                         |
| ------------------ | ------------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| SCHEMA INFER\_FROM | `sample_file` | STRING        | In the `SCHEMA` syntax, specify how to infer the Parquet schema. The supported value is:  <br />`sample_file` — Infer from a random file.  <br />  <br />**Example:**  <br />`SCHEMA (INFER_FROM`  <br />`sample_file)` |

#### XML Extract Options

No options exist for the XML format extraction (`FORMAT XML`).

For details about XML-formatted data, see [Load XML Data](/data-formats-for-data-pipelines#load-xml-data).

### Bad Data Targets

Bad data represents records that the Ocient System could not load due to errors in the transformations or invalid data in the source records. You can provide options for a bad data target that the Ocient System uses during pipeline execution to capture the records that are not loaded. The original bytes that the pipeline tried to load are captured in the bad data target along with the metadata about the error, such as the error message or source.

Kafka is the only supported bad data target.

#### Kafka Bad Data Target

When you use Kafka as a bad data target, the Ocient System produces the original bytes of the source record into the Kafka topic of your choice. The Ocient System includes the metadata about the record in the header of the record as it is sent to Kafka. You can configure the Kafka topic on your Kafka Brokers using the retention and partition settings of your choice.

<Warning>
  If the Kafka Broker is unreachable when the Ocient System attempts to produce a bad data record to the bad data target, the system logs an error on the Loader Node, and the pipeline continues.
</Warning>

**Example**

This example `CREATE PIPELINE` SQL statement snippet contains a bad data target definition using the `BAD_DATA_TARGET` option.

```sql SQL theme={null}
CREATE PIPELINE ...
    BAD_DATA_TARGET
        KAFKA
        TOPIC 'orders_errors'
        BOOTSTRAP_SERVERS '111.11.111.1:9092,111.11.111.2:9092'
        CONFIG '{"compression.type": "gzip"}'
    SOURCE
        ...
    EXTRACT
        ...
INTO public.orders
SELECT
    $order.billing.name as username,
    $order.subtotal as subtotal,
    ...
```

#### Kafka Bad Data Target Options

| **Option Key**     | **Default**                                          | **Data Type**         | **Description**                                                                                                                                                                                                     |
| ------------------ | ---------------------------------------------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| BOOTSTRAP\_SERVERS | None                                                 | STRING                | A comma-delimited list of `IP:port` pairs that contain the IP addresses of the Kafka Brokers and the associated port number.  <br />**Example:**  <br />`BOOTSTRAP_SERVERS = '111.11.111.1:9092,111.11.111.2:9092'` |
| TOPIC              | None                                                 | STRING                | The name of the Kafka topic where the Ocient System should produce bad data records.                                                                                                                                |
| CONFIG             | `'{`  <br />`"compression.type": "none"`  <br />`}'` | JSON-formatted STRING | Optional.  <br />  <br />The [producer configuration](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html) that the Kafka producer should use.                              |

### **Advanced Pipeline Tuning Options**

You can use pipeline tuning options to control the parallelism or batching dynamics of your pipelines. This tuning can throttle the resources used on a pipeline or increase parallel processing across Loader Nodes. These options are advanced settings that might require a detailed understanding of the underlying mechanics of the loading infrastructure in the Ocient System to employ. Due to the inherent nature of each source type, the behavior of these options can differ between file-based and Kafka-based loads. All these options are optional.

| **Option Key** | **Default** | **Data Type** | **Description** |
| --- | --- | --- | --- |
| CORES | The maximum number of CPU cores available on each Loader Node. | INTEGER | Maximum number of processing threads that the Ocient System uses during execution on each Loader Node. The system creates this number of threads on each Loader Node.  <br />The Ocient System automatically determines the default value by finding the number of cores of a Loader Node. You can use this option for performance tuning.  <br />The calculation for maximum parallelism of a pipeline is: `number_of_loaders * CORES`.  <br />**About Kafka Partitions and Parallelism**  <br />For Kafka Loads, this option determines the number of Kafka Consumers created on each Loader Node.  <br />For Kafka Pipelines, the recommendation is that `number_of_loaders * CORES` equals the number of Kafka topic partitions.  <br />If this number exceeds the number of Kafka topic partitions, the work might spread unevenly across Loader Nodes.  <br />If this number is less than the number of Kafka topic partitions, some Kafka Consumers might receive uneven amounts of work. In this case, use a value for `number_of_loaders * CORES` that is an even divisor of the number of Kafka topic partitions to avoid a skew in the rates of processing across partitions. |
| PARTITIONS | Equal to the value of CORES. | INTEGER | Specifies the number of partitions over which to split the file list. Not applicable to Kafka loads.  <br />The Ocient System automatically sets a default value based on the configured value for the `CORES` option. You can use this option for performance tuning.  <br />The number of partitions determines how many buckets of work the Ocient System generates for each batch of files processed on a Loader Node. The pipeline processes this number of partitions in parallel using the specified number of cores.  <br />If you specify fewer partitions than cores, some cores are not fully utilized, and resources are wasted. If you specify more partitions than cores, the Ocient System divides partitions in a round-robin fashion over the available cores. |
| BATCH\_SIZE | A dynamic value, determined by the Ocient System for each pipeline to maximize performance. | INTEGER | Number of rows in the batch to load at one time.  <br />The Ocient System automatically calculates a dynamic value depending on the table columns and the utilization of internal buffers to transfer records to the database backend. You can use this option to turn off the dynamic adjustments for performance tuning.  <br />⚠️ Only change this setting in rare cases where loading performance is slower than expected, and you have a large record size. If this setting is improperly set, pipelines might fail with out-of-memory exceptions.  <br />You can configure the default value (for the batch payload target) using a SQL statement such as:  <br />`ALTER SYSTEM ALTER CONFIG SET 'streamloader.extractorEngineParameters.configurationOption.osc.batch.payload.target' '65536'` |
| RECORD\_NUMBER\_FORMAT | For file loads that do not use the [EXPLODE\_OUTER](/special-data-pipeline-transformation-functions#explode_outer) function, the default is `[19, 45, 0]`.  <br />For Kafka loads that do not use the `EXPLODE_OUTER` function, the default is `[0, 64, 0]`.  <br />For loads that use the `EXPLODE_OUTER` function, the default is the specified load-type-specific default value with 13 subtracted from the record index bits. The system adds these bits to the bits for rows within a record.  <br />For example, the default for file loads that use this function is `[19, 32, 13]`. | ARRAY | The 64-bit record number for each record of the load. This number uniquely identifies a row within its partition.  <br />The format is an array of three values, in order: the file index bits, the record index bits, and the rows per record index bits.  <br />The file index bits value is the number of bits used to represent the file index within a partition.  <br />The record index bits value is the number of bits used to represent the record index within a file.  <br />The rows per record index bits value is the number of bits used to represent the row within a record. The system uses this value with the `EXPLODE_OUTER` function.  <br />These three values must sum to 64.  <br />**Example:**  <br />`RECORD_NUMBER_FORMAT = [10, 54, 0]`  <br />Set the number of file index bits to `10` and the number of record index bits to `54`, allowing up to 2^10 files and 2^54 records per file. The system does not support the `EXPLODE_OUTER` function in this configuration because the rows per record index bits are `0`. |
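
**Example**

To make the Kafka parallelism guidance for the `CORES` option concrete, this sketch works through the arithmetic for a hypothetical environment of 3 Loader Nodes and a Kafka topic with 24 partitions. The environment, the `CORES = 8` value, the `option = value` form, and the position of the option before the `SOURCE` clause are all assumptions for illustration. Check the `CREATE PIPELINE` syntax for the exact placement.

```sql SQL theme={null}
-- Assumed environment: 3 Loader Nodes, Kafka topic with 24 partitions.
-- Maximum parallelism = number_of_loaders * CORES = 3 * 8 = 24,
-- which equals the number of topic partitions (the recommended setup).
CREATE PIPELINE ...
    -- Assumed placement and form of the tuning option.
    CORES = 8
    SOURCE
        ...
    EXTRACT
        ...
INTO ...
SELECT
    ...
```

If the Loader Nodes had fewer cores available, a value such as `CORES = 4` would give `3 * 4 = 12` Kafka Consumers. Because 12 divides 24 evenly, each consumer would handle two partitions, which avoids the processing skew described for the `CORES` option.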

## DROP PIPELINE

`DROP PIPELINE` removes an existing pipeline in the current database. You cannot remove a pipeline that is running.

You must have the DROP privilege on the pipeline to execute this SQL statement. For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

<Warning>
  When you drop a pipeline, the Ocient System also removes the associated system catalog information, such as pipeline errors, events, files, partitions, and metrics.
</Warning>

**Syntax**

```sql SQL theme={null}
DROP PIPELINE [ IF EXISTS ] pipeline_name [, ...]
```

| **Parameter**   | **Data** **Type** | **Description**                                                                                                                                    |
| --------------- | ----------------- | -------------------------------------------------------------------------------------------------------------------------------------------------- |
| `pipeline_name` | string            | The name of the specified pipeline to remove.  <br />You can drop multiple pipelines by specifying additional pipeline names, separated by commas. |

**Examples**

**Remove Existing Data Pipeline**

Remove an existing pipeline named `ad_data_pipeline`.

```sql SQL theme={null}
DROP PIPELINE ad_data_pipeline;
```

**Remove Existing Data Pipeline by Checking for Existence**

Remove an existing pipeline named `ad_data_pipeline` or return a warning if the Ocient System does not find the pipeline in the database.

```sql SQL theme={null}
DROP PIPELINE IF EXISTS ad_data_pipeline;
```
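
**Stop and Remove Multiple Data Pipelines**

Because you cannot remove a running pipeline, a typical cleanup stops the pipeline first and then drops it. This is a minimal sketch that assumes a second pipeline named `users_pipeline` (a hypothetical name used here for illustration) exists or has already been removed, and that it is not running. The comma-separated list follows the `pipeline_name [, ...]` form in the syntax.

```sql SQL theme={null}
-- Stop the running pipeline so that it can be dropped.
STOP PIPELINE ad_data_pipeline;

-- Drop both pipelines in one statement.
DROP PIPELINE IF EXISTS ad_data_pipeline, users_pipeline;
```

Dropping the pipelines also removes their catalog history, so capture any error or metric rows you still need before you run the `DROP PIPELINE` statement.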

## PREVIEW PIPELINE

`PREVIEW PIPELINE` enables you to view the results of loading data for a specific `CREATE PIPELINE` SQL statement without creating a whole data pipeline and without storing those results in the target table. Using this SQL statement, you can iterate quickly and modify the syntax as needed to achieve your expected results. After you confirm your expected results, you can use the same syntax in the body of the `CREATE PIPELINE` statement with the appropriate source.

A table must exist in the database to serve as the target of your `PREVIEW PIPELINE` statement. This table ensures the pipeline matches the column types of the target table. However, the execution of this statement does not load data into the target table.

### Preview Sources

The `SOURCE INLINE` source type is available only for the `PREVIEW PIPELINE` SQL statement. You cannot create a data pipeline with inline source data. The inline source data limit is 1,000 rows.

Other source types defined in the `CREATE PIPELINE` statement, `S3`, `KAFKA`, and `FILESYSTEM`, are compatible with the `PREVIEW PIPELINE` statement. The extract options vary by the source type to mirror the `CREATE PIPELINE` statement. The database returns 10 records by default.

### Preview Error Handling

Pipeline-level errors cause the `PREVIEW PIPELINE` SQL statement to fail. The Ocient System returns an error and no result set. However, the Ocient System accumulates record-level errors that occur during the execution of this statement in a single warning that the system returns along with the result set. Each line of the warning describes a record-level error in human-readable form or as a JSON blob, depending on the value of the `SHOW_ERRORS_AS_JSON` option. Rows or columns that encounter record-level errors have `NULL` values in the result set.

### Preview Limitations

Limitations of this SQL statement are:

* Before executing a `PREVIEW PIPELINE` SQL statement, you must create a table for the Ocient System to have context for the preview.
* The maximum number of rows a `PREVIEW PIPELINE` SQL statement can return is 1,000 rows.
* The `COLUMN_DEFAULT_IF_NULL` option from the `CREATE PIPELINE` SQL statement has no effect on the `PREVIEW PIPELINE` SQL statement.
* The `PREVIEW PIPELINE` SQL statement does not honor the assignment of a service class based on text matching.
* These source options are not supported:
  * `START_FILENAME`
  * `END_FILENAME`
* When you execute two duplicate `PREVIEW PIPELINE` statements for a specific Kafka topic, the two statements share a consumer group. If the topic is small, one or both of the result sets might only be a partial result.
* For multiple tables, you can preview only one table at a time. You must specify the name of the table you want to preview using the `FOR` keyword.
* Previewing a continuous data pipeline is not supported.

**Syntax**

```sql SQL theme={null}
PREVIEW PIPELINE pipeline_name
    [ MODE mode ]
    [ SHOW_ERRORS_AS_JSON show_errors_as_json ]
    SOURCE [ INLINE ] (inline_string | <s3_source> | <filesystem_source> | <kafka_source>)
        [ LIMIT limit ]
    EXTRACT
        FORMAT csv
        RECORD_DELIMITER record_delimiter
        FIELD_DELIMITERS ['delim1', 'delim2', ...]
        [ INTERMEDIATE_VALUES intermediate_values ]
[ INSERT ] INTO created_tablename
SELECT
    preview_column_formula AS preview_column_name, ...
[ WHERE filter_expression ]
[ [ INSERT ] INTO created_tablename_n
SELECT
    preview_column_formula AS preview_column_name, ...
[ WHERE filter_expression ] ] [ ,... ]
[ FOR created_tablename_n ]
```

<Info>
  Though this syntax shows the CSV format, you can also use the `PREVIEW PIPELINE` statement with the other formats.
</Info>

| **Parameter**            | **Data** **Type** | **Description**                                                                                                                                                                                                                                                                                                                             |
| ------------------------ | ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `pipeline_name`          | string            | The name of the specified data pipeline for the preview.                                                                                                                                                                                                                                                                                    |
| `created_tablename`      | identifier        | The identifier for the name of the table that you create before executing the `PREVIEW PIPELINE` SQL statement.                                                                                                                                                                                                                             |
| `created_tablename_n`    | identifier        | For multiple tables, the identifier for the name of another table that you create before executing the `PREVIEW PIPELINE` SQL statement. Use the `FOR` keyword to specify which table content to preview.                                                                                                                                   |
| `preview_column_formula` | identifier        | The identifier for the formula of the data to load.  <br />For example, for the data in the first field of the inline source, use `$1`.  <br />  <br />If you need to add a transformation, you can use functions to transform data, such as `CONCAT($1, $2)`, to load the concatenation of the first two fields in the inline source data. |
| `preview_column_name`    | identifier        | The name of the column in the target table.                                                                                                                                                                                                                                                                                                 |

**SQL Statement Options**

| **Option Key**         | **Default**   | **Data** **Type** | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| ---------------------- | ------------- | ----------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| SOURCE INLINE          | Required      | STRING            | The string that contains data for the preview of the data pipeline load. For example, source data can be `'oci,ent,ocient\|ware,house,warehouse'`, where `\|` is the record delimiter and `,` is the field delimiter. For special characters, such as `\t`, use an escape sequence such as `e'oci,ent,oci\tent\|ware,house,ware\thouse'`.  <br />  <br />This option supports these text-based formats: CSV, JSON, and XML.                                      |
| MODE                   | `'transform'` | STRING            | Indicates whether to perform a validation of the PREVIEW PIPELINE SQL statement.  <br />  <br />Valid values are: `'validate'` and `'transform'`.  <br />  <br />Set this option to `'validate'` for checking that the creation of the data pipeline succeeds. If the pipeline is valid, the statement produces no output; otherwise, it returns an error.  <br />  <br />Set this option to `'transform'` to retrieve a preview of the results of the pipeline. |
| SHOW\_ERRORS\_AS\_JSON | `false`       | BOOLEAN           | Indicates whether to return record-level errors as JSON. Values are `true` or `false`. If the value is `true`, the Ocient System returns record-level errors as JSON blobs rather than human-readable messages. |
| LIMIT                  | 10            | INTEGER           | The number of rows, specified as an integer, to return in the preview results for sources with many rows. The default value is 10 rows.                                                                                                                                                                                                                                                                                                                          |
| INTERMEDIATE\_VALUES   | `false`       | BOOLEAN           | Indicates whether to capture intermediate values during a transformation sequence. Values are `true` or `false`. If the value is `true`, the Ocient System appends an extra column to the result set. Each value in the column contains a JSON blob that describes the intermediate values processed for each column after each transformation.                                                                                                                  |

<Info>
  You must specify at least one column name in the `SELECT` part of the syntax. The name of the specified column must match the name of the column in the created table. The number of columns in the `SELECT` part can be less than those in the created table.
</Info>

For definitions of other extract options, see the `CREATE PIPELINE` SQL statement options in [CREATE PIPELINE](#create-pipeline).

**Examples**

**Preview Pipeline Using CSV Format**

Preview the load of two rows of data. First, create a table to serve as the context for the load. The `previewload` table contains three columns with these data types: string, integer, and Boolean.

```sql SQL theme={null}
CREATE TABLE previewload (col1 VARCHAR, col2 INT, col3 BOOLEAN);
```

Create the preview pipeline `testpipeline` with this data: `'hello,2,true|bye,3,false'`. Specify the CSV extract format, `|` record delimiter, and the `,` field delimiter. Load the data without transformation.

```sql SQL theme={null}
PREVIEW PIPELINE testpipeline
    SOURCE INLINE 'hello,2,true|bye,3,false'
    EXTRACT
        FORMAT CSV
        RECORD_DELIMITER '|'
        FIELD_DELIMITERS [',']
INTO previewload
SELECT
    $1 AS col1,
    $2 AS col2,
    $3 AS col3;
```

*Output*

```text Text theme={null}
col1                                         col2       col3
--------------------------------------------------------------
hello                                        2          true
bye                                          3          false

Fetched 2 rows
```

Delete the `previewload` table.

```sql SQL theme={null}
DROP TABLE previewload;
```
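
**Validate a Pipeline Definition Without Returning Rows**

The `MODE` option can check the pipeline definition before you look at any data. This is a minimal sketch that reuses the table and inline data from the previous example and places `MODE` directly after the pipeline name, as shown in the syntax. If the definition is valid, the statement is expected to produce no output. Otherwise, it returns an error.

```sql SQL theme={null}
-- Create the target table that gives the preview its column types.
CREATE TABLE previewload (col1 VARCHAR, col2 INT, col3 BOOLEAN);

-- Validate only: no result set is returned for a valid definition.
PREVIEW PIPELINE testpipeline
    MODE 'validate'
    SOURCE INLINE 'hello,2,true|bye,3,false'
    EXTRACT
        FORMAT CSV
        RECORD_DELIMITER '|'
        FIELD_DELIMITERS [',']
INTO previewload
SELECT
    $1 AS col1,
    $2 AS col2,
    $3 AS col3;

DROP TABLE previewload;
```

After validation succeeds, remove the `MODE` option (or set it to `'transform'`) to see the transformed rows.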

**Preview Pipeline Using CSV Format with Escape Characters**

Preview the load of two rows of data. First, create a table to serve as the context for the load. The `previewload` table contains three columns with these data types: string, integer, and Boolean.

```sql SQL theme={null}
CREATE TABLE previewload (col1 VARCHAR, col2 INT, col3 BOOLEAN);
```

Create the preview pipeline `testpipeline` with this data: `'hello\tworld,2,true|bye\tworld,3,false'`. Specify the CSV extract format, `|` record delimiter, and `,` field delimiter. Load the data without transformation. In this case, the data contains the special character `\t`. For the system to interpret the escape sequence as a tab, prefix the string literal with `e`.

```sql SQL theme={null}
PREVIEW PIPELINE testpipeline
    SOURCE INLINE e'hello\tworld,2,true|bye\tworld,3,false'
    EXTRACT
        FORMAT CSV
        RECORD_DELIMITER '|'
        FIELD_DELIMITERS [',']
    INTO previewload
    SELECT
        $1 AS col1,
        $2 AS col2,
        $3 AS col3;
```

*Output*

```text Text theme={null}
col1                                         col2       col3
--------------------------------------------------------------
hello   world                                  2        true
bye     world                                  3        false

Fetched 2 rows
```

Delete the `previewload` table.

```sql SQL theme={null}
DROP TABLE previewload;
```

**Preview Pipeline Using CSV Format with Transformation**

Create a table to serve as the context for the load. The `previewload` table contains three string columns.

```sql SQL theme={null}
CREATE TABLE previewload (col1 VARCHAR, col2 VARCHAR, col3 VARCHAR);
```

Create the preview pipeline `testpipeline` with this data: `'hello,world|bye,world'`. Specify the CSV extract format, `|` record delimiter, and the `,` field delimiter. Load the data with a transformation to concatenate the two strings and return the result in the third column.

```sql SQL theme={null}
PREVIEW PIPELINE testpipeline
    SOURCE INLINE 'hello,world|bye,world'
    EXTRACT
        FORMAT CSV
        RECORD_DELIMITER '|'
        FIELD_DELIMITERS [',']
    INTO previewload
    SELECT
        $1 AS col1,
        $2 AS col2,
        CONCAT($1,$2) AS col3;
```

*Output*

```text Text theme={null}
col1                                         col2                                         col3
---------------------------------------------------------------------------------------------------------------------------------------
hello                                        world                                        helloworld
bye                                          world                                        byeworld

Fetched 2 rows
```

The third column contains the concatenated result of the first two columns.

Delete the `previewload` table.

```sql SQL theme={null}
DROP TABLE previewload;
```
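
**Preview Pipeline with Record-Level Errors and Intermediate Values**

The error handling and `INTERMEDIATE_VALUES` behavior described earlier can be explored with a deliberately bad record. This is a sketch, not a verified run: it assumes that the non-numeric value `abc` in an integer column is reported as a record-level error, and it places each option where the syntax shows it. No output is shown because the exact warning text and JSON layout are not documented in this section.

```sql SQL theme={null}
CREATE TABLE previewload (col1 VARCHAR, col2 INT, col3 BOOLEAN);

-- The first record has 'abc' where col2 expects an integer (assumed to fail).
PREVIEW PIPELINE testpipeline
    SHOW_ERRORS_AS_JSON true
    SOURCE INLINE 'hello,abc,true|bye,3,false'
    EXTRACT
        FORMAT CSV
        RECORD_DELIMITER '|'
        FIELD_DELIMITERS [',']
        INTERMEDIATE_VALUES true
INTO previewload
SELECT
    $1 AS col1,
    $2 AS col2,
    $3 AS col3;

DROP TABLE previewload;
```

If the assumption holds, the affected row or column appears as `NULL` in the result set, the warning carries one JSON blob per record-level error, and the extra column holds the intermediate values for each row.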

**Preview Pipeline Using the Kafka Source**

Create the `previewload` table with these columns:

* `id` — Non-NULL integer
* `salut` — Non-NULL string
* `name` — Non-NULL string
* `surname` — Non-NULL string
* `zipcode` — Non-NULL integer
* `age` — Non-NULL integer
* `rank` — Non-NULL integer

```sql SQL theme={null}
CREATE TABLE previewload (
    id INT NOT NULL,
    salut VARCHAR(3) NOT NULL,
    name VARCHAR(10) NOT NULL,
    surname VARCHAR(10) NOT NULL,
    zipcode INT NOT NULL,
    age INT NOT NULL,
    rank INT NOT NULL
);
```

Create the preview pipeline `test_small_kafka_simple_csv`. Specify the `ddl_csv` topic. Set the `WRITE_OFFSETS` option to `false` so that the Kafka consumer does not write the offsets of records that it has made durable to the Kafka Broker. Specify the bootstrap server as `servername:0000` and the configuration option `"auto.offset.reset": "earliest"` by using the `BOOTSTRAP_SERVERS` and `CONFIG` options, respectively. Limit the returned results to three rows by using the `LIMIT` option. Specify the CSV extract format and `\n` record delimiter by using the `FORMAT` and `RECORD_DELIMITER` extract options, respectively.

```sql SQL theme={null}
PREVIEW PIPELINE test_small_kafka_simple_csv
    SOURCE KAFKA
        TOPIC 'ddl_csv'
        WRITE_OFFSETS false
        BOOTSTRAP_SERVERS 'servername:0000'
        CONFIG '{"auto.offset.reset": "earliest"}'
        LIMIT 3
    EXTRACT
        FORMAT csv
        RECORD_DELIMITER '\n'
    INTO previewload
    SELECT
        INT($1) AS id,
        CHAR($2) AS salut,
        CHAR($3) AS name,
        CHAR($4) AS surname,
        INT($5) AS zipcode,
        INT($6) AS age,
        INT($7) AS rank;
```

***

```text Text theme={null}
id         salut                                        name                                         surname                                      zipcode    age        rank
\-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
105        Mr                                           Jmhsuxofspx                                  Uaofgayjugb                                  85573      29         2
101        Mr                                           Ijmmtbddkyh                                  Yqbxqnkgidp                                  52393      43         1
109        Mr                                           Bigohpwfwmr                                  Qcxgakpkoeu                                  74420      1          3

Fetched 3 rows
```

## START PIPELINE

`START PIPELINE` begins the execution of the specified data pipeline that extracts data and loads it into the target tables specified by the `CREATE PIPELINE` SQL statement.

When you execute the `START PIPELINE` SQL statement, the Ocient System creates a static list of files in the `sys.pipeline_files` system catalog table and marks them with the `PENDING` status. After the system assigns a file to an underlying task, the system marks the file as `QUEUED`. After the system verifies that the file exists, the system marks the file as `LOADING` to signify that a Loader Node has started reading the source data. Finally, upon successfully loading the file, the system transitions the status of the file to the terminal status `LOADED`.

A Kafka pipeline never enters the `COMPLETED` state in the `information_schema.pipeline_status` view. Instead, after you start the pipeline, it keeps running until you stop it or until it reaches the error limit specified by the `ERROR LIMIT` option.
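
To watch these status transitions while a load runs, you can query the catalog objects named above. This is a minimal sketch that selects all columns, because the column names of these objects are not listed in this section. In practice, you would likely filter the rows to a single pipeline.

```sql SQL theme={null}
-- One row per file, including its current status
-- (PENDING, QUEUED, LOADING, LOADED, and so on).
SELECT * FROM sys.pipeline_files;

-- Overall state of each pipeline, for example whether it is COMPLETED.
SELECT * FROM information_schema.pipeline_status;
```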

You must have the EXECUTE privilege on the pipeline and the INSERT privilege on any table that is a target in the pipeline. For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

**Syntax**

```sql SQL theme={null}
START PIPELINE pipeline_name
    [ ERROR
        [ LIMIT <integer_value> ]
        [ FILE_ERROR (FAIL | SKIP_MISSING_FILE | TOLERATE) ] ]
    [ USING LOADERS <loader_names> ]
    [ ON COMPLETION (NO_FLUSH | FLUSH_AND_WAIT | FLUSH_AND_RETURN) ]
```

| **Parameter**   | **Data** **Type** | **Description**                          |
| --------------- | ----------------- | ---------------------------------------- |
| `pipeline_name` | string            | The name of the specified data pipeline. |

**SQL Statement Options**

| **Option Key**                     | **Default** | **Data Type**   | **Description**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| ---------------------------------- | ----------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| ERROR LIMIT `<integer_value>`      | 0           | INTEGER         | Error log option that determines the number of record-level errors that can occur during the execution of a pipeline that the load tolerates before the whole pipeline execution fails.  <br />`<integer_value>` is a number greater than or equal to -1. When you set `<integer_value>` to -1, the load tolerates an unlimited number of record-level errors.  <br />By default, continuous pipelines tolerate an unlimited number of record-level errors, whereas batch pipelines tolerate zero errors.  <br />  <br />With multiple target tables, record-level errors are specific to a table. For example, if a transformation for loading one table succeeds but another transformation for loading another table fails, the row corresponding to the successful transformation loads to the first table but not the other.                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
| ERROR FILE\_ERROR `<error_action>` | `FAIL`      | STRING          | For pipelines that load data from S3 or local file sources, this error configuration option determines how to treat unrecoverable file-level errors. Examples of unrecoverable file-level errors are:  <br />  <br />The file is listed when the pipeline starts, but is missing later during the load.  <br />  <br />The Gzip file is corrupted and cannot be decompressed.  <br />  <br />The file cannot be downloaded from the source.  <br />  <br />A record-level error that the load cannot tolerate occurs when tokenizing or transforming data in the file.  <br />  <br />`<error_action>` can be one of these keywords:  <br />`FAIL` — Fail the whole pipeline because of a file-level error.  <br />`SKIP_MISSING_FILE` — Only tolerate errors that occur due to missing files. If a file exists in the list when the pipeline starts but is missing later during the load, skip the file and continue with the next file.  <br />`TOLERATE` — Tolerate all unrecoverable file-level errors. In this mode, the load also tolerates an unlimited number of record-level errors.  <br />For these actions, the `FAILED`, `SKIPPED`, and `LOADED_WITH_ERRORS` file statuses, respectively, appear in the `sys.pipeline_files` system catalog table and indicate how the pipeline handled the file error. |
| USING LOADERS `loader_names`       | `NULL`      | LIST OF STRINGS | Specify one or more names of Loader Nodes as a comma-separated list for executing the `START PIPELINE` SQL statement. If you do not use this option, the Ocient System uses all of the Loader Nodes that are active to execute the pipeline.  <br />You can find node names in the `sys.nodes` system catalog table.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| ON COMPLETION `<completion_mode>`  | `NO_FLUSH`  | STRING          | Completion type option that specifies the behavior when the pipeline finishes loading. This option determines when the remaining pages are converted into Segments.  <br />`NO_FLUSH` — Do not force a flush of pages. Rely on watermarks and timeouts to trigger final conversion to Segments.  <br />`FLUSH_AND_WAIT` — Trigger a flush of pages, initiating final conversion to Segments. The pipeline blocks and waits for the conversion to Segments to complete before marking the pipeline as `COMPLETED`.  <br />`FLUSH_AND_RETURN` — Trigger a flush of pages, initiating the final conversion to Segments. The Ocient System marks the pipeline as `COMPLETED` immediately following the flush without waiting for conversion to Segments to complete.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |

<Info>
  For the statement to execute successfully, the specified node names must identify nodes that have:

  * `ACTIVE` operational status
  * `streamloader` role
</Info>

<Info>
  When you execute the `START PIPELINE` SQL statement, the Ocient System creates a static list of files only for batch pipelines and a dynamic list for continuous pipelines in the `sys.pipeline_files` system catalog table.
</Info>

**Examples**

Start an existing pipeline named `ad_data_pipeline` with default settings.

```sql SQL theme={null}
START PIPELINE ad_data_pipeline;
```

Start an existing pipeline named `ad_data_pipeline` with error tolerance (tolerate 10 errors before aborting the pipeline). For details about error tolerance, see [Error Tolerance in Data Pipelines](/error-tolerance-in-data-pipelines).

```sql SQL theme={null}
START PIPELINE ad_data_pipeline
    ERROR LIMIT 10;
```

<Info>
  Data pipelines log a message for each pipeline error to the `sys.pipeline_errors` system catalog table, even if you do not specify the `ERROR` option. Use `BAD_DATA_TARGET` settings to capture the original source data.
</Info>

Start an existing pipeline named `ad_data_pipeline` using the Loader Node named `stream-loader1`.

```sql SQL theme={null}
START PIPELINE ad_data_pipeline
    USING LOADERS "stream-loader1";
```
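
The options can also be combined in the order that the syntax lists them. This is a minimal sketch that assumes `ad_data_pipeline` is a batch pipeline that loads files (the `FILE_ERROR` option applies to S3 and local file sources) and that a second Loader Node named `stream-loader2` exists. That node name is hypothetical.

```sql SQL theme={null}
-- Look up node names; the chosen nodes must be ACTIVE
-- and have the streamloader role.
SELECT * FROM sys.nodes;

-- Tolerate up to 100 record-level errors, skip files that disappear
-- after the pipeline starts, run on two named Loader Nodes, and wait
-- for the conversion to Segments before the pipeline is marked COMPLETED.
START PIPELINE ad_data_pipeline
    ERROR
        LIMIT 100
        FILE_ERROR SKIP_MISSING_FILE
    USING LOADERS "stream-loader1", "stream-loader2"
    ON COMPLETION FLUSH_AND_WAIT;
```

`FLUSH_AND_WAIT` trades a longer-running statement for the guarantee that the remaining pages are converted before the pipeline reports completion. `NO_FLUSH` leaves that conversion to watermarks and timeouts.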

To resume a pipeline with file loading, see [Data Pipeline Behavior Considerations](/data-pipeline-behavior-considerations).

To restart a Kafka data pipeline, see [Data Pipeline Behavior Considerations](/data-pipeline-behavior-considerations).

For pipeline dependencies, see [Data Pipeline Behavior Considerations](/data-pipeline-behavior-considerations).

## STOP PIPELINE

`STOP PIPELINE` stops the execution of the pipeline and its associated tasks. After you stop a pipeline, you can execute the `START PIPELINE` SQL statement on the pipeline to run the pipeline again. When the pipeline runs again, the load deduplicates any records that the same pipeline previously loaded.

You must have the EXECUTE privilege on the pipeline to execute this SQL statement. For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

**Syntax**

```sql SQL theme={null}
STOP PIPELINE pipeline_name
```

| **Parameter**   | **Data** **Type** | **Description**                          |
| --------------- | ----------------- | ---------------------------------------- |
| `pipeline_name` | string            | The name of the specified data pipeline. |

**Example**

Stop an existing pipeline named `ad_data_pipeline`.

```sql SQL theme={null}
STOP PIPELINE ad_data_pipeline;
```

You can see the status of the parent tasks in the `sys.tasks` system catalog table and see the status of the child tasks in the `sys.subtasks` system catalog table.
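
A stop, inspect, and restart cycle might look like the following sketch. It selects all columns from the task tables because their column names are not listed in this section.

```sql SQL theme={null}
-- Stop the pipeline and its associated tasks.
STOP PIPELINE ad_data_pipeline;

-- Check the parent tasks and their child tasks.
SELECT * FROM sys.tasks;
SELECT * FROM sys.subtasks;

-- Run the pipeline again; records already loaded by this
-- pipeline are deduplicated.
START PIPELINE ad_data_pipeline;
```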

## ALTER PIPELINE

`ALTER PIPELINE` evolves the schema of the data pipeline in the `SELECT` clause. For pipelines with multiple tables, specify each `SELECT` clause for tables in the existing pipeline. The `WHERE` clause must match the corresponding clause of the existing pipeline. Use the `FORCE` keyword to change a non-backward-compatible schema or add, modify, or remove the `WHERE` clause. For details about schema evolution, see [Load Avro Data](/data-formats-for-data-pipelines).

<Info>
  To execute this SQL statement, the data pipeline must not be running. You can use the `STOP PIPELINE` SQL statement to stop the execution of the data pipeline.
</Info>

You must have the ALTER privilege on the pipeline to execute this SQL statement. For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

**Syntax**

```sql SQL theme={null}
ALTER PIPELINE [ IF EXISTS ] pipeline_name
[ INSERT ] INTO table_name
SELECT
    $field_name AS field_name [ , ... ]
[ WHERE ... ]
[ INSERT INTO table_nameN SELECT ... ]
[ FORCE ]
```

| **Parameter**   | **Data Type**     | **Description**                              |
| --------------- | ----------------- | -------------------------------------------- |
| `pipeline_name` | string            | The name of the specified pipeline to alter. |
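
For a pipeline that loads multiple tables, the statement repeats the `INSERT INTO ... SELECT` clause for each table in the existing pipeline. This is a minimal sketch under assumed names: the `orders_pipeline` pipeline, the `orders` and `order_items` tables, and all field names are hypothetical and serve only to show the shape of the statement.

```sql SQL theme={null}
-- Hypothetical two-table pipeline. Specify the SELECT clause
-- for every table that the existing pipeline loads.
ALTER PIPELINE orders_pipeline
INSERT INTO orders
SELECT
    $order_id     AS order_id,
    $customer_id  AS customer_id,
    $status       AS status
INSERT INTO order_items
SELECT
    $order_id     AS order_id,
    $item_id      AS item_id,
    $quantity     AS quantity;
```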

**Examples**

These examples use this table and data pipeline definition. You must stop the execution of the data pipeline before performing schema evolution operations.

Create the `users` table with these columns:

* `id` — Universally Unique IDentifier (UUID) of the user
* `firstname` — First name of the user
* `lastname` — Last name of the user
* `birthyear` — Year of birth
* `groups` — List of groups where the user belongs

```sql SQL theme={null}
CREATE TABLE users(
    id         UUID NOT NULL,
    firstname  VARCHAR(255) NOT NULL,
    lastname   VARCHAR(255) NOT NULL,
    birthyear  INT,
    groups     VARCHAR(255)[] NOT NULL DEFAULT 'char[]'
);
```

Assume you have user data in Avro format in multiple files in the `/data/users` directory. Create the `users_pipeline` data pipeline to load the Avro files that match the `*.avro` pattern. The `INFER_FROM` option in the schema configuration instructs the system to infer the schema from all files.

```sql SQL theme={null}
CREATE PIPELINE users_pipeline
    SOURCE filesystem
        FILTER '/data/users/*.avro'
    EXTRACT
        FORMAT avro
        SCHEMA {
            INFER_FROM all_files
        }
INTO users
SELECT
    $id         AS id,
    $firstname  AS firstname,
    $lastname   AS lastname,
    $birthyear  AS birthyear;
```

Start the data pipeline.

```sql SQL theme={null}
START PIPELINE users_pipeline;
```

**Schema Evolution with a Column Addition**

Stop the data pipeline.

```sql SQL theme={null}
STOP PIPELINE users_pipeline;
```

Change the schema of an existing pipeline named `users_pipeline` to add the `groups` column. Access the array of strings within the column.

```sql SQL theme={null}
ALTER PIPELINE users_pipeline
INTO users
SELECT
    $id         AS id,
    $firstname  AS firstname,
    $lastname   AS lastname,
    $birthyear  AS birthyear,
    $groups[]   AS groups;
```

Start the data pipeline again.

```sql SQL theme={null}
START PIPELINE users_pipeline;
```

**Check Whether the Data Pipeline to Change Exists for Column Addition**

Stop the data pipeline.

```sql SQL theme={null}
STOP PIPELINE users_pipeline;
```

Change the schema of an existing pipeline named `users_pipeline` to add the `groups` column. Use the `IF EXISTS` keywords to check whether the data pipeline exists. Access the array of strings within the column.

```sql SQL theme={null}
ALTER PIPELINE IF EXISTS users_pipeline
INTO users
SELECT
    $id         AS id,
    $firstname  AS firstname,
    $lastname   AS lastname,
    $birthyear  AS birthyear,
    $groups[]   AS groups;
```

Start the data pipeline again.

```sql SQL theme={null}
START PIPELINE users_pipeline;
```

**Schema Evolution with a Column Removal**

Stop the data pipeline.

```sql SQL theme={null}
STOP PIPELINE users_pipeline;
```

Change the schema of an existing pipeline named `users_pipeline` to remove the `birthyear` column.

```sql SQL theme={null}
ALTER PIPELINE users_pipeline
INTO users
SELECT
    $id         AS id,
    $firstname  AS firstname,
    $lastname   AS lastname;
```

Start the data pipeline again.

```sql SQL theme={null}
START PIPELINE users_pipeline;
```

**Schema Evolution with a Column Data Type Modification**

Stop the data pipeline.

```sql SQL theme={null}
STOP PIPELINE users_pipeline;
```

Change the schema of an existing pipeline named `users_pipeline` to narrow the `INT` data type to a `SMALLINT` type for the `birthyear` column using the `SMALLINT` casting function.

```sql SQL theme={null}
ALTER PIPELINE users_pipeline
INTO users
SELECT
    $id         AS id,
    $firstname  AS firstname,
    $lastname   AS lastname,
    SMALLINT($birthyear)  AS birthyear;
```

Start the data pipeline again.

```sql SQL theme={null}
START PIPELINE users_pipeline;
```

**Schema Evolution with the Addition of a Filter Condition**

Stop the data pipeline.

```sql SQL theme={null}
STOP PIPELINE users_pipeline;
```

Change the schema of an existing pipeline named `users_pipeline` to filter the year of birth to be greater than 1950 using the `WHERE` clause. Add the `FORCE` keyword to instruct the data pipeline to evolve the schema with the filter condition.

```sql SQL theme={null}
ALTER PIPELINE users_pipeline
INTO users
SELECT
    $id         AS id,
    $firstname  AS firstname,
    $lastname   AS lastname,
    $birthyear  AS birthyear
WHERE
    $birthyear > 1950
FORCE;
```

Start the data pipeline again.

```sql SQL theme={null}
START PIPELINE users_pipeline;
```
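
**Schema Evolution with the Removal of a Filter Condition**

Removing a `WHERE` clause also requires the `FORCE` keyword. As a sketch of the reverse operation, assuming `users_pipeline` currently filters on the year of birth as in the preceding example, these statements stop the pipeline, restate the `SELECT` clause without the filter, and start the pipeline again.

```sql SQL theme={null}
-- The pipeline must not be running during schema evolution.
STOP PIPELINE users_pipeline;

-- Omit the WHERE clause and add FORCE to remove the filter.
ALTER PIPELINE users_pipeline
INTO users
SELECT
    $id         AS id,
    $firstname  AS firstname,
    $lastname   AS lastname,
    $birthyear  AS birthyear
FORCE;

START PIPELINE users_pipeline;
```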

### ALTER PIPELINE RENAME

The `ALTER PIPELINE RENAME TO` SQL statement changes the name of the pipeline object while retaining its identifier, options, and other metadata. The Ocient System reflects this change in the `sys.pipelines` system catalog table. After the rename, you must use the new name when you refer to the pipeline in SQL statements.

You must have the ALTER privilege on the pipeline to execute this SQL statement. For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

**Syntax**

```sql SQL theme={null}
ALTER PIPELINE [ IF EXISTS ] pipeline_original_name RENAME TO pipeline_new_name
```

| **Parameter**            | **Data Type**     | **Description**                         |
| ------------------------ | ----------------- | --------------------------------------- |
| `pipeline_original_name` | string            | The name of the existing data pipeline. |
| `pipeline_new_name`      | string            | The new name of the data pipeline.      |

**Example**

Rename an existing pipeline named `ad_data_pipeline` to `renamed_pipeline`.

```sql SQL theme={null}
ALTER PIPELINE ad_data_pipeline RENAME TO renamed_pipeline;
```
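
A slightly more defensive variant, sketched here, uses the optional `IF EXISTS` keywords from the syntax and then checks the result. The catalog query selects all columns because this section does not list the column names of `sys.pipelines`.

```sql SQL theme={null}
-- Rename the pipeline only if it exists.
ALTER PIPELINE IF EXISTS ad_data_pipeline RENAME TO renamed_pipeline;

-- Confirm that the catalog reflects the new name.
SELECT * FROM sys.pipelines;

-- Later statements must refer to the pipeline by its new name.
EXPORT PIPELINE renamed_pipeline;
```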

## EXPORT PIPELINE

`EXPORT PIPELINE` returns the `CREATE PIPELINE` SQL statement used to create the pipeline object. You can use the output of this statement to recreate an identical pipeline when you remove the original pipeline.

<Info>
  The execution of this statement censors sensitive S3 values like `ACCESS_KEY_ID` and `SECRET_ACCESS_KEY` and Kafka [Consumer Configuration](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) password-type fields. The database replaces them with `*`.
</Info>

To execute this statement, you must have the VIEW privilege on the pipeline and any table the pipeline targets. For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

**Syntax**

```sql SQL theme={null}
EXPORT PIPELINE pipeline_name
```

| **Parameter**   | **Data Type**     | **Description**                          |
| --------------- | ----------------- | ---------------------------------------- |
| `pipeline_name` | string            | The name of the specified data pipeline. |

**Example**

Export an existing pipeline named `ad_data_pipeline`.

```sql SQL theme={null}
EXPORT PIPELINE ad_data_pipeline;
```
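
One way to use the exported statement, sketched under the assumption that you save the output of `EXPORT PIPELINE` before you remove the pipeline, is to export, drop, and then rerun the saved statement. The `CREATE PIPELINE` body here reuses the `users_pipeline` definition from the `ALTER PIPELINE` examples as a stand-in for the exported output; the actual output might be formatted differently. If a definition contains masked values, you would likely need to replace each `*` placeholder with the real value before rerunning the statement.

```sql SQL theme={null}
-- 1. Return the definition and save the output.
EXPORT PIPELINE users_pipeline;

-- 2. Remove the original pipeline.
DROP PIPELINE users_pipeline;

-- 3. Rerun the saved statement (stand-in for the exported output).
CREATE PIPELINE users_pipeline
    SOURCE filesystem
        FILTER '/data/users/*.avro'
    EXTRACT
        FORMAT avro
        SCHEMA {
            INFER_FROM all_files
        }
INTO users
SELECT
    $id         AS id,
    $firstname  AS firstname,
    $lastname   AS lastname,
    $birthyear  AS birthyear;
```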

## CREATE PIPELINE FUNCTION

`CREATE PIPELINE FUNCTION` enables you to define a function for loading data. Define the function behavior using the {Groovy} language. For details about this language, see [The Apache Groovy Programming Language](https://groovy-lang.org/index.html).

Function arguments and output are strongly typed and immutable.

You can test the execution of your function using the `PREVIEW PIPELINE` SQL statement.

<Info>
  The Ocient System does not support the overload of function names.
</Info>

**Syntax**

```sql SQL theme={null}
CREATE [ OR REPLACE ] PIPELINE FUNCTION [ IF NOT EXISTS ]
    function_name( input_argument [, ...] )
    LANGUAGE GROOVY
    RETURNS output_argument_definition
    IMPORTS [ library_name [, ...] ]
    AS $$
        groovy_declaration
    $$
```

| **Parameter**                | **Type** | **Description**                                                                                                                                                                                                                                                                                                |
| ---------------------------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `function_name`              | string   | A unique identifier for the data pipeline function.                                                                                                                                                                                                                                                            |
| `input_argument`             | string   | The name of one or more input arguments of the function. Specify data types for input arguments according to the supported data types defined in [Data Types for Data Pipelines](/data-types-for-data-pipelines).  <br />For the data type declaration, use `NOT NULL` where applicable for maximum performance. |
| `output_argument_definition` | string   | The type definition of the output from the function.                                                                                                                                                                                                                                                           |
| `library_name`               | string   | The name of one or more Java libraries.  <br />You can include libraries by using the `IMPORTS` clause or by specifying the fully qualified class path (e.g., `java.lang.Integer`) in the source definition.                                                                                                  |
| `groovy_declaration`         | string   | The Groovy definition of the function.                                                                                                                                                                                                                                                                         |

### Install and Enable Third-Party Libraries

You can use the default list of supported third-party libraries or additional third-party libraries that you install.

#### Supported Libraries

Data pipeline functions can import classes from the default list of supported third-party libraries. This table provides the resources for each supported library package.

| **Library Package**                       | **Resource**                                                                                                                                     |
| ----------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ |
| `java.lang.*`                             | [java.lang](https://docs.oracle.com/javase/8/docs/api/java/lang/package-summary.html)                                                            |
| `java.util.*`                             | [java.util](https://docs.oracle.com/javase/8/docs/api/java/util/package-summary.html)                                                            |
| `java.nio.ByteBuffer.*`                   | [ByteBuffer](https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html)                                                                 |
| `groovy.json.*`                           | [groovy.json](https://docs.groovy-lang.org/latest/html/gapi/groovy/json/package-summary.html)                                                    |
| `groovy.xml.*`                            | [groovy.xml](https://docs.groovy-lang.org/latest/html/gapi/groovy/xml/package-summary.html)                                                      |
| `groovy.yaml.*`                           | [groovy.yaml](https://docs.groovy-lang.org/latest/html/api/groovy/yaml/package-summary.html)                                                     |
| `org.apache.groovy.datetime.extensions.*` | [org.apache.groovy.datetime.extensions](https://docs.groovy-lang.org/latest/html/api/org/apache/groovy/datetime/extensions/package-summary.html) |
| `org.apache.groovy.dateutil`              | [org.apache.groovy.dateutil.extensions](https://docs.groovy-lang.org/latest/html/api/org/apache/groovy/dateutil/extensions/package-summary.html) |
| `com.ocient.streaming.data.types.*`       | [Data Types for User-Defined Data Pipeline Functions](/data-types-for-user-defined-data-pipeline-functions)                                      |

#### Additional Libraries

You can install and enable additional third-party libraries to import for use in your data pipeline functions.

You must install the JAR package on all Loader Nodes in the `/opt/ocient/current/lib/extractorengine_udt` folder.

Then, add the fully qualified class name in the function import list as part of the `library_name` parameter. For example, to reference the `ByteBuffer` class from the `com.fastbuffer` package, specify `com.fastbuffer.ByteBuffer` in the `library_name` parameter and use the class in the Groovy definition as `var x = new com.fastbuffer.ByteBuffer()`.
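
As a sketch of how these pieces fit together, this function acts as a smoke test for an installed library. The `com.fastbuffer.ByteBuffer` class is the illustrative class from the preceding paragraph, and the `fastbuffer_check` function name, its `label` argument, and the assumption that the class has a no-argument constructor are hypothetical.

```sql SQL theme={null}
CREATE PIPELINE FUNCTION
    fastbuffer_check( label VARCHAR(255) NOT NULL )
    LANGUAGE GROOVY
    RETURNS VARCHAR(511) NOT NULL
    IMPORTS [
        'com.fastbuffer.ByteBuffer'
        ]
    AS $$
        /* Instantiate the class from the installed JAR package. */
        var buffer = new com.fastbuffer.ByteBuffer();

        /* Return the label with the loaded class name to confirm the import. */
        return label + ":" + buffer.getClass().getName();
    $$;
```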

### Groovy Data Type Mapping

For the Groovy definition, the Ocient System maps each SQL data type to the corresponding Groovy data type. Your Groovy code should use the Groovy data type defined in this table for any input arguments and output.

| **SQL Data Type**          | **Groovy Data Type**                               |
| -------------------------- | -------------------------------------------------- |
| `BIGINT`                   | `java.lang.Long`                                   |
| `BINARY(N)` or `HASH(N)`   | `byte[]`                                           |
| `BOOLEAN`                  | `java.lang.Boolean`                                |
| `CHAR(N)` or `VARCHAR(N)`  | `java.lang.String`                                 |
| `DATE`                     | `java.time.LocalDate`                              |
| `DECIMAL(P,S)`             | `com.ocient.streaming.data.types.Decimal`          |
| `DOUBLE`                   | `java.lang.Double`                                 |
| `INT`                      | `java.lang.Integer`                                |
| `IPV4`                     | `java.net.Inet4Address`                            |
| `IP`                       | `java.net.Inet6Address`                            |
| `ST_POINT`                 | `com.ocient.streaming.data.types.gis.STPoint`      |
| `ST_LINESTRING`            | `com.ocient.streaming.data.types.gis.STLinestring` |
| `ST_POLYGON`               | `com.ocient.streaming.data.types.gis.STPolygon`    |
| `FLOAT`                    | `java.lang.Float`                                  |
| `SMALLINT`                 | `java.lang.Short`                                  |
| `TIME`                     | `com.ocient.streaming.data.types.Time`             |
| `TIMESTAMP`                | `com.ocient.streaming.data.types.Timestamp`        |
| `BYTE`                     | `java.lang.Byte`                                   |
| `TUPLE<<type1, type2, …>>` | `com.ocient.streaming.data.types.OcientTuple`      |
| `TYPE[]`                   | `java.util.List<TYPE>`                             |
| `UUID`                     | `java.util.UUID`                                   |
| `VARBINARY(N)`             | `byte[]`                                           |
| `VARCHAR(N)`               | `java.lang.String`                                 |
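
To illustrate the mapping, here is a minimal sketch of a hypothetical `full_name` function, which is not part of the Ocient System. Its `VARCHAR` arguments arrive in the Groovy code as `java.lang.String` values, and the returned `String` maps back to the `VARCHAR` return type. The argument and return lengths are assumptions chosen for illustration.

```sql SQL theme={null}
CREATE PIPELINE FUNCTION
    full_name( firstname VARCHAR(255) NOT NULL, lastname VARCHAR(255) NOT NULL )
    LANGUAGE GROOVY
    RETURNS VARCHAR(511) NOT NULL
    IMPORTS [
        'java.lang.String'
        ]
    AS $$
        /* VARCHAR arguments arrive as java.lang.String values. */
        String first = firstname.trim();
        String last = lastname.trim();

        /* The returned String maps back to the VARCHAR return type. */
        return first + " " + last;
    $$;
```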

**Example**

Create the `sort_function` data pipeline function to sort an array of integers. The function has two input arguments: `value`, a non-NULL array of integers, and `ascending`, the sort order. The function returns a non-NULL array of integers. If `value` is empty, the function throws an error.

The function imports these Java libraries:

* `java.lang.Integer`
* `java.util.ArrayList`
* `java.util.Collections`
* `java.util.Comparator`
* `java.util.List`

Define the Groovy code. Because input arguments are immutable, the example Groovy code first copies the `value` argument, sorts the copied list according to the sort order, and returns the sorted array.

```sql SQL theme={null}
CREATE PIPELINE FUNCTION
    sort_function( value INT[] NOT NULL, ascending BOOLEAN NOT NULL)
    LANGUAGE GROOVY
    RETURNS INT[] NOT NULL
    IMPORTS [
        'java.lang.Integer',
        'java.util.ArrayList',
        'java.util.Collections',
        'java.util.Comparator',
        'java.util.List'
        ]
    AS $$
        /* Throw an error if the array is empty */
        if (value.isEmpty()){
            throw new PipelineFunctionException("Unexpected empty array");
        }

        /* Make a copy of the list. */
        List<Integer> sorted = new ArrayList<>((List<Integer>)value);

        /* Sort the array elements according to the specified order. */
        sorted.sort(ascending ? Comparator.naturalOrder() : Comparator.reverseOrder());

        /* Return the sorted array. */
        return sorted;
    $$;
```
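
If you later need to change the behavior, the `OR REPLACE` keywords in the syntax allow you to redefine the function under the same name. As a sketch, this hypothetical variant keeps the same signature but returns an empty array instead of throwing an error when `value` is empty.

```sql SQL theme={null}
CREATE OR REPLACE PIPELINE FUNCTION
    sort_function( value INT[] NOT NULL, ascending BOOLEAN NOT NULL)
    LANGUAGE GROOVY
    RETURNS INT[] NOT NULL
    IMPORTS [
        'java.lang.Integer',
        'java.util.ArrayList',
        'java.util.Comparator',
        'java.util.List'
        ]
    AS $$
        /* Copy the immutable input. An empty input yields an empty copy. */
        List<Integer> sorted = new ArrayList<>((List<Integer>)value);

        /* Sort the copy according to the specified order. */
        sorted.sort(ascending ? Comparator.naturalOrder() : Comparator.reverseOrder());

        /* Return the sorted (possibly empty) array. */
        return sorted;
    $$;
```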

View the creation information about the `sort_function` function using the `sys.pipeline_functions` system catalog table. This statement returns the function name, return type, argument names, data types of the arguments, and the list of imported libraries.

```sql SQL theme={null}
SELECT name, return_type, argument_names, argument_types, imported_libraries
FROM sys.pipeline_functions;
```
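
After the function exists, you can reference it when you load data. This sketch is illustrative only: the `scores_pipeline` pipeline, the `user_scores` table with an `INT[]` column named `sorted_scores`, the `/data/scores` directory, and the `scores` source field are all hypothetical, and the call syntax assumes that you invoke a pipeline function by name in the `SELECT` clause like a built-in transformation function.

```sql SQL theme={null}
CREATE PIPELINE scores_pipeline
    SOURCE filesystem
        FILTER '/data/scores/*.avro'
    EXTRACT
        FORMAT avro
        SCHEMA {
            INFER_FROM all_files
        }
INTO user_scores
SELECT
    $id                             AS id,
    -- Sort each array of scores in ascending order during the load.
    sort_function($scores[], true)  AS sorted_scores;
```

You can test the function output with the `PREVIEW PIPELINE` SQL statement before you create and start the pipeline.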

## DROP PIPELINE FUNCTION

`DROP PIPELINE FUNCTION` removes an existing pipeline function.

You must have the DROP privilege on the pipeline function to execute this SQL statement. For details, see [Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference).

**Syntax**

```sql SQL theme={null}
DROP PIPELINE FUNCTION [ IF EXISTS ] function_name [, ...]
```

| **Parameter**   | **Data Type**     | **Description**                                                                                                                                                  |
| --------------- | ----------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `function_name` | string            | The name of the specified data pipeline function to remove.  <br />You can drop multiple pipeline functions by specifying additional function names, separated by commas. |

**Examples**

**Remove the Existing Pipeline Function**

Remove an existing pipeline function named `sort_function`.

```sql SQL theme={null}
DROP PIPELINE FUNCTION sort_function;
```

**Remove an Existing Pipeline Function by Checking for Existence**

Remove an existing pipeline function named `sort_function` or return a warning if the Ocient System does not find the function in the database.

```sql SQL theme={null}
DROP PIPELINE FUNCTION IF EXISTS sort_function;
```
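
**Remove Multiple Pipeline Functions**

The syntax also accepts a comma-separated list of function names. As a sketch, this statement removes `sort_function` together with a second function named `full_name`, which is a hypothetical name used only for illustration.

```sql SQL theme={null}
-- Remove both functions in one statement and skip any that do not exist.
DROP PIPELINE FUNCTION IF EXISTS sort_function, full_name;
```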

## Related Links

[Load Data](/load-data)

[Data Pipeline Load of CSV Data from S3](/data-pipeline-load-of-csv-data-from-s3)

[Data Pipeline Load of Parquet Data from S3](/data-pipeline-load-of-parquet-data-from-s3)

[Transform Data in Data Pipelines](/transform-data-in-data-pipelines)

[Data Pipeline Behavior Considerations](/data-pipeline-behavior-considerations)

[Load Metadata and File-Based Partitioned Data in Data Pipelines](/load-metadata-and-file-based-partitioned-data-in-data-pipelines)

[Data Control Language (DCL) Statement Reference](/data-control-language-dcl-statement-reference)

[Identifiers](/identifiers)
