LAT Advanced Topics
The LAT loading system is designed to support dynamic schema changes. The LAT allows data to load continuously even while database tables are altered (e.g., ADD COLUMN, DROP COLUMN). In this way, schema changes can be made on the database first, and the pipeline can be updated afterward to begin streaming the revised data into the revised tables. The typical flow for updating a table is to first execute a DDL command such as "ALTER TABLE … ADD COLUMN …" and then, once it completes, update the LAT pipeline to include (or remove) the altered column in the transformation.
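For example, a minimal sketch of this flow, using a hypothetical orders table and region column:

```sql
-- Step 1: Alter the table on the database (hypothetical table and column).
-- The pipeline continues loading the existing columns while this runs.
ALTER TABLE orders ADD COLUMN region VARCHAR(64);

-- Step 2: After the DDL completes, update the LAT pipeline so its
-- transformation maps the corresponding source field to the new column.
```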
The LAT provides two ways to view loading, transformation, or binding errors that occur while a pipeline runs.
If a Kafka source is used, failed records can optionally be routed to a configurable error_topic. This topic contains the records that failed along with the reason or exception for each failure, and it can be used to gain insight into why a given record failed to load. Each entry in the topic contains the following (see the consumer sketch after this list):
- Value (byte[]): A byte array containing the record itself.
- Headers:
  - topic: The topic from which the record originated.
  - partition: The partition from which the record originated.
  - offset: The partition offset from which the record originated.
  - state: The state of the record (the location where the record encountered an error).
  - exception: The exception associated with the error.
  - exception_message: The exception message, if it exists.
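As an illustration, here is a minimal sketch of inspecting these entries with the standard Kafka Java client. The broker address and the lat_errors topic name are assumptions; use your configured error_topic.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class ErrorTopicInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: broker address
        props.put("group.id", "lat-error-inspector");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("lat_errors")); // assumption: configured error_topic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // The value is the original record bytes; the headers carry
                // the error context (topic, partition, offset, state, exception).
                System.out.println("record bytes: " + record.value().length);
                for (Header header : record.headers()) {
                    System.out.println(header.key() + " = " + new String(header.value()));
                }
            }
        }
    }
}
```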
If you use a File source, or if the Error Topic configuration is not set, the LAT sends errors to a dedicated error log file. If you use the default log4j2 configuration, this error log can be found alongside the rest of the LAT logs in error.log and can be accessed using the LAT Client subcommands.
If you use a custom log4j2 configuration, your appender configuration should look similar to the following code.
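For example, here is a sketch of such an appender; the file paths under /var/log/lat and the rollover policies are illustrative assumptions (adjust for your deployment), while the appender type, name, and pattern follow the requirements described below:

```xml
<RollingRandomAccessFile name="ErrorLog"
                         fileName="/var/log/lat/error.log"
                         filePattern="/var/log/lat/error-%i.log.gz">
  <!-- The pattern must remain %m%n so the errors API can parse entries. -->
  <PatternLayout pattern="%m%n"/>
  <Policies>
    <SizeBasedTriggeringPolicy size="100 MB"/>
  </Policies>
  <DefaultRolloverStrategy max="5"/>
</RollingRandomAccessFile>
```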
If the LAT_ALLOW_LOG_ORIGINAL_RECORDS service configuration and the log_original_records pipeline configuration are both set to true, the error log file includes a JSON representation of the record that caused an error.
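For instance, a hedged sketch, assuming the service configuration is supplied to the LAT service as an environment variable (LAT_ALLOW_LOG_ORIGINAL_RECORDS=true) and that log_original_records is a field in a JSON pipeline definition; the exact shape of your pipeline definition may differ:

```json
{
  "log_original_records": true
}
```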
If you use a custom log4j2 configuration, you must still use a RollingRandomAccessFileAppender named ErrorLog, and the PatternLayout must still be %m%n so that the errors API can retrieve errors. If you do not use a RollingRandomAccessFileAppender, the LAT does not start. If the PatternLayout is not correct, the errors API does not work.
The LAT loads rows from data sources in an exactly-once fashion. This is made possible by row-level deduplication for LAT pipelines and the ability to replay records from a source. In short, if the same records are replayed through the LAT, then under the conditions described below, no duplicate records are persisted into the Ocient tables.
For a deeper understanding of how this works, a few key concepts are explained here:
- Partitioning Data
- The Durability Horizon
- Deduplication Scope
To deliver high-throughput loading, the LAT partitions the data source into independent sets of data, which are then loaded in parallel across all LAT instances. Each partition is treated as a well-ordered, replayable sequence of rows.
Some sources, such as Kafka, natively support the concept of partitions and include a native record ID as part of their protocol.
Others, such as a batch of files from S3, require the LAT to partition the data on its own and assign a record ID. For file loading, the LAT establishes a record ID based on the sorted list of files in the file group and the position of each record within its file.
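The following is a conceptual sketch of this idea, not the LAT's actual implementation; the ID scheme and the rowsPerFileLimit parameter are illustrative assumptions:

```java
import java.util.List;

// Conceptual sketch only: shows how a stable record ID can be derived from a
// sorted file list plus a row position. This is not the LAT's actual code.
public class FileRecordIds {

    // Assumes rowsPerFileLimit exceeds the row count of any file, so IDs from
    // different files never collide and increase monotonically.
    static long recordId(List<String> sortedFiles, String file, long rowInFile,
                         long rowsPerFileLimit) {
        long fileIndex = sortedFiles.indexOf(file);
        // Adding or removing files shifts fileIndex, which is why altering the
        // source directory between runs impairs deduplication (see below).
        return fileIndex * rowsPerFileLimit + rowInFile;
    }

    public static void main(String[] args) {
        List<String> sortedFiles = List.of("2024-01-01.csv", "2024-01-02.csv");
        System.out.println(recordId(sortedFiles, "2024-01-02.csv", 7, 1_000_000));
        // Prints 1000007: file index 1 * 1,000,000 + row 7.
    }
}
```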
Altering the list of files in the target directory on the source system can change these record IDs. This impairs the LAT's ability to properly deduplicate rows if a file loading pipeline is stopped and restarted.
Within a partition, each record is assigned a unique record ID that increases monotonically within that partition. As data is loaded through the LAT and into Ocient's page stores, the data is said to become "Durable," meaning that in the event of a node shutdown, the data is preserved on non-volatile storage. At this point the record is no longer held only in memory, but is stored on disk in a redundant fashion. The "Durability Horizon" is the largest record ID that has become Durable on each partition of data. If a previously loaded record is replayed, it is recognized as a duplicate of the original record and ignored; new records are loaded and the Durability Horizon increases. For example, if the Durability Horizon for a partition is 500, replayed records with IDs 1 through 500 are ignored, while a record with ID 501 is loaded and advances the horizon.
Deduplication is scoped according to a few settings in the LAT pipeline configuration. Each pipeline ID is considered an independent loading task, and each topic or file group is considered an independent data set. As a result, deduplication does not apply between different pipelines, or between different topics or file groups, even if they are loading the same underlying files.
Records are deduplicated when all of the following are true:
- The pipeline ID matches
- The topic name or file group name matches
When both are true, any record with a record ID less than or equal to the current Durability Horizon is considered a duplicate and ignored. Once the record ID progresses beyond the Durability Horizon, new data begins loading into the database.
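Here is a conceptual sketch of this rule, with the scope key and horizon tracking simplified into illustrative assumptions (this is not the LAT's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch of Durability Horizon deduplication. Illustrative only.
public class DedupSketch {
    // Deduplication scope: pipeline ID, topic or file group name, and partition.
    record Scope(String pipelineId, String topicOrFileGroup, int partition) {}

    private final Map<Scope, Long> durabilityHorizon = new HashMap<>();

    // Returns true if the record should be loaded, false if it is a duplicate.
    boolean shouldLoad(Scope scope, long recordId) {
        long horizon = durabilityHorizon.getOrDefault(scope, -1L);
        // At or below the horizon: already durable, so ignore the replay.
        // Beyond the horizon: new data that should be loaded.
        return recordId > horizon;
    }

    // Called once a record has been made durable in the page stores.
    void markDurable(Scope scope, long recordId) {
        durabilityHorizon.merge(scope, recordId, Math::max);
    }
}
```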
If no pipeline_id is set, the LAT Client automatically assigns a randomly generated ID, which means no deduplication occurs across different pipelines. If you want deduplication between multiple pipelines, copy the pipeline_id from the previous pipeline and set it explicitly. The topics or file groups must also be the same for deduplication to apply.
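For example, a hedged sketch of carrying a pipeline_id forward, assuming a JSON pipeline definition in which pipeline_id is a top-level field; the UUID shown stands in for the one copied from the previous pipeline, and the exact shape of your pipeline definition may differ:

```json
{
  "pipeline_id": "4f8a2c1e-7b3d-4e9a-9c52-0d6f1a2b3c4d"
}
```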
When updating an existing file load pipeline to select a different group of files, such as by altering the start or stop time or another filter, be sure to use a new pipeline ID (best accomplished by not setting an explicit pipeline ID when creating the pipeline with the client). Otherwise, unexpected results can occur, such as rows in a new file being considered duplicates.
LAT V2 modified the way the deduplication scope is calculated. In V1, only the topic name was used for the scope calculation. Therefore, to maintain deduplication in a pipeline that is being upgraded from V1 to V2, the pipeline_id must be explicitly set to "" (the empty string) upon creation.
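A hedged sketch of that upgrade setting, under the same assumption of a JSON pipeline definition with a top-level pipeline_id field:

```json
{
  "pipeline_id": ""
}
```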