Let's learn while gossiping! 😛
Sep 4, 2019 · 7 min read
When building a system on top of a set of wildly uncooperative and unruly computers you have knowledge problems: knowing when other nodes are dead; knowing when nodes become alive; getting information about other nodes so you can make local decisions, like knowing which node should handle a request based on a scheme for assigning nodes to a certain range of users; learning about new configuration data; agreeing on data values; and so on.
How do you solve these problems?
A centralized database? That would be a single point of failure, so a big NO!
So what's the super cool decentralised way to bring order to large clusters?
Yes, you guessed it: the gossip protocol!
Note: different distributed systems solve this in different ways (e.g. Kafka uses ZooKeeper, Elasticsearch uses mesh topology communication, etc.).
Gossip is a peer-to-peer communication protocol in which nodes periodically exchange state information about themselves and about the other nodes they know about. In Cassandra, the gossip process runs every second and exchanges state messages with other nodes in the cluster. In each round, a node independently selects one to three peers to gossip with: it always selects a live peer (if there is one), it may select a seed node with some probability, and it may select an unreachable node with some probability. Because nodes exchange information both about themselves and about the other nodes they have already gossiped about, every node quickly learns about every other node in the cluster.
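To make the peer selection concrete, here is a minimal Java sketch of how one round could choose its targets. The names `PeerSelector` and `selectTargets` are hypothetical, and the probabilities are assumptions loosely modelled on Cassandra's Gossiper: an unreachable node is tried with probability unreachable / (live + 1), and a seed with probability seeds / (live + unreachable), or always when no live peer is known.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical sketch of target selection for a single gossip round.
class PeerSelector {
    private final Random random;

    PeerSelector(Random random) {
        this.random = random;
    }

    /** Returns the (one to three) peers this node gossips with in one round. */
    Set<String> selectTargets(List<String> live, List<String> unreachable, List<String> seeds) {
        Set<String> targets = new LinkedHashSet<>();
        String livePeer = null;

        // 1. Always gossip with one random live peer, if there is any.
        if (!live.isEmpty()) {
            livePeer = live.get(random.nextInt(live.size()));
            targets.add(livePeer);
        }

        // 2. Maybe gossip with an unreachable node, to notice when it comes back.
        //    Assumed probability: unreachable / (live + 1).
        if (!unreachable.isEmpty()) {
            double p = unreachable.size() / (live.size() + 1.0);
            if (random.nextDouble() < p) {
                targets.add(unreachable.get(random.nextInt(unreachable.size())));
            }
        }

        // 3. Maybe gossip with a seed, unless the live peer already was one.
        //    With no live peers a seed is always contacted, so a new node can join.
        if (!seeds.isEmpty() && (livePeer == null || !seeds.contains(livePeer))) {
            double p = live.isEmpty() ? 1.0
                    : seeds.size() / (double) (live.size() + unreachable.size());
            if (random.nextDouble() <= p) {
                targets.add(seeds.get(random.nextInt(seeds.size())));
            }
        }
        return targets;
    }
}
```

Seeding the `Random` makes the rounds reproducible, which is handy when experimenting with the sketch.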
Let's Break It Down!
Gossip messaging is very similar to the TCP three-way handshake: each round consists of three messages, GossipDigestSyn, GossipDigestAck and GossipDigestAck2. With a plain broadcast protocol there would be only one message per round, and the data would spread gradually through the cluster. Having three messages per round instead adds a degree of anti-entropy: each of the two nodes learns what the other one is missing and fills in the gaps, so the data shared between the two interacting nodes reaches "convergence" much faster.
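Here is a rough, in-memory sketch of one SYN / ACK / ACK2 round between two nodes. It is a simplification under stated assumptions: each node keeps just one version number and value per endpoint (Cassandra's real digests also carry a generation number), there is no networking, and the names `Node`, `receiveSyn`, `receiveAck` and `receiveAck2` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the SYN / ACK / ACK2 exchange.
// Each node keeps one versioned value per endpoint it knows about.
class Node {
    static final class State {
        final int version;
        final String value;

        State(int version, String value) {
            this.version = version;
            this.value = value;
        }
    }

    // Reply to a SYN: what the receiver wants, plus what it has that is newer.
    static final class Ack {
        final List<String> requests = new ArrayList<>();
        final Map<String, State> newer = new HashMap<>();
    }

    final Map<String, State> states = new HashMap<>();

    // Step 1 (initiator): SYN carries only digests (endpoint -> version), no values.
    Map<String, Integer> syn() {
        Map<String, Integer> digests = new HashMap<>();
        states.forEach((endpoint, s) -> digests.put(endpoint, s.version));
        return digests;
    }

    // Step 2 (receiver): compare the digests with local state and build the ACK.
    Ack receiveSyn(Map<String, Integer> digests) {
        Ack ack = new Ack();
        digests.forEach((endpoint, remoteVersion) -> {
            State local = states.get(endpoint);
            if (local == null || local.version < remoteVersion) {
                ack.requests.add(endpoint);       // initiator knows more: ask for it
            } else if (local.version > remoteVersion) {
                ack.newer.put(endpoint, local);   // we know more: send it
            }
        });
        // Endpoints the initiator did not mention at all are sent back too.
        states.forEach((endpoint, s) -> {
            if (!digests.containsKey(endpoint)) {
                ack.newer.put(endpoint, s);
            }
        });
        return ack;
    }

    // Step 3 (initiator): apply the newer states, then return the requested ones as ACK2.
    Map<String, State> receiveAck(Ack ack) {
        apply(ack.newer);
        Map<String, State> ack2 = new HashMap<>();
        for (String endpoint : ack.requests) {
            ack2.put(endpoint, states.get(endpoint));
        }
        return ack2;
    }

    // Step 4 (receiver): apply ACK2; after this both nodes agree.
    void receiveAck2(Map<String, State> ack2) {
        apply(ack2);
    }

    // Only ever replace a value with a strictly newer version.
    private void apply(Map<String, State> incoming) {
        incoming.forEach((endpoint, s) -> {
            State local = states.get(endpoint);
            if (local == null || local.version < s.version) {
                states.put(endpoint, s);
            }
        });
    }
}
```

For example, if node A knows n1 at version 5 and n2 at version 2, while node B knows n2 at version 7 and n3 at version 1, then after a single round both nodes hold n1 at version 5, n2 at version 7 and n3 at version 1.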
- A generic Message class to represent the messages being exchanged. Let's discuss this one in more detail.
Need for a Message Class
One might think that we can have a Message base class with GossipDigestSynMessage, GossipDigestAckMessage and GossipDigestAck2Message as child classes. However, such an inheritance hierarchy introduces tight coupling, and any change has a waterfall effect on all the related classes. To solve this, we will use composition.
A Message object will have a generic payload, which can be a GossipDigestSyn, a GossipDigestAck, etc., or in general any object we wish to transfer (the power of composition).
It will also have a Header (similar to an HTTP header). The payload contains the actual object, and the header holds metadata such as created_at, from, to and so on.
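A minimal sketch of this composition in Java could look like the following. The exact field set is an assumption based on the description above; `java.net.InetSocketAddress` stands in for an address-plus-port type, and the verb field is left out here.

```java
import java.io.Serializable;
import java.net.InetSocketAddress;

// Metadata that travels with every message (hypothetical field set).
final class Header implements Serializable {
    final InetSocketAddress from;
    final InetSocketAddress to;
    final long createdAt; // e.g. milliseconds since epoch

    Header(InetSocketAddress from, InetSocketAddress to, long createdAt) {
        this.from = from;
        this.to = to;
        this.createdAt = createdAt;
    }
}

// A message HAS a header and HAS a payload: no subclass per message type.
// Note: sending it over the wire only works if T is itself Serializable.
final class Message<T> implements Serializable {
    final Header header;
    final T payload;

    Message(Header header, T payload) {
        this.header = header;
        this.payload = payload;
    }
}
```

Because `Message<T>` simply holds a `T`, supporting a new kind of payload does not require a new message subclass.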
Verb and VerbHandler
Another important thing to note is that gossip is, in a way, a form of event-driven programming: actions are performed in response to certain events. This idea extends beyond gossip to other features of Cassandra. To handle it generically, Cassandra has "verbs" and their respective "handlers". On receipt of a message, a node looks at the associated verb and triggers the associated handler, passing the entire message object as an argument. The handler then extracts the payload and carries on with its execution.
Considering all the above points, here are the additional classes we need:
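A small sketch of verb-based dispatch, assuming a hypothetical `VerbHandler` interface loosely modelled on Cassandra's handler interface. To keep it self-contained, the verb sits directly on the message rather than in a header, and the handlers only record what they received instead of doing real gossip work.

```java
import java.util.ArrayList;
import java.util.List;

// Handlers get the whole message and extract the payload themselves.
interface VerbHandler {
    void doVerb(Message<?> message);
}

// Hypothetical event log so we can see which handler ran.
final class Trace {
    static final List<String> events = new ArrayList<>();
}

enum Verb {
    // Each verb is bound to its handler right in the enum declaration.
    GOSSIP_DIGEST_SYN(m -> Trace.events.add("syn:" + m.payload)),
    GOSSIP_DIGEST_ACK(m -> Trace.events.add("ack:" + m.payload)),
    GOSSIP_DIGEST_ACK2(m -> Trace.events.add("ack2:" + m.payload));

    final VerbHandler handler;

    Verb(VerbHandler handler) {
        this.handler = handler;
    }
}

final class Message<T> {
    final Verb verb;
    final T payload;

    Message(Verb verb, T payload) {
        this.verb = verb;
        this.payload = payload;
    }
}

final class MessageDispatcher {
    // On receipt: look at the verb and run its handler with the entire message.
    static void receive(Message<?> message) {
        message.verb.handler.doVerb(message);
    }
}
```

Adding a new kind of event then means adding one enum constant with its handler; the dispatch code itself does not change.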
- Verb (enum): each verb is mapped to its handler in the enum declaration itself.
- Header (class): verb, from, to and created_at fields.
- Message<T> (class): two attributes, a Header and a payload.
- As already mentioned, there will be a central singleton entity, the Gossiper (to be covered in part 2 and part 3).
Coming to the most important part and the main topic of this post, let's see what the code for the above entities looks like.
HeartBeatState
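A HeartBeatState can be sketched as follows, modelled on the idea behind Cassandra's class: a generation that is fixed when the node starts, and a version that is bumped on every gossip round. The shared `VersionGenerator` counter and the `isNewerThan` helper are assumptions for illustration, not Cassandra's exact code.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// One process-wide counter, so every update gets a higher version (assumption).
final class VersionGenerator {
    private static final AtomicInteger VERSION = new AtomicInteger(0);

    static int nextVersion() {
        return VERSION.incrementAndGet();
    }
}

final class HeartBeatState implements Serializable {
    // Set once when the node starts (e.g. seconds since epoch),
    // so a restarted node gets a larger generation.
    private final int generation;
    // Bumped on every gossip round while the node is alive.
    private volatile int version;

    HeartBeatState(int generation) {
        this.generation = generation;
        this.version = 0;
    }

    void updateHeartBeat() {
        version = VersionGenerator.nextVersion();
    }

    int getGeneration() {
        return generation;
    }

    int getVersion() {
        return version;
    }

    // Newer means a later generation, or the same generation with a higher version.
    boolean isNewerThan(HeartBeatState other) {
        if (generation != other.generation) {
            return generation > other.generation;
        }
        return version > other.version;
    }
}
```

The generation is what lets peers tell a restarted node apart from one whose heartbeat simply has a higher version.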
AppState
The application state is actually a Map<ApplicationState, VersionedValue>, which stores a variety of application information along with the version of each piece of information. For instance, the application state for "load information" could be (5.2, 45), which means that the node's load is 5.2 at version 45. ApplicationState itself is an enumeration of the kinds of information being tracked.
EndPointState
It includes all ApplicationStates and the HeartBeatState for a node. An EndPointState can hold only one value for each type of ApplicationState, so if it already includes, say, load information, new load information overwrites the old one. The ApplicationState version number guarantees that an old value will never overwrite a newer one.
VersionedValue is a simple class with version (int) and value (string) attributes.
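The versioning rule above can be sketched in Java as follows. This is a simplified, hypothetical version: only a few `ApplicationState` constants are shown (names assumed), the HeartBeatState field is left out, and the version check lives inside `addApplicationState` itself. The map is held in an `AtomicReference` and replaced copy-on-write via `compareAndSet`, one common way to keep such a structure thread-safe.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// A few kinds of application information (hypothetical subset).
enum ApplicationState { STATUS, LOAD, SCHEMA }

final class VersionedValue {
    final String value;
    final int version;

    VersionedValue(String value, int version) {
        this.value = value;
        this.version = version;
    }
}

final class EndPointState {
    // Immutable snapshots, swapped atomically on every change.
    private final AtomicReference<Map<ApplicationState, VersionedValue>> appStates =
            new AtomicReference<>(Collections.<ApplicationState, VersionedValue>emptyMap());

    /** Stores the value unless an equal or newer version is already present. */
    boolean addApplicationState(ApplicationState key, VersionedValue value) {
        while (true) {
            Map<ApplicationState, VersionedValue> current = appStates.get();
            VersionedValue existing = current.get(key);
            if (existing != null && existing.version >= value.version) {
                return false; // never let an old value overwrite a newer one
            }
            Map<ApplicationState, VersionedValue> copy = new EnumMap<>(ApplicationState.class);
            copy.putAll(current);
            copy.put(key, value); // one value per ApplicationState type
            if (appStates.compareAndSet(current, Collections.unmodifiableMap(copy))) {
                return true;
            }
            // Another thread changed the map in the meantime: re-read and retry.
        }
    }

    VersionedValue get(ApplicationState key) {
        return appStates.get().get(key);
    }
}
```

For example, adding LOAD ("5.2", 45) and then LOAD ("4.0", 40) leaves "5.2" in place, because version 40 is older than 45.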
Verb, Header and Message
A Few Points to Note
- Header has an InetAddressPort, which is nothing but the IP address and port of a node, since the two in combination uniquely identify a node.
- Verb has handlers for the three enum constants listed; their implementations will be covered in the next part of this post.
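A minimal, hypothetical InetAddressPort could look like this. The key point is that `equals` and `hashCode` use both the address and the port, so two nodes on the same host but listening on different ports are treated as different nodes.

```java
import java.io.Serializable;
import java.net.InetAddress;
import java.util.Objects;

// Hypothetical minimal version: a node is identified by address AND port together.
final class InetAddressPort implements Serializable {
    final InetAddress address;
    final int port;

    InetAddressPort(InetAddress address, int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("invalid port " + port);
        }
        this.address = Objects.requireNonNull(address);
        this.port = port;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof InetAddressPort)) return false;
        InetAddressPort other = (InetAddressPort) o;
        return port == other.port && address.equals(other.address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(address, port);
    }

    @Override
    public String toString() {
        return address.getHostAddress() + ":" + port;
    }
}
```

With these methods in place, InetAddressPort can safely be used as a key in maps of endpoint states.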
Note to the reader
* The above implementations are very close to, but not exactly the same as, Cassandra's source code. Cassandra implements custom serializers for almost all of the above (and for the additional constructs we are going to discuss later). Here, I have taken a simpler approach and just implemented the Serializable interface. Serialization is required because messages have to be passed from one node to another. I will also omit certain advanced concepts, such as the shadow round, in the implementation going forward.
* There are a few buzzwords, such as "volatile" and "AtomicReference", that a reader might not be familiar with. These are meant to ensure thread safety in concurrent programs. (Discussing them in detail is perhaps a topic for another post; as for their usage in the code, I copied them from Cassandra's source code itself.)
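To illustrate why implementing Serializable is enough for this simple approach, here is a sketch of turning an object into bytes and back with the JDK's `ObjectOutputStream` and `ObjectInputStream`. `GossipPayload` and `Wire` are hypothetical names; in a real node the bytes would be written to a socket rather than kept in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical payload: any class implementing Serializable can be sent this way.
final class GossipPayload implements Serializable {
    private static final long serialVersionUID = 1L;
    final String from;
    final int version;

    GossipPayload(String from, int version) {
        this.from = from;
        this.version = version;
    }
}

final class Wire {
    // Object -> bytes (what the sending node would write to the network).
    static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } // closing the stream flushes everything into 'bytes'
        return bytes.toByteArray();
    }

    // Bytes -> object (what the receiving node would do on arrival).
    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }
}
```

Hand-written serializers, as Cassandra uses, give tighter control over the wire format and versioning, but built-in Java serialization keeps the code short.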
The next part of this post will cover the payloads and their respective handlers in depth, along with their implementations.