Secondary indexes
Secondary indexes have been in Cassandra since 0.7 and can be incredibly useful. For example, if you were implementing a user accounts database, you might have the schema
CREATE TABLE user_accounts (
username text PRIMARY KEY,
email text,
password text,
last_visited timestamp,
country text
);
The only key you can lookup on is the primary key – the username. If you wanted to find users in a particular country, you can’t do it without doing a full scan. Instead, you could create an index:
create index user_accounts_country on user_accounts(country);
So you can now run queries like:
select * from user_accounts where country = 'UK';
This works, but if you were deploying this in production you should understand what’s going on under the hood to know if it will work for you.
How secondary indexes work
At a high level, secondary indexes look like normal column families, with the indexed value as the partition key.
For user_accounts, the partition key is username and that is the key the data is indexed with in Cassandra’s SSTables. For the index, the partition key is the country and the column name is the username. As an example, suppose there are two users in the UK, the data stored in Cassandra is (showing only username and country) in JSON form:
{
"rlow": {
"country": "UK"
},
"jbloggs": {
"country": "UK"
}
}
with corresponding index entries:
{
"UK": {
"rlow": "",
"jbloggs": ""
}
}
This means, to find everyone in the UK, we simply lookup this row to find the primary key for the user_accounts table i.e. the usernames.
Distribution
The subtly here is how the data is distributed. For user_accounts, the partitions are distributed by hashing the username and using the ring to find the nodes that store the data. This means user accounts will in general be stored on different nodes. So to find all the users in the UK we will have to do lookups on different nodes. If there are many users in the UK – many more than the number of nodes in the cluster – we should expect to do a query on every node.
If the index were stored like a regular column family, the ‘UK’ partition would be stored on a single node (plus replicas). This partition would grow and grow over time and all index lookups would hit this node. This doesn’t scale – the node(s) indexing the ‘UK’ partition would have to do more and more work as the data grows.
For this reason, Cassandra’s secondary indexes are not distributed like normal tables. They are implemented as local indexes. Each node stores an index of only the data that it stores. For our example, if partitions ‘rlow’ and ‘jbloggs’ are stored on different nodes then one node will have index
{
"UK": {
"rlow": "",
}
}
and the other
{
"UK": {
"jbloggs": ""
}
}
This means our index scales nicely – as our data grows and we add more nodes to compensate, the index on each node stays a constant size.
Note that this doesn’t allow us to scale the number of index lookups since each index lookup does work on each node. But, as our data grows, the data returned from each query grows. The scaling allows us to effectively balance this load around the cluster.
High vs low cardinality
To perform the country index lookup, every node is queried, looks up the ‘UK’ partition and then looks up each user_accounts partition found. This is pretty efficient – each node does one index lookup plus one lookup for each bit of data returned. Each lookup is potentially a disk seek, so if there are n nodes and p partitions returned, we’ve done O(n+p) disk seeks. Since we’ve assumed there are many more users than nodes, p >> n so this is O(p) disk seeks, or O(1) per partition returned.
However, suppose instead we had created an index on email. The key difference here is the cardinality of the fields. There are many entries with the same country but probably only one with the same email. This means only one node (plus replicas) store data for a given email address but all nodes are queried for each lookup. This is wasteful – every node has potentially done a disk seek but we’ve only got back one partition. In this case, we’ve done O(n+1)=O(n) disk seeks. This is O(n) per partition returned.
In this case, the scaling we mostly care about is the number of queries we can perform. The size of the data we are requesting doesn’t change so the only parameter that can grow over time is the query rate. But since we are doing O(n) lookups, increasing n doesn’t change our query rate so we cannot scale.
What would be much more efficient in this case is a distributed index. If the index was distributed just like a normal table then the index lookup would be a single lookup, followed by another single lookup to retrieve the data. These lookups will in general be on different nodes but there are only two lookups in total.
Distributed indexes
Cassandra doesn’t provide an index suitable for the email index, but you can do it yourself. You can create a separate table to store the inverted index:
CREATE TABLE user_accounts_email_idx (
email text,
username text,
PRIMARY KEY(email, username)
);
With the advent of atomic batches in Cassandra 1.2, you can update it atomically. You would, however, miss two nice features of the inbuilt indexing. If you create the index when there is already data, you will need to build the initial index yourself. Also, CASSANDRA-2897 (in Cassandra 1.2) adds ‘lazy’ updating to secondary indexes. When you change an indexed value, you need to remove the old value from the index. Prior to Cassandra 1.2, a read was performed to read the old value to remove it from the index. This made index inserts significantly slower. Lazy updating on reads makes inserts into indexed tables significantly cheaper. There’s no reason why you couldn’t do this manually in your client too but it is complicated.
The sweet spot
Going back to the country index, recall that Cassandra is doing O(p) seeks to return p users. This is a rare case in Cassandra where you perform random I/O rather than sequential I/O. If your table was significantly larger than memory, a query would be very slow even to return just a few thousand results. Returning potentially millions of users would be disastrous even though it would appear to be an efficient query.
This leads to the conclusion that the best use case for Cassandra’s secondary indexes is when p is approximately n i.e. the number of partitions is about equal to the number of nodes. Any fewer partitions and your n index lookups are wasted; many more partitions and each node is doing many seeks.
In practice, this means indexing is most useful for returning tens, maybe hundreds of results. Bear this in mind when you next consider using a secondary index.
High-dimensional data
Cassandra’s native data model is two dimensional: rows and columns. This is great for data that is naturally grouped together e.g. fields of a message. However, some uses require more dimensions – maybe you want to group your messages by recipient too, creating something like this:
"alice": {
"ccd17c10-d200-11e2-b7f6-29cc17aeed4c": {
"sender": "bob",
"sent": "2013-06-10 19:29:00+0100",
"subject": "hello",
"body": "hi"
}
}
For this reason, super columns were added to Cassandra. These are columns that contain columns. You can create as many columns as you want within them and have different numbers in different super columns. You can fetch some or all columns in a super column. You can delete an entire super column or just some sub-columns. They do everything you need.
So what’s the problem?
One obvious problem is for yet higher dimensional data models. Maybe your messages have attachments with fields for size, type, encoding, etc.. You could represent this by concatenating the field names, but in general this doesn’t work if you want to use different comparators for different dimensions.
There is also a less obvious problem that manifests even if you have 2-dimensional data. The way super columns are implemented deep down in Cassandra is as a single column with the sub-columns serialized within it. This means, in order to read one sub-column, Cassandra has to deserialize the whole super column. For large super columns this becomes very inefficient. Also, during partial updates to super columns, the merging during compaction becomes expensive.
A further reason is there are many special cases in the code for super columns. All Cassandra developers I’ve spoken too would love to clean this up.
Yet another problem is that super columns cannot be accessed through CQL, the now much preferred interface to Cassandra.
Enter composite columns
Cassandra 0.8.1 introduced composite columns. These are arbitrary dimensional column names that can have types like CompositeType(UTF8Type, ReversedType(TimeUUIDType), LongType)). It’s also really simple: it is implemented as a comparator so adds very little complexity to Cassandra or clients.
When 0.8.1 was out, I immediately set to converting all the super column data models I knew to use composite columns. Everything worked well apart from one issue: I wanted an equivalent of super column delete.
Remember I said above that you can delete a whole super-column. This is a great feature: even without knowing which sub-columns are present you can delete them all. This presents a problem since I certainly didn’t want to read the sub-columns in to find out which ones to delete (reads in Cassandra are much more costly than writes/deletes).
Range tombstones
What I really needed was a range delete: delete everything from column X to column Y. I was excited to see this added to Cassandra 1.2. Currently, this feature only works through CQL but there are plans to add it to the old thrift interface.
Let’s see them work in an example. Suppose I have a large number of sensors sending all kinds of different readings. Depending on the conditions they may send more or less data so I cannot tell in advance which fields I will get. New sensors are being added all the time and I want Cassandra to store everything.
A possible data model is this:
CREATE TABLE readings (
sensor_id uuid,
reading_id timeuuid,
name text,
value blob,
PRIMARY KEY (sensor_id, reading_id, name)
);
Each sensor has a UUID and each reading has a TimeUUID (so it is time ordered). Each reading consists of one or more (name, value) pairs. By using a compound primary key I can use arbitrary column names. (My partition key or row key is sensor_id; this means my data is partitioned according to the sensor it came from.)
Then let’s insert some readings from a fictional weather sensor:
BEGIN BATCH
INSERT INTO readings (sensor_id, reading_id, name, value) VALUES (d1e59ab9-0fa2-49dd-97c1-41ce9537c110, dde7dfd0-d200-11e2-b05b-fac359ec8ffb, 'temp', 0x11)
INSERT INTO readings (sensor_id, reading_id, name, value) VALUES (d1e59ab9-0fa2-49dd-97c1-41ce9537c110, dde7dfd0-d200-11e2-b05b-fac359ec8ffb, 'time', 0x51b622aa)
APPLY BATCH;
BEGIN BATCH
INSERT INTO readings (sensor_id, reading_id, name, value) VALUES (d1e59ab9-0fa2-49dd-97c1-41ce9537c110, ccd17c10-d200-11e2-b05b-fac359ec8ffb, 'temp', 0x12)
INSERT INTO readings (sensor_id, reading_id, name, value) VALUES (d1e59ab9-0fa2-49dd-97c1-41ce9537c110, ccd17c10-d200-11e2-b05b-fac359ec8ffb, 'humidity', 0x52)
APPLY BATCH;
BEGIN BATCH
INSERT INTO readings (sensor_id, reading_id, name, value) VALUES (d1e59ab9-0fa2-49dd-97c1-41ce9537c110, 13a1ec90-d203-11e2-b05b-fac359ec8ffb, 'temp', 0x11)
APPLY BATCH;
I used a batch so that my writes are atomic and isolated: I won’t end up with partial readings if a write fails or if I read during a write.
Internally, this is using sensor_id as the row key and CompositeType for the column names. The first dimension is reading_id and second is name. value is stored as the column value.
I can now list all readings for my sensor:
select dateOf(reading_id), name, value from readings
where sensor_id = d1e59ab9-0fa2-49dd-97c1-41ce9537c110 order by reading_id desc;
dateOf(reading_id) | name | value
--------------------------+----------+------------
2013-06-10 20:22:23+0100 | temp | 0x11
2013-06-10 20:06:33+0100 | time | 0x51b622aa
2013-06-10 20:06:33+0100 | temp | 0x11
2013-06-10 20:06:05+0100 | temp | 0x12
2013-06-10 20:06:05+0100 | humidity | 0x52
This is reading the time from the TimeUUID and showing most recent first.
Now comes the bit you’ve all been waiting for: we can now delete a whole reading:
DELETE FROM readings WHERE sensor_id = d1e59ab9-0fa2-49dd-97c1-41ce9537c110
AND reading_id = ccd17c10-d200-11e2-b05b-fac359ec8ffb;
This deletes the two columns for this reading and I didn’t have to know what they were beforehand. Internally, this has inserted a range tombstone in the row, deleting all columns with prefix “ccd17c10-d200-11e2-b05b-fac359ec8ffb”.
More complex deletes
Maybe I then wanted to delete all readings within a certain time range because the sensor was giving invalid readings. I could try this:
DELETE FROM readings WHERE sensor_id = d1e59ab9-0fa2-49dd-97c1-41ce9537c110
AND reading_id > 13a1ec90-d203-11e2-b05b-fac359ec8ffb;
This isn’t yet supported in CQL. Hopefully it will come soon since all the underlying machinery is ready to support such a query. (NB such a delete was never possible with super columns.)
Future
Now we can truly say that super columns are deprecated: you can do everything you ever wanted to do with them with composite type, CQL and range tombstones. Indeed, in the upcoming Cassandra 2.0 super columns have been replaced internally: CASSANDRA-3237. I don’t expect they will be removed (from the thrift interface) but at least when someone tells you not to use super columns you now have a viable and complete alternative.
Today a colleague asked me: how can I find out how many keys I just inserted into Cassandra? You’d expect any half-decent database to be able to tell you how much stuff it has got. Cassandra being (somewhat better than) half-decent can, but there are many subtleties that are worth understanding.
Firstly, a clarification on what counting keys actually means. Keys in Cassandra parlance mean rows, so we’re counting the number of rows in a column family. However, there is not actually a distinct row object in Cassandra; rows are just containers for columns. So empty rows don’t exist (caveat: see row deletes later); an empty row is the same as a row that never existed so cannot contribute.
OK, let’s count. In CQL, you can use
select count(*) from cf
However, there is a default limit of 10,000 applied to this statement which will truncate the result for larger column families. The limit can be increased for larger column families:
select count(*) from cf limit 1000000
for example.
The thrift equivalent involves some code, but can be done quite simply in pycassa:
import pycassa
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily
pool = ConnectionPool('ks')
col_fam = pycassa.ColumnFamily(pool, 'cf')
result = col_fam.get_range()
count = sum(1 for _ in result)
Efficiency
The CQL code shows nothing strange; in an SQL DB you would expect it to return quickly and this query looks the same. But why do I need to specify a limit? The pycassa code gives a hint why.
Cassandra doesn’t maintain such counts, unlike the indexing structures used in relational databases. It’s not that someone didn’t bother to implement it, such a count would violate many of Cassandra’s principles.
A key design of Cassandra is that it is write optimized. All of Cassandra’s basic data structures allow writes with zero reads. Cassandra doesn’t check if there are old columns or slot the new column in its final resting place next to the others. It just accepts it into an in memory structure called a memtable, which when large enough gets flushed to disk. This ensures all writes are sequential on disk, which is the primary reason why Cassandra gets such good write performance.
However, what this means is Cassandra doesn’t know if a new column is in a row not yet seen. Or if a delete removes the last column from a row. If it did, it would have had to read to see what data exists which is not allowed.
Actually, Cassandra knows a little bit about this. If a row already exists in a memtable, Cassandra knows this for free. This means when a memtable is flushed to disk (becoming an SSTable), Cassandra knows how many rows there are in it. However, across SSTables, Cassandra doesn’t know if the rows are disjoint or entirely overlapping.
This means the only way to find out is to read through the rows. This is exactly what the pycassa example is doing: read through all the rows. And under the hood this is what the ‘select count’ query does too (with large enough limit).
If you have 10 TB of data per node in a single column family, your innocuos one line query will scan through 10 TB of data on all nodes just to give you one number!
I don’t care about the exact number, can I have a ballpark estimate?
Because Cassandra knows how many rows there are in each SSTable it is possible to get an estimate. The ‘nodetool cfstats’ output tells you these counts in the ‘Number of Keys (estimate)’ line. This is the sum of rows in each SStable (again approximate due to the indexing used but can’t be off by more than 128 by default).
This will often be fairly accurate but it depends on your workload. For write once workloads with unique row keys it will be close (it doesn’t include memtables). But for workloads with lots of overwrites, or writes to the same row a long time apart, it could be significantly different from the true value.
Because this estimate is the sum across all SSTables, It can’t be more than a factor of your SSTable count (also printed by cfstats) too high. If you are testing you could do a full compaction (‘nodetool compact’) to get all your data in one SSTable to get a more accurate count.
Can I maintain a count myself?
If you only ever wrote to a row once, you could use a Cassandra counter that you increment on each new row insertion. But if rows were written to more than once you would have the same problem as Cassandra itself has: is this the first write or not? And maintaining this would be hard: a single error throws the count off forever.
You could also maintain a row that you insert all row keys to. This will have as many columns as you have total rows but no data. Counting the number of rows is reduced to counting the number of columns in this row. However, now every insert requires an insert to this row, and the row isn’t scalable. You could come up with complex solutions to fix this but it will likely become a maintenance problem that is not worth the effort.
Deletes
Because deletes are actually insertions of tombstones (see explanation why), rows that only have deleted columns will show up in the above cfstats counts. Only when the tombstones have been garbage collected will they be removed from the count. As a further complication, you can also delete whole rows. This inserts a special row tombstone object. This can result in empty rows being returned in queries messing up counts. By default, pycassa filters these out (you can get them back by setting ‘filter_empty=False’ in the get_range call). Also the CQL count query doesn’t count them.
What about consistency?
In a replicated setting, even if each Cassandra node knew exactly how many unique rows it had seen, it wouldn’t necessarily know the full count. Maybe each node has missed a row, but collectively they have all the rows. This further complicates any solution to finding accurate counts.
Separately, the count returned can be inconsistent in the sense that it might disagree with other counts obtained at the same time. This will happen if you are changing your data while counting: new rows may or may not get counted by ongoing count operations.
Why was I counting anyway?
We’ve seen above just how hard it is to get a count. I can’t think of many important use cases where the count is required though. You may want it for testing or auditing, but it is unlikely to be required by your application. If you think you need it, look to see if it really is required. You don’t want to be initiating a distributed table scan regularly just to find a count that will be out of date by the time you read it.
Welcome to my blog! I’m going to be writing about some of my thoughts on distributed systems, algorithms and whatever else comes to me during my daily business. Please check back soon when I’ve written some up.
Meanwhile, here is a picture of a monkey.