# Load Data into Multiple Targets in Data Pipelines

export const TimeKey = "TimeKey®";

export const Ocient = "Ocient®";

export const Metabase = "Metabase℠";

export const AWS = "Amazon® Web Services℠ (AWS℠)";

A common batch-loading setup for {Ocient} reads time-partitioned files from a bucket on {AWS} S3. Often, you must repeat the batch load as new files arrive.

Ocient uses data pipelines to transform each document into rows in one or more different tables. The loading and transformation capabilities use a simple SQL-like syntax for transforming data. This tutorial guides you through a load into multiple target tables using a small data set in CSV format. The data in this example comes from a test set for the {Metabase} Business Intelligence tool.

## Prerequisites

This tutorial assumes that:

1. The Ocient System has network access to S3 from the Loader Nodes.
2. An Ocient System is installed and configured with an active Storage Cluster (see the [Ocient Application Configuration](/ocient-application-configuration) guide).

## Step 1: Create a New Database

Connect to a SQL Node using the Ocient JDBC CLI program (see [Commands Supported by the Ocient JDBC CLI Program](/commands-supported-by-the-ocient-jdbc-cli-program)). Then, execute the `CREATE DATABASE` SQL statement to create a database named `metabase`.

```sql SQL theme={null}
CREATE DATABASE metabase;
```

## Step 2: Create Tables

Create two target tables for loading data for two different products.

Create the `product129` table in the new database. First, connect to that database (e.g., `connect to jdbc:ocient://sql-node:4050/metabase`). Then, execute this `CREATE TABLE` SQL statement, which creates a table with these columns and a clustering index on the `user_id` and `product_id` columns:

* `created_at` as a {TimeKey} timestamp that is not nullable.
* `id`, `user_id`, and `product_id` as integers that are not nullable.
* `subtotal`, `tax`, `total`, and `discount` as floating point numbers.
* `quantity` as an integer.

```sql SQL theme={null}
CREATE TABLE public.product129 (
    created_at TIMESTAMP TIME KEY BUCKET(30, DAY) NOT NULL,
    id INT NOT NULL,
    user_id INT NOT NULL,
    product_id INT NOT NULL,
    subtotal DOUBLE,
    tax DOUBLE,
    total DOUBLE,
    discount DOUBLE,
    quantity INT,
    CLUSTERING INDEX idx01 (user_id, product_id)
);
```

Create the second table, `product161`, with the same columns and clustering index.

```sql SQL theme={null}
CREATE TABLE public.product161 (
    created_at TIMESTAMP TIME KEY BUCKET(30, DAY) NOT NULL,
    id INT NOT NULL,
    user_id INT NOT NULL,
    product_id INT NOT NULL,
    subtotal DOUBLE,
    tax DOUBLE,
    total DOUBLE,
    discount DOUBLE,
    quantity INT,
    CLUSTERING INDEX idx01 (user_id, product_id)
);
```

The database creates the `product129` and `product161` tables, and you can begin loading data.

## Step 3: Create a Data Pipeline

Create data pipelines using the `CREATE PIPELINE` SQL statement. To load data, you first create a pipeline with the definition of the source, data format, and transformation rules using a SQL-like declarative syntax. Then, you execute the `START PIPELINE` SQL statement to start the load. You can observe progress and status using system tables and views.

Each Ocient pipeline defines a single data source and the target table or tables into which data loads. A data source includes the location of the source and filters on the source to define the specific data set to load. This example loads data from a data source located in a directory within the S3 bucket.

First, inspect the data that you plan to load. Each document has a format similar to this example CSV file named `orders.csv`.

```none Text theme={null}
id,user_id,product_id,subtotal,tax,total,discount,created_at,quantity
1,1,14,37.65,2.07,39.72,null,2019-02-11T21:40:27.892Z,2
2,1,123,110.93,6.1,117.03,null,2018-05-15T08:04:04.580Z,3
3,1,105,52.72,2.9,49.2,6.42,2019-12-06T22:22:48.544Z,2
...
```

In this case, Ocient automatically transforms the data to the target columns using some sensible conventions. In other cases, loads require some transformation. Most transformations are identical to functions that already exist in the SQL dialect of the Ocient System.

From your database connection prompt, create a pipeline named `orders_pipeline` for the orders data set. Use the S3 data source with endpoint `https://s3.us-east-1.amazonaws.com`, bucket `ocient-docs`, and filter `metabase_samples/csv/orders.csv`. Specify the CSV format with one header line. Load the data into the two target tables. Each `SELECT` part of the SQL statement maps the fields in the CSV file to the columns of its target table.

<CodeGroup>
  ```sql SQL theme={null}
  CREATE PIPELINE orders_pipeline
  SOURCE
      S3
          ENDPOINT 'https://s3.us-east-1.amazonaws.com'
          BUCKET 'ocient-docs'
          FILTER 'metabase_samples/csv/orders.csv'
  EXTRACT
      FORMAT csv
      NUM_HEADER_LINES 1
  INTO public.product129
  SELECT
      $1 as id,
      $2 as user_id,
      $3 as product_id,
      $4 as subtotal,
      $5 as tax,
      $6 as total,
      $7 as discount,
      $8 as created_at,
      $9 as quantity
  WHERE $3 = 129

  INTO public.product161
  SELECT
      $1 as id,
      $2 as user_id,
      $3 as product_id,
      $4 as subtotal,
      $5 as tax,
      $6 as total,
      $7 as discount,
      $8 as created_at,
      $9 as quantity
  WHERE $3 = 161;
  ```
</CodeGroup>

The pipeline has three main sections:

* `SOURCE` — In this case, load data from S3. Specify the endpoint and bucket. The `FILTER` parameter identifies the file or files to load. The load uses a single CSV file. Options exist to add wildcards or regular expressions to isolate different file sets.
* `EXTRACT` — Set the format to CSV files and note that there is one header line in the file. This specification skips that row when the Ocient System processes the file. Many other options exist for delimited data such as a record delimiter and field delimiter.
* `INTO ... SELECT` — Choose the target tables `public.product129` and `public.product161` and select the fields from the CSV file. The numeric index identifies each file field. Importantly, similar to other SQL syntax, the first field in the file is `$1`, not `$0`. Each field maps to a target column using the `as` syntax. The `WHERE` filter instructs the data pipeline to load order data for only two products, one product for each table.
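
As a mental model only, each `INTO ... SELECT ... WHERE` block behaves much like a filtered insert in ordinary SQL. The sketch below assumes a hypothetical `orders_staging` table that holds the CSV rows with the same column names as the file header. The pipeline reads the file directly, so you do not need to create this table or run these statements.

```sql SQL theme={null}
-- Conceptual analogy only: orders_staging is a hypothetical table, not part of this tutorial.
-- Rows for product 129 go to the first target table.
INSERT INTO public.product129
    (created_at, id, user_id, product_id, subtotal, tax, total, discount, quantity)
SELECT created_at, id, user_id, product_id, subtotal, tax, total, discount, quantity
FROM orders_staging
WHERE product_id = 129;

-- Rows for product 161 go to the second target table.
INSERT INTO public.product161
    (created_at, id, user_id, product_id, subtotal, tax, total, discount, quantity)
SELECT created_at, id, user_id, product_id, subtotal, tax, total, discount, quantity
FROM orders_staging
WHERE product_id = 161;
```

Rows for any other product match neither filter, so they do not load into either table.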

After you successfully create the `orders_pipeline` pipeline, execute the `START PIPELINE` SQL statement.

<CodeGroup>
  ```sql SQL theme={null}
  START PIPELINE orders_pipeline;
  ```
</CodeGroup>

## Step 4: Observe the Load Progress

With your pipeline running, data immediately begins to load from the S3 files that you defined. If there are many files in each file group, the load process first sorts the files into batches, partitions them for parallel processing, and assigns them to Loader Nodes.

You can check the pipeline status and progress by querying `information_schema.pipeline_status` or by executing `SHOW PIPELINE_STATUS`.

<CodeGroup>
  ```sql SQL theme={null}
  SHOW PIPELINE_STATUS;
  ```
</CodeGroup>

*Output*

<CodeGroup>
  ```sql SQL theme={null}
  database_name         pipeline_name       table_names                                  status      status_message                               percent_complete duration_seconds files_processed     files_failed        files_remaining     records_processed   records_loaded      records_failed
  --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  metabase              orders_pipeline     ["public.product129" "public.product161"]    RUNNING     Started processing pipeline orders_pipeline  0.0              2.025824         0                   0                   1                   0                   0                   0
  ```
</CodeGroup>

After the status of the pipeline changes to `COMPLETED`, all data is available in the target tables.
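
To watch only this pipeline rather than the full status list, you can query the `information_schema.pipeline_status` view directly. This is a minimal sketch that assumes the view exposes the same column names that appear in the `SHOW PIPELINE_STATUS` output; adjust the column list if your system differs.

```sql SQL theme={null}
-- Assumption: column names match the SHOW PIPELINE_STATUS output shown in this tutorial.
SELECT pipeline_name,
       status,
       percent_complete,
       records_processed,
       records_loaded,
       records_failed
FROM information_schema.pipeline_status
WHERE pipeline_name = 'orders_pipeline';
```

Rerun the query until the `status` column reports `COMPLETED`.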

After a few seconds, the data is available for query in the `public.product129` table.

```sql SQL theme={null}
SELECT COUNT(*) FROM public.product129;
```

*Output*

<CodeGroup>
  ```sql SQL theme={null}
  count(*)
  --------------------
  93
  ```
</CodeGroup>

The data is also available for query in the `public.product161` table.

```sql SQL theme={null}
SELECT COUNT(*) FROM public.product161;
```

*Output*

<CodeGroup>
  ```sql SQL theme={null}
  count(*)
  --------------------
  92
  ```
</CodeGroup>
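
To confirm that the `WHERE` filters routed rows to the intended tables, one option is to group each table by `product_id`. This sketch uses only the tables and columns created in this tutorial; the `row_count` alias is illustrative.

```sql SQL theme={null}
-- If the filters worked as intended, each query returns a single product_id:
-- 129 for the first table and 161 for the second.
SELECT product_id, COUNT(*) AS row_count
FROM public.product129
GROUP BY product_id;

SELECT product_id, COUNT(*) AS row_count
FROM public.product161
GROUP BY product_id;
```

More than one `product_id` in either result would suggest that the corresponding `WHERE` clause in the pipeline definition needs review.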

You can drop the pipeline with the `DROP PIPELINE orders_pipeline;` SQL statement. Execution of this statement leaves the data in your target tables, but removes metadata about the pipeline execution from the system.

## Related Links

[Data Pipelines Reference](/data-pipelines)

[Load Delimited and CSV Data](/data-formats-for-data-pipelines#load-delimited-and-csv-data)

[Data Types for Data Pipelines](/data-types-for-data-pipelines)

[Transform Data in Data Pipelines](/transform-data-in-data-pipelines)

[Manage Errors in Data Pipelines](/manage-errors-in-data-pipelines)
