Skip to main content Link Menu Expand (external link) Document Search Copy Copied

Kafka Avro delete examples

This page provides examples of Kafka static ingest files and flags you can use to test the system

Table of contents

Before you begin

Examples

Kafka delete fields

Kafka delete field Avro schema definition

{
    "namespace": "example.test",
    "type": "record",
    "name": "a_delete_schema",
    "delete": "fields",
    "fields": [
        {"name": "pk0", "type": "string"},
        {"name": "pk1", "type": "string"},
        {"name": "fields", "type": {"type": "array", "items": "string"}}
    ]
}

Kafka delete field raw message

This message:

  • sets int_fld to null
  • clears all the values that were in stringset_fld.
{"pk0": "9z4aw", "pk1": "5ptDx", "fields": ["int_fld","stringset_fld"]}

Kafka delete field ingester configuration

./molecula-consumer-kafka-delete \
    --primary-key-fields "pk0,pk1" \
    --topics delete_topic \
    --kafka-bootstrap-server localhost:9092 \
    --schema-registry-url localhost:8081 \
    --featurebase-hosts localhost:10101 \
    --featurebase-grpc-hosts localhost:20101 \
    --index an_index

Kafka delete values

Kafka delete values Avro schema definition

{
    "namespace": "example.test",
    "type": "record",
    "name": "delete_value_schema",
    "doc": "All supported avro types and property variations",
    "delete": "values",
    "fields": [
        {"name": "_id", "type": "string"},
        {"name": "stringset_string", "type": ["string", "null"]},
        {"name": "string_string", "type": ["string", "null"]},
        {"name": "stringset_stringarray", "type": [{"type": "array", "items": "string"}, "null"]},
        {"name": "idset_int", "type": ["int", "null"]},
        {"name": "id_int", "type": ["int", "null"]},
        {"name": "idset_intarray", "type": [{"type": "array", "items": "int"}, "null"]},
        {"name": "int_int", "type": "boolean"},
        {"name": "decimal_double", "type": "boolean"},
        {"name": "bools", "type": [{"type": "array", "items": "string"}, "null"]},
        {"name": "timestamp_bytes_int", "type": "boolean"}
    ]
}

Raw Kafka Messages

{"_id": "u2Yr4|sHaUv|x5z8P", "stringset_string": {"null": null}, "string_string": {"string": "ZgkOB"}, "stringset_stringarray": {"array": ["u2Yr4", "PYE8V", "VBcyJ"]}, "idset_int": {"int": 890}, "id_int": {"int": 39}, "idset_intarray": {"array": [731, 13]}, "int_int": false, "bools": {"array": ["bool_bool"]}, "decimal_double": true, "timestamp_bytes_int": true}
{"_id": "h1iqc|5ptDx|iYeOV", "stringset_string": {"null": null}, "string_string": {"null": null}, "stringset_stringarray": {"null": null}, "idset_int": {"null": null}, "id_int": {"null": null}, "idset_intarray": {"null": null}, "int_int": true,  "decimal_double": true, "bools": {"null": null}, "timestamp_bytes_int": true}

Kafka delete consumer configuration

./molecula-consumer-kafka-delete \
    --topics delete_topic \
    --kafka-bootstrap-server localhost:9092 \
    --schema-registry-url localhost:8081 \
    --featurebase-hosts localhost:10101 \
    --featurebase-grpc-hosts localhost:20101 \
    --index an_index

The Avro _id has a string data type therefore --index an_index will have keys: true

Kafka delete records

Kafka delete records Avro schema

{
    "namespace": "example.test",
    "type": "record",
    "name": "alltypes_delete_records",
    "docs": "supply list of keys or a PQL filter",
    "delete": "records",
    "fields": [
        {"name": "keys", "type": [{"type": "array", "items": "string"}, "null"]},
        {"name": "filter", "type": ["string", "null"]}
    ]
}

Kafka delete records raw message

{"keys": {"array": ["9z4aw|5ptDx|CKs1F", "ASSAw|kauLy|oxjI0"]}, "filter": {"null": null}}
{"keys": {"null": null}, "filter": {"string": "Row(stringset_string='58KIR')"}}
  • The first message deletes the entire record for records with ID 9z4aw|5ptDx|CKs1F and ASSAw|kauLy|oxjI0.
  • The second message deletes all records that had the value 58KIR in the field stringset_string.

Kafka delete records ingest definition

./molecula-consumer-kafka-delete \
    --topics delete_topic \
    --kafka-bootstrap-server localhost:9092 \
    --schema-registry-url localhost:8081 \
    --featurebase-hosts localhost:10101 \
    --featurebase-grpc-hosts localhost:20101 \
    --index an_index