Unable to configure kafka-scylla-sink connector (Kafka topic to Scylla table)

I have done the basic setup for syncing data from a Kafka topic to my Scylla table.

While setting up a connector from the Kafka topic to a particular table, I got stuck and was not able to debug it. If anyone has run into this before, please share your expertise.

This is how the data looks in my Kafka topic.

Using the following command:
./kafka-console-consumer.sh --bootstrap-server b-11.prod-eva-kafka.w5mjg98.c3.kafka.ap-south-1.amazonaws.com:9092 --topic ap-south-1-sentinel-WhiteListingForGoodHvpUsers --from-beginning

I am getting a list of JSON messages. One of them looks like this:

{
  "eventName": "WhiteListingForGoodHvpUsers",
  "type": "event",
  "brandId": 20,
  "userSpecificBrand": null,
  "userId": 12345678,
  "version": "v1",
  "ts": 1720089392735,
  "id": "10bb01a4-542b-4a4a-a60e-b9a91497e1f0",
  "data": {
    "gameId": 1234,
    "reason": 8,
    "comments": "xxxxxxxx",
    "whiteListFraudRuleReason": "Default",
    "dealId": 9876,
    "tableId": 6789,
    "typeTo": 37,
    "markedBy": "abcd",
    "source": "abcd",
    "userId": 12345678,
    "markingSystem": null
  },
  "retryNumber": null
}

Now I need to sync all these messages to my Scylla table.
The table is defined as follows:

CREATE TABLE kafka_test.whitelisted_blocked_users (
    id uuid PRIMARY KEY,
    brand_id int,
    comments text,
    deal_id bigint,
    event_name text,
    game_id bigint,
    marked_by text,
    marking_system text,
    reason int,
    retry_number int,
    source text,
    table_id bigint,
    ts bigint,
    type text,
    type_to int,
    user_id bigint,
    user_specific_brand text,
    version text,
    white_list_fraud_rule_reason text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': '1', 'compaction_window_unit': 'DAYS'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 2592000
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND speculative_retry = '99.0PERCENTILE';

And I am using the following kafka-scylla-sink.json config for the connector:

{
  "name": "scylladb-sink-connector",
  "config": {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max": "1",
    "topics": "ap-south-1-sentinel-WhiteListingForGoodHvpUsers",
    "scylladb.contact.points": "192.168.53.211",
    "scylladb.keyspace": "kafka_test",
    "scylladb.table": "whitelisted_blocked_users",
    "scylladb.consistency.level": "LOCAL_QUORUM",
    "scylladb.port": "9042",
    "scylladb.loadbalancing.localdc": "datacenter1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "createKey",
    "transforms.createKey.fields": "id",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey"
  }
}
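
The POST request I use to start the connector looks roughly like this (the Connect worker host and REST port shown here are the defaults, localhost:8083):

curl -X POST -H "Content-Type: application/json" \
  --data @kafka-scylla-sink.json \
  http://localhost:8083/connectors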

When I send this POST request to start the connector, I get the following error:

[2024-07-20 22:57:05,180] WARN [scylladb-sink-connector|task-0] build() - Schemaless mode detected. Cannot generate DDL so assuming table is correct. (io.connect.scylladb.ScyllaDbSchemaBuilder:247)
[2024-07-20 22:57:05,180] ERROR [scylladb-sink-connector|task-0] WorkerSinkTask{id=scylladb-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Exception occurred while extracting records from Kafka Sink Records. (org.apache.kafka.connect.runtime.WorkerSinkTask:630)
org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
	at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:182)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:115)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at io.connect.scylladb.ScyllaDbSessionImpl.createInsertPreparedStatement(ScyllaDbSessionImpl.java:153)
	at io.connect.scylladb.ScyllaDbSessionImpl.access$300(ScyllaDbSessionImpl.java:33)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:182)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:179)
	at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
	at io.connect.scylladb.ScyllaDbSessionImpl.insert(ScyllaDbSessionImpl.java:177)
	at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:86)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:100)
	... 12 more
[2024-07-20 22:57:05,184] ERROR [scylladb-sink-connector|task-0] WorkerSinkTask{id=scylladb-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
	at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:182)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:115)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
	... 11 more
Caused by: java.lang.NullPointerException
	at io.connect.scylladb.ScyllaDbSessionImpl.createInsertPreparedStatement(ScyllaDbSessionImpl.java:153)
	at io.connect.scylladb.ScyllaDbSessionImpl.access$300(ScyllaDbSessionImpl.java:33)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:182)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:179)
	at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
	at io.connect.scylladb.ScyllaDbSessionImpl.insert(ScyllaDbSessionImpl.java:177)
	at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:86)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:100)
	... 12 more
[2024-07-20 22:57:05,184] INFO [scylladb-sink-connector|task-0] Closing getValidSession (io.connect.scylladb.ScyllaDbSinkTask:207)
[2024-07-20 22:57:07,190] INFO [scylladb-sink-connector|task-0] [Consumer clientId=connector-consumer-scylladb-sink-connector-0, groupId=connect-scylladb-sink-connector] Revoke previously assigned partitions ap-south-1-sentinel-WhiteListingForGoodHvpUsers-0, ap-south-1-sentinel-WhiteListingForGoodHvpUsers-1 (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker:78)

Could you please help me resolve this error? I have tried a lot but didn’t find any promising solution.

P.S.: Thanks for your valuable time :blush:.

Hi,

I believe the root cause of the problem here is that the topic name does not exactly match the table name. From the connector’s point of view, there is no table named ap-south-1-sentinel-WhiteListingForGoodHvpUsers inside the kafka_test keyspace in your database. Usually the connector would just create the table; however, it does not do that when running in schemaless mode, as indicated by the warning you can see in the logs:

[2024-07-20 22:57:05,180] WARN [scylladb-sink-connector|task-0] build() - Schemaless mode detected. Cannot generate DDL so assuming table is correct. (io.connect.scylladb.ScyllaDbSchemaBuilder:247)

The JSON data on your topic does not contain the schema information from which the connector would pull what it needs to build the table.
A small fix that adds a clearer log message in such circumstances was merged recently, so hopefully that will help with troubleshooting these cases.

The solution here is either to create the table with exactly the same name as the topic, or to add an SMT that rewrites the topic name (of course, only inside the messages) to the name of the table you’ve created.
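
For the SMT route, Kafka Connect ships with a built-in RegexRouter transform that rewrites the topic name on each record before it reaches the sink. A minimal sketch of what the transforms section of your config could look like (the renameTopic alias is arbitrary, and I haven’t tested this against your exact setup):

"transforms": "renameTopic,createKey",
"transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTopic.regex": "ap-south-1-sentinel-WhiteListingForGoodHvpUsers",
"transforms.renameTopic.replacement": "whitelisted_blocked_users",
"transforms.createKey.fields": "id",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey"

With this in place, the connector should look for the whitelisted_blocked_users table in kafka_test instead of a table named after the raw topic.
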
The table also has to reasonably match its column types to the types of the JSON fields. I’m not sure what column type would match the nested “data” field inside your message. Your table seems to have columns named similarly to the fields of this nested structure, so one way to solve this is to transform the data on the topic: flatten the nested “data” field and keep only the subfields you are interested in at the same level as the other fields in your JSON, as sketched below.
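
For the flattening, the built-in Flatten SMT can hoist the nested fields to the top level, but note that it prefixes them with the parent field name (data_gameId, data_dealId, and so on with a "_" delimiter), so a ReplaceField transform would likely also be needed to rename fields to match your snake_case column names (the same applies to top-level camelCase fields such as eventName). A rough sketch with arbitrary aliases, spelling out only a few of the renames:

"transforms": "renameTopic,flatten,rename,createKey",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "data_gameId:game_id,data_dealId:deal_id,eventName:event_name"

ReplaceField can also drop the subfields you don’t want via its exclude (formerly blacklist) setting.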
