Prerequisites
This tutorial assumes that:
- The Ocient System has network access to a Kafka broker from the SQL and Loader Nodes.
- An Ocient System is installed and configured with an active Storage Cluster (see the Ocient Application Configuration guide).
Step 1: Create a New Database
Connect to a SQL Node using the Ocient JDBC CLI program (see Commands Supported by the Ocient JDBC CLI Program). Then, execute the CREATE DATABASE SQL statement to create the metabase database.
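The statement for this step needs only the database name:

```sql
-- Create the database that holds the tutorial table
CREATE DATABASE metabase;
```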
Step 2: Create Table
Create the orders table in the new database. First, connect to that database (e.g., connect to jdbc:ocient://sql-node:4050/metabase). Then, execute a CREATE TABLE SQL statement that creates the table with these columns and a clustering index on the user_id and product_id columns:
- created_at as a timestamp that is not nullable.
- id, user_id, and product_id as integers that are not nullable.
- subtotal, tax, total, and discount as floating point numbers.
- quantity as an integer.
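A sketch of the statement follows. It assumes Ocient's DOUBLE type for the floating point columns and the CLUSTERING INDEX clause for the index. The index name idx_orders_user_product is hypothetical, so verify the clause syntax against the CREATE TABLE reference for your Ocient version.

```sql
-- Run while connected to the metabase database
CREATE TABLE public.orders (
    created_at TIMESTAMP NOT NULL,
    id         INT NOT NULL,
    user_id    INT NOT NULL,
    product_id INT NOT NULL,
    subtotal   DOUBLE,
    tax        DOUBLE,
    total      DOUBLE,
    discount   DOUBLE,
    quantity   INT,
    -- Hypothetical index name; the index covers user_id and product_id
    CLUSTERING INDEX idx_orders_user_product (user_id, product_id)
);
```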
After the statement runs, the database contains the orders table, and you can begin loading data.
Step 3: Create a Data Pipeline
Create data pipelines using the CREATE PIPELINE SQL statement. To load data, you first create a pipeline with the definition of the source, data format, and transformation rules using a SQL-like declarative syntax. Then, you execute the START PIPELINE command to start the load. You can observe progress and status using system tables and views.
Each Ocient pipeline defines a single data source and the target table or tables into which data loads. A data source includes the location of the source and filters on the source that define the specific data set to load. This example loads data from a single source: the orders topic on a Kafka broker.
First, inspect the data that you plan to load. Each document has a format similar to this example.
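The original sample document is not reproduced here. As a stand-in, this hypothetical record uses top-level keys that match the orders columns. All values are invented, and the timestamp string format is an assumption.

```json
{
  "id": 1,
  "user_id": 42,
  "product_id": 7,
  "created_at": "2024-01-15 10:30:00",
  "subtotal": 100.0,
  "tax": 8.25,
  "discount": 0.0,
  "total": 108.25,
  "quantity": 2
}
```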
Create a pipeline named orders_pipeline for the orders data set from your database connection prompt. Specify the Kafka source with broker address 192.168.0.1:9092 (replace this example Kafka broker address with your address) and topic orders. Load the data into the public.orders table. The SELECT part of the SQL statement maps the JSON fields to the target columns in the created table.
The statement uses these clauses:
- SOURCE: Loads data from Kafka. Specify the address of the bootstrap servers and the topic. Options exist to set Kafka consumer configurations.
- EXTRACT: Sets the format to JSON.
- INTO ... SELECT: Targets the public.orders table and selects the chosen fields from the JSON records. In this case, all data is available at the top level of the JSON records, so the example references the fields by attribute name (e.g., $id, $user_id, etc.). For nested data, reference the nested fields using dot notation (e.g., $order.user.first_name). Each field maps to a target column using the as syntax.
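A minimal sketch of the pipeline definition follows, assembled from the clauses described above. The keyword spellings BOOTSTRAP_SERVERS, TOPIC, and FORMAT JSON are assumptions based on this description, so check them against the CREATE PIPELINE reference. If your created_at strings are not in a format that loads directly into a TIMESTAMP column, you may need a conversion function in the SELECT list.

```sql
CREATE PIPELINE orders_pipeline
-- Kafka source: bootstrap server address and topic (replace the example address)
SOURCE KAFKA
    BOOTSTRAP_SERVERS '192.168.0.1:9092'
    TOPIC 'orders'
-- Parse each Kafka record as a JSON document
EXTRACT
    FORMAT JSON
-- Map top-level JSON attributes to the target columns
INTO public.orders SELECT
    $id         AS id,
    $user_id    AS user_id,
    $product_id AS product_id,
    $created_at AS created_at,
    $subtotal   AS subtotal,
    $tax        AS tax,
    $total      AS total,
    $discount   AS discount,
    $quantity   AS quantity;
```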
Then, start the load by executing the START PIPELINE SQL statement.
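For the pipeline in this tutorial, the statement is:

```sql
-- Begin consuming from the orders topic
START PIPELINE orders_pipeline;
```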
Step 4: Confirm that Loading is Operating Correctly
With the pipeline in place and running, data immediately begins loading from the Kafka topics that are configured in the pipeline. If you do not have data in the Kafka topics yet, now is a good time to start producing data into the topics.
Produce Test Data into Kafka
For test purposes, kafkacat is a helpful utility that makes it easy to produce records into a topic. For example, if you have a file of sample data named orders.jsonl in JSONL format (newline-delimited JSON records), you can run a kafkacat command to send those records to your Kafka broker. Replace <broker_ip_address> with your broker IP address and <topic_name> with your topic name.
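A sketch of that command follows, assuming the broker listens on Kafka's default port 9092 and that orders.jsonl is in the current directory. The -P flag selects producer mode, and -l sends each newline-delimited line of the file as a separate message. Newer releases of the tool are distributed under the name kcat.

```shell
#!/bin/sh
# Replace these placeholder values with your broker IP address and topic name
BROKER_IP="<broker_ip_address>"
TOPIC="<topic_name>"

# -P: produce mode, -b: broker address (port 9092 assumed), -t: target topic,
# -l: send each line of orders.jsonl as one Kafka message
kafkacat -P -b "${BROKER_IP}:9092" -t "${TOPIC}" -l orders.jsonl
```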
This example assumes that the sample data file is located at ~/orders.jsonl.
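If you do not have sample data yet, this script writes two hypothetical records to ~/orders.jsonl. The values are invented for testing, the keys match the orders columns, and the script overwrites any existing file at that path.

```shell
#!/bin/sh
# Write hypothetical test orders, one JSON object per line (JSONL)
OUT="$HOME/orders.jsonl"

cat > "$OUT" <<'EOF'
{"id": 1, "user_id": 42, "product_id": 7, "created_at": "2024-01-15 10:30:00", "subtotal": 100.0, "tax": 8.25, "discount": 0.0, "total": 108.25, "quantity": 2}
{"id": 2, "user_id": 43, "product_id": 9, "created_at": "2024-01-15 10:31:00", "subtotal": 50.0, "tax": 4.13, "discount": 5.0, "total": 49.13, "quantity": 1}
EOF

# Print the number of records written
wc -l < "$OUT"
```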
Assuming the broker is running at the IP address 10.0.0.3, send the records in ~/orders.jsonl to the orders topic that the pipeline consumes. Use the same broker address that you specified in the pipeline definition.
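With the example broker address and the ~/orders.jsonl file, the command looks like this. The optional second command reads the topic back with -C (consume mode), -o beginning (start at the earliest offset), and -e (exit after the last message), which is one way to confirm that the records arrived. Port 9092 is again an assumption.

```shell
#!/bin/sh
# Send each line of ~/orders.jsonl to the orders topic
kafkacat -P -b 10.0.0.3:9092 -t orders -l ~/orders.jsonl

# Optional: read the topic from the beginning and exit at the end
kafkacat -C -b 10.0.0.3:9092 -t orders -o beginning -e
```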
Step 5: Observe the Load Progress
With your pipeline running, data immediately begins to load from the Kafka topic. The pipeline creates parallel Kafka consumers for each partition. If there are more partitions than processing cores available, the pipeline automatically handles spreading consumers across available processing cores and Loader Nodes. You can check the pipeline status and progress by querying information_schema.pipeline_status or executing the SHOW PIPELINE_STATUS SQL statement.
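Both ways of checking status look like this:

```sql
-- Status of every pipeline, from the information schema view
SELECT * FROM information_schema.pipeline_status;

-- Alternatively, the dedicated statement
SHOW PIPELINE_STATUS;
```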
You can also query the target table public.orders directly to confirm that rows are loading.
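A simple check is to count the rows in the target table and rerun the query while data is flowing; a rising count indicates that loading is progressing.

```sql
-- Number of orders loaded so far
SELECT COUNT(*) FROM public.orders;
```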
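A Kafka pipeline keeps consuming new records as they arrive, so it continues running instead of reaching the status COMPLETED. To examine progress, you can use the information schema and system catalog tables. Key details are in information_schema.pipelines, information_schema.pipeline_status, and sys.pipeline_partitions.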
For example, you can query these tables to see the pipeline status and key metrics for duration, loaded records, and failed records.
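The specific columns that hold duration and record counts are not listed in this section, so a reasonable starting point is to inspect full rows from the other two tables named above and then narrow the SELECT list to the columns you need:

```sql
-- Pipeline definitions and configuration
SELECT * FROM information_schema.pipelines;

-- Partition-level details for each pipeline
SELECT * FROM sys.pipeline_partitions;
```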
The sys.pipeline_errors system catalog table captures any errors that occur while the pipeline runs.
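To review captured errors, query the table directly:

```sql
-- Errors recorded while pipelines run
SELECT * FROM sys.pipeline_errors;
```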
You can drop the pipeline with the DROP PIPELINE orders_pipeline; SQL statement. Execution of this statement leaves the data in your target table, but removes metadata about the pipeline execution from the system.

