Home arrow Java & J2EE arrow Page 3 - Taming Tiger: Concurrent Collections

Using the Blocking Queues - Java

Moving beyond Map, Collection, List, and Set: John Zukowski discusses the new library release in the Tiger release of the J2SE platform and what it provides: a set of utilities commonly needed in concurrent programs. If you are interested in optimizing multithreaded access to your collections, you've come to the right place. (This intermediate-level article was first published by IBM developerWorks, June 16, 2004, at http://www.ibm.com/developerWorks.)

TABLE OF CONTENTS:
  1. Taming Tiger: Concurrent Collections
  2. Using the Basic Queues
  3. Using the Blocking Queues
  4. Using the ConcurrentMap implementation
  5. Resources
By: developerWorks
Rating: starstarstarstarstar / 11
October 27, 2004

print this article
SEARCH DEV SHED

TOOLS YOU CAN USE

advertisement

The new java.util.concurrent package adds the BlockingQueue interface and five blocking queue classes to the set of concrete collection classes available in the Collections Framework. For those unfamiliar with the concept of a blocking queue, it is essentially a FIFO data structure, with a twist. Instead of adding and removing elements from the queue immediately, the thread performing the operation blocks until space or an element is available. The Javadoc for the BlockingQueue interface demonstrates the basic usage of a blocking queue, as shown in Listing 2. The put() operation in the producer will block when there is no space available and the take() operation in the consumer will block when there is nothing in the queue.

Listing 2. Using a BlockingQueue

 class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while(true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

Each of the five queues offers something different:

  • ArrayBlockingQueue: A bounded queue backed by an array

  • LinkedBlockingQueue: An optionally bounded queue backed by linked nodes

  • PriorityBlockingQueue: An unbounded priority queue backed by a priority heap

  • DelayQueue: A time-based scheduling queue backed by a priority heap

  • SynchronousQueue: A simple rendezvous mechanism utilizing the
    BlockingQueue interface

The first two classes, ArrayBlockingQueue and LinkedBlockingQueue are nearly identical, differing only by their backing store and that LinkedBlockingQueue is not always bounded by capacity. A LinkedBlockingQueue class unbound by size will never cause a wait when adding an element to the blocking queue (at least not until there are Integer.MAX_VALUE elements in it).

PriorityBlockingQueue is a queue with an unbound capacity that maintains elements in their logical order through use of the Comparable sort order of the contained elements. Think of it as a possible replacement for TreeSet. For instance, adding the strings One, Two, Three, and Four to the queue will result in Four being the first one taken out. For elements without a natural order, you can provide a Comparator to the constructor. There is one trick with PriorityBlockingQueue, though. The Iterator instance returned from iterator() doesn't necessarily return the elements in priority order. If you must get all the elements in priority order for traversal, get them all through the toArray() method and sort them yourself, like Arrays.sort(pq.toArray()).

The new DelayQueue implementation is probably the most interesting (and complicated) of the bunch. Elements added to the queue must implement the new Delayed interface (just one method -- long getDelay(java.util.concurrent.TimeUnit unit)). While the queue is unbound in size, enabling adds to return immediately, one cannot take an element from the queue until the delay time has expired. When multiple elements have expired delays, the element with the earliest/oldest delay expiration will be taken first. It sounds more complicated then it is. Listing 3 demonstrates the use of this new blocking queue collection:

Listing 3. Using a DelayQueue implementation

import java.util.*;
import java.util.concurrent.*;

public class Delay {
  /**
   * Delayed implementation that actually delays
   */
  static class NanoDelay implements Delayed {
    long trigger;
    NanoDelay(long i) {
      trigger = System.nanoTime() + i;
    }
    public int compareTo(Object y) {
      long i = trigger;
      long j = ((NanoDelay)y).trigger;
      if (i < j) return -1;
      if (i > j) return 1;
      return 0;
    }
    public boolean equals(Object other) {
      return ((NanoDelay)other).trigger == trigger;
    }
    public boolean equals(NanoDelay other) {
      return ((NanoDelay)other).trigger == trigger;
    }
    public long getDelay(TimeUnit unit) {
      long n = trigger - System.nanoTime();
      return unit.convert(n, TimeUnit.NANOSECONDS);
    }
    public long getTriggerTime() {
      return trigger;
    }
    public String toString() {
      return String.valueOf(trigger);
    }
  }
  public static void main(String args[]) throws InterruptedException {
    Random random = new Random();
    DelayQueue queue = new DelayQueue();
    for (int i=0; i < 5; i++) {
      queue.add(new NanoDelay(random.nextInt(1000)));
    }
    long last = 0;
    for (int i=0; i < 5; i++) {
      NanoDelay delay = (NanoDelay)(queue.take());
      long tt = delay.getTriggerTime();
      System.out.println("Trigger time: " + tt);
      if (i != 0) {
        System.out.println("Delta: " + (tt - last));
      }
      last = tt;
    }
  }
}

The example starts with an inner class NanoDelay that will essentially pause for the given random number of nanoseconds, taking advantage of the new nanoTime() method of System. The main() method then only puts NanoDelay objects into the queue and takes them out again. If you wanted the queued item to do something else, you would need to add that to the implementation of the Delayed object and call that new method upon retrieval from the queue. (Feel free to expand on NanoDelay yourself to demonstrate having an additional method to do something interesting.) The time delta is displayed between successive calls to get elements from the queue. If the delta is ever negative, consider that an error, as you should never get an item from the queue with an earlier trigger time, when the delay has ended.

The SynchronousQueue class is the simplest of the bunch. It has no internal capacity. It works as a handoff mechanism between threads. The producer adding an element to the queue will wait for a consumer in another thread. When that consumer is available, the element is passed directly between consumer and producer, never literally getting added to the blocking queue.

IBM developerWorksVisit developerWorks for thousands of developer articles, tutorials, and resources related to open standard technologies, IBM products, and more. See developerWorks.



 
 
>>> More Java & J2EE Articles          >>> More By developerWorks
 

blog comments powered by Disqus
escort Bursa Bursa escort Antalya eskort
   

JAVA & J2EE ARTICLES

- More Java Bugs Lead to More Attacks
- Oracle's Java One Brings News, Surprises
- Oracle Patches Java Runtime Environment
- Apple Syncs Java Update with Oracle
- Spring 3.1 Java Development Framework Compat...
- Jelastic Java PaaS Availability and Pricing ...
- NetBeans 7.1 Released, Supports JavaFX 2
- SolarWinds Releases Newest Version of Java M...
- Free Monitoring Tool for Java Apps on Heroku
- Heroku Adds JCloud Platform Support, Java 7 ...
- Java SE 8 Speculation in Full Swing
- Java SE 7 Now Available
- New JVM Language and Java Reporting Tool
- Java 7 Release Update and New Eclipse Toolkit
- The Best Java Netbeans IDE Plugins

Developer Shed Affiliates

 


Dev Shed Tutorial Topics: