Source Configuration
A source configuration object.
Required keys:
Type of source to use in the pipeline.
Type: | string |
Required: | Yes |
Default: | |
Allowed values:
Currently, LAT supports loading files from an S3 instance (like AWS S3 or Object Storage) or from a local file system. Common configuration for all File Sources will be listed in Common File Source Configuration, followed by sections describing source-specific configurations.
File Sources are defined by creating "file groups" that represent a logical set of files to be loaded. Each file group is given a file_group_name that corresponds to the file group name in the transform section of the pipeline configuration. Each file group has settings to select the specific files that should be loaded.
The LAT supports loading from individual files with extensions such as .jsonl and .csv. GZIP compression is supported. However, files using the TAR archive format, LZOP compression, or ZIP compression are not supported.
If you load files with unsupported archive or compression formats, the load might stop or produce unexpected results.
The performance of loading is greatly improved when records are presented in a well-ordered time sequence. This sequence allows more efficient creation of segments and sorting of records into buckets. For this reason, the LAT has options to define how files in a file group should be sorted prior to loading.
If you want to load all the files under a certain directory, the file group configuration is basic:
This file group configuration will select all files under /dir1 (including those under its subdirectories) and sort them lexicographically.
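As a minimal sketch of such a configuration, the following Python snippet builds a source object with a single file group and prints it as JSON. The group name my_file_group is illustrative, and the source type key is left out because its allowed values are not listed in this section.

```python
import json

# Hypothetical minimal file group: every file under /dir1, sorted lexicographically.
source = {
    "file_groups": {
        "my_file_group": {
            "prefix": "/dir1",
            "sort_type": "lexicographic",
        }
    }
}

# Serialize the object the way it would appear inside a pipeline configuration.
source_json = json.dumps(source, indent=2)
print(source_json)
```

Only prefix and sort_type are set here; the other file group settings are optional and described later in this section.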
However, if you need to choose, filter, and sort files more selectively, the file group configuration provides flexible options for doing so.
For an individual file group in a file type load, there are multiple steps to select, rename, filter, and sort the files retrieved. File selection proceeds through this sequence:
- Prefix filtering
- File matching
- File renaming
- Timestamp extract pattern matching (only extract_timestamp)
- Range matching
- Final list sorting
For example, given these files from a local file system:
And a file_group configuration:
Step 1: Prefix Filtering
Prefix filtering occurs first and includes only files that are in the matching prefix paths. The prefix is the path part following the bucket for S3 types, and it is the path to files in a local file type load.
Result: /dir1/file.json is filtered out because it is not in the prefix.
Step 2: File Matching
Next, file matching uses a pattern to match files that should be included. File matcher patterns apply to the full path of the file including any prefix defined in the prior step.
Result: /dir1/files1/year=2021/month=01/day=01/files2/10-00-00.csv is filtered out because it does not end in .json. /dir1/files1/year=2021/month=01/day=01/file.json is filtered out because it is not in the subdirectory files1.
Step 3: File Renaming
File renaming is a step that can be useful in a case where the selected files have disparate file names that would make it difficult to extract timestamps from. Files are only renamed internally to the LAT to facilitate the file selection process; they are not actually renamed locally or on S3.
By unifying each file name into a consistent format, this step should make it easier to use the extract_timestamp sort type, or even to lexicographically sort on some parameter within the file name itself.
Result: The files that match the file_matcher_pattern regular expression are renamed using the rename_format.
Current Files at this Step:
Renamed file list that enters the next step:
Step 4: Timestamp Extract Pattern Matching
A filter is applied when the sort_type is extract_timestamp. The extract_timestamp sort type extracts timestamps from the file’s filepath. Either a path_timestamp_pattern or a file_timestamp_pattern must be set. If a filename does not match the set pattern, it will be filtered out.
Result: In this example, a path_timestamp_pattern is set. This DateTimeFormatter pattern extracts a timestamp starting from the beginning of the filename, and the timestamp is used in the next step. No files are filtered here because the rename step above unified the filename formats so that the path_timestamp_pattern matches all the files.
Internal to the LAT, the files are associated with their datetimes, which will be used in the next step:
There are potential pitfalls with using the extract_timestamp sort_type, described in the Source Configuration section.
Step 5: Range Matching
Range matching occurs next based on the chosen sort_type algorithm.
- For extract_timestamp or metadata, a timestamp is associated with each file for sorting. A start and end time can be optionally provided to limit files to select.
- For lexicographic, the filename itself is used for sorting. A start and end file name can be provided to limit the files that are selected.
Result: The file 2021-01-01-12-00-00 is filtered out because it is not in the start and end time ranges.
Step 6: Final List Sorting
Finally, the fully filtered file list contains two files from your original list. These are then sorted according to the sort_type algorithm and partitioned across the workers for loading.
Remaining files before sorting:
Final list - sorted, renamed files:
(original file names):
The names defined for file_groups ("my_file_group" in this example) should match the file_groups used in the transform configuration section.
source.file_groups
An object that maps file group names to their corresponding configuration objects.
Type: | object |
Required: | Yes |
Default: | |
source.file_groups.<file_group_name>
The configuration object for a file group.
Type: | object |
Required: | Yes |
Default: | |
source.file_groups.<file_group_name>.prefix
For an S3 Source, the prefix is used to get a subset of S3 objects from the bucket. A more specific prefix can improve time required to list files in S3 sources.
For a Local File Source, prefix is an absolute or relative path from the working directory of the LAT. In either case, the path can be a file or a directory. If the path is a directory, LAT tries to load all files recursively under this path. When looking for files, LAT follows symbolic links. LAT only loads regular files that are readable and not hidden. Note that shell-specific expansions like ~ are not supported.
You can include multiple prefixes for a single file group if you specify this property as an array of strings. In that case, the LAT loads all files that are under any of the prefixes in that array. The system loads files that match more than one prefix within a single file group only once.
For a Local File Source, the path used for prefix is the path on the server where the LAT is running, which might not be the same machine where you are running the LAT Client Command Line Interface to create your pipeline.
Type: | string or string array |
Required: | No |
Default: | "" |
source.file_groups.<file_group_name>.file_matcher_syntax
The file_matcher_syntax defines the type of syntax used by the file_matcher_pattern setting, which selects the files to include in the file group. The pattern is applied to the fully qualified file name in the list of files found under the prefix.
See the file_matcher_pattern description for examples and complete syntax details for glob and regex options.
Type: | string |
Required: | No, although it is not valid to have only one of file_matcher_syntax or file_matcher_pattern be provided |
Default: | glob |
Allowed values:
- glob - a simplified pattern matching system based on wildcards
- regex - a regular expression syntax
source.file_groups.<file_group_name>.file_matcher_pattern
The pattern used to select files from the prefix-filtered list. Files that match the pattern are included in the file group. The fully qualified file name (path and filename under the prefix) is matched. Files that do not match are excluded from the file group.
This pattern is also used in the case of LAT file renaming - if a rename_format is provided, the LAT will attempt to internally rename files using capture groups in the file_matcher_pattern. If capture groups are provided in the file_matcher_pattern, they will be extracted and placed into the rename_format in order to generate a renamed file. Capture groups do not need to be named; the captured values are assigned to the rename_format sequentially, i.e. the first capture group in the file_matcher_pattern will be placed in {1} in the rename_format.
Files are only renamed internally to the LAT to facilitate the file selection process; they are not actually renamed locally or on S3.
This matched file list can be further filtered by the timestamp pattern of an extract_timestamp file group and by the start and stop bounds that apply to the chosen sort type. See an expanded example in the File Group Filtering and Sorting example section.
A double backslash within the regular expression is necessary to generate valid escape characters for the pipeline.json. For example, the capture group (\d{4}) needs to be escaped as (\\d{4}).
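The escaping rule can be checked with a short Python sketch that serializes a regular expression to JSON. The pattern here is only an illustration; the point is that a JSON encoder doubles each backslash, and a JSON parser restores the original expression.

```python
import json

# The regular expression as the matcher should see it after JSON parsing.
pattern = r"(\d{4})"

# Serializing to JSON doubles the backslash, which is the form written in pipeline.json.
encoded = json.dumps({"file_matcher_pattern": pattern})
print(encoded)

# Parsing the JSON text yields the single-backslash expression again.
decoded = json.loads(encoded)["file_matcher_pattern"]
```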
Type: | string |
Required: | No, although it is not valid to have only one of file_matcher_syntax or file_matcher_pattern be provided |
Default: | ** |
Example
With a file_matcher_syntax of glob and file_matcher_pattern of **.json, all files (after prefix-filtering) that end in .json will be selected.
A ** in a glob pattern matches across directory boundaries. A single * does not match the directory separator /.
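The difference between * and ** can be illustrated with a simplified Python translation of glob wildcards into a regular expression. This is a sketch that handles only *, **, and ?; it is not the full glob syntax, and the helper names glob_to_regex and glob_matches are hypothetical.

```python
import re

def glob_to_regex(glob):
    """Translate a small subset of glob syntax into a regular expression."""
    parts = []
    i = 0
    while i < len(glob):
        if glob.startswith("**", i):
            parts.append(".*")      # crosses directory boundaries
            i += 2
        elif glob[i] == "*":
            parts.append("[^/]*")   # stays within one name component
            i += 1
        elif glob[i] == "?":
            parts.append("[^/]")    # exactly one character of a name component
            i += 1
        else:
            parts.append(re.escape(glob[i]))
            i += 1
    return "".join(parts)

def glob_matches(glob, path):
    """Return True when the whole path matches the glob."""
    return re.fullmatch(glob_to_regex(glob), path) is not None
```

With these helpers, glob_matches("**.json", "/dir1/files1/a.json") is true, while glob_matches("*.json", "/dir1/files1/a.json") is false because * cannot cross the / characters in the path.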
Example
With a file_matcher_syntax of regex, a file_matcher_pattern of auctions.*-2021.json would use a regular expression to ensure that files such as auctions-12-01-2021.json and auctions-12-02-2021.json would be selected.
See getPathMatcher() for complete syntax details of the glob and regex options.
Common Matcher Patterns:
Matcher Syntax | Pattern | Meaning |
glob | * | Matches zero or more characters without crossing directory boundaries |
glob | ** | Matches zero or more characters crossing directory boundaries |
glob | ? | Matches exactly one character of a name component |
glob | [] | Matches any of the characters in the bracket expression (e.g., [abc]). Supports ranges. |
regex | . | Matches any character |
regex | \d | Matches any digit |
regex | \D | Matches any non-digit |
regex | [] | Matches any of the characters in the bracket expression (e.g., [abc]). Supports ranges. |
regex | * | Matches the preceding character zero or more times |
regex | + | Matches the preceding character one or more times |
regex | ? | Matches the preceding character once or not at all |
source.file_groups.<file_group_name>.rename_format
A Java® MessageFormat string. Captured groups from the file_matcher_pattern will be placed into the rename_format string’s format elements corresponding to the order they were captured. The renamed file is used for subsequent sorting of files. When using lexicographic sort, the renamed file is used instead of the original filename. For extract_timestamp sorting, the path_timestamp_pattern is applied against the renamed file. This is best elucidated with an example:
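The following Python sketch approximates the rename step under stated assumptions. The file path, pattern, and rename_format are hypothetical, and the placeholder substitution is a simplified stand-in for MessageFormat, in which {1} receives the first capture group as described above.

```python
import re

def rename(path, file_matcher_pattern, rename_format):
    """Return the internal name for a path, or None when the path does not match."""
    match = re.fullmatch(file_matcher_pattern, path)
    if match is None:
        return None
    # Replace each {n} placeholder with the n-th capture group.
    return re.sub(r"\{(\d+)\}", lambda ref: match.group(int(ref.group(1))), rename_format)

# Hypothetical example values.
pattern = r".*/year=(\d{4})/month=(\d{2})/day=(\d{2})/(\d{2}-\d{2}-\d{2})\.json"
path = "/dir1/files1/year=2021/month=01/day=01/10-00-00.json"
renamed = rename(path, pattern, "{1}-{2}-{3}-{4}.json")
print(renamed)
```

The renamed value places the date and time at the start of the name, which makes a timestamp pattern or a lexicographic sort straightforward to apply.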
Files are only renamed internally to the LAT to facilitate the file selection process; they are not actually renamed locally or on S3.
Type: | string |
Required: | No |
Default: | None - if rename_format is not provided, files will not undergo a rename step. |
source.file_groups.<file_group_name>.sort_type
Defines the sorting algorithm used for sorting the files selected for loading in this file group by time. Files should be sorted in time order for best loading performance and the creation of efficient Ocient segments. sort_type can be extract_timestamp, metadata, or lexicographic, and different settings apply to each of these selections.
Type: | string |
Required: | Yes |
Default: | none |
Allowed values:
- extract_timestamp: extract the timestamp from the file name or file path information. Either path_timestamp_pattern or file_timestamp_pattern is required when choosing this sort_type.
- metadata: extract the timestamp from the source file’s metadata. This option uses the file last modified time when sorting the list.
- lexicographic: sort files based on the alphanumeric sort of files in the file group using the full path and filename as the sorting key.
The system breaks the tie among files in a lexicographic way by using their fully qualified original file names.
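The tie-breaking rule can be sketched in Python as a sort on a two-part key: the timestamp first, then the fully qualified original file name. The file names and timestamps are made up for illustration.

```python
from datetime import datetime

# (original path, timestamp) pairs; two files share the same timestamp.
files = [
    ("/dir1/b.json", datetime(2021, 1, 1, 10)),
    ("/dir1/a.json", datetime(2021, 1, 1, 10)),
    ("/dir1/c.json", datetime(2021, 1, 1, 9)),
]

# Sort by timestamp, then break ties lexicographically on the original path.
ordered = [path for path, _ in sorted(files, key=lambda item: (item[1], item[0]))]
print(ordered)
```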
source.file_groups.<file_group_name>.path_timestamp_pattern
A Format string with DateTimeFormatter patterns used to extract a datetime from the path portion of a file’s fully-qualified filename. This timestamp is used with the extract_timestamp sort type to order the selected files in the file group prior to loading. It is attempted on every file’s fully-qualified filename under the prefix; files that do not match the pattern will be skipped and the remaining subset will be used for sorting.
Type: | string |
Required: | One of path_timestamp_pattern or file_timestamp_pattern is required for an extract_timestamp sort type file group |
Default: | none |
source.file_groups.<file_group_name>.file_timestamp_pattern
A Format string with DateTimeFormatter patterns used to extract a datetime from the base filename of a file’s fully-qualified filename. This timestamp is used with the extract_timestamp sort type to order the selected files in the file group prior to loading. It is attempted on every file’s base filename under the prefix; files that do not match the pattern will be skipped and the remaining subset will be used for sorting.
Type: | string |
Required: | One of path_timestamp_pattern or file_timestamp_pattern is required for an extract_timestamp sort type file group |
Default: | none |
source.file_groups.<file_group_name>.start_time
An ISO-8601 compliant Date or Datetime, used as the lower bound to filter for files in an extract_timestamp or metadata sort type file group. Datetimes are UTC unless a timezone is provided. Inclusive.
Type: | string |
Required: | No |
Default: | Instant.MIN |
source.file_groups.<file_group_name>.stop_time
An ISO-8601 compliant Date or Datetime, used as the upper bound to filter for files in an extract_timestamp or metadata sort type file group. Datetimes are UTC unless a timezone is provided. Exclusive.
Type: | string |
Required: | No |
Default: | Instant.MAX |
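The inclusive lower bound and exclusive upper bound can be sketched in Python as a half-open interval check. The helper names parse_bound and in_time_range are hypothetical, and a missing bound is treated as unbounded, mirroring the Instant.MIN and Instant.MAX defaults.

```python
from datetime import datetime, timezone

def parse_bound(text):
    """Parse an ISO-8601 date or datetime, assuming UTC when no timezone is given."""
    value = datetime.fromisoformat(text)
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value

def in_time_range(timestamp, start_time=None, stop_time=None):
    """Keep a file when start_time <= timestamp < stop_time."""
    if start_time is not None and timestamp < parse_bound(start_time):
        return False
    if stop_time is not None and timestamp >= parse_bound(stop_time):
        return False
    return True

ten = datetime(2021, 1, 1, 10, tzinfo=timezone.utc)
noon = datetime(2021, 1, 1, 12, tzinfo=timezone.utc)
```

Here in_time_range(ten, "2021-01-01T10:00:00", "2021-01-01T12:00:00") is true because the lower bound is inclusive, while the same call with noon is false because the upper bound is exclusive.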
Extract Timestamp Default Values
With an extract_timestamp sort_type, any time unit that the pattern does not provide defaults to its respective value in `1970-01-01T00:00:00.000000`.
Example:
The full timestamp extracted from this filename will be 1970-07-03T00:00:00.000000 UTC.
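The defaulting behavior can be sketched in Python. Named regular expression groups stand in for the DateTimeFormatter pattern letters here, and the file name events-07-03.json is a hypothetical example that carries only a month and a day.

```python
import re
from datetime import datetime, timezone

# Values used for any unit that the pattern does not extract.
EPOCH_DEFAULTS = {"year": 1970, "month": 1, "day": 1, "hour": 0, "minute": 0, "second": 0}

def extract_timestamp(name, regex):
    """Build a UTC datetime from named groups, filling missing units from the epoch."""
    match = re.search(regex, name)
    if match is None:
        return None  # files that do not match the pattern are skipped
    fields = dict(EPOCH_DEFAULTS)
    fields.update({unit: int(value) for unit, value in match.groupdict().items()})
    return datetime(**fields, tzinfo=timezone.utc)

extracted = extract_timestamp("events-07-03.json", r"(?P<month>\d{2})-(?P<day>\d{2})")
print(extracted.isoformat())
```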
Potential Pitfalls
Pitfall 1: This default value should be considered when a start_time or stop_time is provided but a certain time unit is not present within the filename.
Example:
Note that 1970 was used as the year in the start_time and stop_time, but the system could not extract a year from the filenames. An alternative strategy is to use the File Renaming step to insert missing dates or times.
Pitfall 2: Only applicable to using the extract_timestamp sort_type with start_time and stop_time: Consider the time unit granularities between your pattern and start/stop times.
Example:
Given this configuration, one might expect that the files that are hour 4 and later would be filtered out. However, because the pattern does not extract the times from the filenames, each file’s extracted datetime in ISO 8601 format is 2021-01-01T00:00:00 UTC, so no files fall outside the range and none are filtered. Make sure to extract as much information about the datetime as necessary.
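The granularity pitfall can be reproduced with a small Python sketch. The file names and the bounds are hypothetical, and strptime formats stand in for the timestamp pattern: a pattern that reads only the date keeps both files, while a pattern that also reads the hour filters out the later file.

```python
from datetime import datetime

names = ["2021-01-01-03.json", "2021-01-01-05.json"]
start_time = datetime(2021, 1, 1, 0)
stop_time = datetime(2021, 1, 1, 4)

def select(names, fmt, width):
    """Keep names whose leading `width` characters parse to a time in [start, stop)."""
    kept = []
    for name in names:
        timestamp = datetime.strptime(name[:width], fmt)
        if start_time <= timestamp < stop_time:
            kept.append(name)
    return kept

coarse = select(names, "%Y-%m-%d", 10)     # date only: every file maps to midnight
fine = select(names, "%Y-%m-%d-%H", 13)    # date and hour: hour 5 is out of range
print(coarse, fine)
```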
source.file_groups.<file_group_name>.start_file
A string, used as the lower bound to filter for files in a lexicographic sort type file group. Inclusive.
Type: | string |
Required: | No |
Default: | none, but a lexicographic sort type file group will not check for a lower bound if this is not present. |
source.file_groups.<file_group_name>.stop_file
A string, used as the upper bound to filter for files in a lexicographic sort type file group. Inclusive.
Type: | string |
Required: | No |
Default: | none, but a lexicographic sort type file group will not check for an upper bound if this is not present. |
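For a lexicographic file group, both bounds are inclusive. A minimal Python sketch of that check, using plain string comparison and hypothetical file names, might look like this:

```python
def in_file_range(name, start_file=None, stop_file=None):
    """Keep a name when start_file <= name <= stop_file; a missing bound is not checked."""
    if start_file is not None and name < start_file:
        return False
    if stop_file is not None and name > stop_file:
        return False
    return True

names = ["2021-01-01.json", "2021-01-02.json", "2021-01-03.json", "2021-01-04.json"]
selected = [n for n in sorted(names) if in_file_range(n, "2021-01-02.json", "2021-01-03.json")]
print(selected)
```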
source.file_groups.<file_group_name>.compression
The compression method for the files in this file group. If this value is set, it will override source.compression; if not set, it will inherit source.compression. See source.compression for available options.
Type: | string |
Required: | No |
Default: | null |
source.file_groups.<file_group_name>.bucket
This setting is available only for S3 sources.
If specified, this setting overrides the source-wide bucket for this file group. This allows loading from multiple buckets at once within the same pipeline.
If a bucket is specified for every file group, you can omit the source.bucket.
Type: | string |
Required: | No |
Default: | null |
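The override rule for buckets can be sketched in Python as a lookup that prefers the file group setting and falls back to the source-wide setting. The function name effective_bucket and the bucket names are hypothetical.

```python
def effective_bucket(source, group_name):
    """Return the bucket for a file group: the group override if set, else source.bucket."""
    group = source["file_groups"][group_name]
    bucket = group.get("bucket") or source.get("bucket")
    if bucket is None:
        raise ValueError(f"no bucket configured for file group {group_name!r}")
    return bucket

# Hypothetical S3 source with one overriding group and one inheriting group.
source = {
    "bucket": "shared-bucket",
    "file_groups": {
        "group_a": {"prefix": "a/", "bucket": "bucket-a"},
        "group_b": {"prefix": "b/"},
    },
}
```

In this example, group_a resolves to bucket-a and group_b inherits shared-bucket.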
source.partitions
The number of partitions for each file group. For example, if there are 2 file groups and partitions is set to 32, then a total of 64 partitions will exist, with 32 for each file group. The partition index is 0-based for each file group. In the above example, each file group will have partitions [0, 31].
When partitions is specified for a load, its value must not change for the lifetime of this load, otherwise records cannot load correctly.
When using the LAT Client, partitions will be automatically assigned by the client. If it is set in the source configuration provided by the user, it will be overwritten.
Type: | int |
Required: | No |
Default: | Set by the LAT client |
source.partitions_assigned
A 2-element array indicating the minimum and maximum partition indices that this LAT Node owns, inclusive on both ends. For example, [0, 15] means that the node owns Partitions 0, 1, 2, …, 15. The first element should be greater than or equal to 0. The second element should be greater than or equal to the first element, and less than the number of partitions.
When using the LAT Client, partitions_assigned will be automatically assigned by the client. If it is set in the source configuration provided by the user, it will be overwritten.
Type: | int array |
Required: | No |
Default: | Set by the LAT client |
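The constraints on partitions_assigned can be expressed as a short Python check. This is a sketch of the rules stated above, with hypothetical helper names, not the validation code the LAT runs.

```python
def is_valid_assignment(partitions_assigned, partitions):
    """Check 0 <= first <= second < partitions for a 2-element array."""
    if len(partitions_assigned) != 2:
        return False
    first, second = partitions_assigned
    return 0 <= first <= second < partitions

def owned_partitions(partitions_assigned):
    """List the partition indices owned by a node, inclusive on both ends."""
    first, second = partitions_assigned
    return list(range(first, second + 1))
```

For example, with partitions set to 32, the assignment [0, 15] is valid and covers 16 partitions, while [16, 32] is invalid because index 32 is not less than the partition count.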
source.compression
The compression method for the files. Currently, only none and gzip are supported. none means that the files are not compressed. Note that this value sets the default compression method for all the files from this source, and it can be overridden per file group by source.file_groups.<file_group_name>.compression.
Type: | string |
Required: | No |
Default: | none |
source.chunk_size
The size of a chunk when fetching data. For example, if you have a 40 MB file and the chunk size is 16 MB, you can issue 3 requests sequentially to get the file. The unit is MB. The value must be greater than 0.
The chunk size should be larger than or equal to the maximum record size.
Type: | int |
Required: | No |
Default: | 16 |
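The request count in the example above is the file size divided by the chunk size, rounded up. A small Python sketch of that arithmetic, with a hypothetical function name:

```python
import math

def request_count(file_size_mb, chunk_size_mb=16):
    """Number of sequential chunk requests needed to fetch a file."""
    if chunk_size_mb <= 0:
        raise ValueError("chunk size must be greater than 0")
    return math.ceil(file_size_mb / chunk_size_mb)

print(request_count(40, 16))
```

A 40 MB file with 16 MB chunks needs two full chunks plus one partial chunk, which gives the 3 requests mentioned above.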
source.buffer_size
The total buffer size for all assigned partitions. This value will be divided by the total number of assigned partitions to calculate each partition’s buffer size. For example, if this value is 4096, source.partitions is 16, source.partitions_assigned is [0, 3], and you have 2 file groups, then you have a total of (3-0+1)*2=8 assigned partitions, and each partition will get 4096/8=512 MB as the buffer size. The unit is MB. The value must be greater than 0.
The buffer size per partition must be at least twice the value of source.chunk_size.
Type: | int |
Required: | No |
Default: | 4096 |
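The per-partition buffer arithmetic and the chunk size constraint can be sketched in Python. The function names are hypothetical; the numbers reproduce the example above.

```python
def per_partition_buffer_mb(buffer_size, partitions_assigned, file_group_count):
    """Divide the total buffer across all assigned partitions of all file groups."""
    first, last = partitions_assigned
    assigned_total = (last - first + 1) * file_group_count
    return buffer_size / assigned_total

def buffer_is_valid(buffer_size, partitions_assigned, file_group_count, chunk_size=16):
    """Each partition's buffer must be at least twice the chunk size."""
    per_partition = per_partition_buffer_mb(buffer_size, partitions_assigned, file_group_count)
    return per_partition >= 2 * chunk_size

print(per_partition_buffer_mb(4096, [0, 3], 2))
```

With a buffer size of 4096, an assignment of [0, 3], and 2 file groups, each of the 8 assigned partitions receives 512 MB, which comfortably exceeds twice the default 16 MB chunk size.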
source.max_fetch_concurrency
The maximum concurrency when fetching files for each partition. Setting this configuration too high can result in thread contention. The value must be greater than 0.
Type: | int |
Required: | No |
Default: | 2 |
source.endpoint
The endpoint for the S3 instance, usually starting with http or https. It can be an IP address or a domain. For example, for AWS S3 in the us-east-2 region, endpoint would be https://s3.us-east-2.amazonaws.com. The S3 Source supports Virtual-hosted-style access and Path-style access. The s3:// protocol based access is not supported. For more details, see Methods for Accessing an S3 Bucket
Type: | string |
Required: | Yes |
Default: | |
source.region
The region for the S3 instance. This is mostly used for AWS S3 instances.
Type: | string |
Required: | No |
Default: | us-east-2 |
source.bucket
The bucket from which to get S3 objects.
You can override the bucket on a per-file group basis by setting the source.file_groups.<file_group_name>.bucket.
This setting is required unless it is specified for every file group.
Type: | string |
Required: | No |
Default: | |
source.path_style_access
Whether to force path style access for the S3 instance. If set to false, LAT tries to use the virtual-hosted style (using DNS subdomains) and falls back to path style access.
AWS S3 is deprecating path style access according to this post.
Type: | boolean |
Required: | No |
Default: | false |
source.access_key_id
The access key for the S3 instance. This should be used together with the source.secret_access_key setting. If either of them is absent, LAT will default to the next item in the credentials hierarchy. See S3 Credentials Hierarchy.
Type: | string |
Required: | No |
Default: | null |
source.secret_access_key
The access secret for the S3 instance. This should be used together with the source.access_key_id setting. If either of them is absent, LAT will default to the next item in the credentials hierarchy. See S3 Credentials Hierarchy.
Type: | string |
Required: | No |
Default: | null |
source.session_token
Temporary credentials combine the existing access key ID and secret access key with an additional session token. Note that this setting is only used when both source.access_key_id and source.secret_access_key are specified.
Type: | string |
Required: | No |
Default: | null |
source.retries
Configures the number of retries that are attempted when a download from the S3 source fails.
Type: | int |
Required: | No |
Default: | Default attempts specified in AWS SDK. |
source.backoff_strategy_base_delay_seconds
Configures the base delay for the backoff strategy of the S3 client in seconds.
Type: | int |
Required: | No |
Default: | 1 |
source.backoff_strategy_max_backoff_seconds
Configures the maximum backoff delay for the backoff strategy of the S3 client in seconds.
Type: | int |
Required: | No |
Default: | For the default delay, see the AWS SDK documentation. |
source.max_pending_connection_acquires
Configures the maximum number of pending acquires allowed by the Netty client.
Type: | int |
Required: | No |
Default: | For the default number of acquires, see the AWS SDK documentation. |
source.netty_read_timeout_seconds
Configures the read timeout, in seconds, of the Netty client. When you set this value to zero, the system disables the read timeout. The LAT configures read timeouts to be tried again by the S3 Client.
Type: | int |
Required: | No |
Default: | For the default timeout value, see the AWS SDK documentation. |
source.connection_timeout_seconds
Configures the amount of time in seconds for the Netty client to wait when initially establishing a connection before giving up and timing out. The LAT configures connection timeouts to be retryable by the S3 Client.
Type: | int |
Required: | No |
Default: | For the default timeout value, see the AWS SDK documentation. |
source.connection_acquisition_timeout_seconds
Configures the amount of time in seconds for the Netty client to wait when acquiring a connection from the pool before giving up and timing out. The LAT configures connection acquisition timeouts to be retryable by the S3 Client.
Type: | int |
Required: | No |
Default: | For the default timeout value, see the AWS SDK documentation. |
source.requester_pays
Configures whether or not the requester (i.e. the LAT user) should be charged for downloading data from the S3 Requester Pays buckets. This configuration should be set to true whenever you request from a Requester Pays bucket, or else the request fails and the bucket owner is charged for the request.
Type: | boolean |
Required: | No |
Default: | false |
The S3 source configuration supports the following hierarchy to obtain S3 credentials. If the LAT does not obtain the credentials at a level, the LAT tries the lower level.
- Level 1: Pipeline configuration
- Level 3: (Default) Anonymous access
You can choose the level to store the credentials where Level 1 is the highest. Higher levels take precedence for credential storage.
The Kafka Source allows the LAT to connect to a Kafka cluster and process records from one or more topics. You can reference each Kafka topic in the Transform Configuration and can load into one or more tables.
In addition to the LAT specific configuration, this Source supports configuration passthrough to the underlying Kafka library which this Source uses. There are some library configurations which are not allowed, and will be noted in this table.
source.end_offsets_polling_duration
Frequency with which to poll for end offsets for lag calculation, in milliseconds.
Type: | int |
Required: | No |
Default: | 30000 |
source.kafka
Kafka consumer configuration object. See ConsumerConfig for details.
Type: | object |
Required: | Yes |
Default: | |
source.kafka
Required Keys:
- This key is required for connection information to the Kafka servers.
Common Keys:
- This key is commonly set to earliest to consume all of the data in an existing Kafka topic.
The following Kafka consumer configuration options are not allowed:
- kafka.enable.auto.commit
- LAT will always internally set this configuration to false; it is critical for correct operation to do so.
- kafka.key.deserializer
- LAT will always internally configure deserialization
- kafka.value.deserializer
- LAT will always internally configure deserialization
If any of the disallowed Kafka library configurations are set, a warning will be logged and their configurations will be ignored.
- kafka.group.id
- If unset, LAT will configure group.id internally using the value of ocient-lat-[$pipeline_id]. Most users should leave this unset unless they want to explicitly control the group.id that is being used.