

11/8/2021


Behind The Scenes

by John Doe



%dep

z.load("/opt/zeppelin/interpreter/spark/spark-cassandra-connector-assembly-2.0.2.jar")

%spark

import com.datastax.spark.connector._

import org.apache.spark.sql.cassandra._

import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.types._

import org.apache.spark.sql._

import scala.collection.mutable.ArrayBuffer

import sqlContext.implicits._

import spark.implicits._

// map of key:value pairs to remember label for each example

val mymap = scala.collection.mutable.Map[String,Double]()

// 1

// read in raw metrics data from Cassandra

val data1 = spark
  .read
  .cassandraFormat("table", "keyspace")
  .load()

// 2

// take subset to get a reasonable sized data set over a time period

val data2 = data1.filter('bucket_time > "2017-09-28 00:00:00")

// 3

// remove dubious rows, including:

// rows where the state column has a value

// drop the state column

// rows with NaN values

// rows with min, avg and max values all 0.0

val cleanData = data2.filter('state.isNull).drop('state).na.drop().filter('avg > 0.0 && 'max > 0.0 && 'min > 0.0)

// 4

// convert into wide table format required by MLlib

// use time (5 minute buckets), not bucket_time (1 hour buckets)

// convert time timestamp to seconds since Epoch

val pivotTable = cleanData
  .groupBy("host", "time")
  .pivot("service")
  .agg(min("min"), avg("avg"), max("max"))
  .withColumn("time", unix_timestamp(cleanData("time"), "yyyy-MM-dd HH:mm:ss").cast(LongType))

// 5

// function to return 1.0 or 0.0 for label creation

// takes a Row as input

def hasLongSLA(r: Row): Double = {
  try {
    val readavg  = r.getAs[Double]("/cassandra/sla/latency/read_avg(avg)")
    val readmax  = r.getAs[Double]("/cassandra/sla/latency/read_max(max)")
    val writeavg = r.getAs[Double]("/cassandra/sla/latency/write_avg(avg)")
    val writemax = r.getAs[Double]("/cassandra/sla/latency/write_max(max)")
    var v = 0.0
    if (readavg > 22 || writeavg > 22 || readmax > 100 || writemax > 100)
      v = 1.0
    return v
  } catch {
    case ex: IllegalArgumentException =>
      // metric name not found
      return 0.0
    case ex: Throwable =>
      println("Error in getAs in hasLongSLA label function: " + ex)
      return 0.0
  }
}

// 6

// given the pivot table, populate the map with label values for

// each row

pivotTable.collect().foreach(r =>
  mymap.put(
    r.getAs[String]("host") + "_" + r.getAs[Long]("time"),
    hasLongSLA(r)
  )
)

// 7

// user defined function (udf) to return the label for the NEXT bucket
// time period, or -1 if not found, where NEXT is 5 minutes
// (60 * 5 = 300s) ahead. Uses mymap to find the label.

val lookupMapNextBucket = udf((h: String, b: Double) =>
  mymap.getOrElse(h + "_" + (b.toLong + 300), -1.0)
)

// 8

// use the withColumn method to create (or update) the label column

// label = 1.0 if next bucket time for the host has a long SLA

// else label = 0.0 if no long SLA, or -1.0 if error

val dataWithLabels = pivotTable.withColumn("label", lookupMapNextBucket('host, 'time))
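// Optional follow-up sketch: rows for which no NEXT bucket exists get label
// -1.0, so they can be filtered out before the table is used for training.
// The variable name below is illustrative.
val labelledData = dataWithLabels.filter('label >= 0.0)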

// 9

// To use this table as input to MLlib as in the previous blog, we can now
// optionally save this table to Cassandra and read it in again,
// e.g. table = "mllib_wide", keyspace = "instametrics", or use it as
// input to MLlib directly.
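A minimal sketch of that optional round trip is below. It assumes the connector's cassandraFormat helper is also available on the DataFrame writer, that the target table already exists with matching columns, and it uses the table and keyspace names suggested above; the mllibInput variable name is illustrative.

// persist the labelled wide table to Cassandra
dataWithLabels.write
  .cassandraFormat("mllib_wide", "instametrics")
  .mode("append")
  .save()

// read it back in a later session as the MLlib input DataFrame
val mllibInput = spark
  .read
  .cassandraFormat("mllib_wide", "instametrics")
  .load()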
