SQL Reference
Data Definition Language (DDL)...
Data Pipelines
data pipelines enable you to load data from your chosen source into the {{ocienthyperscaledatawarehouse}} you can preview a data pipeline using the preview pipeline sql statement to test the creation statement and any required data transformation then, you can create a data pipeline using the create pipeline statement to manage the load, use the start pipeline statement to start the load and the stop pipeline statement to stop it you can rename a data pipeline using the alter pipeline rename statement to see the full definition of a created pipeline, use the export pipeline statement when you finish with the load, you can use the drop pipeline statement to remove the data pipeline you can create user defined data pipeline functions using the create pipeline function statement and remove the function using the drop pipeline function statement also, you can administer privileges for data pipelines and data pipeline functions for details, see docid\ f55ngxtki0f7kkmyatvug create pipeline create pipeline defines a data pipeline that you can execute with the start pipeline sql statement specify the type of load, data source, and data format you must have the alter privilege on the pipeline for details, see docid\ f55ngxtki0f7kkmyatvug syntax create \[ or replace ] \[ batch | continuous | transactional ] pipeline \[ if not exists ] pipeline name \[ \<advanced pipeline options> ] \[ bad data target \<kafka bad data target> ] source ( \<s3 source> | \<filesystem source> | \<hdfs source> | \<kafka source> ) \[ monitor ( \<sqs monitor> | \<kafka monitor> ) ] \[ lookup lookup source \<lookup options> ] \[ ] extract ( \<delimited extract options> | \<asn1 extract options> | \<avro extract options> | \<json extract options> | \<parquet extract options> | \<binary extract options> | \<xml extract options> ) \[ \<general extract options> ] \[ insert ] into destination table name select expression as col alias target, expression2 as col alias target2, \[ where filter expression ] \[ \[ insert ] into destination table name n select expression n as col alias target n, expression2 n as col alias target2 n, \[ where filter expression n ] ] \[ ] / / / advanced options / / / advanced pipeline options = \[ cores processing cores ] \[ partitions file partitions ] \[ batch size number of rows ] \[ record number format record number format ] / / / bad data target options / / / kafka bad data target = kafka bootstrap servers bootstrap servers topic topic name \[ config config option ] / / / source options / / / s3 source = s3 \<file based source options> bucket bucket name ( filter | filter glob | filter regex | object key ) specifiers \[ region region ] \[ endpoint endpoint ] \[ enable path style access enable path style access ] \[ access key id access key credentials ] \[ secret access key secret key credentials ] \[ session token session token ] \[ role arn role arn ] \[ assume role config assume role config] \[ max concurrency parallel connections ] \[ read timeout num seconds ] \[ request depth num requests ] \[ request retries num retries ] \[ headers headers ] filesystem source = filesystem \<file based source options> ( filter | filter glob | filter regex ) specifiers hdfs source = hdfs \<file based source options> ( filter | filter glob | filter regex ) specifiers endpoint endpoint \[ config hdfs config ] kafka source = kafka bootstrap servers bootstrap servers topic topic name \[ write offsets write offsets ] \[ config config option ] \[ auto offset reset ( ( 'largest' | 'latest' ) | ( 'earliest' | 'smallest' ) | 'error' ) file based source options = \[ prefix prefix ] \[ compression method 'gzip' ] \[ sort by ( 'filename' | 'created' | 'modified' ) \[ sort direction ( asc | desc ) ] ] \[ sort rewrite sort rewrite ] \[ start filename start filename ] \[ end filename end filename ] \[ start created timestamp start created timestamp ] \[ end created timestamp end created timestamp ] \[ start modified timestamp start modified timestamp ] \[ end modified timestamp end modified timestamp ] / / / monitor options / / / sqs monitor = sqs \<general monitor options> queue url queue url \[ region region ] \[ endpoint endpoint ] \[ access key id access key id ] \[ secret access key secret access key ] 	 kafka monitor = kafka \<general monitor options> bootstrap servers bootstrap servers topic topic auto offset reset ( ( 'largest' | 'latest' ) | ( 'earliest' | 'smallest' ) | 'error' ) \[ group id group id ] \[ message { filename file selector timestamp timestamp selector size size selector } ] \[ consumer { max messages max messages timeout consumer timeout } ] general monitor options = \[ polling interval polling interval ] \[ batch { max files max files timeout batch timeout lookback lookback } ] / / / lookup options / / / lookup options = connection type connection type connection string connection string lookup schema lookup schema lookup table lookup table \[ refresh period refresh period ] \[ config config json ] / / / extract options / / / general extract options = \[ charset name charset name ] \[ column default if null column default if null ] \[ null strings null strings ] \[ trim whitespace trim whitespace ] \[ validate characters validate characters ] \[ replace invalid characters replace invalid characters ] \[ replacement character replacement character ] delimited extract options = format ( 'delimited' | 'csv' ) \[ comment char comment char ] \[ empty field as null empty field as null ] \[ escape char escape char ] \[ field delimiter field delimiter ] \[ field optionally enclosed by enclosure char ] \[ headers delimited headers ] \[ num header lines num header lines ] \[ num footer lines num footer lines ] \[ record delimiter record delimiter ] \[ skip empty lines skip empty lines ] \[ open array open array char ] \[ close array close array char ] \[ array element delimiter array delimiter ] \[ open object open object char ] \[ close object close object char ] asn1 extract options = format 'asn 1' schema { url url asn1 record type record type } avro extract options = format 'avro' \[ schema { \[ inline inline string ] \[ infer from ( 'sample file' | 'all files' ) ] } ] binary extract options = format 'binary' record length length in bytes \[ endianness ( 'big' | 'little' ) ] \[ auto trim padding auto trim padding ] \[ padding character padding character ] json extract options = format 'json' parquet extract options = format 'parquet' schema { infer from infer from } xml extract options = format 'xml' pipeline identity and naming the name of a data pipeline is unique in an {{ocient}} system reference the pipeline in other sql statements like start pipeline , stop pipeline , and drop pipeline using the name of the pipeline the sql statement throws an error if a pipeline with the same name already exists unless you specify the if not exists option you can rename a pipeline with the alter pipeline sql statement update a pipeline you can update a pipeline with the or replace clause in the create pipeline sql statement use this clause when you want to continue loading from the current place in a continuous load, but you need to modify transformations or other settings if you specify the or replace clause and the pipeline already exists, the database replaces the original pipeline object with the options specified in the new create or replace pipeline statement when you replace an existing pipeline, the pipeline retains its current position in the source data so that data is not duplicated when the pipeline is resumed with a start pipeline sql statement first, you must stop a pipeline before executing the create or replace sql statement data pipeline modes you can define pipelines in either batch batch , continuous continuous , or transactional transactional mode file sources (e g , s3 , filesystem ) support batch and continuous modes file based loads default to batch mode if you do not specify this keyword {{kafka}} only supports continuous mode loads with a kafka source default to continuous mode if you do not specify this keyword when you execute the start pipeline sql statement using a data pipeline in the batch mode, the system creates a static list of files with the pending status in the sys pipeline files system catalog table with the continuous mode, the monitor appends new incoming files to the list of files in the sys pipeline files system catalog table with the continuous mode, you can apply filters for the data pipeline the system only uses files in the consumed messages if the filenames match the filters for this mode, these options are invalid prefix sort by sort direction sort rewrite start created timestamp end created timestamp start modified timestamp end modified timestamp the transactional mode enables the data pipeline to roll back the interim results of the load if an error occurs during load execution the system makes all data visible only after the load completes successfully this mode supports all options that the batch mode does required options you must define certain options in every pipeline for these options, there is no default value optional options need to be present only if you use specific functions a pipeline must contain the source , extract format , and into table name select statements docid\ pbyszqvu5wonpgoso qto docid\ pbyszqvu5wonpgoso qto docid\ y731i6lhout2b 7mc1jr8 docid\ ti3mdibvgmuudmlqu9xpl these options depend on each other if not exists and or replace sql statements are mutually exclusive select statement and data transformation use the into table name select sql statement in the create pipeline sql statement to specify how to transform the data the select statement includes a set of one or more expressions and a target column name in the form expression as column name the expression part in the statement contains a source field reference (e g , $1 or $my field subfield ) and, optionally, the transformation function you want to apply if your data is nested in arrays, you can use special transformation functions, such as the docid\ od i nse3vcplc3nixzgc function, to expand the data into individual rows for details about data transformation and supported transformation functions, see docid\ ti3mdibvgmuudmlqu9xpl for details about data types and casting, see docid\ jfqu osagg5enkvmeesnl and docid\ y731i6lhout2b 7mc1jr8 you can also specify metadata values, such as filename, to load in the select sql statement for details, see docid\ vqvrmdyk8josxmkfsyprc this is an example of a transformation statement snippet format json into public orders select timestamp(bigint($created timestamp)) as ordertime, metadata('filename') as source filename, $order number as ordernumber, $customer first name as fname, left($customer middle initial,1) as minitial, $customer last name as lname, $postal code as postal code, $promo code as promo code, $order total as ordertotal, decimal($tax,8,2) as tax, char\[]\($line items\[] product name) as product names, char\[]\($line items\[] sku) as skus optionally, you can filter the load by using the where clause in the form where filter expression this expression should evaluate to a boolean value or null and can include more than one filter expression the system loads rows that contain data matching the filter criteria when the expression evaluates to true the system does not load the rows when the expression evaluates to false or null you can include any transform in the where clause as in the select clause this is an example of a filter snippet that loads order data for customers whose last name starts with the letter a use the coalesce function to return only non null values format json into public orders select timestamp(bigint($created timestamp)) as ordertime, metadata('filename') as source filename, $order number as ordernumber, $customer first name as fname, left($customer middle initial,1) as minitial, $customer last name as lname, $postal code as postal code, $promo code as promo code, $order total as ordertotal, decimal($tax,8,2) as tax, char\[]\($line items\[] product name) as product names, char\[]\($line items\[] sku) as skus where startswith(coalesce($customer last name, ''),'a') null and default value handling the ocient system has specific ways to handle null and default values in the data pipeline consider this information when you prepare data to load and write the create table and create pipeline sql statements to load null values, insert the null value in the data in this case, if you specify not null for the target column in the create table statement, the data pipeline fails to load if you omit a column in the select sql statement, the data pipeline loads the default value for the column if you do not specify the default value, the pipeline loads a null value if you specify not null for the target column, the data pipeline also fails to load you can use the default keyword to load a default value in this case, if the column does not have a default value, the pipeline fails to load if the pipeline loads a null into a column with a specified default value, you can use coalesce(\<value>, default) to insert the default value instead of the null value, where \<value> is the null column you can modify the load of one column at a time in this way this table describes the data pipeline behavior for null or omitted column values true 165,165,165,168 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type required privileges you must have the create pipeline privilege on the underlying database and the view privilege on each table in the pipeline definition to execute the create pipeline sql statement the table must already exist see the start pipeline sql statement for the required privileges to execute a pipeline for details, see docid\ f55ngxtki0f7kkmyatvug examples load json data from kafka this example loads json data from kafka using the create pipeline sql statement use the bootstrap server 192 168 0 1 9092 using the kafka topic orders load data into the public orders table specify the data to load as these json selectors identifier $id user identifier $user id product identifier $product id subtotal amount $subtotal tax $tax total amount $total discount amount $discount created time $created at quantity $quantity create pipeline orders pipeline source kafka bootstrap servers '192 168 0 1 9092' topic 'orders' extract format json into public orders select $id as id, $user id as user id, $product id as product id, $subtotal as subtotal, $tax as tax, $total as total, $discount as discount, $created at as created at, $quantity as quantity; for a complete tutorial, see docid\ jloppc7jopkao4enkj6 d load delimited data from s3 this example loads delimited data in csv format from {{aws}} s3 use the https //s3 us east 1 amazonaws com https //s3 us east 1 amazonaws com endpoint with the ocient examples bucket and path metabase samples/csv/orders csv denote path style access using the enable path style access option set to true specify one header line with the num header lines option load data into the public orders table specify the data to load using the column numbers idenfier user identifier product identifier subtotal amount tax total amount discount amount created time quantity create pipeline orders pipeline source s3 endpoint 'https //s3 us east 1 amazonaws com' bucket 'ocient examples' filter 'metabase samples/csv/orders csv' enable path style access true extract format csv num header lines 1 into public orders select $1 as id, $2 as user id, $3 as product id, $4 as subtotal, $5 as tax, $6 as total, $7 as discount, $8 as created at, $9 as quantity; for a complete tutorial, see docid\ stial7oztpmpndfwcsm29 continuous file load of delimited data in csv files create a data pipeline that uses continuous file loading with delimited data in csv files specify to use 32 partitions and 16 cores with the partitions and cores options, respectively the source is s3 with the http //endpoint ocient com endpoint and cs data bucket specify the filter ' csv' to find all files with a filename that matches a glob pattern without subdirectories, for example, data csv the system filters filenames with subdirectories such as data/data sample csv for continuous file loading, specify the monitor option to use the kafka monitor specify the test broker 9092 bootstrap server, cfl kafka ten adtech flat small kafka topic, and reset the offset to the smallest offset using the earliest value of the auto offset reset option set the client group identifier to 84079bf1 bdc4 4b10 ba12 41ba6b17dffe the format is csv with the record delimiter as the newline character \n load data in the public ad sessions table the select statement identifies the columns to load by number there are 39 fields in the csv data for each column, transform each column using cast functions for details, see docid\ iteimff9ycez0ctcfgqb for each function see docid\ hem zd qwquv8dfnvnqz8 for the to timestamp function create continuous pipeline test continuous pipeline partitions 32 cores 16 source s3 endpoint 'http //endpoint ocient com/' bucket 'cs data' filter ' csv' monitor kafka bootstrap servers 'test broker 9092' topic 'cfl kafka ten adtech flat small' auto offset reset 'earliest' group id '84079bf1 bdc4 4b10 ba12 41ba6b17dffe' extract format csv record delimiter '\n' into public ad sessions select to timestamp(char($1), 'yyyy mm dd hh\ mm\ ss ssssss', 'java') as event date time, char($2) as device model, tinyint($8) as device user age, boolean($10) as device ad tracking disabled, binary($11) as device mac, int($14) as ip zip, float($19) as ip zip latitude, double($20) as ip zip longitude, bigint($21) as session id, smallint($32) as session response latency, decimal($34, 10, 1) as session transaction revenue, char($39) as session app name ; transactional load of delimited data in csv files create a data pipeline that uses transactional file loading with delimited data in csv files the source is s3 with the http //endpoint ocient com endpoint and cs data bucket specify the filter ' csv' to find all files with a filename that matches a glob pattern without subdirectories, for example, data csv the system filters filenames with subdirectories such as data/data sample csv the format is csv load data in the public ad sessions table the select statement identifies the columns to load by number for each column, transform each column using cast functions for details, see docid\ iteimff9ycez0ctcfgqb for each function see docid\ hem zd qwquv8dfnvnqz8 for the to timestamp function create transactional pipeline test continuous pipeline source s3 endpoint 'http //endpoint ocient com/' bucket 'cs data' filter ' csv' extract format csv into public ad sessions select to timestamp(char($1), 'yyyy mm dd hh\ mm\ ss ssssss', 'java') as event date time, char($2) as device model, tinyint($8) as device user age, boolean($10) as device ad tracking disabled, binary($11) as device mac, int($14) as ip zip, float($19) as ip zip latitude, double($20) as ip zip longitude, bigint($21) as session id, smallint($32) as session response latency, decimal($34, 10, 1) as session transaction revenue, char($39) as session app name ; source options file based source options options that apply to both the s3 , filesystem , and hdfs sources true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type s3 source options you can apply these options to data sources of the source s3 type, which include s3 and s3 compatible services true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type s3 credentials hierarchy the s3 source configuration supports this hierarchy to obtain s3 credentials level 1 data pipeline configuration (set by using the access key id and secret access key options) level 2 https //docs aws amazon com/sdk for java/v1/developer guide/credentials html (set by using the instructions provided in the web page) level 3 anonymous access (set by default) if the ocient system does not obtain the credentials at a level, the system tries a lower level you can choose the level to store the credentials, where level 1 is the highest higher levels take precedence for credential storage filesystem source options no options exist specific to the file system source ( source filesystem ) except for general docid\ pbyszqvu5wonpgoso qto when you load data using source filesystem , the files must be addressable from all of your loader nodes the ocient system uses the specified path in the pipeline prefix and filter options to select the files to load a shared view of the files you want to load must be available to all loader nodes involved in a pipeline for example, you can use a network file system (nfs) mount available to all loader nodes at a common path on each node example this example create pipeline sql statement snippet contains a filesystem source and filters to all csv files in the /tmp/sample data/ directory on each of the loader nodes create pipeline source filesystem prefix '/tmp/sample data/' filter ' / csv' extract format delimited into public orders select $1 as username, $2 as subtotal, hdfs source options you can specify these options for {{hdfs}} data sources ( source hdfs ) for advanced options, see docid 3g1syam6xwz466vfjrdz9 true 147,122 63099630996311,131 45937521487014,360 9096284751667 left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type kafka source options you can apply these options to kafka data sources ( source kafka ) for compression, you do not need to specify a compression option in kafka based pipelines, because the ocient system handles the compression type automatically records produced to the kafka broker with a compression type setting or with the compression type set on the topic automatically decompress when the loading process consumes the records the loading process uses built in headers in kafka to determine the required decompression during extraction true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type for the consumer configuration, to create a secure connection from the kafka consumer to a kafka broker, set the "security protocol" key along with any ssl or sasl keys if a certificate file is required, you must add it to the truststore used by the {{jvm}} on all loader nodes the truststore path must be identical on all loader nodes the kafka configuration can reference this truststore path if you specify the ssl certificate location or ssl ca location consumer configuration, you must specify both of these configurations otherwise, the system throws an error for example config '{"auto offset reset" "earliest","ssl certificate location" "/etc/blab/file1 txt","ssl ca location" "/etc/blab/file2 txt"}' continuous file loading source options specify these options for data pipelines that use filesystem or s3 sources with the continuous mode for a kafka source, do not use these options general file monitor options true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type sqs monitor options use these options when you use monitor sqs for amazon sqs true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type kafka monitor options use these options when you use monitor kafka for kafka true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type the same kafka config option override considerations apply for details, see docid\ pbyszqvu5wonpgoso qto lookup options you can optionally look up data in a table from an external database you can include this table in the select statement of the create pipeline sql statement and perform join operations on its columns to use the lookup keyword, specify the external source name lookup source you can look up data in multiple external databases in this case, use the lookup keyword with the source name and corresponding options for each database you must provide the appropriate jdbc jar file for the jdbc connection to external databases for tables in the ocient system, this jar file is not needed the ocient system executes the jdbc code as trusted code only provide trusted jar files to the loading process the streamloader extractorengineparameters configurationoption engine external jdbc jarrootdirectory configuration parameter specifies the location of the jar file for details, see docid\ ejaxrd6rmniv3qqjbjuml true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type extract options general extract options you can specify these options on any of the allowed format types true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type for asn 1, {{avro}} , and parquet extract options, the schema option is a map type with a null default value this option is optionally specified depending on the format schema specifies the syntax of options for schema retrieval asn 1 extract options you can specify these options for asn 1 data record extraction ( format asn 1 ) for details about asn 1 formatted data, see docid\ y731i6lhout2b 7mc1jr8 true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type avro extract options you can specify these options for avro data record extraction the schema options are all optional for object container files (ocf) for kafka loads, the schema configuration must include either the url option or the inline option, but not both for file based loads, the schema configuration can include e ither the inline option or the infer from option, but not both of these options neither the inline nor the infer from options for details about loading avro formatted data, see docid\ y731i6lhout2b 7mc1jr8 true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type left 1 1 unhandled content type binary extract options you can apply these options to binary data record extraction ( format binary ) for details about binary formatted data, see docid\ y731i6lhout2b 7mc1jr8 the general option charset name has a different default value for format binary true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type the ocient system trims the default padding character of a space from the end of the text data in binary data delimited and csv extract options you can specify these options for delimited and csv format data record extraction ( format delimited or format csv data formats, which are aliases) for details about working with delimited and csv data, see docid\ y731i6lhout2b 7mc1jr8 true 110,83,100,100 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type 1 1 unhandled content type 1 1 unhandled content type 1 1 unhandled content type 1 1 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type json extract options no options exist for json data record extraction ( format json ) for details about json formatted data, see docid\ y731i6lhout2b 7mc1jr8 parquet extract options you can specify these options for parquet data record extraction ( format parquet ) for details about parquet formatted data, see docid\ y731i6lhout2b 7mc1jr8 when you use the format parquet option with an aws s3 source, the endpoint option is required true 165,165,165,168 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type xml extract options no options exist for the xml format extraction ( format xml ) for details about xml formatted data, see docid\ y731i6lhout2b 7mc1jr8 bad data targets bad data represents records that the ocient system could not load due to errors in the transformations or invalid data in the source records you can provide options for a bad data target that the ocient system uses during pipeline execution to capture the records that are not loaded the original bytes that the pipeline tried to load are captured in the bad data target along with the metadata about the error, such as the error message or source kafka is the only supported bad data target kafka bad data target when you use kafka as a bad data target, the ocient system produces the original bytes of the source record into the kafka topic of your choice the ocient system includes the metadata about the record in the header of the record as it is sent to kafka you can configure the kafka topic on your kafka brokers using the retention and partition settings of your choice in the event that the kafka broker is unreachable when the ocient system attempts to produce a bad data record to the bad data target, the system logs an error on the loader node, and the pipeline continues example this example create pipeline sql statement snippet contains a bad data target definition using the bad data target option create pipeline bad data target kafka topic 'orders errors' bootstrap servers '111 11 111 1 9092,111 11 111 2 9092' config '{"compression type" "gzip"}' source extract into public orders select $order billing name as username, $order subtotal as subtotal, kafka bad data target options true 165,165,165,168left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type advanced pipeline tuning options you can use pipeline tuning options to control the parallelism or batching dynamics of your pipelines this tuning can throttle the resources used on a pipeline or increase parallel processing across loader nodes these options are advanced settings that might require a detailed understanding of the underlying mechanics of the loading infrastructure in the ocient system to employ due to the inherent nature of each source type, the behavior of these options can differ between file based and kafka based loads all these options are optional true 117,100,100,100left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type drop pipeline drop pipeline removes an existing pipeline in the current database you cannot remove a pipeline that is running you must have the drop privilege on the pipeline to execute this sql statement for details, see docid\ f55ngxtki0f7kkmyatvug when you drop a pipeline, the ocient system also removes the associated system catalog information, such as pipeline errors, events, files, partitions, and metrics syntax drop pipeline \[ if exists ] pipeline name \[, ] 149,134,380 true unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type examples remove existing data pipeline remove an existing pipeline named ad data pipeline drop pipeline ad data pipeline; remove existing data pipeline by checking for existence remove an existing pipeline named ad data pipeline or return a warning if the ocient system does not find the pipeline in the database drop pipeline if exists ad data pipeline; preview pipeline preview pipeline enables you to view the results of loading data for a specific create pipeline sql statement without creating a whole data pipeline and without storing those results in the target table using this sql statement, you can iterate quickly and modify the syntax as needed to achieve your expected results after you confirm your expected results, you can use the same syntax in the body of the create pipeline statement with the appropriate source a table must exist in the database to serve as the target of your preview pipeline statement this table ensures the pipeline matches the column types of the target table however, the execution of this statement does not load data into the target table preview sources the source inline source type is available only for the preview pipeline sql statement you cannot create a data pipeline with inline source data other source types defined in the create pipeline statement, s3 , kafka , and filesystem , are compatible with the preview pipeline statement the extract options vary by the source type to mirror the create pipeline statement the database returns 10 records by default preview error handling pipeline level errors cause the preview pipeline sql statement to fail the ocient system returns an error and no result set however, the ocient system accumulates record level errors that occur during the execution of this statement in a single warning that the system returns along with the result set each line of the warning describes a record level error in human readable form or as a json blob, depending on the value of the show errors as json option rows or columns that encounter record level errors have null values in the result set preview limitations limitations of this sql statement are before executing a preview pipeline sql statement, you must create a table for the ocient system to have context for the preview the column default if null option from the create pipeline sql statement has no effect on the preview pipeline sql statement the preview pipeline sql statement does not honor the assignment of a service class based on text matching these source options are not supported start filename end filename when you execute two duplicate preview pipeline statements for a specific kafka topic, the two statements share a consumer group if the topic is small, one or both of the result sets might only be a partial result for multiple tables, you can preview only one table at a time you must specify the name of the table you want to preview using the for keyword the preview of a continuous data pipeline for loading data from files is not supported syntax preview pipeline pipeline name \[ mode mode ] \[ show errors as json show errors as json ] source \[ inline ] (inline string | \<s3 source> | \<filesystem source> | \<kafka source>) \[ limit limit ] extract format csv record delimiter record delimiter field delimiters \['delim1', 'delim2', ] \[ intermediate values intermediate values ] \[ insert ] into created tablename select preview column formula as preview column name, \[ where filter expression ] \[ \[ insert ] into created tablename n select preview column formula as preview column name, \[ where filter expression ] ] \[ , ] \[ for created tablename n ] though this syntax shows the csv format, you can also use the preview pipeline statement with the other formats true 221,221,221 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type sql statement options 116,104,144,100 true unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type you must specify at least one column name in the select part of the syntax the name of the specified column must match the name of the column in the created table the number of columns in the select part can be less than those in the created table for definitions of other extract options, see the create pipeline sql statement options in docid\ pbyszqvu5wonpgoso qto examples preview pipeline using csv format preview the load of two rows of data first, create a table to serve as the context for the load the previewload table contains three columns with these data types string, integer, and boolean create table previewload (col1 varchar, col2 int, col3 boolean); create the preview pipeline testpipeline with this data 'hello,2,true|bye,3,false' specify the csv extract format, | record delimiter, and the , field delimiter load the data without transformation preview pipeline testpipeline source inline 'hello,2,true|bye,3,false' extract format csv record delimiter '|' field delimiters \[','] into previewload select $1 as col1, $2 as col2, $3 as col3; output col1 col2 col3 \ hello 2 true bye 3 false fetched 2 rows delete the previewload table drop table previewload; preview pipeline using csv format with escape characters preview the load of two rows of data first, create a table to serve as the context for the load the previewload table contains three columns with these data types string, integer, and boolean create table previewload (col1 varchar, col2 int, col3 boolean); create the preview pipeline testpipeline with this data 'hello\tworld,2,true|bye\tworld,3,false' specify the csv extract format, | record delimiter, and , field delimiter load the data without transformation in this case, the data contains the special character \t you must escape the character by using the escape sequence e preview pipeline testpipeline source inline e'hello\tworld,2,true|bye\tworld,3,false' extract format csv record delimiter '|' field delimiters \[','] into previewload select $1 as col1, $2 as col2, $3 as col3; output col1 col2 col3 \ hello world 2 true bye world 3 false fetched 2 rows delete the previewload table drop table previewload; preview pipeline using csv format with transformation create a table to serve as the context for the load the previewload table contains three string columns create table previewload (col1 varchar, col2 varchar, col3 varchar); create the preview pipeline testpipeline with this data 'hello,world|bye,world' specify the csv extract format, | record delimiter, and the , field delimiter load the data with a transformation to concatenate the two strings and return the result in the third column preview pipeline testpipeline source inline 'hello,world|bye,world' extract format csv record delimiter '|' field delimiters \[','] into previewload select $1 as col1, $2 as col2, concat($1,$2) as col3; output col1 col2 col3 \ hello world helloworld bye world byeworld fetched 2 rows the third column contains the concatenated result of the first two columns delete the previewload table drop table previewload; preview pipeline using the kafka source create the previewload table with these columns id — non null integer salut — non null string name — non null string surname — non null string zipcode — non null integer age — non null integer rank — non null integer create table previewload ( id int not null, salut varchar(3) not null, name varchar(10) not null, surname varchar(10) not null, zipcode int not null, age int not null, rank int not null); create the preview pipeline test small kafka simple csv specify the ddl csv topic indicate that the kafka consumer should not write its durably made record offsets to the kafka broker by using the write offsets option set to false specify the bootstrap server as servername 0000 and configuration options as "auto offset reset" "earliest" by using the bootstrap servers and config options, respectively limit the returned results to three rows by using the limit option specify the csv extract format and \n record delimiter by using the format and record delimiter extract options, respectively preview pipeline test small kafka simple csv source kafka topic 'ddl csv' write offsets false bootstrap servers 'servername 0000' config '{"auto offset reset" "earliest"}' limit 3 extract format csv record delimiter '\n' into previewload select int($1) as id, char($2) as salut, char($3) as name, char($4) as surname, int($5) as zipcode, int($6) as age, int($7) as rank; id salut name surname zipcode age rank \ 105 mr jmhsuxofspx uaofgayjugb 85573 29 2 101 mr ijmmtbddkyh yqbxqnkgidp 52393 43 1 109 mr bigohpwfwmr qcxgakpkoeu 74420 1 3 fetched 3 rows start pipeline start pipeline begins the execution of the specified data pipeline that extracts data and loads it into the target tables specified by the create pipeline sql statement when you execute the start pipeline sql statement, the ocient system creates a static list of files in the sys pipeline files system catalog table and marks them with the pending status after the system assigns a file to an underlying task, the system marks the file as queued after the system verifies that the file exists, the system marks the file as loading to signify that a loader node has started reading the source data finally, upon successfully loading the file, the system transitions the status of the file to the terminal status loaded a kafka pipeline never enters the completed state in the information schema pipeline status view instead, the pipeline remains running after you start the pipeline until you stop it or the pipeline reaches the specified error limit using the error limit option you must have the execute privilege on the pipeline and the insert privilege on any table that is a target in the pipeline for details, see docid\ f55ngxtki0f7kkmyatvug syntax start pipeline pipeline name \[ error \[ limit \<integer value> ] \[ file error (fail | skip missing file | tolerate) ] ] \[ using loaders \<loader names> ] \[ on completion (no flush | flush and wait | flush and return) ] true 221,221,221 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type sql statement options true 100,100,101,100left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type left unhandled content type for the query to execute successfully, the specified node names must identify nodes that have active operational status streamloader role when you execute the start pipeline sql statement, the ocient system creates a static list of files only for batch pipelines and a dynamic list for continuous pipelines in the sys pipeline files system catalog table examples start an existing pipeline named ad data pipeline with default settings start pipeline ad data pipeline; start an existing pipeline named ad data pipeline with error tolerance (tolerate 10 errors before aborting the pipeline) for details about error tolerance, see docid\ bq3s2bzqqm76v4khi9 ig start pipeline ad data pipeline error limit 10; data pipelines log a message for each pipeline error to the sys pipeline errors system catalog table, even if you do not specify the error option use bad data target settings to capture the original source data start an existing pipeline named ad data pipeline using the loader node named stream loader1 start pipeline ad data pipeline using loaders "stream loader1"; to resume a pipeline with file loading, see docid\ cxjn6qtjcjmsxxdb5b044 to restart a kafka data pipeline, see docid\ cxjn6qtjcjmsxxdb5b044 for pipeline dependencies, see docid\ cxjn6qtjcjmsxxdb5b044 stop pipeline stop pipeline stops the execution of the pipeline and its associated tasks after you stop a pipeline, you can execute the start pipeline sql statement on the pipeline to run the pipeline again regardless, the load deduplicates any records previously loaded in the same pipeline you must have the execute privilege on the pipeline to execute this sql statement for details, see docid\ f55ngxtki0f7kkmyatvug syntax stop pipeline pipeline name true 221,221,221 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type example stop an existing pipeline named ad data pipeline stop pipeline ad data pipeline; you can see the status of the parent tasks in the sys tasks system catalog table and see the status of the child tasks in the sys subtasks system catalog table alter pipeline rename alter pipeline rename to sql statement changes the name of the pipeline object, while retaining its identifier, options, and other metadata the ocient system reflects this change in the sys pipelines system catalog table then, you must use the new name when you refer to the pipeline in sql statements you must have the alter privilege on the pipeline to execute this sql statement for details, see docid\ f55ngxtki0f7kkmyatvug syntax alter pipeline \[ if exists ] pipeline original name rename to pipeline new name true 221,221,221 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type example rename an existing pipeline named ad data pipeline to renamed pipeline alter pipeline ad data pipeline rename to renamed pipeline; export pipeline export pipeline returns the create pipeline sql statement used to create the pipeline object you can use the output of this statement to recreate an identical pipeline when you remove the original pipeline the execution of this statement censors sensitive s3 values like access key id and secret access key and kafka https //docs confluent io/platform/current/installation/configuration/consumer configs html password type fields the database replaces them with to execute this statement, you must have the view privilege on the pipeline and any table the pipeline targets for details, see docid\ f55ngxtki0f7kkmyatvug syntax export pipeline pipeline name true 221,221,221 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type example export an existing pipeline in the database ad data pipeline export pipeline ad data pipeline; create pipeline function create pipeline function enables you to define a function for loading data define the function behavior using the {{groovy}} language for details bout this language, see https //groovy lang org/index html function arguments and output are strongly typed and immutable you can test the execution of your function using the preview pipeline sql statement the ocient system does not support the overload of function names syntax create \[ or replace ] pipeline function \[ if not exists ] function name( input argument \[, ] ) language groovy returns output argument definition imports \[ library name \[, ] ] as $$ groovy declaration $$ 145,110,337 true unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type install and enable third party libraries you can use the default list of supported third party libraries or additional third party libraries that you install supported libraries data pipeline functions can import classes from the default list of supported third party libraries this table provides the resources for each supported library package true 331,332 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type additional libraries you can install and enable additional third party libraries to import for use in your data pipeline functions you must install the jar package on all loader nodes in the /opt/ocient/current/lib/extractorengine udt folder then, add the fully qualified class name in the function import list as part of the library name parameter for example, to reference the bytebuffer class from the com fastbuffer package, specify com fastbuffer bytebuffer in the library name parameter and use the class in the groovy definition as var x = new com fastbuffer bytebuffer() groovy data type mapping for the groovy definition, the ocient system maps its sql data type to the corresponding groovy data type your groovy code should use the groovy data type defined in this table for any input arguments and output true 331,332 unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type example create the sort function data pipeline function to sort an array of integers the function has two input arguments value , a non null array of integers, and ascending , the sort order the function returns a non null array of integers if value is empty, the function throws an error the function imports these java libraries java lang integer java util arraylist java util collections java util comparator java util list define the groovy code because the input arguments do not change, the example groovy code first copies the value argument, sorts the copied list according to the sort order, and returns the sorted array create pipeline function sort function( value int\[] not null, ascending boolean not null) language groovy returns int\[] not null imports \[ 'java lang integer', 'java util arraylist', 'java util collections', 'java util comparator', 'java util list' ] as $$ / throw an error if the array is empty / if (value isempty()){ throw new pipelinefunctionexception("unexpected empty array"); } / make a copy of the list / list\<integer> sorted = new arraylist<>((list\<integer>)value); / sort the array elements according to the specified order / sorted sort(ascending ? comparator naturalorder() comparator reverseorder()); / return the sorted array / return sorted; $$; view the creation information about the sort function function using the sys pipeline functions system catalog table this statement returns the function name, return type, argument names, data types of the arguments, and the list of imported libraries select name, return type, argument names, argument types, imported libraries from sys pipeline functions; drop pipeline function drop pipeline function removes an existing pipeline function you must have the drop privilege on the pipeline function to execute this sql statement for details, see docid\ f55ngxtki0f7kkmyatvug syntax drop pipeline function \[ if exists ] function name \[, ] 149,134,380 true unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type unhandled content type examples remove the existing pipeline function remove an existing pipeline function named sort function drop pipeline function sort function; remove an existing pipeline function by checking for existence remove an existing pipeline function named sort function or return a warning if the ocient system does not find the function in the database drop pipeline function if exists sort function; related links docid\ zncvnrhsf6fg1yvqk6mxt docid\ stial7oztpmpndfwcsm29 docid ymealxr8i3ef2yhn9r8a docid\ ti3mdibvgmuudmlqu9xpl docid\ cxjn6qtjcjmsxxdb5b044 docid\ vqvrmdyk8josxmkfsyprc docid\ f55ngxtki0f7kkmyatvug docid\ g6voewkufcxz2yscdfxx