
10/10/2018

Reading time: 4 min

How to log in Apache Spark, a functional approach

by Nicolas A Perez


Logging in Apache Spark is very easy since Spark offers access to a log object out of the box; only some configuration needs to be done. In a previous post we looked at how to do this while pointing out some problems that may arise. However, the solution presented there causes trouble the moment we want to collect the logs, since they are distributed across the entire cluster. Even if we use YARN's log aggregation capabilities, there will be contention that can hurt performance, or worse, in some cases the logs can end up interleaved, breaking the time ordering that logs are supposed to have.

To solve these problems, we need to take a different approach: a functional one.

The Monad Writer

I do not intend to go over the details of monads or, in this particular case, the Monad Writer. If you are interested in learning more, take a look at this link (functor, applicative, and monad), which is very informative on the topic.

Just to put things in context, let’s say that the monad writer (writer) is a container that holds the current value of a computation along with a history (log) of that value, that is, the set of transformations applied to it.

Because of the writer’s monadic properties, it allows us to compose functional transformations, and we will soon see how everything fits together.

A Simplistic Log

The following code demonstrates a simplistic log.
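Something along these lines, a minimal sketch assuming a SparkContext named sc is in scope and log4j is the logging backend (the exact snippet from the original post may differ):

```scala
import org.apache.log4j.LogManager

val log = LogManager.getRootLogger

// This runs entirely on the driver, so a single process writes the log.
val data = sc.parallelize(1 to 100)
log.info(s"Loaded ${data.count()} elements")
```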

The only thing to note is that logging is actually happening on the Spark driver, so we don’t have synchronization or contention problems. Everything starts to get complicated once we start distributing our computations.

The following code, however, won’t work (read the previous post to know why).
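A sketch of the failing pattern, assuming the same log and sc as above; referencing the driver’s logger inside a closure that runs on the executors typically ends in a serialization error or in logs scattered across the worker nodes:

```scala
val data = sc.parallelize(1 to 100)

// The closure below is shipped to the executors; `log` is not serializable,
// and even if it were, each worker would write to its own local log file.
val doubled = data.map { n =>
  log.info(s"Processing value $n")
  n * 2
}
```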

A solution to this was also presented in the previous post, but it requires extra work to manage the logs.

Once we start logging on each node of the cluster, we need to go to each node and collect each log file in order to make sense of what is in the logs. Hopefully you are using some kind of tool to help with this task, such as Splunk, Datadog, etc., yet you still need to know quite a lot to get those logs into your system.

Our Data Set

Our data set is a collection of the class Person that is going to be transformed while keeping a unified log of the operations on our data set.
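A hypothetical shape for Person, just enough to follow the example (the original definition may include more fields):

```scala
case class Person(name: String, age: Int)
```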

Let’s suppose we want our data set to be loaded, then to filter the people whose age is less than 20 years, and finally to extract their names. It is a very silly example, but it will demonstrate how the logs are produced. You could replace these computations with your own, but the idea of building a unified log remains the same.

Getting the Writer

We are going to use the Typelevel Cats library to get the monad writer; to do so, we add the following line to our build.sbt file.
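Something like the following, assuming a recent Cats release where the writer lives in the core module (older releases published a single "cats" artifact; pick the version matching your Scala build):

```scala
libraryDependencies += "org.typelevel" %% "cats-core" % "2.10.0"
```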

Playing with our data

Now, let’s define the transformations we are going to use.

First, let’s load the data.
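A sketch of the loader, assuming a SparkContext named sc is in scope and the input is a simple name,age CSV file; the name loadPeopleFrom and the parsing are illustrative, not the original code:

```scala
import cats.data.Writer
import org.apache.spark.rdd.RDD

// Loads the people file and tags the resulting RDD with a log entry via ~>.
def loadPeopleFrom(path: String): Writer[List[String], RDD[Person]] =
  s"loading people from $path" ~> sc.textFile(path).map { line =>
    val Array(name, age) = line.split(",")
    Person(name.trim, age.trim.toInt)
  }
```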

Here, the ~> operation is defined via an implicit conversion, as follows.
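A possible definition: the string on the left of ~> becomes a log entry and the value on the right becomes the writer’s payload. The exact signature in the original post may differ slightly:

```scala
import cats.data.Writer

implicit class LogOps(message: String) {
  // Wrap any value in a Writer, recording `message` as a single log entry.
  def ~>[A](value: A): Writer[List[String], A] = Writer(List(message), value)
}
```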

If you look closely, our loading operation does not return an RDD; in fact, it returns the monad writer, which keeps track of the logs.

Let’s define the filter that we want to apply over the collection of users.
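A sketch of that filter, keeping the people younger than 20 as described above (the name filterPeople is illustrative):

```scala
def filterPeople(people: RDD[Person]): Writer[List[String], RDD[Person]] =
  "filtering people younger than 20" ~> people.filter(_.age < 20)
```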

Again, we are applying the same function (~>) to keep track of this transformation.

Lastly, we define the mapping, which follows the same pattern we just saw.
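Again illustrative, extracting the names while logging the step:

```scala
def extractNames(people: RDD[Person]): Writer[List[String], RDD[String]] =
  "extracting names" ~> people.map(_.name)
```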

Putting it together

So far we have only defined our transformations, but we need to stick them together. Scala’s for comprehension is a very convenient way to work with monadic structures. Let’s see how.
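Using the hypothetical functions sketched above, the chain could look like this; the Writer accumulates the log entries while the RDD transformations compose lazily, as usual in Spark:

```scala
import cats.implicits._

val result: Writer[List[String], RDD[String]] =
  for {
    people   <- loadPeopleFrom("people.csv")
    filtered <- filterPeople(people)
    names    <- extractNames(filtered)
  } yield names
```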

Please note that result is of type: Writer[List[String], RDD[String]].

Calling result.run will give us both the log, a List[String], and the final computation, an RDD[String].

At this point we could use the Spark logger to write down the log generated by the chain of transformations. Note that this operation is executed on the Spark driver, which implies that a single log file is created with all the log information. It also removes potential contention during the log writes: since the file is created and written to serially from one place, there is no need to lock it, and we avoid the performance issues that locking brings.
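A sketch of that final step, reusing the log4j logger from the earlier snippets:

```scala
// Running the writer on the driver yields the accumulated log and the final RDD.
val (logEntries, names) = result.run

// One ordered log, written from a single place.
logEntries.foreach(entry => log.info(entry))

// The RDD is still lazy; trigger the computation as usual.
names.collect().foreach(println)
```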

Conclusions

We have improved how we log on Apache Spark by using the Monad Writer. This functional approach allows us to distribute the creation of logs along with our computations, something Spark knows how to do well. However, instead of writing the logs on each worker node, we collect them back to the driver and write them there. This mechanism has several advantages over our previous implementation: we control exactly how and when our logs are written, we boost performance by removing I/O operations on the worker nodes, we remove synchronization issues by writing the logs serially, and we avoid the hazard of fishing for logs across the entire cluster.

