Unable to configure kafka-scylla-sink connector (Kafka topic to Scylla table)

I have done the basic setup for syncing data from a Kafka topic to my Scylla table.

While setting up a connector from the Kafka topic to a particular table, I got stuck and was not able to debug it. If anyone has encountered the same problem before, please help with your expertise.

This is how data looks in my Kafka topic:

Using the following command:
./kafka-console-consumer.sh --bootstrap-server b-11.prod-eva-kafka.w5mjg98.c3.kafka.ap-south-1.amazonaws.com:9092 --topic ap-south-1-sentinel-WhiteListingForGoodHvpUsers --from-beginning

I get a list of JSON messages. One of them looks like this:

{
  "eventName": "WhiteListingForGoodHvpUsers",
  "type": "event",
  "brandId": 20,
  "userSpecificBrand": null,
  "userId": 12345678,
  "version": "v1",
  "ts": 1720089392735,
  "id": "10bb01a4-542b-4a4a-a60e-b9a91497e1f0",
  "data": {
    "gameId": 1234,
    "reason": 8,
    "comments": "xxxxxxxx",
    "whiteListFraudRuleReason": "Default",
    "dealId": 9876,
    "tableId": 6789,
    "typeTo": 37,
    "markedBy": "abcd",
    "source": "abcd",
    "userId": 12345678,
    "markingSystem": null
  },
  "retryNumber": null
}

Now I need to sync all of these messages to my Scylla table.
The table is defined as follows:

CREATE TABLE kafka_test.whitelisted_blocked_users (
    id uuid PRIMARY KEY,
    brand_id int,
    comments text,
    deal_id bigint,
    event_name text,
    game_id bigint,
    marked_by text,
    marking_system text,
    reason int,
    retry_number int,
    source text,
    table_id bigint,
    ts bigint,
    type text,
    type_to int,
    user_id bigint,
    user_specific_brand text,
    version text,
    white_list_fraud_rule_reason text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': '1', 'compaction_window_unit': 'DAYS'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 2592000
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND speculative_retry = '99.0PERCENTILE';
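
For reference, the correspondence I expect between the JSON fields and the table columns can be sketched in Python as below. This is only an illustration of the intended mapping, not something the connector does: the helper names camel_to_snake and flatten_event are hypothetical, and it assumes that the nested data object should be hoisted to the top level, with the top-level userId taking precedence over data.userId.

```python
import re


def camel_to_snake(name: str) -> str:
    """Convert a camelCase field name to snake_case (eventName -> event_name)."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def flatten_event(event: dict) -> dict:
    """Hoist the nested 'data' object and rename every key to snake_case.

    Assumption: when a field exists both at the top level and inside 'data'
    (userId does), the top-level value is the one that should be stored.
    """
    row = {}
    # Nested fields first, so top-level fields overwrite them on collision.
    for key, value in (event.get("data") or {}).items():
        row[camel_to_snake(key)] = value
    for key, value in event.items():
        if key != "data":
            row[camel_to_snake(key)] = value
    return row


if __name__ == "__main__":
    # Shortened version of the message shown above.
    sample = {
        "eventName": "WhiteListingForGoodHvpUsers",
        "userId": 12345678,
        "id": "10bb01a4-542b-4a4a-a60e-b9a91497e1f0",
        "data": {"gameId": 1234, "whiteListFraudRuleReason": "Default"},
        "retryNumber": None,
    }
    for column, value in sorted(flatten_event(sample).items()):
        print(column, "=", value)
```

Applied to the full message, the resulting keys are exactly the 19 column names of whitelisted_blocked_users.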

I am using the following kafka-scylla-sink.json config for the connector:

{
  "name": "scylladb-sink-connector",
  "config": {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max": "1",
    "topics": "ap-south-1-sentinel-WhiteListingForGoodHvpUsers",
    "scylladb.contact.points": "192.168.53.211",
    "scylladb.keyspace": "kafka_test",
    "scylladb.table": "whitelisted_blocked_users",
    "scylladb.consistency.level": "LOCAL_QUORUM",
    "scylladb.port": "9042",
    "scylladb.loadbalancing.localdc": "datacenter1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "createKey",
    "transforms.createKey.fields": "id",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey"
  }
}
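
For completeness, this is roughly how the config gets submitted. It is a minimal sketch of a POST to the Kafka Connect REST API, assuming the Connect worker listens on localhost:8083 (the default REST port; the real host is an assumption here). The function name build_create_request is hypothetical.

```python
import json
import urllib.request

# Assumption: the Kafka Connect worker's REST API is reachable here.
CONNECT_URL = "http://localhost:8083/connectors"


def build_create_request(connector: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    # Load the connector definition shown above and submit it.
    with open("kafka-scylla-sink.json", encoding="utf-8") as f:
        connector = json.load(f)
    with urllib.request.urlopen(build_create_request(connector)) as resp:
        print(resp.status, resp.read().decode("utf-8"))
```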

When I send the POST request to start this connector, I get the following error:

[2024-07-20 22:57:05,180] WARN [scylladb-sink-connector|task-0] build() - Schemaless mode detected. Cannot generate DDL so assuming table is correct. (io.connect.scylladb.ScyllaDbSchemaBuilder:247)
[2024-07-20 22:57:05,180] ERROR [scylladb-sink-connector|task-0] WorkerSinkTask{id=scylladb-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Exception occurred while extracting records from Kafka Sink Records. (org.apache.kafka.connect.runtime.WorkerSinkTask:630)
org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
	at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:182)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:115)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at io.connect.scylladb.ScyllaDbSessionImpl.createInsertPreparedStatement(ScyllaDbSessionImpl.java:153)
	at io.connect.scylladb.ScyllaDbSessionImpl.access$300(ScyllaDbSessionImpl.java:33)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:182)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:179)
	at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
	at io.connect.scylladb.ScyllaDbSessionImpl.insert(ScyllaDbSessionImpl.java:177)
	at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:86)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:100)
	... 12 more
[2024-07-20 22:57:05,184] ERROR [scylladb-sink-connector|task-0] WorkerSinkTask{id=scylladb-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
	at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:182)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:115)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
	... 11 more
Caused by: java.lang.NullPointerException
	at io.connect.scylladb.ScyllaDbSessionImpl.createInsertPreparedStatement(ScyllaDbSessionImpl.java:153)
	at io.connect.scylladb.ScyllaDbSessionImpl.access$300(ScyllaDbSessionImpl.java:33)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:182)
	at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:179)
	at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
	at io.connect.scylladb.ScyllaDbSessionImpl.insert(ScyllaDbSessionImpl.java:177)
	at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:86)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:100)
	... 12 more
[2024-07-20 22:57:05,184] INFO [scylladb-sink-connector|task-0] Closing getValidSession (io.connect.scylladb.ScyllaDbSinkTask:207)
[2024-07-20 22:57:07,190] INFO [scylladb-sink-connector|task-0] [Consumer clientId=connector-consumer-scylladb-sink-connector-0, groupId=connect-scylladb-sink-connector] Revoke previously assigned partitions ap-south-1-sentinel-WhiteListingForGoodHvpUsers-0, ap-south-1-sentinel-WhiteListingForGoodHvpUsers-1 (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker:78)

Could you please help me resolve this error? I have tried a lot but have not found a working solution.

P.S.: Thanks for your valuable time :blush:.