SQL Reference
Data Definition Language (DDL)...
Data Pipelines
Data pipelines enable you to load data from your chosen source into the {{ocienthyperscaledatawarehouse}}. You can preview a data pipeline using the PREVIEW PIPELINE SQL statement to test the creation statement and any required data transformation. Then, you can create a data pipeline using the CREATE PIPELINE statement. To manage the load, use the START PIPELINE statement to start the load and the STOP PIPELINE statement to stop it. You can rename a data pipeline using the ALTER PIPELINE RENAME statement. To see the full definition of a created pipeline, use the EXPORT PIPELINE statement. When you finish with the load, you can use the DROP PIPELINE statement to remove the data pipeline. You can create user-defined data pipeline functions using the CREATE PIPELINE FUNCTION statement and remove them using the DROP PIPELINE FUNCTION statement. You can also administer privileges for data pipelines and data pipeline functions. For details, see docid\ f55ngxtki0f7kkmyatvug.

## CREATE PIPELINE

CREATE PIPELINE defines a data pipeline that you can execute with the START PIPELINE SQL statement. Specify the type of load, data source, and data format. You must have the ALTER privilege on the pipeline. For details, see docid\ f55ngxtki0f7kkmyatvug.

### Syntax

```sql
CREATE [ OR REPLACE ] [ BATCH | CONTINUOUS | TRANSACTIONAL ] PIPELINE [ IF NOT EXISTS ] pipeline_name
    [ <advanced_pipeline_options> ]
    [ BAD_DATA_TARGET <kafka_bad_data_target> ]
    SOURCE ( <s3_source> | <filesystem_source> | <hdfs_source> | <kafka_source> )
    [ MONITOR ( <sqs_monitor> | <kafka_monitor> ) ]
    [ LOOKUP lookup_source <lookup_options> ] [ ... ]
    EXTRACT
        ( <delimited_extract_options> | <asn1_extract_options> | <avro_extract_options>
        | <json_extract_options> | <parquet_extract_options> | <binary_extract_options>
        | <xml_extract_options> )
        [ <general_extract_options> ]
    [ INSERT ] INTO destination_table_name
        SELECT expression AS col_alias_target [ , expression2 AS col_alias_target2, ... ]
        [ WHERE filter_expression ]
    [ [ INSERT ] INTO destination_table_name_n
        SELECT expression_n AS col_alias_target_n [ , expression2_n AS col_alias_target2_n, ... ]
        [ WHERE filter_expression_n ] ] [ ... ]

-- advanced options
<advanced_pipeline_options> =
    [ CORES processing_cores ]
    [ PARTITIONS file_partitions ]
    [ BATCH_SIZE number_of_rows ]
    [ RECORD_NUMBER_FORMAT record_number_format ]

-- bad data target options
<kafka_bad_data_target> =
    KAFKA
        BOOTSTRAP_SERVERS bootstrap_servers
        TOPIC topic_name
        [ CONFIG config_option ]

-- source options
<s3_source> =
    S3 <file_based_source_options>
        BUCKET bucket_name
        ( FILTER | FILTER_GLOB | FILTER_REGEX | OBJECT_KEY ) specifiers
        [ REGION region ]
        [ ENDPOINT endpoint ]
        [ ENABLE_PATH_STYLE_ACCESS enable_path_style_access ]
        [ ACCESS_KEY_ID access_key_credentials ]
        [ SECRET_ACCESS_KEY secret_key_credentials ]
        [ SESSION_TOKEN session_token ]
        [ ROLE_ARN role_arn ]
        [ ASSUME_ROLE_CONFIG assume_role_config ]
        [ MAX_CONCURRENCY parallel_connections ]
        [ READ_TIMEOUT num_seconds ]
        [ REQUEST_DEPTH num_requests ]
        [ REQUEST_RETRIES num_retries ]
        [ HEADERS headers ]

<filesystem_source> =
    FILESYSTEM <file_based_source_options>
        ( FILTER | FILTER_GLOB | FILTER_REGEX ) specifiers

<hdfs_source> =
    HDFS <file_based_source_options>
        ( FILTER | FILTER_GLOB | FILTER_REGEX ) specifiers
        ENDPOINT endpoint
        [ CONFIG hdfs_config ]

<kafka_source> =
    KAFKA
        BOOTSTRAP_SERVERS bootstrap_servers
        TOPIC topic_name
        [ WRITE_OFFSETS write_offsets ]
        [ CONFIG config_option ]
        [ AUTO_OFFSET_RESET ( ( 'largest' | 'latest' ) | ( 'earliest' | 'smallest' ) | 'error' ) ]

<file_based_source_options> =
    [ PREFIX prefix ]
    [ COMPRESSION_METHOD 'gzip' ]
    [ SORT_BY ( 'filename' | 'created' | 'modified' ) [ SORT_DIRECTION ( ASC | DESC ) ] ]
    [ SORT_REWRITE sort_rewrite ]
    [ START_FILENAME start_filename ]
    [ END_FILENAME end_filename ]
    [ START_CREATED_TIMESTAMP start_created_timestamp ]
    [ END_CREATED_TIMESTAMP end_created_timestamp ]
    [ START_MODIFIED_TIMESTAMP start_modified_timestamp ]
    [ END_MODIFIED_TIMESTAMP end_modified_timestamp ]

-- monitor options
<sqs_monitor> =
    SQS <general_monitor_options>
        QUEUE_URL queue_url
        [ REGION region ]
        [ ENDPOINT endpoint ]
        [ ACCESS_KEY_ID access_key_id ]
        [ SECRET_ACCESS_KEY secret_access_key ]

<kafka_monitor> =
    KAFKA <general_monitor_options>
        BOOTSTRAP_SERVERS bootstrap_servers
        TOPIC topic
        AUTO_OFFSET_RESET ( ( 'largest' | 'latest' ) | ( 'earliest' | 'smallest' ) | 'error' )
        [ GROUP_ID group_id ]
        [ MESSAGE { FILENAME file_selector TIMESTAMP timestamp_selector SIZE size_selector } ]
        [ CONSUMER { MAX_MESSAGES max_messages TIMEOUT consumer_timeout } ]

<general_monitor_options> =
    [ POLLING_INTERVAL polling_interval ]
    [ BATCH { MAX_FILES max_files TIMEOUT batch_timeout LOOKBACK lookback } ]

-- lookup options
<lookup_options> =
    CONNECTION_TYPE connection_type
    CONNECTION_STRING connection_string
    LOOKUP_SCHEMA lookup_schema
    LOOKUP_TABLE lookup_table
    [ REFRESH_PERIOD refresh_period ]
    [ CONFIG config_json ]

-- extract options
<general_extract_options> =
    [ CHARSET_NAME charset_name ]
    [ COLUMN_DEFAULT_IF_NULL column_default_if_null ]
    [ NULL_STRINGS null_strings ]
    [ TRIM_WHITESPACE trim_whitespace ]
    [ VALIDATE_CHARACTERS validate_characters ]
    [ REPLACE_INVALID_CHARACTERS replace_invalid_characters ]
    [ REPLACEMENT_CHARACTER replacement_character ]

<delimited_extract_options> =
    FORMAT ( 'delimited' | 'csv' )
    [ COMMENT_CHAR comment_char ]
    [ EMPTY_FIELD_AS_NULL empty_field_as_null ]
    [ ESCAPE_CHAR escape_char ]
    [ FIELD_DELIMITER field_delimiter ]
    [ FIELD_OPTIONALLY_ENCLOSED_BY enclosure_char ]
    [ HEADERS delimited_headers ]
    [ NUM_HEADER_LINES num_header_lines ]
    [ NUM_FOOTER_LINES num_footer_lines ]
    [ RECORD_DELIMITER record_delimiter ]
    [ SKIP_EMPTY_LINES skip_empty_lines ]
    [ OPEN_ARRAY open_array_char ]
    [ CLOSE_ARRAY close_array_char ]
    [ ARRAY_ELEMENT_DELIMITER array_delimiter ]
    [ OPEN_OBJECT open_object_char ]
    [ CLOSE_OBJECT close_object_char ]

<asn1_extract_options> =
    FORMAT 'asn.1'
    SCHEMA { URL url ASN1_RECORD_TYPE record_type }

<avro_extract_options> =
    FORMAT 'avro'
    [ SCHEMA { [ INLINE inline_string ] [ INFER_FROM ( 'sample_file' | 'all_files' ) ] } ]

<binary_extract_options> =
    FORMAT 'binary'
    RECORD_LENGTH length_in_bytes
    [ ENDIANNESS ( 'big' | 'little' ) ]
    [ AUTO_TRIM_PADDING auto_trim_padding ]
    [ PADDING_CHARACTER padding_character ]

<json_extract_options> =
    FORMAT 'json'

<parquet_extract_options> =
    FORMAT 'parquet'
    SCHEMA { INFER_FROM infer_from }

<xml_extract_options> =
    FORMAT 'xml'
```

### Pipeline Identity and Naming

The name of a data pipeline is unique in an {{ocient}} system. Reference the pipeline in other SQL statements, such as START PIPELINE, STOP PIPELINE, and DROP PIPELINE, using the name of the pipeline. The SQL statement throws an error if a pipeline with the same name already exists, unless you specify the IF NOT EXISTS option. You can rename a pipeline with the ALTER PIPELINE SQL statement.

### Update a Pipeline

You can update a pipeline with the OR REPLACE clause in the CREATE PIPELINE SQL statement. Use this clause when you want to continue loading from the current place in a continuous load, but you need to modify transformations or other settings. If you specify the OR REPLACE clause and the pipeline already exists, the database replaces the original pipeline object with the options specified in the new CREATE OR REPLACE PIPELINE statement. When you replace an existing pipeline, the pipeline retains its current position in the source data so that data is not duplicated when the pipeline is resumed with a START PIPELINE SQL statement. You must stop a pipeline before executing the CREATE OR REPLACE SQL statement.

### Data Pipeline Modes

You can define pipelines in BATCH, CONTINUOUS, or TRANSACTIONAL mode. File sources (e.g., S3, FILESYSTEM) support the BATCH and CONTINUOUS modes. File-based loads default to BATCH mode if you do not specify this keyword. {{kafka}} only supports the CONTINUOUS mode. Loads with a KAFKA source default to CONTINUOUS mode if you do not specify this keyword.
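For illustration, here is a minimal sketch of where the mode keyword appears in each case. The pipeline, bucket, broker, and table names are hypothetical, and the statements are trimmed to the clauses required by the syntax above.

```sql
-- Batch load (the default for file-based sources)
CREATE BATCH PIPELINE daily_orders_pipeline
SOURCE S3
    BUCKET 'example-bucket'
    FILTER '*.csv'
EXTRACT FORMAT CSV
INTO public.orders SELECT $1 AS id;

-- Continuous load (the default for Kafka sources)
CREATE CONTINUOUS PIPELINE live_orders_pipeline
SOURCE KAFKA
    BOOTSTRAP_SERVERS 'broker:9092'
    TOPIC 'orders'
EXTRACT FORMAT JSON
INTO public.orders SELECT $id AS id;
```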
When you execute the START PIPELINE SQL statement using a data pipeline in the BATCH mode, the system creates a static list of files with the PENDING status in the sys.pipeline_files system catalog table. With the CONTINUOUS mode, the monitor appends new incoming files to the list of files in the sys.pipeline_files system catalog table.

With the CONTINUOUS mode, you can apply filters for the data pipeline. The system only uses files in the consumed messages if the filenames match the filters. For this mode, these options are invalid:

- PREFIX
- SORT_BY
- SORT_DIRECTION
- SORT_REWRITE
- START_CREATED_TIMESTAMP
- END_CREATED_TIMESTAMP
- START_MODIFIED_TIMESTAMP
- END_MODIFIED_TIMESTAMP

The TRANSACTIONAL mode enables the data pipeline to roll back the interim results of the load if an error occurs during load execution. The system makes all data visible only after the load completes successfully. This mode supports all options that the BATCH mode does.

### Required Options

You must define certain options in every pipeline. For these options, there is no default value. Optional options need to be present only if you use their specific functionality. A pipeline must contain the SOURCE, EXTRACT FORMAT, and INTO table_name SELECT statements. For details, see docid\ pbyszqvu5wonpgoso qto, docid\ y731i6lhout2b 7mc1jr8, and docid\ ti3mdibvgmuudmlqu9xpl.

Some options depend on each other: the IF NOT EXISTS and OR REPLACE clauses are mutually exclusive.

### SELECT Statement and Data Transformation

Use the INTO table_name SELECT SQL statement in the CREATE PIPELINE SQL statement to specify how to transform the data. The SELECT statement includes a set of one or more expressions and a target column name in the form expression AS column_name. The expression part of the statement contains a source field reference (e.g., $1 or $my_field.subfield) and, optionally, the transformation function you want to apply. If your data is nested in arrays, you can use special transformation functions, such as the docid\ od i nse3vcplc3nixzgc function, to expand the data into individual rows. For details about data transformation and supported transformation functions, see docid\ ti3mdibvgmuudmlqu9xpl. For details about data types and casting, see docid\ jfqu osagg5enkvmeesnl and docid\ y731i6lhout2b 7mc1jr8. You can also specify metadata values, such as the filename, to load in the SELECT SQL statement. For details, see docid\ vqvrmdyk8josxmkfsyprc.

This is an example of a transformation statement snippet:

```sql
FORMAT JSON
INTO public.orders SELECT
    timestamp(bigint($created_timestamp)) AS ordertime,
    metadata('filename') AS source_filename,
    $order_number AS ordernumber,
    $customer_first_name AS fname,
    left($customer_middle_initial, 1) AS minitial,
    $customer_last_name AS lname,
    $postal_code AS postal_code,
    $promo_code AS promo_code,
    $order_total AS ordertotal,
    decimal($tax, 8, 2) AS tax,
    char[]($line_items[].product_name) AS product_names,
    char[]($line_items[].sku) AS skus
```

Optionally, you can filter the load by using the WHERE clause in the form WHERE filter_expression. This expression should evaluate to a Boolean value or NULL and can include more than one filter expression. The system loads rows that contain data matching the filter criteria when the expression evaluates to true. The system does not load the rows when the expression evaluates to false or NULL. You can include any transform in the WHERE clause as in the SELECT clause.

This is an example of a filter snippet that loads order data for customers whose last name starts with the letter a. Use the COALESCE function to return only non-NULL values.

```sql
FORMAT JSON
INTO public.orders SELECT
    timestamp(bigint($created_timestamp)) AS ordertime,
    metadata('filename') AS source_filename,
    $order_number AS ordernumber,
    $customer_first_name AS fname,
    left($customer_middle_initial, 1) AS minitial,
    $customer_last_name AS lname,
    $postal_code AS postal_code,
    $promo_code AS promo_code,
    $order_total AS ordertotal,
    decimal($tax, 8, 2) AS tax,
    char[]($line_items[].product_name) AS product_names,
    char[]($line_items[].sku) AS skus
WHERE startswith(coalesce($customer_last_name, ''), 'a')
```
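As an additional illustrative sketch, a WHERE clause can combine several conditions. The field names follow the example above, and this sketch assumes that Boolean operators and numeric comparisons are available in filter expressions, which this document does not state explicitly.

```sql
FORMAT JSON
INTO public.orders SELECT
    $order_number AS ordernumber,
    decimal($tax, 8, 2) AS tax
-- Load only orders with a positive quantity and a promo code starting with 'promo'
WHERE bigint($quantity) > 0
    AND startswith(coalesce($promo_code, ''), 'promo')
```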
### NULL and Default Value Handling

The Ocient system has specific ways to handle NULL and default values in the data pipeline. Consider this information when you prepare data to load and write the CREATE TABLE and CREATE PIPELINE SQL statements.

- To load NULL values, insert the NULL value in the data. In this case, if you specify NOT NULL for the target column in the CREATE TABLE statement, the data pipeline fails to load.
- If you omit a column in the SELECT SQL statement, the data pipeline loads the default value for the column. If you do not specify the default value, the pipeline loads a NULL value. If you specify NOT NULL for the target column, the data pipeline also fails to load.
- You can use the DEFAULT keyword to load a default value. In this case, if the column does not have a default value, the pipeline fails to load.
- If the pipeline loads a NULL into a column with a specified default value, you can use coalesce(\<value>, DEFAULT) to insert the default value instead of the NULL value, where \<value> is the NULL column. You can modify the load of one column at a time in this way.

This table describes the data pipeline behavior for NULL or omitted column values.

| Value in the Data Pipeline | Nullable Target Column | Default Value in Target Column | Resulting Data Pipeline Behavior |
| --- | --- | --- | --- |
| NULL | No | Value might or might not be set | The pipeline uses the default value if the default exists and you specify the COLUMN_DEFAULT_IF_NULL option. Otherwise, the pipeline fails. |
| NULL | Yes | Value might or might not be set | The pipeline uses the default value if the default exists and you specify the COLUMN_DEFAULT_IF_NULL option. Otherwise, the pipeline uses the NULL value. |
| Omitted column in the SELECT SQL statement | Value might or might not be set | Yes | The pipeline uses the default value. |
| Omitted column in the SELECT SQL statement | No | No | The pipeline fails. |
| Omitted column in the SELECT SQL statement | Yes | No | The pipeline uses the NULL value. |

### Required Privileges

You must have the CREATE PIPELINE privilege on the underlying database and the VIEW privilege on each table in the pipeline definition to execute the CREATE PIPELINE SQL statement. The table must already exist. See the START PIPELINE SQL statement for the required privileges to execute a pipeline. For details, see docid\ f55ngxtki0f7kkmyatvug.

### Examples

#### Load JSON Data from Kafka

This example loads JSON data from Kafka using the CREATE PIPELINE SQL statement.

- Use the bootstrap server 192.168.0.1:9092 with the Kafka topic orders.
- Load data into the public.orders table.
- Specify the data to load as these JSON selectors: identifier $id, user identifier $user_id, product identifier $product_id, subtotal amount $subtotal, tax $tax, total amount $total, discount amount $discount, created time $created_at, and quantity $quantity.

```sql
CREATE PIPELINE orders_pipeline
SOURCE KAFKA
    BOOTSTRAP_SERVERS '192.168.0.1:9092'
    TOPIC 'orders'
EXTRACT
    FORMAT JSON
INTO public.orders SELECT
    $id AS id,
    $user_id AS user_id,
    $product_id AS product_id,
    $subtotal AS subtotal,
    $tax AS tax,
    $total AS total,
    $discount AS discount,
    $created_at AS created_at,
    $quantity AS quantity;
```

For a complete tutorial, see docid\ jloppc7jopkao4enkj6 d.

#### Load Delimited Data from S3

This example loads delimited data in CSV format from {{aws}} S3.

- Use the https://s3.us-east-1.amazonaws.com endpoint with the ocient-examples bucket and path metabase_samples/csv/orders.csv.
- Denote path-style access using the ENABLE_PATH_STYLE_ACCESS option set to true.
- Specify one header line with the NUM_HEADER_LINES option.
- Load data into the public.orders table.
- Specify the data to load using the column numbers: identifier, user identifier, product identifier, subtotal amount, tax, total amount, discount amount, created time, and quantity.

```sql
CREATE PIPELINE orders_pipeline
SOURCE S3
    ENDPOINT 'https://s3.us-east-1.amazonaws.com'
    BUCKET 'ocient-examples'
    FILTER 'metabase_samples/csv/orders.csv'
    ENABLE_PATH_STYLE_ACCESS true
EXTRACT
    FORMAT CSV
    NUM_HEADER_LINES 1
INTO public.orders SELECT
    $1 AS id,
    $2 AS user_id,
    $3 AS product_id,
    $4 AS subtotal,
    $5 AS tax,
    $6 AS total,
    $7 AS discount,
    $8 AS created_at,
    $9 AS quantity;
```

For a complete tutorial, see docid\ stial7oztpmpndfwcsm29.
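After a pipeline like the one above is created, the load is driven by the lifecycle statements named at the start of this document. A hedged sketch of that flow follows; the pipeline name comes from the example above, and the exact single-argument statement forms are assumptions based on the statement names this document lists.

```sql
START PIPELINE orders_pipeline;   -- begin executing the load
STOP PIPELINE orders_pipeline;    -- stop the load
EXPORT PIPELINE orders_pipeline;  -- view the full definition of the pipeline
DROP PIPELINE orders_pipeline;    -- remove the pipeline when you finish with the load
```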
#### Continuous File Load of Delimited Data in CSV Files

Create a data pipeline that uses continuous file loading with delimited data in CSV files.

- Specify 32 partitions and 16 cores with the PARTITIONS and CORES options, respectively.
- The source is S3 with the http://endpoint.ocient.com endpoint and the cs-data bucket.
- Specify the filter '*.csv' to find all files with a filename that matches a glob pattern without subdirectories, for example, data.csv. The system filters out filenames with subdirectories, such as data/data_sample.csv.
- For continuous file loading, specify the MONITOR option to use the Kafka monitor. Specify the test-broker:9092 bootstrap server and the cfl_kafka_ten_adtech_flat_small Kafka topic, and reset the offset to the smallest offset using the 'earliest' value of the AUTO_OFFSET_RESET option. Set the client group identifier to 84079bf1-bdc4-4b10-ba12-41ba6b17dffe.
- The format is CSV with the record delimiter set to the newline character \n.
- Load data into the public.ad_sessions table. The SELECT statement identifies the columns to load by number. There are 39 fields in the CSV data. Transform each column using cast functions. For details on each function, see docid\ iteimff9ycez0ctcfgqb. See docid\ hem zd qwquv8dfnvnqz8 for the to_timestamp function.

```sql
CREATE CONTINUOUS PIPELINE test_continuous_pipeline
PARTITIONS 32
CORES 16
SOURCE S3
    ENDPOINT 'http://endpoint.ocient.com/'
    BUCKET 'cs-data'
    FILTER '*.csv'
MONITOR KAFKA
    BOOTSTRAP_SERVERS 'test-broker:9092'
    TOPIC 'cfl_kafka_ten_adtech_flat_small'
    AUTO_OFFSET_RESET 'earliest'
    GROUP_ID '84079bf1-bdc4-4b10-ba12-41ba6b17dffe'
EXTRACT
    FORMAT CSV
    RECORD_DELIMITER '\n'
INTO public.ad_sessions SELECT
    to_timestamp(char($1), 'yyyy-MM-dd HH:mm:ss.SSSSSS', 'java') AS event_date_time,
    char($2) AS device_model,
    tinyint($8) AS device_user_age,
    boolean($10) AS device_ad_tracking_disabled,
    binary($11) AS device_mac,
    int($14) AS ip_zip,
    float($19) AS ip_zip_latitude,
    double($20) AS ip_zip_longitude,
    bigint($21) AS session_id,
    smallint($32) AS session_response_latency,
    decimal($34, 10, 1) AS session_transaction_revenue,
    char($39) AS session_app_name;
```

#### Transactional Load of Delimited Data in CSV Files

Create a data pipeline that uses transactional file loading with delimited data in CSV files.

- The source is S3 with the http://endpoint.ocient.com endpoint and the cs-data bucket.
- Specify the filter '*.csv' to find all files with a filename that matches a glob pattern without subdirectories, for example, data.csv. The system filters out filenames with subdirectories, such as data/data_sample.csv.
- The format is CSV.
- Load data into the public.ad_sessions table. The SELECT statement identifies the columns to load by number. Transform each column using cast functions. For details on each function, see docid\ iteimff9ycez0ctcfgqb. See docid\ hem zd qwquv8dfnvnqz8 for the to_timestamp function.

```sql
CREATE TRANSACTIONAL PIPELINE test_continuous_pipeline
SOURCE S3
    ENDPOINT 'http://endpoint.ocient.com/'
    BUCKET 'cs-data'
    FILTER '*.csv'
EXTRACT
    FORMAT CSV
INTO public.ad_sessions SELECT
    to_timestamp(char($1), 'yyyy-MM-dd HH:mm:ss.SSSSSS', 'java') AS event_date_time,
    char($2) AS device_model,
    tinyint($8) AS device_user_age,
    boolean($10) AS device_ad_tracking_disabled,
    binary($11) AS device_mac,
    int($14) AS ip_zip,
    float($19) AS ip_zip_latitude,
    double($20) AS ip_zip_longitude,
    bigint($21) AS session_id,
    smallint($32) AS session_response_latency,
    decimal($34, 10, 1) AS session_transaction_revenue,
    char($39) AS session_app_name;
```
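Several of the file-based source options documented under Source Options can be combined in one batch pipeline. The following hedged sketch shows one such combination; the bucket, prefix, filter, and table names are hypothetical, and combining exactly these options is an assumption rather than an example taken from this document.

```sql
-- Batch-load gzip-compressed JSON files, restricted by prefix and regex,
-- sorted by modified timestamp and bounded by a lower modified-timestamp filter
CREATE BATCH PIPELINE sorted_orders_pipeline
SOURCE S3
    BUCKET 'example-bucket'
    PREFIX '/data/orders/'
    FILTER_REGEX '.*\.json\.gz'
    COMPRESSION_METHOD 'gzip'
    SORT_BY 'modified' SORT_DIRECTION ASC
    START_MODIFIED_TIMESTAMP '2024-01-01'
EXTRACT FORMAT JSON
INTO public.orders SELECT $id AS id;
```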
type description filter | filter glob | filter regex none string the expression for filtering files in the directory or child directories to load if you do not specify the prefix option, this pattern applies to the full path to the file, except for the bucket for s3 compatible file sources if you specify the prefix option, this pattern applies to the file path portion after the specified prefix value paths include a leading forward slash for filesystem sources, this option is required for s3 sources, one of the filter , filter glob , filter regex , or object key keys is required specify one of these options filter glob or filter — regular {{unix}} filename patterns supported options include indicates unlimited wildcard characters except for the path character, / indicates wildcard characters, including path separators ? indicates a single wildcard character filter regex — regular expression patterns to filter files some common patterns include matches any character matches any 0 or more of the preceding character + matches a single character \[135] matches any one character in the set \[1 5] matches any one character in the range (a|b) matches a or b ℹ️ when you use the filter or filter glob options with a continuous data pipeline, the system supports only basic globbing, and extended, range globbing is not supported filter glob examples list all csv files in subdirectories named 2024 that are in any subdirectory of the trades directory, which is in the root of the bucket filter glob = '/trades/ /2024/ csv' list all csv files in the bucket in all subdirectories filter glob = ' / csv' filter regex examples list all json gz files in the bucket that contain the name orders anywhere in the path filter regex = ' orders \\ json\\ gz' list all files in the bucket in the root path metrics or values where the date string on the filename is 2000 to 2004 filter regex = '/(metrics|values)/ 200\[0 4] ' prefix null string or array of strings optional specify a prefix within 
which to apply the filter when you specify a list of prefixes, the system applies the filter to each one, and the results are unioned together prefixes must be paths to directories, meaning they end with a forward slash for filesystem sources, paths should be absolute, meaning they begin with a forward slash for s3 sources, use prefixes to reduce the search space when listing objects within the specified bucket for maximum performance, use the prefix option whenever possible, especially when combined with the filter regex option ℹ️this option is not available for continuous data pipelines examples list all csv files in the 2024 subdirectory of files prefix '/files/2024/' filter ' / csv' list all json files of subdirectory 2024/09/ in the orders directory prefix '/data/orders/2024/09/' filter regex ' /orders/ json' list all json files in multiple subdirectories prefix \['/data/orders/2024/09/', '/data/orders/2024/10/'] compression method null string optional the method the ocient system uses to decompress file data the only supported option is gzip to load uncompressed data, omit the compression method option ℹ️the compression method option is only applicable to file based sources kafka pipelines automatically decompress data based on the kafka topic configuration sort by filename string optional the sort criteria for sorting the file list before the load supported options are filename — sort files lexicographically by the filename created — sort files based on the file created timestamp modified — sort files based on the file modified timestamp sorting files such that the ocient system sorts the data within the files according to the {{timekey}} column in the target table can lead to better query performance ℹ️this option is not available for continuous data pipelines sort direction asc string optional the sort direction, either asc ascending or desc descending, determines the sort order if you specify this option, you must also specify the sort by option ℹ️this 
option is not available for continuous data pipelines sort rewrite null string optional this option is a sort comparator, allowing files to be renamed during the sort operation using capture groups from the filter regular expression if you specify this option, you must also specify the filter regex option if you do not specify this option, the system uses the sorting specified by the sort by option ℹ️this option is not available for continuous data pipelines example filter regex '(\d+) (\d+) (\d+) order json' sort rewrite '\3 \1 \2 json' this specification renames the file from 5 6 1999 orders json to 1999 5 6 json by switching the order of the date fields start filename null string optional the filename string that is the lower bound to filter for files lexicographically for batch pipelines (inclusive) use the full path of the file, such as '/dir/load file json' if the file is located in the top most directory, start the path with a slash / , such as '/load file json' you can use this option without the end filename option if you specify the end filename option, the value of the start filename option must be smaller than the value for the end filename option lexicographically end filename null string optional the filename string that is the upper bound to filter for files lexicographically for batch pipelines (inclusive) use the full path of the file, such as '/dir/load file json' if the file is located in the top most directory, start the path with a slash / , such as '/load file json' you can use this option without the start filename option if you specify the end filename option, the value of the start filename option must be smaller than the value for the end filename option lexicographically start created timestamp null timestamp formatted string optional the iso 8601 compliant date or date time that is used as the lower bound (inclusive) to filter files by created timestamp for batch pipelines the time zone should match the file metadata if you specify both, 
the value for the start created timestamp option must be before the value for the end created timestamp option ℹ️this option is not available for continuous data pipelines end created timestamp null timestamp formatted string optional the iso 8601 compliant date or date time that is used as the lower bound (inclusive) to filter files by created timestamp for batch pipelines the time zone should match the file metadata if you specify both, the value for the start created timestamp option must be before the value for the end created timestamp option ℹ️this option is not available for continuous data pipelines start modified timestamp null timestamp formatted string optional the iso 8601 compliant date or date time that is used as the lower bound (inclusive) to filter files by modified timestamp for batch pipelines the time zone should match the file metadata if you specify both, the value for the start modified timestamp option must be before the value for the end modified timestamp option ℹ️this option is not available for continuous data pipelines end modified timestamp null timestamp formatted string optional the iso 8601 compliant date or date time that is used as the lower bound (inclusive) to filter files by modified timestamp for batch pipelines the time zone should match the file metadata if you specify both, the value for the start modified timestamp option must be before the value for the end modified timestamp option ℹ️this option is not available for continuous data pipelines s3 source options you can apply these options to data sources of the source s3 type, which include s3 and s3 compatible services option key default data type description bucket none string the name of the bucket in aws s3 object key null string or array of strings optional specify an s3 object key(s) to load you cannot use this option if you specify prefix , filter , filter glob , or filter regex keys are specified if you do not specify one of the filter , filter glob , or filter 
regex keys, this option is required object keys do not include a leading forward slash specifying individual objects can be faster than filtering, since the ocient system avoids listing all the files in a large directory object keys must not include any of the special characters ?\\{}\[] ℹ️this option is not available for continuous data pipelines examples load a single object from the designated bucket at the specified object key object key 'order data/jsonl/orders 20251101 jsonl' load a list of objects from the designated bucket at the specified object keys object key \['order data/jsonl/orders 20251101 jsonl', 'order data/jsonl/orders 20251201 jsonl'] access key id '' string optional the access key identification for aws credentials if you specify this option, you must also specify the secret access key option the ocient system uses anonymous credentials when you specify an empty value for this option secret access key '' string optional the secret key for aws credentials if you specify this option, you must also specify the access key id option the ocient system uses anonymous credentials when you specify an empty value for this option session token null string optional temporary credentials can be made with a combination with the existing access key identifier and secret key, and an additional session token you must specify the access key id and secret access key options when you use this option role arn null string optional the {{amazon}} resource name (arn) of the role to retrieve temporary credentials the sts client used to retrieve the temporary credentials specifies the https //docs aws amazon com/sdk for java/v1/developer guide/java dg region selection html#automatically determine the aws region from the environment loaded from defaultawsregionproviderchain and https //docs aws amazon com/sdk for java/v1/developer guide/credentials html loaded from defaultcredentialsprovider you cannot use this option with the access key id and secret access key options 
ASSUME_ROLE_CONFIG (string, optional; default: {}): Additional configuration for the STSAssumeRoleCredentialsProvider or AssumeRoleRequest used to retrieve and refresh temporary credentials with the ROLE_ARN option. This option is a JSON-formatted string that you can specify only together with the ROLE_ARN option. The supported configuration options are "asyncCredentialUpdateEnabled", "prefetchTimeSeconds", and "staleTimeSeconds" for STSAssumeRoleCredentialsProvider, and "durationSeconds", "externalId", "policy", "roleSessionName", "serialNumber", and "tokenCode" for AssumeRoleRequest.

REGION (string, optional; default: 'us-east-1'): The region that the Ocient system uses for AWS access. If you specify the ENDPOINT option, the system ignores this option.

ENDPOINT (string, optional; default: null): The endpoint URI for the S3-compatible service API (e.g., https://s3.us-east-2.amazonaws.com). When unspecified, this option defaults to https://s3.<region>.amazonaws.com. If you provide this option, the Ocient system ignores the REGION option.

ENABLE_PATH_STYLE_ACCESS (boolean, optional; default: null): Whether to use path-style access, where the path includes the bucket name in the URL (for example, https://s3.us-east-1.amazonaws.com/bucket-name/key-name). The false value denotes virtual-hosted-style access, where the URL contains the bucket name as part of the domain name (for example, https://bucket-name.s3.us-east-1.amazonaws.com/key-name). When unspecified, this option defaults to true if the ENDPOINT option is specified and false otherwise.

HEADERS (string, optional; default: null): The headers to send with every request. This option is a JSON-formatted string. Represent the chosen header names as keys, with the corresponding values as scalars or lists of scalars. The system converts scalars that are not strings to strings. During the load, the system maps each element in a list to the header name represented by the corresponding key.

Examples:

HEADERS '{"x-amz-request-payer": "requester"}' sends this header with each request: x-amz-request-payer: requester

HEADERS '{"header-name": ["list", "of", "values"]}' sends this header with each request: header-name: list, of, values

MAX_CONCURRENCY (integer, optional; default: 50): Determines the number of parallel connections the Ocient system uses to communicate with the AWS S3 service. ⚠️ This option does not require modification in most cases. Contact Ocient Support to modify these values.

READ_TIMEOUT (time interval, optional; default: 0, meaning an unlimited timeout): The time until a read operation times out. ⚠️ This option does not require modification in most cases. Contact Ocient Support to modify these values.

REQUEST_DEPTH (integer, optional; default: 500): The upper boundary on the number of requests that the Ocient system handles concurrently. ⚠️ This option does not require modification in most cases. Contact Ocient Support to modify these values.

REQUEST_RETRIES (integer, optional; default: 10): The number of times the AWS SDK retries failing requests before the Ocient system throws an error. ⚠️ This option does not require modification in most cases. Contact Ocient Support to modify these values.

S3 Credentials Hierarchy

The S3 source configuration supports this hierarchy to obtain S3 credentials:

Level 1: Data pipeline configuration (set by using the ACCESS_KEY_ID and SECRET_ACCESS_KEY options)
Level 2: The AWS default credentials chain (set by using the instructions at https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html)
Level 3: Anonymous access (set by default)

If the Ocient system does not obtain credentials at a level, the system tries a lower level. You can choose the level at which to store the credentials, where Level 1 is the highest; higher levels take precedence for credential storage.

Filesystem Source Options

No options exist specific to the file system source (SOURCE FILESYSTEM) except for the general source options (docid\ pbyszqvu5wonpgoso qto). When you load data using SOURCE FILESYSTEM, the files must be addressable from all of your Loader Nodes. The Ocient system uses the path specified in the pipeline PREFIX and FILTER options to select
the files to load. A shared view of the files you want to load must be available to all Loader Nodes involved in a pipeline. For example, you can use a network file system (NFS) mount available to all Loader Nodes at a common path on each node.

Example

This example CREATE PIPELINE SQL statement snippet contains a filesystem source and filters to all CSV files in the /tmp/sample_data/ directory on each of the Loader Nodes:

  CREATE PIPELINE
  SOURCE FILESYSTEM
    PREFIX '/tmp/sample_data/'
    FILTER '.*\.csv'
  EXTRACT
    FORMAT DELIMITED
  INTO public.orders
  SELECT
    $1 AS username,
    $2 AS subtotal,
    ...

HDFS Source Options

You can specify these options for HDFS data sources (SOURCE HDFS). For advanced options, see docid 3g1syam6xwz466vfjrdz9.

ENDPOINT (string, required; default: none): The hostname and port number of the NameNode server, for example, 'my-hdfs-server:1234'. The NameNode server manages the file system and regulates access to data.

CONFIG (JSON-formatted string, optional; default: none): The JSON string that contains HDFS configuration properties (https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml) to use during the load. The properties are primarily for authentication. You can also use these properties to configure advanced behavior such as connection timeouts, parallelism, and network chunk sizes. Example:

  CONFIG '{"dfs.client.use.datanode.hostname": true, "dfs.client.socket.timeout": 300000}'

Kafka Source Options

You can apply these options to Kafka data sources (SOURCE KAFKA). You do not need to specify a compression option in Kafka-based pipelines because the Ocient system handles the compression type automatically. Records produced to the Kafka broker with a compression.type setting, or with the compression type set on the topic, automatically decompress when the loading process consumes them. The loading process uses built-in Kafka headers to determine the required decompression during extraction.

BOOTSTRAP_SERVERS (string; default: none): A comma-delimited list of ip:port pairs that contain the IP addresses and the associated port numbers of the Kafka brokers. You can also use a hostname instead of the IP address. Example: BOOTSTRAP_SERVERS = '198.51.100.1:9092,198.51.100.2:9092'

TOPIC (string; default: none): The name of the Kafka topic that indicates where to consume records.

WRITE_OFFSETS (boolean, optional; default: true): Indicates whether the Kafka consumer should write its durably made record offsets to the Kafka broker.

CONFIG (JSON-formatted string, optional; default: '{"group.id": "<systemid>-<databasename>-<pipelinename>"}'): The consumer configuration properties (https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) that the Kafka consumers should use. Frequently specified configurations include group.id and security.protocol. These configurations are fixed and cannot be overridden:

  {
    "enable.auto.commit": false,
    "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }

AUTO_OFFSET_RESET (string, optional; default: 'latest'): Determines which action to take for the Kafka configuration when there is no initial offset in the offset store or the specified offset is out of range. Supported values are:
'smallest' or 'earliest' — automatically reset the offset value to the smallest value
'largest' or 'latest' — automatically reset the offset value to the largest value
'error' — throw an error

GROUP_ID (string, optional; default: '<systemid>-<databasename>-<pipelinename>'): The client group identifier for the Kafka source configuration.

For the consumer configuration, to create a secure connection from the Kafka consumer to a Kafka broker, set the "security.protocol" key along with any SSL or SASL keys. If a certificate file is required, you must add it to the truststore used by the JVM on all Loader Nodes. The truststore path must be identical on all Loader Nodes. The Kafka configuration can reference this truststore path.
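Combining these options, a pipeline that consumes a secured topic might look like the following sketch. The broker addresses, topic name, target table, column selectors, and security settings are illustrative assumptions, not values from this reference.

```sql
-- Hedged sketch: a Kafka source with an SSL consumer configuration.
-- Broker addresses, topic, and table are hypothetical.
CREATE PIPELINE kafka_orders_load
SOURCE KAFKA
  BOOTSTRAP_SERVERS '198.51.100.1:9092,198.51.100.2:9092'
  TOPIC 'orders'
  CONFIG '{"security.protocol": "SSL", "auto.offset.reset": "earliest"}'
EXTRACT
  FORMAT JSON
INTO public.orders
SELECT
  $"username" AS username,
  $"subtotal" AS subtotal;
```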
If you specify the ssl.certificate.location or ssl.ca.location consumer configuration, you must specify both of these configurations. Otherwise, the system throws an error. For example:

  CONFIG '{"auto.offset.reset": "earliest", "ssl.certificate.location": "/etc/blab/file1.txt", "ssl.ca.location": "/etc/blab/file2.txt"}'

Continuous File Loading Source Options

Specify these options for data pipelines that use filesystem or S3 sources with the continuous mode. For a Kafka source, do not use these options.

General File Monitor Options

MONITOR (string; default: none): The type of monitor. Valid values are SQS or KAFKA.

POLLING_INTERVAL (time interval, optional; default: 10 seconds): The unit of time at which to consume the event topic. Valid value range: 10 to 120 seconds.

MAX_FILES (integer, optional; default: 100): In the batch syntax, the maximum number of pending files to collect before starting a batch. Valid value range: 10 to 2000.

TIMEOUT (time interval, optional; default: 1 minute): In the batch syntax, the time to wait before starting a batch. Valid value range: 10 seconds to 30 minutes.

LOOKBACK (time interval, optional; default: 1 day): In the batch syntax, the unit of time to look back for file deduplication. Valid value range: 0 to 48 hours.

SQS Monitor Options

Use these options when you use MONITOR SQS for Amazon SQS.

QUEUE_URL (string; default: none): The URL of the target queue, for example, http://localhost:32769/000000000000/queue1

REGION (string, optional; default: null): The region for the SQS operation, used for signing requests. When unspecified, the region is inferred from QUEUE_URL by looking for sqs.<region> or sqs-fips.<region>. If the region cannot be inferred, it defaults to us-east-1.

ENDPOINT (string, optional; default: null): The endpoint URL for the client, for example, http://localhost:32769. When unspecified, this option defaults to https://sqs[-fips].<region>.amazonaws.com if the region was inferred or specified; otherwise, this option is required.

ACCESS_KEY_ID (string, optional; default: ''): The access key identifier for SQS authentication. If
you specify this option, you must also specify the SECRET_ACCESS_KEY option. The Ocient system uses anonymous credentials when you specify an empty value for this option.

SECRET_ACCESS_KEY (string, optional; default: ''): The secret access key for SQS authentication.

Kafka Monitor Options

Use these options when you use MONITOR KAFKA for Kafka.

BOOTSTRAP_SERVERS (string; default: none): The bootstrap servers for the Kafka consumer configuration. The string contains a list of brokers as a comma-separated list of broker hostnames, or broker hostname and port number combinations, in the format hostname:port.

TOPIC (string; default: none): The name of the Kafka topic that indicates where to consume records.

AUTO_OFFSET_RESET (string, optional; default: 'latest'): The action to take for the Kafka configuration when there is no initial offset in the offset store or the chosen offset is out of range:
'smallest', 'earliest' — automatically reset the offset to the smallest offset
'largest', 'latest' — automatically reset the offset to the largest offset
'error' — trigger an error (ERR__AUTO_OFFSET_RESET), retrieved by consuming messages and checking message->err

GROUP_ID (string, optional; default: '<systemid>-<databasename>-<pipelinename>-monitor'): The client group identifier for the Kafka configuration.

CONFIG (JSON-formatted string, optional): The consumer configuration properties (https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) that the Kafka consumers should use. This option is a JSON-formatted string. Certain values within this configuration are fixed, whereas the Ocient system provides other values with a default value that you can modify. The default is:

  {
    "enable.auto.commit": false,
    "group.id": "<systemid>-<databasename>-<pipelinename>-monitor",
    "max.poll.interval.ms": "600000",
    "heartbeat.interval.ms": "6000",
    "session.timeout.ms": "600000"
  }

MAX_MESSAGES (integer, optional; default: 100): In the consumer syntax, the maximum number of messages to poll each time.

TIMEOUT (time interval, optional; default: 1000 milliseconds): In the consumer syntax, the operation timeout that controls how long the consume request waits for the response. The valid range is 0 milliseconds to 1 hour.

FILENAME (selector, optional; default: $"Records"[1]."s3"."object"."key", which corresponds to an s3:ObjectCreated:Put event): In the message syntax, defines the JSON selector for the filename of an incoming message in a monitor. The default value works for s3:ObjectCreated events. If you override this default configuration, you can parse custom JSON messages. The format must still adhere to S3 standards; for details, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html. The message must be in the JSON format. If you specify this option, then you must specify the other two options in the message syntax.

TIMESTAMP (selector, optional; default: $"Records"[1]."eventTime", which corresponds to an s3:ObjectCreated:Put event): In the message syntax, defines the JSON selector for the last modification timestamp of the file of an incoming message in a monitor. The default value works for s3:ObjectCreated events. If you override this default configuration, you can parse custom JSON messages. The format must still adhere to S3 standards; for details, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html. The message must be in the JSON format. If you specify this option, then you must specify the other two options in the message syntax.

SIZE (selector, optional; default: $"Records"[1]."s3"."object"."size", which corresponds to an s3:ObjectCreated:Put event): In the message syntax, defines the JSON selector for the file size of an incoming message in a monitor. The default value works for s3:ObjectCreated events. If you override this default configuration, you can parse custom JSON messages. The format must still adhere to S3 standards; for details, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html. The message must be in the JSON format. If you specify this option, then you must specify the other two options in the message syntax.

The same Kafka CONFIG option override considerations apply. For details, see docid\ pbyszqvu5wonpgoso qto.

Lookup Options

You can optionally look up data in a table from an external database. You can include this table in the SELECT statement of the CREATE PIPELINE SQL statement and perform join operations on its columns. To use the LOOKUP keyword, specify the external source name: LOOKUP lookup_source. You can look up data in multiple external databases; in this case, use the LOOKUP keyword with the source name and corresponding options for each database.

You must provide the appropriate JDBC JAR file for the JDBC connection to external databases. For tables in the Ocient system, this JAR file is not needed. The Ocient system executes the JDBC code as trusted code; only provide trusted JAR files to the loading process. The streamloader.extractorEngineParameters.configurationOption.engine.external.jdbc.jarRootDirectory configuration parameter specifies the location of the JAR file. For details, see docid\ ejaxrd6rmniv3qqjbjuml.

CONNECTION_TYPE (string; default: none): The type of connection. Only 'jdbc' is supported.

CONNECTION_STRING (string; default: none): The string for connecting to the external database, for example, 'jdbc:sqlite:/path/to/file'.

LOOKUP_SCHEMA (string; default: none): The schema of the table in the external database.

LOOKUP_TABLE (string; default: none): The name of the table in the external database.

REFRESH_PERIOD (time interval, optional; default: 30): The time between lookup cache refreshes.

CONFIG (JSON-formatted string, optional; default: null): The JSON string that contains additional configuration options, for example, '{"force_refresh_if_null": false}'. The supported JSON keys are:

force_refresh_if_null, specified as a boolean that defaults to true. When you set this value to true, the system triggers a lookup cache refresh for each value missing from the lookup table (to account for a concurrent load). This cache refresh can have a noticeable performance impact. As such, if
you expect that some values cannot result in successful lookups, then set this key to the false value.

load_timeout_seconds, specified as a long integer with no default value. This option is the timeout, in seconds, for a lookup cache refresh.

driver_class, specified as a string with no default value. This option is the fully qualified JDBC driver class name. If you specify this option, the system throws an error when the system cannot find this driver or the driver cannot load into the process. If the JAR file for the connection is a JDBC 4 file, the system automatically loads the driver class. Otherwise, you must specify the fully qualified class name (from Class.getName in Java) using this option, such as '{"driver_class": "org.test.example.Driver"}'.

Extract Options

General Extract Options

You can specify these options on any of the allowed format types.

FORMAT (string; default: none): Specifies the format of the files to load. Supported values are: DELIMITED, CSV, JSON, BINARY, ASN_1, PARQUET, XML.

CHARSET_NAME (string, optional; default: ibm1047 for the binary format, utf-8 for all other formats): Specifies the character set for decoding data from the source records into character data. Use this character set when you load data into VARCHAR columns or when you apply CHAR transformation functions. Defaults to utf-8 for all formats except binary, which defaults to ibm1047. You can configure the default value for binary-formatted data using a SQL statement such as:

  ALTER SYSTEM ALTER CONFIG SET 'sql.pipelineParameters.extract.binary.defaultCharset' = 'ibm1047'

COLUMN_DEFAULT_IF_NULL (boolean, optional; default: false): Specifies whether pipelines should load the column default value when the result of a series of transforms is null. If you set this option to false (the default), the pipeline loads null values into columns. If you set this option to true, the pipeline loads the defined default value of the column when the result of the execution of the transformation on the column is null.
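As a sketch of how these general extract options attach to a format clause — the directory, table, and column positions here are hypothetical:

```sql
-- Hedged sketch: general extract options on a delimited load.
-- The directory, table, and column layout are hypothetical.
CREATE PIPELINE csv_defaults_load
SOURCE FILESYSTEM
  PREFIX '/data/incoming/'
  FILTER '.*\.csv'
EXTRACT
  FORMAT DELIMITED
  CHARSET_NAME 'utf-8'
  COLUMN_DEFAULT_IF_NULL true
INTO public.orders
SELECT
  $1 AS username,
  $2 AS subtotal;
```

With COLUMN_DEFAULT_IF_NULL set to true, a transform chain that yields null loads the column's defined default value instead of null.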
NULL_STRINGS (array of strings, optional; default: ['NULL', 'null'] for the delimited or csv format, [] for the json format): Specifies string values that should represent a null value when extracted from the source records. Use this option in the csv, delimited, and json formats to convert specific values to null instead of requiring individual NULL_IF transform function calls with those values. This option applies to source data immediately after extraction. If the result of transformations is one of these strings, you must use NULL_IF to transform to null with the specified string values.

TRIM_WHITESPACE (boolean, optional; default: false): Specifies whether to trim whitespace from the beginning and end of each string field during extraction. In this case, whitespace is a space, tab, carriage return, or linefeed character. If the trimmed field value matches one of the values specified by the NULL_STRINGS option, the system converts the value in the field to null. If you set this value to true and the EMPTY_FIELD_AS_NULL option to true, the system converts any field that contains only whitespace to null. This option does not trim arrays of strings. To trim elements in arrays, you can use the TRIM transform function, or the TRANSFORM function with a lambda expression that uses the TRIM function.

VALIDATE_CHARACTERS (boolean, optional; default: false): Specifies whether to detect invalid characters based on the encoding type during extraction. If the system detects invalid characters, the system throws a file-level error for file loads or a record-level error for Kafka loads. This option is not supported for the ASN.1, Parquet, and binary formats.

REPLACE_INVALID_CHARACTERS (boolean, optional; default: false): Specifies whether to replace invalid characters, based on the encoding type, with the replacement character U+FFFD. This option is not supported for the ASN.1, Parquet, and binary formats.

REPLACEMENT_CHARACTER (string, optional; default: �): Specifies the character that indicates invalid data when you set the REPLACE_INVALID_CHARACTERS option to true. The default value is the replacement character U+FFFD. This option is not supported for the ASN.1, Parquet, and binary formats.

For the ASN.1, Avro, and Parquet extract options, the SCHEMA option is a map type with a null default value. This option is optionally specified depending on the format. SCHEMA specifies the syntax of options for schema retrieval.

ASN.1 Extract Options

You can specify these options for ASN.1 data record extraction (FORMAT ASN_1). For details about ASN.1-formatted data, see docid\ y731i6lhout2b 7mc1jr8.

URL (string; default: none): In the schema syntax, specify the URL to the ASN.1 schema file in the ASN format.

RECORD_TYPE (string; default: none): In the schema syntax, specify the fully qualified name of the record type to parse. The name is the root-level ASN.1 type to extract from each DER-encoded record (e.g., Example.PersonnelRecord).

Avro Extract Options

You can specify these options for Avro data record extraction. The schema options are all optional for Object Container Files (OCF). For Kafka loads, the schema configuration must include either the URL option or the INLINE option, but not both. For file-based loads, the schema configuration can include either the INLINE option or the INFER_FROM option, but not both of these options, or neither the INLINE nor the INFER_FROM options. For details about loading Avro-formatted data, see docid\ y731i6lhout2b 7mc1jr8.

INLINE (string; default: none): The JSON specification of the schema declaration as a string. Specify either a JSON string or object.

INFER_FROM (string; default: none): For file-based loads, specifies that the system infers the schema from files. Supported values are:
'sample file' — use one sample file
'all files' — use all files

Binary Extract Options

You can apply these options to binary data record extraction (FORMAT BINARY). For details about binary-formatted data, see docid\ y731i6lhout2b 7mc1jr8. The general option CHARSET_NAME has a different default value for FORMAT BINARY.

RECORD_LENGTH (integer; default: none): Specifies the fixed size in bytes of each record in the source data. The Ocient system splits the binary data into binary chunks according to this length value and processes them individually.

ENDIANNESS (string, optional; default: big): Specifies the endianness used to interpret multi-byte sequences in various transforms of binary data. Accepted values are 'big' and 'little'.

AUTO_TRIM_PADDING (boolean, optional; default: true): Determines whether padding characters should be trimmed after decoding the binary data into string data. If you set this option to true, the Ocient system trims all instances of the padding character value from the end of a string after the system decodes the string from the binary type.

PADDING_CHARACTER (string, optional; default: ' ' (space)): The padding character to trim from the string after the system decodes the string from the binary type. You can change the default padding character for binary-formatted data using a SQL statement such as:

  ALTER SYSTEM ALTER CONFIG SET 'sql.pipelineParameters.extract.binaryFormat.defaultPaddingCharacter' = ' '

The Ocient system trims the default padding character of a space from the end of the text data in binary data.

Delimited and CSV Extract Options

You can specify these options for delimited and CSV format data record extraction (the FORMAT DELIMITED or FORMAT CSV data formats, which are aliases). For details about working with delimited and CSV data, see docid\ y731i6lhout2b 7mc1jr8.

FIELD_DELIMITER (string or array of strings, optional; default: ,): Specifies a character or list of possible characters for delimiting fields. The default value sets the field delimiter to only a comma. The value must be one byte. The Ocient system automatically interprets the values you specify as C-style escaped strings. This means an escape string (e'some value') is not required to specify control characters. This differs from the default string behavior in Ocient.
For details, see docid\ nw9vavkey2v75moxm muo. FIELD_DELIMITER and FIELD_DELIMITERS are aliases.

Examples:

Use a tab character: FIELD_DELIMITER = '\t'
Use a pipe character: FIELD_DELIMITER = '|'
Use either a pipe or a comma character: FIELD_DELIMITER = [',', '|']

RECORD_DELIMITER (string or array of strings, optional; default: ['\r\n', '\n']): Specifies the string or an array of strings for delimiting records. The file is split into individual records using this character during processing. Common values include '\r\n' and '\n'. The value must be one or two bytes. The Ocient system automatically interprets the values you specify as C-style escaped strings. An escape string (e'some value') is not required to specify control characters. This differs from the default string behavior in Ocient. The system chooses the first specified delimiter and uses that delimiter for the rest of the data. Data with mixed delimiters is not supported. For details, see docid\ nw9vavkey2v75moxm muo. RECORD_DELIMITER and RECORD_DELIMITERS are aliases.

Examples:

Use a linefeed character: RECORD_DELIMITER = '\n'
Use a carriage return and linefeed character sequence: RECORD_DELIMITER = '\r\n'

NUM_HEADER_LINES (integer, optional; default: 0): Specifies the number of header lines, typically 0 or 1. The Ocient system skips this number of lines and does not load them as data during file processing. Use this option when your data includes a row of header values.

NUM_FOOTER_LINES (integer, optional; default: 0): Specifies the number of footer lines, typically 0 or 1. The Ocient system skips this number of lines, starting from the end of the file, and does not load them as data during file processing. Use this option when your data includes a row of footer values.

OPEN_ARRAY (string, optional; default: [): Specifies the character that indicates the start of an array in a CSV or delimited field. Use this option to parse array data types. Specify the CLOSE_ARRAY option also when using this option. Set this option to null or '' to turn off the detection of these control characters. If you set this option to either of these values, the system also turns off the detection of these characters for the CLOSE_ARRAY option.

Example: Convert source data such as val1,"[1,2,3]",val2 to an array when referenced as $2[]:
OPEN_ARRAY '[' CLOSE_ARRAY ']' ARRAY_ELEMENT_DELIMITER ','

CLOSE_ARRAY (string, optional; default: ]): Specifies the character that indicates the end of an array in a CSV or delimited field. Use this option to parse array data types. Specify the OPEN_ARRAY option also when using this option. Set this option to null or '' to turn off the detection of these control characters. If you set this option to either of these values, the system also turns off the detection of these characters for the OPEN_ARRAY option.

Example: Convert source data such as val1,"{1,2,3}",val2 to an array when referenced as $2[]:
CLOSE_ARRAY '}' OPEN_ARRAY '{' ARRAY_ELEMENT_DELIMITER ','

ARRAY_ELEMENT_DELIMITER (string, optional; default: ,): Specifies the character that separates values in an array within a CSV or delimited field. Use this option to parse array data types. Set this option to null or '' to turn off the detection of these control characters.

Example: Convert source data such as val1,"[1;2;3]",val2 to an array when referenced as $2[]:
ARRAY_ELEMENT_DELIMITER = ';'

OPEN_OBJECT (string, optional; default: {): Specifies the character that indicates the start of a tuple in a field. Use this option to parse tuple data types. Specify the CLOSE_OBJECT option also when using this option. Set this option to null or '' to turn off the detection of these control characters. If you set this option to either of these values, the system also turns off the detection of these characters for the CLOSE_OBJECT option.

CLOSE_OBJECT (string, optional; default: }): Specifies the character that indicates the end of a tuple in a field. Use this option to parse tuple data types. Specify the OPEN_OBJECT option also when using this option. Set this option to null or '' to turn off the detection of these control characters. If you set this option to either of these values, the system also turns off the detection of these characters for the OPEN_OBJECT option.

EMPTY_FIELD_AS_NULL (boolean, optional; default: true): Specifies whether the Ocient system should extract an empty source field as null or as a missing value. When this option is set to true, the Ocient system treats empty fields as null. Otherwise, the system treats the fields as missing values. For string-type fields, a missing value is equivalent to an empty string. An empty field is defined as two consecutive delimiters (e.g., the second field is empty in abc,,xyz). If a field is explicitly an empty string, as indicated by quote characters, the Ocient system treats the field as an empty string, not an empty field (e.g., the second field is an empty string in abc,"",xyz). The CHAR($1) transformation function handles both nulls and empty strings and passes them through. ⚠️ Beware that the NULL_IF($1, '') transformation function directly loads a null for both nulls and missing values. This transformation can override the behavior of EMPTY_FIELD_AS_NULL.

FIELD_OPTIONALLY_ENCLOSED_BY (string, optional; default: "): Also known as the "quote character," this option specifies the character for optionally enclosing fields. Fields enclosed by this character can include delimiters, the enclosure character, or the escape character. Set this option to null or '' to turn off the detection of these control characters.

Examples:

Use a double quote character: FIELD_OPTIONALLY_ENCLOSED_BY = '"'
Use a single quote character: FIELD_OPTIONALLY_ENCLOSED_BY = ''''

ESCAPE_CHAR (string, optional; default: "): Specifies the escape character within fields enclosed by the FIELD_OPTIONALLY_ENCLOSED_BY option. Use this option to escape the enclosure character or the escape character itself. Set this option to null or '' to turn off the detection of these control characters.

Examples:

Use a double quote as the escape character: ESCAPE_CHAR = '"'
Use a single quote as the escape character. When you specify the escape character, you
often have to use an escape sequence. This follows standard SQL rules: ESCAPE_CHAR = ''''

SKIP_EMPTY_LINES (boolean, optional; default: false): Specifies whether or not to skip empty lines.

COMMENT_CHAR (string, optional; default: null): Specifies the character used to comment out a record in the source file. The load skips records where the first character of a record is equal to this character. Set this option to null or '' to turn off the detection of these control characters. Example: COMMENT_CHAR '#'

HEADERS (array of strings, optional; default: null): Specifies the header labels associated with each column in a delimited file. This array of values corresponds to the columns in order from left to right. Use these labels in the CREATE PIPELINE SELECT SQL statement to refer to column values. For example, if you specify HEADERS ['col1', 'col2', 'col3'], you can refer to the first column as $"col1" instead of $1 in the SELECT statement.

JSON Extract Options

No options exist for JSON data record extraction (FORMAT JSON). For details about JSON-formatted data, see docid\ y731i6lhout2b 7mc1jr8.

Parquet Extract Options

You can specify these options for Parquet data record extraction (FORMAT PARQUET). For details about Parquet-formatted data, see docid\ y731i6lhout2b 7mc1jr8. When you use the FORMAT PARQUET option with an AWS S3 source, the ENDPOINT option is required.

SCHEMA (string; default: INFER_FROM 'sample file'): In the schema syntax, specify how to infer the Parquet schema. Valid values are:
'sample file' — infer from a random file
'all files' — read the schema from all files and union the retrieved schemas; if a field is present in two different schemas, it must have the same data type in both schemas

Example: SCHEMA (INFER_FROM 'sample file')

XML Extract Options

No options exist for XML format extraction (FORMAT XML). For details about XML-formatted data, see docid\ y731i6lhout2b 7mc1jr8.

Bad Data Targets

Bad data represents records that the Ocient system could not load due to errors in the transformations or invalid data in the source records. You can provide options for a bad data target that the Ocient system uses during pipeline execution to capture the records that are not loaded. The original bytes that the pipeline tried to load are captured in the bad data target, along with metadata about the error, such as the error message or source. Kafka is the only supported bad data target.

Kafka Bad Data Target

When you use Kafka as a bad data target, the Ocient system produces the original bytes of the source record into the Kafka topic of your choice. The Ocient system includes the metadata about the record in the header of the record as it is sent to Kafka. You can configure the Kafka topic on your Kafka brokers using the retention and partition settings of your choice. If the Kafka broker is unreachable when the Ocient system attempts to produce a bad data record to the bad data target, the system logs an error on the Loader Node, and the pipeline continues.

Example

This example CREATE PIPELINE SQL statement snippet contains a bad data target definition using the BAD_DATA_TARGET option:

  CREATE PIPELINE
  BAD_DATA_TARGET KAFKA
    TOPIC 'orders_errors'
    BOOTSTRAP_SERVERS '111.11.111.1:9092,111.11.111.2:9092'
    CONFIG '{"compression.type": "gzip"}'
  SOURCE ...
  EXTRACT ...
  INTO public.orders
  SELECT
    $"order"."billing_name" AS username,
    $"order"."subtotal" AS subtotal,
    ...

Kafka Bad Data Target Options

BOOTSTRAP_SERVERS (string; default: none): A comma-delimited list of ip:port pairs that contain the IP addresses of the Kafka brokers and the associated port numbers. Example: BOOTSTRAP_SERVERS = '111.11.111.1:9092,111.11.111.2:9092'

TOPIC (string; default: none): The name of the Kafka topic where the Ocient system should produce bad data records.

CONFIG (JSON-formatted string, optional; default: '{"compression.type": "none"}'): The producer configuration properties (https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html) that the Kafka producer should use.
pipeline tuning options you can use pipeline tuning options to control the parallelism or batching dynamics of your pipelines this tuning can throttle the resources used on a pipeline or increase parallel processing across loader nodes these options are advanced settings that might require a detailed understanding of the underlying mechanics of the loading infrastructure in the ocient system to employ due to the inherent nature of each source type, the behavior of these options can differ between file based and kafka based loads all these options are optional option key default data type description cores the maximum number of cpu cores available on each loader node integer maximum number of processing threads that the ocient system uses during execution on each loader node the system creates this number of threads on each loader node the ocient system automatically determines the default value by finding the number of cores of a loader node you can use this option for performance tuning the calculation for maximum parallelism of a pipeline is number of loaders cores about kafka partitions and parallelism for kafka loads, this option determines the number of kafka consumers created on each loader node for kafka pipelines, the recommendation is that number of loaders cores equals the number of kafka topic partitions if this number exceeds the number of kafka topic partitions, the work might spread unevenly across loader nodes if this number is less than the number of kafka topic partitions, some kafka consumers might receive uneven amounts of work in this case, use a value for number of loaders cores that is an even divisor of the number of kafka topic partitions to avoid a skew in the rates of processing across partitions partitions equal to the value of cores integer specifies the number of partitions over which to split the file list not applicable to kafka loads the ocient system automatically sets a default value based on the configured value for the cores 
option. You can use this option for performance tuning. The number of partitions determines how many buckets of work the Ocient system generates for each batch of files processed on a loader node. The pipeline processes this number of partitions in parallel using the specified number of cores. If you specify fewer partitions than cores, some cores are not fully utilized, and resources are wasted. If you specify more partitions than cores, the Ocient system divides partitions in a round-robin fashion over the available cores.

- BATCH SIZE (integer; default: a dynamic value, determined by the Ocient system for each pipeline to maximize performance): The number of rows in the batch to load at one time. The Ocient system automatically calculates a dynamic value depending on the table columns and the utilization of internal buffers to transfer records to the database backend. You can use this option to turn off the dynamic adjustments for performance tuning. ⚠️ Only change this setting in rare cases where loading performance is slower than expected and you have a large record size. If this setting is improperly set, pipelines might fail with out-of-memory exceptions. You can configure the default value (for the batch payload target) using a SQL statement such as:

  ALTER SYSTEM ALTER CONFIG SET 'streamloader extractorengineparameters configurationoption osc batch payload target' '65536'

- RECORD NUMBER FORMAT (array; defaults: for file loads that do not use the EXPLODE OUTER function (docid\ od i nse3vcplc3nixzgc), the default is [19, 45, 0]; for Kafka loads that do not use the EXPLODE OUTER function, the default is [0, 64, 0]; for loads that use the EXPLODE OUTER function, the default is the specified load-type-specific default value with 13 subtracted from the record index bits, and the system adds these bits to the bits for rows within a record, so that, for example, the default for file loads that use this function is [19, 32, 13]): The 64-bit record number for each record of the load. This number uniquely identifies a row within its partition.
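Taken together, the advanced tuning options described above can appear in a single CREATE PIPELINE statement. The following is a minimal, hypothetical sketch: the pipeline name, source details, target table, and option values are invented for illustration, and the option spellings follow the syntax shown in this section rather than a verified deployment.

```sql
-- Hypothetical sketch: throttle a batch S3 load on each loader node.
-- CORES caps processing threads per loader node, PARTITIONS splits the
-- file list into buckets of work, and BATCH SIZE overrides the dynamic
-- row-batch calculation (rarely needed; see the warning above).
CREATE PIPELINE tuned_pipeline
CORES 8
PARTITIONS 16
BATCH SIZE 4096
SOURCE S3
  -- S3 source options omitted for brevity
EXTRACT FORMAT CSV
INTO public.orders SELECT
  $1 AS order_id;
```

With 2 loader nodes and CORES 8, the maximum parallelism of this pipeline would be 2 * 8 = 16, which is why PARTITIONS is set to a matching multiple here.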
The format is an array with three values in the format [<file index bits>, <record index bits>, <rows per record index bits>]. The <file index bits> value is the number of bits used to represent the file index within a partition. The <record index bits> value is the number of bits used to represent the record index within a file. The <rows per record index bits> value is the number of bits used to represent the row within a record; the system uses this value with the EXPLODE OUTER function. These three values must sum to 64.

  Example: RECORD NUMBER FORMAT = [10, 54, 0] sets the number of file index bits to 10 and the number of record index bits to 54, allowing up to 2^10 files and 2^54 records per file. The system does not support the EXPLODE OUTER function in this configuration because the rows-per-record index bits are 0.

Drop Pipeline
DROP PIPELINE removes an existing pipeline in the current database. You cannot remove a pipeline that is running. You must have the DROP privilege on the pipeline to execute this SQL statement. For details, see docid\ f55ngxtki0f7kkmyatvug. When you drop a pipeline, the Ocient system also removes the associated system catalog information, such as pipeline errors, events, files, partitions, and metrics.

Syntax
DROP PIPELINE [ IF EXISTS ] pipeline_name [, ...]

Parameters:
- pipeline_name (string): The name of the specified pipeline to remove. You can drop multiple pipelines by specifying additional pipeline names, separated by commas.

Examples
Remove an existing pipeline named ad_data_pipeline:
DROP PIPELINE ad_data_pipeline;

Remove an existing pipeline named ad_data_pipeline, or return a warning if the Ocient system does not find the pipeline in the database:
DROP PIPELINE IF EXISTS ad_data_pipeline;

Preview Pipeline
PREVIEW PIPELINE enables you to view the results of loading data for a
specific CREATE PIPELINE SQL statement without creating a whole data pipeline and without storing those results in the target table. Using this SQL statement, you can iterate quickly and modify the syntax as needed to achieve your expected results. After you confirm your expected results, you can use the same syntax in the body of the CREATE PIPELINE statement with the appropriate source. A table must exist in the database to serve as the target of your PREVIEW PIPELINE statement. This table ensures the pipeline matches the column types of the target table; however, the execution of this statement does not load data into the target table.

Preview Sources
The SOURCE INLINE source type is available only for the PREVIEW PIPELINE SQL statement. You cannot create a data pipeline with inline source data. The other source types defined in the CREATE PIPELINE statement (S3, KAFKA, and FILESYSTEM) are compatible with the PREVIEW PIPELINE statement. The extract options vary by source type to mirror the CREATE PIPELINE statement. The database returns 10 records by default.

Preview Error Handling
Pipeline-level errors cause the PREVIEW PIPELINE SQL statement to fail; the Ocient system returns an error and no result set. However, the Ocient system accumulates record-level errors that occur during the execution of this statement in a single warning that the system returns along with the result set. Each line of the warning describes a record-level error in human-readable form or as a JSON blob, depending on the value of the SHOW ERRORS AS JSON option. Rows or columns that encounter record-level errors have NULL values in the result set.

Preview Limitations
Limitations of this SQL statement are:
- Before executing a PREVIEW PIPELINE SQL statement, you must create a table for the Ocient system to have context for the preview.
- The COLUMN DEFAULT IF NULL option from the CREATE PIPELINE SQL statement has no effect on the PREVIEW PIPELINE SQL statement.
- The PREVIEW PIPELINE SQL statement does not honor
the assignment of a service class based on text matching.
- These source options are not supported: START FILENAME, END FILENAME.
- When you execute two duplicate PREVIEW PIPELINE statements for a specific Kafka topic, the two statements share a consumer group. If the topic is small, one or both of the result sets might be only a partial result.
- For multiple tables, you can preview only one table at a time. You must specify the name of the table you want to preview using the FOR keyword.
- The preview of a continuous data pipeline for loading data from files is not supported.

Syntax
PREVIEW PIPELINE pipeline_name
[ MODE mode ]
[ SHOW ERRORS AS JSON show_errors_as_json ]
SOURCE [ INLINE ] (inline_string | <s3_source> | <filesystem_source> | <kafka_source>)
[ LIMIT limit ]
EXTRACT
  FORMAT CSV
  RECORD DELIMITER record_delimiter
  FIELD DELIMITERS ['delim1', 'delim2', ...]
[ INTERMEDIATE VALUES intermediate_values ]
[ INSERT ] INTO created_tablename SELECT
  preview_column_formula AS preview_column_name, ...
[ WHERE filter_expression ]
[ [ INSERT ] INTO created_tablename_n SELECT
    preview_column_formula AS preview_column_name, ...
  [ WHERE filter_expression ] ] [, ...]
[ FOR created_tablename_n ]

Though this syntax shows the CSV format, you can also use the PREVIEW PIPELINE statement with the other formats.

Parameters:
- pipeline_name (string): The name of the specified data pipeline for the preview.
- created_tablename (identifier): The identifier for the name of the table that you create before executing the PREVIEW PIPELINE SQL statement.
- created_tablename_n (identifier): For multiple tables, the identifier for the name of another table that you create before executing the PREVIEW PIPELINE SQL statement. Use the FOR keyword to specify which table content to preview.
- preview_column_formula (identifier): The identifier for the formula of the data to load. For example, for the data in the first field of the inline source, use $1. If you need to add a transformation, you can use functions to
transform data, such as CONCAT($1, $2), to load the concatenation of the first two fields in the inline source data.
- preview_column_name (identifier): The name of the column in the target table.

SQL Statement Options:
- SOURCE INLINE (string, required): The string that contains data for the preview of the data pipeline load. For example, source data can be 'oci,ent,ocient|ware,house,warehouse', where | is the record delimiter and , is the field delimiter. For special characters, such as \t, use an escape sequence such as e'oci,ent,oci\tent|ware,house,ware\thouse'. This option supports these text-based formats: CSV, JSON, and XML.
- MODE (string, default 'transform'): Indicates whether to perform a validation of the PREVIEW PIPELINE SQL statement. Valid values are 'validate' and 'transform'. Set this option to 'validate' to check that the creation of the data pipeline succeeds; if the pipeline is valid, the statement produces no output, otherwise it returns an error. Set this option to 'transform' to retrieve a preview of the results of the pipeline.
- SHOW ERRORS AS JSON (Boolean, default false): Indicates how to show errors. Values are true or false. If the value is true, the Ocient system returns record-level errors as JSON blobs rather than human-readable messages.
- LIMIT (integer, default 10): The number of rows, specified as an integer, to return in the preview results for sources with many rows. The default value is 10 rows.
- INTERMEDIATE VALUES (Boolean, default false): Indicates whether to capture intermediate values during a transformation sequence. Values are true or false. If the value is true, the Ocient system appends an extra column to the result set. Each value in the column contains a JSON blob that describes the intermediate values processed for each column after each transformation. You must specify at least one column name in the SELECT part of the syntax. The name of the specified column must match the name of the column in the created table. The number of columns in the
SELECT part can be less than those in the created table.

For definitions of other extract options, see the CREATE PIPELINE SQL statement options in docid\ pbyszqvu5wonpgoso qto.

Examples

Preview Pipeline Using CSV Format
Preview the load of two rows of data. First, create a table to serve as the context for the load. The previewload table contains three columns with these data types: string, integer, and Boolean.

CREATE TABLE previewload (col1 VARCHAR, col2 INT, col3 BOOLEAN);

Create the preview pipeline testpipeline with this data: 'hello,2,true|bye,3,false'. Specify the CSV extract format, the | record delimiter, and the , field delimiter. Load the data without transformation.

PREVIEW PIPELINE testpipeline
SOURCE INLINE 'hello,2,true|bye,3,false'
EXTRACT FORMAT CSV RECORD DELIMITER '|' FIELD DELIMITERS [',']
INTO previewload SELECT
  $1 AS col1,
  $2 AS col2,
  $3 AS col3;

Output:
col1  | col2 | col3
------+------+------
hello |    2 | true
bye   |    3 | false
Fetched 2 rows

Delete the previewload table:
DROP TABLE previewload;

Preview Pipeline Using CSV Format with Escape Characters
Preview the load of two rows of data. First, create a table to serve as the context for the load. The previewload table contains three columns with these data types: string, integer, and Boolean.

CREATE TABLE previewload (col1 VARCHAR, col2 INT, col3 BOOLEAN);

Create the preview pipeline testpipeline with this data: 'hello\tworld,2,true|bye\tworld,3,false'. Specify the CSV extract format, the | record delimiter, and the , field delimiter. Load the data without transformation. In this case, the data contains the special character \t; you must escape the character by using the escape sequence e.

PREVIEW PIPELINE testpipeline
SOURCE INLINE e'hello\tworld,2,true|bye\tworld,3,false'
EXTRACT FORMAT CSV RECORD DELIMITER '|' FIELD DELIMITERS [',']
INTO previewload SELECT
  $1 AS col1,
  $2 AS col2,
  $3 AS col3;

Output:
col1        | col2 | col3
------------+------+------
hello world |    2 | true
bye world   |    3 | false
Fetched 2 rows

Delete the previewload table:
DROP TABLE previewload;

Preview Pipeline Using CSV Format
with Transformation
Create a table to serve as the context for the load. The previewload table contains three string columns.

CREATE TABLE previewload (col1 VARCHAR, col2 VARCHAR, col3 VARCHAR);

Create the preview pipeline testpipeline with this data: 'hello,world|bye,world'. Specify the CSV extract format, the | record delimiter, and the , field delimiter. Load the data with a transformation to concatenate the two strings and return the result in the third column.

PREVIEW PIPELINE testpipeline
SOURCE INLINE 'hello,world|bye,world'
EXTRACT FORMAT CSV RECORD DELIMITER '|' FIELD DELIMITERS [',']
INTO previewload SELECT
  $1 AS col1,
  $2 AS col2,
  CONCAT($1, $2) AS col3;

Output:
col1  | col2  | col3
------+-------+-----------
hello | world | helloworld
bye   | world | byeworld
Fetched 2 rows

The third column contains the concatenated result of the first two columns.

Delete the previewload table:
DROP TABLE previewload;

Preview Pipeline Using the Kafka Source
Create the previewload table with these columns:
- id: non-null integer
- salut: non-null string
- name: non-null string
- surname: non-null string
- zipcode: non-null integer
- age: non-null integer
- rank: non-null integer

CREATE TABLE previewload (
  id INT NOT NULL,
  salut VARCHAR(3) NOT NULL,
  name VARCHAR(10) NOT NULL,
  surname VARCHAR(10) NOT NULL,
  zipcode INT NOT NULL,
  age INT NOT NULL,
  rank INT NOT NULL);

Create the preview pipeline test_small_kafka_simple_csv. Specify the ddl_csv topic. Indicate that the Kafka consumer should not write its durably made record offsets to the Kafka broker by setting the WRITE OFFSETS option to false. Specify the bootstrap server as servername:0000 and the configuration option "auto.offset.reset": "earliest" by using the BOOTSTRAP SERVERS and CONFIG options, respectively. Limit the returned results to three rows by using the LIMIT option. Specify the CSV extract format and the \n record delimiter by using the FORMAT and RECORD DELIMITER extract options, respectively.

PREVIEW PIPELINE test_small_kafka_simple_csv
SOURCE KAFKA TOPIC 'ddl_csv' WRITE
OFFSETS false
BOOTSTRAP SERVERS 'servername:0000'
CONFIG '{"auto.offset.reset": "earliest"}'
LIMIT 3
EXTRACT FORMAT CSV RECORD DELIMITER '\n'
INTO previewload SELECT
  int($1) AS id,
  char($2) AS salut,
  char($3) AS name,
  char($4) AS surname,
  int($5) AS zipcode,
  int($6) AS age,
  int($7) AS rank;

Output:
id  | salut | name        | surname     | zipcode | age | rank
----+-------+-------------+-------------+---------+-----+-----
105 | mr    | jmhsuxofspx | uaofgayjugb |   85573 |  29 |    2
101 | mr    | ijmmtbddkyh | yqbxqnkgidp |   52393 |  43 |    1
109 | mr    | bigohpwfwmr | qcxgakpkoeu |   74420 |   1 |    3
Fetched 3 rows

Start Pipeline
START PIPELINE begins the execution of the specified data pipeline, which extracts data and loads it into the target tables specified by the CREATE PIPELINE SQL statement. When you execute the START PIPELINE SQL statement, the Ocient system creates a static list of files in the sys.pipeline_files system catalog table and marks them with the PENDING status. After the system assigns a file to an underlying task, the system marks the file as QUEUED. After the system verifies that the file exists, the system marks the file as LOADING to signify that a loader node has started reading the source data. Finally, upon successfully loading the file, the system transitions the status of the file to the terminal status LOADED.

A Kafka pipeline never enters the COMPLETED state in the information_schema.pipeline_status view. Instead, the pipeline remains RUNNING after you start it until you stop it or the pipeline reaches the error limit specified using the ERROR LIMIT option.

You must have the EXECUTE privilege on the pipeline and the INSERT privilege on any table that is a target in the pipeline. For details, see docid\ f55ngxtki0f7kkmyatvug.

Syntax
START PIPELINE pipeline_name
[ ERROR [ LIMIT <integer_value> ] [ FILE ERROR (FAIL | SKIP MISSING FILE | TOLERATE) ] ]
[ USING LOADERS <loader_names> ]
[ ON COMPLETION (NO FLUSH | FLUSH AND WAIT | FLUSH AND RETURN) ]

Parameters:
- pipeline_name (string): The name of the specified data pipeline.

SQL Statement Options:
- ERROR LIMIT <integer_value> (integer, default 0): An error log option that determines the number of record-level errors that the load tolerates during the execution of a pipeline before the whole pipeline execution fails. <integer_value> is a number greater than or equal to -1. When you set <integer_value> to -1, the load tolerates an unlimited number of record-level errors. By default, continuous pipelines tolerate an unlimited number of record-level errors, whereas batch pipelines tolerate zero errors. With multiple target tables, record-level errors are specific to a table. For example, if a transformation for loading one table succeeds but another transformation for loading another table fails, the row corresponding to the successful transformation loads into the first table but not the other.

- ERROR FILE ERROR <error_action> (string, default FAIL): For pipelines that load data from S3 or local file sources, this error configuration option determines how to treat unrecoverable file-level errors. Examples of unrecoverable file-level errors are:
  - The file is listed when the pipeline starts, but is missing later during the load.
  - The gzip file is corrupted and cannot be decompressed.
  - The file cannot be downloaded from the source.
  - A record-level error that is not tolerable occurs when tokenizing or transforming data in the file.

  <error_action> can be one of these keywords:
  - FAIL: Fail the whole pipeline because of a file-level error.
  - SKIP MISSING FILE: Only tolerate errors that occur due to missing files. If a file exists in the list when the pipeline starts but is missing later during the load, skip the file and continue with the next file.
  - TOLERATE: Tolerate all unrecoverable file-level errors. In this mode, the load also tolerates an unlimited number of record-level errors.

  The FAILED, SKIPPED, and LOADED WITH ERRORS file statuses appear in the sys.pipeline_files system catalog table, respectively, and indicate how the pipeline handled the file error.

- USING LOADERS
<loader_names> (list of strings, default NULL): Specify one or more names of loader nodes as a comma-separated list for executing the START PIPELINE SQL statement. If you do not use this option, the Ocient system uses all of the loader nodes that are active to execute the pipeline. You can find node names in the sys.nodes system catalog table. For the query to execute successfully, the specified node names must identify nodes that have the ACTIVE operational status and the streamloader role.

- ON COMPLETION <completion_mode> (string, default NO FLUSH): A completion type option that specifies the behavior when the pipeline finishes loading. This option determines when the remaining pages are converted into segments.
  - NO FLUSH: Do not force a flush of pages. Rely on watermarks and timeouts to trigger the final conversion to segments.
  - FLUSH AND WAIT: Trigger a flush of pages, initiating the final conversion to segments. The pipeline blocks and waits for the conversion to segments to complete before marking the pipeline as COMPLETED.
  - FLUSH AND RETURN: Trigger a flush of pages, initiating the final conversion to segments. The Ocient system marks the pipeline as COMPLETED immediately following the flush, without waiting for the conversion to segments to complete.

When you execute the START PIPELINE SQL statement, the Ocient system creates a static list of files only for batch pipelines and a dynamic list for continuous pipelines in the sys.pipeline_files system catalog table.

Examples
Start an existing pipeline named ad_data_pipeline with default settings:
START PIPELINE ad_data_pipeline;

Start an existing pipeline named ad_data_pipeline with error tolerance (tolerate 10 errors before aborting the pipeline). For details about error tolerance, see docid\ bq3s2bzqqm76v4khi9 ig.
START PIPELINE ad_data_pipeline ERROR LIMIT 10;

Data pipelines log a message for each pipeline error to the sys.pipeline_errors system catalog table, even if you do not specify the ERROR option. Use BAD DATA TARGET settings to capture the original source data.

Start
an existing pipeline named ad_data_pipeline using the loader node named stream_loader1:
START PIPELINE ad_data_pipeline USING LOADERS "stream_loader1";

To resume a pipeline with file loading, see docid\ cxjn6qtjcjmsxxdb5b044. To restart a Kafka data pipeline, see docid\ cxjn6qtjcjmsxxdb5b044. For pipeline dependencies, see docid\ cxjn6qtjcjmsxxdb5b044.

Stop Pipeline
STOP PIPELINE stops the execution of the pipeline and its associated tasks. After you stop a pipeline, you can execute the START PIPELINE SQL statement on the pipeline to run the pipeline again. Regardless, the load deduplicates any records previously loaded in the same pipeline. You must have the EXECUTE privilege on the pipeline to execute this SQL statement. For details, see docid\ f55ngxtki0f7kkmyatvug.

Syntax
STOP PIPELINE pipeline_name

Parameters:
- pipeline_name (string): The name of the specified data pipeline.

Example
Stop an existing pipeline named ad_data_pipeline:
STOP PIPELINE ad_data_pipeline;

You can see the status of the parent tasks in the sys.tasks system catalog table and the status of the child tasks in the sys.subtasks system catalog table.

Alter Pipeline Rename
The ALTER PIPELINE RENAME TO SQL statement changes the name of the pipeline object while retaining its identifier, options, and other metadata. The Ocient system reflects this change in the sys.pipelines system catalog table. Then, you must use the new name when you refer to the pipeline in SQL statements. You must have the ALTER privilege on the pipeline to execute this SQL statement. For details, see docid\ f55ngxtki0f7kkmyatvug.

Syntax
ALTER PIPELINE [ IF EXISTS ] pipeline_original_name RENAME TO pipeline_new_name

Parameters:
- pipeline_original_name (string): The name of the existing data pipeline.
- pipeline_new_name (string): The new name of the data pipeline.

Example
Rename an existing pipeline named ad_data_pipeline to renamed_pipeline:
ALTER PIPELINE ad_data_pipeline RENAME TO renamed_pipeline;

Export
Pipeline
EXPORT PIPELINE returns the CREATE PIPELINE SQL statement used to create the pipeline object. You can use the output of this statement to recreate an identical pipeline if you remove the original pipeline. The execution of this statement censors sensitive S3 values, such as the access key ID and secret access key, and Kafka consumer configuration (https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) password-type fields; the database replaces them. To execute this statement, you must have the VIEW privilege on the pipeline and on any table the pipeline targets. For details, see docid\ f55ngxtki0f7kkmyatvug.

Syntax
EXPORT PIPELINE pipeline_name

Parameters:
- pipeline_name (string): The name of the specified data pipeline.

Example
Export an existing pipeline in the database, ad_data_pipeline:
EXPORT PIPELINE ad_data_pipeline;

Create Pipeline Function
CREATE PIPELINE FUNCTION enables you to define a function for loading data. Define the function behavior using the Groovy language; for details about this language, see https://groovy-lang.org/index.html. Function arguments and output are strongly typed and immutable. You can test the execution of your function using the PREVIEW PIPELINE SQL statement. The Ocient system does not support the overloading of function names.

Syntax
CREATE [ OR REPLACE ] PIPELINE FUNCTION [ IF NOT EXISTS ] function_name( input_argument [, ...] )
LANGUAGE GROOVY
RETURNS output_argument_definition
IMPORTS [ library_name [, ...] ]
AS $$
groovy_declaration
$$

Parameters:
- function_name (string): A unique identifier for the data pipeline function.
- input_argument (string): The name of one or more input arguments of the function. Specify data types for input arguments according to the supported data types defined in docid\ jfqu osagg5enkvmeesnl. For the data type declaration, use NOT NULL where applicable for maximum performance.
- output_argument_definition (string): The type definition of the output from the function.
- library_name
(string): The name of one or more Java libraries. You can include libraries by using the IMPORTS clause or by specifying the fully qualified class path (e.g., java.lang.Integer) in the source definition.
- groovy_declaration (string): The Groovy definition of the function.

Install and Enable Third-Party Libraries
You can use the default list of supported third-party libraries or additional third-party libraries that you install.

Supported Libraries
Data pipeline functions can import classes from the default list of supported third-party libraries. This table provides the resources for each supported library package.

- java.lang: https://docs.oracle.com/javase/8/docs/api/java/lang/package-summary.html
- java.util: https://docs.oracle.com/javase/8/docs/api/java/util/package-summary.html
- java.nio.ByteBuffer: https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html
- groovy.json: https://docs.groovy-lang.org/latest/html/gapi/groovy/json/package-summary.html
- groovy.xml: https://docs.groovy-lang.org/latest/html/gapi/groovy/xml/package-summary.html
- groovy.yaml: https://docs.groovy-lang.org/latest/html/api/groovy/yaml/package-summary.html
- org.apache.groovy.datetime.extensions: https://docs.groovy-lang.org/latest/html/api/org/apache/groovy/datetime/extensions/package-summary.html
- org.apache.groovy.dateutil.extensions: https://docs.groovy-lang.org/latest/html/api/org/apache/groovy/dateutil/extensions/package-summary.html
- com.ocient.streaming.data.types: docid\ w4zmi3mt097ocbtz sbvt

Additional Libraries
You can install and enable additional third-party libraries to import for use in your data pipeline functions. You must install the JAR package on all loader nodes in the /opt/ocient/current/lib/extractorengine_udt folder. Then, add the fully qualified class name to the function import list as part of the library_name parameter. For example, to reference the ByteBuffer class from the com.fastbuffer package, specify com.fastbuffer.ByteBuffer in the library_name parameter and use the
class in the Groovy definition as: var x = new com.fastbuffer.ByteBuffer()

Groovy Data Type Mapping
For the Groovy definition, the Ocient system maps each SQL data type to the corresponding Groovy data type. Your Groovy code should use the Groovy data type defined in this table for any input arguments and output.

SQL data type → Groovy data type:
- BIGINT → java.lang.Long
- BINARY(n) or HASH(n) → byte[]
- BOOLEAN → java.lang.Boolean
- CHAR(n) or VARCHAR(n) → java.lang.String
- DATE → java.time.LocalDate
- DECIMAL(p,s) → com.ocient.streaming.data.types.Decimal
- DOUBLE → java.lang.Double
- INT → java.lang.Integer
- IPV4 → java.net.Inet4Address
- IP → java.net.Inet6Address
- ST_POINT → com.ocient.streaming.data.types.gis.STPoint
- ST_LINESTRING → com.ocient.streaming.data.types.gis.STLinestring
- ST_POLYGON → com.ocient.streaming.data.types.gis.STPolygon
- FLOAT → java.lang.Float
- SMALLINT → java.lang.Short
- TIME → com.ocient.streaming.data.types.Time
- TIMESTAMP → com.ocient.streaming.data.types.Timestamp
- BYTE → java.lang.Byte
- TUPLE<<type1, type2, …>> → com.ocient.streaming.data.types.OcientTuple
- type[] → java.util.List<type>
- UUID → java.util.UUID
- VARBINARY(n) → byte[]
- VARCHAR(n) → java.lang.String

Example
Create the sort_function data pipeline function to sort an array of integers. The function has two input arguments: value, a non-null array of integers, and ascending, the sort order. The function returns a non-null array of integers. If value is empty, the function throws an error. The function imports these Java libraries: java.lang.Integer, java.util.ArrayList, java.util.Collections, java.util.Comparator, and java.util.List.

Define the Groovy code. Because the input arguments do not change, the example Groovy code first copies the value argument, sorts the copied list according to the sort order, and returns the sorted array.

CREATE PIPELINE FUNCTION sort_function(
  value INT[] NOT NULL,
  ascending BOOLEAN NOT NULL)
LANGUAGE GROOVY
RETURNS INT[] NOT NULL
IMPORTS [ 'java.lang.Integer', 'java.util.ArrayList', 'java.util.Collections', 'java.util.
Comparator', 'java.util.List' ]
AS $$
/* Throw an error if the array is empty. */
if (value.isEmpty()) {
  throw new PipelineFunctionException("unexpected empty array");
}
/* Make a copy of the list. */
List<Integer> sorted = new ArrayList<>((List<Integer>)value);
/* Sort the array elements according to the specified order. */
sorted.sort(ascending ? Comparator.naturalOrder() : Comparator.reverseOrder());
/* Return the sorted array. */
return sorted;
$$;

View the creation information about the sort_function function using the sys.pipeline_functions system catalog table. This statement returns the function name, return type, argument names, data types of the arguments, and the list of imported libraries.

SELECT name, return_type, argument_names, argument_types, imported_libraries
FROM sys.pipeline_functions;

Drop Pipeline Function
DROP PIPELINE FUNCTION removes an existing pipeline function. You must have the DROP privilege on the pipeline function to execute this SQL statement. For details, see docid\ f55ngxtki0f7kkmyatvug.

Syntax
DROP PIPELINE FUNCTION [ IF EXISTS ] function_name [, ...]

Parameters:
- function_name (string): The name of the specified data pipeline function to remove. You can drop multiple functions by specifying additional function names, separated by commas.

Examples
Remove an existing pipeline function named sort_function:
DROP PIPELINE FUNCTION sort_function;

Remove an existing pipeline function named sort_function, or return a warning if the Ocient system does not find the function in the database:
DROP PIPELINE FUNCTION IF EXISTS sort_function;

Related Links
docid\ zncvnrhsf6fg1yvqk6mxt
docid\ stial7oztpmpndfwcsm29
docid\ ymealxr8i3ef2yhn9r8a
docid\ ti3mdibvgmuudmlqu9xpl
docid\ cxjn6qtjcjmsxxdb5b044
docid\ vqvrmdyk8josxmkfsyprc
docid\ f55ngxtki0f7kkmyatvug
docid\ g6voewkufcxz2yscdfxx
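As a recap of the statements covered in this section, a typical pipeline lifecycle chains them together. The following is a hedged sketch only: it reuses the hypothetical ad_data_pipeline name from the examples above, elides the pipeline body, and the comments summarize behavior described earlier in this section rather than output from a live system.

```sql
-- Sketch of a pipeline lifecycle using the statements in this section.
-- The CREATE PIPELINE body is elided; see the full syntax above.
CREATE PIPELINE ad_data_pipeline
  SOURCE ...
  EXTRACT ...
  INTO public.orders SELECT $1 AS order_id;

START PIPELINE ad_data_pipeline ERROR LIMIT 10;  -- tolerate 10 record-level errors
-- Inspect record-level errors logged during the load:
SELECT * FROM sys.pipeline_errors;

STOP PIPELINE ad_data_pipeline;        -- stop the running load and its tasks
EXPORT PIPELINE ad_data_pipeline;      -- capture the (censored) definition
DROP PIPELINE IF EXISTS ad_data_pipeline;  -- remove the pipeline and its catalog data
```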