My 1st ZooKeeper Recipe: Distributed Queue with Locking

ZooKeeper is used in many different distributed systems and could probably also be of great help to your application when it comes to distributed synchronization and maintaining configuration. It promises to be simple, expressive, highly available, and highly reliable. To get a better understanding of what ZooKeeper can do for you, it is a good idea to gain a solid understanding of its implementation details and the consistency guarantees it makes. It is also well worth looking at the ZooKeeper Recipes and Solutions page.

As I only recently had the opportunity to take a closer look at ZooKeeper for the first time, this article does little more than provide the 'Notes of a Beginner'. You'll find here a brief overview of the consistency guarantees ZooKeeper makes, and a sample application. I decided to implement a distributed queue with locking, which I found as an example on the Recipes and Solutions page.
Imagine a web crawler that stores its links in a queue, so that other instances can grab link after link from the queue and crawl the web pages. In this scenario we don't want multiple instances crawling the same link, therefore we lock the link prior to performing the fetch. We don't just remove it from the queue, because the fetch could fail. On the other hand, if an instance dies unexpectedly during the fetch, the lock needs to be released so another instance can pick up from there. All of this can easily be done with ZooKeeper.

Consistency

A distributed queue has to meet at least some guarantees to be useful. Surely we want our queue to be persistent and atomic, so we won't lose any information. But a queue also follows the FIFO principle, so we expect a sequential ordering of the elements added to the queue. Let's find out how ZooKeeper can help us meet these requirements.

ZooKeeper makes the following guarantees for data consistency:

  • Atomicity
    Updates to ZooKeeper either fail or succeed. A failed update will never be seen by any client.
  • Durability
Once an update succeeds it is persisted and survives server failures.
  • Sequential Consistency
Updates are applied in the order they are sent. This also implies that a client never reads an older state of ZooKeeper than one it has read before.
  • Single System Image
    A client always sees the same state of ZooKeeper regardless of what server it connects to.
  • Timeliness
A client's view can be outdated, but by no more than some tens of seconds.

On top of these guarantees we can build a persistent queue that stores its elements in the sequential order in which they were sent. Sequential Consistency and the Single System Image ensure that a client can obtain a lock for an element. Because of the Timeliness guarantee not all clients might immediately see the lock, but the sequential ordering of updates ensures that no two requests can obtain the lock for the same element.

Data Model

Once a lock is obtained it should also be released if the client crashes before being able to finish. The data model of ZooKeeper helps us with that too. ZooKeeper holds a hierarchical tree of nodes called znodes. These znodes have some very interesting properties, which are presented in the following list:

  • Ephemeral or Persistent Znodes
Znodes can either be persistent or ephemeral. An ephemeral znode is tied to the session of the client that created it; it is visible to all clients but deleted automatically when that session ends.
  • Sequence Numbers
A znode can be created in a way that ZooKeeper automatically assigns it a number, in sequential order relative to the other znodes under the same parent.
  • Watches
Watches notify clients about changes to znodes. A watch is only triggered once, so if a client wants to be notified again it needs to re-register the watch.

For our distributed queue we will use all of these properties. The sequential numbering of znodes helps us follow the FIFO principle of a queue. Through watches we gain the ability to act immediately on changes to the queue, for example when nodes are added. All the locks clients create will be ephemeral, so that a lock is automatically released when a client dies unexpectedly.
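
For readers who have never touched the Java client, here is a minimal sketch of how these three properties show up in the API. The /demo path and the ZnodeProperties class are purely hypothetical and only serve to illustrate the calls:

import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Minimal sketch of the three znode properties. A connected ZooKeeper handle and
// an existing parent znode "/demo" (a hypothetical path, not part of the sample)
// are assumed.
public class ZnodeProperties {
	public static void demo(ZooKeeper zk) throws Exception {
		// Ephemeral: the znode is deleted automatically when this client's session ends.
		zk.create("/demo/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

		// Sequential: ZooKeeper appends a zero-padded counter, e.g. "/demo/item-0000000007".
		String path = zk.create("/demo/item-", null, Ids.OPEN_ACL_UNSAFE,
				CreateMode.PERSISTENT_SEQUENTIAL);
		System.out.println("created " + path);

		// Watch: the Watcher passed here is triggered exactly once when the children
		// of "/demo" change; to keep watching it has to be registered again.
		List<String> children = zk.getChildren("/demo", new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				System.out.println("event: " + event.getType());
			}
		});
		System.out.println("current children: " + children);
	}
}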

Sample Queue with Locking

The sample application consists of two main parts. With CreateQueue.java we simulate multiple clients adding nodes to /queue, held by a ZooKeeper instance running on localhost. We use the create mode PERSISTENT_SEQUENTIAL so that every node is added in sequential order and persisted across server failures.

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;

public class CreateQueue {
	private class QueueAddWorker extends ConnectionWatcher implements Runnable {
		private DateFormat dfrm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z z");
		private Random r = new Random();
		private String name;

		public QueueAddWorker(String name) { this.name = name; }

		@Override
		public void run() {
			try {
				while (true) {
					this.connect("localhost");
					// PERSISTENT_SEQUENTIAL: ZooKeeper appends a sequence number to "q-"
					// and the node survives server failures.
					String added = zk.create("/queue/q-", null, Ids.OPEN_ACL_UNSAFE,
							CreateMode.PERSISTENT_SEQUENTIAL);
					System.out.println(dfrm.format(new Date()) + " " + name + " added " + added);
					this.close();
					Thread.sleep(r.nextInt(1000) + 50);
				}
			} catch (Exception e) { e.printStackTrace(); }
		}
	}

	public static void main(String[] args) {
		CreateQueue cQ = new CreateQueue();
		Thread addWorker1 = new Thread(cQ.new QueueAddWorker("worker1"));
		Thread addWorker2 = new Thread(cQ.new QueueAddWorker("worker2"));
		Thread addWorker3 = new Thread(cQ.new QueueAddWorker("worker3"));
		addWorker1.start(); addWorker2.start(); addWorker3.start();
	}
}
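
The workers extend a small ConnectionWatcher helper class that is not shown above. A minimal sketch of what such a helper could look like, based on the common pattern of waiting for the SyncConnected event before returning from connect() (the exact class used here may differ), would be:

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

// Minimal sketch of a ConnectionWatcher helper, assuming a 5 second session timeout;
// the actual helper used by the sample may differ in detail.
public class ConnectionWatcher implements Watcher {
	private static final int SESSION_TIMEOUT = 5000;
	private final CountDownLatch connected = new CountDownLatch(1);
	protected ZooKeeper zk;

	// Opens a session and blocks until the SyncConnected event arrives.
	public void connect(String hosts) throws Exception {
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
		connected.await();
	}

	@Override
	public void process(WatchedEvent event) {
		if (event.getState() == KeeperState.SyncConnected) {
			connected.countDown();
		}
	}

	public void close() throws InterruptedException {
		zk.close();
	}
}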

QueueAddWorker uses the method zk.create("/queue/q-", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL) to add nodes to /queue. The nodes themselves are only named 'q-', but since we specified the persistent sequential create mode, ZooKeeper takes care of the sequential ordering by appending a 10-digit, zero-padded number to the name. So the actual name of a node will be, for example, 'q-0000000003'. The number comes from a counter maintained by the parent znode.
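
If you ever need the numeric position of a node, you can recover it from the name by stripping the 'q-' prefix and parsing the zero-padded suffix. A tiny illustrative helper (not part of the sample) could look like this:

// Illustrative helper (not part of the sample): extracts the sequence number
// ZooKeeper appended to a queue node name such as "q-0000000003".
public static int sequenceOf(String nodeName) {
	return Integer.parseInt(nodeName.substring("q-".length()));
}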

Watch Triggers

The second part of the sample application consists of an instance that pulls new entries from the queue by holding a getChildren watch on /queue. The process method of a Watcher is executed every time a watch is triggered, so we don't have to constantly poll the queue but only act when nodes are added. There are four event types that can trigger a watch:

  1. NodeCreated
  2. NodeDeleted
  3. NodeChildrenChanged
  4. NodeDataChanged

In this example we are only interested in the NodeChildrenChanged event, as we want our watcher to act upon the creation of a new node in the queue. The watcher then tries to obtain a lock for that node. We use the ephemeral create mode, which removes the lock if the client fails unexpectedly. After we are finished, the node gets deleted from the queue.

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;

public class PullQueue extends ConnectionWatcher {
	private class PullWatcher implements Watcher {
		private DateFormat dfrm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z z");
		private Random r = new Random();
		private String name;
		private List<String> children;

		public PullWatcher(String name) throws Exception {
			this.name = name;
			children = zk.getChildren("/queue", this); // register the initial watch
		}

		@Override
		public void process(WatchedEvent event) {
			try {
				if (event.getType().equals(EventType.NodeChildrenChanged)) {
					children = zk.getChildren("/queue", this); // get the children and renew the watch
					Collections.sort(children); // the list arrives unsorted
					for (String child : children) {
						if (zk.exists("/queue_lock/" + child, false) == null) {
							try {
								zk.create("/queue_lock/" + child, dfrm.format(new Date()).getBytes(),
										Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
								// simulate the actual work, e.g. fetching the crawled page
								Thread.sleep(r.nextInt(5000) + 2000);
								zk.delete("/queue/" + child, -1);
							}
							// Even though we check for the existence of a lock, it could have
							// been created in the meantime, making create fail. We catch and
							// ignore the exception.
							catch (Exception ignore) { }
						}
					}
				}
			} catch (Exception e) { e.printStackTrace(); }
		}
	}

	public static void main(String[] args) throws Exception {
		PullQueue pQ = new PullQueue();
		pQ.connect("localhost");
		try {
			pQ.new PullWatcher(args[0]);
			Thread.sleep(Long.MAX_VALUE); // keep the process alive so the watcher can fire
		} finally {
			pQ.zk.close();
		}
	}
}

ZooKeeper does not send the children to the client in any particular order, so prior to iterating over them we sort the list. Another thing to note here is that we have to register the watch again once it has been triggered. We also do this before going through the list of children.
The PullWatcher then tries to obtain a lock for the node in /queue_lock. It first checks whether a lock already exists and then tries to create it. The creation might still fail, because the view of the client might have been out of date, or the lock was created by another client after the PullWatcher checked. It can never happen that two clients hold the same lock, but we have to catch the exception and simply ignore it.
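
If you want to be a little stricter, you can catch only the expected KeeperException.NodeExistsException instead of swallowing every exception. A variant of the inner locking step could look like this:

// Variant of the inner locking step using org.apache.zookeeper.KeeperException.
// Only the expected race is ignored: another client created the lock between our
// exists() check and our create(). Any other exception still reaches the outer
// handler in process().
try {
	zk.create("/queue_lock/" + child, dfrm.format(new Date()).getBytes(),
			Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
	Thread.sleep(r.nextInt(5000) + 2000); // the actual work would happen here
	zk.delete("/queue/" + child, -1);
} catch (KeeperException.NodeExistsException e) {
	// someone else holds the lock for this entry; skip it
}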

Herd Effect

Looking at the execution statistics of a sample run, it becomes apparent that the clients spend most of their time iterating through the list without doing any actual work. This can be explained by the so-called "herd effect": a large number of clients gets notified, although only a very small group of them can actually act on the event.
This application clearly suffers from the herd effect, and there might be a smarter way of using the watches to avoid it.
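
One common way to soften the herd effect, borrowed from the ZooKeeper lock recipe, is to let each client create an ephemeral sequential znode and watch only its immediate predecessor, so that a change wakes exactly one client. The following rough sketch (using a hypothetical /queue_wait path, not implemented in this sample) illustrates the idea:

// Rough sketch (not part of the sample) of the predecessor-watch idea from the
// ZooKeeper lock recipe. Each client creates an ephemeral sequential znode under a
// hypothetical "/queue_wait" path and watches only the znode right before its own,
// so a deletion wakes a single waiting client instead of the whole herd.
String me = zk.create("/queue_wait/n-", null, Ids.OPEN_ACL_UNSAFE,
		CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> waiting = zk.getChildren("/queue_wait", false);
Collections.sort(waiting);
int myIndex = waiting.indexOf(me.substring("/queue_wait/".length()));
if (myIndex > 0) {
	// Watch only the immediate predecessor; when it disappears it is our turn.
	// A full implementation would re-check the children if exists() returns null,
	// because the predecessor might already be gone.
	zk.exists("/queue_wait/" + waiting.get(myIndex - 1), new Watcher() {
		@Override
		public void process(WatchedEvent event) {
			// proceed when the predecessor znode has been deleted
		}
	});
} // myIndex == 0 means we are first in line and can proceed immediately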

Conclusion

ZooKeeper keeps its promise of being simple, expressive, and indeed very useful. Designing distributed systems is hard, and ZooKeeper can clearly take away some of the pain. The example shown here suffers from the herd effect and might have some other issues as well, but it does what it promises and it wasn't all that hard to implement.
