Building a Distributed Fault-Tolerant Key-Value Store


I’ve been splitting my time lately between the new Spheres project and the Coursera Cloud Computing specialization, in order to sharpen my distributed systems skills. My personal experience has been great, and I have learned tons of new stuff. The first two courses proposed building a Membership Protocol and a Distributed Fault-Tolerant Key-Value Store respectively. The store of the second assignment relies on your own membership protocol implementation (so it pays to build a solid implementation!).

These assignments were optional, but I think there is no better way to truly learn the concepts than to implement the code yourself. In this post I will describe the architecture of the minimalistic NoSQL database that I built while studying these courses.

The Key-Value abstraction

First of all, let’s discuss briefly what a Key-Value store is, and how it compares to a relational database.

Key-Value Stores offer a simple abstraction over your data, working like a dictionary data structure. Such a database provides a mechanism for storage and retrieval of data that is modeled and manipulated by means of basic CRUD operations (create, read, update, delete). The API of these databases is usually kept simple, and even when they provide an SQL-like language, like Cassandra’s Query Language, it is intentionally kept much simpler than full-blown SQL.

SQL/NoSQL

This simpler functionality means that Key-Value Stores, and NoSQL databases in general, often give more responsibility to the user, who now needs to do by hand a lot of the work that a relational database takes care of automatically. They sacrifice the expressivity of a language like SQL and the integrity checks of schema-based models. This in turn means that NoSQL systems are free to choose other trade-offs, resulting in higher availability, performance, scalability or other specific qualities.

One important thing regarding RDBMS and NoSQL systems is their respective theoretical models, which establish the guarantees each system provides to the end user. They are known as the ACID and BASE models, respectively, in one of those fancy metaphors made out of acronyms.

So, why are NoSQL systems popular nowadays? Well, not without some controversy, but the main selling points could be summarized as:

  • Speed
  • Single Point of Failure (SPOF) avoidance
  • Better support for Large amounts of unstructured data
  • Lower TCO (total cost of ownership, e.g. fewer sysadmins needed)
  • Incremental scalability

These are the sort of qualities that define Google, Facebook and the other big players and their business models, so NoSQL makes perfect sense for them. Whether it makes sense in your particular situation (probably not) is the core of that controversy, and it’s not really the intention of this post to dig into that.

Inside Cassandra

We are going to base the architecture on Cassandra’s implementation, since the intention is to build a very similar, but simplified, system. Cassandra uses a Distributed Hash-Table-like ring, which looks as follows:

Cassandra Ring

This ring serves as the structure for the key -> server node mapping: keys are mapped onto the ring, associating each key with a specific server node. The specific way in which keys are distributed (via hashing, ranges or more complex methods) is chosen through the Partitioner. This mapping includes the primary, secondary and tertiary replicas. Data placement strategies add a level of control that we are not interested in for this basic implementation; the more complex ones use the underlying network topology to choose the replicas.

Additionally, the ring of nodes will have one coordinator per data-center, elected among the servers using a Paxos-like consensus protocol (as provided by ZooKeeper, for instance).

How Writes work

In a nutshell, when the client sends a write (create, update, delete) to the coordinator node in the Cassandra cluster, the coordinator uses the partitioner to resolve the location of all replicas and queries those nodes. When a quorum (i.e. a majority) of the replicas respond, the coordinator returns an acknowledgment of the operation.

Writes in Cassandra are lock-free and fast. The database is always writable thanks to the Hinted-Handoff mechanism: if a replica is down, the coordinator keeps a local copy of the write (a hint) for each failed replica, and hands it off once the replica is back up.
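
A hint store could be sketched as follows; this is just an illustration of the idea, not Cassandra’s actual implementation, and the names and replay trigger are assumptions:

map<Address, vector<string>> hints;    // failed replica -> buffered write messages
                                       // (assumes Address defines operator<)

void storeHint(Address &failedReplica, string writeMsg) {
  // Called when a write can't reach one of the replicas
  hints[failedReplica].push_back (writeMsg);
}

void onReplicaRecovered(Address &replica) {
  // Called when failure detection reports the replica is back up
  for (string &msg : hints[replica])
    send (&address, &replica, msg);    // replay the buffered writes
  hints.erase (replica);
}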

The process of committing a write to the database involves several steps. First, the node saves the write to persistent storage (i.e. disk) in a commit log, which makes recovery after failure possible. Second, it writes to an in-memory memtable. When the memtable is full or old, it is flushed to disk as an SSTable (sorted string table) of key-values, together with an SSTable index file (for efficiently locating keys) and a Bloom Filter (for efficiently checking key existence, since most of the time a key is not found prior to writing). Any read from the local store follows the opposite path.
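
As a rough sketch of this write path (commitLog, flushToSSTable and MEMTABLE_FLUSH_THRESHOLD are assumed names, since this only illustrates the idea):

void localWrite(string key, string value) {
  commitLog.append (key, value);       // 1. durable log on disk, enables crash recovery
  memtable[key] = value;               // 2. fast in-memory write (a sorted map)
  if (memtable.size() >= MEMTABLE_FLUSH_THRESHOLD) {
    flushToSSTable (memtable);         // 3. dump as SSTable + index file + Bloom Filter
    memtable.clear();
  }
}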

How Reads work

The client will send its query to the per-request coordinator, which will in turn contact a quorum of replicas. When they respond, it uses the latest-timestamped value. The coordinator will also fetch the value from the remaining replicas and initiate a read repair if any two values differ. A row may be split across multiple SSTables, which makes reads slower than writes.

Cassandra allows for consistency level configuration, with the ability to specify it for reads and writes separately. Consistency levels are alternatives to the plain quorum approach for deciding when an operation is marked as successful. The options are: ANY (fastest), ALL (slowest), ONE (at least one replica), QUORUM (>50%).
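
In code, the number of acknowledgements the coordinator waits for could be derived from the chosen level roughly like this (a sketch with names of my own, not Cassandra’s API):

enum ConsistencyLevel { ANY, ONE, QUORUM, ALL };

// Replica acknowledgements to collect before declaring the operation successful
int requiredAcks(ConsistencyLevel level, int replicationFactor) {
  switch (level) {
  case ANY:    return 1;                          // fastest: even a hinted write counts
  case ONE:    return 1;                          // at least one actual replica
  case QUORUM: return replicationFactor / 2 + 1;  // strict majority (>50%)
  case ALL:    return replicationFactor;          // slowest: every replica
  }
  return replicationFactor;
}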

Architecture of a minimal distributed Fault-Tolerant Key-Value Store

Based on the architecture of Cassandra, our minimalistic Key-Value database will follow very similar principles. I’ll describe next the architecture of the implementation and the process involved in its development.

Development layers

The first part we need to tackle is how to set up an environment that allows us to develop everything, from the protocols to the specific algorithms involved, without requiring re-deployment for each run and test. Otherwise, we would need to recompile and deploy on at least three machines every time we change the code, and even then we wouldn’t be able to test a larger number of nodes easily.

For this, we will simulate both the network and a client application. Even though this simulation will not capture all the possible situations of a production environment, we can use it to force node failures, message dropping, congestion and other real-life effects and situations.

The KV Store will then sit between these two layers, which I’ll describe next:

Testing Architecture

Emulated network layer

The first layer I’ll be concerned with is the network emulation. It will need to provide the following API:

void *init(Address *myaddr, short port);
int send(Address *source_addr, Address *dest_addr, string data);
int recv(Address *myaddr, ...);
int cleanup();

This makes the network implementable in terms of either a real networking API or the emulation alone. In a nutshell, the simulation works roughly as follows:

  • Nodes are initialized. The function init is called once by each node to initialize its own address.
  • Any unicast message is sent from one node to another via the send function, which enqueues the message in the simulation message buffer.
  • The nodes will actively poll for new messages using recv, which will decide internally which messages are delivered, delayed or dropped altogether. This function should be externally parameterized so it can be tuned for testing different scenarios comfortably.

This emulation acts as a replacement for real networking, so it can be effortlessly activated for development and deactivated for production.
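
Under these assumptions, the core of the emulation can be as simple as a shared message queue. A sketch (the recv variant and the drop logic are illustrative choices, not the exact API above):

struct EmulMessage { Address from, to; string data; };

deque<EmulMessage> messageBuffer;     // the shared in-memory "network"
double dropProbability = 0.0;         // tunable per test scenario

int send(Address *source_addr, Address *dest_addr, string data) {
  messageBuffer.push_back ({*source_addr, *dest_addr, data});
  return 0;
}

// Deliver the next message addressed to myaddr, possibly dropping it to
// emulate lossy links (assumes Address defines operator==)
bool recvInto(Address *myaddr, string *out) {
  for (auto it = messageBuffer.begin(); it != messageBuffer.end(); ++it) {
    if (it->to == *myaddr) {
      bool dropped = (rand() / (double)RAND_MAX) < dropProbability;
      if (!dropped)
        *out = it->data;
      messageBuffer.erase (it);
      return !dropped;
    }
  }
  return false;
}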

Application driver

The application component emulates the actual usage of the database in a straightforward fashion. It will handle the following responsibilities:

  • Emulate node introduction and life-cycle.
  • Drive the network emulation layer, and manage network effects (message dropping, congestion, etc).
  • Emulate node failure and hogging.
  • Test CRUD operation requests in the Store.

This layer drives the simulation. The nodes can be run as threads or in a synchronous fashion using a single thread. During each simulation step, some peers may be started up, and some may be caused to crash-stop. Also at each step of the simulation, the corresponding network functions are used to send/receive all messages in each node.

Multithreaded emulation may have the advantage of emulating the interaction between multiple nodes more realistically. With synchronous single-threaded emulation, however, you control the scheduling of each node’s processing yourself, as in the sketch below.
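
A synchronous single-threaded driver loop might look roughly like this (a sketch; the Node interface here is an assumption for illustration):

struct Node {
  bool alive = true;
  void drainIncomingMessages() { /* call recv() until the queue is empty */ }
  void nodeLoop()              { /* membership + KV store periodic tasks */ }
};

void runSimulation(vector<Node> &nodes, int totalSteps) {
  for (int step = 0; step < totalSteps; step++) {
    // Emulate node introduction and crash-stop failures here, e.g. by
    // appending to nodes or flipping alive flags according to the scenario
    for (Node &node : nodes) {
      if (!node.alive)
        continue;
      node.drainIncomingMessages();
      node.nodeLoop();
    }
  }
}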

In any case, the application layer should be built both to test different success/failure scenarios and to be removable when compiling the production code.

Some examples of the tests that need to be performed or allowed by the Application layer are:

  • Successful creation and replication.
  • Failed creation in all replicas.
  • Successful deletion in all replicas.
  • Failed deletion of a non-existent key.
  • Read a key, test for quorum reads.
  • Failed read of a non-existent key.
  • Read a key with one and two replicas down. If quorum is 2, first should succeed and second should fail.
  • Same test, but allow for stabilization protocol to kick in (more on this later).
  • Same tests for update.
  • Introduce message dropping in all tests to check operation timeouts.

Node layers

Like a Cassandra deployment, this Key-Value Store is comprised of multiple nodes. All nodes implement the same functionality and are able to take on both the request-coordinator and the data-center-coordinator responsibilities. In the minimalistic implementation I’m describing we will deal just with the request coordinator. Each node, then, is built on the following architecture:

Store Architecture

Membership protocol

Since any node can act as a request coordinator, it is necessary to keep all nodes informed at all times about the other nodes in the ring. This problem is addressed by the so-called membership protocol. At each node, the key-value store talks to the membership protocol and receives from it the membership list, which is used to keep its view of the ring updated. Periodically, the node polls the membership protocol to bring its membership list up to date. This is the most fundamental piece, upon which the other layers can be built: it allows every node to know about the existence of all the others.

Generally speaking, a membership protocol has two components: dissemination and failure detection. Depending on the protocol, they are accomplished simultaneously or by separate logical components. A straightforward solution would be an all-to-all multicast, although that is quite inefficient. A more common approach is a Gossip-style membership protocol. However, one of the most efficient and interesting protocols is SWIM (Scalable Weakly-consistent Infection-style Process Group Membership), described in detail in this paper. We will implement the latter.

The membership protocol mandates that each new peer contact a well-known peer (the introducer) to join the group. This is implemented through JOINREQ and JOINREP messages. The introducer is the first to spread the word about the newly joined member; the rest of the nodes then contribute to the dissemination.

The requirements for a membership protocol are that it satisfy completeness at all times (for both joins and failures) and accuracy when there are no message delays or losses (with high accuracy when there are). Here is a stripped-down version of the implementation of this protocol’s message handling:

bool processMessage(void *data, int size) {
  char *msg = (char*)data;
  MsgTypes msgType = messageGetType(msg);
  Address sourceAddress = messageGetAddress(msg);

  switch (msgType)
    {
    case JOINREQ:
      {
        // Add to membership list as a response to the request. We are the introducer.
        int id = messageGetId(msg);
        short port = messageGetPort(msg);
        long heartbeat = messageGetHeartbeat(msg); // assumed accessor, following the messageGet* naming
        if( addToMemberList( id, port, heartbeat, 0 ) )
          addToJoinsList( id, port, heartbeat, 0 );

        // Reply with JOINREP
        auto newMsg = createMessage();
        messageSetType (newMsg, JOINREP);
        messageSetAddress (newMsg, memberNode->addr);
        for ( unsigned int i = 0; i < JOINREP_MEMBER_LIST_SIZE && i < memberList.size(); i++ ) {
          MemberListEntry &entry = memberList[i];
          messageSetEntry (newMsg, i, entry.id, entry.port, 0, entry.heartbeat, entry.timestamp);
        }

        send(&address, &sourceAddress, newMsg, JOINREP_MESSAGE_SIZE);
        break;
      }
    case JOINREP:
      {
        // This message is sent by the introducer to a newly joined node that requested entry.
        // We are the newly introduced node: add the received entries to our membership list.
        for ( int i=0; i < JOINREP_MEMBER_LIST_SIZE; i++ ) {
          MessageEntry mentry = messageGetEntry(msg, i);
          if (addToMemberList ( mentry.id, mentry.port, mentry.heartbeat, mentry.timestamp ))
            addToJoinsList ( mentry.id, mentry.port, mentry.heartbeat, mentry.timestamp );
        }
        }
        break;
      }
    case PING:
      {
        // Reply to a ping message with an ACK. Craft the message to include membership
        // information (infection-like dissemination component)
        sendAckResponse(msg);
        // Read and apply the membership data payload
        processMessagePayload(msg);
        break;
      }
    case PING_REQ:
      {
        // Ping a node on behalf of the sender of this message (infect the message too)
        sendPingRequest(msg);
        // Read and apply the membership data payload
        processMessagePayload(msg);
        break;
      }
    case ACK:
      {
        // Remove entry from pinged (a list with pending pinged nodes)
        int id = addressToIdPort(sourceAddress).first;
        auto pi = pinged.find ( {id, 0, 0, 0, 0} );
        if ( pi != pinged.end() )
          pinged.erase (pi);

        // If the element is SUSPICIOUS, disseminate an ALIVE message
        auto fi = failed.find ( FailedEntry(MessageEntry(id, 0, 0, 0, 0), SUSPECT) );
        if ( fi != failed.end() ) {
          auto copy = *fi;
          copy.roundsLeft = SWIM_DISSEMINATION_ROUNDS;
          copy.entry.type = ALIVE;
          failed.erase (fi);
          failed.insert (copy);
        }

        // Read and apply the membership data payload
        processMessagePayload(msg);
        break;
      }
    }
  return true;
}

Virtual Ring Topology

Once each node is aware of the rest of the nodes at all times (even though we can’t rely on 100% accuracy from any membership protocol), the next layer can be built. Relying on the membership list, each node can derive a ring topology by simply hashing the nodes into a power-of-two range of ids. For instance, with k=8 the ring will have 256 (2^8) slots, and each new node will fit into one of them. This is particularly useful for two things:

  • Load-balancing: as mentioned earlier, any node can act as a request coordinator on behalf of the client, sending the query to all key replicas and handling the quorum requirement.
  • Key/node mapping: by performing the hash calculation, the ring topology allows for easily mapping each key to all its replicas (see the sketch after the hashing code below).

This is example code for the hashing, modulo the ring size:

size_t hashFunction(unsigned int ringSize, string key) {
  std::hash<string> hashFunc;
  size_t hash = hashFunc(key);
  return hash % ringSize;
}
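
Building on this hash, the key/node mapping walks the ring and picks the first node at or after the key’s position, plus its two successors. The following is a free-standing sketch of the getNodes helper used in the snippets below (there it is a member function that takes the ring size and sorted node list from the node’s own state; the pos field on Node is an assumption):

vector<Node> getNodes(unsigned int ringSize, vector<Node> &ring, string key) {
  // ring: the membership-derived node list, sorted by hashed ring position
  size_t pos = hashFunction(ringSize, key);
  size_t i = 0;
  while (i < ring.size() && ring[i].pos < pos)
    i++;                                  // first node at or after the key's slot
  vector<Node> replicas;
  for (size_t r = 0; r < 3 && r < ring.size(); r++)
    replicas.push_back(ring[(i + r) % ring.size()]);  // wrap around the ring
  return replicas;
}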

CRUD operations

Support for Create, Read, Update and Delete operations requires at least three components: storage, replication/consistency and stabilization.

Each node in the cluster needs to support both client and server CRUD operations:

  • Client CRUD: the functionality that handles client requests and delegates the operation to all replicas. It takes care of the quorum and then responds to the client with the result. A node needs this functionality when acting as a request coordinator.
  • Server CRUD: engaged at each of the key replicas when a coordinator forwards them the CRUD request issued by the client. This component implements the actual storage, which we’ll see next.

Storage

The storage component deals mainly with the series of steps taken when a write is sent to a node, leading to a change in its local storage. The main concern here, aside from performance, is failure recovery. A system with per-node recovery handles commits in a series of steps to ensure the data is stored on disk, preventing loss in case of node failure even if the data hasn’t yet been appended to the final data structure in persistent storage. Systems that use this technique may also rely on an in-memory structure as an intermediate step before committing to the database.

However, for this basic K/V Store we won’t focus on how to structure the data in each node, but rather on the replication/consistency and stabilization protocols. As a matter of fact, a stabilization protocol deals with node failure in a distributed fashion, repairing older values in the background. This doesn’t entirely remove the risk of data loss: if a quorum of a key’s replicas fail simultaneously before committing a write to disk, the write may be lost forever.

Replication/Consistency

The type of consistency we are trying to achieve is called eventual consistency: if all writes stop, all replicas of each key will eventually hold the same value. This convergence is achieved by the background read repairs performed when two read values do not match, and by the stabilization protocol that I’ll describe next. However, dealing with each individual request under these circumstances is the main task of the Client CRUD implementation. This, as mentioned before, is performed at the request coordinator (randomly selected per request).

Consistency level is a matter of choice. While there are several types (see Cassandra’s consistency levels), the quorum consistency level offers a reasonable trade-off: it provides eventual consistency without sacrificing too much performance. It is achieved by waiting for replies from more than 50% of the replicas, at which point the request coordinator marks the operation as successful (>50% successes received) or failed (>50% failures received).
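
At the request coordinator, the per-transaction quorum accounting could be sketched as follows. It builds on the transactionData map that appears in the snippets below; successCount, failCount and replyToClient are illustrative assumptions:

void onReply(int transID, bool success) {
  auto it = transactionData.find (transID);
  if (it == transactionData.end())
    return;                                      // already resolved or timed out
  auto &tr = it->second;

  int quorum = (int)tr.replicas.size() / 2 + 1;  // strict majority, e.g. 2 of 3
  if (success) tr.successCount++; else tr.failCount++;

  if (tr.successCount >= quorum) {
    replyToClient (tr, true);                    // quorum of successes reached
    transactionData.erase (it);
  } else if (tr.failCount >= quorum) {
    replyToClient (tr, false);                   // quorum of failures reached
    transactionData.erase (it);
  }
}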

Writes are managed by the per-key coordinator, which ensures that writes for the key are serialized (performed in order). The coordinator uses the key/node mapping to send the query to all replica nodes responsible for the key. When a quorum of replicas responds, the coordinator returns an acknowledgement to the client. A cluster with this mechanism is always writable, like Cassandra with its previously described Hinted-Handoff mechanism.

This is an example implementation of the write operation handled by the coordinator. Note how the coordinator needs to handle the special case where it is itself a replica of the requested key.

void clientCreate(string key, string value) {
  auto replicas = getNodes (key);

  auto trIt = transactionData.emplace
    (g_transID, TransactionData(g_transID, CREATE, currentTime(), replicas, key, value));
  auto &transaction = trIt.first->second;

  auto inReplicas = find_if(replicas.begin(), replicas.end(),
                            [&](Node &n) { return n.address == address; });

  if (inReplicas == replicas.end()) {
    auto msg1 = KVMessage (g_transID, address, CREATE, key, value, PRIMARY).toString();
    send (&address, &replicas[0].address, msg1);
    auto msg2 = KVMessage (g_transID, address, CREATE, key, value, SECONDARY).toString();
    send (&address, &replicas[1].address, msg2);
    auto msg3 = KVMessage (g_transID, address, CREATE, key, value, TERTIARY).toString();
    send (&address, &replicas[2].address, msg3);
  } else {
    bool success = false;
    if (inReplicas->address == replicas[0].address) {
      if (createKeyValue(key, value, PRIMARY) )
        success = true;
    } else {
      auto msg1 = KVMessage (g_transID, address, CREATE, key, value, PRIMARY).toString();
      send (&address, &replicas[0].address, msg1);
    }
    if (inReplicas->address == replicas[1].address) {
      if (createKeyValue(key, value, SECONDARY))
        success = true;
    } else {
      auto msg1 = KVMessage (g_transID, address, CREATE, key, value, SECONDARY).toString();
      send (&address, &replicas[1].address, msg1);
    }
    if (inReplicas->address == replicas[2].address) {
      if (createKeyValue(key, value, TERTIARY))
        success = true;
    } else {
      auto msg1 = KVMessage (g_transID, address, CREATE, key, value, TERTIARY).toString();
      send (&address, &replicas[2].address, msg1);
    }
    // The local result (success) should be fed into the transaction's quorum
    // accounting, just like a remote replica's reply (omitted here)
  }
}

Reads are similar to writes: the coordinator returns the latest-timestamped value from among the quorum of replicas that responded. If any two replicas hold different values, it updates the stale ones. This is the kind of code the coordinator runs (without the read repairs):

void clientRead(string key){
  auto replicas = getNodes (key);

  auto trIt = transactionData.emplace
    (g_transID, TransactionData(g_transID, READ, currentTime(), replicas, key, ""));
  auto &transaction = trIt.first->second;

  auto inReplicas = find_if(replicas.begin(), replicas.end(),
                            [&](Node &n) { return n.address == address; });

  if (inReplicas == replicas.end()) {
    auto msg = KVMessage (g_transID, address, READ, key).toString();
    send (&address, &replicas[0].address, msg);
    send (&address, &replicas[1].address, msg);
    send (&address, &replicas[2].address, msg);
  } else {
    auto msg = KVMessage (g_transID, address, READ, key).toString();
    if (inReplicas->address == replicas[0].address) {
      auto result = readKey(key);  // local read; should count towards the quorum
    } else {
      send (&address, &replicas[0].address, msg);
    }
    if (inReplicas->address == replicas[1].address) {
      auto result = readKey(key);
    } else {
      send (&address, &replicas[1].address, msg);
    }
    if (inReplicas->address == replicas[2].address) {
      auto result = readKey(key);
    } else {
      send (&address, &replicas[2].address, msg);
    }
  }
}
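
On the coordinator side, read replies can then be resolved by keeping the freshest value seen so far. Again a sketch: the timestamp bookkeeping fields and replyToClient are assumptions on top of the TransactionData used above:

void onReadReply(int transID, string value, long timestamp) {
  auto it = transactionData.find (transID);
  if (it == transactionData.end())
    return;                                      // already resolved or timed out
  auto &tr = it->second;

  tr.successCount++;
  if (timestamp >= tr.latestTimestamp) {         // keep the latest-timestamped value
    tr.latestTimestamp = timestamp;
    tr.latestValue = value;
  }

  int quorum = (int)tr.replicas.size() / 2 + 1;
  if (tr.successCount >= quorum) {
    replyToClient (tr, tr.latestValue);          // answer the client
    transactionData.erase (it);
    // A full implementation would also schedule read repair here for any
    // replica that returned a stale value
  }
}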

While these functions implement the client Create/Read, the server end of each request is handled by the Server CRUD module. This module can contain a fair amount of functionality to handle failure recovery and performance, but that gets a little outside the scope of distributed systems.

In this example code, the persistent storage is a simple hash table, dumped to disk periodically.

bool createKeyValue(string key, string value, ReplicaType replica) {
  database->emplace (key, KVEntry(value, 0, replica));  // 0 is the initial version timestamp
  return true;
}

ReadResult readKey(string key) {
  auto it = database->find (key);
  if (it != database->end()) {
    return ReadResult(true, it->second.value);
  } else {
    return ReadResult(false, "");
  }
}

Stabilization

The last piece of the puzzle is the stabilization algorithm. This is required even in this minimal implementation in order to handle eventual node failures. When a node fails, the key->node mapping breaks for some keys: with n-1 nodes left for key distribution, many keys will suddenly have a different set of nodes assigned. In this situation, stabilization comes to the rescue, re-replicating the keys whose replica(s) failed.

One possible implementation iterates over all keys in each node, first detecting whether the key still belongs to the node (i.e. the node is still a replica of the key). If it does, the key is handled according to the node’s role (primary, secondary or tertiary). If it doesn’t, the key is deleted from local storage after being sent to all the proper replicas. This implementation may send up to three messages per key, so it could definitely be improved!

// Note: the *Fail flags below (e.g. firstSuccessorFail) are assumed to be
// computed beforehand by diffing the ring against the latest membership list
for (auto e = database->begin(); e != database->end();) {
  auto replicas = getNodes (e->first);
  auto &key = e->first;
  auto value = e->second.value;
  auto &replType = e->second.replica;
  auto inReplicas = find_if(replicas.begin(), replicas.end(), [&](Node &n){return n.address == address;});
  if (inReplicas != replicas.end()) {
    // The key belongs to this node
    switch (inReplicas - replicas.begin()) {
    case 0:
      {
        // Keep key as primary
        replType = PRIMARY;
        if (firstSuccessorFail) {
          // Send to new first successor
          auto msg2 = KVMessage (g_transID, address, CREATE, key, value, SECONDARY).toString();
          send (&address, &replicas[1].address, msg2);
        }
        if (secondSuccessorFail) {
          // Send to new second successor
          auto msg3 = KVMessage (g_transID, address, CREATE, key, value, TERTIARY).toString();
          send (&address, &replicas[2].address, msg3);
        }
        break;
      }
    case 1:
      {
        // Keep key as secondary
        replType = SECONDARY;
        if (firstPredecessorFail) {
          // Send to new first predecessor
          auto msg1 = KVMessage (g_transID, address, CREATE, key, value, PRIMARY).toString();
          send (&address, &replicas[0].address, msg1);
        }
        if (firstSuccessorFail) {
          // Send to new first successor
          auto msg3 = KVMessage (g_transID, address, CREATE, key, value, TERTIARY).toString();
          send (&address, &replicas[2].address, msg3);
        }
        break;
      }
    case 2:
      {
        // Keep key as tertiary
        replType = TERTIARY;
        if (secondPredecessorFail) {
          // Send to new second predecessor
          auto msg1 = KVMessage (g_transID, address, CREATE, key, value, PRIMARY).toString();
          send (&address, &replicas[0].address, msg1);
        }
        if (firstPredecessorFail) {
          // Send to new first predecessor
          auto msg2 = KVMessage (g_transID, address, CREATE, key, value, SECONDARY).toString();
          send (&address, &replicas[1].address, msg2);
        }
        break;
      }
    }

    e++;

  } else {
    // The key doesn't belong any longer to this node
    auto msg1 = KVMessage (g_transID, address, CREATE, key, value, PRIMARY).toString();
    send (&address, &replicas[0].address, msg1);
    auto msg2 = KVMessage (g_transID, address, CREATE, key, value, SECONDARY).toString();
    send (&address, &replicas[1].address, msg2);
    auto msg3 = KVMessage (g_transID, address, CREATE, key, value, TERTIARY).toString();
    send (&address, &replicas[2].address, msg3);
    e = database->erase(e);
  }
}

Conclusion

Implementing a Fault-Tolerant Distributed Key-Value Store is by no means a simple task, and the implementation described here is far from ready for production use. It shows, however, the essential pieces found in this kind of NoSQL database, tackling the fundamental issues these systems face.

