How to build a replicated HashMap in Java

We have all heard about clustered application servers and databases and how they replicate data between instances. We have heard about distributed cache implementations like Ehcache and Hazelcast, and even NoSQL databases like MongoDB, that replicate data across all their instances. They do a great job of replicating data efficiently and ensuring data integrity, and they provide different guarantees on data replication and availability.

Is it all black magic that is too hard to comprehend? No - it's just simple logic built into the entities that store the data.

In this post I will describe how you can build a simple replicating HashMap in Java, using a few lines of code with Netty - the easy-to-use NIO library from JBoss.

So what do we want to do?

We want to build a replicating HashMap - as in we create an instance of the map, add data to it and the data is magically transported across the ether to another HashMap running in a different JVM!

Wow! What high-end tools do we need for this?

We don’t need much - just plain Java and the Netty NIO framework. We will use the standard Java ConcurrentHashMap and HashMap classes for data storage, and Netty to build the back-end communication. We could have built that communication in any number of ways - JMX, a database or sockets - but Netty provides an easy-to-use approach built on plain old sockets.

The design aspects

For the purpose of this example, we will replicate add and delete operations. To represent the operation being performed on the map we need an entity - let's call it Event. It has to be serializable because it will be sent over a socket, and it has three fields - the operation, the key and the value of the entry.

Once we have this, we will extend a Map and override its put and remove methods so that they also send the data over a socket. This is achieved by creating a Netty client inside the Map. The client connects to the server and sends an event for each of these two operations.

The server simply listens for these messages and adds or removes data from a DataCache. To prove that data can flow both ways, the server echoes the event back to the client, where it is used to add or remove data from another DataCache. The contents of both caches are printed out so you can observe the results.

Let's now look at the code for all these - you will be surprised by how little code we need.

The Event

package com.supercoderz.clusteredmap.event;

import java.io.Serializable;

public class Event implements Serializable {
	private static final long serialVersionUID = 1L;

	public enum Operation {
		ADD, DELETE
	}

	public Operation op;
	public String key;
	public Object value;

}
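Since this Event travels through Netty's ObjectEncoder and ObjectDecoder, which use standard Java serialization under the hood, you can sanity-check that it serializes cleanly with a plain round trip. This is a side sketch, not part of the running example - the Event class is repeated inline so the snippet compiles on its own:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EventRoundTrip {

	// Inline copy of the Event above, so this snippet stands alone.
	static class Event implements Serializable {
		private static final long serialVersionUID = 1L;

		enum Operation {
			ADD, DELETE
		}

		Operation op;
		String key;
		Object value;
	}

	// Serialize and deserialize an Event - roughly what the
	// encoder/decoder pair does on either end of the socket.
	static Event roundTrip(Event e) throws IOException,
			ClassNotFoundException {
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		ObjectOutputStream out = new ObjectOutputStream(bytes);
		out.writeObject(e);
		out.flush();
		ObjectInputStream in = new ObjectInputStream(
				new ByteArrayInputStream(bytes.toByteArray()));
		return (Event) in.readObject();
	}

	public static void main(String[] args) throws Exception {
		Event e = new Event();
		e.op = Event.Operation.ADD;
		e.key = "1";
		e.value = 1;
		Event copy = roundTrip(e);
		System.out.println(copy.op + " " + copy.key + " " + copy.value);
	}
}
```

If any field of Event held a non-serializable object, this round trip - and the write over the socket - would fail with a NotSerializableException.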

The ServerHandler

package com.supercoderz.clusteredmap.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

import com.supercoderz.clusteredmap.event.Event;
import com.supercoderz.data.DataCache;

public class ServerHandler extends SimpleChannelUpstreamHandler {

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		Object o = e.getMessage();
		if (o instanceof Event) {
			Event event = (Event) o;
			// Apply the replicated operation to the server-side cache
			if (event.op == Event.Operation.ADD) {
				DataCache.cache.put(event.key, event.value);
			} else if (event.op == Event.Operation.DELETE) {
				DataCache.cache.remove(event.key);
			}
		}
		System.out.println("-------------------------------");
		System.out.println(DataCache.cache);
		System.out.println("-------------------------------");
		// Echo the event back so the client can update its own cache
		ctx.getChannel().write(o);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		// Don't swallow failures silently - log and close the channel
		e.getCause().printStackTrace();
		e.getChannel().close();
	}
}

The Server

package com.supercoderz.clusteredmap.server;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;

public class Server {
	public static void main(String[] args) {
		ServerBootstrap bootstrap = new ServerBootstrap(
				new NioServerSocketChannelFactory(
						Executors.newCachedThreadPool(),
						Executors.newCachedThreadPool()));

		// Set up the pipeline factory.
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(new ObjectEncoder(),
						new ObjectDecoder(), new ServerHandler());
			}
		});
		bootstrap.bind(new InetSocketAddress(1111));
	}

}

The ReplicatingMap

package com.supercoderz.clusteredmap.map;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;

import com.supercoderz.clusteredmap.client.ClientHandler;
import com.supercoderz.clusteredmap.event.Event;

public class ReplicatingMap extends HashMap<String, Object> {

	private static final long serialVersionUID = 1L;
	private ClientBootstrap bootstrap;
	private ChannelFuture future;

	public ReplicatingMap(String hostname, int port)
			throws InterruptedException {
		bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
				Executors.newCachedThreadPool(),
				Executors.newCachedThreadPool()));
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(new ObjectEncoder(),
						new ObjectDecoder(), new ClientHandler());
			}
		});
		future = bootstrap.connect(new InetSocketAddress(hostname, port));
		future.await();
		// Fail fast if the server is not reachable, instead of silently
		// dropping every write on a dead channel
		if (!future.isSuccess()) {
			throw new RuntimeException("Could not connect to " + hostname
					+ ":" + port, future.getCause());
		}
	}

	@Override
	public Object put(String key, Object value) {
		Event e = new Event();
		e.op = Event.Operation.ADD;
		e.key = key;
		e.value = value;
		future.getChannel().write(e);
		return super.put(key, value);
	}

	@Override
	public Object remove(Object key) {
		Event e = new Event();
		e.op = Event.Operation.DELETE;
		e.key = (String) key;
		future.getChannel().write(e);
		return super.remove(key);
	}

}

The ClientHandler to handle data echoed back from the Server

package com.supercoderz.clusteredmap.client;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

import com.supercoderz.clusteredmap.event.Event;
import com.supercoderz.data.DataCache;

public class ClientHandler extends SimpleChannelUpstreamHandler {
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		Object o = e.getMessage();
		if (o instanceof Event) {
			Event event = (Event) o;
			// Apply the event echoed back by the server to the local cache
			if (event.op == Event.Operation.ADD) {
				DataCache.cache.put(event.key, event.value);
			} else if (event.op == Event.Operation.DELETE) {
				DataCache.cache.remove(event.key);
			}
		}
		System.out.println("-------------------------------");
		System.out.println(DataCache.cache);
		System.out.println("-------------------------------");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		// Don't swallow failures silently - log and close the channel
		e.getCause().printStackTrace();
		e.getChannel().close();
	}
}

The DataCache

package com.supercoderz.data;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DataCache {
	public static Map<String, Object> cache = new ConcurrentHashMap<String, Object>();
}

The Client

package com.supercoderz.clusteredmap.client;

import com.supercoderz.clusteredmap.map.ReplicatingMap;

public class Client {
	public static void main(String[] args) throws InterruptedException {
		ReplicatingMap map = new ReplicatingMap("localhost", 1111);
		map.put("1", 1);
		map.put("2", 2);
		map.put("3", 3);
		map.put("4", 4);
		map.remove("4");
	}

}

Usage, Analysis and Conclusion

If you run the Server and Client classes in two command prompts, you can see that data added on the client is replicated over to the Server. The data received by the Server is echoed back to the client and replicated again. The ReplicatingMap is just sending data to the server, which flows back to the client. This flow is set up to prove that data moves both ways - in reality you would probably route the data differently, with the ReplicatingMap sending data to multiple servers, or even acting as a server itself, accepting connections and pushing whatever is added to it out to all its clients.
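One way to sketch the multiple-server variant, without tying it to Netty, is a map that fans each operation out to a list of sinks. The EventSink interface and FanOutMap below are illustrative names, not part of the code above - each sink could wrap a channel to a different server:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Hypothetical sink interface standing in for a Netty channel; each
// implementation could wrap a connection to a different server.
interface EventSink {
	void send(String op, String key, Object value);
}

// A variant of ReplicatingMap that fans every mutation out to all
// registered sinks instead of writing to a single channel.
class FanOutMap extends HashMap<String, Object> {
	private static final long serialVersionUID = 1L;
	private final List<EventSink> sinks = new ArrayList<EventSink>();

	void addSink(EventSink sink) {
		sinks.add(sink);
	}

	@Override
	public Object put(String key, Object value) {
		// Notify every sink before updating the local map
		for (EventSink sink : sinks) {
			sink.send("ADD", key, value);
		}
		return super.put(key, value);
	}

	@Override
	public Object remove(Object key) {
		for (EventSink sink : sinks) {
			sink.send("DELETE", (String) key, null);
		}
		return super.remove(key);
	}
}
```

The same shape works in reverse for the server-as-hub variant: the server would keep a list of connected client channels and write each incoming event to all of them.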

Once this basic flow of data is set up, it is really up to you which way, and over how many connections, you want to replicate. It is easy to add logic on top that partitions and splits the data for replication, checks for duplicates, and so on.
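For example, duplicate checking could be layered on with a small filter. The sketch below assumes each event carries a monotonically increasing sequence number per key - a field the Event above does not have - and the DuplicateFilter name is hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical replay filter: remembers the highest sequence number
// applied for each key and rejects anything at or below it.
class DuplicateFilter {
	private final ConcurrentMap<String, Long> lastApplied =
			new ConcurrentHashMap<String, Long>();

	// Returns true if the event is new and should be applied,
	// false if it is a duplicate or stale.
	// Note: the check-then-put is not atomic under contention; a real
	// filter would loop on replace() or synchronize per key.
	boolean shouldApply(String key, long sequence) {
		Long last = lastApplied.get(key);
		if (last != null && sequence <= last) {
			return false;
		}
		lastApplied.put(key, sequence);
		return true;
	}
}
```

A handler would call shouldApply before touching the DataCache, so an event that is echoed or delivered twice is applied only once.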

The amount of code actually needed to get the replication working is very small - you do see a lot of verbose Java boilerplate, but that is Java; the lines that make replication work are really simple.

So the next time someone talks about replication as some form of higher-level black magic, you know how it works under the hood!

Written on November 7, 2011