SQL Reference
Data Definition Language (DDL)...

Data Pipelines

Data pipelines enable you to load data from your chosen source into the Ocient System. You can preview a data pipeline using the PREVIEW PIPELINE SQL statement to test the creation statement and any required data transformation. Then, you can create a data pipeline using the CREATE PIPELINE statement. To manage the load, use the START PIPELINE statement to start the load and the STOP PIPELINE statement to stop it. You can rename a data pipeline using the ALTER PIPELINE RENAME statement. To see the full definition of a created pipeline, use the EXPORT PIPELINE statement. When you finish with the load, you can use the DROP PIPELINE statement to remove the data pipeline. You can create user-defined data pipeline functions using the CREATE PIPELINE FUNCTION statement and remove the function using the DROP PIPELINE FUNCTION statement. Also, you can administer privileges for data pipelines and data pipeline functions. For details, see Data Control Language (DCL) Statement Reference.

CREATE PIPELINE

CREATE PIPELINE defines a data pipeline that you can execute with the START PIPELINE SQL statement. Specify the type of load, data source, and data format.

You must have the ALTER privilege on the pipeline. For details, see Data Control Language (DCL) Statement Reference.

Syntax

SQL
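The original syntax block is not reproduced here. As a simplified, hedged sketch assembled from the clauses documented in this section (the exact grammar and clause order may differ by release):

CREATE [OR REPLACE] PIPELINE [IF NOT EXISTS] pipeline_name
    [BATCH | CONTINUOUS]
    SOURCE source_type source_options
    EXTRACT FORMAT format_name extract_options
    INTO table_name
    SELECT expression AS column_name [, ...]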


Pipeline Identity and Naming

The name of a data pipeline is unique within an Ocient System. Reference the pipeline in other SQL statements, such as START PIPELINE, STOP PIPELINE, and DROP PIPELINE, using the name of the pipeline. The SQL statement throws an error if a pipeline with the same name already exists unless you specify the IF NOT EXISTS option. You can rename a pipeline with the ALTER PIPELINE SQL statement.

Update a Pipeline

You can update a pipeline with the OR REPLACE clause in the CREATE PIPELINE SQL statement. Use this clause when you want to continue loading from the current position in a continuous load but need to modify transformations or other settings. If you specify the OR REPLACE clause and the pipeline already exists, the database replaces the original pipeline object with the options specified in the new CREATE OR REPLACE PIPELINE statement. When you replace an existing pipeline, the pipeline retains its current position in the source data so that data is not duplicated when the pipeline is resumed with a START PIPELINE SQL statement. You must stop a running pipeline before executing the CREATE OR REPLACE PIPELINE SQL statement.
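As an illustrative sketch of this flow (the pipeline, topic, and column names are hypothetical, and the exact clause layout may differ), stop the pipeline, replace it with modified transformations, and resume it:

STOP PIPELINE orders_pipeline;

CREATE OR REPLACE PIPELINE orders_pipeline
    SOURCE KAFKA
        TOPIC 'orders'
        BOOTSTRAP_SERVERS '198.51.100.1:9092'
    EXTRACT FORMAT JSON
    INTO orders
    SELECT $order_id AS order_id,
           NULL_IF($customer_name, '') AS customer_name;

START PIPELINE orders_pipeline;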

Batch and Continuous Pipeline Modes

You can define pipelines in either BATCH or CONTINUOUS mode.

  • File sources (e.g., s3, filesystem) only support BATCH mode. File-based loads default to BATCH mode if you do not specify this keyword.
  • Kafka sources only support CONTINUOUS mode. Loads with a Kafka source default to CONTINUOUS mode if you do not specify this keyword.

Required Options

You must define certain options in every pipeline, while other options are needed only if you use specific functionality. A pipeline must contain the SOURCE, EXTRACT FORMAT, and INTO table_name ... SELECT statements.

Some options interact with each other: the IF NOT EXISTS and OR REPLACE clauses are mutually exclusive.

SELECT Statement and Data Transformation

Use the INTO table_name ... SELECT SQL statement in the CREATE PIPELINE SQL statement to specify how to transform the data. The SELECT statement includes a set of one or more expressions and a target column name in the form expression as column_name. The expression part in the statement contains a source field reference (e.g., $1 or $my_field.subfield) and, optionally, the transformation function you want to apply.

If your data is nested in arrays, you can use special transformation functions, such as the EXPLODE_OUTER function, to expand the data into individual rows.

For details about data transformation and supported transformation functions, see Transform Data in Data Pipelines.

For details about data types and casting, see Data Types for Data Pipelines and Data Formats for Data Pipelines.

You can also specify metadata values, such as filename, to load in the SELECT SQL statement. For details, see Load Metadata Values in Data Pipelines.

This is an example transformation statement snippet.

SQL
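The original snippet is not reproduced here. As a hedged sketch (the table, column, and field references are hypothetical), each expression pairs a source field reference and an optional transformation with a target column:

INTO trades
SELECT $1 AS trade_id,
       CHAR($2) AS symbol,
       NULL_IF($3, 'N/A') AS price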


NULL and Default Value Handling

The Ocient System has specific ways to handle NULL and default values in the data pipeline. Consider this information when you prepare data to load and write the CREATE TABLE and CREATE PIPELINE SQL statements.

To load NULL values, insert the NULL value in the data. In this case, if you specify NOT NULL for the target column in the CREATE TABLE statement, the data pipeline fails to load.

If you omit a column in the SELECT SQL statement, the data pipeline loads the default value for the column. If the column has no default value, the pipeline loads a NULL value. If the column is also defined as NOT NULL, the data pipeline fails to load.

You can use the DEFAULT keyword to load a default value. In this case, if the column does not have a default value, the pipeline fails to load. If the pipeline loads a NULL into a column with a specified default value, you can use COALESCE(<value>, DEFAULT) to insert the default value instead of the NULL value, where <value> is the NULL column. You can modify the load of one column at a time in this way.
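As a hedged sketch of this pattern (column names are hypothetical), the following SELECT entry loads the column default whenever the second source field is NULL:

SELECT $1 AS id,
       COALESCE($2, DEFAULT) AS status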

This table describes the data pipeline behavior for NULL or omitted column values.

Value in the Data Pipeline | Nullable Target Column | Default Value in Target Column | Resulting Data Pipeline Behavior
---|---|---|---
NULL | No | Value might or might not be set. | The pipeline uses the default value if the default exists and you specify the COLUMN_DEFAULT_IF_NULL option. Otherwise, the pipeline fails.
NULL | Yes | Value might or might not be set. | The pipeline uses the default value if the default exists and you specify the COLUMN_DEFAULT_IF_NULL option. Otherwise, the pipeline uses the NULL value.
Omitted column in the SELECT SQL statement | NULL value might or might not be set. | Yes | The pipeline uses the default value.
Omitted column in the SELECT SQL statement | No | No | The pipeline fails.
Omitted column in the SELECT SQL statement | Yes | No | The pipeline uses the NULL value.

Required Privileges

You must have the CREATE PIPELINE privilege on the underlying database and the VIEW privilege on each table in the pipeline definition to execute the CREATE PIPELINE SQL statement. The table must already exist.

See the START PIPELINE SQL statement for the required privileges to execute a pipeline.

Limitations

A pipeline can contain, at most, 4,194,304 files. When loading more than 4,194,304 files, you must partition files into smaller batches and serially execute multiple pipelines. Use PREFIX and FILTER settings to control these partitions. It is not recommended to create multiple pipelines with a total of more than 4,194,304 files across the entire system simultaneously.

Examples

This example loads JSON data from Kafka using the CREATE PIPELINE SQL statement.

SQL
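The original statement is not reproduced here. As a hedged sketch (the pipeline, topic, table, and field names are hypothetical; option details are documented later in this section):

CREATE PIPELINE kafka_orders_pipeline
    SOURCE KAFKA
        TOPIC 'orders'
        BOOTSTRAP_SERVERS '198.51.100.1:9092,198.51.100.2:9092'
    EXTRACT FORMAT JSON
    INTO orders
    SELECT $id AS id,
           $customer.name AS customer_name,
           $total AS total;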


For a complete tutorial, see Data Pipeline Load of JSON Data from Kafka.

This example loads delimited data from S3.

SQL
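The original statement is not reproduced here. As a hedged sketch (the pipeline, bucket, table, and column names are hypothetical):

CREATE PIPELINE ad_data_pipeline
    SOURCE S3
        ENDPOINT 'https://s3.us-east-2.amazonaws.com'
        BUCKET 'my-bucket'
        FILTER '**.csv'
    EXTRACT FORMAT DELIMITED
        FIELD_DELIMITER ','
        NUM_HEADER_LINES 1
    INTO ad_data
    SELECT $1 AS campaign_id,
           $2 AS impressions;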


For a complete tutorial, see Data Pipeline Load of CSV Data from S3.

SOURCE Options

File-Based Source Options

Options that apply to both the S3 and FILESYSTEM sources.

Option Key

Default

Required or Optional

Data Type

Description

FILTER | FILTER_GLOB | FILTER_REGEX

None

Required

STRING

The expression for filtering files in the directory or child directories to load. This pattern applies to the full path of the file, excluding the bucket name for S3-compatible file sources.

ℹ️ Paths include a leading forward slash.

Specify one of these options:

  • FILTER_GLOB or FILTER — Glob-style filename patterns. Supported wildcards include:
    • * matches any characters except the path separator, /.
    • ** matches any characters, including path separators.
    • ? matches a single character.
  • FILTER_REGEX — Regular expression patterns to filter files. Some common patterns include:
    • . matches any character.
    • * matches zero or more of the preceding character.
    • + matches one or more of the preceding character.
    • [135] matches any one character in the set.
    • [1-5] matches any one character in the range.
    • (a|b) matches a or b.

FILTER_GLOB Examples:

List all CSV files in subdirectories named 2024 that are in any subdirectory of the trades directory, which is in the root of the bucket.

FILTER_GLOB = '/trades/*/2024/*.csv'

List all CSV files in the bucket in all subdirectories.

FILTER_GLOB = '**.csv'

FILTER_REGEX Examples:

List all json.gz files in the bucket that contain the name orders anywhere in the path.

FILTER_REGEX = '.*orders.*\.json\.gz'

List all files in the bucket in the root path metrics or values where the date string on the filename is 2000 to 2004.

FILTER_REGEX = '/(metrics|values)/.*200[0-4].*'

PREFIX

None

Optional

STRING

or

ARRAY OF STRINGS

Specify a prefix in addition to a filter. The prefix must end with a forward slash. The Ocient System uses the more specific option between the specified prefix and filter.

Paths include a leading forward slash.

The system applies the prefix to S3 sources to restrict the scope of listing operations and improve performance. Use this option along with FILTER_REGEX to enforce the listing prefix of your choice.

When you apply a list of prefixes, the system loads the union of the set of files found in each of the paths specified by the prefix.

PREFIX Examples:

List all CSV files in the 2024 subdirectory of files.

PREFIX '/files/2024/' FILTER '**.csv'

List all JSON files of subdirectory 2024/09/ in the orders directory.

PREFIX '/data/orders/2024/09/' FILTER_REGEX '.*/orders/.*json'

List all JSON files in multiple subdirectories.

PREFIX ['/data/orders/2024/09/', '/data/orders/2024/10/']

OBJECT_KEY

None

Optional

STRING

or

ARRAY OF STRINGS

Specify an S3 object key to load. You cannot use this option if the PREFIX option or any FILTER option is set.

ℹ️ Object keys do not include a leading forward slash.

The pipeline loads only the object key or keys listed in the pipeline definition. Loading by object key can be faster because it avoids listing the files in large directories.

Object keys must not include any of the special characters: *?\{}[]

OBJECT_KEY Examples:

Load a single object from the designated bucket at the specified object key.

OBJECT_KEY 'order_data/jsonl/orders_20251101.jsonl'

Load a list of objects from the designated bucket at the specified object keys.

OBJECT_KEY ['order_data/jsonl/orders_20251101.jsonl', 'order_data/jsonl/orders_20251201.jsonl']

COMPRESSION_METHOD

None

Optional

STRING

The method the Ocient System uses to decompress file data. The supported option is gzip.

To load uncompressed data, omit the COMPRESSION_METHOD option.

ℹ️ The COMPRESSION_METHOD option is only applicable to file-based sources. Kafka pipelines automatically decompress data based on the Kafka topic configuration.

SORT_BY

filename

Optional

STRING

The sort criteria for sorting the file list before the load. Supported options are:

  • filename — Sort files lexicographically by the filename.
  • created — Sort files based on the timestamp that indicates the creation of the file.
  • modified — Sort files based on the timestamp that indicates the modification of the file.

SORT_DIRECTION

ASC

Optional

STRING

The sort direction, either ASC (ascending) or DESC (descending). If you specify this option, you must also specify the SORT_BY option.

START_TIMESTAMP

None

Optional

TIMESTAMP

The system loads files after this timestamp value.

When you use this option with the SORT_BY option set to filename or modified, the system compares this value against the modification timestamp of each file. When you use it with SORT_BY set to created, the system compares this value against the creation timestamp of each file.

END_TIMESTAMP

None

Optional

TIMESTAMP

The system loads files before this timestamp value.

When you use this option with the SORT_BY option set to filename or modified, the system compares this value against the modification timestamp of each file. When you use it with SORT_BY set to created, the system compares this value against the creation timestamp of each file.

START_FILENAME

None

Optional

STRING

The filename of the file that starts the load. The system filters the files it loads to include files with filenames lexicographically equal to or after this filename.

Use the full path of the file, such as '/dir/load_file.json'. If the file is located in the top-most directory, start the path with a slash /, such as '/load_file.json'.

END_FILENAME

None

Optional

STRING

The filename of the file that finishes the load. The system filters the files it loads to include files with filenames lexicographically equal to or before this filename.

Use the full path of the file, such as '/dir/load_file.json'. If the file is located in the top-most directory, start the path with a slash /, such as '/load_file.json'.

S3 Source Options

You can apply these options to data sources of the SOURCE S3 type, which include S3 and S3-compatible services.

Option Key

Default

Required or Optional

Data Type

Description

BUCKET

None

Required

STRING

The name of the bucket in AWS S3.

ENDPOINT

None

Optional

STRING

The endpoint URI for the S3-compatible service API. (e.g., https://s3.us-east-2.amazonaws.com)

If you provide the ENDPOINT option, the Ocient System ignores settings for the REGION option.

REGION

'us-east-1'

Optional

STRING

The region that the Ocient System uses for AWS access.

If you specify the ENDPOINT option, the system ignores this option.

ACCESS_KEY_ID



Optional

STRING

The access key identification for AWS credentials. If you specify this option, you must also specify the SECRET_ACCESS_KEY option. The Ocient System uses anonymous credentials when you specify an empty value for this option.

SECRET_ACCESS_KEY



Optional

STRING

The secret key for AWS credentials. If you specify this option, you must also specify the ACCESS_KEY_ID option. The Ocient System uses anonymous credentials when you specify an empty value for this option.

MAX_CONCURRENCY

50

Optional

INTEGER

Determines the number of parallel connections the Ocient System uses to communicate with the AWS S3 service.

⚠️ This option does not require modification in most cases. Contact Ocient Support to modify these values.

READ_TIMEOUT

0 (unlimited timeout)

Optional

INTEGER

The number of seconds until a read operation times out.

⚠️ This option does not require modification in most cases. Contact Ocient Support to modify these values.

REQUEST_DEPTH

500

Optional

INTEGER

The upper boundary of requests that the Ocient System handles concurrently.

⚠️ This option does not require modification in most cases. Contact Ocient Support to modify these values.

REQUEST_RETRIES

10

Optional

INTEGER

Number of times the AWS SDK retries failing requests before the Ocient System throws an error.

⚠️ This option does not require modification in most cases. Contact Ocient Support to modify these values.

FILESYSTEM Source Options

No options exist specific to the file system source (SOURCE FILESYSTEM) except for general file-based source options.

When you load data using SOURCE FILESYSTEM, the files must be addressable from all of your Loader Nodes. The Ocient System uses the specified path in the pipeline PREFIX and FILTER options to select the files to load.

A shared view of the files you want to load must be available to all Loader Nodes involved in a pipeline. For example, you can use a Network File System (NFS) mount available to all Loader Nodes at a common path on each node.

Example

This example CREATE PIPELINE SQL statement snippet contains a FILESYSTEM source and filters to all CSV files in the /tmp/sample-data/ directory on each of the Loader Nodes.

SQL
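The original snippet is not reproduced here. As a hedged sketch of the source and filter described above (the pipeline, table, and column names are hypothetical):

CREATE PIPELINE sample_fs_pipeline
    SOURCE FILESYSTEM
        PREFIX '/tmp/sample-data/'
        FILTER '**.csv'
    EXTRACT FORMAT CSV
    INTO sample_table
    SELECT $1 AS col1;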


KAFKA Source Options

You can apply these options to Kafka data sources (SOURCE KAFKA).

For compression, you do not need to specify a compression option in Kafka-based pipelines because the Ocient System handles the compression type automatically. Records produced to the Kafka Broker with a producer compression.type setting, or with compression.type set on the topic, are automatically decompressed when the loading process consumes them. The loading process uses built-in Kafka headers to determine the required decompression during extraction.

Option Key

Default

Required or Optional

Data Type

Description

TOPIC

None

Required

STRING

The name of the Kafka topic from which the pipeline consumes records.

BOOTSTRAP_SERVERS

None

Required

STRING

A comma-delimited list of IP:port pairs that contain the IP addresses and the associated port numbers of the Kafka Brokers.

Example:

BOOTSTRAP_SERVERS = '198.51.100.1:9092,198.51.100.2:9092'

WRITE_OFFSETS

true

Optional

BOOLEAN

Indicates whether the Kafka consumer writes the offsets of durably loaded records back to the Kafka Broker.

CONFIG

'{ "enable.auto.commit": false, "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer", "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer", "group.id": <database-name>-<pipeline-name> }'

Optional

STRING

The consumer configuration that the Kafka consumers should use. This option is a JSON-formatted string.

Certain values within this configuration are fixed, whereas the Ocient System provides other values with a default value that you can modify.

For the consumer configuration, to create a secure connection from the Kafka consumer to a Kafka Broker, set the "security.protocol" key along with any SSL or SASL keys.

If a certificate file is required, you must add it to the truststore used on all Loader Nodes. The truststore path must be identical on all Loader Nodes. The Kafka configuration can reference this truststore path.
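As an illustrative sketch only (the broker security settings, user name, password, and truststore path are placeholders and vary by deployment), a CONFIG value for a SASL_SSL connection might use standard Kafka consumer properties:

CONFIG '{
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"loader\" password=\"secret\";",
  "ssl.truststore.location": "/etc/kafka/truststore.jks"
}'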

Kafka CONFIG Option Defaults

When you configure the CONFIG option for a Kafka source, the Ocient System merges the values you specify with default values that Ocient uses when the system creates a Kafka Consumer.

You cannot override some values that Ocient provides using the CONFIG option.

Key | Default Value | Override Allowed
---|---|---
group.id | <database name>-<pipeline name> | Yes
enable.auto.commit | false | No
key.deserializer | "org.apache.kafka.common.serialization.ByteArrayDeserializer" | No
value.deserializer | "org.apache.kafka.common.serialization.ByteArrayDeserializer" | No

EXTRACT Options

General Extract Options

You can specify these options on any of the allowed FORMAT types.

Option Key

Default

Required or Optional

Data Type

Description

FORMAT

None

Required

STRING

Specifies the format of the files to load. Supported values are:

  • delimited
  • csv
  • json
  • parquet
  • binary

CHARSET_NAME

Binary format: ibm1047

All other formats: utf-8

Optional

STRING

Specifies the character set for decoding data from the source records into character data. Use this character set when you load data into VARCHAR columns or when you apply CHAR transformation functions.

Defaults to utf-8 for all formats except BINARY, which defaults to ibm1047.

You can configure the default value for BINARY formatted data using a SQL statement such as:

ALTER SYSTEM ALTER CONFIG SET 'sql.pipelineParameters.extract.binary.defaultCharset' = 'ibm1047'

COLUMN_DEFAULT_IF_NULL

false

Optional

BOOLEAN

Specifies whether pipelines should load the column default value when the result of a series of transforms is NULL.

  • If you set this option to false (the default), the pipeline loads NULL values into columns.
  • If you set this option to true, the pipeline loads the defined default value of the column when the result of the execution of transformation on the column is NULL.

NULL_STRINGS

Delimited or CSV format:

['null', 'NULL']

JSON format:

[]

Optional

ARRAY OF STRINGS

Specifies string values that should represent a NULL value when extracted from the source records.

Use this in csv, delimited, and json formats to convert specific values to NULL instead of requiring individual transform function calls to NULL_IF with those values.

This option applies to source data immediately after extraction. If the result of a transformation is one of these strings, you must instead use NULL_IF with the specified string values to convert the result to NULL.

Delimited and CSV Extract Options

You can specify these options for FORMAT delimited or FORMAT csv data formats, which are aliases.

For details on working with Delimited data, see Load Delimited and CSV Data.

Option Key

Default

Required or Optional

Data Type

Description

COMMENT_CHAR

NULL

Optional

STRING

Specifies the character used to comment out a record in the source file. The load skips records where the first character of a record is equal to this character.

Set this option to NULL or '' to turn off the detection of these control characters.

Example: COMMENT_CHAR '#'

EMPTY_FIELD_AS_NULL

true

Optional

BOOLEAN

Specifies whether the Ocient System should extract an empty source field as NULL or a missing value. When this option is set to true, the Ocient System treats empty fields as NULL. Otherwise, the system treats fields as a missing value. For string-type fields, a missing value is equivalent to an empty string.

An empty field is defined as two consecutive delimiters (e.g., the second field is empty in abc,,xyz). If a field is explicitly an empty string, as indicated by quote characters, the Ocient System treats the field as an empty string, not an empty field (e.g., the second field is an empty string in abc,"",xyz).

The CHAR($1) transformation function handles both NULLs and empty strings and passes them through.

⚠️ Beware that the NULL_IF($1, '') transformation function directly loads a NULL for both NULLs and missing values. This transformation can override the behavior of EMPTY_FIELD_AS_NULL.

FIELD_OPTIONALLY_ENCLOSED_BY

"

Optional

STRING

Also known as the “quote character,” this option specifies the character for optionally enclosing fields. Fields enclosed by this character can include delimiters, the enclosure character, or the escape character.

Set this option to NULL or '' to turn off the detection of these control characters.

Examples:

Use a double quote character:

FIELD_OPTIONALLY_ENCLOSED_BY = '"'

Use a single quote character:

FIELD_OPTIONALLY_ENCLOSED_BY = ''''

ESCAPE_CHAR

"

Optional

STRING

Specifies the escape character within fields enclosed by the FIELD_OPTIONALLY_ENCLOSED_BY option. Use this option to escape the enclosure character or escape character.

Set this option to NULL or '' to turn off the detection of these control characters.

Examples:

Use a double quote as the escape character.

ESCAPE_CHAR = '"'

Use a single quote as the escape character.

ESCAPE_CHAR = ''''

FIELD_DELIMITER

','



Optional

STRING

or

ARRAY OF STRINGS

Specifies a character or list of possible characters for delimiting fields. The default value sets the field delimiter to only a comma. The value must be one byte.

The Ocient System automatically interprets the values you specify as C-style escaped strings, so an escape string (e'some value') is not required to specify control characters. This differs from the default string behavior in Ocient. For details, see String Literals and Escape Sequences.

FIELD_DELIMITER and FIELD_DELIMITERS are aliases.

Examples:

Use a tab character:

FIELD_DELIMITER = '\t'

Use a pipe character:

FIELD_DELIMITER = '|'

Use either a pipe or a comma character:

FIELD_DELIMITER = [',', '|']

NUM_HEADER_LINES

0

Optional

INTEGER

Specifies the number of header lines, typically 0 or 1. The Ocient System skips this number of lines and does not load them as data when files are processed. Use this option when your data includes a row of header values.

RECORD_DELIMITER

['\r\n', '\n']



Optional

STRING

or

ARRAY OF STRINGS

Specifies the string or an array of strings for delimiting records. The file is split into individual records using this character during processing. Common values include '\r\n' and '\n'. The value must be one or two bytes.

The Ocient System automatically interprets the values you specify as C-style escaped strings. An escape string (e'some value') is not required to specify control characters. This differs from the default strings behavior in Ocient.

The system chooses the first specified delimiter and uses that delimiter for the rest of the data. Data with mixed delimiters is not supported.

RECORD_DELIMITER and RECORD_DELIMITERS are aliases.

Examples:

Use a linefeed character:

RECORD_DELIMITER = '\n'

Use a carriage return and linefeed character sequence:

RECORD_DELIMITER = '\r\n'

SKIP_EMPTY_LINES

false

Optional

BOOLEAN

Specifies whether or not to skip empty lines.

CLOSE_ARRAY

']'

Optional

STRING

Specifies the character that indicates the end of an array in a csv or delimited field. Use this option to parse array data types.

Specify the OPEN_ARRAY option also when using this option.

Set this option to NULL or '' to turn off the detection of these control characters. If you set this option to either of these characters, the system also turns off the detection of these characters for the OPEN_ARRAY option.

Example:

Convert source data such as val1,"{1,2,3}",val2 to an array when referenced as $2[].

CLOSE_ARRAY '}' OPEN_ARRAY '{' ARRAY_ELEMENT_DELIMITER ','

OPEN_ARRAY

'['

Optional

STRING

Specifies the character that indicates the start of an array in a csv or delimited field. Use this option to parse array data types.

Specify the CLOSE_ARRAY option also when using this option.

Set this option to NULL or '' to turn off the detection of these control characters. If you set this option to either of these characters, the system also turns off the detection of these characters for the CLOSE_ARRAY option.

Example:

Convert source data such as val1,"[1,2,3]",val2 to an array when referenced as $2[].

OPEN_ARRAY '[' CLOSE_ARRAY ']' ARRAY_ELEMENT_DELIMITER ','

CLOSE_OBJECT

'}'

Optional

STRING

Specifies the character that indicates the end of a tuple in a field. Use this option to parse tuple data types.

Specify the OPEN_OBJECT option also when using this option.

Set this option to NULL or '' to turn off the detection of these control characters. If you set this option to either of these characters, the system also turns off the detection of these characters for the OPEN_OBJECT option.

OPEN_OBJECT

'{'

Optional

STRING

Specifies the character that indicates the start of a tuple in a field. Use this option to parse tuple data types.

Specify the CLOSE_OBJECT option also when using this option.

Set this option to NULL or '' to turn off the detection of these control characters. If you set this option to either of these characters, the system also turns off the detection of these characters for the CLOSE_OBJECT option.

ARRAY_ELEMENT_DELIMITER

','

Optional

STRING

Specifies the character that separates values in an array within a csv or delimited field. Use this option to parse array data types.

Set this option to NULL or '' to turn off the detection of these control characters.

Example:

Convert source data such as val1,"[1;2;3]",val2 to an array when referenced as $2[].

ARRAY_ELEMENT_DELIMITER = ';'

When you specify the escape character, you often have to use an escape sequence. This action follows standard SQL rules.

JSON Extract Options

No options exist for JSON data record extraction (FORMAT json).

For details about JSON-formatted data, see Load JSON Data.

PARQUET Extract Options

No options exist for Parquet data record extraction (FORMAT parquet).

For details about Parquet-formatted data, see Load Parquet Data.

When you use the FORMAT PARQUET option with an AWS S3 Source, the ENDPOINT option is required.

BINARY Extract Options

You can apply these options to binary data record extraction (FORMAT BINARY).

For details about BINARY-formatted data, see Load Binary Data.

The general option CHARSET_NAME has a different default value for FORMAT BINARY.

Option Key

Default

Required or Optional

Data Type

Description

RECORD_LENGTH

None

Required

INTEGER

Specifies the fixed size in bytes of each record in the source data. The Ocient System splits the binary data into binary chunks according to this length value and processes them individually.

ENDIANNESS

'big'

Optional

STRING

Specifies the endianness used to interpret multi-byte sequences in various transforms of binary data. Accepted values are 'big' and 'little'.

AUTO_TRIM_PADDING

true

Optional

BOOLEAN

Determines whether padding characters should be trimmed after decoding the binary data into string data. If you set this option to true, the Ocient System trims all instances of the PADDING_CHARACTER value from the end of a string after the system decodes the string from the BINARY type.

The default padding character is a space, which the Ocient System trims from the end of text data decoded from binary data.

Bad Data Targets

Bad data represents records that the Ocient System could not load due to errors in the transformations or invalid data in the source records. You can provide options for a bad data target that the Ocient System uses during pipeline execution to capture the records that are not loaded. The original bytes that the pipeline tried to load are captured in the bad data target along with the metadata about the error, such as the error message or source.

Kafka is the only supported bad data target.

Kafka Bad Data Target

When you use Kafka as a bad data target, the Ocient System produces the original bytes of the source record into the Kafka topic of your choice. The Ocient System includes the metadata about the record in the header of the record as it is sent to Kafka. You can configure the Kafka topic on your Kafka Brokers using the retention and partition settings of your choice.

In the event that the Kafka Broker is unreachable when the Ocient System attempts to produce a bad data record to the bad data target, the system logs an error on the Loader Node and the pipeline continues.

Example

This example CREATE PIPELINE SQL statement snippet contains a bad data target definition using the BAD_DATA_TARGET option.

SQL
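The original snippet is not reproduced here. As a hedged sketch using the options documented below (the exact clause syntax and its placement within the CREATE PIPELINE statement may differ; the topic and server values are placeholders):

BAD_DATA_TARGET KAFKA
    TOPIC 'pipeline_bad_data'
    BOOTSTRAP_SERVERS '198.51.100.1:9092'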


Kafka Bad Data Target Options

Option Key

Default

Required or Optional

Data Type

Description

TOPIC

None

Required

STRING

The name of the Kafka topic where the Ocient System should produce bad data records.

BOOTSTRAP_SERVERS

None

Required

STRING

A comma-delimited list of IP:port pairs that contain the IP addresses of the Kafka Brokers and the associated port number.

Example:

BOOTSTRAP_SERVERS = '111.11.111.1:9092,111.11.111.2:9092'

CONFIG

'{

"compression.type": "none"

}'

Optional

STRING

The producer configuration that the Kafka producer should use. This option is a JSON-formatted string.

Advanced Pipeline Tuning Options

You can use pipeline tuning options to control the parallelism or batching dynamics of your pipelines. This tuning can throttle the resources used on a pipeline or increase parallel processing across Loader Nodes. These options are advanced settings that might require a detailed understanding of the underlying mechanics of the loading infrastructure in the Ocient System to employ. Due to the inherent nature of each source type, the behavior of these parameters can differ between file-based and Kafka-based loads.

Option Key

Default

Required or Optional

Data Type

Description

CORES

The maximum number of CPU cores available on each Loader Node.

Optional

INTEGER

Maximum number of processing threads that the Ocient System uses during execution on each Loader Node. The system creates this number of threads on each Loader Node.

The Ocient System automatically determines the default value by finding the number of cores of a Loader Node. You can use this option for performance tuning.

The calculation for maximum parallelism of a pipeline is: number_of_loaders * CORES.

About Kafka Partitions and Parallelism

For Kafka Loads, this option determines the number of Kafka Consumers created on each Loader Node.

For Kafka Pipelines, the recommendation is that number_of_loaders * CORES equals the number of Kafka topic partitions.

If this number exceeds the number of Kafka topic partitions, the work might spread unevenly across Loader Nodes.

If this number is less than the number of Kafka topic partitions, some Kafka Consumers might receive uneven amounts of work. In this case, use a value for number_of_loaders * CORES that is an even divisor of the number of Kafka topic partitions to avoid a skew in the rates of processing across partitions.

PARTITIONS

Equal to the value of CORES.

Optional

INTEGER

Specifies the number of partitions over which to split the file list. Not applicable to Kafka loads.

The Ocient System automatically sets a default value based on the configured value for the CORES option. You can use this option for performance tuning.

The number of partitions determines how many buckets of work the Ocient System generates for each batch of files processed on a Loader Node. The pipeline processes this number of partitions in parallel using the specified number of cores.

If you specify fewer partitions than cores, some cores are not fully utilized, and resources are wasted. If you specify more partitions than cores, the Ocient System divides partitions in a round-robin fashion over the available cores.

BATCH_SIZE

A dynamic value, determined by the Ocient System for each pipeline to maximize performance.

Optional

INTEGER

Number of rows in the batch to load at one time.

The Ocient System automatically calculates a dynamic value depending on the table columns and the utilization of internal buffers to transfer records to the database backend. You can use this option to turn off the dynamic adjustments for performance tuning.

⚠️ Only change this setting in rare cases where loading performance is slower than expected, and you have a large record size. If this setting is improperly set, pipelines might fail with out-of-memory exceptions.

You can configure the default value (for the batch payload target) using a SQL statement such as:

ALTER SYSTEM ALTER CONFIG SET 'streamloader.extractorEngineParameters.configurationOption.osc.batch.payload.target' '65536'

RECORD_NUMBER_FORMAT

For file loads that do not use the EXPLODE_OUTER function, the default is [19, 45, 0]. For Kafka loads that do not use the EXPLODE_OUTER function, the default is [0, 64, 0]. For loads that use the EXPLODE_OUTER function, the default is the specified load-type-specific default value with 13 subtracted from the record index bits. The system adds these bits to the bits for rows within a record. For example, the default for file loads that use this function is [19, 32, 13].

Optional

ARRAY

The 64-bit record number for each record of the load. This number uniquely identifies a row within its partition.

The format is an array with three values: [<file_index_bits>, <record_index_bits>, <rows_per_record_index_bits>].

The file index bits <file_index_bits> value is the number of bits used to represent the file index within a partition.

The record index bits <record_index_bits> value is the number of bits used to represent the record index within a file.

The rows per record index bits <rows_per_record_index_bits> is the number of bits used to represent the row within a record. The system uses this value with the EXPLODE_OUTER function.

These three values must sum to 64.

Example

RECORD_NUMBER_FORMAT = [10, 54, 0]

Set the number of file index bits to 10 and the number of record index bits to 54, allowing up to 2^10 files and 2^54 records per file. The system does not support the EXPLODE_OUTER function in this configuration because the rows per record index bits are 0.

DROP PIPELINE

DROP PIPELINE removes an existing pipeline in the current database. You cannot remove a pipeline that is running.

You must have the DROP privilege on the pipeline to execute this SQL statement. For details, see Data Control Language (DCL) Statement Reference.

When you drop a pipeline, the Ocient System also removes the associated system catalog information, such as pipeline errors, events, files, partitions, and metrics.

Syntax

SQL


Parameter

Data Type

Description

pipeline_name

string

The name of the specified pipeline to remove.

You can drop multiple pipelines by specifying additional pipeline names and separating each with commas.

Examples

Remove an existing pipeline named ad_data_pipeline.

SQL
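As a hedged sketch of the statement described above:

DROP PIPELINE ad_data_pipeline;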


Remove an existing pipeline named ad_data_pipeline or return a warning if the Ocient System does not find the pipeline in the database.

SQL
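As a hedged sketch, assuming IF EXISTS provides the warning behavior described above:

DROP PIPELINE IF EXISTS ad_data_pipeline;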


PREVIEW PIPELINE

PREVIEW PIPELINE enables you to view the results of loading data for a specific CREATE PIPELINE SQL statement without creating a whole data pipeline and without storing those results in the target table. Using this SQL statement, you can iterate quickly and modify the syntax as needed to achieve your expected results. After you confirm your expected results, you can use the same syntax in the body of the CREATE PIPELINE statement with the appropriate source.

A table must exist in the database to serve as the target of your PREVIEW PIPELINE statement. This table ensures the pipeline matches the column types of the target table. However, the execution of this statement does not load data into the target table.

Preview Sources

The SOURCE INLINE source type is available only for the PREVIEW PIPELINE SQL statement. You cannot create a data pipeline with inline source data.

Other source types defined in the CREATE PIPELINE statement (S3, KAFKA, and FILESYSTEM) are compatible with the PREVIEW PIPELINE statement. The extract options vary by source type to mirror the CREATE PIPELINE statement. The database returns 10 records by default.

Preview Error Handling

Pipeline-level errors cause the PREVIEW PIPELINE SQL statement to fail. The Ocient System returns an error and no result set. However, the Ocient System accumulates record-level errors that occur during the execution of this statement in a single warning that the system returns along with the result set. Each line of the warning describes a record-level error in human-readable form or as a JSON blob, depending on the value of the SHOW_ERRORS_AS_JSON option. Rows or columns that encounter record-level errors have NULL values in the result set.

Preview Limitations

Limitations of this SQL statement are:

  • Before executing a PREVIEW PIPELINE SQL statement, you must create a table for the Ocient System to have context for the preview.
  • The COLUMN_DEFAULT_IF_NULL option from the CREATE PIPELINE SQL statement has no effect on the PREVIEW PIPELINE SQL statement.
  • The PREVIEW PIPELINE SQL statement does not honor the assignment of a service class based on text matching.
  • These source options are not supported:
    • START_TIMESTAMP
    • END_TIMESTAMP
    • START_FILENAME
    • END_FILENAME

Syntax

SQL


SQL Statement Options

Parameter

Data Type

Description

pipeline_name

identifier

The identifier for the name of the preview pipeline.

inline_string

string

The string that contains data for the preview of the data pipeline load.

For example, source data can be 'oci,ent,ocient|ware,house,warehouse', where | is the record delimiter and , is the field delimiter.

For special characters, such as \t, use an escape sequence such as e'oci,ent,oci\tent|ware,house,ware\thouse'.

intermediate_values

Boolean

Optional.

Indicates whether to capture intermediate values during a transformation sequence. Values are true or false (default is false). If the value is true, the Ocient System appends an extra column to the result set. Each value in the column contains a JSON blob that describes the intermediate values processed for each column after each transformation.

show_errors_as_json

Boolean

Optional.

Indicates whether to show errors. Values are true or false (default is false). If the value is true, the Ocient System returns record-level errors as JSON blobs rather than human-readable messages.

limit

INT

Optional.

The number of rows, specified as an integer, to return in the preview results for sources with many rows. The default value is 10 rows.

created_tablename

identifier

The identifier for the name of the table that you create before executing the PREVIEW PIPELINE SQL statement.

preview_column_formula

identifier

The identifier for the formula of the data to load.

For example, for the data in the first field of the inline source, use $1.

If you need to add a transformation, you can use functions to transform data, such as CONCAT($1, $2), to load the concatenation of the first two fields in the inline source data.

preview_column_name

identifier

The name of the column in the target table.

You must specify at least one column name in the SELECT part of the syntax. The name of the specified column must match the name of the column in the created table. The number of columns in the SELECT part can be less than those in the created table.

For definitions of other extract options, see the CREATE PIPELINE SQL statement options in CREATE PIPELINE.

Examples

Preview Pipeline Using CSV Format

Preview the load of two rows of data. First, create a table to serve as the context for the load. The previewload table contains three columns with these data types: string, integer, and Boolean.

SQL
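The original statement is not reproduced here. As a simplified, hedged sketch of the table definition (the column sizes and any storage options are assumptions):

CREATE TABLE previewload (col1 VARCHAR(255), col2 INT, col3 BOOLEAN);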


Create the preview pipeline testpipeline with this data: 'hello,2,true|bye,3,false'. Specify the CSV extract format, | record delimiter, and the , field delimiter. Load the data without transformation.

SQL
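As a hedged sketch of the statement described above (the exact PREVIEW PIPELINE grammar may differ):

PREVIEW PIPELINE testpipeline
    SOURCE INLINE 'hello,2,true|bye,3,false'
    EXTRACT FORMAT CSV
        RECORD_DELIMITER '|'
        FIELD_DELIMITER ','
    INTO previewload
    SELECT $1 AS col1, $2 AS col2, $3 AS col3;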


Output

col1    col2    col3
----------------------------
hello   2       true
bye     3       false
Fetched 2 rows

Delete the previewload table.

SQL


Preview Pipeline Using CSV Format with Escape Characters

Preview the load of two rows of data. First, create a table to serve as the context for the load. The previewload table contains three columns with these data types: string, integer, and Boolean.

SQL


Create the preview pipeline testpipeline with this data: 'hello\tworld,2,true|bye\tworld,3,false'. Specify the CSV extract format, | record delimiter, and , field delimiter. Load the data without transformation. In this case, the data contains the special character \t. You must escape the character by using the escape sequence e.

SQL


Output

col1          col2    col3
----------------------------------
hello world   2       true
bye world     3       false
Fetched 2 rows

Delete the previewload table.

SQL


Preview Pipeline Using CSV Format with Transformation

Create a table to serve as the context for the load. The previewload table contains three string columns.

SQL


Create the preview pipeline testpipeline with this data: 'hello,world|bye,world'. Specify the CSV extract format, | record delimiter, and the , field delimiter. Load the data with a transformation to concatenate the two strings and return the result in the third column.

SQL
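As a hedged sketch of the statement described above (the exact PREVIEW PIPELINE grammar may differ):

PREVIEW PIPELINE testpipeline
    SOURCE INLINE 'hello,world|bye,world'
    EXTRACT FORMAT CSV
        RECORD_DELIMITER '|'
        FIELD_DELIMITER ','
    INTO previewload
    SELECT $1 AS col1, $2 AS col2, CONCAT($1, $2) AS col3;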


Output

col1    col2    col3
----------------------------------
hello   world   helloworld
bye     world   byeworld
Fetched 2 rows

The third column contains the concatenated result of the first two columns.

Delete the previewload table.

SQL


Preview Pipeline Using the Kafka Source

Create the previewload table with these columns:

  • id — Non-NULL integer
  • salut — Non-NULL string
  • name — Non-NULL string
  • surname — Non-NULL string
  • zipcode — Non-NULL integer
  • age — Non-NULL integer
  • rank — Non-NULL integer
SQL


Create the preview pipeline test_small_kafka_simple_csv. Specify the ddl_csv topic. Indicate that the Kafka consumer should not write its durably-made record offsets to the Kafka Broker by using the WRITE_OFFSETS option set to false. Specify the bootstrap server as servername:0000 and configuration options as "auto.offset.reset": "earliest" by using the BOOTSTRAP_SERVERS and CONFIG options, respectively. Limit the returned results to three rows by using the LIMIT option. Specify the CSV extract format and \n record delimiter by using the FORMAT and RECORD_DELIMITER extract options, respectively.

SQL
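The original statement is not reproduced here. As a hedged sketch assembled from the description above (the placement of the LIMIT clause and the exact grammar may differ):

PREVIEW PIPELINE test_small_kafka_simple_csv
    SOURCE KAFKA
        TOPIC 'ddl_csv'
        WRITE_OFFSETS false
        BOOTSTRAP_SERVERS 'servername:0000'
        CONFIG '{"auto.offset.reset": "earliest"}'
    EXTRACT FORMAT CSV
        RECORD_DELIMITER '\n'
    LIMIT 3
    INTO previewload
    SELECT $1 AS id, $2 AS salut, $3 AS name, $4 AS surname, $5 AS zipcode, $6 AS age, $7 AS rank;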

Output

id    salut  name         surname      zipcode  age  rank
-----------------------------------------------------------
105   Mr     Jmhsuxofspx  Uaofgayjugb  85573    29   2
101   Mr     Ijmmtbddkyh  Yqbxqnkgidp  52393    43   1
109   Mr     Bigohpwfwmr  Qcxgakpkoeu  74420    1    3
Fetched 3 rows

START PIPELINE

START PIPELINE begins the execution of the specified data pipeline that extracts data and loads it into the target tables specified by the CREATE PIPELINE SQL statement.

When you execute the START PIPELINE SQL statement, the Ocient System creates a static list of files in the sys.pipeline_files system catalog table and marks them with the PENDING status. After the system assigns a file to an underlying task, the system marks the file as QUEUED. After the system verifies that the file exists, the system marks the file as LOADING to signify that a Loader Node has started reading the source data. Finally, upon successfully loading the file, the system transitions the status of the file to the terminal status LOADED.

A Kafka pipeline never enters the COMPLETED state in the information_schema.pipeline_status view. Instead, the pipeline remains running after you start the pipeline until you stop it or the pipeline reaches the specified error limit using the ERROR LIMIT option.

You must have the EXECUTE privilege on the pipeline and the INSERT privilege on any table that is a target in the pipeline. For details, see Data Control Language (DCL) Statement Reference.

Syntax

SQL


SQL Statement Options

Option Key

Default

Required or Optional

Data Type

Description

ERROR LIMIT <integer_value>

0

Optional

integer

Error log option that determines how many record-level errors the load tolerates during pipeline execution before the whole pipeline execution fails.

<integer_value> is a number greater than or equal to -1. When you set <integer_value> to -1, the load tolerates an unlimited number of record-level errors.

By default, continuous pipelines tolerate an unlimited number of record-level errors, whereas batch pipelines tolerate zero errors.

ERROR SCOPE <scope_value>

PIPELINE

Optional

string

Error log option that defines the scope at which the load applies the specified error limit.

<scope_value> supports the PIPELINE keyword, which is the scope for the error limit for the whole pipeline. When you set this option, if the pipeline reaches the error limit, the database rolls back all data loaded by the pipeline.

ERROR FILE_ERROR <error_action>

FAIL

Optional

string

For pipelines that load data from S3 or local file sources, this error configuration option determines how to treat unrecoverable file-level errors. Examples of unrecoverable file-level errors are:

  • The file is listed when the pipeline starts but is missing later during the load.
  • The Gzip file is corrupted and cannot be decompressed.
  • The file cannot be downloaded from the source.
  • An intolerable record-level error occurs when tokenizing or transforming data in the file.

<error_action> can be one of these keywords:

  • FAIL — Fail the whole pipeline because of a file-level error.
  • SKIP_MISSING_FILE — Only tolerate errors that occur due to missing files. If a file exists in the list when the pipeline starts but is missing later during the load, skip the file and continue with the next file.
  • TOLERATE — Tolerate all unrecoverable file-level errors. In this mode, the load also tolerates an unlimited number of record-level errors.

The FAILED, SKIPPED, and LOADED_WITH_ERRORS file statuses appear in the sys.pipeline_files system catalog table and indicate how the pipeline handled the file error.

USING LOADERS loader_names

none

Optional

list of strings

Specify one or more names of Loader Nodes as a comma-separated list for executing the START PIPELINE SQL statement. If you do not use this option, the Ocient System uses all of the Loader Nodes that are active to execute the pipeline.

You can find node names in the sys.nodes system catalog table.

ON COMPLETION <completion_mode>

NO_FLUSH

Optional

string

Completion type option that specifies the behavior when the pipeline finishes loading. This option determines when the remaining pages are converted into Segments.

  • NO_FLUSH — Do not force a flush of pages. Rely on watermarks and timeouts to trigger final conversion to Segments.
  • FLUSH_AND_WAIT — Trigger a flush of pages, initiating final conversion to Segments. The pipeline blocks and waits for the conversion to Segments to complete before marking the pipeline as COMPLETED.
  • FLUSH_AND_RETURN — Trigger a flush of pages, initiating the final conversion to Segments. The Ocient System marks the pipeline as COMPLETED immediately following the flush without waiting for conversion to Segments to complete.

For the query to execute successfully, the specified node names must identify nodes that have:

  • ACTIVE operational status
  • streamloader role

Examples

Start an existing pipeline named ad_data_pipeline with default settings.

SQL
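As a hedged sketch of the statement described above:

START PIPELINE ad_data_pipeline;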


Start an existing pipeline named ad_data_pipeline with error tolerance (tolerate 10 errors before aborting the pipeline). For details about error tolerance, see Error Tolerance in Data Pipelines.

SQL
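As a hedged sketch, using the ERROR LIMIT option documented above (the exact clause placement may differ):

START PIPELINE ad_data_pipeline ERROR LIMIT 10;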


Data pipelines log a message for each pipeline error to the sys.pipeline_errors system catalog table, even if you do not specify the ERROR option. Use BAD_DATA_TARGET settings to capture the original source data.

Start an existing pipeline named ad_data_pipeline using the Loader Node named stream-loader1.

SQL
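As a hedged sketch, using the USING LOADERS option documented above:

START PIPELINE ad_data_pipeline USING LOADERS 'stream-loader1';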


Resume a Pipeline with File Loading

In many cases, a file-based pipeline stops executing before completion. You cannot resume a pipeline in a COMPLETED status.

To resume a pipeline, use the START PIPELINE SQL statement. Before you resume a pipeline, the status of the pipeline must be CREATED, FAILED, or STOPPED. When a pipeline resumes, individual files remain in their most recent status as defined in the sys.pipeline_files system catalog table. For BATCH pipelines, the Ocient System does not add new files to the eligible file list when the pipeline resumes.

If you modify the contents of files during the loading process, the Ocient System might experience issues with deduplication that cause duplicated rows or missing data. Avoid modifying files after you start a pipeline for the first time.

Creating new files on your data source does not impact deduplication logic.

The START operation groups files using their extractor_task_id and stream_source_id identifiers. The stream_source_id uniquely identifies partitions (i.e., an ordered list of files), and extractor_task_id identifies the batch that loads a group of partitions.

File Statuses

The Ocient System considers files with the statuses LOADED, LOADED_WITH_ERRORS, or SKIPPED to be in a terminal status, whereas other file statuses are still in process.

  • Completed Batches — If all the files in a particular batch have a terminal status, the pipeline does not attempt to reload the batch. These files have been completely processed, so the Ocient System ignores modifications to these files.
  • In-Process Batches — If at least one file in a particular batch does not have a terminal status, the pipeline reloads the entire batch. The pipeline reprocesses the in-process batches and relies on row deduplication to prevent duplication of rows in the target tables.
    • Modifications to files in an in-process batch can, but are not guaranteed to, be picked up by a restart.
    • Modifications to any files in this batch with the LOADED, LOADED_WITH_ERRORS, or SKIPPED statuses might cause issues with deduplication, leading to duplicate or missed data.
  • Pending Files — PENDING files are files that the Ocient System has not yet assigned to a partition. The pipeline attempts to load these files after reloading any in-process batches.

Load Duplicate Data from Files

Sometimes you might want to load the same data multiple times. If you want to load a second copy of the source data, you can either:

  • Drop and recreate the pipeline to reset the sys.pipeline_files system catalog table.
  • Create a second pipeline with a new name and the same configuration.

When you truncate the target tables and restart the pipeline, the Ocient System does not reload the data.

Restart with Kafka Loading

Ocient relies on the offset management and consumer group behavior in Kafka to deliver exactly-once loading semantics and to control the Ocient pipeline behavior.

Kafka Offsets and Consumer Group Identifiers

If you set the WRITE_OFFSETS option to true (default value is true), the Kafka consumers commit offsets back to Kafka after data is considered durable in the database. The Kafka Broker stores these offsets as the last committed offset for the group identifier group.id.

For each pipeline, the group identifier defaults to <database-name>-<pipeline-name>, where the <database-name> is the name of your database and <pipeline-name> is the name of the data pipeline. In most use cases, you should not manually change the group.id field for a pipeline.

Any Kafka pipeline that has the same group.id starts consuming from the last committed offset for that group. If no offset has been committed, the pipeline uses the Kafka auto.offset.reset policy to determine where to start. For details, see Kafka offset management.

If you want to start loading from the beginning of a topic, configure an unused group.id field (or use a group.id field that did not commit any of its offsets back) and ensure the auto.offset.reset Kafka configuration is appropriately set in the CONFIG option.

Kafka Pipeline Deduplication

The committed offset of a Kafka partition lags slightly behind the rows that have been loaded into Ocient. These lags do not cause an issue with data duplication. If you stop a pipeline before it can commit its most recent durable offset to Kafka, restarting the same pipeline starts loading from the last committed offset. However, the database deduplicates records sent twice for the same pipeline.

Ocient deduplicates Kafka data for the specified combination of pipeline identifier, Kafka topic, and the Kafka partition number.

While the consumer group offsets manage where the pipeline resumes loading, the Ocient System enforces the exactly-once loading of a Kafka partition only if you stop or restart a pipeline with the same pipeline identifier.

If you drop a pipeline and create a new one with the same name, the Ocient System creates a new pipeline identifier, so the new pipeline does not deduplicate against data loaded by the original pipeline. To preserve deduplication, instead of dropping the pipeline, use the CREATE OR REPLACE PIPELINE SQL statement with the original pipeline name; the replacement pipeline then correctly deduplicates against the original data.

Do not run multiple pipelines concurrently with the same consumer group identifier. This action leads to unpredictable data duplication. If you want to increase the number of consumers that read from a Kafka topic, increase the value of the CORES parameter.

Load Duplicate Data on Kafka

Sometimes you might want to load the same data multiple times. If you want to load a second copy of the source data from Kafka, you can either:

  • Drop the pipeline, recreate it with the same name, and reset the consumer group offsets manually.
  • Create a new pipeline with a different name and load from the beginning of the topic.

Pipeline Database Dependency

Each pipeline belongs to a database. You cannot drop a database that has a running pipeline. To drop a database, ensure that all pipelines in the database are in a non-running status.

Pipeline Table Dependency

Each pipeline has a target table. You cannot drop a table that has a running pipeline. To drop a table, ensure that all pipelines that are loading data into the table are in a non-running status.

STOP PIPELINE

STOP PIPELINE stops the execution of the pipeline and its associated tasks. After you stop a pipeline, you can execute the START PIPELINE SQL statement on the pipeline to run it again. When the pipeline runs again, the load deduplicates any records previously loaded by the same pipeline.

You must have the EXECUTE privilege on the pipeline to execute this SQL statement. For details, see Data Control Language (DCL) Statement Reference.

Syntax

SQL


Example

Stop an existing pipeline named ad_data_pipeline.

SQL
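As a hedged sketch of the statement described above:

STOP PIPELINE ad_data_pipeline;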


You can see the status of the parent tasks in the sys.tasks system catalog table and see the status of the child tasks in the sys.subtasks system catalog table.

ALTER PIPELINE RENAME

The ALTER PIPELINE RENAME TO SQL statement changes the name of the pipeline object while retaining its identifier, options, and other metadata. The Ocient System reflects this change in the sys.pipelines system catalog table. After the rename, you must use the new name when you refer to the pipeline in SQL statements.

You must have the ALTER privilege on the pipeline to execute this SQL statement. For details, see Data Control Language (DCL) Statement Reference.

Syntax

SQL


Example

Rename an existing pipeline named ad_data_pipeline to renamed_pipeline.

SQL
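As a hedged sketch, assuming the clause order matches the statement name:

ALTER PIPELINE ad_data_pipeline RENAME TO renamed_pipeline;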


EXPORT PIPELINE

EXPORT PIPELINE returns the CREATE PIPELINE SQL statement used to create the pipeline object. You can use the output of this statement to recreate an identical pipeline when you remove the original pipeline.

The execution of this statement censors sensitive S3 values like ACCESS_KEY_ID and SECRET_ACCESS_KEY and Kafka Consumer Configuration password-type fields. The database replaces them with *****.

To execute this statement, you must have the VIEW privilege on the pipeline and any table the pipeline targets. For details, see Data Control Language (DCL) Statement Reference.

Syntax

SQL


Example

Export an existing pipeline in the database ad_data_pipeline.

SQL
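As a hedged sketch of the statement described above:

EXPORT PIPELINE ad_data_pipeline;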


CREATE PIPELINE FUNCTION

CREATE PIPELINE FUNCTION enables you to define a function for loading data. Define the function behavior using the Apache Groovy programming language. For details about this language, see The Apache Groovy Programming Language.

Function arguments and output are strongly typed and immutable.

You can test the execution of your function using the PREVIEW PIPELINE SQL statement.

The Ocient System does not support the overload of function names.

Syntax

SQL


Parameter

Type

Description

function_name

string

A unique identifier for the data pipeline function.

input_argument

string

The name of one or more input arguments of the function. Specify data types for input arguments according to the supported data types defined in Data Types for Data Pipelines.

For the data type declaration, use NOT NULL where applicable for maximum performance.

output_argument_definition

string

The type definition of the output from the function.

library_name

string

The name of one or more libraries.

You can include libraries by using the IMPORTS clause or by specifying the fully qualified class path (e.g., java.lang.Integer) in the source definition.

groovy_declaration

string

The Groovy definition of the function.

Install and Enable Third-Party Libraries

You can use the default list of supported third-party libraries or additional third-party libraries that you install.

Supported Libraries

Data pipeline functions can import classes from the default list of supported third-party libraries. This table provides the resources for each supported library package.

Library Package

Resource

java.lang.*

java.lang

java.util.*

java.util

java.nio.ByteBuffer.*

groovy.json.*

groovy.xml.*

groovy.yaml.*

org.apache.groovy.datetime.extensions.*

org.apache.groovy.dateutil

com.ocient.streaming.data.types.*

Additional Libraries

You can install and enable additional third-party libraries to import for use in your data pipeline functions.

You must install the JAR package on all Loader Nodes in the /opt/ocient/current/lib/extractorengine_udt folder.

Then, add the fully qualified class name in the function import list as part of the library_name parameter. For example, to reference the ByteBuffer class from the com.fastbuffer package, specify com.fastbuffer.ByteBuffer in the library_name parameter and use the class in the Groovy definition as var x = new com.fastbuffer.ByteBuffer().

Groovy Data Type Mapping

For the Groovy definition, the Ocient System maps its SQL data type to the corresponding Groovy data type. Your Groovy code should use the Groovy data type defined in this table for any input arguments and output.

SQL Data Type | Groovy Data Type
---|---
BIGINT | java.lang.Long
BINARY(N) or HASH(N) | byte[]
BOOLEAN | java.lang.Boolean
CHAR(N) or VARCHAR(N) | java.lang.String
DATE | java.time.LocalDate
DECIMAL(P,S) | com.ocient.streaming.data.types.Decimal
DOUBLE | java.lang.Double
INT | java.lang.Integer
IPV4 | java.net.Inet4Address
IP | java.net.Inet6Address
ST_POINT | com.ocient.streaming.data.types.gis.STPoint
ST_LINESTRING | com.ocient.streaming.data.types.gis.STLinestring
ST_POLYGON | com.ocient.streaming.data.types.gis.STPolygon
FLOAT | java.lang.Float
SMALLINT | java.lang.Short
TIME | com.ocient.streaming.data.types.Time
TIMESTAMP | com.ocient.streaming.data.types.Timestamp
BYTE | java.lang.Byte
TUPLE<<type1, type2, …>> | com.ocient.streaming.data.types.OcientTuple
TYPE[] | java.util.List<TYPE>
UUID | java.util.UUID
VARBINARY(N) | byte[]
VARCHAR(N) | java.lang.String



Example

Create the sort_function data pipeline function to sort an array of integers. The function has two input arguments: value, a non-NULL array of integers, and ascending, the sort order. The function returns a non-NULL array of integers.

The function imports these Java libraries:

  • java.lang.Integer
  • java.util.ArrayList
  • java.util.Collections
  • java.util.Comparator
  • java.util.List

Define the Groovy code. Because the input arguments do not change, the example Groovy code first copies the value argument, sorts the copied list according to the sort order, and returns the sorted array.

SQL
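The original statement is not reproduced here. As a heavily hedged sketch: the wrapper syntax (argument list, return type declaration, IMPORTS clause, and the quoting of the Groovy body) is an assumption, while the Groovy body follows the description above.

CREATE PIPELINE FUNCTION sort_function(value INT[] NOT NULL, ascending BOOLEAN)
RETURNS INT[] NOT NULL
IMPORTS ('java.lang.Integer', 'java.util.ArrayList', 'java.util.Collections', 'java.util.Comparator', 'java.util.List')
AS '
    // Copy the immutable input argument before sorting.
    List<Integer> copy = new ArrayList<Integer>(value);
    Collections.sort(copy);
    if (!ascending) {
        Collections.reverse(copy);
    }
    return copy;
';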


View the creation information about the sort_function function using the sys.pipeline_functions system catalog table. This statement returns the function name, return type, argument names, data types of the arguments, and the list of imported libraries.

SQL
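As a hedged sketch of the catalog query (the individual columns of sys.pipeline_functions are described above but not listed here):

SELECT * FROM sys.pipeline_functions;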


DROP PIPELINE FUNCTION

DROP PIPELINE FUNCTION removes an existing pipeline function.

You must have the DROP privilege on the pipeline function to execute this SQL statement. For details, see Data Control Language (DCL) Statement Reference.

Syntax

SQL


Parameter

Data Type

Description

function_name

string

The name of the specified data pipeline function to remove.

You can drop multiple pipeline functions by specifying additional function names and separating each with commas.

Examples

Remove the Existing Pipeline Function

Remove an existing pipeline function named sort_function.

SQL
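As a hedged sketch of the statement described above:

DROP PIPELINE FUNCTION sort_function;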


Remove an Existing Pipeline Function by Checking for Existence

Remove an existing pipeline function named sort_function or return a warning if the Ocient System does not find the function in the database.

SQL
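As a hedged sketch, assuming IF EXISTS provides the warning behavior described above:

DROP PIPELINE FUNCTION IF EXISTS sort_function;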


Related Links

Load Data