Let's learn while gossiping! 😛

Well, after chilling around for a week in my notice period, I decided to do something useful and started reading a few blogs. Throughout the past two years of my career as a software developer, I have heard "office gossip" about how great Cassandra is. (pun absolutely intended)

Cassandra is currently one of the most popular NoSQL distributed databases, and after skimming through some of the usual online tutorials, I decided to learn more by diving into the source code itself. The code base is huge, so I had to pick one topic to dig deeper into. The Gossip protocol was my choice, primarily because it is not specific to Cassandra: it is an important tool for failure detection, monitoring, messaging and other good things in a distributed setup in general. (ref)

The motivation behind this post is that I, as a developer, have always been intrigued by how the code for distributed systems is written, and I am sure there are more inquisitive engineers like me who have the skill set to understand the source code but, in general, never bother. My intention here is to demonstrate the implementation, to some extent, for people interested in learning what drives Cassandra under the hood. This post is by no means a tutorial on Cassandra's Gossip protocol; there are already quite a few of those out there. Rather, it takes a curious developer who already has some understanding a step further, by looking at the various constructs used in the code to achieve the required functionality.

If you do not know what the gossip protocol does, I strongly suggest going through this and this.


When building a system on top of a set of wildly uncooperative and unruly computers you have knowledge problems: knowing when other nodes are dead; knowing when nodes become alive; getting information about other nodes so you can make local decisions, like knowing which node should handle a request based on a scheme for assigning nodes to a certain range of users; learning about new configuration data; agreeing on data values; and so on.

How do you solve these problems?

Some centralized database? However, that would be a single point of failure, so a big NO!

So what's the super cool decentralised way to bring order to large clusters?

Yes, your answer is right! The Gossip protocol.


Gossip is a peer-to-peer communication protocol in which nodes periodically exchange state information about themselves and about other nodes they know about. The gossip process in Cassandra runs every second and exchanges state messages with other nodes in the cluster. Each round, a node independently selects one to three peers to gossip with: it will always select a live peer (if any) in the cluster, and it may also probabilistically pick a seed node and/or an unreachable node. Since nodes exchange information not only about themselves but also about the other nodes they have gossiped with, all nodes quickly learn about every other node in the cluster.
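As a rough sketch of that selection logic (the method name and the probabilities below are hypothetical, purely for illustration; Cassandra's actual selection code differs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of per-round peer selection: always a random live
// peer, plus (probabilistically) a seed and/or an unreachable node.
class PeerSelection {
    static final Random random = new Random();

    static List<String> pickGossipTargets(List<String> live, List<String> seeds,
                                          List<String> unreachable) {
        List<String> targets = new ArrayList<>();
        if (!live.isEmpty())
            targets.add(live.get(random.nextInt(live.size())));      // always a live peer
        if (!seeds.isEmpty() && random.nextDouble() < 0.3)           // probability is illustrative
            targets.add(seeds.get(random.nextInt(seeds.size())));
        if (!unreachable.isEmpty() && random.nextDouble() < 0.1)     // probability is illustrative
            targets.add(unreachable.get(random.nextInt(unreachable.size())));
        return targets;
    }

    public static void main(String[] args) {
        List<String> targets = pickGossipTargets(
                List.of("10.0.0.2", "10.0.0.3"), List.of("10.0.0.1"), List.of());
        System.out.println("picked a live peer: " + !targets.isEmpty());
    }
}
```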


Let's Break It Down!

The gossip messaging is very similar to the TCP three-way handshake. With a regular broadcast protocol there would be only one message per round, and the data would gradually spread through the cluster. Having three messages per round adds a degree of anti-entropy: the two interacting nodes reach "convergence" on the data they share much faster.

Implementation Overview

I will split this into two parts:

  • What exactly is exchanged as part of each message: we will cover this in the next part of the blog series. For now, simply consider each of the above three messages a "payload" encapsulating certain information.
  • The entities (in code) required to facilitate the transfer of information: this is what is covered in depth below.

Each node has a central singleton entity, the Gossiper. Gossip is initiated from the Gossiper.GossipTask.run method, a Runnable executed once per second by a scheduled task in the upper layer.

Gossiper.endpointStateMap, a Map<InetAddress, EndPointState>, holds the state of the entire cluster as known to the current node. The key is the IP address of the corresponding node, and the value is its EndPointState, which looks something like below.
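A minimal sketch of such a singleton, under simplified assumptions (the rounds counter and the empty EndPointState placeholder exist only for this demo; Cassandra's real Gossiper carries far more state):

```java
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Placeholder for the per-node state described later in the post.
class EndPointState { }

class Gossiper {
    public static final Gossiper instance = new Gossiper();

    // Cluster state as known to this node, keyed by node address.
    final Map<InetAddress, EndPointState> endpointStateMap = new ConcurrentHashMap<>();

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger rounds = new AtomicInteger();  // demo only

    private class GossipTask implements Runnable {
        @Override
        public void run() {
            // In Cassandra this would: bump the local heartbeat version,
            // build a GossipDigestSyn, and send it to a random live peer
            // (and probabilistically to a seed or unreachable node).
            rounds.incrementAndGet();
        }
    }

    public void start() {
        // Run one gossip round per second.
        executor.scheduleWithFixedDelay(new GossipTask(), 0, 1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        instance.start();
        Thread.sleep(1500);  // allow at least one round to run
        instance.executor.shutdownNow();
        System.out.println("gossip rounds ran: " + (instance.rounds.get() >= 1));
    }
}
```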

Below is a description of each (reference link). The code for each of these is covered in the "Code" section of this post.

  • A generic Message class to represent the messages being exchanged: let's discuss this one in more detail.

Need for a Message Class

One might think we could have a Message base class with GossipDigestSynMessage, GossipDigestAckMessage and GossipDigestAck2Message as child classes. However, such an inheritance hierarchy would introduce tight coupling, and any change would have a waterfall effect on all the related classes. To solve this, we will use composition.

A Message object will have a generic payload, which can be any one of GossipDigestSyn, GossipDigestAck, etc., or in general any object we wish to transfer (the power of composition).

It will also have a Header (similar to an HTTP header). The payload contains the actual object, and the header carries metadata such as created_at, from, to and so on.

Verb and VerbHandler

Another important thing to note is that gossip is, in a way, a form of event-driven programming: actions are performed in response to certain events. This extends beyond gossip to other features of Cassandra. To handle this generically, Cassandra has "verbs" and their respective "handlers". On receipt of a message, a node inspects the associated verb and triggers the associated handler, passing the entire message object as an argument. The handler then extracts the payload and carries on its execution.
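A minimal sketch of that dispatch pattern (names and the String payload are simplifications; Cassandra's handlers receive full message objects):

```java
// Functional interface for a verb handler.
interface VerbHandler {
    void doVerb(Object payload);
}

// Each verb maps to its handler in the enum declaration itself.
enum Verb {
    GOSSIP_DIGEST_SYN(payload -> System.out.println("handling SYN: " + payload)),
    GOSSIP_DIGEST_ACK(payload -> System.out.println("handling ACK: " + payload)),
    GOSSIP_DIGEST_ACK2(payload -> System.out.println("handling ACK2: " + payload));

    final VerbHandler handler;
    Verb(VerbHandler handler) { this.handler = handler; }
}

class DispatchDemo {
    // On receipt of a message, trigger the handler mapped to its verb.
    static void onReceive(Verb verb, Object payload) {
        verb.handler.doVerb(payload);
    }

    public static void main(String[] args) {
        onReceive(Verb.GOSSIP_DIGEST_SYN, "a list of digests");
    }
}
```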

Considering all the above points, here are the additional classes we need:

  • Verb (enum): each verb is mapped to its handler in the enum declaration itself.
  • Header (class): has verb, from, to and created_at fields.
  • Message<T> (class): two attributes, a Header and a payload.
  • As already mentioned, there will be a central singleton entity, the Gossiper (to be covered in part 2 and part 3).

Coming to the most important part and the main topic of this post, let's see what the code for the above entities looks like.

HeartBeatState
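HeartBeatState holds two integers: a generation, which changes only when the node restarts, and a heartbeat version, which is bumped every gossip round. A minimal sketch, simplified from Cassandra's HeartBeatState:

```java
// HeartBeatState: generation changes on node restart; version is bumped
// every gossip round. Simplified sketch of Cassandra's class.
class HeartBeatState implements java.io.Serializable {
    final int generation;    // typically derived from the node's start-up time
    private int version;     // monotonically increasing, one bump per gossip round

    HeartBeatState(int generation) {
        this.generation = generation;
        this.version = 0;
    }

    int getHeartBeatVersion() { return version; }

    void updateHeartBeat() { version++; }
}

class HeartBeatDemo {
    public static void main(String[] args) {
        HeartBeatState hb = new HeartBeatState((int) (System.currentTimeMillis() / 1000));
        hb.updateHeartBeat();
        hb.updateHeartBeat();
        System.out.println("version after two beats: " + hb.getHeartBeatVersion());
    }
}
```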

AppState

Application state is actually stored as a Map<ApplicationState, VersionedValue>, which holds a variety of application information along with the version corresponding to each piece of information. For instance, the application state for "load information" could be (5.2, 45), meaning the node load is 5.2 at version 45. ApplicationState itself is an enumeration.
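A minimal sketch of that pairing (only a few representative enum entries are shown; the load example is the one from above):

```java
import java.util.EnumMap;
import java.util.Map;

// A few representative application-state keys (Cassandra's enum has more).
enum ApplicationState { STATUS, LOAD, SCHEMA, DC, RACK }

// A value plus the version at which it was set.
class VersionedValue implements java.io.Serializable {
    final int version;
    final String value;
    VersionedValue(String value, int version) { this.value = value; this.version = version; }
}

class AppStateDemo {
    public static void main(String[] args) {
        Map<ApplicationState, VersionedValue> applicationState = new EnumMap<>(ApplicationState.class);
        // "node load is 5.2 at version 45"
        applicationState.put(ApplicationState.LOAD, new VersionedValue("5.2", 45));
        VersionedValue load = applicationState.get(ApplicationState.LOAD);
        System.out.println("load " + load.value + " at version " + load.version);
    }
}
```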

EndPointState

Includes all the ApplicationStates and the HeartBeatState for a node. An EndPointState can include only one entry per type of ApplicationState, so if it already includes, say, load information, new load information will overwrite the old one. The ApplicationState version number guarantees that an old value never overwrites a newer one.

VersionedValue is a simple class with version (int) and value (String) attributes (code link).
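Putting the pieces together, a sketch of EndPointState's versioned-overwrite behaviour (HeartBeatState is omitted here for brevity, and the merge logic is a simplification of what happens when gossiped state is applied):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: at most one value per ApplicationState key; a newer version
// replaces an older one, a stale version is dropped.
enum ApplicationState { STATUS, LOAD }

class VersionedValue {
    final int version;
    final String value;
    VersionedValue(String value, int version) { this.value = value; this.version = version; }
}

class EndPointState {
    private final Map<ApplicationState, VersionedValue> applicationState = new ConcurrentHashMap<>();

    void addApplicationState(ApplicationState key, VersionedValue value) {
        // Keep whichever value carries the higher version.
        applicationState.merge(key, value,
                (old, incoming) -> incoming.version > old.version ? incoming : old);
    }

    VersionedValue getApplicationState(ApplicationState key) {
        return applicationState.get(key);
    }
}

class EndPointStateDemo {
    public static void main(String[] args) {
        EndPointState state = new EndPointState();
        state.addApplicationState(ApplicationState.LOAD, new VersionedValue("5.2", 45));
        state.addApplicationState(ApplicationState.LOAD, new VersionedValue("4.0", 44)); // stale, dropped
        System.out.println("load = " + state.getApplicationState(ApplicationState.LOAD).value);
    }
}
```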

Verb, Header and Message
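A sketch of how these three fit together (here java.net.InetSocketAddress stands in for the post's InetAddressPort, handlers are omitted from the Verb enum, and plain Serializable replaces Cassandra's custom serializers, as noted later):

```java
import java.io.Serializable;
import java.net.InetSocketAddress;

// The three gossip verbs (handlers omitted in this sketch).
enum Verb { GOSSIP_DIGEST_SYN, GOSSIP_DIGEST_ACK, GOSSIP_DIGEST_ACK2 }

// Metadata for a message, similar to an HTTP header.
class Header implements Serializable {
    final Verb verb;
    final InetSocketAddress from;
    final InetSocketAddress to;
    final long createdAt;

    Header(Verb verb, InetSocketAddress from, InetSocketAddress to) {
        this.verb = verb;
        this.from = from;
        this.to = to;
        this.createdAt = System.currentTimeMillis();
    }
}

// Composition: the payload can be any serializable object, so the same
// Message class carries SYN, ACK and ACK2 payloads alike.
class Message<T extends Serializable> implements Serializable {
    final Header header;
    final T payload;

    Message(Header header, T payload) {
        this.header = header;
        this.payload = payload;
    }
}

class MessageDemo {
    public static void main(String[] args) {
        Header h = new Header(Verb.GOSSIP_DIGEST_SYN,
                new InetSocketAddress("10.0.0.1", 7000),
                new InetSocketAddress("10.0.0.2", 7000));
        Message<String> m = new Message<>(h, "syn-payload");
        System.out.println(m.header.verb + " -> " + m.payload);
    }
}
```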

A few points to note

  • Header has an InetAddressPort, which is nothing but the IP address and port of a node; in combination, these uniquely identify a node. Code link for the InetAddressPort implementation.
  • Verb has handlers for the three enums listed; their implementations will be covered in the next part of this post.

Note to the reader

* The above implementations are close to, but not exactly the same as, Cassandra's source code. Cassandra implements custom serializers for almost all of the above (and for the additional constructs we will discuss later). Here, I have taken a simple approach and just implemented the Serializable interface. Serialization is required because messages have to be passed from one node to another. I will also omit certain advanced concepts, such as the shadow round, in the implementation going forward.

* There are a few buzzwords, such as "volatile" and "AtomicReference", that a reader might not be aware of. These are meant to ensure thread safety in concurrent programs. (Talking about these in detail is perhaps a topic for another post; as for their usages in code, I copied them from Cassandra's source code itself.)

The next part of this post will cover the in-depth details and implementation of the payloads, along with the respective handlers for each of them.