Using a blocking queue in Java to control a set of threads: an example

  • 2020-04-01 03:35:35
  • OfStack

A queue manages data in first-in, first-out order. If a thread tries to add an element to a blocking queue that is already full, or to remove an element from one that is empty, the thread blocks. Blocking queues are a useful tool for coordinating the work of multiple threads. Worker threads can periodically deposit intermediate results in a blocking queue, and other worker threads remove those results and process them further. The queue automatically balances the workload: if the first set of threads runs slower than the second, the second set blocks while waiting for results; if the first set runs faster, the queue fills up and the first set blocks until the second catches up.
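The blocking behavior described above can be observed without hanging the program by using the timed variants of the queue operations. The following is a minimal sketch, not part of the program discussed in this article; the class name BlockingBehaviorDemo and the 100-millisecond timeout are assumptions chosen for illustration. Where the timed offer and poll give up, the plain put and take methods would block indefinitely.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's program
class BlockingBehaviorDemo
{
   /** Returns true if a timed offer to a full queue gives up without adding the element. */
   static boolean offerToFullQueueTimesOut()
   {
      BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
      try
      {
         queue.put("first"); // capacity is 1, so the queue is now full
         // put("second") would block indefinitely here; the timed offer returns false instead
         return !queue.offer("second", 100, TimeUnit.MILLISECONDS);
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
         return false;
      }
   }

   /** Returns true if a timed poll from an empty queue returns null. */
   static boolean pollFromEmptyQueueTimesOut()
   {
      BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
      try
      {
         // take() would block indefinitely here; the timed poll returns null instead
         return queue.poll(100, TimeUnit.MILLISECONDS) == null;
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
         return false;
      }
   }

   public static void main(String[] args)
   {
      System.out.println("offer to full queue timed out: " + offerToFullQueueTimesOut());
      System.out.println("poll from empty queue timed out: " + pollFromEmptyQueueTimesOut());
   }
}
```

In a real producer/consumer setup you would normally call put and take and let the threads wait; the timed methods are useful when a thread must not wait forever.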

The following program shows how to use a blocking queue to control a set of threads. The program searches all files in a directory and all of its subdirectories and prints every line that contains the specified keyword.

The java.util.concurrent package provides several variants of the blocking queue, including LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, and DelayQueue. This program uses ArrayBlockingQueue, which must be constructed with a fixed capacity and accepts an optional fairness parameter. If fairness is set, the thread that has been waiting longest is served first. Fairness usually costs performance, so use it only when your problem really requires it.
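As a small illustration of the constructor options just described, the sketch below creates a fair ArrayBlockingQueue and contrasts it with a LinkedBlockingQueue constructed without a capacity. The class name QueueConstructionDemo and the helper newFairQueue are hypothetical names introduced only for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the article's program
class QueueConstructionDemo
{
   /** Creates a bounded queue whose blocked threads are served in FIFO order. */
   static BlockingQueue<String> newFairQueue(int capacity)
   {
      // The second argument is the fairness flag; omit it for the default (unfair) policy
      return new ArrayBlockingQueue<String>(capacity, true);
   }

   public static void main(String[] args)
   {
      BlockingQueue<String> fair = newFairQueue(2);
      fair.offer("a");
      fair.offer("b");
      // The queue is full, so the non-blocking offer is rejected
      System.out.println("third offer accepted: " + fair.offer("c"));
      System.out.println("remaining capacity: " + fair.remainingCapacity());

      // A LinkedBlockingQueue built without a capacity is bounded only by Integer.MAX_VALUE
      BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
      System.out.println("effectively unbounded: "
         + (unbounded.remainingCapacity() == Integer.MAX_VALUE));
   }
}
```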

The producer thread enumerates all files in all subdirectories and places them in a blocking queue. This operation is fast, and if the queue were not bounded, it would quickly fill up with every file under the base directory, long before the search threads could process them.

We also start a large number of search threads. Each search thread takes a file from the queue, opens it, prints all lines containing the keyword, and then takes the next file. We use a small trick to terminate the threads once the work is done: to signal completion, the enumeration thread places a dummy object into the queue. (This is similar to placing a dummy suitcase labeled "last bag" on a baggage carousel.) When a search thread takes the dummy object, it puts it back and terminates, so that the next search thread finds it as well.
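The termination trick can be shown in isolation. The sketch below is a simplified stand-in for the program in this article: it passes strings instead of files, and the names SentinelDemo, DONE, and run are hypothetical. One producer places a number of items and then the sentinel into a bounded queue; each consumer that takes the sentinel puts it back for the next consumer and stops.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the article's program
class SentinelDemo
{
   // The sentinel is a distinct object that is recognized by identity (==), not by value
   static final String DONE = new String("DONE");

   /** Runs one producer and several consumers; returns the number of real items consumed. */
   static int run(int items, int consumers)
   {
      final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(4);
      final AtomicInteger consumed = new AtomicInteger();

      Thread[] workers = new Thread[consumers];
      for (int i = 0; i < consumers; i++)
      {
         workers[i] = new Thread(() ->
         {
            try
            {
               while (true)
               {
                  String item = queue.take();
                  if (item == DONE)
                  {
                     queue.put(item); // pass the sentinel on to the next consumer
                     return;
                  }
                  consumed.incrementAndGet();
               }
            }
            catch (InterruptedException e)
            {
               Thread.currentThread().interrupt();
            }
         });
         workers[i].start();
      }

      try
      {
         // The producer blocks in put() whenever the consumers fall behind
         for (int i = 0; i < items; i++) queue.put("item " + i);
         queue.put(DONE); // signal completion after the last real item
         for (Thread worker : workers) worker.join();
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
      }
      return consumed.get();
   }

   public static void main(String[] args)
   {
      System.out.println("items consumed: " + run(20, 3));
   }
}
```

Because the queue is first in, first out, the sentinel is taken only after every real item has been removed, so putting it back never blocks and every consumer eventually sees it.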

Note that no explicit thread synchronization is required. In this program, the queue data structure itself serves as the synchronization mechanism.


import java.io.*; 
import java.util.*; 
import java.util.concurrent.*;

public class BlockingQueueTest
{
   public static void main(String[] args) 
   { 
      Scanner in = new Scanner(System.in); 
      System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): "); 
      String directory = in.nextLine(); 
      System.out.print("Enter keyword (e.g. volatile): "); 
      String keyword = in.nextLine();

      final int FILE_QUEUE_SIZE = 10;
      final int SEARCH_THREADS = 100;

      BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);

      FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
      new Thread(enumerator).start(); 
      for (int i = 1; i <= SEARCH_THREADS; i++) 
         new Thread(new SearchTask(queue, keyword)).start(); 
   } 
}

/**
 * This task enumerates all files in a directory and its subdirectories. 
 */
class FileEnumerationTask implements Runnable
{
   /** 
    * Constructs a FileEnumerationTask. 
    * @param queue the blocking queue to which the enumerated files are added 
    * @param startingDirectory the directory in which to start the enumeration 
    */
   public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory) 
   { 
      this.queue = queue; 
      this.startingDirectory = startingDirectory; 
   }

   public void run()
   { 
      try
      { 
         enumerate(startingDirectory); 
         queue.put(DUMMY); 
      } 
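      catch (InterruptedException e)
      {
         // Restore the interrupt status instead of silently swallowing it
         Thread.currentThread().interrupt();
      }
   }

   /**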
    * Recursively enumerates all files in a given directory and its subdirectories 
    * @param directory the directory in which to start 
    */
   public void enumerate(File directory) throws InterruptedException 
   { 
      File[] files = directory.listFiles();
      if (files == null) return; // listFiles() returns null if the path is not a readable directory
      for (File file : files) 
      { 
         if (file.isDirectory()) enumerate(file); 
         else queue.put(file); 
      } 
   }

   // Sentinel object that signals the end of the enumeration; compared by identity
   public static final File DUMMY = new File("");

   private BlockingQueue<File> queue;
   private File startingDirectory; 
}

/**
 * This task searches files for a given keyword. 
 */
class SearchTask implements Runnable
{
   /** 
    * Constructs a SearchTask. 
    * @param queue the queue from which to take files 
    * @param keyword the keyword to look for 
    */
   public SearchTask(BlockingQueue<File> queue, String keyword) 
   { 
      this.queue = queue; 
      this.keyword = keyword; 
   }

   public void run()
   { 
      try
      { 
         boolean done = false; 
         while (!done) 
         { 
            File file = queue.take(); 
            if (file == FileEnumerationTask.DUMMY) 
            { 
               queue.put(file); 
               done = true; 
            } 
            else search(file);             
         } 
      } 
      catch (IOException e) 
      { 
         e.printStackTrace(); 
      } 
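      catch (InterruptedException e)
      {
         // Restore the interrupt status instead of silently swallowing it
         Thread.currentThread().interrupt();
      }
   }

   /**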
    * Searches a file for a given keyword and prints all matching lines. 
    * @param file the file to search 
    */
   public void search(File file) throws IOException 
   { 
      Scanner in = new Scanner(new FileInputStream(file)); 
      int lineNumber = 0; 
      while (in.hasNextLine()) 
      { 
         lineNumber++; 
         String line = in.nextLine().trim(); 
         if (line.contains(keyword)) System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber, line); 
      } 
      in.close(); 
   }

   private BlockingQueue<File> queue;
   private String keyword; 
}

