
ferhtaydn/sack

by ferhtaydn



SACK - SMACK without Mesos

This project is a proof of concept (PoC) for developing simple flows with (eventually) Spark, Akka, Cassandra, and Kafka.

Here are the default components of the repo:

  • Confluent Platform stack
  • scala-kafka-client for Kafka interaction
  • akka-http for the API endpoints
  • circe for JSON decoding
  • shapeless for CSV-to-model conversion
  • avro4s and sbt-avro4s for Avro schemas
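
These typically appear in build.sbt roughly as follows (a sketch only; the version numbers are placeholders, not taken from the repo — check the project's build.sbt for the real coordinates and versions):

    libraryDependencies ++= Seq(
      "net.cakesolutions"   %% "scala-kafka-client"     % "<version>", // placeholder
      "com.typesafe.akka"   %% "akka-http-experimental" % "<version>", // placeholder (pre-10.x akka-http)
      "io.circe"            %% "circe-generic"          % "<version>", // placeholder
      "com.chuusai"         %% "shapeless"              % "<version>", // placeholder
      "com.sksamuel.avro4s" %% "avro4s-core"            % "<version>"  // placeholder
    )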

A step-by-step multi-node installation manual can be found at this guide.

There are multiple projects under the root.

  • In the api project, there are REST endpoints for ingesting products.
    • An HTTP layer posts products to the http topic, which is also consumed by the Cassandra sink connector into the products table.
    • For large JSON product payloads, the content can be gzip-compressed with the Content-Transfer-Encoding header set to gzip (see the example request after the run commands below).
java -jar api/target/scala-2.11/api.jar

or

$ sbt "project api" "runMain com.ferhtaydn.http.WebServer"
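
For example, a gzip-compressed payload could be posted like this (the /products path and port 8080 are assumptions for illustration; check com.ferhtaydn.http.WebServer for the actual route and binding):

$ gzip -c product.json > product.json.gz
$ curl -X POST http://localhost:8080/products \
       -H "Content-Type: application/json" \
       -H "Content-Transfer-Encoding: gzip" \
       --data-binary @product.json.gz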
  • In the csv project, a simple message-processing flow is simulated: consuming from one topic, doing some validation, and producing to another topic (a sketch of the routing step follows the run command below).
    • a simple CSV file is consumed by the Kafka FileStreamSource connector into a raw topic,
    • each line is converted to a Product instance where possible,
    • successful records are consumed by the Cassandra sink connector into the products table,
    • invalid lines are stored in a failure topic so they can be investigated later
java -jar csv/target/scala-2.11/csv.jar
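
A minimal sketch of that validate-and-route step (Product here is a cut-down stand-in for the real ~35-field model shown in the CQL schema below, and parseLine is hypothetical; the actual logic lives in com.ferhtaydn.csv.RawToAvroGenericProcessorBoot):

object CsvRouting {
  // Cut-down stand-in for the real Product model.
  final case class Product(barcode: String, brand: String, productCode: String)

  // Right(product) is destined for the product-csv-avro topic,
  // Left(rawLine) for the failure topic.
  def parseLine(line: String): Either[String, Product] =
    line.split(",", -1).map(_.trim) match {
      case Array(barcode, brand, code) if barcode.nonEmpty =>
        Right(Product(barcode, brand, code))
      case _ =>
        Left(line)
    }
}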
  • In the examples project, there are binary- and Avro-formatted messages used with scala-kafka-client.

  • Common code for all other projects is placed under the core project.

Step-by-step guide (local)

$ cd confluent-3.0.1
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
$ ./bin/kafka-server-start ./etc/kafka/server.properties
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
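Once the Schema Registry is up, you can confirm it responds (it listens on port 8081 by default):

$ curl localhost:8081/subjects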
$ cat /tmp/connect-file-source.properties
    
    name=product-csv-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/tmp/products.csv
    topic=product-csv-raw
$ ./bin/kafka-topics --list --zookeeper localhost:2181
$ ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic product-csv-avro --from-beginning
$ ~/workspace/datamountaineer/stream-reactor/kafka-connect-cassandra/build/libs(branch:master) » export CLASSPATH=kafka-connect-cassandra-0.2-3.0.1-all.jar
$ ~/workspace/datamountaineer/stream-reactor/kafka-connect-cassandra/build/libs(branch:master) » ~/workspace/confluent/confluent-3.0.1/bin/connect-distributed /tmp/connect-distributed.properties
$ ~/workspace/datamountaineer/kafka-connect-tools/build/libs(branch:master) » java -jar kafka-connect-cli-0.6-all.jar create product-csv-source < /tmp/connect-file-source.properties
$ ~/workspace/confluent/confluent-3.0.1 » ./bin/kafka-console-consumer --zookeeper localhost:2181 --topic product-csv-raw --from-beginning
$ apache-cassandra-3.9 » ./bin/cqlsh
cqlsh> CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
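Note that on a single-node local cluster a replication factor of 1 is the usual choice; with RF=3 on one node, quorum reads and writes cannot be satisfied. A local-friendly variant:

cqlsh> CREATE KEYSPACE demo WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1};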
cqlsh> USE demo;
cqlsh:demo>
        create table products (
         brand varchar, 
         supplierId varchar, 
         productType varchar, 
         gender varchar, 
         ageGroup varchar, 
         category varchar, 
         productFeature varchar, 
         productCode varchar,
         webProductDesc varchar, 
         productDesc varchar, 
         supplierColor varchar, 
         colorFeature varchar, 
         barcode varchar, 
         supplierSize varchar, 
         dsmSize varchar, 
         stockUnit varchar, 
         ftStockQuantity int, 
         ftPurchasePriceVatInc double, 
         psfVatInc double, 
         tsfVatInc double, 
         vatRate double, 
         material varchar, 
         composition varchar,
         productionPlace varchar, 
         productWeightKg double,
         productionContentWriting varchar,
         productDetail varchar,
         sampleSize varchar,
         modelSize varchar,
         supplierProductCode varchar,
         project varchar,
         theme varchar,
         trendLevel varchar,
         designer varchar,
         imageUrl varchar,
         PRIMARY KEY (barcode));
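Since barcode is the sole partition key, each product row is addressed directly by its barcode (the value below is purely illustrative):

cqlsh:demo> select brand, productCode, webProductDesc from products where barcode = '1234567890123';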
$ cat cassandra-sink-distributed-products.properties 
    name=cassandra-sink-products
    connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
    tasks.max=1
    topics=product-csv-avro,product-http-avro
    connect.cassandra.export.route.query=INSERT INTO products SELECT * FROM product-csv-avro;INSERT INTO products SELECT * FROM product-http-avro
    connect.cassandra.contact.points=localhost
    connect.cassandra.port=9042
    connect.cassandra.key.space=demo
    connect.cassandra.username=cassandra
    connect.cassandra.password=cassandra
$ ~/workspace/datamountaineer/kafka-connect-tools/build/libs(branch:master) » java -jar kafka-connect-cli-0.6-all.jar create cassandra-sink-products < /tmp/cassandra-sink-distributed-products.properties
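The connect.cassandra.export.route.query value above is KCQL: one INSERT INTO <table> SELECT * FROM <topic> statement per topic, separated by semicolons. Once created, the connectors can be checked through the Kafka Connect REST API (the distributed worker listens on port 8083 by default):

$ curl localhost:8083/connectors
$ curl localhost:8083/connectors/cassandra-sink-products/status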
$ sbt "project csv" "runMain com.ferhtaydn.csv.RawToAvroGenericProcessorBoot"
$ ~/workspace/confluent/confluent-3.0.1 » ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic product-csv-avro --from-beginning
cqlsh:demo> select * from products;
