Cassandra - A Decentralized Structured Storage System
Cassandra is a distributed storage system for managing very large amounts of structured data spread out across many commodity servers, while providing highly available service with no single point of failure. Cassandra aims to run on top of an infrastructure of hundreds of nodes (possibly spread across different data centers). At this scale, small and large components fail continuously. The way Cassandra manages the persistent state in the face of these failures drives the reliability and scalability of the software systems relying on this service. While in many ways Cassandra resembles a database and shares many design and implementation strategies with one, Cassandra does not support a full relational data model; instead, it provides clients with a simple data model that supports dynamic control over data layout and format. The Cassandra system was designed to run on cheap commodity hardware and handle high write throughput while not sacrificing read efficiency.
Even very early versions of Cassandra delivered on the goals of reliability and scalability. Cassandra has also consistently led the NoSQL market in performance.
More recently, Cassandra has focused on developer productivity with the Cassandra Query Language, which presents a data model familiar to relational database users.
Facebook runs the largest social networking platform that serves hundreds of millions of users at peak times using tens of thousands of servers located in many data centers around the world. There are strict operational requirements on Facebook's platform in terms of performance, reliability and efficiency, and to support continuous growth the platform needs to be highly scalable. Dealing with failures in an infrastructure comprised of thousands of components is our standard mode of operation; there are always a small but significant number of server and network components that are failing at any given time. As such, the software systems need to be constructed in a manner that treats failures as the norm rather than the exception. To meet the reliability and scalability needs described above, Facebook has developed Cassandra.
Cassandra uses a synthesis of well known techniques to achieve scalability and availability. Cassandra was designed to fulfill the storage needs of the Inbox Search problem. Inbox Search is a feature that enables users to search through their Facebook Inbox. At Facebook this meant the system was required to handle a very high write throughput, billions of writes per day, and also scale with the number of users. Since users are served from data centers that are geographically distributed, being able to replicate data across data centers was key to keeping search latencies down. Inbox Search was launched in June of 2008 for around 100 million users and today we are at over 250 million users and Cassandra has kept up the promise so far. Cassandra is now deployed as the backend storage system for multiple services within Facebook.
This paper is structured as follows. Section 2 talks about related work, some of which has been very influential on our design. Section 3 presents the data model in more detail. Section 4 presents the overview of the client API. Section 5 presents the system design and the distributed algorithms that make Cassandra work. Section 6 details the experiences of making Cassandra work and requirements to improve performance. In Section 6.1 we describe how one of the applications in the Facebook platform uses Cassandra. Finally Section 7 concludes with future work on Cassandra.
Like most infrastructure designed around a single use case, the initial release of Cassandra was a little rough. One of the ways this manifested was in missing or broken features like the very first Apache JIRA ticket, support for deleting data.
Today, Cassandra powers hundreds of applications across dozens of industries that need this kind of performance at scale. By addressing the needs of these different workloads, Cassandra has evolved beyond its initial niche in social media into a truly general purpose solution.
Having multi-datacenter support designed in from the beginning has been an important factor in driving this adoption. Today Cassandra is far enough ahead of the competition here that if you need cross-datacenter replication, you’re probably doing something wrong if you’re not using Cassandra.
Statements like the one here that “[Cassandra] was required to handle a very high write throughput” have sometimes been interpreted to mean that Cassandra is not appropriate for read-heavy workloads. This is not the case; read performance is the easy part, but few systems offer scalable write performance as well. That is what sets Cassandra apart.
2. Related Work
Distributing data for performance, availability and durability has been widely studied in the file system and database communities. Compared to P2P storage systems that only support flat namespaces, distributed file systems typically support hierarchical namespaces. Systems like Ficus and Coda replicate files for high availability at the expense of consistency. Update conflicts are typically managed using specialized conflict resolution procedures. Farsite is a distributed file system that does not use any centralized server. Farsite achieves high availability and scalability using replication. The Google File System (GFS) is another distributed file system built for hosting the state of Google's internal applications. GFS uses a simple design with a single master server for hosting the entire metadata and where the data is split into chunks and stored in chunk servers. However the GFS master is now made fault tolerant using the Chubby abstraction. Bayou is a distributed relational database system that allows disconnected operations and provides eventual data consistency. Among these systems, Bayou, Coda and Ficus allow disconnected operations and are resilient to issues such as network partitions and outages. These systems differ on their conflict resolution procedures. For instance, Coda and Ficus perform system level conflict resolution and Bayou allows application level resolution. All of them, however, guarantee eventual consistency. Similar to these systems, Dynamo allows read and write operations to continue even during network partitions and resolves update conflicts using different conflict resolution mechanisms, some client driven. Traditional replicated relational database systems focus on the problem of guaranteeing strong consistency of replicated data. Although strong consistency provides the application writer a convenient programming model, these systems are limited in scalability and availability.
These systems are not capable of handling network partitions because they typically provide strong consistency guarantees.
Dynamo is a storage system that is used by Amazon to store and retrieve user shopping carts. Dynamo's Gossip based membership algorithm helps every node maintain information about every other node. Dynamo can be defined as a structured overlay with at most one-hop request routing. Dynamo detects update conflicts using a vector clock scheme, but prefers a client side conflict resolution mechanism. A write operation in Dynamo also requires a read to be performed for managing the vector timestamps. This can be very limiting in environments where systems need to handle a very high write throughput. Bigtable provides both structure and data distribution but relies on a distributed file system for its durability.
Eventual consistency continues to be an often-misunderstood topic. At the same time, we’ve learned a lot over the past five years about building applications in the face of failure. Perhaps the best treatment of this subject is Christos Kalantzis’s from this year’s Cassandra Summit. Highly recommended.
That said, sometimes you really do need linearizable operations. That’s why we added lightweight transactions in Cassandra 2.0. This is a sign of Cassandra maturing: Cassandra 1.0 (released October 2011) was the fulfilment of its designers’ original vision; Cassandra 2.0 takes it in new directions to make it even more powerful.
Open source has had the reputation of producing good imitations, but not innovation. Perhaps Cassandra’s origins as a hybrid of Dynamo and Bigtable did not disprove this, but Apache Cassandra’s development of lightweight transactions and CQL are true industry firsts.
3. Data Model
A table in Cassandra is a distributed multi dimensional map indexed by a key. The value is an object which is highly structured. The row key in a table is a string with no size restrictions, although typically 16 to 36 bytes long. Every operation under a single row key is atomic per replica no matter how many columns are being read or written into. Columns are grouped together into sets called column families, much like what happens in the Bigtable system. Cassandra exposes two kinds of column families, Simple and Super column families. Super column families can be visualized as a column family within a column family.
Furthermore, applications can specify the sort order of columns within a Super Column or Simple Column family. The system allows columns to be sorted either by time or by name. Time sorting of columns is exploited by applications like Inbox Search where the results are always displayed in time sorted order. Any column within a column family is accessed using the convention column family : column and any column within a column family that is of type super is accessed using the convention column family : super column : column. A very good example of the power of the super column family abstraction is given in Section 6.1. Typically applications use a dedicated Cassandra cluster and manage it as part of their service. Although the system supports the notion of multiple tables, all deployments have only one table in their schema.
2008 Cassandra’s data model is unrecognizable today. Supercolumns are gone. “Column families” have been replaced by tables, which offer a mostly-conventional schema, with features like Collections to aid with denormalizations, and native CQL drivers offering improved performance. All this while preserving backwards compatibility with data created under the old Thrift API.
When reading the LADIS paper, remember that when it says “columns” it means what today we call “cells” and by “rows” it means “partitions.”
For more on the modern Cassandra data model, see Patrick McFadin’s three talks:
- The Data Model is Dead; Long live the Data Model (slides)
- Become a Super Modeler (slides)
- The World’s Next Top Data Model (slides)
4. API
- insert(table, key, rowMutation)
- get(table, key, columnName)
- delete(table, key, columnName)
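As a rough illustration of these three calls, the following Python sketch models a table as nested dictionaries. The class name `ToyTable`, the tuple-based column path, and the in-memory storage are all assumptions made for illustration; this is not the Thrift or CQL interface, and it ignores distribution entirely.

```python
class ToyTable:
    """Hypothetical in-memory stand-in for one table (not the real client API)."""

    def __init__(self):
        # row key -> column family -> column name -> (value, timestamp)
        self.rows = {}

    def insert(self, key, row_mutation):
        """Apply a mutation shaped like {column_family: {column: (value, timestamp)}}."""
        row = self.rows.setdefault(key, {})
        for family, columns in row_mutation.items():
            stored = row.setdefault(family, {})
            for name, (value, ts) in columns.items():
                # Keep whichever write carries the highest timestamp.
                if name not in stored or ts >= stored[name][1]:
                    stored[name] = (value, ts)

    def get(self, key, column_path):
        """column_path is (family,) for a whole family or (family, column)."""
        family = self.rows.get(key, {}).get(column_path[0], {})
        if len(column_path) == 1:
            return {name: value for name, (value, _) in family.items()}
        entry = family.get(column_path[1])
        return None if entry is None else entry[0]

    def delete(self, key, column_path):
        """Remove a whole column family or a single column, if present."""
        row = self.rows.get(key, {})
        if len(column_path) == 1:
            row.pop(column_path[0], None)
        else:
            row.get(column_path[0], {}).pop(column_path[1], None)
```

The sketch drops deleted data immediately, which is a simplification: a distributed store has to remember that a delete happened so that replicas which missed it do not bring the data back.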
Cassandra has offered a native CQL protocol since 1.2 with enhancements in 2.0. The specification may be found here.
Besides ease of use, CQL offers improved performance in real-world situations.
5. System Architecture
The architecture of a storage system that needs to operate in a production setting is complex. In addition to the actual data persistence component, the system needs to have the following characteristics: scalable and robust solutions for load balancing, membership and failure detection, failure recovery, replica synchronization, overload handling, state transfer, concurrency and job scheduling, request marshalling, request routing, system monitoring and alarming, and configuration management. Describing the details of each of the solutions is beyond the scope of this paper, so we will focus on the core distributed systems techniques used in Cassandra: partitioning, replication, membership, failure handling and scaling. All these modules work in synchrony to handle read/write requests. Typically a read/write request for a key gets routed to any node in the Cassandra cluster. The node then determines the replicas for this particular key. For writes, the system routes the requests to the replicas and waits for a quorum of replicas to acknowledge the completion of the writes. For reads, based on the consistency guarantees required by the client, the system either routes the requests to the closest replica or routes the requests to all replicas and waits for a quorum of responses.
At a high level, the architecture has remained very similar from 2008 to 2013, although some details have changed as in hinted handoff, logged batches, and the supported consistency levels.
One change worth noting is that while Cassandra did start with a partition-unaware client design that relied on the server to route requests to the appropriate replicas, today's native protocol drivers will prefer to route directly to a replica to eliminate the latency from that extra hop.
5.1 Partitioning
One of the key design features for Cassandra is the ability to scale incrementally. This requires the ability to dynamically partition the data over the set of nodes (i.e., storage hosts) in the cluster. Cassandra partitions data across the cluster using consistent hashing but uses an order preserving hash function to do so. In consistent hashing the output range of a hash function is treated as a fixed circular space or "ring" (i.e., the largest hash value wraps around to the smallest hash value). Each node in the system is assigned a random value within this space which represents its position on the ring. Each data item identified by a key is assigned to a node by hashing the data item's key to yield its position on the ring, and then walking the ring clockwise to find the first node with a position larger than the item's position. This node is deemed the coordinator for this key. The application specifies this key and Cassandra uses it to route requests. Thus, each node becomes responsible for the region in the ring between it and its predecessor node on the ring. The principal advantage of consistent hashing is that departure or arrival of a node only affects its immediate neighbors and other nodes remain unaffected. The basic consistent hashing algorithm presents some challenges. First, the random position assignment of each node on the ring leads to non-uniform data and load distribution. Second, the basic algorithm is oblivious to the heterogeneity in the performance of nodes. Typically there exist two ways to address this issue: one is for nodes to get assigned to multiple positions in the circle (like in Dynamo), and the second is to analyze load information on the ring and have lightly loaded nodes move on the ring to alleviate heavily loaded nodes. Cassandra opts for the latter as it makes the design and implementation very tractable and helps to make very deterministic choices about load balancing.
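The ring walk described above can be sketched in a few lines of Python. This is a minimal illustration only: the `Ring` class, its method names, and the use of MD5 to place keys are assumptions of the sketch (the paper's 2008 system used an order preserving function instead), and tokens are passed in by hand where the paper assigns them at random.

```python
import bisect
import hashlib


def ring_position(key: str) -> int:
    """Map a key to a ring position (MD5 is an illustrative choice only)."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")


class Ring:
    """Hypothetical token ring: each node owns the range ending at its token."""

    def __init__(self, node_tokens):
        # node_tokens: {node_name: token}; sort once so lookups can bisect.
        ordered = sorted((token, node) for node, token in node_tokens.items())
        self._tokens = [token for token, _ in ordered]
        self._nodes = [node for _, node in ordered]

    def owner_of_position(self, position: int) -> str:
        """Walk clockwise to the first node whose token is larger than position."""
        index = bisect.bisect_right(self._tokens, position)
        # Past the largest token the ring wraps around to the smallest one.
        return self._nodes[index % len(self._nodes)]

    def coordinator(self, key: str) -> str:
        return self.owner_of_position(ring_position(key))
```

With tokens `{"A": 10, "B": 20, "C": 30}`, position 25 belongs to C. Adding a node D at token 25 moves positions between 20 and 25 to D and leaves every other range untouched, which is the locality property the text describes.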
Cassandra partitioning is still based on consistent hashing, but we have moved away from load balancing in favor of virtual nodes, although with a different design than the one used by Dynamo.
Unfortunately, there is no such thing as a general, order-preserving hash function. The function used by 2008 Cassandra was basically an expensive way of turning a length-limited string into bytes. We ripped that out early on in favor of a true OrderedPartitioner (later superseded by ByteOrderedPartitioner). These remain discouraged in practice because of the near certainty of hot spots.
5.2 Replication
Cassandra uses replication to achieve high availability and durability. Each data item is replicated at N hosts, where N is the replication factor configured "per-instance". Each key, k, is assigned to a coordinator node (described in the previous section). The coordinator is in charge of the replication of the data items that fall within its range. In addition to locally storing each key within its range, the coordinator replicates these keys at the N-1 nodes in the ring. Cassandra provides the client with various options for how data needs to be replicated. Cassandra provides various replication policies such as "Rack Unaware", "Rack Aware" (within a datacenter) and "Datacenter Aware". Replicas are chosen based on the replication policy chosen by the application. If an application chooses the "Rack Unaware" replication strategy then the non-coordinator replicas are chosen by picking N-1 successors of the coordinator on the ring. For the "Rack Aware" and "Datacenter Aware" strategies the algorithm is slightly more involved. The Cassandra system elects a leader amongst its nodes using a system called Zookeeper. All nodes on joining the cluster contact the leader, who tells them which ranges they are replicas for, and the leader makes a concerted effort to maintain the invariant that no node is responsible for more than N-1 ranges in the ring. The metadata about the ranges a node is responsible for is cached locally at each node and in a fault-tolerant manner inside Zookeeper; this way a node that crashes and comes back up knows what ranges it was responsible for. We borrow from Dynamo parlance and deem the nodes that are responsible for a given range the "preference list" for the range.
As is explained in Section 5.1 every node is aware of every other node in the system and hence the range they are responsible for. Cassandra provides durability guarantees in the presence of node failures and network partitions by relaxing the quorum requirements as described in Section 5.2. Data center failures happen due to power outages, cooling failures, network failures, and natural disasters. Cassandra is configured such that each row is replicated across multiple data centers. In essence, the preference list of a key is constructed such that the storage nodes are spread across multiple datacenters. These datacenters are connected through high speed network links. This scheme of replicating across multiple datacenters allows us to handle entire data center failures without any outage.
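For the simplest policy in this section, "Rack Unaware", the preference list is the coordinator followed by its N-1 successors on the ring. A minimal sketch of that selection follows; the function name and the input format (node names already sorted by token) are assumptions of the sketch. The rack aware and datacenter aware policies are not shown, since the text does not spell out their algorithms.

```python
def preference_list(ring_nodes, coordinator, n):
    """Return the coordinator plus its N-1 successors in ring order.

    ring_nodes: node names already sorted by token (an assumed input format).
    If the ring has fewer than n nodes, every node is returned once.
    """
    start = ring_nodes.index(coordinator)
    count = min(n, len(ring_nodes))
    # Step clockwise from the coordinator, wrapping past the end of the ring.
    return [ring_nodes[(start + i) % len(ring_nodes)] for i in range(count)]
```

For a four-node ring `["A", "B", "C", "D"]` with N = 3, a key coordinated by C is stored on C, D and A.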
The first part of this section is poorly written — it sounds like the coordinator is a “master” replica, when all N replicas are peers. (In modern Cassandra terminology, the coordinator is the node that processes a given client’s request and routes it to the appropriate replicas; it is not necessarily itself a replica.)
Replication strategies have changed; “Rack Unaware” is now SimpleStrategy, and “Rack Aware” is (the deprecated) OldNetworkTopologyStrategy. The recommended strategy is now NetworkTopologyStrategy, which handles more than two datacenters gracefully.
The most important change since 2008 is that all replicas are not equal for reads. Cassandra tracks which replicas respond fastest and prefers to route requests there.
Zookeeper usage was restricted to Facebook’s in-house Cassandra branch; Apache Cassandra has always avoided it. This means that you can’t add nodes to the cluster faster than membership awareness can spread via gossip (up to a minute for a large cluster), but we consider this worth the simplicity of avoiding the extra moving parts.
Cassandra continues to demonstrate resilience to small and large failures.
5.3 Membership
Cluster membership in Cassandra is based on Scuttlebutt, a very efficient anti-entropy Gossip based mechanism. The salient feature of Scuttlebutt is that it has very efficient CPU utilization and very efficient utilization of the gossip channel. Within the Cassandra system Gossip is not only used for membership but also to disseminate other system related control state.
5.3.1 Failure Detection
Failure detection is a mechanism by which a node can locally determine if any other node in the system is up or down. In Cassandra failure detection is also used to avoid attempts to communicate with unreachable nodes during various operations. Cassandra uses a modified version of the Φ Accrual Failure Detector. The idea of an Accrual Failure Detector is that the failure detection module doesn't emit a Boolean value stating a node is up or down. Instead the failure detection module emits a value which represents a suspicion level for each of the monitored nodes. This value is defined as Φ. The basic idea is to express the value of Φ on a scale that is dynamically adjusted to reflect network and load conditions at the monitored nodes.
Φ has the following meaning: Given some threshold Φ, and assuming that we decide to suspect a node A when Φ = 1, then the likelihood that we will make a mistake (i.e., the decision will be contradicted in the future by the reception of a late heartbeat) is about 10%. The likelihood is about 1% with Φ = 2, 0.1% with Φ = 3, and so on. Every node in the system maintains a sliding window of inter-arrival times of gossip messages from other nodes in the cluster. The distribution of these inter-arrival times is determined and Φ is calculated. Although the original paper suggests that the distribution is approximated by the Gaussian distribution we found the Exponential Distribution to be a better approximation, because of the nature of the gossip channel and its impact on latency. To our knowledge our implementation of the Accrual Failure Detection in a Gossip based setting is the first of its kind. Accrual Failure Detectors are very good in both their accuracy and their speed and they also adjust well to network conditions and server load conditions.
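To make the definition concrete: if inter-arrival times are modeled as exponential with mean μ, the probability that the next heartbeat arrives later than t seconds after the last one is exp(-t/μ), so Φ = -log10(exp(-t/μ)) = t / (μ ln 10). The sketch below implements only that formula over a sliding window. The class name, the window size of 1000, and the use of seconds are assumptions of the sketch; it is not Cassandra's actual detector.

```python
import math
from collections import deque


class AccrualFailureDetector:
    """Sketch of a phi accrual detector assuming exponential inter-arrival times."""

    def __init__(self, window_size=1000):  # window size is an assumed value
        self.intervals = deque(maxlen=window_size)  # sliding window of gaps
        self.last_heartbeat = None

    def heartbeat(self, now):
        """Record the arrival time (in seconds) of a gossip message."""
        if self.last_heartbeat is not None:
            self.intervals.append(now - self.last_heartbeat)
        self.last_heartbeat = now

    def phi(self, now):
        """Suspicion level: -log10 of P(the next heartbeat arrives after now)."""
        if not self.intervals:
            return 0.0  # no history yet, so no basis for suspicion
        mean = sum(self.intervals) / len(self.intervals)
        if mean <= 0:
            return 0.0
        elapsed = now - self.last_heartbeat
        # Exponential tail: P(gap > elapsed) = exp(-elapsed / mean).
        return elapsed / (mean * math.log(10))
```

With heartbeats arriving once per second, Φ reaches 1 about 2.3 seconds after the last heartbeat and 2 after about 4.6 seconds, matching the 10% and 1% mistake rates in the text.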
With the largest Cassandra clusters an order of magnitude larger than five years ago (~1000 vs ~100), we’ve had to redesign some of the algorithms involved, but the basic design remains essentially the same.
5.4 Bootstrapping
When a node starts for the first time, it chooses a random token for its position in the ring. For fault tolerance, the mapping is persisted to disk locally and also in Zookeeper. The token information is then gossiped around the cluster. This is how we know about all nodes and their respective positions in the ring. This enables any node to route a request for a key to the correct node in the cluster. In the bootstrap case, when a node needs to join a cluster, it reads its configuration file which contains a list of a few contact points within the cluster. We call these initial contact points, seeds of the cluster. Seeds can also come from a configuration service like Zookeeper.
In Facebook's environment node outages (due to failures and maintenance tasks) are often transient but may last for extended intervals. Failures can be of various forms such as disk failures, bad CPUs, etc. A node outage rarely signifies a permanent departure and therefore should not result in re-balancing of the partition assignment or repair of the unreachable replicas. Similarly, manual error could result in the unintentional startup of new Cassandra nodes. To that effect every message contains the cluster name of each Cassandra instance. If a manual error in configuration led to a node trying to join a wrong Cassandra instance it can be thwarted based on the cluster name. For these reasons, it was deemed appropriate to use an explicit mechanism to initiate the addition and removal of nodes from a Cassandra instance. An administrator uses a command line tool or a browser to connect to a Cassandra node and issue a membership change to join or leave the cluster.
There were some subtle corner cases around replication while bootstrapping of new nodes is in progress, but the basic idea of “replicate to the old and new locations during bootstrap, so a failed bootstrap requires no recovery” remains the same.
Cassandra’s seed_provider is now pluggable so that you can easily deploy a custom provider for your environment. By default, seeds are simply specified in cassandra.yaml.
We continue to find that transient node outages or partitions greatly outnumber permanent failures.
5.5 Scaling the Cluster
When a new node is added into the system, it gets assigned a token such that it can alleviate a heavily loaded node. This results in the new node splitting a range that some other node was previously responsible for. The Cassandra bootstrap algorithm is initiated from any other node in the system by an operator using either a command line utility or the Cassandra web dashboard. The node giving up the data streams the data over to the new node using kernel-kernel copy techniques. Operational experience has shown that data can be transferred at the rate of 40 MB/sec from a single node. We are working on improving this by having multiple replicas take part in the bootstrap transfer thereby parallelizing the effort, similar to BitTorrent.
This section is obsoleted by virtual nodes.
5.6 Local Persistence
The Cassandra system relies on the local file system for data persistence. The data is represented on disk using a format that lends itself to efficient data retrieval. Typical write operation involves a write into a commit log for durability and recoverability and an update into an in-memory data structure. The write into the in-memory data structure is performed only after a successful write into the commit log. We have a dedicated disk on each machine for the commit log since all writes into the commit log are sequential and so we can maximize disk throughput. When the in-memory data structure crosses a certain threshold, calculated based on data size and number of objects, it dumps itself to disk. This write is performed on one of many commodity disks that machines are equipped with. All writes are sequential to disk and also generate an index for efficient lookup based on row key. These indices are also persisted along with the data file. Over time many such files could exist on disk and a merge process runs in the background to collate the different files into one file. This process is very similar to the compaction process that happens in the Bigtable system.
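The ordering of the write path (commit log first, then the in-memory structure, then a flush once a threshold is crossed) can be sketched as follows. Everything here is a toy stand-in: the class name, the use of Python lists and dicts in place of files, and a threshold counted in keys, where the text uses data size and object count, are assumptions of the sketch.

```python
class ToyStorageEngine:
    """Hypothetical single-node write path: log, then memtable, then flush."""

    def __init__(self, flush_threshold=3):  # threshold in keys is an assumption
        self.commit_log = []   # stand-in for the sequential on-disk commit log
        self.memtable = {}     # in-memory structure: row key -> value
        self.data_files = []   # flushed, immutable files, oldest first
        self.flush_threshold = flush_threshold

    def write(self, key, value):
        # 1. Append to the commit log for durability and recoverability.
        self.commit_log.append((key, value))
        # 2. Only after the log write succeeds, update the in-memory structure.
        self.memtable[key] = value
        # 3. Dump to disk once the in-memory structure crosses the threshold.
        if len(self.memtable) >= self.flush_threshold:
            self.flush()

    def flush(self):
        # Write rows in key order and build an index for lookup by row key.
        data = sorted(self.memtable.items())
        index = {key: offset for offset, (key, _) in enumerate(data)}
        self.data_files.append({"data": data, "index": index})
        self.memtable = {}
```

Because each flush produces a new file instead of modifying an old one, every disk write in this scheme is sequential.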
A typical read operation first queries the in-memory data structure before looking into the files on disk. The files are looked at in the order of newest to oldest. When a disk lookup occurs we could be looking up a key in multiple files on disk. In order to prevent lookups into files that do not contain the key, a bloom filter, summarizing the keys in the file, is also stored in each data file and also kept in memory. This bloom filter is first consulted to check if the key being looked up does indeed exist in the given file. A key in a column family could have many columns. Some special indexing is required to retrieve columns which are further away from the key. In order to prevent scanning of every column on disk we maintain column indices which allow us to jump to the right chunk on disk for column retrieval. As the columns for a given key are being serialized and written out to disk we generate indices at every 256K chunk boundary. This boundary is configurable, but we have found 256K to work well for us in our production workloads.
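A bloom filter answers "is this key possibly in the file?" with no false negatives, which is what lets a read skip files that cannot contain the key. The following is a minimal sketch; the bit array size, the number of hash functions, and the use of seeded SHA-256 digests are assumptions chosen for readability, not the parameters Cassandra uses.

```python
import hashlib


class BloomFilter:
    """Minimal bloom filter; the sizes and hash scheme are illustrative."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive several bit positions by hashing the key with different seeds.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # False means the key is definitely absent; True means "possibly present".
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))
```

A read would call `might_contain(key)` for each data file and open only the files that return True. An occasional false positive costs one wasted lookup and never a wrong answer.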
Apache Cassandra has made a number of improvements to local persistence, including JBOD support and mixed SSD/HDD deployments.
Cassandra now manages memtable sizes and flush policy automatically.
Compaction has been a very active subject for improvements. Besides concurrent compactions, single-pass compaction optimization, and compaction throttling, there is leveled compaction, an entirely new compaction policy that optimizes more for reads.
Apache Cassandra also performs many optimizations such as arena allocation to work around limitations in the JVM. With GC being one of the biggest such pain points, the bloom filter, partition key index, and other storage engine internals have all been moved off-heap.
5.7 Implementation Details
The Cassandra process on a single machine primarily consists of the following abstractions: the partitioning module, the cluster membership and failure detection module, and the storage engine module. Each of these modules relies on an event driven substrate where the message processing pipeline and the task pipeline are split into multiple stages along the lines of the SEDA architecture. Each of these modules has been implemented from the ground up using Java. The cluster membership and failure detection module is built on top of a network layer which uses non-blocking I/O. All system control messages rely on UDP based messaging while the application related messages for replication and request routing rely on TCP. The request routing modules are implemented using a certain state machine. When a read/write request arrives at any node in the cluster the state machine morphs through the following states: (i) identify the node(s) that own the data for the key, (ii) route the requests to the nodes and wait on the responses to arrive, (iii) if the replies do not arrive within a configured timeout value fail the request and return to the client, (iv) figure out the latest response based on timestamp, (v) schedule a repair of the data at any replica if they do not have the latest piece of data. For sake of exposition we do not talk about failure scenarios here. The system can be configured to perform either synchronous or asynchronous writes. For certain systems that require high throughput we rely on asynchronous replication. Here the writes far exceed the reads that come into the system. During the synchronous case we wait for a quorum of responses before we return a result to the client.
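Steps (iii) through (v) of the state machine above can be sketched as a pure function over the responses that arrived before the timeout. The function names, the response format, and the definition of a quorum as a simple majority of the replication factor are assumptions of this sketch; networking and the repair itself are left out.

```python
def quorum(replication_factor):
    """A simple majority of the replicas (assumed definition of quorum)."""
    return replication_factor // 2 + 1


def resolve_read(responses, required):
    """Pick the latest value and list the replicas that need repair.

    responses: {replica_name: (value, timestamp)} received before the timeout.
    required: how many responses the request needs in order to succeed.
    """
    # (iii) Too few replies within the timeout: fail the request.
    if len(responses) < required:
        raise TimeoutError("not enough replicas answered within the timeout")
    # (iv) The response with the highest timestamp is the latest one.
    latest_value, latest_ts = max(responses.values(), key=lambda r: r[1])
    # (v) Any replica that returned an older timestamp is scheduled for repair.
    stale = sorted(name for name, (_, ts) in responses.items() if ts < latest_ts)
    return latest_value, stale
```

For example, with three replicas where A returns timestamp 1 and B and C return timestamp 5, the call yields the value from B and C and reports `["A"]` as needing repair.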
In any journaled system there needs to exist a mechanism for purging commit log entries. In Cassandra we use a rolling commit log where a new commit log is rolled out after an older one exceeds a particular, configurable, size. We have found that rolling commit logs after 128MB size seems to work very well in our production workloads. Every commit log has a header which is basically a bit vector whose size is fixed and typically more than the number of column families that a particular system will ever handle. In our implementation we have an in-memory data structure and a data file that is generated per column family. Every time the in-memory data structure for a particular column family is dumped to disk we set its bit in the commit log stating that this column family has been successfully persisted to disk. This is an indication that this piece of information is already committed. These bit vectors are per commit log and also maintained in memory. Every time a commit log is rolled its bit vector and all the bit vectors of commit logs rolled prior to it are checked. If it is deemed that all the data has been successfully persisted to disk then these commit logs are deleted. The write operation into the commit log can either be in normal mode or in fast sync mode. In the fast sync mode the writes to the commit log are buffered. This implies that there is a potential of data loss on machine crash. In this mode we also dump the in-memory data structure to disk in a buffered fashion. Traditional databases are not designed to handle particularly high write throughput. Cassandra morphs all writes to disk into sequential writes thus maximizing disk write throughput. Since the files dumped to disk are never mutated no locks need to be taken while reading them. The server instance of Cassandra is practically lockless for read/write operations. Hence we do not need to deal with or handle the concurrency issues that exist in B-Tree based database implementations.
The Cassandra system indexes all data based on primary key. The data file on disk is broken down into a sequence of blocks. Each block contains at most 128 keys and is demarcated by a block index. The block index captures the relative offset of a key within the block and the size of its data. When an in-memory data structure is dumped to disk a block index is generated and its offsets are written out to disk as indices. This index is also maintained in memory for fast access. A typical read operation always looks up data first in the in-memory data structure. If found the data is returned to the application since the in-memory data structure contains the latest data for any key. If not found then we perform disk I/O against all the data files on disk in reverse time order. Since we are always looking for the latest data we look into the latest file first and return if we find the data. Over time the number of data files will increase on disk. We perform a compaction process, very much like the Bigtable system, which merges multiple files into one; essentially merge sort on a bunch of sorted data files. The system will always compact files that are close to each other with respect to size, i.e., there will never be a situation where a 100GB file is compacted with a file which is less than 50GB. Periodically a major compaction process is run to compact all related data files into one big file. This compaction process is a disk I/O intensive operation. Many optimizations can be put in place to not affect incoming read requests.
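The purge rule can be illustrated with a small model in which each commit log segment tracks which column families still have unflushed writes in it. This inverts the header bits of the text (a set of "dirty" families instead of a vector of "persisted" bits) to keep the code short. The class name, the dictionary layout, and a segment limit counted in entries, standing in for the 128MB size, are all assumptions of the sketch.

```python
class RollingCommitLog:
    """Toy model of commit log rolling and purging (not the on-disk format)."""

    def __init__(self, max_entries=2):  # stands in for the size limit
        self.max_entries = max_entries
        self.segments = [self._new_segment()]

    @staticmethod
    def _new_segment():
        # "dirty" holds the column families with writes not yet flushed to disk.
        return {"entries": [], "dirty": set()}

    def append(self, column_family, mutation):
        if len(self.segments[-1]["entries"]) >= self.max_entries:
            self.roll()
        active = self.segments[-1]
        active["entries"].append((column_family, mutation))
        active["dirty"].add(column_family)

    def mark_flushed(self, column_family):
        """Called when the in-memory structure for a column family is dumped."""
        for segment in self.segments:
            segment["dirty"].discard(column_family)

    def roll(self):
        """Start a new segment and delete every segment that is fully persisted."""
        self.segments = [s for s in self.segments if s["dirty"]]
        self.segments.append(self._new_segment())
```

A segment survives a roll only while at least one column family written to it has not been flushed. Once every such family has been dumped to disk, the next roll deletes the segment.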
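Compaction as described here is a merge of already sorted files, restricted to files of similar size. The sketch below shows both halves. The function names, the `(key, timestamp, value)` tuple layout, and the size ratio of 2 (inferred from the 100GB versus 50GB example) are assumptions, and resolving duplicate keys by highest timestamp is this sketch's choice.

```python
import heapq


def compact(files):
    """Merge sorted data files into one, keeping the newest version of each key.

    files: lists of (key, timestamp, value) tuples, each list sorted by key.
    """
    merged = []
    # heapq.merge streams the inputs in key order without loading them whole.
    for key, ts, value in heapq.merge(*files, key=lambda entry: entry[0]):
        if merged and merged[-1][0] == key:
            if ts >= merged[-1][1]:
                merged[-1] = (key, ts, value)  # a newer version replaces the older
        else:
            merged.append((key, ts, value))
    return merged


def similar_size_groups(sizes, ratio=2.0):
    """Group file sizes so no file is more than `ratio` times the group's smallest."""
    groups = []
    for size in sorted(sizes):
        if groups and size <= groups[-1][0] * ratio:
            groups[-1].append(size)
        else:
            groups.append([size])
    return groups
```

With sizes in GB, `similar_size_groups([100, 49, 50, 10])` keeps the 100GB file apart from the 49GB and 50GB files, in line with the rule that a 100GB file is never compacted with one smaller than 50GB.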
Early versions of Cassandra used the Thrift RPC framework as the network protocol. Thus, “asynchronous” operations were actually a “fire and forget” RPC call — no success or failure response would ever be seen. One of the benefits of the native CQL protocol is that it allows truly asynchronous responses, and also allows push notifications to the client of things like cluster membership changes.
To eliminate the commitlog replaying data that has already been written under certain failure scenarios, the most recent flush time (that is, time of “dumping to disk”) is now kept in SSTable metadata rather than a commitlog header. This also simplified the commitlog code and made it easier to add performance improvements like segment recycling.
The Cassandra block index now includes cell-level index information within a partition. This reduces the worst case performance from three seeks per SSTable to two, and the best case from two to one. Indexes are now cached.
The optimization described in paragraph three is incorrect as presented: since updates may be arbitrarily delayed, we cannot rely on the order we receive updates to match the order in which they were issued. We fixed this early on in Apache Cassandra, and more recently implemented a more sophisticated version of the same idea that preserves correctness in the face of update reordering.
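The reordering problem noted above can be made concrete with a small sketch. It assumes that every cell carries a write timestamp assigned when the update was issued, and it contrasts a "newest file wins" read with a read that reconciles by timestamp. The function names and data layout are hypothetical, and this is not a description of the more sophisticated mechanism mentioned in the note.

```python
from typing import Dict, List, Optional, Tuple

Cell = Tuple[int, str]       # (timestamp at which the update was issued, value)
DataFile = Dict[str, Cell]   # key -> cell; one dict stands in for one data file


def read_newest_file_first(key: str, files_oldest_first: List[DataFile]) -> Optional[str]:
    """Return the value from the most recently written file that has the key.
    Only correct if updates arrive in the order they were issued."""
    for data_file in reversed(files_oldest_first):
        if key in data_file:
            return data_file[key][1]
    return None


def read_reconciled(key: str, files_oldest_first: List[DataFile]) -> Optional[str]:
    """Consult every file and return the value with the highest timestamp
    (ties are broken by comparing the values themselves)."""
    candidates = [f[key] for f in files_oldest_first if key in f]
    return max(candidates)[1] if candidates else None


# An update issued at t=200 arrived first; one issued at t=100 was delayed
# and therefore landed in a newer file.
files = [{"k": (200, "v2")}, {"k": (100, "v1")}]
stale = read_newest_file_first("k", files)   # "v1": the delayed, older update
fresh = read_reconciled("k", files)          # "v2": the update issued last
```

The reconciled read pays for correctness by touching every file that may contain the key, which is the cost the original optimization was trying to avoid.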
6. Practical Experiences
In the process of designing, implementing and maintaining Cassandra we gained a lot of useful experience and learned numerous lessons. One very fundamental lesson learned was not to add any new feature without understanding the effects of its usage by applications. Most problematic scenarios do not stem from just node crashes and network partitions. We share just a few interesting scenarios here.
- Before launching the Inbox Search application we had to index 7TB of inbox data for over 100M users, then stored in our MySQL infrastructure, and load it into the Cassandra system. The whole process involved running Map/Reduce jobs against the MySQL data files, indexing them and then storing the reverse index in Cassandra. The M/R process actually behaves as the client of Cassandra. We exposed some background channels for the M/R process to aggregate the reverse index per user and send the serialized data over to the Cassandra instance, to avoid the serialization/deserialization overhead. This way the Cassandra instance is only bottlenecked by network bandwidth.
- Most applications only require atomic operations per key per replica. However, there have been some applications that have asked for transactional semantics, mainly for the purpose of maintaining secondary indices. Most developers with years of experience working with RDBMSs find this a very useful feature to have. We are working on a mechanism to expose such atomic operations.
- We experimented with various implementations of failure detectors, such as those described in the works cited in the original paper. Our experience was that the time to detect failures increased beyond an acceptable limit as the size of the cluster grew. In one particular experiment in a cluster of 100 nodes, the time taken to detect a failed node was on the order of two minutes. This is practically unworkable in our environments. With the accrual failure detector and a slightly conservative value of PHI, set to 5, the average time to detect failures in the above experiment was about 15 seconds.
- Monitoring is not to be taken for granted. The Cassandra system is well integrated with Ganglia, a distributed performance monitoring tool. We expose various system-level metrics to Ganglia, and this has helped us understand the behavior of the system when subject to our production workload. Disks fail for no apparent reason. The bootstrap algorithm has some hooks to repair nodes when disks fail. This is, however, an administrative operation.
- Although Cassandra is a completely decentralized system, we have learned that having some amount of coordination is essential to making the implementation of some distributed features tractable. For example, Cassandra is integrated with Zookeeper, which can be used for various coordination tasks in large scale distributed systems. We intend to use the Zookeeper abstraction for some key features that do not get in the way of applications that use Cassandra as the storage engine.
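The failure-detection item in the list above mentions an accrual failure detector with a PHI threshold of 5. A minimal sketch of how such a suspicion level can be computed is shown here. It assumes heartbeat inter-arrival times are exponentially distributed, so that the probability of a silence of length t is exp(-t / mean) and PHI = -log10 of that probability. The class name, method names and window size are hypothetical, and this is not the Cassandra implementation.

```python
import math
from collections import deque
from typing import Deque, Optional


class AccrualFailureDetector:
    """Emits a continuous suspicion level (phi) instead of a boolean up/down."""

    def __init__(self, threshold: float = 5.0, window: int = 1000) -> None:
        self.threshold = threshold                            # PHI value from the text
        self.intervals: Deque[float] = deque(maxlen=window)   # sliding window (assumed size)
        self.last_heartbeat: Optional[float] = None

    def heartbeat(self, now: float) -> None:
        """Record the arrival time of a heartbeat from the monitored node."""
        if self.last_heartbeat is not None:
            self.intervals.append(now - self.last_heartbeat)
        self.last_heartbeat = now

    def phi(self, now: float) -> float:
        """Suspicion level: -log10(P(silence >= elapsed)) under the
        exponential assumption, i.e. elapsed / (mean * ln 10)."""
        if not self.intervals or self.last_heartbeat is None:
            return 0.0
        mean = sum(self.intervals) / len(self.intervals)
        if mean <= 0:
            return 0.0
        elapsed = now - self.last_heartbeat
        return elapsed / (mean * math.log(10))

    def is_suspected(self, now: float) -> bool:
        return self.phi(now) >= self.threshold
```

Under these assumptions, a node that normally heartbeats once per second crosses a threshold of 5 after roughly 5 × ln(10) ≈ 11.5 seconds of silence. Raising the threshold trades slower detection for fewer false suspicions.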
Apache Cassandra offers excellent Hadoop integration for the kind of bulk load described here, including CQL schema.
Atomicity between table data and column indexes is not required for node-local indexes (where each node indexes its own local data), and in fact performance is substantially better without it.
However, atomicity may be required for distributed indexes (where the index itself is partitioned by value). Cassandra now supports atomicity across multiple partitions but as of this writing does not yet support distributed indexes.
Modern Cassandra management tools include DataStax’s OpsCenter and Netflix’s Priam.
As mentioned above, Apache Cassandra has found incorporating Zookeeper to be neither necessary nor desirable. Lightweight transactions may be used instead when linearizability is necessary.
6.1 Facebook Inbox Search
For Inbox Search we maintain a per-user index of all messages that have been exchanged between the sender and the recipients of the message. There are two kinds of search features that are enabled today: (a) term search, and (b) interactions, i.e. given the name of a person, return all messages that the user might have ever sent to or received from that person. The schema consists of two column families. For query (a) the user id is the key and the words that make up the message become the super columns. Individual message identifiers of the messages that contain the word become the columns within the super column. For query (b) the user id is again the key and the recipients' ids are the super columns. For each of these super columns the individual message identifiers are the columns. In order to make the searches fast, Cassandra provides certain hooks for intelligent caching of data. For instance, when a user clicks into the search bar, an asynchronous message is sent to the Cassandra cluster to prime the buffer cache with that user's index. This way, when the actual search query is executed, the search results are likely to already be in memory. The system currently stores over 50TB of data on a 150-node cluster, which is spread out between east and west coast data centers. We show some production-measured numbers for read performance.
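The two-column-family schema described above can be pictured as nested maps: row key (user id), then super column (term or correspondent id), then columns (message ids). The following in-memory sketch only illustrates that shape. The class and method names are hypothetical, and it says nothing about how Cassandra stores or caches the data.

```python
from collections import defaultdict
from typing import DefaultDict, Iterable, Set

# row key (user id) -> super column name -> set of column names (message ids)
ColumnFamily = DefaultDict[str, DefaultDict[str, Set[str]]]


def new_column_family() -> ColumnFamily:
    return defaultdict(lambda: defaultdict(set))


class InboxIndex:
    def __init__(self) -> None:
        self.terms: ColumnFamily = new_column_family()         # query (a): term search
        self.interactions: ColumnFamily = new_column_family()  # query (b): interactions

    def index_message(self, user_id: str, message_id: str,
                      words: Iterable[str], correspondents: Iterable[str]) -> None:
        """Add one message to both indexes of a single user."""
        for word in words:
            self.terms[user_id][word].add(message_id)
        for person_id in correspondents:
            self.interactions[user_id][person_id].add(message_id)

    def term_search(self, user_id: str, term: str) -> Set[str]:
        """Message ids in the user's inbox that contain the term."""
        return set(self.terms.get(user_id, {}).get(term, set()))

    def interaction_search(self, user_id: str, person_id: str) -> Set[str]:
        """Message ids the user exchanged with the given person."""
        return set(self.interactions.get(user_id, {}).get(person_id, set()))
```

Because both indexes are keyed by user id, a single user's whole index lives under one row key, which is what makes priming the cache for "that user's index" a natural operation.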
There are now many public Cassandra use cases, including several production-quality applications available to study now as open source. The latest include the Feedly notification system, the Comcast Cloud Message Bus (see also their Summit talk), and Netflix's RSS reader recipe.
Typical read latencies in Cassandra today run about 1/3 of Facebook’s numbers here.
We have built, implemented, and operated a storage system providing scalability, high performance, and wide applicability. We have empirically demonstrated that Cassandra can support a very high update throughput while delivering low latency. Future work involves adding compression, the ability to support atomicity across keys, and secondary index support.
Five years, 6,000 JIRA issues, and 250 contributors after its initial release, modern Cassandra is significantly more performant, resilient, feature-complete, and easier to both operate and develop against. Apache Cassandra has implemented column indexes, compression, and security features. Performance has improved substantially. But the most significant developments are CQL and lightweight transactions, neither of which was visible on the horizon when we first started.
The Cassandra system has benefited greatly from feedback from many individuals within Facebook. In addition, we thank Karthik Ranganathan, who indexed all the existing data in MySQL and moved it into Cassandra for our first production deployment. We would also like to thank Dan Dumitriu from EPFL for his valuable suggestions about two of the works cited in the original paper.
For references, please see the original paper.