
How do I ingest data from Kafka with a static schema?

Ingesting data from Kafka with a static schema involves:

  • a JSON message blob that defines the source data
  • a JSON schema document that defines the source data as an array matching the destination table structure
  • specifying the schema document with the static CLI flag when running ./molecula-consumer-kafka-static


Kafka static schema definition

The schema file is formatted as an array of JSON objects, each of which describes one ingester field.

[
	{
		"name": "<featurebase-field>",
		"path": ["<JSON-message-key>"],
		"type": "<data type>",
		"config": {
			"parameters": "<Optional field data type>"
		}
	}
]

Kafka static JSON parameters

| Parameter | Description | Required | Additional |
|---|---|---|---|
| name | Destination field in FeatureBase. | Yes | |
| path | A String array of JSON object keys, applied in order, that locates the value for the name field. | Yes | path parameter |
| type | The field data type. | Yes | |
| config | Optional constraints and parameters for the data type. | No | |
| parameters | Optional constraints and parameters supplied within config. | No | |

Additional information

  • The schema is specified as a JSON document to ensure compatibility with complex JSON message formats.
  • The name, path, and type parameters are repeated for each field to import to FeatureBase.

path parameter

  • Array indexing is not supported
  • Errors are returned if a value is missing from the path
  • Missing values can be ignored by adding the --allow-missing-fields flag when using ./molecula-consumer-kafka-static
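The path lookup can be sketched as follows (illustrative Python only, not the ingester's implementation; `resolve_path` is a hypothetical helper, and `allow_missing` stands in for the `--allow-missing-fields` behavior):

```python
def resolve_path(message: dict, path: list, allow_missing: bool = False):
    """Apply each key in `path` in order to locate a value in a JSON object,
    mirroring the ingester's path lookup (illustrative sketch only)."""
    value = message
    for key in path:
        if not isinstance(value, dict) or key not in value:
            if allow_missing:
                return None  # analogous to running with --allow-missing-fields
            raise KeyError(f"missing value at path element {key!r}")
        value = value[key]
    return value

print(resolve_path({"a": {"b": {"c": 1}}}, ["a", "b", "c"]))  # 1
```

Note that, as the bullets above state, array indexing is not supported: every path element is an object key.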

type data types

This table provides the mapping between FeatureBase SQL data types and the internal data types used by the application for configuring ingestion, API calls, etc.

| General data type | FeatureBase SQL data type | Internal data type | Additional information |
|---|---|---|---|
| boolean | bool | bool | |
| integer | int | int | |
| decimal | decimal | decimal | |
| not applicable | id | mutex | Table primary key |
| not applicable | idset, idsetq | set | Used to reduce table rows and make queries more efficient. |
| string | string | keyed mutex | |
| not applicable | stringset, stringsetq | keyed set | Used to reduce table rows and make queries more efficient. |
| timestamp | timestamp | timestamp | |

Field mapping Avro to IDK

Arrays

| Properties | Avro | IDK |
|---|---|---|
| | avro.Array : avro.String | idk.StringArrayField |
| | avro.Array : avro.Bytes | idk.StringArrayField |
| | avro.Array : avro.Fixed | idk.StringArrayField |
| | avro.Array : avro.Enum | idk.StringArrayField |
| | avro.Array : avro.Long | idk.IDArrayField |
| quantum | avro.Array : avro.Long | idk.IDArrayField |

Boolean data types

| Properties | Avro | IDK |
|---|---|---|
| | avro.Boolean | idk.BoolField |
| mutex=(bool) | avro.Bytes | idk.StringField{Mutex} |
| mutex=(bool) | avro.String | idk.StringField{Mutex} |
| fieldType=signedIntBoolKey | avro.Int, avro.Long | idk.SignedIntBoolKeyField |
| fieldType=signedIntBoolKey | avro.Long | idk.SignedIntBoolKeyField |

Decimal data type

| Properties | Avro | IDK |
|---|---|---|
| logicalType=decimal | avro.Bytes | idk.DecimalField, idk.DecimalField{Scale} |
| fieldType=decimal | avro.Bytes | idk.DecimalField, idk.DecimalField{Scale} |
| fieldType=decimal, scale, precision=18 | avro.Bytes | idk.DecimalField |
| scale=(uint) | avro.Float, avro.Double | idk.DecimalField{Scale} |

ID data type

| Properties | Avro | IDK |
|---|---|---|
| fieldType=id | avro.Int, avro.Long | idk.IDField |
| fieldType=id, mutex=(bool) | avro.Int, avro.Long | idk.IDField{Mutex} |
| fieldType=id, quantum=(YMD) | avro.Int, avro.Long | idk.IDField{Quantum} |
| fieldType=id, mutex, quantum | avro.Long | idk.IDField |

Int data type

| Properties | Avro | IDK |
|---|---|---|
| fieldType=int | avro.Int, avro.Long | idk.IntField |
| fieldType=int, min=(int64), max=(int64) | avro.Int, avro.Long | idk.IntField{Min, Max} |
| fieldType=int, min, max | avro.Long | idk.IntField |

Mutex data type

| Properties | Avro | IDK |
|---|---|---|
| mutex, quantum | avro.String | idk.StringField |

Quantum constraint

| Properties | Avro | IDK |
|---|---|---|
| quantum | avro.String | idk.StringArrayField |
| quantum=(YMD) | avro.String | idk.StringField{Quantum} |
| quantum=(YMD) | avro.Bytes | idk.StringField{Quantum} |
| quantum=(YMD) | avro.Array : avro.String | idk.StringArrayField{Quantum} |
| quantum=(YMD) | avro.Array : avro.Bytes | idk.StringArrayField{Quantum} |
| quantum=(YMD) | avro.Array : avro.Fixed | idk.StringArrayField{Quantum} |
| quantum=(YMD) | avro.Array : avro.Enum | idk.StringArrayField{Quantum} |
| quantum=(YMD) | avro.Array : avro.Long | idk.IDArrayField{Quantum} |

Scale data type

| Properties | Avro | IDK |
|---|---|---|
| scale=(uint) | avro.Float, avro.Double | idk.DecimalField{Scale} |

String data type

| Properties | Avro | IDK |
|---|---|---|
| | avro.String | idk.StringField |
| | avro.Bytes | idk.StringField |
| | avro.Enum | idk.StringField{Mutex: true} |

Timestamp Time Date data types

| Properties | Avro | IDK |
|---|---|---|
| fieldType=dateInt, epoch, unit, customUnit, layout | avro.Bytes | idk.DateIntField{Layout, Epoch, Unit, CustomUnit} |
| fieldType=recordTime | avro.Bytes | idk.RecordTimeField |
| fieldType=recordTime, layout | avro.Bytes | idk.RecordTimeField |

Not supported

| Properties | Avro | IDK |
|---|---|---|
| | avro.Null | NOT SUPPORTED |
| | avro.Map | NOT SUPPORTED |
| | avro.Recursive | NOT SUPPORTED |

Error

| Properties | Avro | IDK |
|---|---|---|
| | avro.Record | ERROR |

UNION

| Properties | Avro | IDK |
|---|---|---|
| | avro.Union | Supports one or two members (if two, one must be avro.Null) |


Custom config parameters

The following information applies to "config": "parameters":

| Value/Data type | Description | Default | Required | Additional |
|---|---|---|---|---|
| "CacheConfig" | Specify the size and type of a TopN cache for a set or mutex field. | TopN cache | | Does not affect time fields. See the TopN cache example. |
| "CustomUnit" | Specify an integer time unit using standard units "ns", "us", "ms", "s", "m", or "h". | | | |
| "Epoch" | The incoming number will be interpreted as the number of Unit since Epoch. | Unix epoch (when the incoming value is numeric) | | Cannot be used for timestamp strings. |
| "ForeignIndex" | Index of columns in the target used to reference table columns. | | | |
| "Granularity" | Standard units used to represent incoming data: s, ms, us, ns. | "s" | | |
| "Layout" | Format used to parse time strings. | RFC3339 | | Golang RFC3339 format definition |
| "Max" | The maximum possible value for an integer. | 2^63 - 1 | | Wolfram Alpha representation |
| "Min" | The minimum possible value for an integer. | -2^63 | | Wolfram Alpha representation |
| "Mutex": "true" | Data is ingested into a mutex field. | | | |
| "Mutex": "false" | Data is ingested into a set field. | | | |
| "Quantum" | Time quantum constraint used when ingesting data from a recordTime field to a time column. | | | |
| "Scale" | Number of digits of precision to store after the decimal point. | | | |
| "TTL" | Time to live. | | | |
| "Unit" | Standard units used to store timestamps: "d", "h", "m", "s", "ms", "us", "ns", or "c" for custom (using "CustomUnit" for dateInt). | "s" | | Applies to the dateInt data type, recordTime when incoming data is numeric, and timestamp when incoming data is numeric. |
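As an illustrative sketch (field names are reused from the examples later on this page; the exact parameter names and casing should be checked against your IDK version), a schema entry that applies config parameters to an int field might look like:

```json
[
    {
        "name": "bytes",
        "path": ["bytes"],
        "type": "int",
        "config": {
            "Min": 0,
            "Max": 1000000
        }
    }
]
```

Values outside the Min/Max range would be rejected at ingest rather than stored.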

Cache

  • Improve precision by increasing the cache size for the "ranked" cache type, which increases the number of top rows tracked within each shard of data.
  • Set the cache type to "none" to disable the cache.

recordTime fields

recordTime fields have two modes.

| Mode | Result |
|---|---|
| "Epoch" or "Unit" set | Incoming data is interpreted as numeric. |
| Other values | Incoming data is interpreted as a date/timestamp and parsed with the "Layout" parameter. |
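The numeric mode can be sketched as follows (illustrative Python only, not the ingester's code; `to_timestamp` is a hypothetical helper showing how a number is interpreted as Units since Epoch):

```python
from datetime import datetime, timedelta, timezone

# Seconds per unit for the standard units named in the config table above.
UNITS_IN_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1,
                    "ms": 1e-3, "us": 1e-6, "ns": 1e-9}

def to_timestamp(value: int, epoch: datetime, unit: str = "s") -> datetime:
    """Interpret `value` as a count of `unit`s elapsed since `epoch`."""
    return epoch + timedelta(seconds=value * UNITS_IN_SECONDS[unit])

epoch = datetime(2020, 1, 1, tzinfo=timezone.utc)
print(to_timestamp(86400, epoch, "s"))  # 2020-01-02 00:00:00+00:00
```

With a string mode field, by contrast, the incoming value would be parsed against the "Layout" format instead of being added to an epoch.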

TopN cache

TopN cache memory usage is:

  • jointly proportional to cache size and number of shards when the cache is full
  • the cache is full when the field has more than CacheSize rows within each shard

Disabling the TopN cache disables TopN queries on that field.

Querying

Use TopK or a sorted GroupBy when querying a field without a cache.

TIMEQUANTUM

TIMEQUANTUM creates a view of your data based on the specified time. This allows for lower latency queries at the cost of increased storage. For example, set TIMEQUANTUM to:

  • MD for queries that include a range of months
  • D for queries that include a small number of days

NOTE: Queries run on mismatched time granularities are slower but will function correctly. For example: querying days on an IDSET or STRINGSET column with TIMEQUANTUM set to YM.

You can omit time granularities but not skip over them:

  • YM is valid
  • YH is invalid (it skips M and D)
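The omit-but-not-skip rule can be checked mechanically. As an illustrative sketch (assuming the available granularities, in order, are Y, M, D, H; this is not FeatureBase's actual validator), a valid quantum is a contiguous run of that sequence:

```python
def is_valid_quantum(q: str) -> bool:
    """A quantum may drop leading or trailing granularities but not skip one,
    so it must be a contiguous substring of "YMDH" (sketch of the stated rule)."""
    return bool(q) and q in "YMDH"

print(is_valid_quantum("YM"))  # True
print(is_valid_quantum("MD"))  # True
print(is_valid_quantum("YH"))  # False
```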

TIMEQUANTUM is used when:

  • times need to be associated with column data for query purposes
  • database space is not at a premium

TTL (Time To Live)

NOTE: FeatureBase recommends using a TTL of 1h or more to improve results.

  • TTL enables the deletion of time views whose time range exceeds the stated Time To Live.
  • The default TTL of 0s indicates TIMEQUANTUM views will not be deleted.
  • TTL runs when FeatureBase starts and every hour thereafter to keep view deletion consistent, but it is not guaranteed to run at a specific time.
  • An error: unknown unit error is generated if an incorrect value is used (e.g., TTL set to 60second).
  • TTL should not be used if you require complete and consistent historical data.

TTL order of events

This example demonstrates the deletion dates of three column views where TTL is set to 30d.

| View | TTL deletion date | Explanation |
|---|---|---|
| 2022 | January 30, 2023 | Date assumed to be end of 2022 |
| 2022-09 | October 30, 2022 | Date assumed to be end of September |
| 2022-09-02 | October 2, 2022 | Deletion after 30 days as intended |
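The deletion dates in the table above can be reproduced with a short sketch (illustrative Python only; it assumes a view's date is treated as the end of its period, as the Explanation column states):

```python
from datetime import date, timedelta

def deletion_date(view: str, ttl_days: int = 30) -> date:
    """Compute when a TIMEQUANTUM view expires: end of its period plus TTL."""
    parts = [int(p) for p in view.split("-")]
    if len(parts) == 1:            # "2022" -> end of the year
        end = date(parts[0], 12, 31)
    elif len(parts) == 2:          # "2022-09" -> end of the month
        y, m = parts
        first_of_next = date(y + 1, 1, 1) if m == 12 else date(y, m + 1, 1)
        end = first_of_next - timedelta(days=1)
    else:                          # "2022-09-02" -> that exact day
        end = date(*parts)
    return end + timedelta(days=ttl_days)

print(deletion_date("2022"))        # 2023-01-30
print(deletion_date("2022-09"))     # 2022-10-30
print(deletion_date("2022-09-02"))  # 2022-10-02
```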

Examples

path example

To select the value 1 within {"a":{"b":{"c":1}}}, use:

"path": ["a","b","c"]

TopN cache example

This “cache” is used for the TopN approximation. The default setting is:

{
	"CacheType": "ranked",
	"CacheSize": 50000
}

Ingest two values from a Kafka message

Kafka message with two values

{
    "int-kafka-path": 12345,
    "string-kafka-path": "arbitraryString"
}

File kafka-static-header-1.json contains the following settings:

[
    {
        "name": "int-featurebase-name",
        "path": [
            "int-kafka-path"
        ],
        "type": "int"
    },
    {
        "name": "string-featurebase-name",
        "path": [
            "string-kafka-path"
        ],
        "type": "string"
    }
]

Execute the following CLI command from the /featurebase/idk directory.

./molecula-consumer-kafka-static \
    --kafka-hosts "localhost:9092" \
    --index kafka-test \
    --batch-size 10000 \
    --topics test-topic \
    --max-msgs 10000 \
    --auto-generate \
    --external-generate \
    --header kafka-static-header-1.json
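To see which values the schema above pulls out of the message, the extraction can be simulated (illustrative Python only; the real ingester also handles typing, batching, and Kafka consumption):

```python
import json

# The field definitions from kafka-static-header-1.json.
schema = [
    {"name": "int-featurebase-name", "path": ["int-kafka-path"], "type": "int"},
    {"name": "string-featurebase-name", "path": ["string-kafka-path"], "type": "string"},
]

# The sample Kafka message with two values.
message = json.loads('{"int-kafka-path": 12345, "string-kafka-path": "arbitraryString"}')

# Walk each field's path to build the row that would land in FeatureBase.
row = {}
for field in schema:
    value = message
    for key in field["path"]:
        value = value[key]
    row[field["name"]] = value

print(row)  # {'int-featurebase-name': 12345, 'string-featurebase-name': 'arbitraryString'}
```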

Ingest data from an array of values

Kafka message with array of values:

{
    "from_interface": {
        "ip": "10.203.33.18",
        "port": 38935
    },
    "to_interface": {
        "ip": "203.77.221.220",
        "port": 5872
    },
    "event_time": "2021-06-01T16:02:55+06:00",
    "protocol": "UDP",
    "severity": 0,
    "bytes": 8593
}

kafka-static-header-2.json reads values from the Kafka message and structures the values according to the destination table.

[
    {
        "name": "from_ip",
        "path": [
            "from_interface",
            "ip"
        ],
        "type": "string"
    },
    {
        "name": "from_port",
        "path": [
            "from_interface",
            "port"
        ],
        "type": "int"
    },
    {
        "name": "to_ip",
        "path": [
            "to_interface",
            "ip"
        ],
        "type": "string"
    },
    {
        "name": "to_port",
        "path": [
            "to_interface",
            "port"
        ],
        "type": "int"
    },
    {
        "name": "event_time",
        "path": [
            "event_time"
        ],
        "type": "timestamp"
    },
    {
        "name": "severity",
        "path": [
            "severity"
        ],
        "type": "set"
    },
    {
        "name": "bytes",
        "path": [
            "bytes"
        ],
        "type": "int"
    },
    {
        "name": "protocol",
        "path": [
            "protocol"
        ],
        "type": "string"
    }
]

Execute the following CLI command from the /featurebase/idk directory.

./molecula-consumer-kafka-static \
    --kafka-hosts "localhost:9092" \
    --index kafka-test \
    --batch-size=10000 \
    --topics test-topic \
    --auto-generate \
    --allow-missing-fields \
    --header kafka-static-header-2.json
