Kafka delete ingest consumer reference
FeatureBase has three import methods for Kafka data:
- Kafka consumer
- Kafka static consumer
- Kafka delete consumer
In all cases, the FeatureBase ingest tool:
- Streams and reads Avro-encoded records from an Apache Kafka topic over HTTPS
- Decodes the records
- Converts the records to FeatureBase Streaming Bitmap format
- Writes the converted records to the target database table
To ingest data to FeatureBase tables from Confluent-managed Kafka schemas, you require:
- A list of Kafka hosts
- A FeatureBase index name (`--index <indexname>`)
- One primary key method: `--primary-key-fields <fieldnames>`, `--id-field <fieldname>`, or `--auto-generate`
The Kafka Avro delete consumer was built to supply delete functionality that does not exist in the Kafka Avro consumer. The `"fields"` values in the Kafka Avro message define the data to be deleted at the `--primary-key-fields` specified on the CLI.

`./molecula-consumer-kafka-delete` processes Kafka messages one at a time.
Kafka CLI Syntax
```
molecula-consumer-kafka-delete \
  <source-and-target-flags> \
  <kafka-common-flags> \
  <kafka-confluent-schema-flags> \
  <kafka-delete-flags> \
  <id-flags> \
  <batch-flags> \
  <error-flags> \
  <log-stat-flags> \
  <testing-flags> \
  <kafka-auth-flags> \
  <kafka-ssl-flags>
```
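For illustration, here is one way these flag groups might be filled in; every host, index, topic, and field name below is a placeholder rather than a default.

```bash
# Illustrative invocation only: all hosts, the index, the topic, and the
# key field are placeholders. Individual flags are described in the tables below.
molecula-consumer-kafka-delete \
  --kafka-hosts broker1:9092,broker2:9092 \
  --featurebase-hosts fb0:10101,fb1:10101 \
  --schema-registry-url http://registry.example.com:8081 \
  --index customer-segments \
  --primary-key-fields user_id \
  --topics segment-deletes \
  --group delete-consumers \
  --batch-size 1048576
```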
Common flags
Flag | Data type | Description | Default | Required | Additional |
---|---|---|---|---|---|
--batch-size | int | Number of records to read before indexing them as a batch. Recommended: 1,048,576 | 1 | | A larger value indicates better throughput and more memory usage. |
--concurrency | int | Number of concurrent sources and indexing routines to launch. | 1 | When ingesting multiple CSV files | Does not support SQL ingestion or --auto-generate |
--featurebase-hosts | string | Supply FeatureBase default bind points using a comma-separated list of host:port pairs. | [localhost:10101] | | |
--index | string | Name of target FeatureBase index. | | Yes | |
--string-array-separator | string | Character used to delineate values in a string array | , | | |
--use-shard-transactional-endpoint | | Use alternate import endpoint that ingests data for all fields in a shard in a single atomic request. | | | Recommended. Flag has a negative performance impact but better consistency. |
Kafka common flags
Flag | Data type | Description | Default | Required | Additional |
---|---|---|---|---|---|
--allow-decimal-out-of-range | bool | Allow ingest to continue when it encounters out-of-range decimals | false | | |
--allow-int-out-of-range | bool | Allow ingest to continue when it encounters out-of-range integers | false | | |
--allow-missing-fields | bool | Ingest consumer will continue even if fields are specified in a JSON config file but missing from a record | false | | Recommended for Kafka static |
--allow-timestamp-out-of-range | bool | Allow ingest to continue when it encounters out-of-range timestamps | false | | |
--group | string | Kafka group. | "defaultgroup" | | |
--kafka-bootstrap-servers | strings | Comma-separated list of host:port pairs for Kafka | [localhost:9092] | | Kafka properties bootstrap server |
--kafka-client-id | string | Kafka client ID (client.id) | | | |
--kafka-debug | string | Choose one or more debug contexts to enable as a comma-separated list. | | | Kafka debug contexts |
--kafka-hosts | string | Comma-separated list of host:port pairs for Kafka. | [localhost:9092] | | |
--max-msgs | int | Number of messages to consume from Kafka before stopping. | | | Useful for testing when you don't want to run indefinitely |
--skip-old | | Skip to the most recent Kafka message rather than starting at the beginning | | | |
--timeout | duration | Time to wait for more records from Kafka before flushing a batch | 1s | | 0 to disable |
--topics | string | Kafka topics to read from | [defaulttopic] | | |
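As a quick illustration of the test-oriented flags in this table, the sketch below consumes a bounded number of recent messages and then exits; the broker, index, topic, and group names are placeholders.

```bash
# Sketch: bounded test run. Skip to the newest offset, read at most 1,000
# messages, then stop. All names and addresses are placeholders.
molecula-consumer-kafka-delete \
  --kafka-bootstrap-servers localhost:9092 \
  --index customer-segments \
  --primary-key-fields user_id \
  --topics segment-deletes \
  --group test-group \
  --skip-old \
  --max-msgs 1000
```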
Kafka Avro Schema Management flags
These flags apply to the `./molecula-consumer-kafka` and `./molecula-consumer-kafka-delete` CLI commands.
Flag | Data type | Description | Default | Required | Additional |
---|---|---|---|---|---|
--schema-registry-password | string | Authentication secret provided by Confluent for schema registry | | For secured schema registries | Confluent Registry security overview |
-g, --schema-registry-url | string | Location of Confluent Schema Registry. | "http://localhost:8081" | | Use https:// when using TLS flags |
--schema-registry-username | string | Authentication key provided by Confluent for schema registry | | For secured schema registries | Confluent Registry security overview |
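For a schema registry secured with Confluent credentials, the flags might be combined as in this sketch; the registry URL, key, and secret are placeholders.

```bash
# Sketch: TLS-secured Confluent Schema Registry with API-key authentication.
# URL and credentials are placeholders, not real values.
molecula-consumer-kafka-delete \
  --index customer-segments \
  --primary-key-fields user_id \
  --topics segment-deletes \
  --schema-registry-url https://registry.example.com \
  --schema-registry-username "<registry-api-key>" \
  --schema-registry-password "<registry-api-secret>"
```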
Kafka delete flags
The following flags are required when the Kafka Avro message `"fields"` value is set for:
- the Avro Record Schema
- the Kafka message `"delete"` property

`./molecula-consumer-kafka-delete` defaults to `"delete": "fields"` if the `"delete"` property is not defined in the JSON message.
Flag | Data type | Description | Default | Additional |
---|---|---|---|---|
--featurebase-grpc-hosts | string | Comma-separated list of host:port pairs for FeatureBase's GRPC endpoint. | | |
--schema-registry-url | string | URL or IP address of Confluent managed schema registry | http://localhost:8081 | Confluent Schema Registry |
--topics | string | Delete topic JSON config file | | |
Generate ID flags
Flag | Data type | Description | Default | Required |
---|---|---|---|---|
--auto-generate | | Automatically generate IDs. Used for testing purposes. Cannot be used with --concurrency | | When --id-field or --primary-key-fields not defined |
--external-generate | | Allocate _id using the FeatureBase ID allocator. Supports --offset-mode. Requires --auto-generate | | |
--id-alloc-key-prefix | string | Prefix for ID allocator keys when using --external-generate. Requires a different value for each concurrent ingester | ingest | |
--id-field | string | A sequence of positive integers that uniquely identifies each record. Use instead of --primary-key-fields | | If --auto-generate or --primary-key-fields not defined |
--primary-key-fields | string | Convert records to strings for use as unique _id. Single records are not added to target as records. Multiple records are concatenated using / and added to target as records. Use instead of --id-field | [] | If --auto-generate or --id-field are not defined. |
--offset-mode | | Set offset-mode based autogenerated IDs. Requires --auto-generate and --external-generate | | When ingesting from an offset-based data source |
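As a sketch of the two most common keying modes (all index, topic, and field names are placeholders, and the comma-separated multi-field form is an assumption about the flag's syntax):

```bash
# Sketch: key delete messages on a single record field.
molecula-consumer-kafka-delete --index customer-segments \
  --topics segment-deletes --primary-key-fields user_id

# Sketch: composite key built from two fields (comma-separated list assumed).
molecula-consumer-kafka-delete --index customer-segments \
  --topics segment-deletes --primary-key-fields region,user_id
```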
When `"delete": "fields"` is defined in the Kafka message, there must be a direct relationship between the following:
- the fields to delete, defined by `"fields"` (`{"type": "array", "items": "string"}`), AND
- a corresponding FeatureBase table `ID`, defined by `--primary-key-fields`
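As a rough illustration of that relationship (the field names and overall message shape here are assumptions; only the `"fields"` array and the `"delete"` property come from this reference), a delete message keyed on `user_id` might look like:

```bash
# Hypothetical delete message shown as JSON for readability. Only "fields"
# (an array of strings) and "delete" are documented here; the key field
# "user_id" and all values are illustrative assumptions.
cat <<'EOF'
{
  "user_id": "u-123",
  "fields": ["segments", "last_seen"],
  "delete": "fields"
}
EOF
```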
Batch processing flags
Flag | Data type | Description | Default | Required |
---|---|---|---|---|
--batch-size | int | Number of records to read before indexing them as a batch. A larger value indicates better throughput and more memory usage. Recommended: 1,048,576 | 1 | |
Error handling flags
Flag | Data type | Description | Default | Required |
---|---|---|---|---|
--allow-decimal-out-of-range | bool | Allow ingest to continue when it encounters out-of-range decimals in Decimal Fields. | false | |
--allow-int-out-of-range | bool | Allow ingest to continue when it encounters out-of-range integers in Int Fields. | false | |
--allow-timestamp-out-of-range | bool | Allow ingest to continue when it encounters out-of-range timestamps in Timestamp Fields. | false | |
--batch-max-staleness | duration | Maximum length of time the oldest record in a batch can exist before the batch is flushed. This may result in timeouts while waiting for the source. | | |
--commit-timeout | duration | A commit is the process of informing the data source that the current batch of records is ingested. --commit-timeout is the maximum time before the commit process is cancelled. May not function for the CSV ingest process. | | |
--skip-bad-rows | int | Fail the ingest process if n rows are not processed. | | |
Logging & statistics flags
Flag | Data type | Description | Default | Required |
---|---|---|---|---|
--log-path | string | Log file to write to. | Empty means stderr. | |
--pprof | string | host:port on which to listen for the pprof Go package | "localhost:6062" | |
--stats | string | host:port on which to host metrics | "localhost:9093" | |
--track-progress | | Periodically print status updates on how many records have been sourced. | | |
--verbose | | Enable verbose logging. | | |
--write-csv | string | Write ingested data to the named CSV file. | | |
Testing flags
Flag | Description | Default | Required |
---|---|---|---|
--delete-index | Delete an existing index specified by --index before starting ingest. USE WITH CAUTION | ||
--dry-run | Parse flags without starting an ingest process |
Kafka authentication flags
`kafka-tls` flags authenticate with the Kafka instance and can be used with `tls` flags that authenticate with the FeatureBase server.
Flag | Data type | Description | Default | Required |
---|---|---|---|---|
--kafka-tls.ca-certificate | string | Path to CA certificate file, or literal PEM data. | ||
--kafka-tls.certificate | string | Path to certificate file, or literal PEM data. | ||
--kafka-tls.enable-client-verification | string | Enable verification of client certificates. | ||
--kafka-tls.key | string | Path to certificate key file, or literal PEM data. | ||
--kafka-tls.skip-verify | | Disables verification of server certificates. | | |
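A mutual-TLS connection to Kafka might be configured as in the sketch below; every path and host name is a placeholder.

```bash
# Sketch: authenticate to Kafka with client certificates (mTLS).
# All paths, hosts, and names are placeholders.
molecula-consumer-kafka-delete \
  --index customer-segments \
  --primary-key-fields user_id \
  --topics segment-deletes \
  --kafka-hosts broker1:9093 \
  --kafka-tls.ca-certificate /etc/ssl/kafka/ca.pem \
  --kafka-tls.certificate /etc/ssl/kafka/client.pem \
  --kafka-tls.key /etc/ssl/kafka/client.key
```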
Kafka SSL keys
Flag | Data type | Description | Default | Required | Additional |
---|---|---|---|---|---|
--kafka-enable-ssl-certificate-verification | | (enable.ssl.certificate.verification) | | | |
--kafka-group-instance-id | string | The (group.instance.id) Kafka consumer configuration | | | |
--kafka-max-poll-interval | string | The (max.poll.interval.ms) Kafka consumer configuration. The max time the Kafka consumer can go without polling the broker. | | | Kafka consumer exits after this timeout. |
--kafka-sasl-mechanism | string | SASL mechanism to use for authentication (sasl.mechanism) | | | |
--kafka-sasl-password | string | SASL authentication password (sasl.password) | | | |
--kafka-sasl-username | string | SASL authentication username (sasl.username) | | | |
--kafka-security-protocol | string | Protocol used to communicate with brokers (security.protocol) | | | |
--kafka-session-timeout | string | The (session.timeout.ms) Kafka consumer configuration. | | | The max time the Kafka consumer can go without sending a heartbeat to the broker |
--kafka-socket-keepalive-enable | string | The (socket.keepalive.enable) Kafka consumer configuration | | | |
--kafka-socket-timeout-ms | int | (socket.timeout.ms) | | | |
--kafka-ssl-ca-location | string | File or directory path to CA certificate(s) | | | Used for verifying the broker's key (ssl.ca.location) |
--kafka-ssl-certificate-location | string | Path to client's public key (PEM) | | | Used for authentication (ssl.certificate.location) |
--kafka-ssl-endpoint-identification-algorithm | string | The endpoint identification algorithm used by clients to validate server host name (ssl.endpoint.identification.algorithm) | | | |
--kafka-ssl-key-location | string | Path to client's private key (PEM) | | | Used for authentication (ssl.key.location) |
--kafka-ssl-key-password | string | Private key passphrase | | | Used with ssl.key.location and set_ssl_cert() (ssl.key.password) |
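For a broker that requires SASL over SSL (for example, a managed Kafka service), the flags might be combined as in this sketch; the bootstrap server and credentials are placeholders.

```bash
# Sketch: SASL/PLAIN over SSL to a managed Kafka cluster.
# The bootstrap server and credentials are placeholders only.
molecula-consumer-kafka-delete \
  --index customer-segments \
  --primary-key-fields user_id \
  --topics segment-deletes \
  --kafka-bootstrap-servers my-cluster.example.com:9092 \
  --kafka-security-protocol SASL_SSL \
  --kafka-sasl-mechanism PLAIN \
  --kafka-sasl-username "<api-key>" \
  --kafka-sasl-password "<api-secret>"
```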
Additional information
`batch-size` additional

There is no one-size-fits-all `batch-size` because memory usage per record varies between workloads.

During ingestion processing, there is a fixed overhead:
- from setting up an ingester transaction
- for each row

Setting large `batch-size` values will:
- average out the overheads
- proportionally increase memory usage
- improve performance (in general terms)
For example:
Workload includes | Batch size | Typical memory usage (MB) |
---|---|---|
High number of sparse keys | 20,000 | 100+ |
High-frequency keys | 1,000,000+ | |
`concurrency` additional

The `concurrency` ingest flag is used to run ingesters in parallel, which can:
- improve utilization on multi-core systems
- allow for redundancy

Alternatively, ingest processes can be launched individually on different environments.

List all the flags by entering `idk/molecula-consumer-kafka` from the `/featurebase` directory.
`kafka-debug` contexts

Add one or more debug contexts as a comma-separated list to the `--kafka-debug` parameter:
Context | Description |
---|---|
consumer | |
cgrp | |
topic | |
fetch | |
all | Set for verbose debugging |
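For example, to trace consumer-group and fetch activity while troubleshooting (the index and topic names are placeholders):

```bash
# Sketch: enable two debug contexts while troubleshooting a stalled consumer.
molecula-consumer-kafka-delete \
  --index customer-segments \
  --primary-key-fields user_id \
  --topics segment-deletes \
  --kafka-debug cgrp,fetch
```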
Kafka environment variables
To use flags as environment variables:
- prefix flags with `CONSUMER_`
- convert dots `.` and dashes `-` to underscores `_`
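Following that conversion rule, for example, `--kafka-ssl-ca-location` and `--kafka-tls.ca-certificate` become the variables below; the path value is a placeholder.

```bash
# Sketch: flag-to-environment-variable conversion.
#   --kafka-ssl-ca-location    -> CONSUMER_KAFKA_SSL_CA_LOCATION
#   --kafka-tls.ca-certificate -> CONSUMER_KAFKA_TLS_CA_CERTIFICATE
export CONSUMER_KAFKA_SSL_CA_LOCATION=/etc/ssl/kafka/ca.pem
export CONSUMER_KAFKA_TLS_CA_CERTIFICATE=/etc/ssl/kafka/ca.pem
```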
Missing value processing
Missing and empty string values are handled the same.
Field data type | Expected behaviour |
---|---|
"ID" | Error if "ID" selected for id-field. Otherwise, do not update value in index. |
"DateInt" | Raise error during ingestion - timestamp must have a valid value. |
"Timestamp" | Raise error during ingestion - input is not time. |
"RecordTime" | Do not update value in index. |
"Int" | Do not update value in index. |
"Decimal" | Do not update value in index. |
"String" | Error if "String" select for primary-key field. Otherwise do not update value in index. |
"Bool" | Do not update value in index. |
"StringArray" | Do not update value in index. |
"IDArray" | Do not update value in index. |
"ForeignKey" | Do not update value in index. |
Quoting values
Use double quotes "…" to enclose fields containing:
- Line breaks (CRLF)
- Commas
- Double quotes
Value Path Selection
The `path` argument is an array of JSON object keys, which are applied in order.
For example:
Source data | Path selection | Value selected |
---|---|---|
{"a":{"b":{"c":1}}} | ["a","b","c"] | 1 |
Use `allow-missing-fields` to avoid `path` errors where source data is missing.
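As a rough sketch (the surrounding field-definition structure is an assumption, not defined in this reference), a `path` selecting the value `1` from the source data above would be written as:

```bash
# Illustrative only: given {"a":{"b":{"c":1}}}, this "path" selects the value 1.
# The surrounding field-definition structure is an assumption.
cat <<'EOF'
{ "path": ["a", "b", "c"] }
EOF
```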
`config` options for data types

- Use the `config` flag when changing flags from default values.