
Kafka delete message reference

These instructions apply to Kafka schemas managed by Confluent Schema Management.

FeatureBase recommends using Confluent Schema Management because it makes it easier to set up Kafka dependencies in a local environment:

  • Schema registry
  • Apache Kafka
  • Apache Zookeeper

This reference page explains how to structure Avro-encoded Kafka messages that are:

  • referenced by the Kafka delete consumer CLI command
  • used to delete values from a FeatureBase database

Before you begin

Kafka delete message syntax

{
    "namespace": "<example.test>",
    "type": "record",
    "name": "<schema_name>",
    "delete": "<delete-property>",
    "fields":
      [
        <Kafka-Avro-data-types>
      ]
}

Kafka message delete properties

| Property | Description | Required | Default | Additional |
|---|---|---|---|---|
| "namespace" | | Yes | | |
| "type" | Avro schema type record | Yes | | |
| "name" | Avro schema name | Yes | | |
| "<delete-property>" | The delete property determines how the Kafka consumer behaves and what the Avro schema should look like. | Yes | | Delete fields properties, Delete values properties, Delete records properties |
| "fields" | Delete all values in a list of fields for a single record without defining the specific data. | | | Fields properties |

Additional information

Delete fields properties

./molecula-consumer-kafka-delete defaults to "delete": "fields" if the "delete" property is not defined in the JSON message.
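The defaulting rule above, together with the properties this reference marks as required, can be sketched as a small pre-flight check. This is an illustration only: `check_delete_schema` is a hypothetical helper, not part of any FeatureBase or Confluent tooling, and it inspects only the top-level properties.

```python
import json

# Delete modes named in this reference.
DELETE_MODES = {"fields", "values", "records"}


def check_delete_schema(schema_json: str) -> str:
    """Return the effective delete mode of a schema, or raise ValueError.

    Hypothetical helper: checks only the top-level properties described
    in this reference.
    """
    schema = json.loads(schema_json)
    for prop in ("namespace", "type", "name"):
        if prop not in schema:
            raise ValueError(f"missing required property: {prop}")
    if schema["type"] != "record":
        raise ValueError('"type" must be "record"')
    # Documented behaviour: "delete" defaults to "fields" when absent.
    mode = schema.get("delete", "fields")
    if mode not in DELETE_MODES:
        raise ValueError(f"unknown delete mode: {mode}")
    if not isinstance(schema.get("fields"), list):
        raise ValueError('"fields" must be a list of Avro field definitions')
    return mode
```

Running a check like this before registering a schema can catch a misspelled delete mode early.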

Set "delete": "fields" to delete all values:

  • in a list of fields
  • for a single record
  • for IDSET or STRINGSET fields where you don’t know the specific values to delete

"delete": "fields" requires:

  • FeatureBase index configured with keys: true
  • {"type": "array", "items": "string"} defined under the "fields" parameter in the Kafka message
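As a rough sketch of the requirements above, the following builds a "delete": "fields" message body from primary-key values and a list of field names. `delete_fields_message` is a hypothetical helper, and the key and field names are placeholders. The sketch assumes the schema declares the primary-key fields as strings plus a "fields" array of strings.

```python
import json


def delete_fields_message(primary_keys: dict, field_names: list) -> str:
    """Build the JSON body of a "delete": "fields" message.

    primary_keys maps each primary-key field name to its value;
    field_names lists the FeatureBase fields to clear for that record.
    """
    if "fields" in primary_keys:
        raise ValueError('"fields" is reserved for the list of fields to clear')
    if not all(isinstance(name, str) for name in field_names):
        raise TypeError("field names must be strings")
    message = dict(primary_keys)
    message["fields"] = list(field_names)
    return json.dumps(message)


print(delete_fields_message({"pk0": "9z4aw", "pk1": "5ptDx"},
                            ["int_fld", "stringset_fld"]))
```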

Delete values properties

Set "delete": "values" to:

  • delete specified values from IDSET and STRINGSET fields
  • set BOOL, DECIMAL, INT and TIMESTAMP fields to null

"delete": "values" requires:

  • Avro fields containing data that can be deleted
  • Matching name for Avro Record Schema field and target FeatureBase record

Avro data type field mapping

Kafka delete is not supported for the FeatureBase time or time-quantum data types.

| SQL data type | FeatureBase field data type | Avro field data type |
|---|---|---|
| _id | Record ID, Key | string, int |
| STRING | Keyed Mutex | string |
| STRINGSET | Keyed Set | string or array of strings |
| STRINGSETQ | Keyed Time | N/A |
| ID | Non-Keyed Mutex | int |
| IDSET | Non-Keyed Set | int or array of ints |
| IDSETQ | Non-Keyed Time | N/A |
| INT | Int | boolean |
| DECIMAL | Decimal | boolean |
| TIMESTAMP | Timestamp | boolean |
| BOOL | Bool | boolean |
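The mapping above can be expressed as a lookup that produces an Avro field definition for a "delete": "values" schema. This is a minimal sketch under stated assumptions: `delete_values_field` is a hypothetical helper, it always emits the scalar form (not the array form) for set types, and it wraps string and int types in a union with null so values can be retained.

```python
# SQL types whose values are named explicitly in the message.
UNION_TYPES = {"STRING": "string", "STRINGSET": "string", "ID": "int", "IDSET": "int"}
# SQL types cleared with a boolean flag.
BOOLEAN_TYPES = {"INT", "DECIMAL", "TIMESTAMP", "BOOL"}
# Time-quantum types have no Avro mapping for deletes (N/A in the table).
UNSUPPORTED_TYPES = {"STRINGSETQ", "IDSETQ"}


def delete_values_field(name: str, sql_type: str) -> dict:
    """Return an Avro field definition for deleting values from one field."""
    if sql_type in UNSUPPORTED_TYPES:
        raise ValueError(f"Kafka delete is not supported for {sql_type}")
    if sql_type in BOOLEAN_TYPES:
        return {"name": name, "type": "boolean"}
    if sql_type in UNION_TYPES:
        # Union with null lets a message retain the field's values.
        return {"name": name, "type": [UNION_TYPES[sql_type], "null"]}
    raise ValueError(f"unknown SQL data type: {sql_type}")
```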

Avro data types to Kafka message

| Avro data type | Kafka message | Additional |
|---|---|---|
| string, int | Values to delete from FeatureBase field | Retain field values by using the union data type to union null value with string or int data types |
| boolean | true when FeatureBase field value should be deleted | |
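To illustrate the union rule above, the sketch below wraps a Python value in the tagged form used by the raw messages in this reference, for example {"string": ...} or {"null": null}. `union_value` is a hypothetical helper. It assumes each field is declared as a union of null with string, int, or an array.

```python
def union_value(value):
    """Encode a value for a union of null with string, int, or array.

    None means "retain the existing FeatureBase values"; any other
    value names what to delete.
    """
    if value is None:
        return {"null": None}
    if isinstance(value, bool):
        # bool is a subclass of int in Python; boolean fields are not
        # unions in this reference, so reject them explicitly.
        raise TypeError("boolean fields are sent as plain true/false")
    if isinstance(value, str):
        return {"string": value}
    if isinstance(value, int):
        return {"int": value}
    if isinstance(value, list):
        return {"array": value}
    raise TypeError(f"unsupported value type: {type(value).__name__}")


message = {
    "_id": "u2Yr4|sHaUv|x5z8P",
    "stringset_string": union_value(None),   # retain existing values
    "idset_int": union_value(890),           # delete the value 890
    "int_int": True,                         # boolean: clear the field
}
```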

Delete records properties

"delete": "records" requires one or more of the following Avro fields:

| Avro data type | Description | FeatureBase index | JSON fields parameter |
|---|---|---|---|
| "ids" | Used to delete a list of records based on their FeatureBase _id | keys: false | {"type": "array", "items": "int"} |
| "keys" | Used to delete a list of records based on their FeatureBase _id | keys: true | {"type": "array", "items": "string"} |
| "filter" | Used to delete records based on PQL row calls | n/a | {"type": "string"} |

Ignore missing fields

All Kafka Avro fields are required.

However, you can ignore missing fields and leave existing values untouched by:

  • creating a union with null data type in the Kafka Avro JSON config file
  • adding the allow-missing-fields flag to the ./molecula-consumer-kafka CLI command
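The first option above, a union with null, can be applied to an existing field definition mechanically. The following is a minimal sketch: `make_optional` is a hypothetical helper, and it appends "null" after the original type to match the union order used in this reference.

```python
import copy


def make_optional(field: dict) -> dict:
    """Return a copy of an Avro field whose type is a union with null."""
    optional = copy.deepcopy(field)
    avro_type = optional["type"]
    if isinstance(avro_type, list):
        # Already a union: add "null" only if it is missing.
        if "null" not in avro_type:
            avro_type.append("null")
    else:
        optional["type"] = [avro_type, "null"]
    return optional


print(make_optional({"name": "string_string", "type": "string", "mutex": True}))
```

The input definition is left unchanged, so one base schema can be reused for both required and optional variants.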

Kafka Avro data type syntax

Map Avro field data types and property key-value pairs to determine the FeatureBase record data type.

BOOL

| Kafka Avro fields | Description |
|---|---|
| {"name": "bool_bool", "type": "boolean"} | FeatureBase Bool from Avro Boolean |

DECIMAL

| Kafka Avro fields | Description |
|---|---|
| {"name": "decimal_float", "type": "float", "fieldType": "decimal", "scale": 2} | FeatureBase Decimal from Avro Float |

ID

| Kafka Avro fields | Description |
|---|---|
| {"name": "id_long", "type": "long", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro Long |
| {"name": "id_int", "type": "int", "mutex": true, "fieldType": "id"} | FeatureBase ID from Avro Int |

IDSET

| Kafka Avro fields | Description |
|---|---|
| {"name": "idset_int", "type": "int", "fieldType": "id"} | FeatureBase IDSET from Avro Int |
| {"name": "idset_intarray", "type": {"type": "array", "items": "int"}} | FeatureBase IDSET from Avro Int Array |

IDSETQ

IDSETQ Avro fields require a matching RecordTime field in the Avro Schema. Examples are provided below.

| Kafka Avro fields | Description | Required in Avro schema |
|---|---|---|
| {"name": "idsetq_int", "type": "int", "fieldType": "id", "quantum": "YMD"} | FeatureBase IDSETQ from Avro Int | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
| {"name": "idsetq_intarray", "type": "array", "items": {"type": "int", "quantum": "YMD"}} | FeatureBase IDSETQ from Avro Int Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |

INT

| Kafka Avro fields | Description |
|---|---|
| {"name": "int_int", "type": "int", "fieldType": "int"} | FeatureBase Int from Avro Int |

Strings

| Kafka Avro fields | Description | Additional |
|---|---|---|
| {"name": "string_string", "type": "string", "mutex": true} | FeatureBase String from Avro String | |
| {"name": "string_bytes", "type": "bytes", "mutex": true} | FeatureBase String from Avro Bytes | |
| {"name": "string_enum", "type": "enum"} | FeatureBase String from Avro Enum | |
| {"name": "string_string", "type": ["string", "null"], "mutex": true} | Optional String | Ignore missing fields |
| {"name": "stringset_stringarray", "type": [{"type": "array", "items": "string"}, "null"]} | Optional Array of Strings | Ignore missing fields |

STRINGSET

| Kafka Avro string | Description |
|---|---|
| {"name": "stringset_string", "type": "string"} | FeatureBase StringSet from Avro String |
| {"name": "stringset_bytes", "type": "bytes"} | FeatureBase StringSet from Avro Bytes |
| {"name": "stringset_stringarray", "type": {"type": "array", "items": "string"}} | FeatureBase StringSet from Avro String Array |

STRINGSETQ

STRINGSETQ Avro strings require a matching RecordTime field in the Avro Schema. Examples are provided below.

| Kafka Avro string | Description | Required in Avro schema |
|---|---|---|
| {"name": "stringsetq_string", "type": "string", "quantum": "YMD"} | FeatureBase StringSetQ with Day Granularity from Avro String | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
| {"name": "stringsetq_stringarray", "type": "array", "items": {"type": "string", "quantum": "YMD"}} | FeatureBase StringSetQ with Day Granularity from Avro String Array | {"name": "recordtime_bytes", "type": "bytes", "fieldType": "recordTime", "layout": "2006-01-02 15:04:05", "unit": "s"} |
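The RecordTime requirement for IDSETQ and STRINGSETQ fields can be checked before a schema is registered. The sketch below is an assumption-laden illustration: `quantum_fields_missing_record_time` is a hypothetical helper, and it detects quantum fields only in the two shapes shown in this reference ("quantum" on the field itself or on its "items").

```python
def quantum_fields_missing_record_time(schema: dict) -> list:
    """List quantum fields that lack a companion recordTime field.

    Returns an empty list when the schema has a field with
    "fieldType": "recordTime" or uses no quantum fields.
    """
    fields = schema.get("fields", [])

    def uses_quantum(field: dict) -> bool:
        if "quantum" in field:
            return True
        items = field.get("items")
        return isinstance(items, dict) and "quantum" in items

    has_record_time = any(f.get("fieldType") == "recordTime" for f in fields)
    if has_record_time:
        return []
    return [f["name"] for f in fields if uses_quantum(f)]
```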

TIMESTAMP

| Kafka Avro string | Description | Additional |
|---|---|---|
| {"name": "timestamp_bytes_ts", "type": "bytes", "fieldType": "timestamp", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Bytes | Expects byte representation of string timestamp |
| {"name": "timestamp_bytes_int", "type": ["bytes", "null"], "fieldType": "timestamp", "unit": "s", "layout": "2006-01-02 15:04:05", "epoch": "1970-01-01 00:00:00"} | FeatureBase Timestamp from Avro Int | |
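The "layout" value "2006-01-02 15:04:05" looks like a Go-style reference-time layout, which this sketch assumes. Under that assumption, the "byte representation of string timestamp" can be produced as follows. `timestamp_bytes` and `PY_LAYOUT` are hypothetical names, and timezone handling is not covered.

```python
from datetime import datetime

# strftime pattern assumed equivalent to the layout "2006-01-02 15:04:05".
PY_LAYOUT = "%Y-%m-%d %H:%M:%S"


def timestamp_bytes(ts: datetime) -> bytes:
    """Format a datetime with the layout above and encode it as bytes."""
    return ts.strftime(PY_LAYOUT).encode("utf-8")


print(timestamp_bytes(datetime(2023, 5, 17, 8, 30, 0)))  # b'2023-05-17 08:30:00'
```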

Examples

Kafka delete fields

Kafka delete field Avro schema definition

{
    "namespace": "example.test",
    "type": "record",
    "name": "a_delete_schema",
    "delete": "fields",
    "fields": [
        {"name": "pk0", "type": "string"},
        {"name": "pk1", "type": "string"},
        {"name": "fields", "type": {"type": "array", "items": "string"}}
    ]
}

Kafka delete field raw message

The following message:

  • sets int_fld to null
  • clears all the values that were in stringset_fld

{"pk0": "9z4aw", "pk1": "5ptDx", "fields": ["int_fld","stringset_fld"]}

Kafka delete field ingester configuration

./molecula-consumer-kafka-delete \
    --primary-key-fields "pk0,pk1" \
    --topics delete_topic \
    --kafka-bootstrap-server localhost:9092 \
    --schema-registry-url localhost:8081 \
    --featurebase-hosts localhost:10101 \
    --featurebase-grpc-hosts localhost:20101 \
    --index an_index

Kafka delete values

Kafka delete values Avro schema definition

{
    "namespace": "example.test",
    "type": "record",
    "name": "delete_value_schema",
    "doc": "All supported avro types and property variations",
    "delete": "values",
    "fields": [
        {"name": "_id", "type": "string"},
        {"name": "stringset_string", "type": ["string", "null"]},
        {"name": "string_string", "type": ["string", "null"]},
        {"name": "stringset_stringarray", "type": [{"type": "array", "items": "string"}, "null"]},
        {"name": "idset_int", "type": ["int", "null"]},
        {"name": "id_int", "type": ["int", "null"]},
        {"name": "idset_intarray", "type": [{"type": "array", "items": "int"}, "null"]},
        {"name": "int_int", "type": "boolean"},
        {"name": "decimal_double", "type": "boolean"},
        {"name": "bools", "type": [{"type": "array", "items": "string"}, "null"]},
        {"name": "timestamp_bytes_int", "type": "boolean"}
    ]
}

Raw Kafka Messages

{"_id": "u2Yr4|sHaUv|x5z8P", "stringset_string": {"null": null}, "string_string": {"string": "ZgkOB"}, "stringset_stringarray": {"array": ["u2Yr4", "PYE8V", "VBcyJ"]}, "idset_int": {"int": 890}, "id_int": {"int": 39}, "idset_intarray": {"array": [731, 13]}, "int_int": false, "bools": {"array": ["bool_bool"]}, "decimal_double": true, "timestamp_bytes_int": true}
{"_id": "h1iqc|5ptDx|iYeOV", "stringset_string": {"null": null}, "string_string": {"null": null}, "stringset_stringarray": {"null": null}, "idset_int": {"null": null}, "id_int": {"null": null}, "idset_intarray": {"null": null}, "int_int": true,  "decimal_double": true, "bools": {"null": null}, "timestamp_bytes_int": true}
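To make raw messages like these easier to read, the sketch below sorts the fields of one message into "delete" and "retain". `summarize_delete_values` is a hypothetical helper. It follows the rules stated in this reference: a union tagged "null" or a boolean false retains the field, any other union branch names values to delete, and a boolean true clears the field. How the consumer treats the "bools" array is not covered here, so the sketch reports it like any other array.

```python
import json


def summarize_delete_values(raw: str) -> dict:
    """Summarize which fields a "delete": "values" message touches."""
    message = json.loads(raw)
    to_delete, to_retain = {}, []
    for name, value in message.items():
        if name == "_id":
            continue
        if isinstance(value, bool):
            if value:
                to_delete[name] = None      # boolean true: clear the field
            else:
                to_retain.append(name)
        elif isinstance(value, dict):
            # Union values are tagged with their branch, e.g. {"int": 890}.
            branch, payload = next(iter(value.items()))
            if branch == "null":
                to_retain.append(name)
            else:
                to_delete[name] = payload
    return {"id": message["_id"], "delete": to_delete, "retain": to_retain}
```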

Kafka delete consumer configuration

./molecula-consumer-kafka-delete \
    --topics delete_topic \
    --kafka-bootstrap-server localhost:9092 \
    --schema-registry-url localhost:8081 \
    --featurebase-hosts localhost:10101 \
    --featurebase-grpc-hosts localhost:20101 \
    --index an_index

Because the Avro _id field has a string data type, the index passed to --index (an_index) is expected to be configured with keys: true.

Kafka delete records

Kafka delete records Avro schema

{
    "namespace": "example.test",
    "type": "record",
    "name": "alltypes_delete_records",
    "doc": "supply list of keys or a PQL filter",
    "delete": "records",
    "fields": [
        {"name": "keys", "type": [{"type": "array", "items": "string"}, "null"]},
        {"name": "filter", "type": ["string", "null"]}
    ]
}

Kafka delete records raw message

{"keys": {"array": ["9z4aw|5ptDx|CKs1F", "ASSAw|kauLy|oxjI0"]}, "filter": {"null": null}}
{"keys": {"null": null}, "filter": {"string": "Row(stringset_string='58KIR')"}}

  • The first message deletes the records with IDs 9z4aw|5ptDx|CKs1F and ASSAw|kauLy|oxjI0.
  • The second message deletes all records that have the value 58KIR in the field stringset_string.
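Both message shapes above can be produced by one small builder. This is a sketch, assuming a schema like the one in this example where "keys" and "filter" are each a union with null. `delete_records_message` is a hypothetical helper, and whether the consumer accepts keys and a filter in the same message is not covered here.

```python
import json


def delete_records_message(keys=None, pql_filter=None) -> str:
    """Build a "delete": "records" message from record keys or a PQL filter."""
    if keys is None and pql_filter is None:
        raise ValueError("supply keys, a PQL filter, or both")
    message = {
        # Each union value is tagged with its branch name.
        "keys": {"null": None} if keys is None else {"array": list(keys)},
        "filter": {"null": None} if pql_filter is None else {"string": pql_filter},
    }
    return json.dumps(message)


print(delete_records_message(keys=["9z4aw|5ptDx|CKs1F", "ASSAw|kauLy|oxjI0"]))
print(delete_records_message(pql_filter="Row(stringset_string='58KIR')"))
```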

Kafka delete records ingest definition

./molecula-consumer-kafka-delete \
    --topics delete_topic \
    --kafka-bootstrap-server localhost:9092 \
    --schema-registry-url localhost:8081 \
    --featurebase-hosts localhost:10101 \
    --featurebase-grpc-hosts localhost:20101 \
    --index an_index

Next step