Reading time: 2 min

Behind The Scenes

by John Doe

%dep
z.load("/opt/zeppelin/interpreter/spark/spark-cassandra-connector-assembly-2.0.2.jar")

%spark
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import sqlContext.implicits._
import spark.implicits._

// Map of "<host>_<epochSeconds>" -> label, remembering the label computed
// for each (host, time bucket) example so the NEXT bucket can be looked up.
val mymap = scala.collection.mutable.Map[String, Double]()

// Width of one time bucket in seconds (5 minutes = 60 * 5).
val bucketSeconds = 300L

// 1
// Read in raw metrics data from Cassandra.
// "table" and "keyspace" are placeholders for the real names; they must be
// plain ASCII quotes (typographic quotes do not compile).
val data1 = spark
  .read
  .cassandraFormat("table", "keyspace")
  .load()

// 2
// Take a subset to get a reasonably sized data set over a time period.
val data2 = data1.filter('bucket_time > "2017-09-28 00:00:00")

// 3
// Remove dubious rows:
//  - rows where the state column has a value (then drop the state column)
//  - rows with NaN/null values
//  - rows where ANY of min, avg or max is not strictly positive
//    (NOTE(review): stricter than "all 0.0" -- confirm this is intended)
val cleanData = data2
  .filter('state.isNull)
  .drop('state)
  .na.drop()
  .filter('avg > 0.0 && 'max > 0.0 && 'min > 0.0)

// 4
// Convert into the wide table format required by MLlib.
// Use time (5 minute buckets), not bucket_time (1 hour buckets), and convert
// the time column to seconds since the Epoch. The pattern uses "yyyy"
// (calendar year); "YYYY" is the week-based year and misparses dates near
// the turn of the year. (If time is already a timestamp, the pattern is unused.)
// With several aggregations, pivot names columns "<service>_<agg>", e.g.
// "/cassandra/sla/latency/read_avg(avg)" -- presumably for service
// "/cassandra/sla/latency/read"; verify against the actual data.
val pivotTable = cleanData
  .groupBy("host", "time")
  .pivot("service")
  .agg(min("min"), avg("avg"), max("max"))
  .withColumn("time", unix_timestamp(col("time"), "yyyy-MM-dd HH:mm:ss").cast(LongType))

// 5
/** Label function: 1.0 if the row shows a long SLA, else 0.0.
  *
  * A row has a long SLA when read/write average latency exceeds 22 or
  * read/write max latency exceeds 100 (units as stored -- presumably ms).
  * A missing metric column yields 0.0. A null metric value unboxes to 0.0
  * via getAs[Double]. Any other non-fatal error is printed and yields 0.0.
  *
  * @param r a row of pivotTable
  * @return 1.0 or 0.0
  */
def hasLongSLA(r: Row): Double =
  try {
    val readavg = r.getAs[Double]("/cassandra/sla/latency/read_avg(avg)")
    val readmax = r.getAs[Double]("/cassandra/sla/latency/read_max(max)")
    val writeavg = r.getAs[Double]("/cassandra/sla/latency/write_avg(avg)")
    val writemax = r.getAs[Double]("/cassandra/sla/latency/write_max(max)")
    if (readavg > 22 || writeavg > 22 || readmax > 100 || writemax > 100) 1.0 else 0.0
  } catch {
    case _: IllegalArgumentException =>
      // metric name not found
      0.0
    case NonFatal(ex) =>
      // Best-effort labelling: report and fall back to "no long SLA".
      println("Error in getAs in hasLongSLA label function: " + ex)
      0.0
  }

// 6
// Given the pivot table, populate the map with the label value for each row.
// NOTE: collect() brings the whole pivot table to the driver; fine for the
// subset chosen in step 2, but a self-join would scale better.
pivotTable.collect().foreach { r =>
  mymap.put(r.getAs[String]("host") + "_" + r.getAs[Long]("time"), hasLongSLA(r))
}

// 7
// User defined function (udf) returning the label for the NEXT bucket time
// period (bucketSeconds ahead) for the host, or -1.0 if not found.
// time is LongType after step 4, so take a Long rather than a Double.
val lookupMapNextBucket = udf((h: String, b: Long) =>
  mymap.getOrElse(h + "_" + (b + bucketSeconds), -1.0))

// 8
// Use withColumn to create (or update) the label column:
// label = 1.0 if the next bucket time for the host has a long SLA,
// 0.0 if no long SLA, or -1.0 if the next bucket was not found.
val dataWithLabels = pivotTable.withColumn("label", lookupMapNextBucket('host, 'time))

// 9
// To use this table as input to MLlib as in the previous blog, you can now
// optionally save this table to Cassandra and read it in again, e.g.
// table = "mllib_wide", keyspace = "instametrics", or use it as input to
// MLlib directly.

