Kafka delete message reference
These instructions apply to Kafka schemas managed by Confluent Schema Management.
FeatureBase recommends Confluent Schema Management because it makes it easier to set up the Kafka dependencies in a local environment:
- Schema registry
- Apache Kafka
- Apache Zookeeper
This reference page explains how to structure an Avro-encoded Kafka message that is:
- referenced by the Kafka delete consumer CLI command
- used to delete values from a FeatureBase database
Before you begin
- Learn about the Confluent Schema Registry
- Set up Python in a virtual environment
- Install the Confluent Kafka Python Client
- Learn how to set up Confluent and Kafka topics to store your data
- Create at least one Kafka topic
- Encode Kafka records using Apache Avro
- Ensure a Confluent Schema Registry is running somewhere the Kafka consumer can reach it over HTTP or HTTPS
Kafka delete message syntax
{
  "namespace": "<example.test>",
  "type": "record",
  "name": "<schema_name>",
  "delete": "<delete-property>",
  "fields": [
    <Kafka-Avro-data-types>
  ]
}
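The top-level shape of a delete message schema can be checked before it is registered. A minimal sketch in Python (stdlib only; `validate_delete_schema` is an illustrative helper, not part of any FeatureBase tooling, and it treats `"delete"` as defaulting to `"fields"` when omitted, matching the consumer's behavior described below):

```python
import json

REQUIRED_KEYS = {"namespace", "type", "name", "fields"}
VALID_DELETE_MODES = {"fields", "values", "records"}

def validate_delete_schema(raw: str) -> dict:
    """Parse a delete-schema JSON string and check its top-level shape.

    Hypothetical helper for illustration only.
    """
    schema = json.loads(raw)
    missing = REQUIRED_KEYS - schema.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    if schema["type"] != "record":
        raise ValueError("delete schemas must be Avro records")
    # "delete" defaults to "fields" when it is not defined
    mode = schema.get("delete", "fields")
    if mode not in VALID_DELETE_MODES:
        raise ValueError(f"unknown delete mode: {mode}")
    return schema

schema = validate_delete_schema("""{
  "namespace": "example.test",
  "type": "record",
  "name": "a_delete_schema",
  "delete": "fields",
  "fields": [{"name": "fields", "type": {"type": "array", "items": "string"}}]
}""")
```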
Kafka message delete properties
| Property | Description | Required | Default | Additional |
|---|---|---|---|---|
| `"namespace"` | Avro schema namespace | Yes | | |
| `"type"` | Avro schema type `record` | Yes | | |
| `"name"` | Avro schema name | Yes | | |
| `"delete"` | Determines how the Kafka delete consumer behaves and what the Avro schema should look like | Yes | `"fields"` | Delete fields properties, Delete values properties, Delete records properties |
| `"fields"` | Avro field definitions for the values to delete | Yes | | Fields properties |
Additional information
Delete fields properties
`./molecula-consumer-kafka-delete` defaults to `"delete": "fields"` if the `"delete"` property is not defined in the JSON message.
Set `"delete": "fields"` to delete all values:
- in a list of fields
- for a single record
- for `IDSET` or `STRINGSET` fields where you don't know the specific values to delete

`"delete": "fields"` requires:
- a FeatureBase index configured with `keys: true`
- `{"type": "array", "items": "string"}` defined under the `"fields"` parameter in the Kafka message
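A delete-fields message is just the record's primary-key values plus the list of field names to clear. A sketch in Python (stdlib only; `delete_fields_message` and its parameter names are illustrative, not FeatureBase tooling):

```python
import json

def delete_fields_message(record_keys, fields_to_clear):
    """Build the JSON body of a delete-fields Kafka message.

    record_keys: primary-key values identifying the target record.
    fields_to_clear: FeatureBase field names whose values are removed.
    Illustrative helper; not part of FeatureBase tooling.
    """
    message = dict(record_keys)
    # "fields" must match the array-of-strings field in the Avro schema
    message["fields"] = list(fields_to_clear)
    return json.dumps(message)

body = delete_fields_message({"pk0": "9z4aw", "pk1": "5ptDx"},
                             ["int_fld", "stringset_fld"])
```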
Delete values properties
Set `"delete": "values"` to:
- delete specified values from `IDSET` and `STRINGSET` fields
- specify a `null` value for `BOOL`, `DECIMAL`, `INT`, and `TIMESTAMP` fields

`"delete": "values"` requires:
- Avro fields containing data that can be deleted
- a matching name between each Avro record schema field and the target FeatureBase field
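In the Avro JSON encoding, union-typed values are wrapped in an object keyed by the branch name (for example `{"int": 890}` or `{"null": null}`), which is why the raw messages later in this page look the way they do. A stdlib-only sketch of building one delete-values message (the `union` helper is illustrative, not FeatureBase tooling):

```python
import json

def union(value):
    """Wrap a value in Avro's JSON union encoding ({branch_name: value})."""
    if value is None:
        return {"null": None}
    if isinstance(value, bool):   # bool checked before int (bool subclasses int)
        return value              # plain booleans are not unioned here
    if isinstance(value, int):
        return {"int": value}
    if isinstance(value, str):
        return {"string": value}
    if isinstance(value, list):
        return {"array": value}
    raise TypeError(f"unsupported type: {type(value)!r}")

message = {
    "_id": "u2Yr4|sHaUv|x5z8P",
    "stringset_string": union(None),     # null: nothing to delete in this field
    "idset_intarray": union([731, 13]),  # delete these two set members
    "int_int": True,                     # true: delete the INT value
}
body = json.dumps(message)
```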
Avro data type field mapping
Kafka delete is not supported for the FeatureBase time or time-quantum data types.
| SQL data type | FeatureBase field data type | Avro field data type |
|---|---|---|
| `_id` | Record ID/Key | `string` or `int` |
| `STRING` | Keyed Mutex | `string` |
| `STRINGSET` | Keyed Set | `string` or array of `string` |
| `STRINGSETQ` | Keyed Time | N/A |
| `ID` | Non-Keyed Mutex | `int` |
| `IDSET` | Non-Keyed Set | `int` or array of `int` |
| `IDSETQ` | Non-Keyed Time | N/A |
| `INT` | Int | `boolean` |
| `DECIMAL` | Decimal | `boolean` |
| `TIMESTAMP` | Timestamp | `boolean` |
| `BOOL` | Bool | `boolean` |
Avro data types to Kafka message
| Avro data type | Kafka message | Additional |
|---|---|---|
| `string` or `int` | Values to delete from the FeatureBase field | Retain field values by using the union data type to union the `null` value with the `string` or `int` data type |
| `boolean` | `true` when the FeatureBase field value should be deleted | |
Delete records parameters
The "delete": "records" parameter requires one or more Avro field data types:
| Avro data type | Description | FeatureBase index | JSON fields parameter |
|---|---|---|---|
| `"ids"` | Deletes a list of records based on their FeatureBase `_id` | `keys: false` | `{"type": "array", "items": "int"}` |
| `"keys"` | Deletes a list of records based on their FeatureBase `_id` | `keys: true` | `{"type": "array", "items": "string"}` |
| `"filter"` | Deletes records based on PQL row calls | n/a | `{"type": "string"}` |
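Because `keys` and `filter` are union types, an unused branch is sent as the `null` branch. A stdlib-only sketch of the two message shapes (the helper names are illustrative, not FeatureBase tooling):

```python
import json

def delete_by_keys(keys):
    """Message that deletes whole records by their string _id keys."""
    return json.dumps({"keys": {"array": list(keys)}, "filter": {"null": None}})

def delete_by_filter(pql_row_call):
    """Message that deletes whole records matched by a PQL Row() filter."""
    return json.dumps({"keys": {"null": None}, "filter": {"string": pql_row_call}})

by_keys = delete_by_keys(["9z4aw|5ptDx|CKs1F", "ASSAw|kauLy|oxjI0"])
by_filter = delete_by_filter("Row(stringset_string='58KIR')")
```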
Ignore missing fields
All Kafka Avro fields are required.
However, you can ignore missing fields and leave existing values untouched by:
- creating a `union` with the `null` data type in the Kafka Avro JSON config file
- adding the `allow-missing-fields` flag to the `./molecula-consumer-kafka` CLI command
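The two sides of this pattern can be sketched as follows (stdlib only; the field name is taken from the Strings table below, and the variable names are illustrative):

```python
import json

# Schema side: a union of the concrete type with "null" makes the field optional.
schema_field = {"name": "string_string", "type": ["string", "null"], "mutex": True}

# Message side (Avro JSON encoding): the producer signals "no value supplied"
# with the null branch, so the consumer leaves the stored value untouched.
skipped = {"string_string": {"null": None}}
supplied = {"string_string": {"string": "ZgkOB"}}

encoded = json.dumps(skipped)
```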
Kafka Avro data type syntax
The following sections map Avro field data types and property key-value pairs to the resulting FeatureBase field data type.
BOOL
| Kafka Avro fields | Description |
|---|---|
{"name": "bool_bool", "type": "boolean"} | FeatureBase Bool from Avro Boolean |
DECIMAL
| Kafka Avro fields | Description |
|---|---|
{"name": "decimal_float", "type": "float", "fieldType": "decimal", "scale": 2} | FeatureBase Decimal from Avro Float |
ID
| Kafka Avro fields | Description |
|---|---|
{"name": "id_long", "type": "long", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro Long |
{"name": "id_int", "type": "int", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro int |
IDSET
| Kafka Avro fields | Description |
|---|---|
{"name": "idset_int", "type": "int", "fieldType": "id"} | FeatureBase IDSET from Avro Int |
{"name": "idset_intarray", "type": {"type": "array", "items": "int"}} | FeatureBase IDSET from Avro Int Array |
IDSETQ
IDSETQ Avro integers require a matching RecordTime field in the Avro schema. Examples are provided below.
| Kafka Avro string | Description | Required in Avro schema |
|---|---|---|
{"name": "idsetq_int", "type": "int", "fieldType": "id", "quantum": "YMD"} | FeatureBase IDSETQ from Avro Int | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
{"name": "idsetq_intarray", "type": "array", "items": {"type": "int", "quantum": "YMD"}} | FeatureBase IDSETQ from Avro Int Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
INT
| Kafka Avro fields | Description |
|---|---|
{"name": "int_int", "type": "int", "fieldType": "int"} | FeatureBase Int from Avro Int |
Strings
| Kafka Avro fields | Description | Additional |
|---|---|---|
{"name": "string_string", "type": "string", "mutex": true } | FeatureBase String from Avro String | |
{"name": "string_bytes", "type": "bytes" , "mutex": true } | FeatureBase String from Avro Bytes | |
{"name": "string_enum", "type": "enum"} | FeatureBase String from Avro Enum | |
{"name": "string_string", "type": ["string", "null"], "mutex": true } | Optional String | Ignore missing fields |
{"name": "stringset_stringarray", "type": [{"type": "array", "items": "string"}, "null"]} | Optional Array of Strings | Ignore missing fields |
STRINGSET
| Kafka Avro string | Description |
|---|---|
{"name": "stringset_string", "type": "string"} | FeatureBase StringSet from Avro String |
{"name": "stringset_bytes", "type": "bytes"} | FeatureBase StringSet from Avro Bytes |
{"name": "stringset_stringarray", "type": {"type": "array", "items": "string"}} | FeatureBase StringSet from Avro String Array |
STRINGSETQ
STRINGSETQ Avro strings require a matching RecordTime field in the Avro Schema. Examples are provided below.
| Kafka Avro string | Description | Required in Avro schema |
|---|---|---|
| {"name": "stringsetq_string", "type": "string", "quantum": "YMD"} | FeatureBase StringSetQ with Day Granularity from Avro String | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
| {"name": "stringsetq_stringarray", "type": "array", "items": {"type": "string", "quantum": "YMD"}} | FeatureBase StringSetQ with Day Granularity from Avro String Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
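The `layout` value `2006-01-02 15:04:05` is a Go reference-time layout, equivalent to `%Y-%m-%d %H:%M:%S` in Python's `strftime`. A stdlib-only sketch of producing a matching `recordTime` value (variable names are illustrative):

```python
from datetime import datetime, timezone

# Go reference-time layout "2006-01-02 15:04:05" corresponds to
# strftime "%Y-%m-%d %H:%M:%S" in Python.
ts = datetime(2024, 5, 1, 12, 30, 0, tzinfo=timezone.utc)
record_time = ts.strftime("%Y-%m-%d %H:%M:%S")
# The recordTime Avro field has type "bytes", so encode before producing:
record_time_bytes = record_time.encode("utf-8")
```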
TIMESTAMP
| Kafka Avro string | Description | Additional |
|---|---|---|
{"name": "timestamp_bytes_ts", "type": "bytes", "fieldType": "timestamp", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Bytes | Expects byte representation of string timestamp |
{"name": "timestamp_bytes_int", "type": ["bytes", "null"], "fieldType": "timestamp", "unit": "s", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Int |
Examples
Kafka delete fields
Kafka delete field Avro schema definition
{
"namespace": "example.test",
"type": "record",
"name": "a_delete_schema",
"delete": "fields",
"fields": [
{"name": "pk0", "type": "string"},
{"name": "pk1", "type": "string"},
{"name": "fields", "type": {"type": "array", "items": "string"}}
]
}
Kafka delete field raw message
This message:
- sets `int_fld` to `null`
- clears all the values that were in `stringset_fld`
{"pk0": "9z4aw", "pk1": "5ptDx", "fields": ["int_fld","stringset_fld"]}
Kafka delete field ingester configuration
./molecula-consumer-kafka-delete \
--primary-key-fields "pk0,pk1" \
--topics delete_topic \
--kafka-bootstrap-server localhost:9092 \
--schema-registry-url localhost:8081 \
--featurebase-hosts localhost:10101 \
--featurebase-grpc-hosts localhost:20101 \
--index an_index
Kafka delete values
Kafka delete values Avro schema definition
{
"namespace": "example.test",
"type": "record",
"name": "delete_value_schema",
"doc": "All supported avro types and property variations",
"delete": "values",
"fields": [
{"name": "_id", "type": "string"},
{"name": "stringset_string", "type": ["string", "null"]},
{"name": "string_string", "type": ["string", "null"]},
{"name": "stringset_stringarray", "type": [{"type": "array", "items": "string"}, "null"]},
{"name": "idset_int", "type": ["int", "null"]},
{"name": "id_int", "type": ["int", "null"]},
{"name": "idset_intarray", "type": [{"type": "array", "items": "int"}, "null"]},
{"name": "int_int", "type": "boolean"},
{"name": "decimal_double", "type": "boolean"},
{"name": "bools", "type": [{"type": "array", "items": "string"}, "null"]},
{"name": "timestamp_bytes_int", "type": "boolean"}
]
}
Raw Kafka Messages
{"_id": "u2Yr4|sHaUv|x5z8P", "stringset_string": {"null": null}, "string_string": {"string": "ZgkOB"}, "stringset_stringarray": {"array": ["u2Yr4", "PYE8V", "VBcyJ"]}, "idset_int": {"int": 890}, "id_int": {"int": 39}, "idset_intarray": {"array": [731, 13]}, "int_int": false, "bools": {"array": ["bool_bool"]}, "decimal_double": true, "timestamp_bytes_int": true}
{"_id": "h1iqc|5ptDx|iYeOV", "stringset_string": {"null": null}, "string_string": {"null": null}, "stringset_stringarray": {"null": null}, "idset_int": {"null": null}, "id_int": {"null": null}, "idset_intarray": {"null": null}, "int_int": true, "decimal_double": true, "bools": {"null": null}, "timestamp_bytes_int": true}
Kafka delete consumer configuration
./molecula-consumer-kafka-delete \
--topics delete_topic \
--kafka-bootstrap-server localhost:9092 \
--schema-registry-url localhost:8081 \
--featurebase-hosts localhost:10101 \
--featurebase-grpc-hosts localhost:20101 \
--index an_index
Because the Avro `_id` field has a `string` data type, the index passed to `--index an_index` must be configured with `keys: true`.
Kafka delete records
Kafka delete records Avro schema
{
"namespace": "example.test",
"type": "record",
"name": "alltypes_delete_records",
"doc": "supply list of keys or a PQL filter",
"delete": "records",
"fields": [
{"name": "keys", "type": [{"type": "array", "items": "string"}, "null"]},
{"name": "filter", "type": ["string", "null"]}
]
}
Kafka delete records raw message
{"keys": {"array": ["9z4aw|5ptDx|CKs1F", "ASSAw|kauLy|oxjI0"]}, "filter": {"null": null}}
{"keys": {"null": null}, "filter": {"string": "Row(stringset_string='58KIR')"}}
- The first message deletes the entire record for the records with IDs `9z4aw|5ptDx|CKs1F` and `ASSAw|kauLy|oxjI0`.
- The second message deletes all records that have the value `58KIR` in the field `stringset_string`.
Kafka delete records ingest definition
./molecula-consumer-kafka-delete \
--topics delete_topic \
--kafka-bootstrap-server localhost:9092 \
--schema-registry-url localhost:8081 \
--featurebase-hosts localhost:10101 \
--featurebase-grpc-hosts localhost:20101 \
--index an_index