Illustration Image

Cassandra.Link

The best knowledge base on Apache Cassandra®

Helping platform leaders, architects, engineers, and operators build scalable real time data platforms.

12/10/2020

Reading time:1 min

calvinlfer/alpakka-cassandra-sink-usage

by calvinlfer

The Cassandra Sink provided by alpakka is meantto be used when you want to push elements from your stream into a Cassandra table. The example on the website shows asimple use case which persists an integer to a table. Here we look at how we can persist a case class to a target table.case class Customer(customerId: UUID, age: Int, pay: Long)One big gotcha is figuring out how to write the BoundStatement in Scala which involves performing type conversions toJava since the underlying Datastax driver that is used to communicate with Cassandra is Java based. val cassandraSink: Sink[Customer, Future[Done]] = { implicit val session: Session = ... val insertPreparedStatement: PreparedStatement = session.prepare(s"INSERT INTO customers_by_customer_id(customerId, age, pay) VALUES (?, ?, ?)") // you need to convert each Scala type to a Java type when binding values to the prepared statement in order to create the bound statement // we deconstruct our Scala case class and explicitly use the Java type instead of the Scala type on the destructured fields val statementBinder: (Customer, PreparedStatement) => BoundStatement = (c, ps) => ps.bind(c.customerId: java.util.UUID, c.age: java.lang.Integer, c.pay: java.lang.Long) CassandraSink[Customer](parallelism = 10, statement = insertPreparedStatement, statementBinder = statementBinder) }Cassandra TableThe Cassandra table schema is provided here:CREATE TABLE customers_by_customer_id ( customerId uuid, age int, pay bigint, PRIMARY KEY((customerid)));Local developmentIf you have Docker and Docker Compose installed, just run docker-compose up to bring up Cassandra in a Dockercontainer.

Illustration Image

The Cassandra Sink provided by alpakka is meant to be used when you want to push elements from your stream into a Cassandra table. The example on the website shows a simple use case which persists an integer to a table. Here we look at how we can persist a case class to a target table.

case class Customer(customerId: UUID, age: Int, pay: Long)

One big gotcha is figuring out how to write the BoundStatement in Scala which involves performing type conversions to Java since the underlying Datastax driver that is used to communicate with Cassandra is Java based.

  val cassandraSink: Sink[Customer, Future[Done]] = {
    implicit val session: Session = ...
    val insertPreparedStatement: PreparedStatement = session.prepare(s"INSERT INTO customers_by_customer_id(customerId, age, pay) VALUES (?, ?, ?)")
    // you need to convert each Scala type to a Java type when binding values to the prepared statement in order to create the bound statement
    // we deconstruct our Scala case class and explicitly use the Java type instead of the Scala type on the destructured fields
    val statementBinder: (Customer, PreparedStatement) => BoundStatement = (c, ps) => ps.bind(c.customerId: java.util.UUID, c.age: java.lang.Integer, c.pay: java.lang.Long)
    CassandraSink[Customer](parallelism = 10, statement = insertPreparedStatement, statementBinder = statementBinder)
  }

Cassandra Table

The Cassandra table schema is provided here:

CREATE TABLE customers_by_customer_id (
  customerId uuid,
  age int,
  pay bigint,
  PRIMARY KEY((customerid))
);

Local development

If you have Docker and Docker Compose installed, just run docker-compose up to bring up Cassandra in a Docker container.

Related Articles

alpakka
realtime
twitter

Apache Cassandra Lunch #45: Alpakka Cassandra and Twitter - Business Platform Team

John Doe

6/11/2022

Checkout Planet Cassandra

Claim Your Free Planet Cassandra Contributor T-shirt!

Make your contribution and score a FREE Planet Cassandra Contributor T-Shirt! 
We value our incredible Cassandra community, and we want to express our gratitude by sending an exclusive Planet Cassandra Contributor T-Shirt you can wear with pride.

Join Our Newsletter!

Sign up below to receive email updates and see what's going on with our company

Explore Related Topics

AllKafkaSparkScyllaSStableKubernetesApiGithubGraphQl

Explore Further

akka