Hi @Aditya_Mathur
This is quite a complicated issue. I am not sure it is possible without “ruining” the rest of the record or writing your own custom SMT, but there may be a workaround.
The bad news is that, as you said, most SMTs currently do not support accessing nested fields.
The good news is that this seems to be something that is being worked on. You can read all the details in a neatly organized proposal here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures
The proposal includes a list of affected and non-affected SMTs. As you can see there, ValueToKey is affected.
What you can do now is apply a series of transformations that bring the field you want to the top level. Two transformations may be of use here: Flatten and ExtractField.
You can chain several ExtractField transformations to reach the nested field you need, but that will unfortunately discard everything else in the record.
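As a rough, untested sketch of such a chain: assuming the record layout from the example further down in this post (an “after” struct that contains a “v1” struct), two ExtractField steps could be configured as follows. The aliases extractAfter and extractV1 are made up for illustration.

```properties
# Sketch only: aliases are arbitrary, field names assume the example record in this post
transforms = extractAfter,extractV1
# Step 1: replace the whole value with the contents of its "after" field
transforms.extractAfter.type = org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractAfter.field = after
# Step 2: replace that result with the contents of its "v1" field
transforms.extractV1.type = org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractV1.field = v1
```

After both steps the value holds only what was under v1; source, op, ts_ms and the other columns are gone. The heartbeat caveat described later in this post applies to every step.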
Flatten may also be problematic, because a lot of field names will change. For reference, here’s a “before and after” of a single Flatten transformation:
{
  "source": {
    "version": "1.1.4-SNAPSHOT",
    "connector": "scylla",
    "name": "testclean",
    "ts_ms": 1706469068148,
    "snapshot": {
      "string": "false"
    },
    "db": "cdcks2",
    "keyspace_name": "cdcks2",
    "table_name": "test3",
    "ts_us": 1706469068148304
  },
  "before": null,
  "after": {
    "testclean.cdcks2.test3.After": {
      "ck": {
        "int": 1
      },
      "pk": {
        "int": 1
      },
      "v1": {
        "testclean.cdcks2.test3.v1.Cell": {
          "value": {
            "int": 1
          }
        }
      },
      "v2": {
        "testclean.cdcks2.test3.v2.Cell": {
          "value": {
            "int": 1
          }
        }
      }
    }
  },
  "op": {
    "string": "c"
  },
  "ts_ms": {
    "long": 1706470269611
  },
  "transaction": null
}
And after:
{
  "source_version": "1.1.4-SNAPSHOT",
  "source_connector": "scylla",
  "source_name": "testflat",
  "source_ts_ms": 1706469068148,
  "source_snapshot": {
    "string": "false"
  },
  "source_db": "cdcks2",
  "source_keyspace_name": "cdcks2",
  "source_table_name": "test3",
  "source_ts_us": 1706469068148304,
  "before_ck": null,
  "before_pk": null,
  "before_v1_value": null,
  "before_v2_value": null,
  "after_ck": {
    "int": 1
  },
  "after_pk": {
    "int": 1
  },
  "after_v1_value": {
    "int": 1
  },
  "after_v2_value": {
    "int": 1
  },
  "op": {
    "string": "c"
  },
  "ts_ms": {
    "long": 1706471677191
  },
  "transaction_id": null,
  "transaction_total_order": null,
  "transaction_data_collection_order": null
}
Fields that were additionally wrapped in a “value” structure will have “<separator>value” appended to their names (for example, after_v1_value above).
Keep in mind that, if it suits your use case, there is a ScyllaExtractNewRecordState transformation that can help you get rid of the “value” part.
If you decide to go the chain-of-SMTs route, keep in mind that the CDC connector also produces heartbeat messages to a different topic. For example, if you use the ExtractField SMT, the regular messages will have an “after” field, but heartbeat messages will not, and the connector will ultimately crash. To make it work, you need to apply the transformations only to the right messages, for example by attaching a predicate that matches a specific topic name.
Here’s an example of how that would look in a .properties configuration file:
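# Logical name of the connector; it is also the prefix of the data topic names
scylla.name = namespace
# Predicate that matches only topics whose name starts with that prefix
predicates = HasPrefix
predicates.HasPrefix.type = org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.HasPrefix.pattern = namespace.*
# Apply ExtractField only to records from topics matched by the predicate
transforms = extract
transforms.extract.predicate = HasPrefix
transforms.extract.type = org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field = after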
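The same effect can probably be achieved the other way round, by matching the heartbeat topic and negating the predicate. This is an untested sketch: the alias IsHeartbeat is made up, and the pattern assumes the Debezium default heartbeat topic prefix (__debezium-heartbeat), which may be different in your setup.

```properties
predicates = IsHeartbeat
predicates.IsHeartbeat.type = org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Assumption: heartbeat topics use Debezium's default "__debezium-heartbeat" prefix
predicates.IsHeartbeat.pattern = __debezium-heartbeat.*
transforms = extract
transforms.extract.type = org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field = after
transforms.extract.predicate = IsHeartbeat
# negate = true applies the transformation to every record the predicate does NOT match
transforms.extract.negate = true
```

The prefix-based predicate above is the safer choice if other topics without an “after” field might pass through the connector.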
So, to sum up, you can chain ScyllaExtractNewRecordState, ExtractField_1, …, ExtractField_n, and ValueToKey to achieve the desired result.
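A minimal, untested sketch of that chain follows. The class name of the Scylla transformation is written from memory of the connector's README and should be double-checked. The aliases and the choice of pk as the key field are assumptions based on the example record, and the sketch assumes that the columns sit at the top level of the value after unwrapping (add ExtractField steps in between if your field is nested deeper).

```properties
scylla.name = namespace
predicates = HasPrefix
predicates.HasPrefix.type = org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.HasPrefix.pattern = namespace.*

transforms = unwrap,toKey
# Assumption: verify this class name against the connector documentation
transforms.unwrap.type = com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewRecordState
transforms.unwrap.predicate = HasPrefix
# Copy the (hypothetical) pk column from the value into the record key
transforms.toKey.type = org.apache.kafka.connect.transforms.ValueToKey
transforms.toKey.fields = pk
transforms.toKey.predicate = HasPrefix
```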
Alternatively, you can try Flatten, followed by some kind of rename SMT for your field (for example ReplaceField), and then ValueToKey.
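An untested sketch of the Flatten route, under the same assumptions as the example record in this post: the aliases are arbitrary, after_pk is the flattened name seen in the “after” output, and pk is a hypothetical target name for the key field.

```properties
predicates = HasPrefix
predicates.HasPrefix.type = org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.HasPrefix.pattern = namespace.*

transforms = flatten,rename,toKey
transforms.flatten.type = org.apache.kafka.connect.transforms.Flatten$Value
# "_" matches the separator seen in the flattened example; the Flatten default is "."
transforms.flatten.delimiter = _
transforms.flatten.predicate = HasPrefix
# Rename the flattened field using the old:new syntax
transforms.rename.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames = after_pk:pk
transforms.rename.predicate = HasPrefix
# Copy the renamed field from the value into the record key
transforms.toKey.type = org.apache.kafka.connect.transforms.ValueToKey
transforms.toKey.fields = pk
transforms.toKey.predicate = HasPrefix
```

The rename step is optional; ValueToKey could also use after_pk directly if the flattened name is acceptable in the key.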