Implementing a persistent FIFO queue in Java using Voldemort

Background

Recently I joined a company which provides real-time stock market information to its clients through a proprietary stock price dissemination server. The server first connects to the stock exchanges and listens for incoming market data frames. Once a frame has been read from the socket, it is placed in an in-memory queue so that it can be processed further.
The problem with this approach is that the queue fills up rapidly with frames and there is no control over its upper limit. Since the queue is an in-memory data structure, its rapid growth affects the overall server performance.

I was assigned the task of designing a new data structure to address this problem.

Queue design requirements

Holding a large data set in memory is inefficient and risky. If something suddenly goes wrong in the server, the whole data set vanishes into thin air in a second. So there must be a mechanism to periodically write in-memory snapshots to some persistent storage.

When the queue is growing, it would be better to distribute its content across multiple servers. Simply put, there should be a way to distribute the queue’s load across a cluster. A distributed database is an ideal solution for that purpose.

Everything above is worthless if the performance of the data structure lags behind the business need. Ideally, the time complexity of read/write operations against the queue should be O(1). That means the time taken to perform a read/write operation on the queue is constant and does not depend on the number of items in the queue.

Considering the above factors, I decided to replace the in-memory queue with something that meets the following requirements.

1.  The new data structure must provide the same FIFO behaviour as our good old one.

2.  It should perform well, and the time complexity of read/write operations should be O(1).

3.  It may store data in memory, but there should be a way to write that data to disk periodically.

4.  It should be able to distribute its storage load across a cluster.

Distributed databases and Voldemort

These days relational databases are gradually walking towards their twilight age. Most web applications suffer from the pain of scalability. Meanwhile, paradigms like ‘NoSQL’ have begun to roll out in the Internet arena.

Traditional relational database management systems struggle when they try to scale horizontally, and replication is especially painful for them. Because of that, distributed databases are gradually taking over from relational databases in heavily loaded web applications such as Twitter, Facebook and LinkedIn.

Voldemort is a distributed key-value storage system. Unlike traditional relational databases, it has no schema or complex querying mechanism, but it is very simple to use. The following text, taken from the Voldemort website, explains what exactly it is.

Voldemort is not a relational database, it does not attempt to satisfy arbitrary relations while satisfying ACID properties. Nor is it an object database that attempts to transparently map object reference graphs. Nor does it introduce a new abstraction such as document-orientation. It is basically just a big, distributed, persistent, fault-tolerant hash table. For applications that can use an O/R mapper like active-record or hibernate this will provide horizontal scalability and much higher availability but at great loss of convenience. For large applications under Internet-type scalability pressure, a system may likely consist of a number of functionally partitioned services or APIs, which may manage storage resources across multiple data centers using storage systems which may themselves be horizontally partitioned.

How does Voldemort store data?

As I mentioned earlier, Voldemort is a simple key-value storage system. Data can be saved and retrieved by calling simple put() and get() operations. All read/write operations have O(1) time complexity with respect to the number of stored entries, and single-node performance is excellent compared to relational databases.
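To make the access model concrete, here is a minimal sketch of key-based put() and get(). It does not use the Voldemort API at all: the class name KeyValueDemo and the HashMap behind it are assumptions that merely stand in for a real store. The point it illustrates is that every value is reached through its key, and there is no operation for "give me the oldest entry".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a key-value store (not the Voldemort API).
class KeyValueDemo {

    private final Map<String, String> store = new HashMap<String, String>();

    // Saves the value under the given key, replacing any previous value
    void put(String key, String value) {
        store.put(key, value);
    }

    // Returns the value stored under the key, or null if the key is unknown
    String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        KeyValueDemo db = new KeyValueDemo();
        db.put("frame-1", "first payload");
        db.put("frame-2", "second payload");

        System.out.println(db.get("frame-2"));  // prints "second payload"
        System.out.println(db.get("frame-9"));  // prints "null", no such key
    }
}
```

Lookups work only when the caller already knows the key, which is exactly the limitation the rest of this post has to work around.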

I need to access it sequentially, not by using a key…

I made a little detour from our initial discussion. :) Sorry about that…

Considering the drawbacks of the in-memory FIFO queue, I started to design a new data structure which stores data in memory but periodically writes it to persistent storage. In addition, the data structure is capable of distributing its load across a cluster.

Since our price server is written in Java and Voldemort is also written in Java, I started to implement a thin Java wrapper for the Voldemort client which behaves exactly like a FIFO queue but persists its entries through Voldemort.

The major problem I had to face here was that Voldemort is a key-value store, so every entry stored in it must have a unique key. Simply speaking, if you have something to save in Voldemort, you must provide a unique key for it. But all I need here is a simple FIFO queue which can insert new entries and return the entry that was inserted first (the head of the queue). I don’t want to assign keys by hand to the values being inserted into the queue, and users of the queue should not be aware of those keys.

So I needed a way to automatically assign unique keys to the entries before saving them into Voldemort, because I don’t want to give queue users the extra responsibility of generating unique ids for the entries.

A UUID is a perfect solution in this context. UUID stands for universally unique identifier, and by using a UUID generator inside the queue, I could assign unique keys to the entries. It finally saved my day :)
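As a small illustration of the idea, the sketch below generates keys with java.util.UUID from the standard library. The class name UuidKeyDemo and the helper generateKeys() are made up for this example. Note that random UUIDs carry no ordering information, so the queue still has to remember the order in which the keys were handed out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper showing how unique keys can be produced without user input.
class UuidKeyDemo {

    // Generates `count` random (version 4) UUID keys, kept in generation order
    static List<String> generateKeys(int count) {
        List<String> keys = new ArrayList<String>();
        for (int i = 0; i < count; i++) {
            keys.add(UUID.randomUUID().toString());
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> keys = generateKeys(5);
        Set<String> distinct = new HashSet<String>(keys);

        // Each key is a 36-character string such as 123e4567-e89b-42d3-a456-556642440000
        System.out.println(keys.get(0));
        // The list and the set have the same size when no two keys collide
        System.out.println(distinct.size() == keys.size());
    }
}
```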

This strategy is illustrated below

Case : Implementing a persistent FIFO Queue by wrapping Voldemort client

Step 1: Write a simple Voldemort client

The first step is to create an interface to the Voldemort engine. This can be done through the Voldemort client API. Using this client, the queue can perform storage operations like saving and retrieving entries.

Below is the complete source code for my Voldemort client.

package com.dunithd;

import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;
import voldemort.versioning.Versioned;

/**
 * Actual Voldemort client that does the storage operations
 * <p/>
 * Author: <a href="https://dunithd.wordpress.com">Dunith Dhanushka</a>, Date: Jul 2, 2010
 */
public class VoldemortClient {

    private final String host = "tcp://localhost:6666";
    private final StoreClientFactory factory;
    private final StoreClient<String, Object> client;

    public VoldemortClient() {
        this.factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(host));
        this.client = factory.getStoreClient("test");   //"test" is the name of the store
    }

    public void put(String key, Object value) {
        if (key == null) {
            throw new NullPointerException("key must not be null");
        }
        client.put(key, value);
    }

    public Object get(String key) {
        Versioned<Object> value = client.get(key);
        //client.get() returns null when there is no entry for the key
        return value == null ? null : value.getValue();
    }

}

This client is very simple and has only two operations.

Step 2: Write the Queue implementation

This is where the real work happens.

package com.dunithd;

import java.util.LinkedList;
import java.util.UUID;

/**
 * Wraps a LinkedList and creates a FIFO queue.
 * <p/>
 * Author: <a href="https://dunithd.wordpress.com">Dunith Dhanushka</a>, Date: Jul 2, 2010
 */
public class Queue {

    private final LinkedList<UUID> keys;    //keys of the queued entries, oldest first
    private final VoldemortClient map;

    public Queue() {
        keys = new LinkedList<UUID>();
        map = new VoldemortClient();
    }

    public void put(Object value) {
        if (value == null) {
            throw new NullPointerException("value must not be null");
        }
        //first, generate a random key for the given value
        UUID key = UUID.randomUUID();
        keys.add(key);  //append the key to the tail of the LinkedList

        map.put(key.toString(), value); //finally save the entry in Voldemort
    }

    public Object take() {
        //remove the first (oldest) key; throws NoSuchElementException if the queue is empty
        UUID headKey = keys.removeFirst();
        //get the entry stored under that key
        return map.get(headKey.toString());
    }

}

Again, this is a simple class with two operations, put() and take(). I used a Linked List to keep the keys of the stored entries in insertion order.

When you insert an entry (<V>) into the queue, the following activities are performed.

1.  A random unique identifier is generated using the UUID class. This identifier is the key (<K>) for that entry.

2.  The generated key is inserted into the keys Linked List, where it becomes the last item.

3.  The entry is saved into the Voldemort engine as a key-value pair <K,V>, where K is the generated key and V is the value being saved. The save operation is done through our Voldemort client.

These activities make up the put() operation of the Queue class.

When retrieving the head of the queue, the following activities are performed.

1.  Retrieve the key of the entry which was inserted first. This is done by removing the first item from the keys Linked List (LinkedList’s removeFirst(), or equivalently remove(0)), which returns the oldest key.

2.  Using this key, retrieve the corresponding value by calling the Voldemort client’s get() method.

These activities make up the take() operation of the Queue class.
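The put() and take() steps above can be traced without a running Voldemort server. The sketch below is only an approximation under two assumptions: a plain HashMap stands in for the Voldemort store, and take() returns null on an empty queue instead of throwing. The class name InMemoryFifoSketch is made up for this example.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the key list + key-value store combination.
class InMemoryFifoSketch {

    // Assumption: a HashMap stands in for the Voldemort store
    private final Map<String, Object> store = new HashMap<String, Object>();
    private final LinkedList<UUID> keys = new LinkedList<UUID>();

    void put(Object value) {
        UUID key = UUID.randomUUID();           // step 1: generate the key K
        keys.addLast(key);                      // step 2: append K to the key list
        store.put(key.toString(), value);       // step 3: save the pair <K,V>
    }

    // Assumption: returns null when the queue is empty
    Object take() {
        if (keys.isEmpty()) {
            return null;
        }
        UUID headKey = keys.removeFirst();      // step 1: key of the oldest entry
        return store.get(headKey.toString());   // step 2: look up its value
    }

    public static void main(String[] args) {
        InMemoryFifoSketch queue = new InMemoryFifoSketch();
        queue.put("Monday");
        queue.put("Tuesday");

        System.out.println(queue.take());   // prints "Monday"
        System.out.println(queue.take());   // prints "Tuesday"
    }
}
```

The order of the results comes entirely from the key list; the map itself knows nothing about insertion order.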

Note that you can extend this Queue class to meet your business requirements; take() and put() are just the workhorses.

Step 3: Use the Queue class as you wish

The hardest part is over. Now you can use the Queue to meet your business needs.

package com.dunithd;

/**
 * Main class to test our Voldemort based FIFO queue
 * <p/>
 * Author: <a href="https://dunithd.wordpress.com">Dunith Dhanushka</a>, Date: Jul 2, 2010
 */
public class Main {

    public static void main(String[] args) {
        Queue queue = new Queue();

        queue.put("Monday");
        queue.put("Tuesday");
        queue.put("Wednesday");
        queue.put("Thursday");
        queue.put("Friday");

        for (int i = 0; i < 5; i++) {
            System.out.println(queue.take().toString());
        }

    }
}

An improved design for the Queue

The Queue design can be improved by decoupling the storage engine from the Queue, so that you are not limited to Voldemort. You can try other storage systems such as Redis, MongoDB and Apache Cassandra as the storage of your queue. Although Redis and MongoDB are not implemented in Java, they do have client libraries for Java.

Below is the decoupled architecture for the Queue.
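As a rough sketch of what such a decoupling could look like in code, the queue below depends only on a small storage interface. Everything here is hypothetical and not part of the original implementation: the names KeyValueStore, InMemoryStore and StoreBackedQueue are invented, and the delete() call after each take() goes beyond the Queue class shown earlier by freeing the storage of consumed entries.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;

// Hypothetical storage abstraction; a Voldemort- or Redis-backed class could implement it.
interface KeyValueStore {
    void put(String key, Object value);
    Object get(String key);
    void delete(String key);
}

// In-memory implementation, used here only so the sketch runs on its own
class InMemoryStore implements KeyValueStore {
    private final Map<String, Object> entries = new HashMap<String, Object>();

    public void put(String key, Object value) { entries.put(key, value); }
    public Object get(String key) { return entries.get(key); }
    public void delete(String key) { entries.remove(key); }
    int size() { return entries.size(); }
}

// FIFO queue that knows nothing about the concrete storage engine
class StoreBackedQueue {
    private final LinkedList<String> keys = new LinkedList<String>();
    private final KeyValueStore store;

    StoreBackedQueue(KeyValueStore store) {
        this.store = store;
    }

    void put(Object value) {
        String key = UUID.randomUUID().toString();
        keys.addLast(key);
        store.put(key, value);
    }

    // Throws NoSuchElementException when the queue is empty
    Object take() {
        String headKey = keys.removeFirst();
        Object value = store.get(headKey);
        store.delete(headKey);  // assumption: consumed entries are removed from storage
        return value;
    }
}
```

Swapping the storage engine then means writing one more KeyValueStore implementation and passing it to the constructor, for example new StoreBackedQueue(new InMemoryStore()).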

6 thoughts on “Implementing a persistent FIFO queue in Java using Voldemort”

  1. Thanks for the nice writeup. I do have two concerns about the proposed implementation, though. The more easily-dealt-with one is that items in the queue aren’t being deleted after they’re fetched, even though no in-memory references survive the call to take(). This seems like a pretty serious storage leak, but it’s trivial to fix by adding a delete call.

    The second issue is more difficult: the keys themselves are persisted, but the list of keys isn’t. I assume the reason to persist the keys is to survive a crash, but without persisting the list there’s no way to do proper recovery. One way to solve this would be to have each stored key include the keys of its predecessor and successor (which would require pre-allocating the successor key) and then storing the head/tail as part of each put/take. Since the successor key is preallocated it might not actually have been used when the crash occurs, so a get() on it would fail, but recovery should be working from the head anyway.

    Even the space-conserving crash-resistant form of this still seems a lot simpler than any message-queuing package which would provide an obvious alternative, though, so I think it’s still a great idea. Just wanted to add a couple of thoughts about next steps.

    • Hey Jeff,
      Many thanks for your comment on this and I highly appreciate your suggestions.
      I take your second suggestion seriously and am currently writing another post that focuses on recovering this FIFO queue after a crash. In addition to that, I’m writing a JMS subscriber implementation that uses Voldemort as a persistent ‘sink’ to store incoming JMS messages from a messaging system. So I’m planning to implement this crash recovery mechanism inside that sink.

      regards,
      Dunith

  2. Pingback: Voldemort sink implementation for Cloudera Flume « Dunith Dhanushka's Weblog

  3. You could’ve just kept two counters (a writeIndex and a readIndex) and then store entries using a sequence-number as the key. The counters themselves could also be stored in voldemort using special keys….

  4. Hi Dunith – Thank you for the nice article. Can you please let me know if we can store a text file in Voldemort? If so, how do we do that?

    Thanks,
    Rashmi
