Kafka delete message reference
These instructions apply to Kafka schemas managed by Confluent Schema Management.
FeatureBase recommends Confluent Schema Management because it simplifies setting up Kafka dependencies in a local environment:
- Schema registry
- Apache Kafka
- Apache Zookeeper
This reference page explains how to structure an Avro-encoded Kafka message that is:
- referenced by the Kafka delete consumer CLI command
- used to delete values from a FeatureBase database
Before you begin
- Learn about the Confluent Schema Registry
- Set up Python in a virtual environment
- Install the Confluent Kafka Python Client
- Learn how to set up Confluent and Kafka topics to store your data
- Create at least one Kafka topic
- Encode Kafka records using Apache Avro
- Run a Confluent Schema Registry somewhere the Kafka consumer can connect to it over HTTP or HTTPS
Kafka delete message syntax
```json
{
  "namespace": "<example.test>",
  "type": "record",
  "name": "<schema_name>",
  "delete": "<delete-property>",
  "fields": [
    <Kafka-Avro-data-types>
  ]
}
```
Kafka message delete properties
Property | Description | Required | Default | Additional |
---|---|---|---|---|
"namespace" | Avro schema namespace | Yes | | |
"type" | Avro schema type record | Yes | | |
"name" | Avro schema name | Yes | | |
"delete" | The delete property determines how the Kafka consumer behaves and what the Avro schema should look like. | Yes | "fields" | * Delete fields properties * Delete values properties * Delete records properties |
"fields" | Delete all values in a list of fields for a single record without defining the specific data. | Yes | | Delete fields properties |
Additional information
Delete fields properties
`./molecula-consumer-kafka-delete` defaults to `"delete": "fields"` if the `"delete"` property is not defined in the JSON message.

Set `"delete": "fields"` to delete all values:
- in a list of fields
- for a single record
- for `IDSET` or `STRINGSET` fields where you don't know the specific values to delete

`"delete": "fields"` requires:
- a FeatureBase index configured with `keys: true`
- `{"type": "array", "items": "string"}` defined under the `"fields"` parameter in the Kafka message
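Putting these requirements together, a delete-fields message body is just the record's primary keys plus an array of field names to clear. A minimal Python sketch (the key and field names are hypothetical, mirroring the Examples section below):

```python
import json

def build_delete_fields_message(primary_keys: dict, fields: list) -> str:
    """Build a delete-fields Kafka message body.

    primary_keys: the record's key columns (must match --primary-key-fields).
    fields: names of the FeatureBase fields whose values should be cleared.
    """
    message = dict(primary_keys)
    # Matches the {"type": "array", "items": "string"} "fields" parameter.
    message["fields"] = fields
    return json.dumps(message)

# Hypothetical keys and field names for illustration.
msg = build_delete_fields_message({"pk0": "9z4aw", "pk1": "5ptDx"},
                                  ["int_fld", "stringset_fld"])
```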
Delete values properties
Set `"delete": "values"` to:
- delete specified values from `IDSET` and `STRINGSET` fields
- specify a `null` value for `BOOL`, `DECIMAL`, `INT`, and `TIMESTAMP` fields

`"delete": "values"` requires:
- Avro fields containing data that can be deleted
- a matching name between the Avro record schema field and the target FeatureBase field
Avro data type field mapping
Kafka delete is not supported for the FeatureBase `time` or `time-quantum` data types.
SQL data type | FeatureBase field data type | Avro field data type |
---|---|---|
_id | Record ID Key | string or int |
STRING | Keyed Mutex | string |
STRINGSET | Keyed Set | string or array of strings |
STRINGSETQ | Keyed Time | N/A |
ID | Non-Keyed Mutex | int |
IDSET | Non-Keyed Set | int or array of ints |
IDSETQ | Non-Keyed Time | N/A |
INT | Int | boolean |
DECIMAL | Decimal | boolean |
TIMESTAMP | Timestamp | boolean |
BOOL | Bool | boolean |
Avro data types to Kafka message
Avro data type | Kafka message | Additional |
---|---|---|
string, int | Values to delete from FeatureBase field | Retain field values by using the union data type to union a null value with the string or int data type |
boolean | true when the FeatureBase field value should be deleted | |
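In Avro's JSON encoding, a union value is wrapped in an object keyed by the branch type, which is why the raw messages later on this page contain entries like `{"string": "ZgkOB"}` and `{"null": null}`. An illustrative Python helper (not part of the consumer) that shows the wrapping rule:

```python
def wrap_union(value):
    """Wrap a Python value the way Avro's JSON encoding represents a
    ["<type>", "null"] union: {"<branch>": value}, or {"null": None}."""
    if value is None:
        return {"null": None}
    if isinstance(value, bool):
        # Booleans are not unioned in these delete schemas; pass through.
        return value
    if isinstance(value, int):
        return {"int": value}
    if isinstance(value, str):
        return {"string": value}
    if isinstance(value, list):
        return {"array": value}
    raise TypeError(f"unsupported type: {type(value).__name__}")
```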
Delete records parameters
The "delete": "records"
parameter requires one or more Avro field data types:
Avro data type | Description | FeatureBase index | JSON fields parameter |
---|---|---|---|
"ids" | Used to delete a list of records based on their numeric FeatureBase _id | keys: false | {"type": "array", "items": "int"} |
"keys" | Used to delete a list of records based on their FeatureBase _id key | keys: true | {"type": "array", "items": "string"} |
"filter" | Used to delete records based on PQL row calls | n/a | {"type": "string"} |
Ignore missing fields
All Kafka Avro fields are required. However, you can ignore missing fields and leave existing values untouched by:
- creating a `union` with the `null` data type in the Kafka Avro JSON config file
- adding the `allow-missing-fields` flag to the `./molecula-consumer-kafka` CLI command
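For example, a sketch of a helper that rewrites an Avro field definition into a `["<type>", "null"]` union so the consumer can skip it when the value is absent (the field definition is hypothetical):

```python
def make_optional(field: dict) -> dict:
    """Return a copy of an Avro field definition whose type is unioned
    with "null", allowing the field to be omitted from messages."""
    optional = dict(field)
    optional["type"] = [field["type"], "null"]
    return optional

# Hypothetical string field, as in the Strings table below.
field = {"name": "string_string", "type": "string", "mutex": True}
optional_field = make_optional(field)
```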
Kafka Avro data type syntax
Map Avro field data types and property key-value pairs to determine the FeatureBase record data type.
BOOL
Kafka Avro fields | Description |
---|---|
{"name": "bool_bool", "type": "boolean"} | FeatureBase Bool from Avro Boolean |
DECIMAL
Kafka Avro fields | Description |
---|---|
{"name": "decimal_float", "type": "float", "fieldType": "decimal", "scale": 2} | FeatureBase Decimal from Avro Float |
ID
Kafka Avro fields | Description |
---|---|
{"name": "id_long", "type": "long", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro Long |
{"name": "id_int", "type": "int", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro int |
IDSET
Kafka Avro fields | Description |
---|---|
{"name": "idset_int", "type": "int", "fieldType": "id"} | FeatureBase IDSET from Avro Int |
{"name": "idset_intarray", "type": {"type": "array", "items": "int"}} | FeatureBase IDSET from Avro Int Array |
IDSETQ
IDSETQ Avro fields require a matching `RecordTime` field in the Avro schema. Examples are provided below.
Kafka Avro string | Description | Required in Avro schema |
---|---|---|
{"name": "idsetq_int", "type": "int", "fieldType": "id", "quantum": "YMD"} | FeatureBase IDSETQ from Avro Int | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
{"name": "idsetq_intarray", "type": "array", "items": {"type": "int", "quantum": "YMD"}} | FeatureBase IDSETQ from Avro Int Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
INT
Kafka Avro fields | Description |
---|---|
{"name": "int_int", "type": "int", "fieldType": "int"} | FeatureBase Int from Avro Int |
Strings
Kafka Avro fields | Description | Additional |
---|---|---|
{"name": "string_string", "type": "string", "mutex": true } | FeatureBase String from Avro String | |
{"name": "string_bytes", "type": "bytes" , "mutex": true } | FeatureBase String from Avro Bytes | |
{"name": "string_enum", "type": "enum"} | FeatureBase String from Avro Enum | |
{"name": "string_string", "type": ["string", "null"], "mutex": true } | Optional String | Ignore missing fields |
{"name": "stringset_stringarray", "type": [{"type": "array", "items": "string"}, "null"]} | Optional Array of Strings | Ignore missing fields |
STRINGSET
Kafka Avro string | Description |
---|---|
{"name": "stringset_string", "type": "string"} | FeatureBase StringSet from Avro String |
{"name": "stringset_bytes", "type": "bytes"} | FeatureBase StringSet from Avro Bytes |
{"name": "stringset_stringarray", "type": {"type": "array", "items": "string"}} | FeatureBase StringSet from Avro String Array |
STRINGSETQ
STRINGSETQ Avro strings require a matching `RecordTime` field in the Avro schema. Examples are provided below.
Kafka Avro string | Description | Required in Avro schema |
---|---|---|
{"name": "stringsetq_string", "type": "string", "quantum": "YMD"} | FeatureBase StringSetQ with Day Granularity from Avro String | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
{"name": "stringsetq_stringarray", "type": "array", "items": {"type": "string", "quantum": "YMD"}} | FeatureBase StringSetQ with Day Granularity from Avro String Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
TIMESTAMP
Kafka Avro string | Description | Additional |
---|---|---|
{"name": "timestamp_bytes_ts", "type": "bytes", "fieldType": "timestamp", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Bytes | Expects byte representation of string timestamp |
{"name": "timestamp_bytes_int", "type": ["bytes", "null"], "fieldType": "timestamp", "unit": "s", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Int | |
Examples
Kafka delete fields
Kafka delete field Avro schema definition
```json
{
  "namespace": "example.test",
  "type": "record",
  "name": "a_delete_schema",
  "delete": "fields",
  "fields": [
    {"name": "pk0", "type": "string"},
    {"name": "pk1", "type": "string"},
    {"name": "fields", "type": {"type": "array", "items": "string"}}
  ]
}
```
Kafka delete field raw message
This message:
- sets `int_fld` to `null`
- clears all the values that were in `stringset_fld`

```json
{"pk0": "9z4aw", "pk1": "5ptDx", "fields": ["int_fld", "stringset_fld"]}
```
Kafka delete field ingester configuration
```shell
./molecula-consumer-kafka-delete \
  --primary-key-fields "pk0,pk1" \
  --topics delete_topic \
  --kafka-bootstrap-server localhost:9092 \
  --schema-registry-url localhost:8081 \
  --featurebase-hosts localhost:10101 \
  --featurebase-grpc-hosts localhost:20101 \
  --index an_index
```
Kafka delete values
Kafka delete values Avro schema definition
```json
{
  "namespace": "example.test",
  "type": "record",
  "name": "delete_value_schema",
  "doc": "All supported avro types and property variations",
  "delete": "values",
  "fields": [
    {"name": "_id", "type": "string"},
    {"name": "stringset_string", "type": ["string", "null"]},
    {"name": "string_string", "type": ["string", "null"]},
    {"name": "stringset_stringarray", "type": [{"type": "array", "items": "string"}, "null"]},
    {"name": "idset_int", "type": ["int", "null"]},
    {"name": "id_int", "type": ["int", "null"]},
    {"name": "idset_intarray", "type": [{"type": "array", "items": "int"}, "null"]},
    {"name": "int_int", "type": "boolean"},
    {"name": "decimal_double", "type": "boolean"},
    {"name": "bools", "type": [{"type": "array", "items": "string"}, "null"]},
    {"name": "timestamp_bytes_int", "type": "boolean"}
  ]
}
```
Raw Kafka Messages
```json
{"_id": "u2Yr4|sHaUv|x5z8P", "stringset_string": {"null": null}, "string_string": {"string": "ZgkOB"}, "stringset_stringarray": {"array": ["u2Yr4", "PYE8V", "VBcyJ"]}, "idset_int": {"int": 890}, "id_int": {"int": 39}, "idset_intarray": {"array": [731, 13]}, "int_int": false, "bools": {"array": ["bool_bool"]}, "decimal_double": true, "timestamp_bytes_int": true}
{"_id": "h1iqc|5ptDx|iYeOV", "stringset_string": {"null": null}, "string_string": {"null": null}, "stringset_stringarray": {"null": null}, "idset_int": {"null": null}, "id_int": {"null": null}, "idset_intarray": {"null": null}, "int_int": true, "decimal_double": true, "bools": {"null": null}, "timestamp_bytes_int": true}
```
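To make the encoding above concrete, here is an illustrative Python helper (not part of FeatureBase) that lists which fields a delete-values message touches: union-typed entries with a non-null branch name specific values to delete, `{"null": null}` leaves a field untouched, and a boolean set to `true` clears the scalar field entirely.

```python
def fields_touched(message: dict) -> list:
    """Return the names of fields this delete-values message will change.

    Union-encoded values like {"string": "x"} or {"array": [...]} name
    specific values to delete; {"null": None} means "leave untouched".
    Plain booleans clear the whole scalar field when true.
    """
    touched = []
    for name, value in message.items():
        if name == "_id":
            continue
        if isinstance(value, bool):
            if value:
                touched.append(name)
        elif isinstance(value, dict) and "null" not in value:
            touched.append(name)
    return touched

# The second raw message above: only the boolean-typed fields are true.
msg = {"_id": "h1iqc|5ptDx|iYeOV", "stringset_string": {"null": None},
       "string_string": {"null": None}, "stringset_stringarray": {"null": None},
       "idset_int": {"null": None}, "id_int": {"null": None},
       "idset_intarray": {"null": None}, "int_int": True,
       "decimal_double": True, "bools": {"null": None},
       "timestamp_bytes_int": True}
```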
Kafka delete consumer configuration
```shell
./molecula-consumer-kafka-delete \
  --topics delete_topic \
  --kafka-bootstrap-server localhost:9092 \
  --schema-registry-url localhost:8081 \
  --featurebase-hosts localhost:10101 \
  --featurebase-grpc-hosts localhost:20101 \
  --index an_index
```
The Avro `_id` has a `string` data type, therefore the index passed to `--index an_index` will have `keys: true`.
Kafka delete records
Kafka delete records Avro schema
```json
{
  "namespace": "example.test",
  "type": "record",
  "name": "alltypes_delete_records",
  "doc": "supply list of keys or a PQL filter",
  "delete": "records",
  "fields": [
    {"name": "keys", "type": [{"type": "array", "items": "string"}, "null"]},
    {"name": "filter", "type": ["string", "null"]}
  ]
}
```
Kafka delete records raw message
```json
{"keys": {"array": ["9z4aw|5ptDx|CKs1F", "ASSAw|kauLy|oxjI0"]}, "filter": {"null": null}}
{"keys": {"null": null}, "filter": {"string": "Row(stringset_string='58KIR')"}}
```
- The first message deletes the entire record for records with ID `9z4aw|5ptDx|CKs1F` and `ASSAw|kauLy|oxjI0`.
- The second message deletes all records that had the value `58KIR` in the field `stringset_string`.
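Because both `keys` and `filter` are unions with `null`, exactly one branch carries data per message. A hedged Python sketch that builds either variant (field names mirror the schema above):

```python
import json

def delete_records_message(keys=None, pql_filter=None) -> str:
    """Build a delete-records message body: either a list of record keys
    or a PQL row-call filter, with the unused branch set to null."""
    if (keys is None) == (pql_filter is None):
        raise ValueError("provide exactly one of keys or pql_filter")
    return json.dumps({
        "keys": {"array": keys} if keys is not None else {"null": None},
        "filter": {"string": pql_filter} if pql_filter is not None else {"null": None},
    })

# Key-based deletion, as in the first raw message above.
msg = delete_records_message(keys=["9z4aw|5ptDx|CKs1F", "ASSAw|kauLy|oxjI0"])
# Filter-based deletion, as in the second raw message above.
msg2 = delete_records_message(pql_filter="Row(stringset_string='58KIR')")
```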
Kafka delete records ingest definition
```shell
./molecula-consumer-kafka-delete \
  --topics delete_topic \
  --kafka-bootstrap-server localhost:9092 \
  --schema-registry-url localhost:8081 \
  --featurebase-hosts localhost:10101 \
  --featurebase-grpc-hosts localhost:20101 \
  --index an_index
```