
Kafka static ingest consumer reference

FeatureBase has three import methods for Kafka data:

  • Kafka consumer
  • Kafka static consumer
  • Kafka delete consumer

In all cases, the FeatureBase ingest tool:

  • Streams and reads Avro-encoded records from an Apache Kafka topic over HTTPS
  • Decodes the records
  • Converts the records to FeatureBase Streaming Bitmap format
  • Writes the converted records to the target database table

Before you begin

Kafka CLI Syntax

molecula-consumer-kafka-static \
  <source-and-target-flags>    \
  <kafka-common-flags>         \
  <kafka-static-flags>         \
  <id-flags>                   \
  <batch-flags>                \
  <error-flags>                \
  <log-stat-flags>             \
  <testing-flags>              \
  <kafka-auth-flags>           \
  <kafka-ssl-flags>
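
For example, a minimal invocation might look like the following; the index name, topic, schema path, and key field are placeholders to adapt to your environment:

molecula-consumer-kafka-static \
  --index mytable \
  --header ./schema.json \
  --kafka-hosts localhost:9092 \
  --topics mytopic \
  --primary-key-fields user_id \
  --batch-size 1048576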

Common flags

Flag Data type Description Default Required Additional
--batch-size int Number of records to read before indexing them as a batch. Recommended: 1,048,576 1   A larger value indicates better throughput and more memory usage.
--concurrency int Number of concurrent sources and indexing routines to launch. 1 When ingesting multiple CSV files Does not support SQL ingestion or --auto-generate
--featurebase-hosts string Supply FeatureBase default bind points using a comma separated list of host:port pairs. [localhost:10101]    
--index string Name of target FeatureBase index.   Yes  
--string-array-separator string Character used to delineate values in a string array. ,    
--use-shard-transactional-endpoint   Use alternate import endpoint that ingests data for all fields in a shard in a single atomic request.   Recommended. Has a negative performance impact but provides better consistency

Kafka common flags

Flag Data type Description Default Required Additional
--allow-decimal-out-of-range bool Allow ingest to continue when it encounters out of range decimals false    
--allow-int-out-of-range bool Allow ingest to continue when it encounters out of range integers false    
--allow-missing-fields bool Ingest consumer will continue even if fields are specified in a JSON config file but missing from a record false   Recommended for Kafka static
--allow-timestamp-out-of-range bool Allow ingest to continue when it encounters out of range timestamps false    
--group string Kafka group. “defaultgroup”    
--kafka-bootstrap-servers strings Comma separated list of host:port pairs for Kafka [localhost:9092]   Kafka properties bootstrap server
--kafka-client-id string The (client.id) Kafka client configuration      
--kafka-debug string Choose one or more debug contexts to enable as a comma separated list.     Kafka debug contexts
--kafka-hosts string Comma separated list of host:port pairs for Kafka. [localhost:9092]    
--max-msgs int Number of messages to consume from Kafka before stopping.     Useful for testing when you don’t want to run indefinitely
--skip-old   Skip to the most recent Kafka message rather than starting at the beginning      
--timeout duration Time to wait for more records from Kafka before flushing a batch 1s   0 to disable
--topics string Kafka topics to read from [defaulttopic]    

Kafka static flags

Flag Data type Description Default Required Additional
--header string Path to the static schema definition or “header” file in JSON format, which can be located on the local file system or at an S3 URI.   Yes   Kafka Static schema ingest source. For an S3 URI, set --s3-region or the AWS_REGION environment variable.
--s3-region string S3 Region, optionally used when --header is specified as an S3 URI.   Required for --header <s3-URI> Alternatively, use the AWS_REGION environment variable
--dead-letter-queue-host string Dead letter queue host:port. Kafka static consumer can be configured to report failed messages to a DLQ and continue processing without exiting. Specifying a valid DLQ host and topic will enable this optional behaviour.      
--dead-letter-queue-topic string Dead letter queue topic the failed messages should be routed to.      
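
For example, enabling the optional dead letter queue behaviour might look like this; the DLQ host, topic names, and schema path are placeholder values:

molecula-consumer-kafka-static \
  --index mytable \
  --header ./schema.json \
  --topics mytopic \
  --primary-key-fields user_id \
  --dead-letter-queue-host localhost:9092 \
  --dead-letter-queue-topic failed-messages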

Generate ID flags

Flag Data type Description Default Required
--auto-generate   Automatically generate IDs. Used for testing purposes. Cannot be used with --concurrency   When --id-field or --primary-key-fields not defined
--external-generate   Allocate _id using the FeatureBase ID allocator. Supports --offset-mode. Requires --auto-generate    
--id-alloc-key-prefix string Prefix for ID allocator keys when using --external-generate. Requires different value for each concurrent ingester ingest  
--id-field string A sequence of positive integers that uniquely identifies each record. Use instead of --primary-key-fields   if --auto-generate or --primary-key-fields not defined
--primary-key-fields string Convert records to strings for use as unique _id. Single records are not added to target as records. Multiple records are concatenated using / and added to target as records. Use instead of --id-field [] If --auto-generate or --id-field are not defined.
--offset-mode   Set Offset-mode based Autogenerated IDs. Requires --auto-generate and --external-generate   When ingesting from an offset-based data source
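
For example, offset-mode autogenerated IDs combine the three flags whose dependencies are listed above; the index name, topic, and schema path are placeholders:

molecula-consumer-kafka-static \
  --index mytable \
  --header ./schema.json \
  --topics mytopic \
  --auto-generate \
  --external-generate \
  --offset-mode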

Batch processing flags

Flag Data type Description Default Required
--batch-size int Number of records to read before indexing them as a batch. A larger value indicates better throughput and more memory usage. Recommended: 1,048,576 1  

Error handling flags

Flag Data type Description Default Required
--allow-decimal-out-of-range   Allow ingest to continue when it encounters out of range decimals in Decimal Fields. false  
--allow-int-out-of-range   Allow ingest to continue when it encounters out of range integers in Int Fields. false  
--allow-timestamp-out-of-range   Allow ingest to continue when it encounters out of range timestamps in Timestamp Fields. false  
--batch-max-staleness duration Maximum length of time the oldest record in a batch can exist before the batch is flushed. This may result in timeouts while waiting for the source    
--commit-timeout duration A commit is the process of informing the data source that the current batch of records has been ingested. --commit-timeout is the maximum time before the commit process is cancelled. May not function for CSV ingest process.    
--skip-bad-rows int Number of rows that can fail to process before the ingest process fails.    

Logging & statistics flags

Flag Data type Description Default Required
--log-path string Log file to write to. Empty means stderr.  
--pprof string host:port on which to listen for profiling with the Go pprof package “localhost:6062”  
--stats string host:port on which to host metrics “localhost:9093”  
--track-progress   Periodically print status updates on how many records have been sourced.    
--verbose   Enable verbose logging.    
--write-csv string Write ingested data to the named CSV file.    
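
For example, a sketch that tracks progress and writes verbose logs to a file; the index, topic, schema, and log paths are placeholders:

molecula-consumer-kafka-static \
  --index mytable \
  --header ./schema.json \
  --topics mytopic \
  --primary-key-fields user_id \
  --verbose \
  --track-progress \
  --log-path /var/log/featurebase-ingest.log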

Testing flags

Flag Description Default Required
--delete-index Delete an existing index specified by --index before starting ingest. USE WITH CAUTION    
--dry-run Parse flags without starting an ingest process    

Kafka authentication flags

The kafka-tls flags authenticate with the Kafka instance and can be used alongside the tls flags that authenticate with the FeatureBase server.

Flag Data type Description Default Required
--kafka-tls.ca-certificate string Path to CA certificate file, or literal PEM data.    
--kafka-tls.certificate string Path to certificate file, or literal PEM data.    
--kafka-tls.enable-client-verification string Enable verification of client certificates.    
--kafka-tls.key string Path to certificate key file, or literal PEM data.    
--kafka-tls.skip-verify   Disables verification of server certificates.    

Kafka SSL flags

Flag Data type Description Default Required Additional
--kafka-enable-ssl-certificate-verification   The (enable.ssl.certificate.verification) kafka consumer configuration      
--kafka-group-instance-id string The (group.instance.id) kafka consumer configuration      
--kafka-max-poll-interval string The (max.poll.interval.ms) kafka consumer configuration. The max time the Kafka consumer can go without polling the broker.     Kafka consumer exits after this timeout.
--kafka-sasl-mechanism string SASL mechanism to use for authentication.(sasl.mechanism)      
--kafka-sasl-password string SASL authentication password (sasl.password)      
--kafka-sasl-username string SASL authentication username (sasl.username)      
--kafka-security-protocol string Protocol used to communicate with brokers (security.protocol)      
--kafka-session-timeout string The (session.timeout.ms) kafka consumer configuration.     The max time the Kafka consumer can go without sending a heartbeat to the broker
--kafka-socket-keepalive-enable string The (socket.keepalive.enable) kafka consumer configuration      
--kafka-socket-timeout-ms int The (socket.timeout.ms) kafka consumer configuration      
--kafka-ssl-ca-location string File or directory path to CA certificate(s)     Used for verifying the broker’s key (ssl.ca.location)
--kafka-ssl-certificate-location string Path to client’s public key (PEM)     Used for authentication (ssl.certificate.location)
--kafka-ssl-endpoint-identification-algorithm string The endpoint identification algorithm used by clients to validate server host name (ssl.endpoint.identification.algorithm)      
--kafka-ssl-key-location string Path to client’s private key (PEM)     Used for authentication (ssl.key.location)
--kafka-ssl-key-password string Private key passphrase     Used with ssl.key.location and set_ssl_cert() (ssl.key.password)
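
For example, a sketch of connecting to a broker that requires SASL authentication over SSL; the broker address, credentials, and CA path are placeholders, and the mechanism and protocol must match what your broker expects:

molecula-consumer-kafka-static \
  --index mytable \
  --header ./schema.json \
  --topics mytopic \
  --primary-key-fields user_id \
  --kafka-bootstrap-servers broker-1.example.com:9093 \
  --kafka-security-protocol SASL_SSL \
  --kafka-sasl-mechanism PLAIN \
  --kafka-sasl-username myuser \
  --kafka-sasl-password mypassword \
  --kafka-ssl-ca-location /path/to/ca.pem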

Additional information

  • Run ./molecula-consumer-kafka-static from the /featurebase/idk directory.

batch additional

There is no default batch-size because memory usage per record varies between workloads.

During ingestion processing, there is a fixed overhead:

  • from setting up an ingester transaction
  • for each row

Setting large batch-size values will:

  • average-out the overheads
  • proportionally increase memory usage
  • improve performance (in general terms)

For example:

Workload includes Batch size Typical memory usage (MB)
High number of sparse keys 20,000 100+
High-frequency keys 1,000,000+  

concurrency additional

The concurrency ingest flag is used to run ingesters in parallel which can:

  • improve utilization on multi-core systems
  • allow for redundancy

Alternatively, ingest processes can be launched individually on different environments.
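
For example, a sketch that launches four parallel ingesters in a single process; recall from the common flags that --concurrency cannot be combined with --auto-generate:

molecula-consumer-kafka-static \
  --index mytable \
  --header ./schema.json \
  --topics mytopic \
  --primary-key-fields user_id \
  --concurrency 4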

List all the flags by entering idk/molecula-consumer-kafka-static from the /featurebase directory.

kafka-debug contexts

Add one or more debug contexts as a comma-separated list to the --kafka-debug parameter:

Context Description
consumer  
cgrp  
topic  
fetch  
all Set for verbose debugging
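
For example, enabling the consumer and topic debug contexts might look like this; the index, topic, and schema path are placeholders:

molecula-consumer-kafka-static \
  --index mytable \
  --header ./schema.json \
  --topics mytopic \
  --primary-key-fields user_id \
  --kafka-debug consumer,topic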

Missing value processing

Missing and empty string values are handled the same.

Field data type Expected behaviour
"ID" Error if "ID" selected for id-field. Otherwise, do not update value in index.
"DateInt" Raise error during ingestion - timestamp must have a valid value.
"Timestamp" Raise error during ingestion - input is not time.
"RecordTime" Do not update value in index.
"Int" Do not update value in index.
"Decimal" Do not update value in index.
"String" Error if "String" selected for primary-key field. Otherwise, do not update value in index.
"Bool" Do not update value in index.
"StringArray" Do not update value in index.
"IDArray" Do not update value in index.
"ForeignKey" Do not update value in index.

Quoting values

Use double quotes “…” to enclose fields containing:

  • Line breaks (CRLF)
  • Commas
  • Double quotes

config options for data types

  • Use the config flag when changing flags from default values.

Kafka environment variables

To use flags as environment variables:

  • prefix flags with CONSUMER_
  • convert dots . and dashes - to underscores _
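
For example, assuming flag names are upper-cased after the CONSUMER_ prefix is applied, the following exports are equivalent to passing the corresponding flags on the command line:

export CONSUMER_BATCH_SIZE=1048576                        # --batch-size
export CONSUMER_KAFKA_BOOTSTRAP_SERVERS=localhost:9092    # --kafka-bootstrap-servers
export CONSUMER_KAFKA_TLS_CA_CERTIFICATE=/path/to/ca.pem  # --kafka-tls.ca-certificate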

Examples