How do I ingest data from a Kafka static schema?
Ingesting data from a Kafka static schema involves:
- a JSON message blob that defines the source data
- a JSON document that defines the source data as an array matching the destination table structure
- specifying the JSON document with the `static` CLI flag when running `./molecula-consumer-kafka-static`
Before you begin
- Install Apache Kafka on your system
- Install Apache Zookeeper on your system
- Install Apache Avro on your system
- Learn about Kafka static schema settings
- Define your data in a Kafka message/blob
Kafka static schema definition
The schema file is formatted as an array of JSON objects, each of which describes one ingester field.

```json
[
  {
    "name": "<featurebase-field>",
    "path": ["<JSON-config-file>"],
    "type": "<data type>",
    "config": {
      "parameters": "<Optional field data type>"
    }
  }
]
```
Kafka static JSON parameters
Parameter | Description | Required | Additional |
---|---|---|---|
`name` | Destination field in FeatureBase. | Yes | |
`path` | A string array of JSON object keys, applied in order to locate the value for the `name` field. | Yes | path parameters |
`type` | The field data type. | Yes | |
`config` | Optional constraints and parameters for the data type. | No | |
`parameters` | Optional constraints and parameters within `config`. | No | |
Additional information

- The schema is specified with a JSON document to ensure compatibility with complex JSON message formats.
- The `name`, `path`, and `type` parameters are repeated for each record to import to FeatureBase.

`path` parameter

- Array indexing is not supported.
- Errors are returned if a value is missing from the `path`.
- Missing values can be ignored by adding the `--allow-missing-fields` flag when running `./molecula-consumer-kafka-static`.
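The ordered application of `path` keys can be pictured with a short sketch (illustrative Python only; the actual ingester is written in Go, and the function name here is hypothetical):

```python
def resolve_path(message, path):
    """Apply JSON object keys in order to locate a value.

    Mirrors the documented `path` behavior: array indexing is not
    supported, and a missing key is an error (the ingester's
    equivalent of which can be suppressed with --allow-missing-fields).
    """
    value = message
    for key in path:
        if not isinstance(value, dict) or key not in value:
            raise KeyError(f"missing key {key!r} in path {path}")
        value = value[key]
    return value

message = {"a": {"b": {"c": 1}}}
print(resolve_path(message, ["a", "b", "c"]))  # 1
```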
`type` data types

This table maps FeatureBase SQL data types to the internal data types used by the application for configuring ingestion, API calls, etc.

General data type | FeatureBase SQL data type | Internal data type | Additional information |
---|---|---|---|
boolean | bool | bool | |
integer | int | int | |
decimal | decimal | decimal | |
not applicable | id | mutex | Table primary key |
not applicable | idset, idsetq | set | Used to reduce table rows and make queries more efficient. |
string | string | keyed mutex | |
not applicable | stringset, stringsetq | keyed set | Used to reduce table rows and make queries more efficient. |
timestamp | timestamp | timestamp | |
Field mapping Avro to IDK

Arrays

Properties | Avro | IDK |
---|---|---|
| avro.Array : avro.String | idk.StringArrayField |
| avro.Array : avro.Bytes | idk.StringArrayField |
| avro.Array : avro.Fixed | idk.StringArrayField |
| avro.Array : avro.Enum | idk.StringArrayField |
| avro.Array : avro.Long | idk.IDArrayField |
quantum | avro.Array : avro.Long | idk.IDArrayField |
Boolean data types

Properties | Avro | IDK |
---|---|---|
| avro.Boolean | idk.BoolField |
mutex=(bool) | avro.Bytes | idk.StringField{Mutex} |
mutex=(bool) | avro.String | idk.StringField{Mutex} |
fieldType=signedIntBoolKey | avro.Int, avro.Long | idk.SignedIntBoolKeyField |
Decimal data type
Properties | Avro | IDK |
---|---|---|
logicalType=decimal | avro.Bytes | idk.DecimalField, idk.DecimalField{Scale} |
fieldType=decimal | avro.Bytes | idk.DecimalField, idk.DecimalField{Scale} |
fieldType=decimal, scale, precision=18 | avro.Bytes | idk.DecimalField |
scale=(uint) | avro.Float, avro.Double | idk.DecimalField{Scale} |
ID data type
Properties | Avro | IDK |
---|---|---|
fieldType=id | avro.Int, avro.Long | idk.IDField |
fieldType=id, mutex=(bool) | avro.Int, avro.Long | idk.IDField{Mutex} |
fieldType=id, quantum=(YMD) | avro.Int, avro.Long | idk.IDField{Quantum} |
fieldType=id, mutex, quantum | avro.Long | idk.IDField |
Int data type
Properties | Avro | IDK |
---|---|---|
fieldType=int | avro.Int, avro.Long | idk.IntField |
fieldType=int, min=(int64), max=(int64) | avro.Int, avro.Long | idk.IntField{Min, Max} |
fieldType=int, min, max | avro.Long | idk.IntField |
Mutex data type
Properties | Avro | IDK |
---|---|---|
mutex, quantum | avro.String | idk.StringField |
Quantum constraint
Properties | Avro | IDK |
---|---|---|
quantum | avro.String | idk.StringArrayField |
quantum=(YMD) | avro.String | idk.StringField{Quantum} |
quantum=(YMD) | avro.Bytes | idk.StringField{Quantum} |
quantum=(YMD) | avro.Array : avro.String | idk.StringArrayField{Quantum} |
quantum=(YMD) | avro.Array : avro.Bytes | idk.StringArrayField{Quantum} |
quantum=(YMD) | avro.Array : avro.Fixed | idk.StringArrayField{Quantum} |
quantum=(YMD) | avro.Array : avro.Enum | idk.StringArrayField{Quantum} |
quantum=(YMD) | avro.Array : avro.Long | idk.IDArrayField{Quantum} |
Scale data type
Properties | Avro | IDK |
---|---|---|
scale=(uint) | avro.Float, avro.Double | idk.DecimalField{Scale} |
String data type
Properties | Avro | IDK |
---|---|---|
| avro.String | idk.StringField |
| avro.Bytes | idk.StringField |
| avro.Enum | idk.StringField{Mutex: true} |
Timestamp Time Date data types
Properties | Avro | IDK |
---|---|---|
fieldType=dateInt, epoch, unit, customUnit, layout | avro.Bytes | idk.DateIntField{Layout, Epoch, Unit, CustomUnit} |
fieldType=recordTime | avro.Bytes | idk.RecordTimeField |
fieldType=recordTime, layout | avro.Bytes | idk.RecordTimeField |
Not supported
Properties | Avro | IDK |
---|---|---|
| avro.Null | NOT SUPPORTED |
| avro.Map | NOT SUPPORTED |
| avro.Recursive | NOT SUPPORTED |
Error
Properties | Avro | IDK |
---|---|---|
| avro.Record | ERROR |
UNION

Properties | Avro | IDK |
---|---|---|
| avro.Union | supports one or two members (if two, one must be avro.Null) |
Custom `config` parameters

The following information applies to `"config": "parameters"`:
Value/Data type | Description | Default | Required | Additional |
---|---|---|---|---|
`"CacheConfig"` | Specify the size and type of a TopN cache for a set or mutex field. | TopN cache | | Does not affect time fields. See the TopN cache example. |
`"CustomUnit"` | Specify an integer time unit using standard units `"ns"`, `"us"`, `"ms"`, `"s"`, `"m"`, or `"h"`. | | | |
`"Epoch"` | The incoming number is interpreted as the number of `Unit` since `Epoch`. | Unix epoch | Incoming value is numeric | Cannot be used for timestamp strings. |
`"ForeignIndex"` | Index of columns in the target used to reference table columns. | | | |
`"Granularity"` | Standard units used to represent incoming data: `s`, `ms`, `us`, `ns`. | `"s"` | | |
`"Layout"` | Format used to parse time strings. | RFC3339 | | Golang RFC3339 format definition |
`"Max"` | Maximum possible value for an integer. | 2^63 - 1 | | Wolfram Alpha representation |
`"Min"` | Minimum possible value for an integer. | -2^63 | | Wolfram Alpha representation |
`"Mutex": "true"` | Data is ingested into a mutex field. | | | |
`"Mutex": "false"` | Data is ingested into a set field. | | | |
`"Quantum"` | Time quantum constraint used when ingesting data from a recordTime field to a time column. | | | |
`"Scale"` | Number of digits of precision to store after the decimal point. | | | |
`"TTL"` | Time to live. | | | |
`"Unit"` | Standard units used to store timestamps: `"d"`, `"h"`, `"m"`, `"s"`, `"ms"`, `"us"`, `"ns"`, or `"c"` for custom (using `"CustomUnit"` for `dateInt`). | `"s"` | | Applies to the `dateInt` data type, `recordTime` when incoming data is numeric, and `timestamp` when incoming data is numeric. |
Cache

- Improve precision by increasing the cache size for the `"ranked"` cache type, which increases the number of top rows tracked within each shard of data.
- Set the cache type to `"none"` to disable the cache.
recordTime fields

`recordTime` fields have two modes.

Mode | Result |
---|---|
`"Epoch"` or `"Unit"` set | Incoming data is interpreted as numeric |
Other values | Incoming data is interpreted as a date/timestamp and parsed with the `"Layout"` parameter |
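The two modes can be sketched as follows (illustrative Python; the real ingester is written in Go, and the `layout` here uses Python's strptime syntax rather than Go's reference-time layout):

```python
from datetime import datetime, timezone, timedelta

UNIT_SECONDS = {"s": 1.0, "ms": 1e-3, "us": 1e-6, "ns": 1e-9}

def parse_record_time(value,
                      epoch=datetime(1970, 1, 1, tzinfo=timezone.utc),
                      unit=None,
                      layout="%Y-%m-%dT%H:%M:%S%z"):
    """Numeric mode: value counts `unit`s since `epoch`.
    String mode: value is parsed with `layout`."""
    if unit is not None and isinstance(value, (int, float)):
        return epoch + timedelta(seconds=value * UNIT_SECONDS[unit])
    return datetime.strptime(value, layout)

print(parse_record_time(60, unit="s"))                  # numeric mode
print(parse_record_time("2021-06-01T16:02:55+0000"))    # string mode
```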
TopN cache

`TopN` cache memory usage is:

- jointly proportional to cache size and number of shards when the cache is full (for example, when the field has more than `CacheSize` rows within each shard)

Disabling the cache disables `TopN` queries.

Querying

Use a `TopK` query or a sorted `GroupBy` when operating on a field without a cache.
TIMEQUANTUM
TIMEQUANTUM creates a view of your data based on the specified time granularity. This allows lower-latency queries at the cost of increased storage. For example, set TIMEQUANTUM to:

- `MD` for queries that include a range of months
- `D` for queries that include a small number of days

NOTE: Queries run on mismatched time granularities are slower but will function correctly. For example: querying days on an IDSET or STRINGSET column with TIMEQUANTUM set to `YM`.

You can omit but not skip time granularity:

- `YM` is valid
- `YH` is invalid
TIMEQUANTUM is used when:
- times need to be associated with column data for query purposes
- database space is not at a premium
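As a rough illustration of how a time quantum expands one timestamp into multiple views (the view-name format below is hypothetical, chosen only to show one view per granularity letter):

```python
from datetime import datetime

def quantum_views(field, ts, quantum):
    """Generate one view name per granularity letter in the quantum.
    A YMD quantum on 2022-09-02 yields yearly, monthly, and daily views."""
    layouts = {"Y": "%Y", "M": "%Y%m", "D": "%Y%m%d", "H": "%Y%m%d%H"}
    return [field + "_" + ts.strftime(layouts[g]) for g in quantum]

print(quantum_views("events", datetime(2022, 9, 2), "YMD"))
# ['events_2022', 'events_202209', 'events_20220902']
```

This is why a larger quantum costs more storage: each ingested timestamp lands in one view per granularity.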
TTL (Time To Live)

NOTE: FeatureBase recommends using a TTL of `1h` or more to improve results.

- TTL enables the deletion of time views where a time range exceeds the stated Time To Live.
- The default TTL of `0s` indicates TIMEQUANTUM views will not be deleted.
- TTL runs when FeatureBase starts and every hour thereafter to make view deletion consistent; runs are not guaranteed to occur at a specific time.
- `error: unknown unit` is generated if an incorrect value is used (e.g., TTL set to `60second`).
- TTL should not be used if you require complete and consistent historical data.
TTL order of events
This example demonstrates the deletion dates of three column views where TTL is set to `30d`.

View date | TTL date of deletion | Explanation |
---|---|---|
2022 | January 30, 2023 | Date assumed to be end of 2022 |
2022-09 | October 30, 2022 | Date assumed to be end of September |
2022-09-02 | October 2, 2022 | Deletion after 30 days as intended |
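The deletion dates in the table above can be reproduced with a small sketch (illustrative Python, assuming a view date is rounded up to the end of its period before the TTL is added):

```python
from datetime import date, timedelta
import calendar

def deletion_date(view, ttl_days):
    """Assume the end of the period named by the view, then add the TTL."""
    parts = [int(p) for p in view.split("-")]
    if len(parts) == 1:                        # "2022" -> Dec 31, 2022
        end = date(parts[0], 12, 31)
    elif len(parts) == 2:                      # "2022-09" -> Sep 30, 2022
        last_day = calendar.monthrange(parts[0], parts[1])[1]
        end = date(parts[0], parts[1], last_day)
    else:                                      # "2022-09-02" -> that day
        end = date(*parts)
    return end + timedelta(days=ttl_days)

print(deletion_date("2022", 30))        # 2023-01-30
print(deletion_date("2022-09", 30))     # 2022-10-30
print(deletion_date("2022-09-02", 30))  # 2022-10-02
```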
Examples
`path` example

To select `1` within `{"a":{"b":{"c":1}}}`, use:

```json
"path": ["a","b","c"]
```
TopN cache example

This cache is used for the `TopN` approximation. The default setting is:

```json
{
  "CacheType": "ranked",
  "CacheSize": 50000
}
```
Ingest two values from a Kafka message
Kafka message with two values:

```json
{
  "int-kafka-path": 12345,
  "string-kafka-path": "arbitraryString"
}
```

File `kafka-static-header-1.json` contains the following settings:

```json
[
  {
    "name": "int-featurebase-name",
    "path": [
      "int-kafka-path"
    ],
    "type": "int"
  },
  {
    "name": "string-featurebase-name",
    "path": [
      "string-kafka-path"
    ],
    "type": "string"
  }
]
```
Execute the following CLI command from the `/featurebase/idk` directory.

```shell
./molecula-consumer-kafka-static \
  --kafka-hosts "localhost:9092" \
  --index kafka-test \
  --batch-size 10000 \
  --topics test-topic \
  --max-msgs 10000 \
  --auto-generate \
  --external-generate \
  --header kafka-static-header-1.json
```
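Before running the consumer, the header-to-message mapping can be sanity-checked with a short simulation (illustrative Python; not part of the IDK toolchain):

```python
import json

# The same schema as kafka-static-header-1.json
schema = json.loads("""[
  {"name": "int-featurebase-name", "path": ["int-kafka-path"], "type": "int"},
  {"name": "string-featurebase-name", "path": ["string-kafka-path"], "type": "string"}
]""")

# The sample Kafka message with two values
message = {"int-kafka-path": 12345, "string-kafka-path": "arbitraryString"}

def apply_schema(message, schema):
    """Build the FeatureBase-bound record: walk each field's path
    and emit the located value under its destination `name`."""
    record = {}
    for field in schema:
        value = message
        for key in field["path"]:
            value = value[key]
        record[field["name"]] = value
    return record

print(apply_schema(message, schema))
# {'int-featurebase-name': 12345, 'string-featurebase-name': 'arbitraryString'}
```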
Ingest data from an array of values
Kafka message with an array of values:

```json
{
  "from_interface": {
    "ip": "10.203.33.18",
    "port": 38935
  },
  "to_interface": {
    "ip": "203.77.221.220",
    "port": 5872
  },
  "event_time": "2021-06-01T16:02:55Z06:00",
  "protocol": "UDP",
  "severity": 0,
  "bytes": 8593
}
```
`kafka-static-header-2.json` reads values from the Kafka message and structures them according to the destination table.

```json
[
  {
    "name": "from_ip",
    "path": [
      "from_interface",
      "ip"
    ],
    "type": "string"
  },
  {
    "name": "from_port",
    "path": [
      "from_interface",
      "port"
    ],
    "type": "int"
  },
  {
    "name": "to_ip",
    "path": [
      "to_interface",
      "ip"
    ],
    "type": "string"
  },
  {
    "name": "to_port",
    "path": [
      "to_interface",
      "port"
    ],
    "type": "int"
  },
  {
    "name": "event_time",
    "path": [
      "event_time"
    ],
    "type": "timestamp"
  },
  {
    "name": "severity",
    "path": [
      "severity"
    ],
    "type": "set"
  },
  {
    "name": "bytes",
    "path": [
      "bytes"
    ],
    "type": "int"
  },
  {
    "name": "protocol",
    "path": [
      "protocol"
    ],
    "type": "string"
  }
]
```
Execute the following CLI command from the `/featurebase/idk` directory.

```shell
./molecula-consumer-kafka-static \
  --kafka-hosts "localhost:9092" \
  --index kafka-test \
  --batch-size=10000 \
  --topics test-topic \
  --auto-generate \
  --allow-missing-fields \
  --header kafka-static-header-2.json
```