
Kafka ingest consumer reference

FeatureBase has three import methods for Kafka data:

  • Kafka consumer
  • Kafka static consumer
  • Kafka delete consumer

In all cases, the FeatureBase ingest tool:

  • Streams and reads Avro-encoded records from an Apache Kafka topic over HTTPS
  • Decodes the records
  • Converts the records to FeatureBase Streaming Bitmap format
  • Writes the converted records to the target database table

To ingest data to FeatureBase tables from Confluent managed Kafka schemas, you will require:

  • A list of kafka-hosts
  • A FeatureBase index (--index <indexname>)
  • One primary key method:
    • --primary-key-fields <fieldnames>, or
    • --id-field <fieldname>, or
    • --auto-generate

Kafka CLI Syntax

molecula-consumer-kafka           \
  <source-and-target-flags>       \
  <kafka-common-flags>            \
  <kafka-confluent-schema-flags>  \
  <kafka-consumer-flags>          \
  <id-flags>                      \
  <batch-flags>                   \
  <error-flags>                   \
  <log-stat-flags>                \
  <testing-flags>                 \
  <kafka-auth-flags>              \
  <kafka-ssl-flags>
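
For example, a minimal invocation (a sketch only, assuming a local Kafka broker and Confluent Schema Registry, with placeholder topic and index names) could look like:

molecula-consumer-kafka                         \
  --kafka-hosts localhost:9092                  \
  --topics mytopic                              \
  --index mytable                               \
  --primary-key-fields id                       \
  --schema-registry-url http://localhost:8081   \
  --batch-size 1048576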

Common flags

Flag Data type Description Default Required Additional
--batch-size int Number of records to read before indexing them as a batch. Recommended: 1,048,576 1   A larger value generally improves throughput but uses more memory.
--concurrency int Number of concurrent sources and indexing routines to launch. 1 When ingesting multiple CSV files Does not support SQL ingestion or --auto-generate
--featurebase-hosts string Comma-separated list of host:port pairs for the FeatureBase bind points. [localhost:10101]    
--index string Name of target FeatureBase index.   Yes  
--string-array-separator string Character used to delineate values in a string array ,    
--use-shard-transactional-endpoint   Use alternate import endpoint that ingests data for all fields in a shard in a single atomic request.   Recommended. This flag has a negative impact on performance but provides better consistency

Kafka common flags

Flag Data type Description Default Required Additional
--allow-decimal-out-of-range bool Allow ingest to continue when it encounters out of range decimals false    
--allow-int-out-of-range bool Allow ingest to continue when it encounters out of range integers false    
--allow-missing-fields bool Ingest consumer will continue even if fields are specified in a JSON config file but missing from a record false   Recommended for Kafka static
--allow-timestamp-out-of-range bool Allow ingest to continue when it encounters out of range timestamps false    
--group string Kafka group. “defaultgroup”    
--kafka-bootstrap-servers strings Comma separated list of host:port pairs for Kafka [localhost:9092]   Kafka properties bootstrap server
--kafka-client-id string The (client.id) Kafka client configuration      
--kafka-debug string Choose one or more debug contexts to enable as a comma separated list.     Kafka debug contexts
--kafka-hosts string Comma separated list of host:port pairs for Kafka. [localhost:9092]    
--max-msgs int Number of messages to consume from Kafka before stopping.     Useful for testing when you don’t want to run indefinitely
--skip-old   Skip to the most recent Kafka message rather than starting at the beginning      
--timeout duration Time to wait for more records from Kafka before flushing a batch 1s   0 to disable
--topics string Kafka topics to read from [defaulttopic]    
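
For testing, --skip-old and --max-msgs can be combined so the consumer reads only a limited number of recent messages (a sketch with placeholder names):

molecula-consumer-kafka          \
  --index mytable                \
  --kafka-hosts localhost:9092   \
  --topics mytopic               \
  --auto-generate                \
  --skip-old                     \
  --max-msgs 1000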

Kafka Avro Schema Management flags

These flags apply to the ./molecula-consumer-kafka and ./molecula-consumer-kafka-delete CLI commands.

Flag Data type Description Default Required Additional
--schema-registry-password string Authentication secret provided by Confluent for schema registry   For secured schema registries Confluent Registry security overview
-g, --schema-registry-url string Location of Confluent Schema Registry. “http://localhost:8081”   Use https:// when using TLS flags
--schema-registry-username string Authentication key provided by Confluent for schema registry   For secured schema registries Confluent Registry security overview

Kafka consumer flags

Flag Data type Description Default Additional
--schema-registry-url string URL or IP address of Confluent managed schema registry http://localhost:8081 Confluent Schema Registry

Generate ID flags

Flag Data type Description Default Required
--auto-generate   Automatically generate IDs. Used for testing purposes. Cannot be used with --concurrency   When --id-field or --primary-key-fields not defined
--external-generate   Allocate _id using the FeatureBase ID allocator. Supports --offset-mode. Requires --auto-generate    
--id-alloc-key-prefix string Prefix for ID allocator keys when using --external-generate. Requires different value for each concurrent ingester ingest  
--id-field string Name of a field containing positive integers that uniquely identify each record. Use instead of --primary-key-fields   If --auto-generate or --primary-key-fields not defined
--primary-key-fields string Convert the named field(s) to strings for use as the unique _id. A single field is not added to the target as a field. Multiple fields are concatenated using / and each is added to the target as a field. Use instead of --id-field [] If --auto-generate or --id-field are not defined.
--offset-mode   Generate IDs from Kafka offsets. Requires --auto-generate and --external-generate   When ingesting from an offset-based data source
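
For example (a sketch with hypothetical field names), ingesting with --primary-key-fields user,event builds each _id by concatenating the two field values with /:

molecula-consumer-kafka --index events --kafka-hosts localhost:9092 --topics mytopic --primary-key-fields user,event
# a record with user "alice" and event "login" is ingested with _id "alice/login"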

Batch processing flags

Flag Data type Description Default Required
--batch-size int Number of records to read before indexing them as a batch. A larger value generally improves throughput but uses more memory. Recommended: 1,048,576 1  

Error handling flags

Flag Data type Description Default Required
--allow-decimal-out-of-range   Allow ingest to continue when it encounters out of range decimals in Decimal Fields. false  
--allow-int-out-of-range   Allow ingest to continue when it encounters out of range integers in Int Fields. false  
--allow-timestamp-out-of-range   Allow ingest to continue when it encounters out of range timestamps in Timestamp Fields. false  
--batch-max-staleness duration Maximum length of time the oldest record in a batch can exist before the batch is flushed. This may result in timeouts while waiting for the source.    
--commit-timeout duration A commit informs the data source that the current batch of records has been ingested. --commit-timeout is the maximum time allowed before the commit process is cancelled. May not function for the CSV ingest process.    
--skip-bad-rows int Fail the ingest process if n rows cannot be processed.    
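
For example (illustrative values only), an ingest that tolerates out-of-range integers and flushes partially filled batches after 30 seconds might use:

molecula-consumer-kafka            \
  --index mytable                  \
  --kafka-hosts localhost:9092     \
  --topics mytopic                 \
  --primary-key-fields id          \
  --allow-int-out-of-range         \
  --batch-max-staleness 30s        \
  --commit-timeout 10s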

Logging & statistics flags

Flag Data type Description Default Required
--log-path string Log file to write to. Empty means stderr.  
--pprof string host:port on which to listen for pprof go package “localhost:6062”  
--stats string host:port on which to host metrics “localhost:9093”  
--track-progress   Periodically print status updates on how many records have been sourced.    
--verbose   Enable verbose logging.    
--write-csv string Write ingested data to the named CSV file.    

Testing flags

Flag Description Default Required
--delete-index Delete an existing index specified by --index before starting ingest. USE WITH CAUTION    
--dry-run Parse flags without starting an ingest process    
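
For example, --dry-run can be used to check a full set of flags before running the real ingest (placeholder names):

molecula-consumer-kafka --dry-run --index mytable --kafka-hosts localhost:9092 --topics mytopic --auto-generate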

Kafka authentication flags

The kafka-tls flags authenticate with the Kafka instance. They can be used together with the tls flags that authenticate with the FeatureBase server.

Flag Data type Description Default Required
--kafka-tls.ca-certificate string Path to CA certificate file, or literal PEM data.    
--kafka-tls.certificate string Path to certificate file, or literal PEM data.    
--kafka-tls.enable-client-verification string Enable verification of client certificates.    
--kafka-tls.key string Path to certificate key file, or literal PEM data.    
--kafka-tls.skip-verify   Disables verification of server certificates.    
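
For example (a sketch; the broker address and certificate paths are placeholders), a consumer connecting to a TLS-enabled broker might pass:

molecula-consumer-kafka                        \
  --index mytable                              \
  --kafka-hosts broker.example.com:9093        \
  --topics mytopic                             \
  --primary-key-fields id                      \
  --kafka-tls.ca-certificate /certs/ca.pem     \
  --kafka-tls.certificate /certs/client.pem    \
  --kafka-tls.key /certs/client-key.pem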

Kafka SSL flags

Flag Data type Description Default Required Additional
--kafka-enable-ssl-certificate-verification   (enable.ssl.certificate.verification)      
--kafka-group-instance-id string The (group.instance.id) kafka consumer configuration      
--kafka-max-poll-interval string The (max.poll.interval.ms) kafka consumer configuration. The max time the Kafka consumer can go without polling the broker.     Kafka consumer exits after this timeout.
--kafka-sasl-mechanism string SASL mechanism to use for authentication.(sasl.mechanism)      
--kafka-sasl-password string SASL authentication password (sasl.password)      
--kafka-sasl-username string SASL authentication username (sasl.username)      
--kafka-security-protocol string Protocol used to communicate with brokers (security.protocol)      
--kafka-session-timeout string The (session.timeout.ms) kafka consumer configuration.     The max time the Kafka consumer can go without sending a heartbeat to the broker
--kafka-socket-keepalive-enable string The (socket.keepalive.enable) kafka consumer configuration      
--kafka-socket-timeout-ms int (socket.timeout.ms)      
--kafka-ssl-ca-location string File or directory path to CA certificate(s)     Used for verifying the broker’s key (ssl.ca.location)
--kafka-ssl-certificate-location string Path to client’s public key (PEM)     Used for authentication(ssl.certificate.location)
--kafka-ssl-endpoint-identification-algorithm string The endpoint identification algorithm used by clients to validate server host name (ssl.endpoint.identification.algorithm)      
--kafka-ssl-key-location string Path to client’s private key (PEM)     Used for authentication(ssl.key.location)
--kafka-ssl-key-password string Private key passphrase     Used with ssl.key.location and set_ssl_cert() (ssl.key.password)
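
For example (a sketch; the broker address, credentials, and certificate path are placeholders), connecting to a SASL_SSL-secured broker might look like:

molecula-consumer-kafka                              \
  --index mytable                                    \
  --topics mytopic                                   \
  --primary-key-fields id                            \
  --kafka-bootstrap-servers broker.example.com:9092  \
  --kafka-security-protocol SASL_SSL                 \
  --kafka-sasl-mechanism PLAIN                       \
  --kafka-sasl-username "$KAFKA_API_KEY"             \
  --kafka-sasl-password "$KAFKA_API_SECRET"          \
  --kafka-ssl-ca-location /certs/ca.pem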

Additional information

batch additional

There is no one-size-fits-all batch-size because memory usage per record varies between workloads.

During ingestion there is a fixed overhead:

  • for setting up an ingester transaction
  • for processing each row

Setting large batch-size values will:

  • average-out the overheads
  • proportionally increase memory usage
  • improve performance (in general terms)

For example:

Workload includes Batch size Typical memory usage (MB)
High number of sparse keys 20,000 100+
High-frequency keys 1,000,000+  
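
For example (illustrative values only), a high-frequency-key workload might set:

molecula-consumer-kafka --index mytable --kafka-hosts localhost:9092 --topics mytopic --primary-key-fields id --batch-size 1048576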

concurrency additional

The concurrency ingest flag is used to run ingesters in parallel which can:

  • improve utilization on multi-core systems
  • allow for redundancy

Alternatively, ingest processes can be launched individually on different environments.

To list all the flags, run idk/molecula-consumer-kafka from the /featurebase directory.

kafka-debug contexts

Add one or more debug contexts as a comma-separated list to the --kafka-debug parameter:

Context Description
consumer  
cgrp  
topic  
fetch  
all Set for verbose debugging
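
For example (placeholder names), enabling the consumer, topic, and fetch debug contexts:

molecula-consumer-kafka --index mytable --kafka-hosts localhost:9092 --topics mytopic --auto-generate --kafka-debug consumer,topic,fetch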

Kafka environment variables

To use flags as environment variables:

  • prefix flags with CONSUMER_
  • convert dots . and dashes - to underscores _
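
For example (a sketch, assuming upper-case environment variable names), --kafka-bootstrap-servers and --schema-registry-url become:

export CONSUMER_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export CONSUMER_SCHEMA_REGISTRY_URL=http://localhost:8081
molecula-consumer-kafka --index mytable --topics mytopic --auto-generate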

Missing value processing

Missing and empty string values are handled the same.

Field data type Expected behaviour
"ID" Error if "ID" selected for id-field. Otherwise, do not update value in index.
"DateInt" Raise error during ingestion - timestamp must have a valid value.
"Timestamp" Raise error during ingestion - input is not time.
"RecordTime" Do not update value in index.
"Int" Do not update value in index.
"Decimal" Do not update value in index.
"String" Error if "String" selected for primary-key field. Otherwise do not update value in index.
"Bool" Do not update value in index.
"StringArray" Do not update value in index.
"IDArray" Do not update value in index.
"ForeignKey" Do not update value in index.

Quoting values

Use double quotes “…” to enclose fields containing:

  • Line breaks (CRLF)
  • Commas
  • double quotes

config options for data types

  • Use the config flag when changing flags from default values.

Further information