Loading Characteristics and Concepts
When considering trillions or quadrillions of distinct records being stored and accessed as part of an ever-growing data warehouse, the manner in which data is loaded is important. The loading infrastructure of the data warehouse must not only provide strong correctness and durability guarantees in service of modern data pipelines, but must do so at very high load rates as well. For example, if an individual record averages 1000 bytes in size and the data warehouse must store 100 trillion records, the size of the records to be loaded is 10^14 * 10^3 = 10^17 bytes, or 100 petabytes. Note that this is the pre-compression size, because this is the volume of data that will be presented, row by row, to the data warehouse. If this information represents 1 year’s worth of records, the loading system must be prepared to handle a steady-state average of ~25 Gbps (8 * 10^17 bits spread over roughly 3.15 * 10^7 seconds). When the data rate is not uniform in time (perhaps displaying cyclic behavior associated with the day/night cycle), it is easy to imagine the required peak load rate measuring in the hundreds, or even thousands, of Gbps.
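The arithmetic above can be reproduced with a short sketch. The function name and the peak-to-average ratio of 4 are illustrative assumptions, not figures from the text; only the record count, record size, and one-year window come from the example.

```python
SECONDS_PER_YEAR = 365 * 24 * 60 * 60  # 31,536,000 seconds


def average_load_rate_gbps(record_count: int, avg_record_bytes: int,
                           seconds: int = SECONDS_PER_YEAR) -> float:
    """Steady-state average load rate in gigabits per second (pre-compression)."""
    total_bytes = record_count * avg_record_bytes
    bits_per_second = total_bytes * 8 / seconds
    return bits_per_second / 1e9


# 100 trillion records of 1000 bytes each, loaded over one year.
average_gbps = average_load_rate_gbps(100 * 10**12, 1000)

# Hypothetical peak-to-average ratio for a day/night cycle (an assumption).
PEAK_TO_AVERAGE = 4
peak_gbps = average_gbps * PEAK_TO_AVERAGE

print(f"average: {average_gbps:.1f} Gbps, assumed peak: {peak_gbps:.1f} Gbps")
```

The average works out to roughly 25.4 Gbps. Even a modest assumed peak factor pushes the required rate past 100 Gbps.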
Obviously the sizes and rates are arbitrary, but they are not unrealistic. Consequently, a high-throughput, scalable, and robust loading infrastructure is a first-class citizen in the data warehouse, on par in complexity with the execution engine itself.
The distinction between "stream" and "batch" is somewhat arbitrary, depending on the system in consideration, but in the Ocient loading infrastructure the fundamental unit of loading is a single record in a table. Put another way, the various guarantees listed here are made on a record-by-record basis and not on batches of records. Supporting this is the concept of a record stream, of which the loading infrastructure can manage an unbounded number. A record stream is an ordered sequence of records, and loading considers each record in a stream independently of the others.
A key design goal of the loading system was a low time to query-ability (TTQ). Strict guarantees are not made, but in practice any record presented to the loading system is usually available to be queried within seconds. An interesting property of the implementation is that the variance of TTQ decreases as the aggregate record rate increases. This is because the loading system maintains internal structures called pages that accumulate records, and the faster they fill, the more quickly they are made available for querying.
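A toy model shows why the variance shrinks. It assumes a page of about 100 MB, a constant arrival rate, and that a record becomes queryable only once its page fills. None of this describes the actual implementation, which may also flush pages on other triggers. Under these assumptions, a record's wait is uniformly distributed between zero and the page fill time T, so the mean wait is T/2 and the variance is T^2/12.

```python
PAGE_BYTES = 100 * 10**6  # assumed page size of roughly 100 MB


def wait_stats(page_bytes: float, rate_bytes_per_sec: float) -> tuple[float, float]:
    """Mean and variance of the wait (in seconds) until a page fills.

    Toy model: a record lands at a uniformly random point in the page's
    fill interval, so its wait is uniform on [0, fill_seconds].
    """
    fill_seconds = page_bytes / rate_bytes_per_sec
    mean_wait = fill_seconds / 2
    variance = fill_seconds ** 2 / 12
    return mean_wait, variance


for rate in (10e6, 100e6, 1e9):  # hypothetical rates in bytes per second
    mean_wait, variance = wait_stats(PAGE_BYTES, rate)
    print(f"{rate / 1e6:>6.0f} MB/s -> mean wait {mean_wait:.3f} s, variance {variance:.6f} s^2")
```

In this model a tenfold increase in rate cuts the variance a hundredfold, which matches the qualitative behavior described above.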
A necessary requirement of low latency querying is the ability to ensure that any particular record is stored in a fault-tolerant manner. To guarantee correctness of query results, once a record has been considered during query execution (either included or filtered out), it must be considered for all subsequent queries until it is deleted by the user. The durability guarantee made by the loading system must therefore be satisfied before a record becomes queryable (so it, too, is measured in seconds on a record-by-record basis), and it matches the system’s configured fault tolerance. For example, if the erasure coding scheme in use has a K value of 2, then the loading system must store all records in a manner that allows for at least 2 losses before they can be made queryable.
The unit of durability in the loading system is called a page. Within the loading and storage system, a page shares many characteristics with a segment: each is a self-contained set of records with its own local metadata and statistics, and the consensus-based ownership scheme mediated by the storage clusters is the same for both. However, there are some key distinctions:
- Typical Size: A page is usually substantially smaller than a segment. While segments can be multiple gigabytes in size, pages are more commonly around 100 megabytes.
- Replication: Segments are grouped together into segment groups and mutually erasure coded for fault tolerance. Pages, however, are replicated within the storage layer. This is less space efficient for the same fault tolerance, but pages are short-lived: they exist only long enough to be converted into segments, which are denser and richer in metadata. Because pages do not exist for very long, the extra storage overhead of the replication scheme is inconsequential, and the loading system opts not to spend computational effort on computing erasure-coded parity data.
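The space trade-off between the two schemes can be made concrete with a small sketch. It relies on two general facts: surviving K losses through replication requires K + 1 full copies, and an erasure code with N data units and K parity units stores (N + K) / N bytes per byte of data. The segment group width of 8 data plus 2 parity units is a hypothetical example, not a documented Ocient configuration.

```python
def replication_overhead(losses_tolerated: int) -> int:
    """Storage multiplier for replication: one original plus one copy per loss."""
    return losses_tolerated + 1


def erasure_overhead(data_units: int, parity_units: int) -> float:
    """Storage multiplier for an erasure code tolerating `parity_units` losses."""
    return (data_units + parity_units) / data_units


K = 2  # tolerate two losses, as in the K = 2 example in the text
print(f"pages (replicated):       {replication_overhead(K)}x")
print(f"segments (8 + 2 erasure): {erasure_overhead(8, K)}x")
```

For the same tolerance of two losses, replication stores three times the data while the assumed 8 + 2 code stores 1.25 times. The text argues this gap is acceptable for pages because they are short-lived.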
When the record stream sources can support it, the Ocient loading infrastructure is capable of providing exactly-once semantics for every record loaded. This means that the record source and the query layer can be guaranteed that every record presented for load is not only not lost (no "gaps" exist) but also not duplicated. On the surface this seems like an obvious and simple requirement, but at millions of records per second in a fault-tolerant distributed system, it is extremely non-trivial to provide this guarantee.
The loading systems of other database engines can also provide this guarantee but often with caveats that might not be acceptable. For example, executing a sequence of INSERT INTO statements within the context of a transaction is a possible approach, but distributed transaction processing does not easily scale to millions of records per second. Beyond that there is the issue of how to manage inter-transaction guarantees that place additional requirements on the data sources.
If transactions are not in use, it is generally not possible to provide exactly-once guarantees without a bi-directional contract executed by the record source and the loading system. In a very common form of this contract, the loading system provides the exactly-once guarantee by guaranteeing it will de-duplicate any records presented more than once (i.e. the loading system is idempotent with respect to record insertions). Then, the source can safely re-send any records the loading system does not indicate it has received. So long as the source resends anything it has not been explicitly told was received (and made durable) the two endpoints can together guarantee every record will be present exactly once.
When this approach is taken, the loading system’s job becomes simple deduplication. On some systems, this is achieved using unique record identifiers. First, the source must be able to indicate a column or set of columns whose values, taken together, uniquely identify any particular record. Then, for each record inserted, a hash set or other structure is consulted. If the unique identifier is not present, it is inserted; if it is present, the record is silently dropped. While cheap to implement, this scheme suffers because the set structure cannot be allowed to grow indefinitely, and therefore some time-based or size-based bound is placed on the exactly-once guarantee. An additional significant issue is that it is challenging and expensive to ensure the set structure is replicated or distributed amongst all nodes participating in the loading system. This presents correctness and scalability challenges to the exactly-once guarantee.
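A minimal single-node sketch of the identifier-set scheme illustrates the bound on the guarantee. The class name, the size-based eviction policy, and the tiny capacity are illustrative assumptions; real systems may bound the set by time instead.

```python
from collections import OrderedDict


class BoundedDeduplicator:
    """Remembers at most `max_ids` record identifiers, evicting the oldest."""

    def __init__(self, max_ids: int):
        self.max_ids = max_ids
        self._seen = OrderedDict()  # insertion-ordered set of identifiers

    def accept(self, record_id) -> bool:
        """Return True if the record should be stored, False if dropped."""
        if record_id in self._seen:
            return False  # duplicate within the remembered window
        self._seen[record_id] = None
        if len(self._seen) > self.max_ids:
            self._seen.popitem(last=False)  # forget the oldest identifier
        return True


dedup = BoundedDeduplicator(max_ids=2)
results = [dedup.accept(record_id) for record_id in ["a", "b", "a", "c", "a"]]
print(results)  # [True, True, False, True, True]
```

The first re-send of "a" is dropped, but the second is accepted because "a" was evicted when "c" arrived. This is the failure mode that a size-bounded set introduces.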
The Ocient loading system takes the idempotent deduplication approach, but performs its deduplication checks in a different manner. The crux of the scheme is that most systems that aim for exactly-once guarantees are also able to replay their record streams in the same order. This is true for queuing systems as well as file sources such as Parquet files stored in S3. Because of this property, the Ocient deduplication scheme tracks a durability horizon for each distinct record stream. Recall that a record stream is an ordered sequence of records. The durability horizon is an ever-advancing offset into a stream: the data warehouse guarantees that every record in that stream up to and including the record at that offset is durable. Via this scheme, the source can easily identify the set of records it should send or re-send. In the loading system, deduplication becomes cheap in both time and space: the system need only store the durability horizon per stream (instead of an unbounded set), and a deduplication check is a basic interval check of whether the sequences of records stored in pages overlap.
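The horizon idea can be sketched in a few lines. This is a simplified single-process illustration, not the Ocient implementation: the class and method names are hypothetical, offsets are assumed to be integers starting at 0, and a record is treated as durable as soon as it is stored, whereas the real system advances the horizon only once pages are durable.

```python
class HorizonLoader:
    """Keeps one durability horizon (highest durable offset) per stream."""

    def __init__(self):
        self._horizon = {}  # stream_id -> highest offset known durable
        self.stored = []    # (stream_id, offset, record) tuples

    def horizon(self, stream_id: str) -> int:
        return self._horizon.get(stream_id, -1)  # -1 means nothing durable yet

    def load(self, stream_id: str, first_offset: int, records: list) -> int:
        """Load a contiguous run of records and return the new horizon."""
        horizon = self.horizon(stream_id)
        if first_offset > horizon + 1:
            raise ValueError("gap: source must resend from horizon + 1")
        # Interval check: offsets at or below the horizon are already durable.
        skip = horizon + 1 - first_offset
        for offset, record in enumerate(records[skip:], start=horizon + 1):
            self.stored.append((stream_id, offset, record))
        new_horizon = max(horizon, first_offset + len(records) - 1)
        self._horizon[stream_id] = new_horizon
        return new_horizon


loader = HorizonLoader()
loader.load("stream-1", 0, ["r0", "r1", "r2"])
# The source missed an acknowledgement and replays from offset 1.
loader.load("stream-1", 1, ["r1", "r2", "r3"])
print(loader.horizon("stream-1"))                   # 3
print([offset for _, offset, _ in loader.stored])   # [0, 1, 2, 3]
```

The loader keeps a single integer per stream regardless of how many records have passed through, and the replayed records r1 and r2 are discarded by comparing offsets rather than consulting a set.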
To enable the extremely high record and bit rates associated with hyperscale data pipelines, the loading system must be capable of scaling linearly to meet arbitrary loading demands. The Ocient loading layer achieves true linear scalability due to a shared-nothing approach between nodes executing loading operations. Because the layer scales linearly, it is relatively straightforward to design a fault-tolerant deployment that can handle any arbitrary rate: the capacity of a deployment is simply the capability of a single node for a given schema multiplied by the number of nodes.
With that said, it is almost impossible to specify an exact achievable per-node load rate in advance. This is because the process is bound by the computational complexity of calculating indexes and metadata (which are schema-specified), the width of the erasure coding scheme (which is user-specified), and the bit rate being handled (which is both schema and data dependent). In practice, each loading node is usually bounded by memory bandwidth, and steady-state load rates per node of dozens of gigabits per second or millions of records per second are not uncommon. Nodes are usually also capable of bursting to higher rates: tens of millions of records per second and approaching 100 Gbps.
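Given linear scaling, sizing a deployment reduces to a division once a per-node rate has been measured for the schema in question. The sketch below assumes a hypothetical measured rate of 20 Gbps per node and two spare nodes for fault tolerance. Both numbers are placeholders and should be replaced with measurements from a real workload.

```python
import math


def loader_nodes_needed(target_gbps: float, per_node_gbps: float,
                        spare_nodes: int = 0) -> int:
    """Nodes required to sustain `target_gbps`, plus spares for node failures."""
    if per_node_gbps <= 0:
        raise ValueError("per_node_gbps must be positive")
    return math.ceil(target_gbps / per_node_gbps) + spare_nodes


# Hypothetical inputs: 100 Gbps peak, 20 Gbps measured per node, 2 spares.
print(loader_nodes_needed(100, 20, spare_nodes=2))  # 7
```

Sizing against the peak rate rather than the average keeps the deployment from falling behind during the busiest part of a cyclic workload.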
Ocient’s architecture is designed to maximize performance on the world’s largest datasets. Ocient’s massively parallel design features storage and query processing optimizations to achieve scalable performance on commodity hardware with NVMe SSD storage. The custom-built, fully-integrated SQL execution engine and optimizer deeply integrate with the storage and I/O layer. The result is a state-of-the-art data warehouse that can deliver outstanding performance loading and querying record sets numbering in the trillions, quadrillions, and beyond.