Data Pipeline Load of JSON Data from Kafka
Ocient uses data pipelines to transform each document into rows in one or more tables. The loading and transformation capabilities use a simple SQL-like syntax for transforming data. This tutorial guides you through a simple example load using a small data set in JSON format. The data in this example comes from a sample data set for the Metabase Business Intelligence tool.
This tutorial assumes that:
- The Ocient System has network access to a Kafka broker from the SQL and Loader Nodes.
- An Ocient System is installed and configured with an active Storage Cluster (See the Ocient Application Configuration guide).
To begin, create the example table in a database. First, connect to a SQL Node using the Ocient JDBC CLI program (see Commands Supported by the Ocient JDBC CLI Program). Then, execute the CREATE DATABASE SQL statement to create the metabase database.
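For example, from the CLI prompt:

```sql
CREATE DATABASE metabase;
```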
Create the orders table in the new database. First, connect to that database (e.g., connect to jdbc:ocient://sql-node:4050/metabase), and then execute a CREATE TABLE SQL statement that creates a table with the needed columns and a clustering index on the user_id and product_id columns:
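A minimal sketch of such a statement follows. The column names and types are assumptions based on the Metabase sample orders data, and the time key definition may differ in your environment, so adjust it to match your system's CREATE TABLE requirements.

```sql
-- Sketch only: columns and time key definition are assumptions.
CREATE TABLE public.orders (
  created_at TIMESTAMP TIME KEY BUCKET(1, DAY) NOT NULL,
  id INT NOT NULL,
  user_id INT,
  product_id INT,
  subtotal DOUBLE,
  tax DOUBLE,
  total DOUBLE,
  discount DOUBLE,
  quantity INT,
  CLUSTERING INDEX idx01 (user_id, product_id)
);
```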
The database creates the orders table, and you can begin loading data.
Create data pipelines using the CREATE PIPELINE SQL statement. To load data, you first create a pipeline with the definition of the source, data format, and transformation rules using a SQL-like declarative syntax. Then, you execute the START PIPELINE command to start the load. You can observe progress and status using system tables and views.
Each Ocient pipeline defines a single data source and the target table or tables into which data loads. A data source includes the location of the source and filters on the source to define the specific data set to load. This example loads data from a single data source: the orders topic on a Kafka broker.
First, inspect the data that you plan to load. Each document has a format similar to this example.
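One record might look like this. The field names and values are illustrative, based on the Metabase sample orders data; inspect your own topic for the actual structure.

```json
{
  "id": 1,
  "user_id": 1,
  "product_id": 14,
  "subtotal": 37.65,
  "tax": 2.07,
  "total": 39.72,
  "discount": null,
  "created_at": "2019-02-11T21:40:27.892Z",
  "quantity": 2
}
```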
In this case, Ocient can map the data to the target columns automatically using sensible conventions. Other loads require explicit transformations, most of which are identical to functions that already exist in the SQL dialect of the Ocient System.
Create a pipeline named orders_pipeline for the orders data set from your database connection prompt. Specify the Kafka source with broker address 192.168.0.1:9092 (replace this example Kafka broker address with your address) and topic orders. Load the data into the public.orders table. The SELECT part of the SQL statement maps the JSON fields to the target columns in the created table.
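A sketch of the pipeline definition follows. The clause and option names here approximate the CREATE PIPELINE syntax described in this tutorial; consult the CREATE PIPELINE reference for the exact keywords, and replace the example broker address with your own.

```sql
-- Sketch only: option names approximate the CREATE PIPELINE syntax.
CREATE PIPELINE orders_pipeline
SOURCE KAFKA
  BOOTSTRAP_SERVERS '192.168.0.1:9092'  -- replace with your Kafka broker address
  TOPIC 'orders'
EXTRACT FORMAT JSON
INTO public.orders
SELECT
  $id         AS id,
  $user_id    AS user_id,
  $product_id AS product_id,
  $subtotal   AS subtotal,
  $tax        AS tax,
  $total      AS total,
  $discount   AS discount,
  $created_at AS created_at,
  $quantity   AS quantity;
```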
The pipeline has three main sections:
- SOURCE — Loads data from Kafka. Specify the address of the bootstrap servers and topic. Options exist to set Kafka consumer configurations.
- EXTRACT — Sets the format to JSON.
- INTO ... SELECT — Targets the public.orders table and selects the chosen fields from the JSON records. In this case, all data is available at the top level of the JSON records, so the example references the fields by attribute name (e.g., $id, $user_id). For nested data, reference the nested fields using dot notation (e.g., $order.user.first_name). Each field maps to a target column using the AS keyword.
After you successfully create this pipeline, execute the START PIPELINE SQL statement.
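For example, to start the pipeline created above:

```sql
START PIPELINE orders_pipeline;
```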
With the pipeline in place and running, data immediately begins loading from the Kafka topic that is configured in the pipeline. If you do not have data in the topic yet, now is a good time to start producing data into it.
For test purposes, kafkacat is a helpful utility that makes it easy to produce records into a topic. For example, if you have a file of sample data, orders.jsonl, in JSONL format (newline-delimited JSON records), you can run this command to send those records to your Kafka broker. Specify your broker IP address in place of <broker_ip_address> and your topic name in place of <topic_name>.
Save an example document to your file system to use for this test. For this example, you can download an example file from https://ocient-examples.s3.amazonaws.com/metabase_samples/jsonl/orders.jsonl and save it to ~/orders.jsonl.
Assuming the broker is running at the IP address 10.0.0.3, send data into the orders topic defined in the pipeline definition by executing this command.
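A sketch of that command, assuming the broker listens on the default Kafka port 9092:

```shell
# Produce each line of the JSONL file as an individual Kafka record
# (-P produce mode, -b broker list, -t topic, -l read messages from a file)
kafkacat -P -b 10.0.0.3:9092 -t orders -l ~/orders.jsonl
```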
This command pushes the entire JSONL file of messages into Kafka with one record per line. As these records are produced into Kafka, the running pipeline begins to load them into Ocient.
With your pipeline running, data immediately begins to load from the Kafka topic. The pipeline creates parallel Kafka consumers, one for each partition. If there are more partitions than available processing cores, the pipeline automatically spreads the consumers across the available cores and Loader Nodes.
You can check the pipeline status and progress by querying information_schema.pipeline_status or executing the SHOW PIPELINE_STATUS SQL statement.
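For example:

```sql
SELECT * FROM information_schema.pipeline_status;
```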
Output
After a few seconds, the data is available for query in public.orders.
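For example, a quick count verifies that rows are arriving:

```sql
SELECT COUNT(*) FROM public.orders;
```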
Output
Unlike a batch file load, Kafka pipelines run continuously, so they never change to a status of COMPLETED. To examine progress, you can use the information schema and system catalog tables. Key details are in information_schema.pipelines, information_schema.pipeline_status, and sys.pipeline_partitions.
For example, this statement shows the status and key metrics for duration, loaded records, and failed records.
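A sketch of such a query follows. The column names (status, duration, records_loaded, records_failed) are assumptions; inspect the schema of information_schema.pipeline_status for the exact names in your release.

```sql
-- Column names are hypothetical; check information_schema.pipeline_status for the actual schema.
SELECT pipeline_name, status, duration, records_loaded, records_failed
FROM information_schema.pipeline_status
WHERE pipeline_name = 'orders_pipeline';
```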
Output
In this example, there are no errors. The sys.pipeline_errors system catalog table captures any errors that occur during the pipeline process.
You can drop the pipeline with the DROP PIPELINE orders_pipeline; SQL statement. Execution of this statement leaves the data in your target table, but removes metadata about the pipeline execution from the system.