One of the popular features of MongoDB is the ability to store arbitrarily nested objects and to index on any nested field. In this post I will show how to store nested objects in Cassandra using composite columns. I have recently added this feature to my open source Cassandra project agiato. In Cassandra, as in many other NoSQL databases, stored data is highly denormalized. The denormalized data often manifests itself in the form of a nested object, e.g., when denormalizing one to many relations.
In the solution presented here, the object data is stored in column families with composite columns. An object will typically have some statically defined fields; the rest of the fields will be dynamic. It is possible to define the table or column family behind the object in CQL3, based on the statically known fields of the object.
Composite Column
Super columns are frowned upon in Cassandra; composite columns are the preferred approach. Composite columns can do what super columns could do and more. The underlying data model of a super column is a 3 level nested map (comprising the row key, the super column name or key, and the column name or key). Composite columns are more flexible, allowing you to create a data model with a multi level nested map.
Here is an example of a data model based on composite columns. Consider a building HVAC system with the sensor data stored in a column family with composite columns. Here are the fields.
field | description | example
day | day as integer | 20130421
city | city name | Los Angeles
buildingID | an integer ID for a building | 2658
sensorID | an integer ID for a sensor in a building | 295016
time | timestamp as long | 1361237156
temp | temperature as double | 71.2
humidity | relative humidity as double | 30.7
A record is uniquely identified by the tuple (day, city, buildingID, sensorID, time). The corresponding column family definition with composite columns will be as below.
create column family buildingControl with key_validation_class = 'IntegerType' and comparator = 'CompositeType(UTF8Type, IntegerType, IntegerType, LongType, UTF8Type)' and default_validation_class='DoubleType';
The composite columns, along with the row key, provide a 6 level nested map. The composite column has 5 components. The first 4 correspond to the values of the tuple (city, buildingID, sensorID, time). The last component is the name of a non primary key column. The storage layout of a record will look as below.
row key: 20130421
column: Los Angeles:2658:295016:1361237156:temp, value: 71.2
column: Los Angeles:2658:295016:1361237156:humidity, value: 30.7
The first 4 components of the composite column name are the values of (city, buildingID, sensorID, time) respectively, which are the clustering keys. The last component is the column name for a non primary key column. The value of the composite column is the corresponding non primary key column value.
The composite column name is shown in a human readable form. Internally, the composite column name is stored with a byte encoding scheme. As a nested map, a record in this example can be represented as Map<Integer, Map<String, Map<Integer, Map<Integer, Map<Long, Map<String, Double>>>>>>. Strictly speaking, all the inner maps are sorted maps. As we will see later, in terms of CQL, the outer map corresponds to the row key and all the inner maps except for the innermost one correspond to the clustering key.
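To make the nesting concrete, here is the same record modeled with plain Java sorted maps. This is only an illustration of the logical structure, not how Cassandra physically stores the data.

import java.util.TreeMap;

// day -> city -> buildingID -> sensorID -> time -> column name -> value
TreeMap<Integer, TreeMap<String, TreeMap<Integer, TreeMap<Integer, TreeMap<Long, TreeMap<String, Double>>>>>> records = new TreeMap<>();

// insert the temp column of the example record
records
    .computeIfAbsent(20130421, k -> new TreeMap<>())
    .computeIfAbsent("Los Angeles", k -> new TreeMap<>())
    .computeIfAbsent(2658, k -> new TreeMap<>())
    .computeIfAbsent(295016, k -> new TreeMap<>())
    .computeIfAbsent(1361237156L, k -> new TreeMap<>())
    .put("temp", 71.2);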
A record comprises a set of columns. The set of columns is repeated in the row for different unique values of (city, buildingID, sensorID, time); essentially, a row will contain multiple records. Before composite columns were introduced in Cassandra, people resorted to this pattern for multi level maps by doing string concatenation for column names and values.
Primary Key Syntactic Sugar
In thrift, the interface to Cassandra is tightly coupled to the internal storage structure. CQL introduces a layer of abstraction between the underlying storage structure and the logical model, and makes the data model look like a familiar RDBMS table. The entity in our example has 7 attributes and, as shown below, all 7 of them appear as columns in the CQL table definition.
CREATE TABLE buildingControl (
    day int,
    city text,
    buildingID int,
    sensorID int,
    time timestamp,
    temp double,
    humidity double,
    PRIMARY KEY (day, city, buildingID, sensorID, time)
);
The primary key definition maps to the storage structure as follows. The first element day is the row key or the partitioning key. The remaining 4 elements constitute the clustering key. Internally, a record is stored with 2 physical columns, as we saw in the thrift data model definition. The 7 logical columns map to one row key and two composite columns.
Into the Object Land
So far in our example, the data model had a flat record structure and we were able to define everything nicely with CQL. What if we have multiple related entities in our use case and we want to denormalize the data model? Here is an example of an order object in an eCommerce application, represented in JSON.
{ "custID" : '12736467', "date" : '2013-06-10', "orderID" : 19482065, "amount" : 216.28, "status" : 'picked', "notes" : 'on time', "items" : [ { "sku" : 87482734, "quantity" : 4 }, { "sku" : 32851042, "quantity" : 2 } ], "customer" : { "name" : 'Joe Smith', "tel" : '231 456 7890', "email" : 'joe@yahoo.com' } }
As you can see, the order object contains a child customer object and a list of order line item objects. The embedded customer object is statically defined and we could include its attributes as columns in the CQL table definition. However, the embedded list of order line item objects cannot be represented in CQL, because CQL can handle only lists and maps of primitives.
Cassandra is a schema optional database and you can have any of the following with respect to schema
- Completely static
- Completely dynamic
- Semi static
The schema for our example falls into the third category, because we cannot statically define all the fields in the schema.
Linearizing Object Hierarchy
If we traverse an arbitrarily nested object, the leaf nodes will contain field name and primitive value pairs. To linearize an object, after traversing the object hierarchy, a list of leaf node objects is generated. Each such node is saved as a composite column. For nested fields, the column name is generated by concatenating all the field names in the path from the root to the leaf node, separated by periods.
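This is not agiato's actual implementation, but a minimal sketch of the linearization idea, assuming the object is handed over as nested maps and lists.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class Linearizer {
    // a leaf node: dot separated field path paired with a primitive value
    public static class LeafNode {
        public final String path;
        public final Object value;
        public LeafNode(String path, Object value) {
            this.path = path;
            this.value = value;
        }
    }

    // depth first traversal collecting (path, value) pairs for all leaf nodes
    // usage: List<LeafNode> leaves = new ArrayList<>(); linearize("", order, leaves);
    public static void linearize(String path, Object node, List<LeafNode> leaves) {
        if (node instanceof Map) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) node).entrySet()) {
                String childPath = path.isEmpty() ? entry.getKey().toString()
                        : path + "." + entry.getKey();
                linearize(childPath, entry.getValue(), leaves);
            }
        } else if (node instanceof List) {
            // list elements are identified by their index, e.g. items.[0].sku
            List<?> list = (List<?>) node;
            for (int i = 0; i < list.size(); ++i) {
                linearize(path + ".[" + i + "]", list.get(i), leaves);
            }
        } else {
            // primitive leaf: record the full path from the root and the value
            leaves.add(new LeafNode(path, node));
        }
    }
}

For the order object above, this yields leaf paths like customer.email and items.[0].quantity, which match the composite column names in the CLI output shown later.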
A CQL table, along with a primary key, can be defined comprising only the statically defined primitive fields of an object. Here is the partial definition for the order object.
CREATE TABLE orders (
    custID text,
    date text,
    orderID int,
    amount double,
    status text,
    notes text,
    PRIMARY KEY (custID, date, orderID)
);
The only advantage of creating the CQL schema is that you can run CQL select queries. However, the query will only return the columns defined in the CQL schema, not the dynamic attributes of the object.
Saving the Object
The object to be saved is passed to the agiato API as a SimpleDynaBean, which is a simplified implementation of the Apache Commons DynaBean interface. With SimpleDynaBean you can define an arbitrarily nested object. The API also gets passed the primary key definition. The different options for object representation are as follows.
- SimpleDynaBean object
- JSON string
- Java bean object
The following code snippet uses SimpleDynaBean and saves the order object in the column family orders.
SimpleDynaBean obj = new SimpleDynaBean();
// populate the hierarchical object
// ......
// initialize agiato with its configuration file
AgiatoContext.initialize("/home/pranab/Projects/bin/agiato/test.json");
// get a writer for the orders column family
ColumnFamilyWriter writer = AgiatoContext.createWriter("orders");
// primary key column names; here only the first one is the row key
PrimaryKey primKey = new PrimaryKey("custID", "date", "orderID");
// traverse the object and save all fields, static and dynamic, as composite columns
writer.writeObject(obj, primKey, ConsistencyLevel.ONE);
The object is traversed in a depth first manner and a list of leaf nodes is generated. The leaf nodes will include the columns defined in CQL as well as all the dynamic fields.
The arguments of the PrimaryKey constructor are the primary key column names. Here only the first element constitutes the row key. If that were not the case, you would have to call setRowKeyElementCount() to specify how many of the primary key elements constitute the row key.
If, after the depth first traversal, the primary key fields appear at the beginning of the list, you can call another PrimaryKey constructor and pass just the number of primary key elements.
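For illustration, here is a sketch of the composite row key case, where custID and date together would form the row key. The method names come from the text above, but the exact agiato signatures are an assumption.

// primary key columns: custID, date, orderID
PrimaryKey primKey = new PrimaryKey("custID", "date", "orderID");
// the first 2 primary key elements make up a composite row key
primKey.setRowKeyElementCount(2);
writer.writeObject(obj, primKey, ConsistencyLevel.ONE);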
The class ObjectSerDes handles object traversal and mapping of object fields to composite columns. The DataAccess class handles Cassandra data access through thrift. From the list of leaf nodes, the row key and the clustering key are identified. After traversing an object, the flattened list of NamedObject instances is created as shown below.
row key NamedObject | clustering key NamedObjects | other NamedObjects
The first NamedObject value is converted to the row key. The clustering key NamedObject values make up the prefix part of each composite column name. Then the columns are saved in a way that has the same effect as running a CQL insert query, except that it also stores all the dynamic fields of the object that were not part of the CQL definition.
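In human readable terms, the column name construction amounts to the sketch below. Keep in mind that the real storage layer uses the CompositeType binary encoding, not string concatenation.

// clustering key values taken from the primary key fields of the order object
String[] clusteringValues = {"2013-06-10", "19482065"};
// dot separated path of one leaf field
String leafPath = "customer.email";
// human readable form of the composite column name
String columnName = String.join(":", clusteringValues) + ":" + leafPath;
// columnName is now "2013-06-10:19482065:customer.email"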
Field values are serialized based on introspection of the field data. No check is made against the table metadata. For the fields defined in the CQL schema, if data with the wrong type is passed in the object, be prepared to get surprising results from CQL queries.
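Below is a minimal sketch of such introspection based serialization; it is not agiato's code, just an illustration of choosing an encoding from the runtime type of a field value.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// serialize a field value based on its runtime type, falling back to UTF8 text
public static ByteBuffer toByteBuffer(Object value) {
    if (value instanceof Integer) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt((Integer) value);
        buf.flip();
        return buf;
    } else if (value instanceof Long) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong((Long) value);
        buf.flip();
        return buf;
    } else if (value instanceof Double) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putDouble((Double) value);
        buf.flip();
        return buf;
    } else {
        // strings and anything unrecognized are encoded as UTF8 bytes
        return ByteBuffer.wrap(value.toString().getBytes(StandardCharsets.UTF_8));
    }
}

This matches what the CLI output below shows, e.g. the int quantity 4 stored as 00000004 and the double amount 216.28 stored as 406b08f5c28f5c29.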
Update and Delete
For update and delete, it's necessary to pass a partially formed object, whether a dynamic object or a JSON string, to the API, containing the primary key fields and the other fields that are to be updated or deleted. For example, to update the status of an order, the order object needs to have only the primary key fields and the status field.
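For instance, a partially formed order object for such a status update might look as below; the new status value is made up for illustration.

{
  "custID" : "12736467",
  "date" : "2013-06-10",
  "orderID" : 19482065,
  "status" : "shipped"
}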
With a statically compiled Java bean object, it's difficult to define a partially formed object. One way of doing it is to have all the fields as primitive wrapper objects, where null implies the absence of a field value.
Querying the Object
I don't have the query method that returns a fully populated object implemented yet; I will be adding it soon. In the meantime we can use CQL and the CLI. If we run a CQL query, this is what we get. As expected, it returns only the columns it knows about from the table definition; it does not return any of the dynamic columns.
 custid   | date       | orderid  | amount | notes   | status
----------+------------+----------+--------+---------+--------
 12736467 | 2013-06-10 | 19482065 | 216.28 | in time | picked
However, when we run a CLI query, it returns every field that is physically stored in Cassandra, as below. All the values in the output are hex encoded bytes; for example, 406b08f5c28f5c29 is the IEEE 754 encoding of the double 216.28.
RowKey: 3132373336343637
=> (name=2013-06-10:19482065:amount, value=406b08f5c28f5c29, timestamp=13..)
=> (name=2013-06-10:19482065:customer.email, value=6a6f65407961686f6f2e636f6d, timestamp=13..)
=> (name=2013-06-10:19482065:customer.name, value=4a6f6520536d697468, timestamp=13..)
=> (name=2013-06-10:19482065:customer.tel, value=323331203435362037383930, timestamp=13..)
=> (name=2013-06-10:19482065:items.[0].quantity, value=00000004, timestamp=13..)
=> (name=2013-06-10:19482065:items.[0].sku, value=000000000536e16e, timestamp=13..)
=> (name=2013-06-10:19482065:items.[1].quantity, value=00000002, timestamp=13..)
=> (name=2013-06-10:19482065:items.[1].sku, value=0000000001f54462, timestamp=13..)
=> (name=2013-06-10:19482065:notes, value=696e2074696d65, timestamp=13..)
=> (name=2013-06-10:19482065:status, value=7069636b6564, timestamp=13..)
Why not Just Serialize
You might ask why bother with mapping all the nested fields of an object to columns. Why not serialize all the dynamic fields of the object, store them as a JSON string, and include that column as a text field in the CQL create table? Granted, the column will be visible as a JSON string when you run a CQL select query. If you are leaning in that direction, consider the following scenarios.
- You want to create a secondary index on some nested field, e.g., customer.zipCode. The field needs an independent existence as a column.
- You want to treat such a nested field as a dimension in aggregation and analytic queries involving nested objects.
The other advantage of having a separate column for each nested field is that CLI query results are more readable.
Next Steps
My first goal is to support object based queries. A prototype object will be passed to agiato. The fields defining the query criteria will need to have their values set in this object. These fields will be a subset of the primary key fields.
After running the query, the API will return one or more fully populated objects. Other fields need not be passed as part of the query object. However, if some prototype value for the other fields is passed, then the API will return typed values for those fields; otherwise it will return byte arrays for the values.
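Purely as a hypothetical illustration of the planned API, a query call might look as below. None of these query method names exist in agiato yet, and the reader class and the DynaBean style setters are assumptions.

// prototype object carrying the query criteria, a subset of the primary key fields
SimpleDynaBean proto = new SimpleDynaBean();
proto.set("custID", "12736467");
proto.set("date", "2013-06-10");
// hypothetical reader, mirroring the existing writer API
ColumnFamilyReader reader = AgiatoContext.createReader("orders");
List<SimpleDynaBean> orders = reader.queryObjects(proto, primKey, ConsistencyLevel.ONE);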
My next goal is to support aggregate and analytic queries. The user will define a cube schema consisting of dimensions and measures pointing to object attributes, including nested attributes. While storing an object, aggregate values will be computed and stored in separate column families. Storm will be part of the aggregation engine ecosystem.
For commercial support for this solution or other solutions in my github repositories, please talk to ThirdEye Data Science Services. Support is available for Hadoop or Spark deployment on the cloud, including installation, configuration and testing.