Why

RocksDB is a high-performance embedded persistent key-value store. It traditionally provides three simple operations, Get, Put and Delete, which together give an elegant lookup-table-like interface.

It is a common pattern to update an existing value in some way. To do this in RocksDB, the client has to read (Get) the existing value, modify it, and then write (Put) it back to the DB. Let’s look at a concrete example.

Imagine we are maintaining a set of uint64 counters. Each counter has a distinct name. We would like to support four high-level operations: Set, Add, Get and Remove.

First, we define the interface and get the semantics right. Error handling is brushed aside for clarity.
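
A minimal sketch of such an interface, consistent with the method signatures used in the implementation that follows, might look like this. `InMemoryCounters` is a hypothetical std::map-backed stand-in, included only to pin down the semantics (notably that Add treats a missing counter as zero); it is not part of RocksDB.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Abstract counter interface with the four high-level operations.
class Counters {
 public:
  virtual ~Counters() {}
  // (Re)set the value of a named counter.
  virtual void Set(const std::string& key, uint64_t value) = 0;
  // Remove the named counter.
  virtual void Remove(const std::string& key) = 0;
  // Retrieve the current value; return false if the counter does not exist.
  virtual bool Get(const std::string& key, uint64_t* value) = 0;
  // Increase the named counter by value; a missing counter counts as zero.
  virtual void Add(const std::string& key, uint64_t value) = 0;
};

// Hypothetical in-memory stand-in, used only to illustrate the semantics.
class InMemoryCounters : public Counters {
 public:
  void Set(const std::string& key, uint64_t value) override { data_[key] = value; }
  void Remove(const std::string& key) override { data_.erase(key); }
  bool Get(const std::string& key, uint64_t* value) override {
    auto it = data_.find(key);
    if (it == data_.end()) return false;
    *value = it->second;
    return true;
  }
  void Add(const std::string& key, uint64_t value) override {
    uint64_t base = 0;
    Get(key, &base);  // leaves base at 0 when the key is missing
    Set(key, base + value);
  }

 private:
  std::map<std::string, uint64_t> data_;
};
```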

Second, we implement it with the existing RocksDB support. Pseudo-code follows:

    class RocksCounters : public Counters {
     public:
      static constexpr uint64_t kDefaultCount = 0;
      RocksCounters(std::shared_ptr<DB> db);

      // mapped to a RocksDB Put
      virtual void Set(const string& key, uint64_t value) {
        string serialized = Serialize(value);
        db_->Put(put_option_, key, serialized);
      }

      // mapped to a RocksDB Delete
      virtual void Remove(const string& key) {
        db_->Delete(delete_option_, key);
      }

      // mapped to a RocksDB Get
      virtual bool Get(const string& key, uint64_t* value) {
        string str;
        auto s = db_->Get(get_option_, key, &str);
        if (s.ok()) {
          *value = Deserialize(str);
          return true;
        } else {
          return false;
        }
      }

      // implemented as get -> modify -> set
      virtual void Add(const string& key, uint64_t value) {
        uint64_t base;
        if (!Get(key, &base)) {
          base = kDefaultCount;
        }
        Set(key, base + value);
      }
    };

Note that, other than Add, the other three operations each map directly to a single RocksDB operation. Coding-wise, it’s not that bad. However, Add, conceptually a single operation, is mapped to two RocksDB operations. This has performance implications too: a random Get is relatively slow in RocksDB.

Now, suppose we are going to host Counters as a service. Given the number of cores in servers nowadays, our service is almost certainly multithreaded. If the threads are not partitioned by key space, multiple Add requests for the same counter may be picked up by different threads and executed concurrently. If we also have a strict consistency requirement (missing an update is not acceptable), we have to wrap Add with external synchronization, a lock of some sort. The overhead adds up.
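
To make the hazard concrete, here is a sketch that replays one unlucky interleaving of two Add requests by hand, using a plain std::map as a hypothetical stand-in for the database (`FakeDb` and both function names are invented for this illustration). Both callers read the same base value before either writes, so one increment is lost; the serialized variant shows what an external lock around Add would enforce.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for the database: a plain in-memory map.
typedef std::map<std::string, uint64_t> FakeDb;

// Replays the interleaving  Get(A) Get(B) Put(A) Put(B)  for two
// concurrent Add(key, delta) requests and returns the final value.
uint64_t ReplayRacyAdds(FakeDb& db, const std::string& key, uint64_t delta) {
  uint64_t base_a = db[key];  // thread A reads
  uint64_t base_b = db[key];  // thread B reads the same value
  db[key] = base_a + delta;   // thread A writes
  db[key] = base_b + delta;   // thread B overwrites A's update
  return db[key];
}

// The same two requests executed strictly one after the other.
uint64_t ReplaySerializedAdds(FakeDb& db, const std::string& key, uint64_t delta) {
  db[key] = db[key] + delta;  // request A: read, modify, write
  db[key] = db[key] + delta;  // request B sees A's result
  return db[key];
}
```

Starting from a counter value of 10 and adding 5 twice, the racy replay ends at 15 while the serialized replay ends at 20.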

What if RocksDB directly supported the Add functionality? We might come up with something like this:

    virtual void Add(const string& key, uint64_t value) {
      string serialized = Serialize(value);
      db->Add(add_option, key, serialized);
    }

This seems reasonable for Counters. But not everything you store in RocksDB is a counter. Say we need to track the locations where a user has been. We could store a (serialized) list of locations as the value of a user key. It would be a common operation to add a new location to the existing list. We might want an Append operation in this case: db->Append(user_key, serialize(new_location)). This suggests that the semantics of the read-modify-write operation are really determined by the client’s value type. To keep the library generic, we had better abstract out this operation and allow the client to specify the semantics. That brings us to our proposal: Merge.

What

We have developed a generic Merge operation as a new first-class operation in RocksDB to capture the read-modify-write semantics.

This Merge operation:

  • Encapsulates the semantics for read-modify-write into a simple abstract interface.
  • Allows the user to avoid incurring extra cost from repeated Get() calls.
  • Performs back-end optimizations in deciding when/how to combine the operands without changing the underlying semantics.
  • Can, in some cases, amortize the cost over all incremental updates to provide asymptotic increases in efficiency.
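
The idea can be pictured with a toy model (not RocksDB's actual implementation): writes only record operands, and a user-supplied function folds them into the base value when the key is read. `ToyStore` and `MergeFn` are hypothetical names invented for this sketch.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// User-supplied semantics: combine an existing value (nullptr if the key
// has no value yet) with one operand and return the new value.
typedef std::function<std::string(const std::string* existing,
                                  const std::string& operand)> MergeFn;

class ToyStore {
 public:
  explicit ToyStore(MergeFn fn) : merge_(fn) {}

  // Put replaces the base value and discards pending operands.
  void Put(const std::string& key, const std::string& value) {
    Entry& e = entries_[key];
    e.has_base = true;
    e.base = value;
    e.operands.clear();
  }

  // Merge performs no read: it only records the operand.
  void Merge(const std::string& key, const std::string& operand) {
    entries_[key].operands.push_back(operand);
  }

  // Get applies the pending operands in insertion order.
  bool Get(const std::string& key, std::string* value) const {
    auto it = entries_.find(key);
    if (it == entries_.end()) return false;
    const Entry& e = it->second;
    if (!e.has_base && e.operands.empty()) return false;
    std::string current = e.base;
    bool have = e.has_base;
    for (const std::string& op : e.operands) {
      current = merge_(have ? &current : nullptr, op);
      have = true;
    }
    *value = current;
    return true;
  }

 private:
  struct Entry {
    bool has_base = false;
    std::string base;
    std::vector<std::string> operands;
  };
  MergeFn merge_;
  std::map<std::string, Entry> entries_;
};
```

The store never needs to know what the values mean; swapping the function passed to the constructor changes the semantics from, say, list append to numeric addition.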

How to Use It

In the following sections, the client-specific code changes are explained. We also provide a brief outline of how to use Merge. It is assumed that the reader already knows how to use classic RocksDB (or LevelDB), including:

  • The DB class (including construction, DB::Put(), DB::Get(), and DB::Delete())
  • The Options class (and how to specify database options upon creation)
  • Knowledge that all keys/values written to the database are simple strings of bytes.

We have defined a new interface/abstract-base-class: MergeOperator. It exposes some functions telling RocksDB how to combine incremental update operations (called “merge operands”) with base-values (Put/Delete). These functions can also be used to tell RocksDB how to combine merge operands with each other to form new merge operands (called “Partial” or “Associative” merging).

For simplicity, we will temporarily ignore this concept of Partial vs. non-Partial merging. To that end, we have provided a separate interface called AssociativeMergeOperator, which encapsulates and hides all of the details around partial merging. For most simple applications (such as our 64-bit counters example above), this will suffice.

So the reader should assume that all merging is handled via an interface called AssociativeMergeOperator. Here is the public interface:
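
In RocksDB's merge_operator.h, the client-facing part of this class is a Merge() callback plus a Name() accessor. The sketch below mirrors that shape. To keep it compilable on its own, it lives in a `sketch` namespace with minimal stand-ins for Slice, Logger and MergeOperator (all assumptions of this example, not the library's definitions), and it adds a hypothetical `CommaAppendOperator` to show how a client might implement the interface.

```cpp
#include <string>

namespace sketch {

// Stand-ins so the example compiles without RocksDB headers.
typedef std::string Slice;  // RocksDB's Slice is a non-owning byte range
class Logger {};
class MergeOperator {
 public:
  virtual ~MergeOperator() {}
};

class AssociativeMergeOperator : public MergeOperator {
 public:
  virtual ~AssociativeMergeOperator() {}

  // key:            (IN)  the key associated with this merge operation
  // existing_value: (IN)  nullptr means the key did not exist before this op
  // value:          (IN)  the operand to merge into existing_value
  // new_value:      (OUT) the client stores the merge result here
  // logger:         (IN)  may be used to log errors during the merge
  // Returns true on success, false on failure / error / corruption.
  virtual bool Merge(const Slice& key, const Slice* existing_value,
                     const Slice& value, std::string* new_value,
                     Logger* logger) const = 0;

  // Name of the operator, used to detect operator mismatches across opens.
  virtual const char* Name() const = 0;
};

// Hypothetical client implementation: comma-separated append.
class CommaAppendOperator : public AssociativeMergeOperator {
 public:
  bool Merge(const Slice& /*key*/, const Slice* existing_value,
             const Slice& value, std::string* new_value,
             Logger* /*logger*/) const override {
    *new_value = existing_value ? *existing_value + "," + value : value;
    return true;
  }
  const char* Name() const override { return "CommaAppendOperator"; }
};

}  // namespace sketch
```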

Some Notes:

  • AssociativeMergeOperator is a sub-class of a class called MergeOperator. We will see later that the more generic MergeOperator class can be more powerful in certain cases. The AssociativeMergeOperator we use here is, on the other hand, a simpler interface.
  • We pass in the key so that the client can multiplex the merge operator based on it, if the key space is partitioned and different subspaces refer to different types of data which have different merge operation semantics. For example, the client might choose to store the current balance (a number) of a user account under the key “BAL:uid” and the history of the account activities (a list) under the key “HIS:uid”, in the same DB. (Whether or not this is a good practice is debatable.) For the current balance, numeric addition is a perfect merge operator; for the activity history, we would need a list append instead. Thus, by passing the key back to the Merge callback, we allow the client to differentiate between the two types.

Example:

    void Merge(...) {
      if (key.starts_with("BAL:")) {
        NumericAddition(...);
      } else if (key.starts_with("HIS:")) {
        ListAppend(...);
      }
    }
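
A runnable version of this dispatch, written against std::string so that it stands alone, could look like the following. The helper names and the value encodings (decimal text for balances, comma-separated text for histories) are assumptions made for the illustration; std::stoll throws on non-numeric input, which a real operator would have to handle.

```cpp
#include <string>

// True if key begins with prefix.
bool StartsWith(const std::string& key, const std::string& prefix) {
  return key.compare(0, prefix.size(), prefix) == 0;
}

// Balances are stored as decimal text; a missing value counts as 0.
std::string NumericAddition(const std::string* existing, const std::string& operand) {
  long long base = existing ? std::stoll(*existing) : 0;
  return std::to_string(base + std::stoll(operand));
}

// Histories are stored as comma-separated entries.
std::string ListAppend(const std::string* existing, const std::string& operand) {
  return existing ? *existing + "," + operand : operand;
}

// Chooses the merge semantics from the key prefix. Returns false for keys
// in an unknown subspace.
bool MergeByPrefix(const std::string& key, const std::string* existing,
                   const std::string& operand, std::string* new_value) {
  if (StartsWith(key, "BAL:")) {
    *new_value = NumericAddition(existing, operand);
    return true;
  }
  if (StartsWith(key, "HIS:")) {
    *new_value = ListAppend(existing, operand);
    return true;
  }
  return false;
}
```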

Other Changes to the client-visible interface

After defining this class, the user should have a way to specify to RocksDB to use this merge operator for its merges. We have introduced additional fields/methods to the DB class and the Options class for this purpose:

    // In addition to Get(), Put(), and Delete(), the DB class now also has an additional method: Merge().
    class DB {
      ...
      // Merge the database entry for "key" with "value". Returns OK on success,
      // and a non-OK status on error. The semantics of this operation is
      // determined by the user provided merge_operator when opening DB.
      // Returns Status::NotSupported if DB does not have a merge_operator.
      virtual Status Merge(
          const WriteOptions& options,
          const Slice& key,
          const Slice& value) = 0;
      ...
    };

    struct Options {
      ...
      // REQUIRES: The client must provide a merge operator if Merge operation
      // needs to be accessed. Calling Merge on a DB without a merge operator
      // would result in Status::NotSupported. The client must ensure that the
      // merge operator supplied here has the same name and *exactly* the same
      // semantics as the merge operator provided to previous open calls on
      // the same DB. The only exception is reserved for upgrade, where a DB
      // previously without a merge operator is introduced to Merge operation
      // for the first time. It's necessary to specify a merge operator when
      // opening the DB in this case.
      // Default: nullptr
      std::shared_ptr<MergeOperator> merge_operator;
      ...
    };

Note: The Options::merge_operator field is defined as a shared-pointer to a MergeOperator. As specified above, the AssociativeMergeOperator inherits from MergeOperator, so it is okay to specify an AssociativeMergeOperator here. This is the approach used in the following example.

Client code change:

Given the above interface change, the client can implement a version of Counters that directly utilizes the built-in Merge operation.

Counters v2:
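
A rough sketch of the merge logic for this version follows. It mirrors what the later discussion calls UInt64AddOperator: it adds the operand to the existing value and treats a missing or malformed value as 0, so it always succeeds. It is written against std::string rather than RocksDB's Slice so that it stands alone; the fixed 8-byte little-endian encoding and the function names are assumptions of this example. In a real client the same body would sit inside an AssociativeMergeOperator::Merge() override, the operator would be installed through Options::merge_operator, and Add would call DB::Merge() instead of a Get followed by a Put.

```cpp
#include <cstdint>
#include <string>

// Encode a counter as 8 bytes, least significant byte first (assumed format).
std::string Serialize(uint64_t v) {
  std::string out(8, '\0');
  for (int i = 0; i < 8; ++i) {
    out[i] = static_cast<char>((v >> (8 * i)) & 0xff);
  }
  return out;
}

// Decode; returns false if the input is not exactly 8 bytes long.
bool Deserialize(const std::string& s, uint64_t* v) {
  if (s.size() != 8) return false;
  uint64_t r = 0;
  for (int i = 0; i < 8; ++i) {
    r |= static_cast<uint64_t>(static_cast<unsigned char>(s[i])) << (8 * i);
  }
  *v = r;
  return true;
}

// uint64 addition semantics. existing_value == nullptr means "no value yet".
bool UInt64AddMerge(const std::string* existing_value, const std::string& value,
                    std::string* new_value) {
  uint64_t existing = 0;
  if (existing_value && !Deserialize(*existing_value, &existing)) {
    existing = 0;  // corrupted existing value: treat it as 0
  }
  uint64_t operand = 0;
  if (!Deserialize(value, &operand)) {
    operand = 0;  // corrupted operand: treat it as 0
  }
  *new_value = Serialize(existing + operand);
  return true;  // all errors are mapped to 0, so this never fails
}
```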

The user interface change is relatively small. And the RocksDB back-end takes care of the rest.

Associativity vs. Non-Associativity

Up until now, we have used the relatively simple example of maintaining a database of counters. And it turns out that the aforementioned AssociativeMergeOperator interface is generally pretty good for handling many use-cases such as this. For instance, if you wanted to maintain a set of strings, with an “append” operation, then what we’ve seen so far could be easily adapted to handle that as well.

So, why are these cases considered “simple”? Well, implicitly, we have assumed something about the data: associativity. This means we have assumed that:

  • The values that are Put() into the RocksDB database have the same format as the merge operands called with Merge(); and
  • It is okay to combine multiple merge operands into a single merge operand using the same user-specified merge operator.

For example, look at the Counters case. The RocksDB database internally stores each value as a serialized 8-byte integer. So, when the client calls Counters::Set (corresponding to a DB::Put()), the argument is exactly in that format. And similarly, when the client calls Counters::Add (corresponding to a DB::Merge()), the merge operand is also a serialized 8-byte integer. This means that, in the client’s UInt64AddOperator, the *existing_value may have corresponded to the original Put(), or it may have corresponded to a merge operand; it doesn’t really matter! In all cases, as long as the *existing_value and value are given, the UInt64AddOperator behaves in the same way: it adds them together and computes the *new_value. And in turn, this *new_value may be fed into the merge operator later, upon subsequent merge calls.

By contrast, it turns out that RocksDB merge can be used in more powerful ways than this. For example, suppose we wanted our database to store a set of JSON strings (such as PHP arrays or objects). Then, within the database, we would want them to be stored and retrieved as fully formatted JSON strings, but we might want the “update” operation to correspond to updating a property of the JSON object. So we might be able to write code like:

    ...
    // Put/store the JSON string into the database
    db_->Put(put_option_, "json_obj_key",
             "{ employees: [ {first_name: john, last_name: doe}, {first_name: adam, last_name: smith}] }");
    ...
    // Use a pre-defined "merge operator" to incrementally update the value of the JSON string
    db_->Merge(merge_option_, "json_obj_key", "employees[1].first_name = lucy");
    db_->Merge(merge_option_, "json_obj_key", "employees[0].last_name = dow");

In the above pseudo-code, we see that the data would be stored in RocksDB as a JSON string (corresponding to the original Put()), but when the client wants to update the value, a “JavaScript-like” assignment-statement string is passed as the merge-operand. The database would store all of these strings as-is, and would expect the user’s merge operator to be able to handle them.

Now, the AssociativeMergeOperator model cannot handle this, simply because it assumes the associativity constraints mentioned above. That is, in this case, we have to distinguish between the base-values (JSON strings) and the merge-operands (the assignment statements), and we also don’t have an (intuitive) way of combining the merge-operands into a single merge-operand. So this use-case does not fit into our “associative” merge model. That is where the generic MergeOperator interface becomes useful.

The Generic MergeOperator interface

The MergeOperator interface is designed to support generality and also to exploit some of the key ways in which RocksDB operates in order to provide an efficient solution for “incremental updates”. As noted above in the JSON example, it is possible for the base-value types (Put() into the database) to be formatted completely differently than the merge operands that are used to update them. Also, we will see that it is sometimes beneficial to exploit the fact that some merge operands can be combined to form a single merge operand, while some others may not. It all depends on the client’s specific semantics. The MergeOperator interface gives the client a relatively simple way of providing these semantics.

    // The Merge Operator
    //
    // Essentially, a MergeOperator specifies the SEMANTICS of a merge, which only
    // client knows. It could be numeric addition, list append, string
    // concatenation, edit data structure, ... , anything.
    // The library, on the other hand, is concerned with the exercise of this
    // interface, at the right time (during get, iteration, compaction...)
    class MergeOperator {
     public:
      virtual ~MergeOperator() {}

      // Gives the client a way to express the read -> modify -> write semantics
      // key:            (IN) The key that's associated with this merge operation.
      // existing_value: (IN) null indicates that the key does not exist before this op
      // operand_list:   (IN) the sequence of merge operations to apply, front() first.
      // new_value:     (OUT) Client is responsible for filling the merge result here
      // logger:         (IN) Client could use this to log errors during merge.
      //
      // Return true on success. Return false failure / error / corruption.
      virtual bool FullMerge(const Slice& key,
                             const Slice* existing_value,
                             const std::deque<std::string>& operand_list,
                             std::string* new_value,
                             Logger* logger) const = 0;

      struct MergeOperationInput { ... };
      struct MergeOperationOutput { ... };
      virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                               MergeOperationOutput* merge_out) const;

      // This function performs merge(left_op, right_op)
      // when both the operands are themselves merge operation types.
      // Save the result in *new_value and return true. If it is impossible
      // or infeasible to combine the two operations, return false instead.
      virtual bool PartialMerge(const Slice& key,
                                const Slice& left_operand,
                                const Slice& right_operand,
                                std::string* new_value,
                                Logger* logger) const = 0;

      // The name of the MergeOperator. Used to check for MergeOperator
      // mismatches (i.e., a DB created with one MergeOperator is
      // accessed using a different MergeOperator)
      virtual const char* Name() const = 0;

      // Determines whether the MergeOperator can be called with just a single
      // merge operand.
      // Override and return true for allowing a single operand. FullMergeV2 and
      // PartialMerge/PartialMergeMulti should be implemented accordingly to handle
      // a single operand.
      virtual bool AllowSingleOperand() const { return false; }
    };

Some Notes:

  • MergeOperator has two main methods, FullMerge() and PartialMerge(). The first is used when the *existing_value is a Put/Delete base-value (or nullptr). The second is used to combine two merge operands (if possible).
  • AssociativeMergeOperator simply inherits from MergeOperator and provides private default implementations of these methods, while exposing a wrapper function for simplicity.
  • In MergeOperator, the “FullMerge()” function takes in an *existing_value and a sequence (std::deque) of merge operands, rather than a single operand. We explain below.

On a high level, it should be noted that any call to DB::Put() or DB::Merge() does not necessarily force the value to be computed or the merge to occur immediately. RocksDB will more-or-less lazily decide when to actually apply the operations (e.g.: the next time the user calls Get(), or when the system decides to do its clean-up process called “Compaction”). This means that, when the MergeOperator is actually invoked, it may have several “stacked” operands that need to be applied. Hence, the MergeOperator::FullMerge() function is given an *existing_value and a list of operands that have been stacked. The MergeOperator should then apply the operands one-by-one (or in whatever optimized way the client decides so that the final *new_value is computed as if the operands were applied one-by-one).

Sometimes, it may be useful to start combining the merge-operands as soon as the system encounters them, instead of stacking them. The MergeOperator::PartialMerge() function is supplied for this case. If the client-specified operator can logically handle “combining” two merge-operands into a single operand, the semantics for doing so should be provided in this method, which should then return true. If it is not logically possible, then it should simply leave *new_value unchanged and return false.

Conceptually, when the library decides to begin its stacking and applying process, it first tries to apply the client-specified PartialMerge() on each pair of operands it encounters. Whenever this returns false, it will instead resort to stacking, until it finds a Put/Delete base-value, in which case it will call the FullMerge() function, passing the operands as a list parameter. Generally speaking, this final FullMerge() call should return true. It should really only return false if there is some form of corruption or bad-data.

As alluded to above, AssociativeMergeOperator inherits from MergeOperator and allows the client to specify a single merge function. It overrides PartialMerge() and FullMerge() to use this AssociativeMergeOperator::Merge(). It is then used for combining operands, and also when a base-value is encountered. That is why it only works under the “associativity” assumptions described above (and it also explains the name).

Using our generic MergeOperator interface, we now have the ability to implement the JSON example.
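
A full JSON operator needs a JSON library, so the sketch below uses flat records as a smaller stand-in for the same idea: the base value is a record such as "first=john;last=doe", while each merge operand is a single assignment such as "first=lucy". The two free functions mirror the FullMerge() and PartialMerge() signatures shown above, minus the key and logger arguments and with std::string in place of Slice. The record format and all names are assumptions of this example.

```cpp
#include <deque>
#include <map>
#include <string>

// Splits "field=value" at the first '='; returns false if there is none.
bool ParseAssignment(const std::string& s, std::string* field, std::string* value) {
  std::string::size_type eq = s.find('=');
  if (eq == std::string::npos) return false;
  *field = s.substr(0, eq);
  *value = s.substr(eq + 1);
  return true;
}

// Applies the operands to the base record, front() first. Returns false
// (corruption) if the base value or an operand cannot be parsed.
bool RecordFullMerge(const std::string* existing_value,
                     const std::deque<std::string>& operand_list,
                     std::string* new_value) {
  std::map<std::string, std::string> fields;
  std::string field, value;
  if (existing_value) {
    std::string::size_type start = 0;
    while (start < existing_value->size()) {
      std::string::size_type end = existing_value->find(';', start);
      if (end == std::string::npos) end = existing_value->size();
      if (!ParseAssignment(existing_value->substr(start, end - start), &field, &value)) {
        return false;
      }
      fields[field] = value;
      start = end + 1;
    }
  }
  for (const std::string& op : operand_list) {
    if (!ParseAssignment(op, &field, &value)) return false;
    fields[field] = value;
  }
  new_value->clear();
  for (const auto& kv : fields) {  // fields are emitted in sorted order
    if (!new_value->empty()) new_value->push_back(';');
    *new_value += kv.first + "=" + kv.second;
  }
  return true;
}

// Two operands can be combined only when they assign the same field, in
// which case the later one wins. Otherwise *new_value is left untouched.
bool RecordPartialMerge(const std::string& left_operand,
                        const std::string& right_operand,
                        std::string* new_value) {
  std::string lf, lv, rf, rv;
  if (!ParseAssignment(left_operand, &lf, &lv) ||
      !ParseAssignment(right_operand, &rf, &rv) || lf != rf) {
    return false;
  }
  *new_value = right_operand;
  return true;
}
```

Note that the partial merge declines to combine assignments to different fields, because a single operand in this format cannot carry two assignments; those operands stay stacked until the full merge runs.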

Error Handling

If the MergeOperator::PartialMerge() returns false, this is a signal to RocksDB that the merging should be deferred (stacked) until we find a Put/Delete value to FullMerge() with. However, if FullMerge() returns false, then this is treated as “corruption” or error. This means that RocksDB will usually reply to the client with a Status::Corruption message or something similar. Hence, the MergeOperator::FullMerge() method should only return false if there is absolutely no robust way of handling the error within the client logic itself. (See the JsonMergeOperator example)

For AssociativeMergeOperator, the Merge() method follows the same “error” rules as MergeOperator::FullMerge() in terms of error-handling. Return false only if there is no logical way of dealing with the values. In the Counters example above, our Merge() always returns true, since we can interpret any bad value as 0.

Get Merge Operands

This API fetches all merge operands associated with a key. The main motivation is to support performance-sensitive use cases where a full online merge is not necessary. This API is available from version 6.4.0.

Example use-cases:

  1. Storing a KV pair where V is a collection of sorted integers, new values may get appended to the collection, and users subsequently want to search for a value in the collection.
    Example KV:
    Key: "Some-Key" Value: [2], [3,4,5], [21,100], [1,6,8,9]
    To store such a KV pair, users would typically call the Merge API as:
    a. db->Merge(WriteOptions(), "Some-Key", "2");
    b. db->Merge(WriteOptions(), "Some-Key", "3,4,5");
    c. db->Merge(WriteOptions(), "Some-Key", "21,100");
    d. db->Merge(WriteOptions(), "Some-Key", "1,6,8,9");
    and implement a merge operator that simply converts the value to [2,3,4,5,21,100,1,6,8,9] upon a Get API call, and then search in the resulting value. In such a case, doing the merge online is unnecessary: simply returning all the operands [2], [3,4,5], [21,100] and [1,6,8,9] and then searching through the sub-lists is faster, saves CPU, and achieves the same outcome.

  2. Updating a subset of columns and reading a subset of columns - Imagine a SQL table in which each row is encoded as a KV pair. If there are many columns and users update only one of them, a merge operator can reduce write amplification. When a read query touches only one or two columns, this feature avoids a full merge of the whole row and saves some CPU.

  3. Updating very few attributes in a value which is a JSON-like document - Updating one attribute can be done efficiently using merge operator, while reading back one attribute can be done more efficiently if we don’t need to do a full merge.
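
For the first use case, the search over unmerged operands could be sketched as follows. The operands are assumed to be comma-separated decimal text, as in the Merge calls above; `SubListContains` and `AnyOperandContains` are hypothetical helper names, and in a real client the operands would come from GetMergeOperands(). std::stol throws on non-numeric input, which this sketch does not guard against.

```cpp
#include <string>
#include <vector>

// Returns true if the comma-separated list (e.g. "3,4,5") contains target.
bool SubListContains(const std::string& operand, long target) {
  std::string::size_type start = 0;
  while (start <= operand.size()) {
    std::string::size_type end = operand.find(',', start);
    if (end == std::string::npos) end = operand.size();
    if (end > start && std::stol(operand.substr(start, end - start)) == target) {
      return true;
    }
    start = end + 1;
  }
  return false;
}

// Searches each operand in turn without building the concatenated list.
bool AnyOperandContains(const std::vector<std::string>& operands, long target) {
  for (const std::string& operand : operands) {
    if (SubListContains(operand, target)) return true;
  }
  return false;
}
```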

API:

    // Returns all the merge operands corresponding to the key. If the
    // number of merge operands in DB is greater than
    // merge_operands_options.expected_max_number_of_operands
    // no merge operands are returned and status is Incomplete. Merge operands
    // returned are in the order of insertion.
    // merge_operands- Points to an array of at-least
    //             merge_operands_options.expected_max_number_of_operands and the
    //             caller is responsible for allocating it. If the status
    //             returned is Incomplete then number_of_operands will contain
    //             the total number of merge operands found in DB for key.
    virtual Status GetMergeOperands(
        const ReadOptions& options, ColumnFamilyHandle* column_family,
        const Slice& key, PinnableSlice* merge_operands,
        GetMergeOperandsOptions* get_merge_operands_options,
        int* number_of_operands) = 0;

Example:

    int size = 100;
    int number_of_operands = 0;
    std::vector<PinnableSlice> values(size);
    GetMergeOperandsOptions merge_operands_info;
    merge_operands_info.expected_max_number_of_operands = size;
    db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), &merge_operands_info,
                          &number_of_operands);

The above API returns all the merge operands corresponding to the key. If the number of merge operands in DB is greater than merge_operands_options.expected_max_number_of_operands, no merge operands are returned and status is Incomplete. Merge operands returned are in the order of insertion.

DB Bench has a benchmark that uses Example 1 to demonstrate the performance difference between doing an online merge and then operating on the collection versus simply returning the sub-lists and operating on them. To run the benchmark, the command is:

    ./db_bench -benchmarks=getmergeoperands --merge_operator=sortlist

The merge_operator used above sorts the data across all the sub-lists in the online merge case, which happens automatically when the Get API is called.

Review and Best Practices

Altogether, we have described the Merge Operator and how to use it. Here are a couple of tips on when and how to use MergeOperator and AssociativeMergeOperator, depending on the use-case.

When to use merge

If the following are true:

  • You have data that needs to be incrementally updated.
  • You would usually need to read the data before knowing what the new value would be.

Then use one of the two Merge operators as specified in this wiki.

Associative Data

If the following are true:

  • Your merge operands are formatted the same as your Put values, AND
  • It is okay to combine multiple operands into one (as long as they are in the same order)

Then use AssociativeMergeOperator.

If either of the two associativity constraints does not hold, then use MergeOperator.

If it is sometimes (but not always) okay to combine multiple operands into one:

  • Use MergeOperator
  • Have the PartialMerge() function return true in cases where the operands can be combined.

Tips

Multiplexing: While a RocksDB DB object can only be given one merge operator at the time of construction, your user-defined merge operator class can behave differently depending on the data passed to it. The key, as well as the values themselves, will be passed to the merge operator, so one can encode different “operations” in the operands themselves and get the MergeOperator to perform different functions accordingly.
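
As an illustration of encoding the operation in the operand, the sketch below uses a one-character tag in front of a decimal number: '+' adds and 'M' takes the maximum. The tag scheme and the function name are assumptions made for this example, and values are kept as decimal text for readability.

```cpp
#include <algorithm>
#include <string>

// Operand format (assumed): one tag character followed by a decimal number.
//   "+5"  add 5 to the existing value
//   "M9"  replace the existing value with max(existing, 9)
// A missing existing value counts as 0. Returns false for an unknown tag
// or an operand that is too short to carry a number.
bool TaggedMerge(const std::string* existing_value, const std::string& operand,
                 std::string* new_value) {
  if (operand.size() < 2) return false;
  long long base = existing_value ? std::stoll(*existing_value) : 0;
  long long arg = std::stoll(operand.substr(1));
  switch (operand[0]) {
    case '+':
      *new_value = std::to_string(base + arg);
      return true;
    case 'M':
      *new_value = std::to_string(std::max(base, arg));
      return true;
    default:
      return false;
  }
}
```

Operands with mixed tags cannot in general be pre-combined into a single operand, so a scheme like this fits the generic MergeOperator better than AssociativeMergeOperator.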

Useful Links