%dep

z.load("/opt/zeppelin/interpreter/spark/spark-cassandra-connector-assembly-2.0.2.jar")

%spark

import com.datastax.spark.connector._

import org.apache.spark.sql.cassandra._

import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.types._

import org.apache.spark.sql._

import scala.collection.mutable.ArrayBuffer

import sqlContext.implicits._

import spark.implicits._

// map of key:value pairs to remember the label for each example

val mymap = scala.collection.mutable.Map[String,Double]()

// 1

// read in raw metrics data from Cassandra

val data1 = spark

  .read

  .cassandraFormat("table", "keyspace")

  .load()
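// optional sanity check (a minimal sketch, not part of the pipeline): inspect the
// inferred schema and a few raw rows to confirm the columns used below
// (host, service, time, bucket_time, state, min, avg, max) are present as expected
data1.printSchema()
data1.show(5, false)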

// 2

// take a subset to get a reasonably sized data set over a time period

val data2 = data1.filter('bucket_time > "2017-09-28 00:00:00")

// 3

// remove dubious rows, including:

// rows where the state column has a value

// drop the state column

// rows with NaN values

// rows where min, avg or max is 0.0 (or negative)

val cleanData = data2.filter('state.isNull).drop('state).na.drop().filter('avg > 0.0 && 'max > 0.0 && 'min > 0.0)
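// optional sanity check: compare row counts before and after cleaning to see
// how many dubious rows were dropped
println(s"rows before cleaning: ${data2.count()}, after cleaning: ${cleanData.count()}")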

// 4

// convert into wide table format required by MLlib

// use time (5 minute buckets), not bucket_time (1 hour buckets)

// convert time timestamp to seconds since Epoch

val pivotTable = cleanData.groupBy("host", "time")
  .pivot("service")
  .agg(min("min"), avg("avg"), max("max"))
  .withColumn("time",
    unix_timestamp(col("time"), "yyyy-MM-dd HH:mm:ss").cast(LongType)
  )
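// optional: the pivot creates one column per (service, aggregate) combination;
// printing the schema shows the exact generated column names, which step 5
// must reference verbatim in getAs
pivotTable.printSchema()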

// 5

// function to return 1.0 or 0.0 for label creation

// takes a Row as input

def hasLongSLA(r: Row): Double = {
  try {
    val readavg = r.getAs[Double]("/cassandra/sla/latency/read_avg(avg)")
    val readmax = r.getAs[Double]("/cassandra/sla/latency/read_max(max)")
    val writeavg = r.getAs[Double]("/cassandra/sla/latency/write_avg(avg)")
    val writemax = r.getAs[Double]("/cassandra/sla/latency/write_max(max)")
    // long SLA if any read/write latency average or max exceeds the thresholds
    if (readavg > 22 || writeavg > 22 || readmax > 100 || writemax > 100) 1.0 else 0.0
  } catch {
    case ex: IllegalArgumentException =>
      // metric name (column) not found in this row
      0.0
    case ex: Throwable =>
      println("Error in getAs in hasLongSLA label function: " + ex)
      0.0
  }
}

// 6

// given the pivot table, populate the map with label values for

// each row

pivotTable.collect().foreach(r =>
  mymap.put(
    r.getAs[String]("host") + "_" + r.getAs[Long]("time"),
    hasLongSLA(r)
  )
)
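// optional sanity check: how many (host, time) keys were labelled
println(s"labelled examples in map: ${mymap.size}")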

// 7

// user defined function (udf) to return the label for the NEXT bucket time
// period, or -1.0 if not found, where NEXT is 5 minutes
// (60 * 5 = 300s) ahead. Uses mymap to find the label.

val lookupMapNextBucket = udf(
  (h: String, b: Double) =>
    mymap.getOrElse(h + "_" + (b.toLong + 300), -1.0)
)

// 8

// use the withColumn method to create (or update) the label column

// label = 1.0 if next bucket time for the host has a long SLA

// else label = 0.0 if no long SLA, or -1.0 if error

val dataWithLabels = pivotTable.withColumn("label", lookupMapNextBucket('host, 'time))
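// optional sketch: check the class balance, and how many rows received the
// -1.0 "label not found" value, before using the table for training
dataWithLabels.groupBy("label").count().show()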

// 9

// To use this table as input to MLlib as in the previous blog, we can now
// optionally save it to Cassandra and read it in again
// (e.g. table = "mllib_wide", keyspace = "instametrics"), or use it as input
// to MLlib directly.
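// minimal sketch of the optional save and reload step, assuming a Cassandra
// table "mllib_wide" already exists in the "instametrics" keyspace with a
// schema matching dataWithLabels (table and keyspace names are taken from the
// comment above; adjust to your cluster)
dataWithLabels.write
  .cassandraFormat("mllib_wide", "instametrics")
  .mode("append")
  .save()

// read the wide table back in as the input for MLlib
val mllibInput = spark.read
  .cassandraFormat("mllib_wide", "instametrics")
  .load()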