So, I need to catch up on messages from Kafka that are in a binary datatype (I don’t know the actual encoding) and insert them into a table using the scylla-sink-connector.
The main point is that I don’t know which serialization format was used (I just get the key as a string and the value as a blob). I’d appreciate it if someone could help me. P.S. When I try to use the JsonConverter, it gives an error saying I should use the ValueToKey transformation. However, I cannot store the data as JSON because of
Object of type bytes is not JSON serializable
The ScyllaDB Sink Connector accepts three data formats from Kafka:
- Avro Format
- JSON with Schema
- JSON without Schema
(source: the scylladb GitHub repository)
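For context, which of these formats the connector sees is selected through the converter settings in the connector or worker configuration. A minimal sketch using the standard Kafka Connect class names (the addresses are placeholders):
# JSON, with or without an embedded schema:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Avro (requires a Schema Registry):
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry-host:port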
Without knowing how the data was serialized, it may prove difficult to insert it into the table in a meaningful way. After all, the connector needs to know how to divide the data into columns.
The first thing you could do is trace back where this data came from and look for clues about which serialization format was used.
If that is not possible, you can blindly try the supported converters and see whether any of them returns meaningful information. A quick way to do that is to use the console consumer to peek at what is on your topic:
./bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic your_topic \
--from-beginning \
--property print.key=true
Replace localhost:9092 with your broker address. Note that value.converter is a Kafka Connect setting, not a console-consumer property; the console consumer simply prints keys and values as raw strings, which is enough to spot readable JSON. If it is JSON, you can then try the JsonConverter in the connector, with schemas.enable set both ways and a separate converter for the key if needed.
If you are using the “Confluent flavour” of Kafka, it usually includes the Avro console consumer, which you can use to test for Avro:
./bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic your_topic \
--from-beginning
You may also have to include --property schema.registry.url=http://schema-registry-host:port.
If any of those options returns something other than binary gibberish, then you have likely found the converter to use.
You can also try converters other than those for the formats officially supported by the connector, just to see what decodes the data; the connector itself, however, probably won’t work with them.
If that still leads you nowhere, you can always transform the data on the topic. For example, you could take the binary data, convert it to Base64, and publish it to another topic serialized as JSON with a single string field (see the sketch below). The connector should handle that by copying it into a single column as text data.
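Here is a minimal sketch of that re-publishing step, assuming the confluent-kafka Python client; the broker address, group id, and topic names are placeholders:
import base64
import json

from confluent_kafka import Consumer, Producer

# Placeholders: adjust the broker address and topic names to your setup.
SOURCE_TOPIC = "your_topic"
TARGET_TOPIC = "your_topic_json"

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "base64-republish",
    "auto.offset.reset": "earliest",
})
producer = Producer({"bootstrap.servers": "localhost:9092"})

consumer.subscribe([SOURCE_TOPIC])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise RuntimeError(msg.error())
        if msg.value() is None:
            continue
        # Wrap the opaque bytes in a one-field JSON document; the sink
        # connector can then map "data" to a single text column.
        wrapped = json.dumps(
            {"data": base64.b64encode(msg.value()).decode("ascii")}
        )
        producer.produce(TARGET_TOPIC, key=msg.key(), value=wrapped)
finally:
    producer.flush()
    consumer.close()
With JsonConverter and schemas.enable=false on the sink side, this should come through as a single text field.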
The other option would be converting it to Avro. If I recall correctly, that would not require the Base64 step, since Avro has a native bytes type; a sketch follows.
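A sketch of the Avro variant, using the confluent-kafka Schema Registry helpers; the schema, topic name, and addresses are placeholders:
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

# Placeholder schema: a single Avro "bytes" field carrying the raw payload.
SCHEMA_STR = (
    '{"type": "record", "name": "Wrapped",'
    ' "fields": [{"name": "data", "type": "bytes"}]}'
)

sr_client = SchemaRegistryClient({"url": "http://schema-registry-host:8081"})
serializer = AvroSerializer(sr_client, SCHEMA_STR)
producer = Producer({"bootstrap.servers": "localhost:9092"})

def republish_as_avro(raw_value: bytes, topic: str = "your_topic_avro") -> None:
    # Serialize the wrapper record in the Confluent wire format and publish it.
    payload = serializer(
        {"data": raw_value},
        SerializationContext(topic, MessageField.VALUE),
    )
    producer.produce(topic, value=payload)
    producer.flush()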
I don’t know if that would be good enough for your use case though.
You mentioned that after trying the JsonConverter you encountered the error telling you to use the ValueToKey transform. That is possibly because the key is empty. If the data turns out to be readable with the JsonConverter, then you have to do just that: specify which fields of the value to use as the key.
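For reference, the ValueToKey transform is configured along these lines in the connector properties; the field name id is a placeholder for whichever value field should become the key:
transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id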