Hi, I am using the Debezium Kafka Connect connector to read the CDC log table of ScyllaDB. I would like to understand:
How does Debezium query the CDC streams for the first time? I have read somewhere that it starts from now() minus the TTL of the CDC table.
How does it maintain and use the timestamp in the connect-offset topic?
How does it use the offset to poll with queries after scylla.query.time.window.size?
If there is a link or tutorial on this, it would be helpful.
Hi,
The resources I currently know of are:
The README of the CDC connector,
the README of scylla-cdc-java (which is used underneath),
the scylla-cdc-java printer README (recommended read; it points to the replicator example as a follow-up).
Additionally, there is the Scylla CDC documentation
and the “ScyllaDB University” resources about CDC.
As for the specifics of the connector, your best bet is probably to look at the source code directly. Many of the configuration options have detailed descriptions, but if the Kafka platform you are using does not provide a GUI, you may not have had the opportunity to see them.
How does Debezium query the CDC streams for the first time? I have read somewhere that it starts from now() minus the TTL of the CDC table.
That is correct. There is no point in querying earlier data anyway, since rows older than the TTL should no longer exist.
I believe this is the relevant section from scylla-cdc-java, Worker.java#createTasksWithState():
// The furthest point in time where there might be
// a CDC change, given table's TTL.
Map<TableName, Timestamp> minimumWindowStarts = new HashMap<>();
for (TableName tableName : tableNames) {
    Optional<Long> ttl = workerConfiguration.cql.fetchTableTTL(tableName).get();
    Date minimumWindowStart = new Date(0);
    if (ttl.isPresent()) {
        // TTL is in seconds, getTime() in milliseconds
        minimumWindowStart = new Date(now.getTime() - 1000L * ttl.get());
    }
    minimumWindowStarts.put(tableName, new Timestamp(minimumWindowStart));
}
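To make the arithmetic concrete, here is a minimal standalone sketch of the same calculation. It is not the library's code: the class name FirstWindowSketch and the method minimumWindowStart are made up for illustration, and the 86400-second TTL is just an example value. It only mirrors the logic of the excerpt above: with a TTL, start at now minus TTL; without one, fall back to the epoch.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical illustration, not part of scylla-cdc-java.
final class FirstWindowSketch {

    // Earliest point in time where a CDC row could still exist.
    // ttlSeconds is empty when the CDC log table has no TTL set.
    static Instant minimumWindowStart(Instant now, Optional<Long> ttlSeconds) {
        if (ttlSeconds.isPresent()) {
            // TTL is in seconds, so subtract it directly from "now".
            return now.minusSeconds(ttlSeconds.get());
        }
        // No TTL: nothing has expired, so start from the epoch,
        // like new Date(0) in the excerpt above.
        return Instant.EPOCH;
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        // Example value only: a TTL of 86400 seconds (24 hours).
        System.out.println("with TTL:    " + minimumWindowStart(now, Optional.of(86400L)));
        System.out.println("without TTL: " + minimumWindowStart(now, Optional.empty()));
    }
}
```

So on a fresh start with no stored offsets, a table with a 24-hour TTL would be read starting from roughly 24 hours ago.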
For more insight into how the CDC connector maintains offsets, see the TaskStateOffsetContext class. I believe it holds the relevant information about the current progress. If you want to trace how a single row is processed by the connector, see the consume method of ScyllaChangesConsumer.
How does it use the offset to poll with queries after scylla.query.time.window.size?
After processing the current window, the connector should proceed to the next window of the same size.
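As a rough mental model of that window stepping, here is a small sketch. Everything in it is an assumption for illustration: the class QueryWindowSketch is made up, the 30-second size is just an example value, and the real connector tracks more state than a start and an end timestamp. It only shows the idea that the next window begins where the previous one ended and has the same length.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of fixed-size query windows, not connector code.
final class QueryWindowSketch {
    final Instant start; // inclusive
    final Instant end;   // exclusive

    QueryWindowSketch(Instant start, Instant end) {
        this.start = start;
        this.end = end;
    }

    // First window: begins at the stored (or computed) start timestamp.
    static QueryWindowSketch first(Instant start, Duration size) {
        return new QueryWindowSketch(start, start.plus(size));
    }

    // Next window: starts where this one ended and keeps the same size.
    QueryWindowSketch next() {
        Duration size = Duration.between(start, end);
        return new QueryWindowSketch(end, end.plus(size));
    }

    public static void main(String[] args) {
        // Example value only: a 30-second window.
        Duration size = Duration.ofSeconds(30);
        QueryWindowSketch w = first(Instant.parse("2024-01-01T00:00:00Z"), size);
        for (int i = 0; i < 3; i++) {
            System.out.println("[" + w.start + ", " + w.end + ")");
            w = w.next();
        }
    }
}
```

In this model, persisting the window start as the offset is enough to resume after a restart: the connector would rebuild the window from the saved timestamp and continue from there.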