Cassandra.Link

7/5/2018

Cassandra to Kafka data pipeline Part 2

by John Doe

If you haven’t read the previous part of this blog, you can find it here. There, I laid out the necessary steps for injecting the Kafka cluster into the system ‘before’ the Cassandra cluster. I also tackled the first step: have a mechanism to push each Cassandra change to Kafka with a timestamp. But only one approach was considered there: Cassandra triggers.

Here, I’ll try out Cassandra Change Data Capture (CDC), so let’s get started.

Data Model

To make comparisons easier later, I’ll use the same data model as in the first part.

CREATE TABLE IF NOT EXISTS movies_by_genre (
    title text,
    genre text,
    year int,
    rating float,
    duration int,
    director text,
    country text,
    PRIMARY KEY ((genre, year), rating, duration)
) WITH CLUSTERING ORDER BY (rating DESC, duration ASC);

Infrastructure

The infrastructure is also the same: two Cassandra 3.11.0 nodes, two Kafka 0.10.1.1 nodes, and one Zookeeper 3.4.6 node, with everything packaged to run from Docker compose.

Cassandra CDC

My impression is that there is not much documentation on CDC, since I have struggled to grasp the concepts and how all of it should function. Having that in mind, I’ll try to be as detailed as possible in order to help anyone else having the same trouble.

First of all, CDC is available from Cassandra 3.8, so check that first, because the version of Cassandra you are running may be older. The entire documentation on Cassandra CDC can be found here. It’s not much, but still contains useful information.

To turn on CDC, cdc_enabled must be set to true in the cassandra.yaml. This will turn on CDC on the node. In order to enable it cluster-wide, it must be set on every node. Besides that, there are three more properties in cassandra.yaml related to CDC, four in total:

  1. cdc_enabled - Can be set to true or false, enabling or disabling CDC on the whole node. The default is false.

  2. cdc_raw_directory - Directory to which commitlog segments are moved. If not set, it defaults to $CASSANDRA_HOME/data/cdc_raw. Commitlog segments are moved only when all of the following three conditions are met:

     - CDC is enabled
     - The commitlog segment contains at least one mutation for a CDC-enabled table
     - The commitlog segment is about to be discarded

  3. cdc_total_space_in_mb - Total space on disk to use for CDC logs. If data gets above this value, Cassandra will throw WriteTimeoutException on mutations that include CDC-enabled tables. The default is the smaller of 4096 MB and 1/8th of the total space of the drive where cdc_raw_directory resides.

  4. cdc_free_space_check_interval_ms - When the space limit is hit (item 3), a check is made at this interval to see whether space has been freed and writes can continue. The default is 250 ms.

To sum it all up: you enable CDC with cdc_enabled, configure where data will be placed with cdc_raw_directory, and set a space limit (cdc_total_space_in_mb) along with the interval at which it is rechecked (cdc_free_space_check_interval_ms). If no application reads the commitlog segments and deletes them after reading, segments will accumulate and eventually the entire space defined by cdc_total_space_in_mb will be used up. When that happens, any write to tables for which CDC is turned on will fail, and writes will keep failing until space is freed.
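To make the space limit more concrete, here is a minimal Python sketch of a check that a monitoring script could run next to the node. It is not part of the setup used in this post; the function names are hypothetical, and it only mirrors the default rule described above (the smaller of 4096 MB and 1/8th of the drive).

```python
import os
import shutil
from typing import Optional

MB = 1024 * 1024


def default_cdc_limit_mb(total_drive_bytes: int) -> int:
    """Default limit as described above: min(4096 MB, 1/8 of the drive)."""
    return min(4096, total_drive_bytes // 8 // MB)


def directory_size_bytes(path: str) -> int:
    """Sum the sizes of the regular files directly inside `path`."""
    total = 0
    with os.scandir(path) as entries:
        for entry in entries:
            if entry.is_file(follow_symlinks=False):
                total += entry.stat(follow_symlinks=False).st_size
    return total


def cdc_space_report(cdc_raw_dir: str, limit_mb: Optional[int] = None) -> dict:
    """Compare the size of cdc_raw with the limit (hypothetical helper)."""
    if limit_mb is None:
        # Fall back to the default rule when no explicit limit is given.
        limit_mb = default_cdc_limit_mb(shutil.disk_usage(cdc_raw_dir).total)
    used = directory_size_bytes(cdc_raw_dir)
    limit_bytes = limit_mb * MB
    return {
        "used_bytes": used,
        "limit_bytes": limit_bytes,
        # Once this is True, writes to CDC-enabled tables are expected to fail.
        "writes_at_risk": used >= limit_bytes,
    }
```

Calling cdc_space_report("/var/lib/cassandra/cdc_raw", limit_mb=4096) would report how close the directory is to the limit, assuming the script can read that directory.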

On a few occasions I mentioned enabling CDC per table, but none of those properties covers that. Setting all of these properties is not enough for CDC to work; it also needs to be turned on for specific tables. That can be done either when creating a table, or later on using the ALTER TABLE command.

CREATE TABLE IF NOT EXISTS movies_by_genre (
    title text,
    genre text,
    year int,
    rating float,
    duration int,
    director text,
    country text,
    PRIMARY KEY ((genre, year), rating, duration)
) WITH CLUSTERING ORDER BY (rating DESC, duration ASC)
AND cdc = true;

Create table statement

ALTER TABLE movies_by_genre WITH cdc = true;

Alter table statement

Now that CDC is turned on, Cassandra will copy commitlog segments that contain at least one mutation for a CDC-enabled table into the directory specified by the cdc_raw_directory property. Since commitlog segments are immutable, they are copied along with mutations from other tables and keyspaces, so those mutations need to be filtered out when a commitlog segment is read.
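The filtering step can be illustrated with a short Python sketch. It assumes the segment has already been decoded into simple records; the Mutation type and its fields are hypothetical stand-ins and not the structures Cassandra's own commitlog reader produces.

```python
from typing import Iterable, Iterator, NamedTuple


class Mutation(NamedTuple):
    """Hypothetical, already-decoded mutation read from a commitlog segment."""
    keyspace: str
    table: str
    payload: dict


def relevant_mutations(
    mutations: Iterable[Mutation], keyspace: str, tables: Iterable[str]
) -> Iterator[Mutation]:
    """Yield only mutations for the given keyspace and CDC-enabled tables."""
    wanted = frozenset(tables)
    for mutation in mutations:
        if mutation.keyspace == keyspace and mutation.table in wanted:
            yield mutation
```

With the data model from this post, the call would be relevant_mutations(segment_mutations, "custom", ["movies_by_genre"]); everything else in the segment, such as system keyspace writes, is skipped.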

That is all there is to know about CDC and commitlog segments, or almost all. As mentioned earlier, commitlog segments are copied when the memtable is flushed to disk (either by hitting the memtable limit, by hitting the commitlog limit, or by running nodetool flush). With default settings, reaching the memtable or commitlog limit could take a lot of time, especially when CDC is run in a test environment. To speed this up, I have also lowered the values of the commitlog_segment_size_in_mb and commitlog_total_space_in_mb properties. These are the values of all the mentioned properties within cassandra.yaml that I have changed:

cdc_enabled: true
cdc_raw_directory: /var/lib/cassandra/cdc_raw
cdc_total_space_in_mb: 4096
cdc_free_space_check_interval_ms: 250
commitlog_segment_size_in_mb: 1
commitlog_total_space_in_mb: 16

Even with the limits being this low, I don’t want to do inserts, updates or deletes manually from cqlsh. I use Berserker for this job, which I already used in part 1 of this series. Berserker is a tool for load testing and load generation. You can specify rates, generate almost any data with Ranger, and currently target Cassandra, Kafka or HTTP. There are plans to support additional targets in the future as well, but that is not the topic of this blog.

Reading the commitlog

In order to read the commitlog segments, I need an application which will listen for directory changes; it is enough to listen only for the create event, since commitlog files are immutable. For that purpose, I have created an application which can be found here. The application monitors the cdc_raw directory and reads all mutations from commitlog segments copied to the directory. After reading a commitlog segment, the application writes the events to Kafka.
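The watching part can be sketched in a few lines of Python. This is not the linked application: it swaps real file system create events for simple polling, the function names are hypothetical, and handle_segment stands for whatever reads the segment and publishes to Kafka.

```python
import os
import time
from typing import Callable, List, Optional, Set


def poll_new_segments(cdc_raw_dir: str, seen: Set[str]) -> List[str]:
    """Return paths of .log files not seen before, sorted by file name."""
    current = {name for name in os.listdir(cdc_raw_dir) if name.endswith(".log")}
    new_names = sorted(current - seen)
    seen.update(new_names)
    return [os.path.join(cdc_raw_dir, name) for name in new_names]


def watch(
    cdc_raw_dir: str,
    handle_segment: Callable[[str], None],
    interval_s: float = 1.0,
    max_polls: Optional[int] = None,
) -> None:
    """Poll cdc_raw, hand each new segment to the handler, then delete it."""
    seen: Set[str] = set()
    polls = 0
    while max_polls is None or polls < max_polls:
        for path in poll_new_segments(cdc_raw_dir, seen):
            handle_segment(path)
            # Segments are immutable, so one pass is enough; deleting the
            # file afterwards frees space counted against the CDC limit.
            os.remove(path)
        polls += 1
        time.sleep(interval_s)
```

Because a segment is deleted only after the handler returns, a handler that raises leaves the file in place. A real application would also need to decide how to retry such a segment, which this sketch does not do.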

Connecting it all together

I have a Cassandra cluster with CDC turned on for a particular table. That will copy the commitlog segments to a configured location. A custom application will read each segment as it appears in the configured directory, filter out any non-relevant mutations, and process the relevant ones by sending them to the Kafka topic. Let’s try making this and connecting it all together.

Docker image

In the repository, there is a docker directory with a Dockerfile which will create a CDC-enabled Cassandra node. The only difference between the official Cassandra image and this image is the configuration file, which is located in the docker directory and replaces the standard one. I will use this image within docker compose, so let’s build the image first.

While in the docker directory, create the docker image by executing the following command:

docker build -t cassandra-cdc .

Docker compose

docker-compose up -d --scale kafka=2

This command will spin up the cluster. The docker compose file used is:

version: '3.3'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:0.10.1.1
    ports:
      - 9092
    environment:
      HOSTNAME_COMMAND: "ifconfig | awk '/Bcast:.+/{print $$2}' | awk -F\":\" '{print $$2}'"
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  cassandra-1:
    image: cassandra-cdc
    ports:
      - 7199
      - 9042
    volumes:
      - /tmp/cdc/cassandra-1:/var/lib/cassandra
    environment:
      CASSANDRA_CLUSTER_NAME: test-cluster
  cassandra-2:
    image: cassandra-cdc
    ports:
      - 7199
      - 9042
    volumes:
      - /tmp/cdc/cassandra-2:/var/lib/cassandra
    environment:
      CASSANDRA_CLUSTER_NAME: test-cluster
      CASSANDRA_SEEDS: cassandra-1

CDC Applications

With docker ps I can see that the cluster is running, and /tmp/cdc contains the data directories for both Cassandra containers. I need to start the listener app once for each Cassandra container. The prepared configuration files are in the config directory.

Beware that the bootstrap-servers properties in reader-1.yml and reader-2.yml need to be updated to reflect the ports of the Kafka brokers for the current run, otherwise messages won’t be sent to Kafka. The following commands will start the application twice:

java -jar -Dcassandra.config=file://<path_to_cassandra-cdc>/config/cassandra-1-cdc-tmp.yaml -Dcassandra.storagedir=file:///tmp/cdc/cassandra-1/ <path_to_cassandra-cdc>/target/cassandra-cdc-0.0.1-SNAPSHOT.jar <path_to_cassandra-cdc>/config/reader-1.yml
java -jar -Dcassandra.config=file://<path_to_cassandra-cdc>/config/cassandra-2-cdc-tmp.yaml -Dcassandra.storagedir=file:///tmp/cdc/cassandra-2/ <path_to_cassandra-cdc>/target/cassandra-cdc-0.0.1-SNAPSHOT.jar <path_to_cassandra-cdc>/config/reader-2.yml

Now that everything is set, it just needs to be verified by a test.

Testing

For testing, Berserker 0.0.7 with the following configuration will do the trick.

load-generator-configuration:
  data-source-configuration-name: Ranger
  rate-generator-configuration-name: ConstantRateGenerator
  worker-configuration-name: Cassandra
  metrics-reporter-configuration-name: JMX
  thread-count: 10
  queue-capacity: 100000

data-source-configuration:
  values:
    genre: random(['horror', 'comedy', 'action', 'sci-fi', 'drama', 'thriller'])
    year: random(1980..2017)
    rating: random(float(5.5)..float(9.5))
    duration: random(1..150)
    title: random(['Jurassic World', 'Toy Story', 'Deadpool', 'Gravity', 'The Matrix'])
    director: random(['Philippe Falardeau', 'Martin Scorsese', 'Steven Spilberg', 'Ridley Scott'])
    insert: string("INSERT INTO movies_by_genre (genre, year, rating, duration, title, director) VALUES ('{}', {}, {}, {}, '{}', '{}');", $genre, $year, $rating, $duration, $title, $director)
    deleteRow: string("DELETE FROM movies_by_genre WHERE genre = '{}' AND year = {} AND rating = {} and duration = {};", $genre, $year, $rating, $duration)
    deletePartition: string("DELETE FROM movies_by_genre WHERE genre = '{}' AND year = {};", $genre, $year)
    statement:
      consistencyLevel: ONE
      query: random([$insert, $deleteRow, $deletePartition])
  output: $statement

rate-generator-configuration:
  rate: 1000

worker-configuration:
  connection-points: 0.0.0.0:32821,0.0.0.0:32823
  keyspace: custom
  async: false
  bootstrap-commands:
    - "CREATE KEYSPACE IF NOT EXISTS custom WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"
    - USE custom;
    - CREATE TABLE IF NOT EXISTS movies_by_genre (title text, genre text, year int, rating float, duration int, director text, country text, PRIMARY KEY ((genre, year), rating, duration)) WITH CLUSTERING ORDER BY (rating DESC, duration ASC) and cdc = true;

metrics-reporter-configuration:
  domain: berserker
  filter:

Note the connection-points value: the ports need to match those exposed by the Cassandra containers.

Berserker is just the component that generates Cassandra mutations. To verify that everything ends up in Kafka, I also started the Kafka console consumer to listen to cdc-topic.

After a while, JSON messages will start to appear in the Kafka console. Messages do not appear immediately, as they do with Cassandra triggers, because CDC commitlog segments are copied to the cdc_raw directory only once the commitlog total size limit is hit.

Conclusion

Besides not being as immediate as Cassandra triggers, CDC also does not strictly guarantee order. Segments are moved to the cdc_raw directory when they are about to be discarded, but they are not always moved in the exact order in which they were created; some segments are left in the commitlog directory for a while. Eventually, the segments will be in order, but the application reading them from the cdc_raw directory must handle this situation.
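One way a reader could restore creation order is sketched below in Python. It rests on an assumption about naming that should be checked against your Cassandra version: that segment files are named CommitLog-<version>-<id>.log and that a larger id means a later segment. The function names are hypothetical.

```python
import re
from typing import Iterable, List, Optional

# Assumed naming scheme: CommitLog-<version>-<segment id>.log
_SEGMENT_NAME = re.compile(r"^CommitLog-(\d+)-(\d+)\.log$")


def segment_id(file_name: str) -> Optional[int]:
    """Extract the numeric segment id, or None if the name does not match."""
    match = _SEGMENT_NAME.match(file_name)
    return int(match.group(2)) if match else None


def in_creation_order(file_names: Iterable[str]) -> List[str]:
    """Sort segment file names by segment id, ignoring unrelated files."""
    with_ids = [(segment_id(name), name) for name in file_names]
    return [name for _, name in sorted(p for p in with_ids if p[0] is not None)]
```

Sorting only orders the segments that are already present. A segment that arrives late still has to be handled, for example by waiting a while before processing or by ordering events by write timestamp downstream.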

Another caveat the CDC application needs to worry about is the replication factor. Each mutation ends up in the commitlog of every replica node, so running a listener application on each node results in duplicate messages being sent to the Kafka cluster. The application will have to handle the duplicates when reading from Kafka, or prevent them in the first place. This can sometimes be handled by Kafka’s log compaction.
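As a rough illustration of handling duplicates, the Python sketch below derives the same key for the same mutation regardless of which replica's listener produced it. It assumes that keyspace, table, primary key values and write timestamp together identify a mutation, which may not hold for every workload; the names are hypothetical.

```python
import hashlib
import json
from typing import Set


def event_key(keyspace: str, table: str, primary_key: dict, write_timestamp: int) -> str:
    """Build a deterministic key; dict ordering does not affect the result."""
    canonical = json.dumps(
        {"ks": keyspace, "table": table, "pk": primary_key, "ts": write_timestamp},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class Deduplicator:
    """Remembers keys it has seen; unbounded, so only suitable for a sketch."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    def is_new(self, key: str) -> bool:
        if key in self._seen:
            return False
        self._seen.add(key)
        return True
```

A consumer could call is_new(event_key(...)) and drop events for which it returns False. Using the same value as the Kafka message key is one way to let log compaction collapse duplicates on a compacted topic.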

Change data capture (CDC) is another approach to handling mutations in Cassandra. It is not as immediate as triggers, but it also adds no overhead to the write path, which makes it useful for different use cases. As for my use case, next time I will talk about Cassandra snapshots, and afterwards we will see whether Cassandra triggers or CDC is the better fit.
