Moving beyond Map, Collection, List, and Set: John Zukowski discusses the new library introduced in the Tiger release of the J2SE platform and what it provides: a set of utilities commonly needed in concurrent programs. If you are interested in optimizing multithreaded access to your collections, you've come to the right place. (This intermediate-level article was first published by IBM developerWorks, June 16, 2004, at http://www.ibm.com/developerWorks.)
The new java.util.concurrent package adds the BlockingQueue interface and five blocking queue classes to the set of concrete collection classes available in the Collections Framework. For those unfamiliar with the concept of a blocking queue, it is essentially a FIFO data structure, with a twist. Rather than failing when the queue is full or empty, the thread performing an add or remove operation can block until space or an element becomes available. The Javadoc for the BlockingQueue interface demonstrates the basic usage of a blocking queue, as shown in Listing 2 (the Producer and Consumer classes it refers to are defined in the Javadoc and are not repeated here). The put() operation in the producer will block when there is no space available, and the take() operation in the consumer will block when there is nothing in the queue.
    class Setup {
      void main() {
        BlockingQueue q = new SomeQueueImplementation();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
      }
    }
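The Setup class above relies on Producer and Consumer classes that are not shown here. As a rough, self-contained sketch of the same pattern, the following uses one producer and one consumer sharing an ArrayBlockingQueue of capacity 2. The class name ProducerConsumerSketch, the transfer() helper, and the DONE end-of-stream marker are all assumptions made for illustration, and the code uses lambdas and the diamond operator, which arrived in releases later than Tiger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerSketch {
    // Hypothetical end-of-stream marker; not part of the BlockingQueue API.
    static final Integer DONE = -1;

    // Sends 0..count-1 through a small bounded queue and returns what the consumer received.
    static List<Integer> transfer(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i);              // blocks while the queue is full
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer item = queue.take();   // blocks while the queue is empty
                    if (item.equals(DONE)) {
                        break;
                    }
                    received.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();   // after join, the consumer's writes to the list are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5));   // prints [0, 1, 2, 3, 4]
    }
}
```

With a single producer and a single consumer the FIFO order is preserved. With two consumers, as in Setup, each element still goes to exactly one consumer, but which one receives it is not predictable.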
Each of the five queues offers something different:
ArrayBlockingQueue: A bounded queue backed by an array
LinkedBlockingQueue: An optionally bounded queue backed by linked nodes
PriorityBlockingQueue: An unbounded priority queue backed by a priority heap
DelayQueue: A time-based scheduling queue backed by a priority heap
SynchronousQueue: A simple rendezvous mechanism utilizing the BlockingQueue interface
The first two classes, ArrayBlockingQueue and LinkedBlockingQueue, are nearly identical, differing only in their backing store and in that LinkedBlockingQueue is not always bounded by capacity. A LinkedBlockingQueue created without a capacity bound will never make a thread wait when adding an element to the blocking queue (at least not until there are Integer.MAX_VALUE elements in it).
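To see the difference in capacity without making a thread hang, the sketch below uses the non-blocking offer() method, which returns false when the queue is full, in place of put(). The class name CapacitySketch and the accepted() helper are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CapacitySketch {
    // Counts how many of the attempted non-blocking offers the queue accepts.
    static int accepted(BlockingQueue<Integer> queue, int attempts) {
        int count = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) {   // offer() returns false instead of blocking when full
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Bounded queues stop accepting elements at their capacity.
        System.out.println(accepted(new ArrayBlockingQueue<Integer>(3), 10));    // prints 3
        System.out.println(accepted(new LinkedBlockingQueue<Integer>(3), 10));   // prints 3

        // Without a capacity argument, LinkedBlockingQueue is bounded only by Integer.MAX_VALUE.
        LinkedBlockingQueue<Integer> unbounded = new LinkedBlockingQueue<Integer>();
        System.out.println(unbounded.remainingCapacity() == Integer.MAX_VALUE);  // prints true
        System.out.println(accepted(unbounded, 10));                             // prints 10
    }
}
```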
PriorityBlockingQueue is a queue with unbounded capacity that maintains elements in their logical order through use of the Comparable sort order of the contained elements. Think of it as a possible replacement for TreeSet (though, unlike TreeSet, it permits duplicate elements). For instance, adding the strings One, Two, Three, and Four to the queue will result in Four being the first one taken out. For elements without a natural order, you can provide a Comparator to the constructor. There is one trick with PriorityBlockingQueue, though. The Iterator instance returned from iterator() doesn't necessarily return the elements in priority order. If you must get all the elements in priority order for traversal, copy them into an array with the toArray() method and sort that array yourself. Keep a reference to the array, as in Object[] elements = pq.toArray(); Arrays.sort(elements); because a call like Arrays.sort(pq.toArray()) sorts a temporary array and then discards it.
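The ordering behavior described above can be sketched as follows, using the same four strings. The class name PriorityOrderSketch and its helper methods are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderSketch {
    // Builds a queue holding the four strings from the text.
    static PriorityBlockingQueue<String> sample() {
        PriorityBlockingQueue<String> pq = new PriorityBlockingQueue<String>();
        pq.addAll(Arrays.asList("One", "Two", "Three", "Four"));
        return pq;
    }

    // Returns the elements in priority order without removing them from the queue.
    static List<String> sortedSnapshot(PriorityBlockingQueue<String> pq) {
        String[] snapshot = pq.toArray(new String[0]);   // array order is not guaranteed
        Arrays.sort(snapshot);                           // so sort the copy explicitly
        return Arrays.asList(snapshot);
    }

    // Removes every element with poll(), which always returns the current head.
    static List<String> drain(PriorityBlockingQueue<String> pq) {
        List<String> out = new ArrayList<String>();
        String next;
        while ((next = pq.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<String> pq = sample();
        System.out.println(sortedSnapshot(pq));   // prints [Four, One, Three, Two]
        System.out.println(pq.size());            // prints 4, the snapshot removed nothing
        System.out.println(drain(pq));            // prints [Four, One, Three, Two]
    }
}
```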
The new DelayQueue implementation is probably the most interesting (and complicated) of the bunch. Elements added to the queue must implement the new Delayed interface, which declares just one method, long getDelay(java.util.concurrent.TimeUnit unit), and extends Comparable, so elements also need a compareTo() method. While the queue is unbounded in size, enabling adds to return immediately, one cannot take an element from the queue until its delay time has expired. When multiple elements have expired delays, the element whose delay expired earliest will be taken first. It sounds more complicated than it is. Listing 3 demonstrates the use of this new blocking queue collection:
    import java.util.Random;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class Delay {
      /**
       * Delayed implementation that actually delays
       */
      static class NanoDelay implements Delayed {
        final long trigger;

        NanoDelay(long i) {
          trigger = System.nanoTime() + i;
        }

        // Delayed extends Comparable<Delayed>; order elements by trigger time
        public int compareTo(Delayed y) {
          long i = trigger;
          long j = ((NanoDelay) y).trigger;
          if (i < j) return -1;
          if (i > j) return 1;
          return 0;
        }

        // Guard the cast so that comparing with another type returns false
        public boolean equals(Object other) {
          return other instanceof NanoDelay
              && ((NanoDelay) other).trigger == trigger;
        }

        // Keep hashCode() consistent with equals()
        public int hashCode() {
          return (int) (trigger ^ (trigger >>> 32));
        }

        // Remaining delay, converted to the requested unit
        public long getDelay(TimeUnit unit) {
          long n = trigger - System.nanoTime();
          return unit.convert(n, TimeUnit.NANOSECONDS);
        }

        public long getTriggerTime() {
          return trigger;
        }

        public String toString() {
          return String.valueOf(trigger);
        }
      }

      public static void main(String args[]) throws InterruptedException {
        Random random = new Random();
        DelayQueue<NanoDelay> queue = new DelayQueue<NanoDelay>();
        for (int i = 0; i < 5; i++) {
          queue.add(new NanoDelay(random.nextInt(1000)));
        }
        long last = 0;
        for (int i = 0; i < 5; i++) {
          NanoDelay delay = queue.take();   // blocks until the head's delay expires
          long tt = delay.getTriggerTime();
          System.out.println("Trigger time: " + tt);
          if (i != 0) {
            System.out.println("Delta: " + (tt - last));
          }
          last = tt;
        }
      }
    }
The example starts with a nested class, NanoDelay, that will essentially pause for the given random number of nanoseconds, taking advantage of the new nanoTime() method of System. The main() method then only puts NanoDelay objects into the queue and takes them out again. If you wanted the queued item to do something else, you would need to add that to the implementation of the Delayed object and call that new method upon retrieval from the queue. (Feel free to expand on NanoDelay yourself to demonstrate having an additional method to do something interesting.) The program prints the difference between the trigger times of successive elements taken from the queue. If the delta is ever negative, consider that an error: once its delay has ended, an item should never come off the queue ahead of one with an earlier trigger time.
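One way to give a queued item something to do on retrieval is sketched below. It is only an illustration, not the article's listing: the DelayedTask class, its run() method, the labels, and the millisecond delays are all hypothetical, and compareTo() assumes the queue holds nothing but DelayedTask elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTaskSketch {
    // Hypothetical element type: a label plus the time at which it becomes available.
    static class DelayedTask implements Delayed {
        final String label;
        final long trigger;   // absolute System.nanoTime() value

        DelayedTask(String label, long delayMillis) {
            this.label = label;
            this.trigger = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        public long getDelay(TimeUnit unit) {
            return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        // Assumes every element in the queue is a DelayedTask.
        public int compareTo(Delayed other) {
            return Long.compare(trigger, ((DelayedTask) other).trigger);
        }

        // The extra work an element performs once it has been taken from the queue.
        String run() {
            return "ran " + label;
        }
    }

    // Queues three tasks out of order and runs each one as its delay expires.
    static List<String> runAll() {
        DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
        queue.add(new DelayedTask("slow", 30));
        queue.add(new DelayedTask("fast", 10));
        queue.add(new DelayedTask("medium", 20));

        List<String> results = new ArrayList<String>();
        try {
            while (results.size() < 3) {
                results.add(queue.take().run());   // take() blocks until the head has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runAll());   // prints [ran fast, ran medium, ran slow]
    }
}
```

The tasks come out in order of trigger time regardless of the order in which they were added.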
The SynchronousQueue class is the simplest of the bunch. It has no internal capacity. It works as a handoff mechanism between threads. The producer adding an element to the queue will wait for a consumer in another thread. When that consumer is available, the element is passed directly from producer to consumer, never literally getting added to the blocking queue.
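A minimal sketch of such a handoff follows. The class name HandoffSketch and its two helper methods are assumptions for illustration, and the lambda syntax postdates Tiger.

```java
import java.util.concurrent.SynchronousQueue;

class HandoffSketch {
    // Hands a message from a producer thread to the calling (consumer) thread.
    static String handOff(String message) {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(message);   // waits until a consumer is ready to take the element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = queue.take();   // rendezvous with the producer
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // With no consumer waiting, a non-blocking offer() has nowhere to put the element.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("unclaimed");
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer());   // prints false
        System.out.println(handOff("hello"));         // prints hello
    }
}
```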