This post is a follow-up to a talk given at the Big Data AW meetup in Stockholm. It focuses on use cases and design approaches for building scalable data processing platforms with the SMACK (Spark, Mesos, Akka, Cassandra, Kafka) stack. Although the stack is concise and consists of only a few components, it can be used to implement a range of system designs: not only pure batch or stream processing, but also the more complex Lambda and Kappa architectures. Let's start with a short overview to get on the same page, and then continue with designs and examples drawn from production project experience.
Recap
Spark - fast and general engine for distributed, large-scale data processing
Mesos - cluster resource management system that provides efficient resource isolation and sharing across distributed applications
Akka - a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM
Cassandra - distributed, highly available database designed to handle large amounts of data across multiple datacenters
Kafka - a high-throughput, low-latency distributed messaging system/commit log designed for handling real-time data feeds
Storage layer: Cassandra
Cassandra is well known for its high availability and high throughput: it can handle enormous write loads and survive cluster node failures. In terms of the CAP theorem, Cassandra provides tunable consistency/availability for operations.
What is more interesting when it comes to data processing is that Cassandra is linearly scalable (increased load can be addressed by just adding more nodes to a cluster) and provides cross-datacenter replication (XDCR). XDCR offers not only replication but also enables a set of really interesting use cases:
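To make "tunable consistency" a bit more concrete, here is a minimal sketch of the usual replica-overlap rule: a read is guaranteed to see the latest acknowledged write when the number of replicas read plus the number of replicas written exceeds the replication factor. The object and function names are made up for illustration; this is a toy model, not driver code.

```scala
// Toy model of tunable consistency (illustrative names, not a Cassandra API).
object ConsistencySketch {
  // Number of replicas that form a quorum for a given replication factor.
  def quorum(replicationFactor: Int): Int = replicationFactor / 2 + 1

  // The read and write replica sets are forced to intersect when R + W > RF,
  // so a read is guaranteed to touch a replica holding the latest acknowledged write.
  def readsSeeLatestWrite(readReplicas: Int, writeReplicas: Int, replicationFactor: Int): Boolean =
    readReplicas + writeReplicas > replicationFactor

  def main(args: Array[String]): Unit = {
    val rf = 3
    // QUORUM reads combined with QUORUM writes
    println(readsSeeLatestWrite(quorum(rf), quorum(rf), rf))
    // ONE reads combined with ONE writes: faster, but replica sets may not overlap
    println(readsSeeLatestWrite(1, 1, rf))
  }
}
```

Lower consistency levels trade this guarantee for latency and availability, which is the "tunable" part.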
- geo-distributed datacenters handling data specific for the region or located closer to customers
- data migration across datacenters: recovery after failures or moving data to a new DC
- separate operational and analytics workloads
These features come at a price, though, and with Cassandra the price is its data model. A table can be thought of as a nested sorted map: rows are distributed across cluster nodes by partition key, and within each partition entries are sorted and grouped by clustering columns. Here's a small example:
CREATE TABLE campaign(
  id uuid,
  year int,
  month int,
  day int,
  views bigint,
  clicks bigint,
  PRIMARY KEY (id, year, month, day)
);

INSERT INTO campaign(id, year, month, day, views, clicks)
VALUES(40b08953-a…, 2015, 9, 10, 1000, 42);

SELECT views, clicks FROM campaign
WHERE id=40b08953-a… and year=2015 and month>8;
To select a range of data, the partition key must be fully specified, clustering columns must be restricted in the order in which they are declared, and a range clause is allowed only on the last restricted column. This constraint limits the number of scans over different ranges, which would otherwise produce random disk access and degrade performance. It also means that the data model should be carefully designed around the read queries in order to limit the number of reads/scans, which in turn makes it less flexible when new queries need to be supported. Here are C* data modelling 101 slides which provide several examples of how CQL tables are represented internally.
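The "nested sorted map" view of the campaign table can be sketched with plain Scala collections. This is only an in-memory analogy under the assumption that one partition maps to one sorted map; the names `CampaignTableSketch`, `insert` and `monthsAfter` are hypothetical and have nothing to do with Cassandra's actual storage engine.

```scala
import java.util.UUID
import scala.collection.immutable.TreeMap

// In-memory analogy of the campaign table: partition key -> sorted clustering keys -> values.
object CampaignTableSketch {
  type ClusteringKey = (Int, Int, Int) // (year, month, day), sorted lexicographically
  final case class Stats(views: Long, clicks: Long)
  type Table = Map[UUID, TreeMap[ClusteringKey, Stats]]

  def insert(table: Table, id: UUID, key: ClusteringKey, stats: Stats): Table = {
    val partition = table.getOrElse(id, TreeMap.empty[ClusteringKey, Stats])
    table.updated(id, partition.updated(key, stats))
  }

  // Analogue of: WHERE id = ? AND year = ? AND month > ?
  // One partition lookup followed by a single contiguous slice of the sorted map.
  def monthsAfter(table: Table, id: UUID, year: Int, month: Int): Seq[Stats] =
    table.get(id).toSeq.flatMap { partition =>
      val from  = (year, month + 1, Int.MinValue)     // inclusive lower bound
      val until = (year + 1, Int.MinValue, Int.MinValue) // exclusive upper bound
      partition.range(from, until).values
    }
}
```

Because the rows inside a partition are sorted, the query maps to one contiguous slice. A range on `year` combined with another range on `day` would need several disjoint slices, which is the kind of access pattern the CQL restriction rules out.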
But what if some tables need to be joined with other tables? Consider the following case: calculate the total views per campaign for a given month, for all campaigns.
CREATE TABLE event(
  id uuid,
  ad_id uuid,
  campaign uuid,
  ts bigint,
  type text,
  PRIMARY KEY(id)
);
With this model, the only way to achieve the goal is to read all campaigns, read all events, sum the matching ones (those with a matching campaign id) and assign the totals to campaigns. Implementing such an application is challenging, because the amount of data stored in Cassandra can be huge and will not fit into memory. Processing of this kind should therefore be done in a distributed manner, and Spark fits this use case perfectly.
Processing layer: Spark
The main abstraction Spark operates with is the RDD (Resilient Distributed Dataset, a distributed collection of elements), and the workflow consists of four main phases:
- RDD operations (transformations and actions) form a DAG (Directed Acyclic Graph)
- DAG is split into stages of tasks which are then submitted to cluster manager
- stages combine tasks which don’t require shuffling/repartitioning
- tasks run on workers and results then return to the client
Here's how one can solve the above problem with Spark and Cassandra:
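import java.util.UUID
import org.apache.spark.SparkContext
import com.datastax.spark.connector._

val sc = new SparkContext(conf)

case class Event(id: UUID, ad_id: UUID, campaign: UUID, ts: Long, `type`: String)

// count "view" events per campaign for the month selected by checkMonth
sc.cassandraTable[Event]("keyspace", "event")
  .filter(e => e.`type` == "view" && checkMonth(e.ts))
  .map(e => (e.campaign, 1))
  .reduceByKey(_ + _)
  .collect()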
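For experimenting without a cluster, the same filter/map/reduce-by-key logic can be sketched on plain Scala collections. Two assumptions are made here that the original does not state: `ts` is taken to be epoch milliseconds, and `checkMonth` is given a hypothetical two-argument signature that compares the timestamp against a `YearMonth` in UTC.

```scala
import java.time.{Instant, YearMonth, ZoneOffset}
import java.util.UUID

// Local, single-process analogue of the Spark job (no Spark or Cassandra involved).
object ViewsPerCampaignSketch {
  final case class Event(id: UUID, ad_id: UUID, campaign: UUID, ts: Long, `type`: String)

  // Hypothetical helper: true when the epoch-millisecond timestamp falls into `month` (UTC).
  def checkMonth(ts: Long, month: YearMonth): Boolean =
    YearMonth.from(Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC)) == month

  def viewsPerCampaign(events: Seq[Event], month: YearMonth): Map[UUID, Int] =
    events
      .filter(e => e.`type` == "view" && checkMonth(e.ts, month))
      .map(e => (e.campaign, 1))
      .groupBy(_._1) // local stand-in for the shuffle behind reduceByKey
      .map { case (campaign, ones) => campaign -> ones.map(_._2).sum }
}
```

In Spark the grouping step is where data is shuffled between nodes; `reduceByKey` additionally combines values on each node before the shuffle, which this local version has no need for.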
Interaction with Cassandra is performed via spark-cassandra-connector which makes it really easy and straightforward. There's one more interesting option to work with NoSQL stores - SparkSQL, which translates SQL statements into a series of RDD operations.
case class CampaignReport(id: String, views: Long, clicks: Long)

sql("""SELECT campaign.id as id, campaign.views as views,
              campaign.clicks as clicks, event.type as type
       FROM campaign
       JOIN event ON campaign.id = event.campaign
    """).rdd
  .groupBy(row => row.getAs[String]("id"))
  .map { case (id, rows) =>
    val views = rows.head.getAs[Long]("views")
    val clicks = rows.head.getAs[Long]("clicks")
    // number of events per type; a type with no events counts as 0
    val res = rows.groupBy(row => row.getAs[String]("type")).mapValues(_.size)
    CampaignReport(id,
      views = views + res.getOrElse("view", 0),
      clicks = clicks + res.getOrElse("click", 0))
  }.saveToCassandra("keyspace", "campaign_report")
With several lines of code it's possible to implement a naive Lambda design. It could of course be much more sophisticated, but the example shows how easily this can be achieved.
Almost MapReduce: bringing processing closer to data
The Spark-Cassandra connector is data-locality aware and reads data from the closest node in a cluster, thus minimizing the amount of data transferred over the network. To fully benefit from the connector's data locality awareness, Spark workers should be collocated with Cassandra nodes.
Besides collocating Spark with Cassandra, it makes sense to separate your operational (or write-heavy) cluster from the one used for analytics:
- clusters can be scaled independently
- data is replicated by Cassandra, no extra-work needed
- analytics cluster has different Read/Write load patterns
- analytics cluster could contain additional data (e.g. dictionaries) and processing results
- Spark resource impact is limited to only one cluster
Let's look at the deployment options for Spark applications once more.
There are three main options available for the cluster resource manager:
- Spark Standalone - Spark Master and Workers are installed and executed as standalone applications (which obviously introduces some ops overhead and supports only static resource allocation per worker)
- YARN, which is really nice if you already have a Hadoop ecosystem
- Mesos, which from the beginning was designed for dynamic allocation of cluster resources, and not only for running Hadoop applications but for handling heterogeneous workloads
Mesos architecture
A Mesos cluster consists of Master nodes, which are responsible for resource offers and scheduling, and Slave nodes, which do the actual heavy lifting of task execution. In HA mode with multiple Masters, ZooKeeper is used for leader election and service discovery. Applications executed on Mesos are called Frameworks and use an API to handle resource offers and submit tasks to Mesos. Generally, task execution consists of these steps:
- Slaves publish available resources to Master
- Master sends resource offers to Frameworks
- Scheduler replies with tasks and resources needed per task
- Master sends tasks to slaves
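The offer cycle above can be illustrated with a toy model of the scheduler side: given the offers forwarded by the master, place pending tasks on the first slave with enough spare resources. Everything here (`Offer`, `Task`, `Launch`, `schedule`, the first-fit policy) is a hypothetical simplification and not the Mesos framework API.

```scala
// Toy model of a framework scheduler reacting to resource offers (not the Mesos API).
object OfferCycleSketch {
  final case class Offer(slave: String, cpus: Double, memMb: Int)
  final case class Task(name: String, cpus: Double, memMb: Int)
  final case class Launch(task: Task, slave: String)

  // First-fit placement: returns the tasks to launch and the tasks that must wait
  // for a later round of offers.
  def schedule(offers: List[Offer], pending: List[Task]): (List[Launch], List[Task]) = {
    val start = (List.empty[Launch], List.empty[Task], offers)
    val (launched, waiting, _) = pending.foldLeft(start) {
      case ((done, queued, free), task) =>
        free.indexWhere(o => o.cpus >= task.cpus && o.memMb >= task.memMb) match {
          case -1 =>
            (done, queued :+ task, free) // no offer is large enough
          case i =>
            val offer = free(i)
            // shrink the offer by the resources the task consumes
            val rest = offer.copy(cpus = offer.cpus - task.cpus, memMb = offer.memMb - task.memMb)
            (done :+ Launch(task, offer.slave), queued, free.updated(i, rest))
        }
    }
    (launched, waiting)
  }
}
```

In real Mesos the framework decides which offers to accept or decline, and declined resources are offered to other frameworks; the sketch only covers the "scheduler replies with tasks and resources needed per task" step.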
Bringing Spark, Mesos and Cassandra together
As mentioned before, Spark workers should be collocated with Cassandra nodes to benefit from data locality awareness, thus lowering the amount of network traffic and the load on the Cassandra cluster. Here's one possible deployment scenario for achieving this with Mesos:
- Mesos Masters and ZooKeepers collocated
- Mesos Slaves and Cassandra nodes collocated to enforce better data locality for Spark
- Spark binaries deployed to all worker nodes, with spark-env.sh configured with the proper master endpoints and executor JAR location
- Spark Executor JAR uploaded to S3/HDFS
With this setup, a Spark job can be submitted to the cluster with a simple spark-submit invocation from any worker node that has the Spark binaries installed and the assembly JAR containing the actual job logic uploaded:
spark-submit --class io.datastrophic.SparkJob /etc/jobs/spark-jobs.jar
There are also options to run Dockerized Spark, so that there's no need to distribute the binaries to every single cluster node.
Scheduled and Long-running tasks execution
Sooner or later, every data processing system needs to run two types of jobs: scheduled/periodic jobs, such as periodic batch aggregations, and long-running ones, which is the case for stream processing. The main requirement for both types is fault tolerance: jobs must continue running even when cluster nodes fail. The Mesos ecosystem comes with two great frameworks, one for each of these job types.
Marathon is a framework for fault-tolerant execution of long-running tasks. It supports HA mode with ZooKeeper, can run Docker containers and has a nice REST API. Here's an example of a simple job configuration running spark-submit as a shell command:
Chronos has the same characteristics as Marathon but is designed for running scheduled jobs; in general it is a distributed, highly available cron that supports graphs of jobs. Here's an example of an S3 compaction job configuration, implemented as a simple bash script:
There are plenty of frameworks, already available or under active development, that aim to integrate widely used systems with Mesos resource management capabilities. To name just a few:
- Hadoop
- Cassandra
- Kafka
- Myriad: YARN on Mesos
- Storm
- Samza
Ingesting the data
So far so good: the storage layer is designed, resource management is set up and jobs are configured. The only thing which is not there yet is the data to process.
Assuming that incoming data will arrive at high rates, the endpoints that receive it should meet the following requirements:
- provide high throughput/low latency
- be resilient
- allow easy scalability
- support back pressure
Back pressure is not a must, but it would be nice to have this as an option to handle load spikes.
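As a rough illustration of what back pressure means at an ingestion endpoint, here is a minimal sketch built on a bounded buffer from the Java standard library: when the buffer is full, the producer is told to slow down instead of the system accepting unbounded work. The names `BoundedIngest`, `Accepted` and `Overloaded` are invented for this example; this is not how akka-streams implements back pressure internally.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Minimal back-pressure sketch: a bounded buffer that rejects work when full.
object BackPressureSketch {
  sealed trait Ack
  case object Accepted extends Ack
  case object Overloaded extends Ack // caller should slow down or retry later

  final class BoundedIngest[A](capacity: Int) {
    private val buffer = new ArrayBlockingQueue[A](capacity)

    // Non-blocking: signals overload instead of queueing without limit.
    def submit(item: A): Ack =
      if (buffer.offer(item)) Accepted else Overloaded

    // Downstream consumer takes up to `max` buffered items, in arrival order.
    def drain(max: Int): List[A] =
      Iterator.continually(Option(buffer.poll()))
        .take(max)
        .takeWhile(_.isDefined)
        .collect { case Some(item) => item }
        .toList
  }
}
```

An HTTP endpoint could translate `Overloaded` into a "503 Service Unavailable" response, which lets well-behaved clients back off during load spikes.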
Akka fits these requirements perfectly; it was basically designed to provide this feature set. So what is Akka?
- actor model implementation for JVM
- message-based and asynchronous
- enforces no shared mutable state
- easily scalable from one process to a cluster of machines
- actors form hierarchies with parental supervision
- not only concurrency framework: akka-http, akka-streams, akka-persistence
Here's a simplified example of three actors which handle a JSON HttpRequest, parse it into a domain model case class and save it to Cassandra:
class HttpActor extends Actor {
  def receive = {
    case req: HttpRequest =>
      context.actorOf(Props[JsonParserActor]) ! req.body
    case e: Event =>
      context.actorOf(Props[CassandraWriterActor]) ! e
  }
}

class JsonParserActor extends Actor {
  def receive = {
    case s: String => Try(Json.parse(s).as[Event]) match {
      case Failure(ex) => // error handling code
      case Success(event) => sender ! event
    }
  }
}

class CassandraWriterActor extends Actor with ActorLogging {
  // for demo purposes, session initialized here
  val session = Cluster.builder()
    .addContactPoint("cassandra.host")
    .build()
    .connect()

  override def receive: Receive = {
    case event: Event =>
      val statement = new SimpleStatement(event.createQuery)
        .setConsistencyLevel(ConsistencyLevel.QUORUM)
      Try(session.execute(statement)) match {
        case Failure(ex) => // error handling code
        case Success(_) => sender ! WriteSuccessfull
      }
  }
}
It looks like only a few lines of code are needed to make everything work, but while writing raw data (events) to Cassandra with Akka is easy, there are a number of gotchas:
- Cassandra is still designed for fast serving but not batch processing, so pre-aggregation of incoming data is needed
- computation time of aggregations/rollups will grow with amount of data
- actors are not suitable for performing aggregation due to stateless design model
- micro-batches could partially solve the problem
- some sort of reliable buffer for raw data is still needed
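The pre-aggregation mentioned in the list above can be sketched as a function that collapses one micro-batch of raw events into a single rollup row per campaign and day, so that far fewer rows need to be written and later scanned. The event shape and all names (`PreAggregationSketch`, `Rollup`, `rollup`) are simplified assumptions for illustration.

```scala
// Sketch: collapsing a micro-batch of raw events into per-(campaign, day) rollups.
object PreAggregationSketch {
  final case class Event(campaign: String, day: String, `type`: String)

  final case class Rollup(views: Long, clicks: Long) {
    def +(other: Rollup): Rollup = Rollup(views + other.views, clicks + other.clicks)
  }

  def rollup(batch: Seq[Event]): Map[(String, String), Rollup] =
    batch.foldLeft(Map.empty[(String, String), Rollup]) { (acc, e) =>
      val delta = e.`type` match {
        case "view"  => Some(Rollup(1, 0))
        case "click" => Some(Rollup(0, 1))
        case _       => None // unknown event types are ignored in this sketch
      }
      delta.fold(acc) { d =>
        val key = (e.campaign, e.day)
        acc.updated(key, acc.getOrElse(key, Rollup(0, 0)) + d)
      }
    }
}
```

The batch still has to come from somewhere that survives a crash of the aggregating process, which is why a reliable buffer for the raw data remains necessary.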
Kafka as a buffer for incoming data
A distributed commit log can be used to keep incoming data for some retention period and to feed its further pre-aggregation/processing. In this case consumers read the data in batches, process it and store it in Cassandra in the form of pre-aggregates. Here's an example of publishing JSON data coming over HTTP to Kafka with akka-http:
val config = new ProducerConfig(KafkaConfig())
lazy val producer = new Producer[String, String](config)
val topic = "raw_events"

val routes: Route = {
  post {
    decodeRequest {
      entity(as[String]) { str =>
        JsonParser.parse(str).validate[Event] match {
          case _: JsSuccess[_] =>
            // valid event: publish the raw JSON string to Kafka
            producer.send(new KeyedMessage[String, String](topic, str))
            complete(OK)
          case e: JsError =>
            complete(BadRequest -> JsError.toFlatJson(e).toString())
        }
      }
    }
  }
}

object AkkaHttpMicroservice extends App with Service {
  Http().bindAndHandle(routes, config.getString("http.interface"), config.getInt("http.port"))
}
Consuming the data: Spark Streaming
While Akka could still be used for consuming stream data from Kafka, having Spark in your ecosystem brings Spark Streaming as an option to solve the problem:
- it supports variety of data sources
- provides at-least-once semantics
- exactly-once semantics available with Kafka Direct and idempotent storage
An example of consuming an event stream from Kinesis with Spark Streaming:
val ssc = new StreamingContext(conf, Seconds(10))

val kinesisStream = KinesisUtils.createStream(ssc, appName, streamName,
  endpointURL, regionName, InitialPositionInStream.LATEST,
  Duration(checkpointInterval), StorageLevel.MEMORY_ONLY)

// transforming given stream to Event and saving to C*
kinesisStream.map(JsonUtils.byteArrayToEvent)
  .saveToCassandra(keyspace, table)

ssc.start()
ssc.awaitTermination()
Designing for Failure: Backups and Patching
Usually this is the most boring part of any system, but it's really important when there is any possibility that the data coming into the system could be invalid, or that the whole analytics data center could crash.
So why not store the data in Kafka/Kinesis? At the time of writing Kinesis has only one day of retention, so without backups all processing results could be lost in case of failure. While Kafka supports much longer retention periods, the cost of hardware ownership should be considered: S3 storage, for example, is far cheaper than multiple instances running Kafka, and the S3 SLAs are really good.
Apart from having backups, the restoring/patching strategies should be designed upfront and tested so that any problems with the data can be fixed quickly. A programmer's mistake in an aggregation job, or duplicate data, could break the accuracy of the computation results, so fixing such errors shouldn't be a big problem. One thing that makes all these operations easier is to enforce idempotence in the data model, so that repeating the same operation multiple times produces the same result (e.g. an SQL update is an idempotent operation while a counter increment is not).
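The difference between an idempotent write and a counter increment shows up as soon as the same batch is replayed, for example when re-running a restore job. A minimal sketch with an in-memory map standing in for the table (all names here are hypothetical):

```scala
// Sketch: replaying the same batch with an idempotent upsert vs. a counter increment.
object IdempotenceSketch {
  final case class DailyViews(campaign: String, day: String, views: Long)
  type Store = Map[(String, String), Long]

  // Idempotent: the stored value is overwritten with the computed total,
  // similar to UPDATE ... SET views = ?
  def upsert(store: Store, batch: Seq[DailyViews]): Store =
    batch.foldLeft(store)((s, r) => s.updated((r.campaign, r.day), r.views))

  // Not idempotent: the value is added to whatever is already stored,
  // similar to a counter increment
  def increment(store: Store, batch: Seq[DailyViews]): Store =
    batch.foldLeft(store) { (s, r) =>
      val key = (r.campaign, r.day)
      s.updated(key, s.getOrElse(key, 0L) + r.views)
    }
}
```

Applying `upsert` twice with the same batch leaves the store unchanged after the first application, while applying `increment` twice doubles the numbers, so a replayed backup would silently corrupt counter-based aggregates.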
Here is an example of a Spark job which reads an S3 backup and loads it into Cassandra:
val sc = new SparkContext(conf)

sc.textFile(s"s3n://bucket/2015/*/*.gz")
  .map(s => Try(JsonUtils.stringToEvent(s)))
  .filter(_.isSuccess).map(_.get)
  .saveToCassandra(config.keyspace, config.table)
The Big picture
High-level design of data platform built with SMACK
So what is the SMACK stack?
- concise toolbox for wide variety of data processing scenarios
- battle-tested and widely used software with large communities
- easy scalability and replication of data while preserving low latencies
- unified cluster management for heterogeneous loads
- single platform for any kind of applications
- implementation platform for different architecture designs (batch, streaming, Lambda, Kappa)
- really fast time-to-market (e.g. for MVP verification)