
Load JSON Data from Kafka

A common setup for streaming data into Ocient is to send JSON documents to Kafka and then transform each document into rows in one or more tables. Ocient's Loading and Transformation capabilities use a simple SQL-like syntax for transforming data. This tutorial guides you through a simple example load using a small set of data in JSON format. The data in this example is created from a test set for the Metabase Business Intelligence tool.

Prerequisites

This tutorial assumes that:

  1. A Kafka cluster is operational and can be reached by the Ocient Loader Nodes.
  2. An Ocient System is installed and configured with an active storage cluster (See the Ocient Application Configuration guide).
  3. The Ocient Loader Nodes are running the latest Loading and Transformation (LAT) version, which is configured to connect to Kafka for stream loading.
  4. A default "sink" for the Ocient Loader Nodes is configured on the system.
  5. The test data for this tutorial is available at the following S3 addresses.

You must be logged into Amazon AWS to download these files.

https://ocient-examples.s3.amazonaws.com/metabase_samples/jsonl/orders.jsonl
https://ocient-examples.s3.amazonaws.com/metabase_samples/jsonl/products.jsonl
https://ocient-examples.s3.amazonaws.com/metabase_samples/jsonl/people.jsonl
https://ocient-examples.s3.amazonaws.com/metabase_samples/jsonl/reviews.jsonl
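One way to download the files, assuming the AWS CLI is installed and your credentials grant access to the bucket, is a sketch like the following:

```shell
# Download the four sample JSONL files into the current directory.
# Requires AWS credentials with access to the ocient-examples bucket.
aws s3 cp s3://ocient-examples/metabase_samples/jsonl/orders.jsonl .
aws s3 cp s3://ocient-examples/metabase_samples/jsonl/products.jsonl .
aws s3 cp s3://ocient-examples/metabase_samples/jsonl/people.jsonl .
aws s3 cp s3://ocient-examples/metabase_samples/jsonl/reviews.jsonl .
```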

Step 1: Create a New Database

To begin, you are going to load data into four example tables in a database. First, connect to a SQL Node using the Ocient JDBC CLI program (see Commands Supported by the Ocient JDBC CLI Program). Then run the following DDL command:

SQL
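A minimal sketch of the command, assuming the database is named metabase to match the JDBC connection string used in Step 2:

```sql
-- Create the database that will hold the four example tables.
-- The name "metabase" is assumed from the connection string in Step 2.
CREATE DATABASE metabase;
```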


Step 2: Create Tables

To create tables in the new database, first connect to that database (e.g., connect to jdbc:ocient://sql-node:4050/metabase), then run the following DDL commands:

SQL
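The exact DDL depends on your target schema. The following sketch assumes column names and types modeled on the Metabase sample data set and shows only the orders table; repeat the pattern for products, people, and reviews, and add any Ocient-specific storage clauses (for example, clustering keys) that your system requires.

```sql
-- Illustrative only: column names and types are assumed from the Metabase
-- sample data set and may not match the original tutorial schema exactly.
CREATE TABLE orders (
    id         INT NOT NULL,
    user_id    INT,
    product_id INT,
    subtotal   DOUBLE,
    tax        DOUBLE,
    total      DOUBLE,
    discount   DOUBLE,
    quantity   INT,
    created_at TIMESTAMP
);
```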


Now, the database tables are created, and you can begin loading data.

Step 3: Create a Data Pipeline

Data pipelines are created using a simple loading configuration that is submitted to the Transformation Nodes to start loading. Each Kafka topic is routed to one or more Ocient tables, and each column is the result of a transformation applied to the source document.

First, let’s inspect the data that is going to be loaded. Each document has a format similar to the following example:

JSON
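For illustration, an orders document might look something like the following. The field names here follow the Metabase sample data set and are assumptions, not the exact contents of orders.jsonl:

```json
{
  "ID": 1,
  "USER_ID": 1,
  "PRODUCT_ID": 14,
  "SUBTOTAL": 37.65,
  "TAX": 2.07,
  "TOTAL": 39.72,
  "DISCOUNT": null,
  "QUANTITY": 2,
  "CREATED_AT": "2019-02-11T21:40:27.892Z"
}
```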


As you can see, this is similar to the target schema, but will require some transformation. Most transformations are identical to functions already in Ocient’s SQL dialect. To route data to your tables, you must create a pipeline.json file that has the following structure:

JSON
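The authoritative pipeline schema is defined in the Loading and Transformation documentation. The sketch below is purely conceptual, with hypothetical keys, and only illustrates the general shape: a Kafka source topic routed to a target table, with a transformation expression for each column.

```json
{
  "_comment": "Hypothetical structure for illustration only; see the LAT documentation for the real pipeline schema.",
  "source": { "type": "kafka", "topic": "orders" },
  "target": { "database": "metabase", "table": "orders" },
  "columns": {
    "id": "ID",
    "total": "TOTAL",
    "created_at": "TO_TIMESTAMP(CREATED_AT)"
  }
}
```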


Step 4: Use the Loading and Transformation CLI

With a pipeline.json file ready to go, you can test the pipeline using the LAT CLI. These examples assume that two LAT Nodes are configured; you will specify their hosts using an environment variable.

First, configure the LAT CLI to use the hosts of your Loading and Transformation service. You can pass these hosts to every CLI command as a flag, but for simplicity you can also set them as environment variables. From a command line, run the following command, replacing the IP addresses with those of your LAT processes:

Shell
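A sketch of setting the hosts, assuming two LAT processes at 10.0.0.1 and 10.0.0.2; the variable name LAT_HOSTS is hypothetical, so substitute whatever environment variable your version of the LAT CLI reads:

```shell
# Hypothetical variable name -- use the environment variable that your
# LAT CLI version expects for its host list.
export LAT_HOSTS="10.0.0.1,10.0.0.2"
```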


Next, check on the status of the LAT:

Shell
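A sketch of a status check; the exact subcommand name is an assumption, so consult the CLI help for the equivalent command in your version:

```shell
# Query the current status of the LAT service (subcommand name assumed).
lat_client status
```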


Example response:

Text


Success! This confirms that you can reach the LAT from your CLI. If the status is "Running", a pipeline is already executing. Next, you are going to set up and start your new pipeline.

This example uses secure connections. If you receive an SSL error when testing, your service might not be configured to use TLS, or you might need to use the --no-verify flag if certificate validation fails.

Step 5: Test the Transformation

The CLI supports previewing a transformation with an example document and the pipeline file. This makes it easy to test your transformations.

First, save an example document to your file system to use for this test. For this demo, you can download an example file from https://ocient-examples.s3.amazonaws.com/metabase_samples/jsonl/orders.jsonl and save it to ~/orders.jsonl.

Next, make sure the pipeline.json file that you created is stored at ~/pipeline.json.

Now that both files are available, run the CLI to preview the results. You can pass the preview command the topic name, the pipeline file, and the sample record file. The response contains the transformed data tied to the destination table and a list of any error records.

Shell
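A sketch of a preview invocation, passing the topic name, the pipeline file, and the sample record file as described above; the flag names are assumptions, so check the CLI help for the exact form in your version:

```shell
# Preview the transformation of the sample orders records against the pipeline.
# Flag names are illustrative; the CLI documents the exact form.
lat_client preview --topic orders --pipeline ~/pipeline.json --file ~/orders.jsonl
```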


Example response:

JSON


You can see the transformed data and the columns to which each transformed value will be mapped. If there are issues in the values, they will appear in the recordErrors object; you can quickly update your pipeline.json file and preview again. You can also inspect different documents to confirm that your transformations handle varying states of data cleanliness, such as missing columns, null values, and special characters.

Step 6: Configure and Start the Data Pipeline

With a tested transformation, the next step is to set up and start the data pipeline.

First, configure the pipeline using the pipeline create command. This validates and creates the pipeline, but the pipeline does not take effect until you start it:

Shell
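A sketch of creating the pipeline from the file you prepared; the flag name for passing the pipeline file is an assumption:

```shell
# Validate and create the pipeline from the local pipeline definition.
lat_client pipeline create --file ~/pipeline.json
```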


Example response:

Text


If an existing pipeline is already operating, you must stop and remove it before creating and starting the new pipeline.

Now that the pipeline has been created on all LAT Nodes, you can start the LAT by running the pipeline start commands:

Shell
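A sketch of starting the pipeline; any additional flags depend on your CLI version:

```shell
# Start the previously created pipeline on the configured LAT hosts.
lat_client pipeline start
```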


Example responses:

Text


Step 7: Confirm that Loading is Operating Correctly

With your pipeline in place and running, data will immediately begin loading from the Kafka topics that are configured in the pipeline. If you do not have data in the Kafka topics yet, now is a good time to start producing data into them.

Producing Test Data into Kafka:

For test purposes, kafkacat is a helpful utility that makes it easy to produce records into a topic. For example, if you have a file of sample data orders.jsonl in JSONL format (newline-delimited JSON records), you can run the following command to send those records to your Kafka broker:

Shell
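A sketch of the general form, assuming a broker address of broker-host:9092 and a topic named orders. kafkacat's -P flag produces, -b sets the broker list, -t sets the topic, and -l sends each line of the file as a separate message:

```shell
# Produce each line of orders.jsonl as an individual Kafka message.
kafkacat -P -b broker-host:9092 -t orders -l orders.jsonl
```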


Assuming your broker is running at 10.0.0.3 and you want to send data into the four topics defined in your pipeline.json definition, you can run:

Shell
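Assuming the four topics are named orders, products, people, and reviews (matching the table names; your pipeline.json may use different topic names) and the broker listens on the default port 9092, the commands might look like this:

```shell
# Produce each sample file into its corresponding topic on the broker at 10.0.0.3.
kafkacat -P -b 10.0.0.3:9092 -t orders   -l orders.jsonl
kafkacat -P -b 10.0.0.3:9092 -t products -l products.jsonl
kafkacat -P -b 10.0.0.3:9092 -t people   -l people.jsonl
kafkacat -P -b 10.0.0.3:9092 -t reviews  -l reviews.jsonl
```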


Each of these commands will push the entire JSONL file of messages into Kafka with one record per line. As these are produced into Kafka, your running pipeline will begin loading them into Ocient.

Observing Loading Progress:

With data in Kafka, your pipeline will begin loading data immediately and will stream any new data into Ocient. To observe this progress, you can monitor the metrics endpoint of the Loading and Transformation Nodes, either manually from a command line or with a metrics monitoring tool. For this example, you can run a curl command against the endpoint and review the result. For details on metrics, see the Metrics Documentation.

Command:

Curl
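A sketch of polling the metrics endpoint with curl; the host, port, and path here are assumptions, so use the metrics address documented for your LAT deployment (add --insecure or the appropriate CA options if the endpoint uses TLS with a certificate your client does not trust):

```shell
# Fetch current loading metrics from one LAT node (endpoint path assumed).
curl https://10.0.0.1:8443/metrics
```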


Example response:

JSON


Check Row Counts in Tables:

To confirm that you are seeing results in the target tables, you can also run some simple queries to check row counts. Depending on the streamloader role settings, the time for records to become queryable can vary from a few seconds to minutes:

Example Queries:

SQL

SQL

SQL

SQL
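A sketch of the row-count checks, one per target table:

```sql
-- Confirm that rows are arriving in each of the four target tables.
SELECT COUNT(*) FROM orders;
SELECT COUNT(*) FROM products;
SELECT COUNT(*) FROM people;
SELECT COUNT(*) FROM reviews;
```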


Success! Now you can explore the data in these four tables with any Ocient SQL queries. Your pipeline is still running, so if more data is pushed into these topics, it will be loaded automatically.

Check Errors

In this example, all rows load successfully. However, loads do not always succeed, and you can inspect errors using the LAT Client. Whenever the LAT process fails to parse a file correctly, or fails to transform or load a record, it records an error. The LAT Client includes the lat_client pipeline errors command, which reports the latest errors on the pipeline.

A full error log is available on the Loader Nodes. These logs report all bad records and the reason each record failed to load.

When you load a pipeline from Kafka, the load might route errors to an error topic on the Kafka broker instead of the logs. The LAT Client does not display errors sent to the error topic; you can inspect those errors with Kafka utilities instead.

This LAT Client command displays a maximum of 100 error messages.

Text


The errors indicate that there is an issue parsing the time1 column. The pipeline errors command has options to return JSON and to restrict the response to specific components of the error detail, which includes a reference to the source location of the record.

The following command returns JSON that is delimited with newline characters. You can pipe the JSON output to jq or redirect it to a file. The JSON includes the source topic or file group, the filename where the error occurred, the offset that indicates the line number or Kafka offset, and the exception message that aids in troubleshooting and identifying the incorrect record in the source data. You can use the log_original_message pipeline setting to provide direct access to the parsed source record for errors when appropriate.

Text


Related Links

LAT Overview