Cassandra.Link

10/23/2020

DataStax-Examples/kafka-connector-sink-avro

by DataStax-Examples

This example shows how to ingest Avro records from Apache Kafka to a table in the Apache Cassandra database using the DataStax Apache Kafka Connector.

Contributor(s): Chris Splinter, Tomasz Lelek

Have Questions? We're here to help: https://community.datastax.com/

Want to learn more about the DataStax Kafka Connector? Take a free, short course on DataStax Academy

Looking for a fully-managed service built on Apache Cassandra? Try DataStax Astra for free: https://astra.datastax.com/

Objectives

  • How to ingest Avro records from Kafka to Cassandra
  • How to use docker and docker-compose to quickly set up an environment with Zookeeper, Kafka Brokers, Kafka Connect, Confluent Schema Registry and Cassandra

Project Layout

  • Dockerfile-connector: Dockerfile to build an image of Kafka Connect with the DataStax Kafka Connector installed.
  • Dockerfile-producer: Dockerfile to build an image for the producer contained in this repository.
  • docker-compose.yml: Uses Confluent and DataStax docker images to set up Zookeeper, Kafka Brokers, Kafka Connect, Confluent Schema Registry, Cassandra, and the producer container.
  • connector-config.json: Configuration file for the DataStax Kafka Connector to be used with the distributed Kafka Connect Worker.
  • producer: Contains the Kafka Avro Producer to write records to Kafka. Uses the AvroSerializer for the Kafka record key and record value.

How this works

After running the docker and docker-compose commands, there will be 6 docker containers running, all using the same docker network.

After records are written to the Kafka Brokers, the Kafka Connector is started. It then streams the records from Kafka to Cassandra, writing them to a table in the database.

Setup & Running

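Prerequisites

  • Docker: https://docs.docker.com/v17.09/engine/installation/
  • Docker Compose: https://docs.docker.com/compose/install/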

Setup

Clone this repository

git clone https://github.com/DataStax-Examples/kafka-connector-sink-avro.git

Go to the directory

cd kafka-connector-sink-avro

Build the DataStax Kafka Connector image

docker build . -t datastax-connect -f Dockerfile-connector

Build the Avro Java Producer image

docker build . -t kafka-producer -f Dockerfile-producer

Start Zookeeper, Kafka Brokers, Kafka Connect, Confluent Schema Registry, Cassandra, and the producer containers

docker-compose up -d
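Before moving on, it can help to confirm that the stack actually came up. The sketch below counts running containers and compares the count with the six containers this example expects. The function names are hypothetical, and the count covers every container running on the host, so unrelated containers would inflate it.

```shell
# Hypothetical helper: number of containers currently running on this host.
running_containers() {
  docker ps --format '{{.Names}}' | wc -l | tr -d ' '
}

# Hypothetical helper: compare the running count with the expected count.
# The default of 6 comes from the "How this works" section of this example.
check_stack() {
  expected="${1:-6}"
  actual="$(running_containers)"
  if [ "$actual" -ge "$expected" ]; then
    echo "ok: $actual containers running"
  else
    echo "waiting: $actual of $expected containers running"
    return 1
  fi
}
```

Calling check_stack after docker-compose up -d prints one status line and returns non-zero while containers are still missing, so it can be retried in a loop.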

Running

Now that everything is up and running, it's time to set up the flow of data from Kafka to Cassandra.

Create the Kafka Topic named avro-stream that the connector will read from.

docker exec -it kafka-broker bash
kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 10 --topic avro-stream --config retention.ms=-1

Create the Cassandra table that the connector will write to. Start the cqlsh shell, then copy and paste the contents of schema.cql into it.

docker exec -it cassandra cqlsh
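As an alternative to pasting by hand, the schema file could be copied into the container and run with the cqlsh -f option. This is a sketch, not part of the original instructions: the function name and the /tmp/schema.cql target path are hypothetical, and it assumes schema.cql sits in the current directory and that the container is named cassandra, as in the command above.

```shell
# Hypothetical helper: copy a CQL file into the container and execute it.
# Arguments: path to the CQL file (default schema.cql), container name (default cassandra).
load_schema() {
  file="${1:-schema.cql}"
  container="${2:-cassandra}"
  docker cp "$file" "$container:/tmp/schema.cql" &&
    docker exec "$container" cqlsh -f /tmp/schema.cql
}
```

Running load_schema with no arguments from the repository directory would apply schema.cql without opening an interactive shell.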

Write 1000 records to Kafka using the Avro Java Producer

docker exec -it kafka-producer bash
mvn clean compile exec:java -Dexec.mainClass=avro.AvroProducer -Dexec.args="avro-stream 1000 broker:29092 http://schema-registry:8081"

Maven prints many lines of output to your console as it pulls down the dependencies. The following output means that the producer completed successfully:

2020-08-04 13:59:25.990 [avro.AvroProducer.main()] INFO  - Completed loading 1000/1000 records to Kafka in 1 seconds
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 32.195 s
[INFO] Finished at: 2020-08-04T13:59:25+00:00
[INFO] Final Memory: 35M/248M
[INFO] ------------------------------------------------------------------------

Execute the following command from the machine where Docker is running to start the connector using the Kafka Connect REST API:

curl -X POST -H "Content-Type: application/json" -d @connector-config.json "http://localhost:8083/connectors"
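After the POST, the Kafka Connect REST API can report whether the connector is running through its /connectors/{name}/status endpoint. The sketch below extracts the connector state from that response. The function name is hypothetical, the connector name must match the "name" field in connector-config.json (not shown here), and the text parsing assumes the compact JSON that Kafka Connect normally returns, with the connector state appearing before the task states.

```shell
# Hypothetical helper: print the connector state (for example RUNNING or FAILED).
# Arguments: connector name, optional base URL of the Kafka Connect REST API.
connector_state() {
  name="$1"
  base="${2:-http://localhost:8083}"
  curl -s "$base/connectors/$name/status" \
    | grep -o '"state":"[A-Z_]*"' \
    | head -n 1 \
    | cut -d'"' -f4
}
```

A state other than RUNNING suggests looking at the full status response or the Kafka Connect container logs before checking Cassandra.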

Confirm that the rows were written to Cassandra:

docker exec -it cassandra cqlsh
select * from kafka_examples.avro_udt_table limit 1;

You should see the following output:

cqlsh> select * from kafka_examples.avro_udt_table limit 1;
 id  | udt_col0                                                                                                                                                                   | udt_col1                                                                                                                                                                   | udt_col2                                                                                                                                                                   | udt_col3                                                                                                                                                                   | udt_col4                                                                                                                                                                   | udt_col5                                                                                                                                                                   | udt_col6                                                                                                                                                                   | udt_col7                                                                                                                                                                   | udt_col8                                                                                                                                                                   | udt_col9
-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 769 | {segment0_0: '0', segment0_1: '1', segment0_2: '2', segment0_3: '3', segment0_4: '4', segment0_5: '5', segment0_6: '6', segment0_7: '7', segment0_8: '8', segment0_9: '9'} | {segment1_0: '0', segment1_1: '1', segment1_2: '2', segment1_3: '3', segment1_4: '4', segment1_5: '5', segment1_6: '6', segment1_7: '7', segment1_8: '8', segment1_9: '9'} | {segment2_0: '0', segment2_1: '1', segment2_2: '2', segment2_3: '3', segment2_4: '4', segment2_5: '5', segment2_6: '6', segment2_7: '7', segment2_8: '8', segment2_9: '9'} | {segment3_0: '0', segment3_1: '1', segment3_2: '2', segment3_3: '3', segment3_4: '4', segment3_5: '5', segment3_6: '6', segment3_7: '7', segment3_8: '8', segment3_9: '9'} | {segment4_0: '0', segment4_1: '1', segment4_2: '2', segment4_3: '3', segment4_4: '4', segment4_5: '5', segment4_6: '6', segment4_7: '7', segment4_8: '8', segment4_9: '9'} | {segment5_0: '0', segment5_1: '1', segment5_2: '2', segment5_3: '3', segment5_4: '4', segment5_5: '5', segment5_6: '6', segment5_7: '7', segment5_8: '8', segment5_9: '9'} | {segment6_0: '0', segment6_1: '1', segment6_2: '2', segment6_3: '3', segment6_4: '4', segment6_5: '5', segment6_6: '6', segment6_7: '7', segment6_8: '8', segment6_9: '9'} | {segment7_0: '0', segment7_1: '1', segment7_2: '2', segment7_3: '3', segment7_4: '4', segment7_5: '5', segment7_6: '6', segment7_7: '7', segment7_8: '8', segment7_9: '9'} | {segment8_0: '0', segment8_1: '1', segment8_2: '2', segment8_3: '3', segment8_4: '4', segment8_5: '5', segment8_6: '6', segment8_7: '7', segment8_8: '8', segment8_9: '9'} | {segment9_0: '0', segment9_1: '1', segment9_2: '2', segment9_3: '3', segment9_4: '4', segment9_5: '5', segment9_6: '6', segment9_7: '7', segment9_8: '8', segment9_9: '9'}
(1 rows)
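To check more than a single row, the table can be counted non-interactively with the cqlsh -e option. This is a sketch under assumptions: the function name is hypothetical, and the count only equals the number of records produced (1000 here) if every record has a distinct id, which the source does not state.

```shell
# Hypothetical helper: print the row count of a table as a bare number.
# Arguments: keyspace.table (default from this example), container name (default cassandra).
row_count() {
  table="${1:-kafka_examples.avro_udt_table}"
  container="${2:-cassandra}"
  docker exec "$container" cqlsh -e "select count(*) from $table;" \
    | awk '/^ *[0-9]+ *$/ { print $1; exit }'
}
```

The awk filter keeps only the first line that consists of a number, which skips the column header, the separator, and the trailing row summary in the cqlsh output.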
