%dep
z.load("/opt/zeppelin/interpreter/spark/spark-cassandra-connector-assembly-2.0.2.jar")
%spark
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import scala.collection.mutable.ArrayBuffer
import spark.implicits._
// map of key:value pairs to remember label for each example
val mymap = scala.collection.mutable.Map[String,Double]()
// 1
// read in raw metrics data from Cassandra
val data1 = spark
.read
.cassandraFormat("table", "keyspace")
.load()
// 2
// take subset to get a reasonable sized data set over a time period
val data2 = data1.filter('bucket_time > "2017-09-28 00:00:00")
// 3
// remove dubious rows, including:
// rows where the state column has a value
// drop the state column
// rows with NaN values
// rows with min, avg and max values all 0.0
val cleanData = data2
.filter('state.isNull)
.drop("state")
.na.drop()
.filter('avg > 0.0 && 'max > 0.0 && 'min > 0.0)
// 4
// convert into wide table format required by MLlib
// use time (5 minute buckets), not bucket_time (1 hour buckets)
// convert time timestamp to seconds since Epoch
val pivotTable = cleanData.groupBy("host", "time")
.pivot("service")
.agg(min("min"),avg("avg"),max("max"))
.withColumn("time",
unix_timestamp(cleanData("time"), "YYYY-MM-dd HH:mm:ss").cast(LongType)
)
// 5
// function to return 1.0 or 0.0 for label creation
// takes a Row as input
def hasLongSLA(r: Row): Double = {
  try {
    val readavg = r.getAs[Double]("/cassandra/sla/latency/read_avg(avg)")
    val readmax = r.getAs[Double]("/cassandra/sla/latency/read_max(max)")
    val writeavg = r.getAs[Double]("/cassandra/sla/latency/write_avg(avg)")
    val writemax = r.getAs[Double]("/cassandra/sla/latency/write_max(max)")
    if (readavg > 22 || writeavg > 22 || readmax > 100 || writemax > 100) 1.0 else 0.0
  } catch {
    case ex: IllegalArgumentException =>
      // metric name not found in this row
      0.0
    case ex: Throwable =>
      println("Error in getAs in hasLongSLA label function: " + ex)
      0.0
  }
}
// 6
// given the pivot table, populate the map with label values for
// each row
pivotTable.collect().foreach(
r => mymap.put(
r.getAs[String]("host") + "_" + r.getAs[Long]("time"),
hasLongSLA(r)
)
)
// 7
// user defined function (udf) to return the label for the NEXT bucket
// time period, or -1 if not found. NEXT is 5 minutes (60 * 5 = 300s)
// ahead. Uses mymap to find the label.
val lookupMapNextBucket = udf(
(h:String, b:Double) =>
mymap.getOrElse(h + "_" + (b.toLong() + 300), -1.0)
)
// 8
// use the withColumn method to create (or update) the label column
// label = 1.0 if next bucket time for the host has a long SLA
// else label = 0.0 if no long SLA, or -1.0 if error
val dataWithLabels = pivotTable.withColumn("label", lookupMapNextBucket('host, 'time))
// 9
// To use this table as input to MLlib as in the previous blog, we can
// now optionally save it to Cassandra and read it in again,
// e.g. table = "mllib_wide", keyspace = "instametrics", or use it as
// input to MLlib directly, as in the sketch below.
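// A minimal sketch of the optional save and reload step. It assumes the
// target table already exists in Cassandra with a matching schema; the
// table and keyspace names are just the examples mentioned above.
dataWithLabels
.write
.cassandraFormat("mllib_wide", "instametrics")
.mode("append")
.save()
// read the wide table back in for use as MLlib input
val mllibInput = spark
.read
.cassandraFormat("mllib_wide", "instametrics")
.load()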