Kafka static ingest consumer reference
FeatureBase has three import methods for Kafka data:
- Kafka consumer
- Kafka static consumer
- Kafka delete consumer
In all cases, the FeatureBase ingest tool:
- Streams and reads Avro-encoded records from an Apache Kafka topic over HTTPS
- Decodes the records
- Converts the records to FeatureBase Streaming Bitmap format
- Writes the converted records to the target database table
Kafka CLI Syntax
```shell
molecula-consumer-kafka-static \
  <source-and-target-flags> \
  <kafka-common-flags> \
  <kafka-static-flags> \
  <id-flags> \
  <batch-flags> \
  <error-flags> \
  <log-stat-flags> \
  <testing-flags> \
  <kafka-auth-flags> \
  <kafka-ssl-flags>
```
Common flags
Flag | Data type | Description | Default | Required | Additional
---|---|---|---|---|---
--batch-size | int | Number of records to read before indexing them as a batch. Recommended: 1,048,576 | 1 | | A larger value improves throughput but increases memory usage.
--concurrency | int | Number of concurrent sources and indexing routines to launch. | 1 | When ingesting multiple CSV files | Does not support SQL ingestion or --auto-generate
--featurebase-hosts | string | Comma-separated list of host:port pairs for the FeatureBase bind points. | [localhost:10101] | |
--index | string | Name of the target FeatureBase index. | | Yes |
--string-array-separator | string | Character used to delineate values in a string array. | , | |
--use-shard-transactional-endpoint | | Use the alternate import endpoint that ingests data for all fields in a shard in a single atomic request. | | | Recommended. Has a negative performance impact but better consistency.
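For example, a minimal invocation might look like the sketch below. It assumes a FeatureBase node at localhost:10101, a target index named `segments`, a topic named `events`, and a local schema file `segments_schema.json`; these names are placeholders, not defaults.

```shell
molecula-consumer-kafka-static \
  --featurebase-hosts localhost:10101 \
  --index segments \
  --header ./segments_schema.json \
  --topics events \
  --batch-size 1048576 \
  --use-shard-transactional-endpoint
```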
Kafka common flags
Flag | Data type | Description | Default | Required | Additional
---|---|---|---|---|---
--allow-decimal-out-of-range | bool | Allow ingest to continue when it encounters out of range decimals. | false | |
--allow-int-out-of-range | bool | Allow ingest to continue when it encounters out of range integers. | false | |
--allow-missing-fields | bool | Continue ingesting even if fields specified in a JSON config file are missing from a record. | false | | Recommended for Kafka static
--allow-timestamp-out-of-range | bool | Allow ingest to continue when it encounters out of range timestamps. | false | |
--group | string | Kafka group. | "defaultgroup" | |
--kafka-bootstrap-servers | strings | Comma-separated list of host:port pairs for Kafka. | [localhost:9092] | | Kafka bootstrap.servers property
--kafka-client-id | string | Kafka client ID (client.id). | | |
--kafka-debug | string | Choose one or more debug contexts to enable as a comma-separated list. | | | See kafka-debug contexts below
--kafka-hosts | string | Comma-separated list of host:port pairs for Kafka. | [localhost:9092] | |
--max-msgs | int | Number of messages to consume from Kafka before stopping. | | | Useful for testing when you don't want to run indefinitely
--skip-old | | Skip to the most recent Kafka message rather than starting at the beginning. | | |
--timeout | duration | Time to wait for more records from Kafka before flushing a batch. | 1s | | Set to 0 to disable
--topics | string | Kafka topics to read from. | [defaulttopic] | |
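To read from a specific broker and topic, an invocation might look like the following sketch; the broker addresses, group name, and topic are placeholders for your own environment.

```shell
molecula-consumer-kafka-static \
  --index segments \
  --header ./segments_schema.json \
  --kafka-bootstrap-servers kafka1:9092,kafka2:9092 \
  --topics events \
  --group segments-ingest \
  --skip-old \
  --max-msgs 10000
```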
Kafka static flags
Flag | Data type | Description | Default | Required | Additional
---|---|---|---|---|---
--header | string | Path to the static schema definition or "header" file in JSON format, located on the local file system or at an S3 URI. | | | S3 URIs require --s3-region or the AWS_REGION environment variable. See Kafka Static schema ingest source.
--s3-region | string | S3 region, optionally used when the header is specified as an S3 URI. | | Required for --header <s3-URI> | Alternatively, set the AWS_REGION environment variable
--dead-letter-queue-host | string | Dead letter queue host:port. The Kafka static consumer can be configured to report failed messages to a DLQ and continue processing without exiting. Specifying a valid DLQ host and topic enables this optional behavior. | | |
--dead-letter-queue-topic | string | Dead letter queue topic that failed messages should be routed to. | | |
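As an illustration, the sketch below reads the schema header from S3 and routes failed messages to a dead letter queue; the bucket, region, DLQ host, and topic names are placeholders.

```shell
molecula-consumer-kafka-static \
  --index segments \
  --topics events \
  --header s3://example-bucket/segments_schema.json \
  --s3-region us-east-1 \
  --dead-letter-queue-host localhost:9092 \
  --dead-letter-queue-topic ingest-dlq
```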
Generate ID flags
Flag | Data type | Description | Default | Required
---|---|---|---|---
--auto-generate | | Automatically generate IDs. Used for testing purposes. Cannot be used with --concurrency. | | When --id-field or --primary-key-fields are not defined
--external-generate | | Allocate _id using the FeatureBase ID allocator. Supports --offset-mode. Requires --auto-generate. | |
--id-alloc-key-prefix | string | Prefix for ID allocator keys when using --external-generate. Requires a different value for each concurrent ingester. | ingest |
--id-field | string | A field containing a sequence of positive integers that uniquely identifies each record. Use instead of --primary-key-fields. | | If --auto-generate or --primary-key-fields are not defined
--primary-key-fields | string | Convert field values to strings for use as the unique _id. A single field is not added to the target as a field; multiple fields are concatenated using / and added to the target as fields. Use instead of --id-field. | [] | If --auto-generate or --id-field are not defined
--offset-mode | | Generate autogenerated IDs based on offsets. Requires --auto-generate and --external-generate. | | When ingesting from an offset-based data source
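For example, the two sketches below show the alternative ID strategies; `user_id` and `region` are hypothetical field names from your records, not defaults.

```shell
# Use an existing integer field as the record _id
molecula-consumer-kafka-static \
  --index segments \
  --header ./segments_schema.json \
  --topics events \
  --id-field user_id

# Or build a string _id by concatenating one or more record fields
molecula-consumer-kafka-static \
  --index segments \
  --header ./segments_schema.json \
  --topics events \
  --primary-key-fields region,user_id
```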
Batch processing flags
Flag | Data type | Description | Default | Required
---|---|---|---|---
--batch-size | int | Number of records to read before indexing them as a batch. A larger value improves throughput but increases memory usage. Recommended: 1,048,576 | 1 |
Error handling flags
Flag | Data type | Description | Default | Required
---|---|---|---|---
--allow-decimal-out-of-range | bool | Allow ingest to continue when it encounters out of range decimals in Decimal fields. | false |
--allow-int-out-of-range | bool | Allow ingest to continue when it encounters out of range integers in Int fields. | false |
--allow-timestamp-out-of-range | bool | Allow ingest to continue when it encounters out of range timestamps in Timestamp fields. | false |
--batch-max-staleness | duration | Maximum length of time the oldest record in a batch can exist before the batch is flushed. This may result in timeouts while waiting for the source. | |
--commit-timeout | duration | A commit informs the data source that the current batch of records has been ingested. --commit-timeout is the maximum time before the commit process is cancelled. May not function for the CSV ingest process. | |
--skip-bad-rows | int | Fail the ingest process if more than n rows cannot be processed. | |
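For instance, the following sketch tolerates a limited number of malformed records and bounds how long a partially filled batch can wait; the thresholds shown are placeholders, not recommendations.

```shell
molecula-consumer-kafka-static \
  --index segments \
  --header ./segments_schema.json \
  --topics events \
  --allow-int-out-of-range \
  --skip-bad-rows 100 \
  --batch-max-staleness 30s \
  --commit-timeout 10s
```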
Logging & statistics flags
Flag | Data type | Description | Default | Required |
---|---|---|---|---|
--log-path | string | Log file to write to. If empty, logs are written to stderr. | |
--pprof | string | host:port on which to listen for the pprof Go package. | "localhost:6062" |
--stats | string | host:port on which to host metrics. | "localhost:9093" |
--track-progress | | Periodically print status updates on how many records have been sourced. | |
--verbose | | Enable verbose logging. | |
--write-csv | string | Write ingested data to the named CSV file. | |
Testing flags
Flag | Description | Default | Required
---|---|---|---
--delete-index | Delete an existing index specified by --index before starting ingest. USE WITH CAUTION. | |
--dry-run | Parse flags without starting an ingest process. | |
Kafka authentication flags
kafka-tls flags authenticate with the Kafka instance and can be used with tls flags that authenticate with the FeatureBase server.
Flag | Data type | Description | Default | Required |
---|---|---|---|---|
--kafka-tls.ca-certificate | string | Path to CA certificate file, or literal PEM data. | ||
--kafka-tls.certificate | string | Path to certificate file, or literal PEM data. | ||
--kafka-tls.enable-client-verification | string | Enable verification of client certificates. | ||
--kafka-tls.key | string | Path to certificate key file, or literal PEM data. | ||
--kafka-tls.skip-verify | | Disables verification of server certificates. | |
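For example, mutual TLS with the broker might be configured as in the sketch below; the certificate paths are placeholders.

```shell
molecula-consumer-kafka-static \
  --index segments \
  --header ./segments_schema.json \
  --topics events \
  --kafka-tls.ca-certificate /certs/kafka-ca.pem \
  --kafka-tls.certificate /certs/kafka-client.pem \
  --kafka-tls.key /certs/kafka-client-key.pem
```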
Kafka SSL keys
Flag | Data type | Description | Default | Required | Additional
---|---|---|---|---|---
--kafka-enable-ssl-certificate-verification | | (enable.ssl.certificate.verification) | | |
--kafka-group-instance-id | string | The (group.instance.id) kafka consumer configuration. | | |
--kafka-max-poll-interval | string | The (max.poll.interval.ms) kafka consumer configuration. The max time the Kafka consumer can go without polling the broker. | | | The Kafka consumer exits after this timeout.
--kafka-sasl-mechanism | string | SASL mechanism to use for authentication (sasl.mechanism). | | |
--kafka-sasl-password | string | SASL authentication password (sasl.password). | | |
--kafka-sasl-username | string | SASL authentication username (sasl.username). | | |
--kafka-security-protocol | string | Protocol used to communicate with brokers (security.protocol). | | |
--kafka-session-timeout | string | The (session.timeout.ms) kafka consumer configuration. The max time the Kafka consumer can go without sending a heartbeat to the broker. | | |
--kafka-socket-keepalive-enable | string | The (socket.keepalive.enable) kafka consumer configuration. | | |
--kafka-socket-timeout-ms | int | (socket.timeout.ms) | | |
--kafka-ssl-ca-location | string | File or directory path to CA certificate(s). | | | Used for verifying the broker's key (ssl.ca.location)
--kafka-ssl-certificate-location | string | Path to the client's public key (PEM). | | | Used for authentication (ssl.certificate.location)
--kafka-ssl-endpoint-identification-algorithm | string | The endpoint identification algorithm used by clients to validate the server host name (ssl.endpoint.identification.algorithm). | | |
--kafka-ssl-key-location | string | Path to the client's private key (PEM). | | | Used for authentication (ssl.key.location)
--kafka-ssl-key-password | string | Private key passphrase. | | | Used with ssl.key.location and set_ssl_cert() (ssl.key.password)
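The sketch below combines the SASL and SSL flags for a broker that requires SASL over TLS; the mechanism, credentials, broker address, and CA path are placeholders for your own configuration.

```shell
molecula-consumer-kafka-static \
  --index segments \
  --header ./segments_schema.json \
  --kafka-bootstrap-servers broker1:9093 \
  --topics events \
  --kafka-security-protocol SASL_SSL \
  --kafka-sasl-mechanism PLAIN \
  --kafka-sasl-username ingest-user \
  --kafka-sasl-password ingest-password \
  --kafka-ssl-ca-location /certs/kafka-ca.pem
```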
Additional information
- Run ./molecula-consumer-kafka-static from the /featurebase/idk directory.
batch-size additional
There is no single best batch-size because memory usage per record varies between workloads.
During ingestion processing, there is a fixed overhead:
- from setting up an ingester transaction
- for each row

Setting large batch-size values will:
- average out the overheads
- proportionally increase memory usage
- improve performance (in general terms)
For example:
Workload includes | Batch size | Typical memory usage (MB) |
---|---|---|
High number of sparse keys | 20,000 | 100+ |
High-frequency keys | 1,000,000+ |
concurrency additional
The concurrency ingest flag runs ingesters in parallel, which can:
- improve utilization on multi-core systems
- allow for redundancy

Alternatively, ingest processes can be launched individually in different environments.
List all the flags by entering idk/molecula-consumer-kafka from the /featurebase directory.
kafka-debug contexts
Add one or more debug contexts as a comma-separated list to the --kafka-debug parameter:
Context | Description |
---|---|
consumer | |
cgrp | |
topic | |
fetch | |
all | Set for verbose debugging |
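For instance, to trace consumer group and fetch activity only (any combination of the contexts above can be listed, and the index, header, and topic values below are the same placeholders used in earlier examples):

```shell
molecula-consumer-kafka-static \
  --index segments \
  --header ./segments_schema.json \
  --topics events \
  --kafka-debug cgrp,fetch
```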
Missing value processing
Missing and empty string values are handled the same.
Field data type | Expected behaviour |
---|---|
"ID" | Error if "ID" selected for id-field. Otherwise, do not update value in index. |
"DateInt" | Raise error during ingestion - timestamp must have a valid value. |
"Timestamp" | Raise error during ingestion - input is not time. |
"RecordTime" | Do not update value in index. |
"Int" | Do not update value in index. |
"Decimal" | Do not update value in index. |
"String" | Error if "String" select for primary-key field. Otherwise do not update value in index. |
"Bool" | Do not update value in index. |
"StringArray" | Do not update value in index. |
"IDArray" | Do not update value in index. |
"ForeignKey" | Do not update value in index. |
Quoting values
Use double quotes "..." to enclose fields containing:
- Line breaks (CRLF)
- Commas
- Double quotes
config options for data types
- Use the config flag when changing flags from default values.
Kafka environment variables
To use flags as environment variables (see the example below):
- prefix flags with CONSUMER_
- convert dots . and dashes - to underscores _
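For example, assuming the conventional upper-case form, --kafka-bootstrap-servers would become CONSUMER_KAFKA_BOOTSTRAP_SERVERS; the broker, topic, index, and header values below are placeholders.

```shell
# --kafka-bootstrap-servers -> CONSUMER_KAFKA_BOOTSTRAP_SERVERS
export CONSUMER_KAFKA_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092
# --topics -> CONSUMER_TOPICS
export CONSUMER_TOPICS=events

molecula-consumer-kafka-static --index segments --header ./segments_schema.json
```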