Unable to explode JSON Array for kafka-scylla sink connector

Hey everyone,

Can anyone point me to documentation for the kafka-scylla sink connector for a few issues I am facing? How can I configure these things in the connector config JSON file?


{
  "name": "user-deatils-connector",
  "config": {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max": "1",
    "topics": "user-details-topic_v2",
    "scylladb.contact.points": "192.168.53.211",
    "scylladb.keyspace": "kafka_json_test",
    "scylladb.consistency.level": "LOCAL_QUORUM",
    "scylladb.port": "9042",
    "scylladb.loadbalancing.localdc": "datacenter1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "scylladb.offset.storage.table.enable": "true",
    "transforms": "createKey,renameTopic,flatten",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "user_id",
    "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.renameTopic.regex": "user-details-topic_v2",
    "transforms.renameTopic.replacement": "user_details",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_"
  }
}

Issues:
In some Kafka topics I am getting an array of JSON objects for a particular key. For example:

{
  "dealId": "wwww",
  "data": [
    {
      "userId": "xxxx",
      "brandName": "xxxx",
      "brandId": xx,
      "displayName": "xxxx"
    },
    {
      "userId": "yyyy",
      "brandName": "yyyy",
      "brandId": yy,
      "displayName": "yyyy"
    }
  ]
}

Now I have to insert the data into the table such that each element of data becomes a new, unique row in the table. The primary key for a row is (dealId, userId).
For example, the new rows will look like:

dealId   userId
wwww     xxxx
wwww     yyyy
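
For reference, the target table would look roughly like this (a sketch; the non-key column types are guesses from the sample message, and unquoted CQL identifiers fold to lowercase):

CREATE TABLE kafka_json_test.user_details (
    dealid text,
    userid text,
    brandname text,
    brandid int,
    displayname text,
    PRIMARY KEY (dealid, userid)   -- dealid = partition key, userid = clustering key
);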

Can anyone help with their expertise/documentation?

Thanks in advance for your valuable time :blush:.

The Kafka-Scylla sink connector does not natively support exploding a JSON array in a Kafka message into multiple rows in a Scylla table. To write each element of a JSON array as a separate row (keyed by dealId and userId), you typically need a preprocessing step before the data reaches the sink connector.

Common approaches include:

  • Use Kafka Streams or ksqlDB to consume the original topic, explode the array (for example with ksqlDB's EXPLODE table function), and produce a new Kafka topic with one message per array element. The sink connector then writes these flattened messages directly to Scylla; see the Kafka Streams sketch after this list.

  • If you are not using ksqlDB or Kafka Streams, run a small standalone consumer/producer that performs the same explode-and-republish step before the sink connector inserts into Scylla. Note that a custom Kafka Connect Single Message Transform (SMT) is not enough on its own here: SMTs are strictly one record in, one record out, so they cannot fan a single message out into multiple rows.

  • Keep your sink connector configuration focused on single-level JSON objects that map directly onto your Scylla table schema.
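
For the first approach, here is a minimal Kafka Streams sketch in Java. It is illustrative rather than drop-in: the output topic name user-details-exploded, the application id, the bootstrap address, and the error handling are all assumptions you would adapt.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class DealExploder {
    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        StreamsBuilder builder = new StreamsBuilder();

        // One input message in, N output messages out: one per element of "data",
        // each carrying the parent dealId so the sink can build the (dealId, userId) key.
        builder.<String, String>stream("user-details-topic_v2")
            .flatMapValues(value -> {
                List<String> rows = new ArrayList<>();
                try {
                    JsonNode root = mapper.readTree(value);
                    String dealId = root.path("dealId").asText();
                    for (JsonNode element : root.path("data")) {
                        ObjectNode row = element.deepCopy();
                        row.put("dealId", dealId);
                        rows.add(mapper.writeValueAsString(row));
                    }
                } catch (Exception e) {
                    // Malformed message: skipped here; route to a dead-letter topic in production.
                }
                return rows;
            })
            .to("user-details-exploded"); // point the sink connector's "topics" at this

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "deal-exploder");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }
}

Each exploded record carries its parent dealId, so the ValueToKey transform on the sink side can build the composite (dealId, userId) key.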

The key point is that the Kafka-Scylla sink connector expects one message per row, so the flattening must happen upstream in your Kafka pipeline if you want JSON array elements inserted as individual rows under the composite key (dealId, userId).
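
Once the exploded topic exists, only a few lines of your original connector config need to change. A sketch, assuming the user-details-exploded topic name from the example above:

    "topics": "user-details-exploded",
    "transforms": "createKey,renameTopic",
    "transforms.createKey.fields": "dealId,userId",
    "transforms.renameTopic.regex": "user-details-exploded",
    "transforms.renameTopic.replacement": "user_details"

The Flatten transform can be dropped because the exploded records are already flat, and createKey now builds the composite (dealId, userId) key that matches the table's primary key.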
