Roadmap
● SMACK stack overview
● Storage layer layout
● Fixing NoSQL limitations
● Cluster resource management
● Reliable scheduling and execution
● Data ingestion options
● Preparing for failures

4. SMACK Stack
● Spark - fast and general engine for distributed, large-scale data processing
● Mesos - cluster resource management system that provides efficient resource isolation and sharing across distributed applications
● Akka - a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM
● Cassandra - distributed, highly available database designed to handle large amounts of data across multiple datacenters
● Kafka - a high-throughput, low-latency distributed messaging system designed for handling real-time data feeds

5. Storage Layer: Cassandra
● optimized for heavy write loads
● configurable CA (CAP)
● linearly scalable
● XDCR support
● easy cluster resizing and inter-DC data migration

6. Cassandra Data Model
● nested sorted map
● should be optimized for read queries
● data is distributed across nodes by partition key

    CREATE TABLE campaign(
      id uuid,
      year int,
      month int,
      day int,
      views bigint,
      clicks bigint,
      PRIMARY KEY (id, year, month, day)
    );

    INSERT INTO campaign(id, year, month, day, views, clicks)
    VALUES(40b08953-a…, 2015, 9, 10, 1000, 42);

    SELECT views, clicks FROM campaign
    WHERE id=40b08953-a… and year=2015 and month>8;

7. Spark/Cassandra Example
● calculate total views per campaign for a given month, for all campaigns

    CREATE TABLE event(
      id uuid,
      ad_id uuid,
      campaign uuid,
      ts bigint,
      type text,
      PRIMARY KEY(id)
    );

    import java.util.UUID
    import org.apache.spark.SparkContext
    import com.datastax.spark.connector._  // adds cassandraTable to SparkContext

    val sc = new SparkContext(conf)

    case class Event(id: UUID, ad_id: UUID, campaign: UUID, ts: Long, `type`: String)

    // checkMonth is a helper that is not shown on the slide
    sc.cassandraTable[Event]("keyspace", "event")
      .filter(e => e.`type` == "view" && checkMonth(e.ts))
      .map(e => (e.campaign, 1))
      .reduceByKey(_ + _)
      .collect()

8.
Naive Lambda example with Spark SQL

    case class CampaignReport(id: String, views: Long, clicks: Long)

    sql("""SELECT campaign.id as id, campaign.views as views,
                  campaign.clicks as clicks, event.type as type
           FROM campaign
           JOIN event ON campaign.id = event.campaign""")
      .rdd
      .groupBy(row => row.getAs[String]("id"))
      .map { case (id, rows) =>
        val views = rows.head.getAs[Long]("views")
        val clicks = rows.head.getAs[Long]("clicks")
        // count joined events per type; a type may be absent for a campaign
        val res = rows.groupBy(row => row.getAs[String]("type")).mapValues(_.size)
        CampaignReport(
          id,
          views = views + res.getOrElse("view", 0),
          clicks = clicks + res.getOrElse("click", 0))
      }
      .saveToCassandra("keyspace", "campaign_report")

9. Let's take a step back: Spark Basics
● RDD operations (transformations and actions) form a DAG
● the DAG is split into stages of tasks, which are then submitted to the cluster manager
● stages combine tasks that don't require shuffling/repartitioning
● tasks run on workers, and results are then returned to the client

10. Architecture of Spark/Cassandra Clusters
Separate Write & Analytics:
● clusters can be scaled independently
● data is replicated by Cassandra asynchronously
● Analytics has different Read/Write load patterns
● Analytics contains additional data and processing results
● Spark resource impact is limited to only one DC
To take full advantage of the Spark-C* connector's data locality awareness, Spark workers should be collocated with Cassandra nodes.

11. Spark Applications Deployment Revisited
Cluster Manager:
● Spark Standalone
● YARN
● Mesos

12. Managing Cluster Resources: Mesos
● heterogeneous workloads
● full cluster utilization
● static vs. dynamic resource allocation
● fault tolerance and disaster recovery
● single resource view at datacenter level

13. Mesos Architecture Overview
● leader election and service discovery via ZooKeeper
● slaves publish available resources to master
● master sends resource offers to frameworks
● scheduler replies with tasks and resources needed per task
● master sends tasks to slaves

14.
Bringing Spark, Mesos and Cassandra Together
Deployment example
● Mesos Masters and ZooKeepers collocated
● Mesos Slaves and Cassandra nodes collocated to enforce better data locality for Spark
● Spark binaries deployed to all worker nodes and spark-env is configured
● Spark Executor JAR uploaded to S3
Invocation example

    spark-submit --class io.datastrophic.SparkJob /etc/jobs/spark-jobs.jar

15. Marathon
● long running tasks execution
● HA mode with ZooKeeper
● Docker executor
● REST API

16. Chronos
● distributed cron
● HA mode with ZooKeeper
● supports graphs of jobs
● sensitive to network failures

17. More Mesos frameworks
● Hadoop
● Cassandra
● Kafka
● Myriad: YARN on Mesos
● Storm
● Samza

18. Data ingestion: endpoints to consume the data
Endpoint requirements:
● high throughput
● resiliency
● easy scalability
● back pressure

19. Akka features
● actor model implementation for JVM
● message-based and asynchronous
● no shared mutable state
● easy scalability from one process to cluster of machines
● actor hierarchies with parental supervision
● not only concurrency framework:
  ○ akka-http
  ○ akka-streams
  ○ akka-persistence

    class JsonParserActor extends Actor with ActorLogging {
      def receive = {
        case s: String => Try(Json.parse(s).as[Event]) match {
          case Failure(ex) => log.error(ex, "Failed to parse event")
          case Success(event) => sender() ! event
        }
      }
    }

    class HttpActor extends Actor {
      def receive = {
        // child actors are created through the actor's own context
        case req: HttpRequest =>
          context.actorOf(Props[JsonParserActor]) ! req.body
        case e: Event =>
          context.actorOf(Props[CassandraWriterActor]) ! e
      }
    }

20. Writing to Cassandra with Akka

    class CassandraWriterActor extends Actor with ActorLogging {
      // for demo purposes, session initialized here
      val session = Cluster.builder().addContactPoint("").build().connect()

      override def receive: Receive = {
        case event: Event =>
          val statement = new SimpleStatement(event.createQuery)
            .setConsistencyLevel(ConsistencyLevel.QUORUM)

          Try(session.execute(statement)) match {
            case Failure(ex) => // error handling code
            case Success(_) => sender() ! WriteSuccessfull
          }
      }
    }

21.
Cassandra meets Batch Processing
● writing raw data (events) to Cassandra with Akka is easy
● but computation time of aggregations/rollups will grow with the amount of data
● Cassandra is designed for fast serving, not batch processing, so pre-aggregation of incoming data is needed
● actors are not suitable for performing aggregation due to their stateless design model
● micro-batches partially solve the problem
● reliable storage for raw data is still needed

22. Kafka: distributed commit log
● pre-aggregation of incoming data
● consumers read data in batches
● a comparable managed service is available on AWS as Kinesis

23. Publishing to Kafka with Akka Http

    val config = new ProducerConfig(KafkaConfig())
    lazy val producer = new KafkaProducer[A, A](config)
    val topic = "raw_events"

    val routes: Route = {
      post {
        decodeRequest {
          entity(as[String]) { str =>
            JsonParser.parse(str).validate[Event] match {
              // valid events are published to Kafka as the raw JSON string
              case _: JsSuccess[_] => producer.send(new KeyedMessage(topic, str))
              case e: JsError => BadRequest -> JsError.toFlatJson(e).toString()
            }
          }
        }
      }
    }

    object AkkaHttpMicroservice extends App with Service {
      Http().bindAndHandle(routes, config.getString("http.interface"), config.getInt("http.port"))
    }

24. Spark Streaming
● variety of data sources
● at-least-once semantics
● exactly-once semantics available with Kafka Direct and idempotent storage

25. Spark Streaming: Kinesis example

    val ssc = new StreamingContext(conf, Seconds(10))

    val kinesisStream = KinesisUtils.createStream(
      ssc, appName, streamName, endpointURL, regionName,
      InitialPositionInStream.LATEST,
      Duration(checkpointInterval),
      StorageLevel.MEMORY_ONLY)

    // transforming the given stream to Event and saving it to a C* table

    ssc.start()
    ssc.awaitTermination()

26. Designing for Failure: Backups and Patching
● be prepared for failures and broken data
● design backup and patching strategies upfront
● idempotence should be enforced

27.
Restoring backup from S3

    val sc = new SparkContext(conf)

    sc.textFile(s"s3n://bucket/2015/*/*.gz")
      .map(s => Try(JsonUtils.stringToEvent(s)))
      .filter(_.isSuccess).map(_.get)
      .saveToCassandra(config.keyspace, config.table)

28. The big picture

29. So what SMACK is
● concise toolbox for wide variety of data processing scenarios
● battle-tested and widely used software with large communities
● easy scalability and replication of data while preserving low latencies
● unified cluster management for heterogeneous loads
● single platform for any kind of applications
● implementation platform for different architecture designs
● really short time-to-market (e.g. for MVP verification)

30. Questions
@antonkirillov
datastrophic.io
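
The idempotence requirement from the backups and patching slide can be illustrated with a minimal sketch in plain Scala, with no Spark or Cassandra involved. It assumes a hypothetical `Event` shape and uses an in-memory `Map` as a stand-in for a table keyed by primary key; none of these names come from the deck. The point it tries to show: replaying the same backup into a keyed upsert store leaves the state unchanged, while a naive running counter double-counts.

```scala
// Hypothetical event shape for illustration only; not the deck's Event class.
final case class Event(id: String, campaign: String, kind: String)

object IdempotentRestore {
  // Stand-in for a table with PRIMARY KEY (id): writing the same row twice
  // overwrites it, so replaying a batch does not change the stored state.
  def upsertAll(table: Map[String, Event], batch: Seq[Event]): Map[String, Event] =
    batch.foldLeft(table)((acc, e) => acc.updated(e.id, e))

  // Non-idempotent alternative: a running counter per campaign.
  // Replaying the same batch increments every counter again.
  def incrementAll(counts: Map[String, Long], batch: Seq[Event]): Map[String, Long] =
    batch.foldLeft(counts) { (acc, e) =>
      acc.updated(e.campaign, acc.getOrElse(e.campaign, 0L) + 1L)
    }

  // Aggregates derived from the keyed table stay correct after a replay.
  def viewsPerCampaign(table: Map[String, Event]): Map[String, Long] =
    table.values
      .filter(_.kind == "view")
      .groupBy(_.campaign)
      .map { case (campaign, events) => campaign -> events.size.toLong }
}
```

Under these assumptions, restoring a backup is safe to retry when raw events are written by key and aggregates are recomputed from them, which is why the restore job can simply be rerun after a partial failure.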
