Tuesday, June 11, 2013

Riak and Monotable

The Imikimi Monotable project has been placed on indefinite hold. Unfortunately we don't have the manpower. It is an excellent solution to our back-end data-store needs, and probably for many others as well. However, it is still going to require a lot of work to bring to production-ready.

Instead, we are looking seriously at Riak for our back-end store. We hope to put all our Kimi-data and most of our Sql-data in two separate Riak clusters. Riak has several benefits that Monotable was designed to solve:
  1. no central server - every node is the same and any node can respond to any request
  2. scales easily by adding one node at a time 
  3. can configure replication levels on a per-bucket basis
  4. supports map-reduce
  5. can store binary data - up to 16 megabytes per record (this our requirement, not Riak's exact max size)

However, Riak has some down-sides:
  1. Riak's "eventually consistent" system only makes progress on a record's consistency when it is read. It is not clear that any progress is made if the record is never read after a write or consistency failure. This means the latest version of a record could go under-replicated indefinitely.
  2. Riak's scaling is limited by your initial "ring-size". My hazy understanding is we need to set our ring-size to 20x our max expected server-count.
  3. Riak has a lot of cross-talk between nodes. There is potentially an upper limit of 300 servers before performance degrades.
  4. Riak does not allow range-selects on the primary key
  5. The behavior and scalability of secondary indexes is not clear to me. Typically distributed hash tables can't do secondary indexes. Riak has secondary indexes, but there is probably a catch that we don't understand yet.
  6. I don't think Riak has "live" map-reduce. 
Monotable is designed to achieve all of the above goals and scale to very large sizes. However, we are a long ways from needing >300 servers. As long as Riak's indexes work reasonably well and we can do off-line map-reduce jobs without disrupting site performance, it should be good enough for us for now.

Monotable and Map-Reduce

Live map-reduce was not an original requirement for Monotable. However, it is a very powerful feature. It would bring Monotable from being a simple key-value store with basic range-selects to a system with powerful indexing and querying capabilities. It turns out the core of Monotable is well suited for it.

In an offline map-reduce, each chunk can be stream-processed by the mapper and fed into the reducer. Once a chunk has been reduced to a single output, the results of all chunks can be merged until a single overall result is output. If the reducer can write data back to Monotable, you can use this to generate secondary indexes. 

These indexes are more powerful than SQL indexes which have a 1-to-1 mapping between source and index records. With the reduce step we can arbitrarily choose to index at any level of reduction. In fact we can store the result of every reduction if we wish. This allows us to generate an index + aggregate hybrid. With this data structure we can compute the map-reduce result over any sub-range in O(ln n) time.

This much was already a possibility within the original Monotable requirements. Just take the Monotable core and add a standard map-reduce system over top. The question becomes how can we make it live-updating?

Acknowledgment: Live-Map-Reduce is not new. I believe CouchDB does this already. CouchDB, however, doesn't scale easily to large distributed clusters. 

Live-Updating Map-Reduce in Monotable

Anytime a source-record is changed, the off-line generated index will become out-of-date. Using normal map-reduce, we'd have to delete all the old data and re-run the map-reduce. Deleting a large range of records is fast in Monotable, but re-running the map-reduce will be slow. Since we are already storing the entire reduce-tree, couldn't we re-map the changed record and then update only the O(ln n) reduce steps which depend on it?

Structure Definitions

First, I need to briefly describe the structure of Monotable. The entire Monotable stores all records, sorted by key. Monotable is logically partitioned into "ranges" which are similar to separate "tables" in other stores. Each range is broken up into one or more 64 meg "chunks". Each chunk exclusively covers a sub-range of the overall Monotable. All records in Monotable with keys >= the chunk's start key and < it's end key are stored in that chunk.

To keep things simple, let's separate out map-reduce. Each range in Monotable can have zero or more mappers or reducers. A mapper takes each record and transforms it and then writes the results back into Monotable in a different range. Each reducer takes all records in one chunk and reduces them down to one record which is written, again, to a different range. In both cases, the new range can in turn have a mapper or reducer. 

To configure a standard map reduce, we would apply a mapper to our data-range which write records to another range we'll call the "index". The index-range would have a reducer which write records to a meta-index-range. This meta-index-range would in turn have the same reducer step to yet another meta-meta-index-range. This continues until the meta-meta-meta-*-index-range fits within one chunk.

Live Updating

Now that we have a sense of our data-structure, how do we keep it up to date? Due to the nature of distributed data, we can't afford to do it synchronously. Monotable guarantees atomic operations within a chunk, but provides no guarantees across chunk boundaries. 

When a record changes, the first step is to mark it's chunk as "dirty" with respect to all mappers and reducers applied to that chunk. Asynchronously, the master server for each chunk can later initiate the process of "cleaning" the chunk by updating all effected maps and reductions.

Unfortunately, simply re-applying the map or reduce step for the chunk doesn't work. The mapper may generate arbitrary output based on the record's data. If we naively insert the new value in the index we may be forgetting to remove the old value, which may be anywhere else in the index. We must have both the old and the new value so we can delete the old value before inserting the new one. Note that if the mapped old and mapped new values are identical, we can skip the rest of the update. 

Keeping the old value of a record adds complexities, particularly since we want to allow more than one active mapper or reducer. For every mapper/reducer we need to be able to access the version of every  changed records as they were the last time that mapper/reducer ran. 

A New Chunk Data Structure with Versioning

I believe the answer is similar to how CouchDB stores its entire database - as a continually appending log representation of a b-tree. This format keeps a complete revision history of all changes. Each mapper or reducer just needs to track the offset of the snapshot when it was last run. 

If we limit this to each individual chunk, then we avoid the main problem of CouchDB: it is painful to compact large databases. Since chunks are limited in size, we always have relatively small files which will be fast to compact. We have to be a bit careful when we compact chunks to ensure that we don't lose any of the snapshots that the map/reducers need. However, if they are kept up-to-date at roughly the same rate as our compaction schedule they should usually be fairly up-to-date and we only need to keep the most recent few snapshots. We just need a compaction algorithm that takes into account a list of snapshots that must be maintained after compaction.

Keep in mind that all this snapshotting and revision tracking is managed within one server. No synchronization needs to happen with other servers in the cluster. 

As a side-effect, Monotable gains a general snapshotting system. On a chunk-by-chunk bases we could establish any arbitrary history tracking plan up to tracking all history.


Now we know how on map/reduce step will stay up to date. The rest of the system falls out automatically. The mapper will dump its outputs into a separate range of Monotable which in turn will become "dirty" and trigger a reducer step. The reducer will write to yet another range triggering another reducer step. This will continue until the source range is reduced down to <= 1 chunk.

Chaining will add some latency to the system. Each step invokes an asynchronous delay between being marked "dirty" and the master-server for that chunk scheduling the map/reducer action. The good news is we are effectively working with a tree with very high branching factor. Pessimistically, given the minimum chunk size of 32megabytes and a large, 1k record size, we have a branching factor over 32,000. A two-level tree can address 1 billion records. Four levels can track 1 quadrillion records.

My other concern is how much work this might involves if there are random changes happening all over the source range all the time. Due to the asynchronous nature of the system, though, the answer is good. In the worse case, at the leaf level, there may be as many as one map operation per record change. This could be bad, but the mapper algorithm could be designed to run in time proportional to the number of changes. One change will only require a small amount of work to update.

As we chain "dirty" updates up the reduction tree, the story gets better. If several leaf nodes of one parent change at the same time, they will all mark their parent node as "dirty", but they won't each trigger a new reducer step. When the parent node's server schedulers the mapper step it will integrate all changes from its leaf nodes in one pass.


The result is quite elegant. Each individual record change would trigger approximately 'k' steps of work reading/writing only approximately 'k' records - where 'k' is the depth of the reduction tree <= log(n) / log(32000) - n == number of records in the source range. K is <= 2 for n <= 1,000,000,000 records, and K is <= 4 for n <= 1,000,000,000,000,000,000 records. If records are changing faster than the reduction steps can propagate up the tree, k will become smaller since updates up the tree can cover more than one child-node per update. K will approach 1 as the speed of record updates increases. The system gets more efficient as it becomes more loaded.