Voldemort sink implementation for Cloudera Flume

Recently I implemented a Flume sink that uses Voldemort as its data store. It is now hosted on GitHub, so please feel free to fork it and experiment with it. But before that, I'll give you some insight into Flume and Voldemort so that you get a good picture of both of them.

What is Flume?

Flume is a distributed system that collects your logs from their source and aggregates them to wherever you want to process them. The primary use case is a logging system that gathers a set of log files from every machine in a cluster and aggregates them to a centralized persistent store such as HDFS.

Flume is an open source project from Cloudera and it is licensed under the Apache License v2.0. Cloudera provides enterprise support for the Apache Hadoop project and its subprojects such as Pig, Hive, HBase and ZooKeeper. Their distribution is named “Cloudera’s Distribution for Hadoop (CDH)” and it includes several components from Cloudera such as Flume, Sqoop and Hue.

Before proceeding with the rest of the article, I recommend you visit www.cloudera.com to get a better picture of Flume and its architecture.

Why Flume?

Flume is not limited to collecting logs from distributed systems; it is capable of supporting other use cases such as

  • Collecting readings from an array of sensors
  • Collecting impressions from custom apps for an ad network
  • Collecting readings from network devices in order to monitor their performance.

Flume aims to preserve reliability, scalability, manageability and extensibility while serving the maximum number of clients with high QoS.

Flume is easy to extend

Flume was awesome to me because it is very easy to extend. The source and sink architecture is flexible and provides simple APIs.

What is a Source?
A source is a data provider that produces event data and sends it downstream. Flume supports reading from many data sources such as the standard console (stdin), syslog, text files, and even tailing a file. Sources are the origin of events.

What is a Sink?

Sinks are the destinations of events; events travel downstream from sources to sinks. There are many possible destinations for events — local disk, HDFS, the console, or forwarding across the network. You use the sink abstraction as an interface for forwarding events to any of these destinations.

The beauty of Flume is that you can implement your own sink or source and plug it into the core very easily. Plugging a new source/sink into the Flume core is just a matter of editing some configuration files.

Because of this cool feature, I started thinking about adding Voldemort as a Flume event sink.

What is Voldemort?

I’ve blogged about Voldemort in my previous blog post, where you can read more details about it. But for the sake of clarity I’ll introduce Voldemort again.

Voldemort is an open source distributed key-value store implemented by the SNA team at LinkedIn. It is based on Amazon’s Dynamo paper, which has been a great milestone in the design of key-value storage systems.

One of the most lovable features of Voldemort is its rich documentation compared to competing key-value stores. Apart from that, it is backed by a strong community of developers, so getting help and support is easy.

Why Voldemort?

Voldemort is a distributed, fault-tolerant and highly scalable key-value store, so it is well suited for enterprise-level operations.

Analysing logs in enterprise applications is a challenging task. Logs are produced by multiple computers, so the amount of data to analyse is comparatively large. Until logs are processed, they need to be stored in a reliable, fault-tolerant and scalable storage medium. Voldemort fits this situation without any doubt.

Voldemort sink for Flume

Now we are down to business. I think the prelude was a somewhat lengthy one 🙂

The Voldemort sink for Flume is supposed to store events received from different Flume sources into Voldemort. Initially this looks like a pretty straightforward task, but there were several issues that required more consideration.

Design considerations for Voldemort sink

Several design considerations had to be taken into account when I started designing the sink. I must specially thank Roshan Sumbaly from LinkedIn for his ideas. Below are the design decisions I had to consider.

Key generation strategy and granularity of the key space.

I already mentioned that Voldemort is a key-value storage system. So any value being stored there must have a unique key assigned to it.

The same situation applies to this Flume sink. When the sink receives an event from a source, we have to store that event in Voldemort. But before that, a unique key must be generated and assigned to the event value. There are several ways to generate a unique id for a key, such as random numbers, UUID generators and time stamps.

But the biggest problem we face here is: when retrieving the stored log entries (or events) from Voldemort, how can we specify exactly which entry we want back? We can retrieve a stored value in Voldemort by performing a GET operation with the key of the stored value. Since we are not keeping track of the generated key set inside the sink, it is impossible to retrieve the stored entries.

Using time stamps as keys seems to be a solution to this problem. If we use “yyyyMMddhhmmS” as the time stamp format, an example key might look like this:

201010031030345 (2010 <- year, 10 <- month, 03 <- day, 10 <- hour, 30 <- minute, 345 <- milliseconds)

But even with this strategy we face a problem. Voldemort does not support range queries, so if we need to retrieve an entry from Voldemort, it is mandatory to know the exact time stamp at which the entry was inserted, because that time stamp is the key for the entry.

At this stage Roshan suggested that bucketing the keys using the time stamp would be a better solution. That means we can append log entries/events to the same key. Entries can be collected at several granularity levels, such as per day, per hour and per minute. This granularity is configurable at the sink’s construction time: users specify the granularity level as “DAY”, “HOUR” or “MINUTE”.

Say, for example, the current time stamp is 2010/10/03 10:30. The generated key under the different granularity levels will look like this:

  • If “DAY” is specified => key will be 20101003
  • If “HOUR” is specified => key will be 2010100310
  • If “MINUTE” is specified => key will be 201010031030

When “DAY” is specified, all log entries will be appended to the key “20101003”. When “HOUR” is specified, all entries will be appended to “2010100310”, and so on. The key changes once the day, hour or minute changes. Each entry is delimited with a pipe (“|”) character, so once you retrieve the value it is just a matter of splitting the string to get the original log lines back.
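A minimal sketch of this bucketing scheme looks like the following. The class and method names are my own, not the actual sink's API, and I use SimpleDateFormat's 24-hour "HH" field for the hour.

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical sketch of the bucketed key generation described above.
public class BucketKeyGenerator {

    // Returns the bucket key for the given timestamp at the given
    // granularity ("DAY", "HOUR" or "MINUTE").
    public static String keyFor(Date timestamp, String granularity) {
        String pattern;
        if ("DAY".equals(granularity)) {
            pattern = "yyyyMMdd";
        } else if ("HOUR".equals(granularity)) {
            pattern = "yyyyMMddHH";
        } else {
            pattern = "yyyyMMddHHmm";   // "MINUTE"
        }
        return new SimpleDateFormat(pattern).format(timestamp);
    }

    // Splits a pipe-delimited bucket value back into the original log lines.
    public static String[] splitEntries(String value) {
        return value.split("\\|");
    }
}
```

All events arriving within the same day, hour or minute then share one key, and retrieval needs only a coarse time stamp rather than the exact insertion instant.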

Using JSON as the key serialization format

JSON is a more compact serialization format than plain strings, so it is recommended to configure the Voldemort store with JSON as the key serialization format. The value can be in any preferred format.
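For example, a store definition in Voldemort's stores.xml along these lines would set JSON as the key serialization format (the store name, persistence and replication settings here are illustrative, not the sink's required configuration):

```xml
<stores>
  <store>
    <name>test</name>
    <persistence>bdb</persistence>
    <routing>client</routing>
    <replication-factor>1</replication-factor>
    <required-reads>1</required-reads>
    <required-writes>1</required-writes>
    <key-serializer>
      <type>json</type>
      <schema-info>"string"</schema-info>
    </key-serializer>
    <value-serializer>
      <type>string</type>
    </value-serializer>
  </store>
</stores>
```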

Installing and configuring Voldemort sink for Flume

You can find the detailed installation guide at GitHub.

Coming up next…

I’m also working on a Voldemort source and planning to release it soon. Before that, I’d like to hear your feedback on this. So feel free to fork the project and hack it!

Happy hacking! 🙂

Implementing a persistent FIFO queue in Java using Voldemort


Recently I joined a company which provides real-time stock market information to its clients through a proprietary stock price dissemination server. The server connects to stock exchanges and listens for incoming market data frames. Once a frame has been read from the socket, it is placed in an in-memory queue for further processing.

The problem with this approach is that the queue fills rapidly with frames and there is no control over its upper limit. Since the queue is an in-memory data structure, its rapid growth affects the overall server performance.

I was assigned the task of designing a new data structure to address this problem.

Queue design requirements

Holding a large data set in memory is inefficient and risky. If something suddenly goes wrong in the server, the whole data set vanishes into thin air in a second. So there must be a mechanism to periodically write in-memory snapshots to some persistent storage.

As the queue grows, it would be better to distribute its content across multiple servers. Simply put, there should be a way to distribute the queue’s load across a cluster. For that purpose, a distributed database is an ideal solution.

Everything above is worthless if the performance of the in-memory data structure lags behind the business need. It would be better if the time complexity of read/write operations against the queue were O(1). That means the time taken to perform a read/write operation on the queue is always constant and does not depend on the number of items in the queue.

Considering the above factors, I decided to find a replacement for the in-memory queue that meets the following requirements.

1.  The new data structure must provide the same FIFO behaviour as our good old one.

2.  It should perform well, and the time complexity of read/write operations should be O(1).

3.  It may store data in memory, but there should be a way to write that data to disk periodically.

4.  It should be able to distribute its storage load across a cluster.

Distributed databases and Voldemort

These days relational databases are gradually walking towards their twilight age. Most web applications suffer from the pain of scalability. Meanwhile, paradigms like ‘NoSQL’ have begun to roll out across the Internet.

Traditional relational database management systems struggle when they try to scale horizontally, especially when it comes to replication. Because of that, distributed databases are gradually taking over from relational databases in heavily loaded web applications like Twitter, Facebook and LinkedIn.

Voldemort is, in a sense, a distributed key-value storage system. Unlike traditional relational databases it has no schema or complex querying mechanism, but it is very simple to use. To understand what exactly it is, the following text is helpful. It is taken from the Voldemort website.

Voldemort is not a relational database, it does not attempt to satisfy arbitrary relations while satisfying ACID properties. Nor is it an object database that attempts to transparently map object reference graphs. Nor does it introduce a new abstraction such as document-orientation. It is basically just a big, distributed, persistent, fault-tolerant hash table. For applications that can use an O/R mapper like active-record or hibernate this will provide horizontal scalability and much higher availability but at great loss of convenience. For large applications under Internet-type scalability pressure, a system may likely consists of a number of functionally partitioned services or APIs, which may manage storage resources across multiple data centers using storage systems which may themselves be horizontally partitioned.

How does Voldemort store data?

As I mentioned earlier, Voldemort is a simple key-value storage system. Data can be saved and retrieved by calling simple get() and put() operations. All read/write operations have O(1) time complexity, and single-node performance is excellent compared to relational databases.

I need to access it sequentially, not by using a key…

I made a little detour from our initial discussion. 🙂 Sorry about that…

Considering the drawbacks of the in-memory FIFO queue, I started to design a new data structure which stores data in memory but periodically writes it to persistent storage. In addition, the data structure is capable of distributing its load across a cluster.

Since our price server is written in Java and Voldemort is also written in Java, I started to implement a thin Java wrapper around the Voldemort client which behaves exactly like a FIFO queue but persists its data through Voldemort.

The major problem I faced here is that Voldemort is a key-value store, and every data entry stored in it must have a unique key assigned to it. Simply speaking, if you have something to save in Voldemort, you must provide a unique key for it. But all I need here is a simple FIFO queue which is capable of inserting new entries and returning the first inserted entry (the head of the queue). I don’t want to assign keys to the values being inserted into the queue, and users of the queue should not be aware of those keys.

So I needed a way to automatically assign unique keys to the entries before saving them into Voldemort, because I don’t want to give queue users the extra responsibility of generating unique ids for the entries.

A UUID is a perfect solution in this context. A UUID is a universally unique identifier, and by using a UUID generator inside the queue, I could assign unique keys to the entries. It saved my day 🙂

This strategy is illustrated below

Case: Implementing a persistent FIFO Queue by wrapping the Voldemort client

Step 1: Write a simple Voldemort client

The first step is to create an interface to the Voldemort engine. This can be done through the Voldemort client API. Using this client, the queue can perform storage operations like saving and retrieving entries.

Below is the complete source code for my Voldemort client.

package com.dunithd;

import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;
import voldemort.versioning.Versioned;

/**
 * Actual Voldemort client that does the storage operations
 * <p/>
 * Author: <a href="https://dunithd.wordpress.com">Dunith Dhanushka</a>, Date: Jul 2, 2010
 */
public class VoldemortClient {

    private String host = "tcp://localhost:6666";
    private StoreClientFactory factory;
    private StoreClient<String, Object> client;

    public VoldemortClient() {
        this.factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(host));
        this.client = factory.getStoreClient("test");
    }

    public void put(String key, Object value) throws NullPointerException {
        if (key == null) {
            throw new NullPointerException();
        } else {
            client.put(key, value); //store the value against the given key
        }
    }

    public Object get(String key) {
        Versioned<Object> value = client.get(key);
        return value.getValue();
    }
}


This client is very simple and has only two operations.

Step 2: Write the Queue implementation

This is the actual job you have to perform.

package com.dunithd;

import java.util.LinkedList;
import java.util.UUID;

/**
 * Wraps a LinkedList and creates a FIFO queue.
 * <p/>
 * Author: <a href="https://dunithd.wordpress.com">Dunith Dhanushka</a>, Date: Jul 2, 2010
 */
public class Queue {

    private LinkedList<UUID> keys;
    private UUID key;

    private VoldemortClient map;

    public Queue() {
        keys = new LinkedList<UUID>();
        map = new VoldemortClient();
    }

    public void put(Object value) throws NullPointerException {
        if (value == null) {
            throw new NullPointerException();
        }
        //first, generate a random key for the given value
        key = UUID.randomUUID();
        keys.add(key);  //add the key to the end of the LinkedList

        map.put(key.toString(), value); //finally store the entry in Voldemort
    }

    public Object take() {
        UUID headKey = keys.remove(0);  //remove and return the first key
        //get the entry associated with the first key
        Object entry = map.get(headKey.toString());

        return entry;
    }
}


Again, this is a simple class which is having two operations, put() and take(). To store the keys of the entries which are being stored, I used a Linked List.

When you insert an entry (<V>) into the queue, the following activities are performed.

1.  A random unique identifier is generated using the UUID class. This identifier is the key (<K>) for that entry.

2.  The generated key is appended to the keys linked list, i.e. it becomes the last item of the list.

3.  The entry is saved into the Voldemort engine as a key-value pair <K,V>, where K is the generated key and V is the value being saved. The save operation is done through our Voldemort client.

The above set of activities makes up the put() operation of the Queue class.

When retrieving the head of the queue, the following activities are performed.

1.  Retrieve the key of the entry which was inserted first. This is done by calling the linked list’s remove(0) method, which removes and returns the first item of the key list.

2.  Using this key, retrieve the corresponding value by calling the Voldemort client’s get() method.

The above set of activities makes up the take() operation of the Queue class.

Note that you can extend this Queue class to meet your business requirements; the take() and put() operations are just the workhorses.

Step 3: Use the Queue class as you wish

The hardest part is over. Now you can use the Queue to meet your business needs.

package com.dunithd;

/**
 * Main class to test our Voldemort based FIFO queue
 * <p/>
 * Author: <a href="https://dunithd.wordpress.com">Dunith Dhanushka</a>, Date: Jul 2, 2010
 */
public class Main {

    public static void main(String[] args) {
        Queue queue = new Queue();

        //insert five entries and then take them back in FIFO order
        for (int i = 0; i < 5; i++) {
            queue.put("entry-" + i);
        }

        for (int i = 0; i < 5; i++) {
            System.out.println(queue.take());
        }
    }
}


An improved design for the Queue

The queue design can be improved by decoupling the storage engine from the Queue, so you are not limited to Voldemort. You can try other key-value stores like Redis, MongoDB and Apache Cassandra as the storage for your queue. Although Redis and MongoDB are not implemented in Java, they do have Java APIs.

Below is the decoupled architecture for the Queue.
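As a sketch of that decoupled design (the StorageEngine interface and the class names here are hypothetical, and a simple in-memory engine stands in for a real Voldemort, Redis or Cassandra client):

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;

// Hypothetical storage abstraction: the queue depends only on this interface,
// so a Voldemort, Redis or Cassandra backed engine can be plugged in.
interface StorageEngine {
    void put(String key, Object value);
    Object get(String key);
}

// Simple in-memory engine, useful for testing; the VoldemortClient from Step 1
// could implement StorageEngine in exactly the same way.
class InMemoryEngine implements StorageEngine {
    private final Map<String, Object> map = new HashMap<String, Object>();
    public void put(String key, Object value) { map.put(key, value); }
    public Object get(String key) { return map.get(key); }
}

// Same FIFO queue as before, but with the storage engine injected.
class DecoupledQueue {
    private final LinkedList<UUID> keys = new LinkedList<UUID>();
    private final StorageEngine engine;

    public DecoupledQueue(StorageEngine engine) {
        this.engine = engine;
    }

    public void put(Object value) {
        UUID key = UUID.randomUUID();       //generate a unique key for the value
        keys.add(key);                      //remember insertion order
        engine.put(key.toString(), value);  //delegate persistence to the engine
    }

    public Object take() {
        UUID headKey = keys.remove(0);      //first inserted key
        return engine.get(headKey.toString());
    }
}
```

With this shape, swapping storage backends is just a matter of passing a different StorageEngine implementation to the constructor.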