Kafka: How to extract nested field in JSON structure?

Hello, I am using scylladb-cdc-source-connector to publish CDC messages to my Kafka topic. I am noticing that in the JSON messages some keys have plain values, whereas others have a nested object with a “value” key that contains the actual value. I think this has to do with how the connector processes primary key and non-primary key columns (the non-primary key columns are the ones with nested values).

How can I access nested fields of a JSON record so that I can use the “city” field as the key of the record?
Suppose my Kafka message data is:
KEY:
{"id": 4, "name": "John"}

VALUE:
{
  "before": null,
  "after": {
    "id": 4,
    "name": "John",
    "city": {
      "value": "New York City"
    }
  },
  "source": {
    …some source config
  },
  "op": "c",
  "ts_ms": 1623834752982,
  "transaction": null
}

Expected result:

KEY:
"New York City"

VALUE:
{
  "before": null,
  "after": {
    "id": 4,
    "name": "John",
    "city": "New York City"  // this part is changed
  },
  "source": {
    …some source config
  },
  "op": "c",
  "ts_ms": 1623834752982,
  "transaction": null
}

From my research, I understand that there is no transform operation (SMT) in Kafka that deals with nested values (correct me if I am wrong). Is there anything I can do with the ScyllaDB structure or the Kafka message to get the desired result? The reason I need this particular format is that we are currently switching from MySQL to ScyllaDB, and we already have a sink connector and an ES setup that consume messages in this format. So we are trying to keep changes to a minimum.

Any help is appreciated. Thank You!

@piotr do you know how to do it?

Hi @Aditya_Mathur

This is quite a complicated issue. I am not sure it is possible without “ruining” the rest of the record or writing your own custom SMT, but there may be a workaround.
The bad news is that, as you said, most of the SMTs currently do not support accessing nested fields.
The good news is that this appears to be something that is being worked on. You can read all the details in a neatly organized proposal here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%253A+Connect+Transforms+support+for+nested+structures
It includes a list of affected and non-affected SMTs. As you can see, ValueToKey is affected.

What you can do now is apply a series of transformations that brings the field you want to the top level. Two transformations may be of use here: Flatten and ExtractField.

You can chain several ExtractField transforms to reach the nested field you need, but unfortunately that discards everything else.
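To make that trade-off concrete, here is a small Python model of chaining ExtractField$Value on the record from the question. It is only an illustration: records are plain dicts, and extract_field is a hypothetical helper standing in for the real SMT, which operates on Connect Structs or Maps.

```python
def extract_field(value, field):
    """Model of ExtractField$Value: the whole value is replaced by one field."""
    return value[field]


# Decoded value of the CDC message from the question ("source" omitted).
record_value = {
    "before": None,
    "after": {"id": 4, "name": "John", "city": {"value": "New York City"}},
    "op": "c",
    "ts_ms": 1623834752982,
    "transaction": None,
}

# Three chained ExtractField steps: after -> city -> value.
result = record_value
for field in ("after", "city", "value"):
    result = extract_field(result, field)

print(result)  # New York City
```

After the chain, only the string is left; id, name, op and ts_ms are all gone, which is why this route alone does not give the expected value.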

Flatten may also be problematic, because many field names will change. For reference, here’s a “before and after” of a single Flatten transformation:

{
  "source": {
    "version": "1.1.4-SNAPSHOT",
    "connector": "scylla",
    "name": "testclean",
    "ts_ms": 1706469068148,
    "snapshot": {
      "string": "false"
    },
    "db": "cdcks2",
    "keyspace_name": "cdcks2",
    "table_name": "test3",
    "ts_us": 1706469068148304
  },
  "before": null,
  "after": {
    "testclean.cdcks2.test3.After": {
      "ck": {
        "int": 1
      },
      "pk": {
        "int": 1
      },
      "v1": {
        "testclean.cdcks2.test3.v1.Cell": {
          "value": {
            "int": 1
          }
        }
      },
      "v2": {
        "testclean.cdcks2.test3.v2.Cell": {
          "value": {
            "int": 1
          }
        }
      }
    }
  },
  "op": {
    "string": "c"
  },
  "ts_ms": {
    "long": 1706470269611
  },
  "transaction": null
}

And after:

{
  "source_version": "1.1.4-SNAPSHOT",
  "source_connector": "scylla",
  "source_name": "testflat",
  "source_ts_ms": 1706469068148,
  "source_snapshot": {
    "string": "false"
  },
  "source_db": "cdcks2",
  "source_keyspace_name": "cdcks2",
  "source_table_name": "test3",
  "source_ts_us": 1706469068148304,
  "before_ck": null,
  "before_pk": null,
  "before_v1_value": null,
  "before_v2_value": null,
  "after_ck": {
    "int": 1
  },
  "after_pk": {
    "int": 1
  },
  "after_v1_value": {
    "int": 1
  },
  "after_v2_value": {
    "int": 1
  },
  "op": {
    "string": "c"
  },
  "ts_ms": {
    "long": 1706471677191
  },
  "transaction_id": null,
  "transaction_total_order": null,
  "transaction_data_collection_order": null
}

Fields that were additionally wrapped in a “value” structure get “<separator>value” appended to their names (with the _ separator used here, v1 inside after becomes after_v1_value).
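The naming rule can be sketched in Python for a schemaless (dict) value. This is a simplified model, not Kafka's implementation: flatten is a hypothetical helper, and Kafka's own Flatten uses "." as its default delimiter (configurable through its delimiter option), so the _ above was a configured choice. One difference from the schema-based output above: in this sketch a null struct such as before stays a single null key, whereas the output above has a null for each subfield (before_ck, before_pk, …).

```python
def flatten(value, delimiter=".", prefix=""):
    """Model of Flatten on a schemaless value: nested dicts become top-level keys.

    Each nested key is joined to its parents' names with `delimiter`.
    """
    out = {}
    for key, field_value in value.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(field_value, dict):
            out.update(flatten(field_value, delimiter, name))
        else:
            out[name] = field_value  # scalars and None are copied as-is
    return out


record_value = {
    "before": None,
    "after": {"id": 4, "name": "John", "city": {"value": "New York City"}},
    "op": "c",
}

flat = flatten(record_value, delimiter="_")
print(flat)
# {'before': None, 'after_id': 4, 'after_name': 'John',
#  'after_city_value': 'New York City', 'op': 'c'}
```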
Keep in mind that, if it suits your use case, there is a ScyllaExtractNewRecordState transformer that can help you discard the “value” part.

If you decide to try the chain-of-SMTs route, keep in mind that the CDC connector also produces heartbeat messages to a different topic. For example, if you use the ExtractField SMT, the regular messages will have an “after” field, but heartbeat messages will not, and the connector will eventually crash. To make it work, you need to apply the transformations only to the right messages, for example by specifying a predicate for your transformations that matches specific topic names.
Here’s an example of how that would look in a .properties configuration file:

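# Name of the connector's logical cluster, used as the prefix of its topic names
scylla.name = namespace
predicates = HasPrefix
# TopicNameMatches is true for records whose topic name matches the regex
predicates.HasPrefix.type = org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.HasPrefix.pattern = namespace.*
transforms = extract
# Apply the transform only to records accepted by the predicate above
transforms.extract.predicate = HasPrefix
transforms.extract.type = org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field = after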
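The effect of the predicate can be modeled in a few lines of Python. This assumes TopicNameMatches requires the regex to match the entire topic name, which re.fullmatch mirrors. The topic names and the heartbeat payload below are hypothetical placeholders, chosen only to show that a non-matching record passes through untouched instead of failing on the missing “after” field.

```python
import re


def topic_name_matches(pattern, topic):
    """Model of TopicNameMatches: the regex must match the whole topic name."""
    return re.fullmatch(pattern, topic) is not None


def extract_after(value):
    """Model of ExtractField$Value with field=after (KeyError if it is missing)."""
    return value["after"]


def apply_if(pattern, transform, topic, value):
    """Run `transform` only when the record's topic satisfies the predicate."""
    return transform(value) if topic_name_matches(pattern, topic) else value


# Hypothetical topic names and payloads, for illustration only.
cdc = apply_if("namespace.*", extract_after, "namespace.ks.table",
               {"before": None, "after": {"id": 4}, "op": "c"})
heartbeat = apply_if("namespace.*", extract_after, "heartbeat-topic",
                     {"ts_ms": 1623834752982})

print(cdc)        # {'id': 4}
print(heartbeat)  # {'ts_ms': 1623834752982}
```

Without the predicate, calling extract_after on the heartbeat payload would raise KeyError, which is the dict-level analogue of the crash described above.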

So to sum up, you can chain ScyllaExtractNewRecordState, ExtractField_1, …, ExtractField_n and ValueToKey to achieve the desired result.
Alternatively, you can try Flatten, then some kind of rename SMT for your field (for example ReplaceField with its renames option), and then ValueToKey.
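For the Flatten route, the rename and ValueToKey steps can be sketched in Python, starting from what Flatten with a _ delimiter would roughly produce for the question's record. rename and value_to_key are hypothetical helpers that model ReplaceField's renames option and ValueToKey on schemaless dicts; the real SMTs work on Connect records.

```python
def rename(value, renames):
    """Model of ReplaceField$Value's `renames` option (old name -> new name)."""
    return {renames.get(name, name): field for name, field in value.items()}


def value_to_key(value, fields):
    """Model of ValueToKey: the key becomes a map of the listed value fields."""
    return {name: value[name] for name in fields}


# Approximate Flatten output (delimiter "_") for the question's record,
# schemaless model, "source" omitted.
flat_value = {
    "before": None,
    "after_id": 4,
    "after_name": "John",
    "after_city_value": "New York City",
    "op": "c",
    "ts_ms": 1623834752982,
}

renamed = rename(flat_value, {"after_city_value": "city"})
key = value_to_key(renamed, ["city"])
print(key)  # {'city': 'New York City'}
```

Note that the value itself stays flattened (after_id, after_name, …), so this route changes the value format the existing sink expects.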


I’ve run a test for your use case @Aditya_Mathur and I think this may be sufficient:

name = ScyllaConnectorConnector_1
connector.class = com.scylladb.cdc.debezium.connector.ScyllaConnector
predicates = isCDCTopic
predicates.isCDCTopic.type = org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isCDCTopic.pattern = testExtract.*
transforms = ExtractState, ValueToKey
transforms.ExtractState.type = com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewRecordState
transforms.ValueToKey.type = org.apache.kafka.connect.transforms.ValueToKey
transforms.ValueToKey.fields = v1
transforms.ValueToKey.predicate = isCDCTopic
scylla.cluster.ip.addresses = 172.17.0.2:9042
scylla.name = testExtract
scylla.table.names = cdcks2.test3

The table I used was:

CREATE TABLE cdcks2.test3 (
    pk int,
    ck int,
    v1 int,
    v2 int,
    PRIMARY KEY (pk, ck)
) WITH cdc = {'postimage': 'false', 'preimage': 'false', 'ttl': '86400', 'enabled': 'true', 'delta': 'full'};

After applying ScyllaExtractNewRecordState, the messages for my table already look like this:

{
  "ck": {
    "int": 1
  },
  "pk": {
    "int": 1
  },
  "v1": {
    "int": 1
  },
  "v2": {
    "int": 1
  }
}
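A rough Python approximation of what ScyllaExtractNewRecordState did in this test: keep only the after state and unwrap each {"value": x} cell to x. This is inferred from the messages shown in this thread, not from the transformer's source; extract_new_record_state is a hypothetical helper, and it ignores deletes and any other cases the real SMT may handle.

```python
def extract_new_record_state(envelope):
    """Rough model: keep the "after" state and unwrap {"value": x} cells to x.

    Inferred from the messages in this thread; deletes and other cases that
    the real ScyllaExtractNewRecordState may handle are ignored here.
    """
    state = {}
    for column, cell in envelope["after"].items():
        if isinstance(cell, dict) and set(cell) == {"value"}:
            state[column] = cell["value"]  # column wrapped in a "value" cell
        else:
            state[column] = cell           # column that was already plain
    return state


envelope = {
    "before": None,
    "after": {"id": 4, "name": "John", "city": {"value": "New York City"}},
    "op": "c",
    "ts_ms": 1623834752982,
    "transaction": None,
}

print(extract_new_record_state(envelope))
# {'id': 4, 'name': 'John', 'city': 'New York City'}
```

As the output shows, the before/op/ts_ms envelope is gone as well, so the resulting value is the row state rather than the envelope format in the expected result.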

After applying ValueToKey, the key looks like this:

{ "v1": { "int": 1 } }

So it turns out we don’t even need to chain ExtractField transforms. I had expected the message structure to be a little different at that stage.
You will need to adjust the topic prefix in the predicate pattern to match your topics, and change the field in ValueToKey from v1 to city.
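Note that ValueToKey builds a key struct, such as { "v1": ... } above, rather than the bare "New York City" string from the expected result. If the bare value is needed, Kafka's ExtractField$Key could be appended to the chain, e.g. transforms = ExtractState, ValueToKey, ExtractKey with transforms.ExtractKey.type = org.apache.kafka.connect.transforms.ExtractField$Key, transforms.ExtractKey.field = city and the same predicate (ExtractKey is a hypothetical alias, and this combination was not part of the test above). The Python model below illustrates the two key steps; value_to_key and extract_key_field are hypothetical helpers operating on dicts.

```python
def value_to_key(value, fields):
    """Model of ValueToKey: the key becomes a map of the listed value fields."""
    return {name: value[name] for name in fields}


def extract_key_field(key, field):
    """Model of ExtractField$Key: the key is replaced by one of its fields."""
    return key[field]


# Value after ScyllaExtractNewRecordState, using the question's data.
value = {"id": 4, "name": "John", "city": "New York City"}

key = value_to_key(value, ["city"])
print(key)  # {'city': 'New York City'}

key = extract_key_field(key, "city")
print(key)  # New York City
```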