Using BlockingQueue in Java

  • 2020-04-01 01:44:35
  • OfStack

I have recently been maintaining a Java project, and our group has been debating the pros and cons of Java. Some of the language's most devoted fans insist that its performance is no worse than that of C++, and that many of the standard class libraries were written by experts and are very stable. When I looked into this more seriously, one thing they told me was that for passing messages between threads, the BlockingQueue that Java already provides is enough.

If it is supposed to be enough, the way to find out is to write some code and test it. Here is a small program that does just that:

// Test.java, in the default package: the producer
import java.util.concurrent.*; 

import base.MyRunnable; 

public class Test 
{ 
    public static void main(String[] args) 
    { 
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); 
        java.lang.Runnable r = new MyRunnable(queue); 
        Thread t = new Thread(r); 
        t.start(); 

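        // Produce exactly the 100,000 items the consumer counts, then stop.
        // An endless producer would keep filling the unbounded queue after
        // the consumer has exited and would eventually exhaust the heap.
        for(int round = 0; round < 10; round++)
        {
            for(int i = 0; i < 10000; i++)
            {
                queue.offer(i);
            }
        }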
    } 
} 

  
// base/MyRunnable.java, in package base: the consumer
package base; 

import java.lang.Runnable; 
import java.util.concurrent.*; 
import java.util.*; 

public class MyRunnable implements Runnable 
{ 
    public MyRunnable(BlockingQueue<Integer> queue) 
    { 
        this.queue = queue; 
    } 
    public void run() 
    { 
        Date d = new Date(); 
        long starttime = d.getTime(); 
        System.err.println(starttime); 
        int count = 0; 
        while(true) 
        { 
            try
            { 
                Integer i = this.queue.poll(); 
                if(i != null) 
                { 
                    count ++; 
                } 
                if(count == 100000) 
                { 
                    Date e = new Date(); 
                    long endtime = e.getTime(); 
                    System.err.println(count); 
                    System.err.println(endtime); 
                    System.err.print(endtime - starttime); 
                    break; 
                } 

            } 
            catch (Exception e) 
            { 

            } 
        } 
    } 
    private BlockingQueue<Integer> queue; 
}

Transferring 100,000 items takes about 50 ms on my test machine, which is acceptable. Next, let's look at the underlying implementation of BlockingQueue.
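One side note on the test itself before going into the source: the consumer above spins on poll(), which keeps a CPU core busy whenever the queue is empty. A minimal sketch of the same measurement using the blocking put and take methods might look like the following. The class name BlockingTransferDemo and the helper method transfer are made up for illustration, and the timing it prints will differ from machine to machine.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingTransferDemo {
    static final int ITEMS = 100_000;

    /** Moves ITEMS integers through the queue; returns how many the consumer received. */
    static int transfer(final BlockingQueue<Integer> queue) throws InterruptedException {
        final int[] received = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int n = 0; n < ITEMS; n++) {
                    queue.take();          // blocks while the queue is empty
                    received[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < ITEMS; i++) {
            queue.put(i);                  // blocks only if a bounded queue is full
        }
        consumer.join();                   // after join, received[0] is safely visible
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        int count = transfer(new LinkedBlockingQueue<Integer>());
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.err.println(count + " items in " + elapsedMs + " ms");
    }
}
```

Because take() parks the consumer thread instead of looping, this version trades a little wake-up latency for much lower CPU use when the producer is slow.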

The test code above uses offer and poll, so let's look at how those two functions are implemented in LinkedBlockingQueue. The first is poll:

public E poll() { 
        final AtomicInteger count = this.count; 
        if (count.get() == 0) 
            return null; 
        E x = null; 
        int c = -1; 
        final ReentrantLock takeLock = this.takeLock; 
        takeLock.lock(); 
        try { 
            if (count.get() > 0) { 
                x = extract(); 
                c = count.getAndDecrement(); 
                if (c > 1) 
                    notEmpty.signal(); 
            } 
        } finally { 
            takeLock.unlock(); 
        } 
        if (c == capacity) 
            signalNotFull(); 
        return x; 
    }

This is much like ordinary lock-based thread synchronization, with the addition of a signal. When I was studying Advanced Programming in the UNIX Environment, I saw condition variables used for synchronization between threads: threads compete for a lock, and a thread waiting on a condition is woken up when another thread signals it.
The offer function is implemented in a similar way:

public boolean offer(E e) { 
        if (e == null) throw new NullPointerException(); 
        final AtomicInteger count = this.count; 
        if (count.get() == capacity) 
            return false; 
        int c = -1; 
        final ReentrantLock putLock = this.putLock; 
        putLock.lock(); 
        try { 
            if (count.get() < capacity) { 
                insert(e); 
                c = count.getAndIncrement(); 
                if (c + 1 < capacity) 
                    notFull.signal(); 
            } 
        } finally { 
            putLock.unlock(); 
        } 
        if (c == 0) 
            signalNotEmpty(); 
        return c >= 0; 
    }
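
The lock-plus-condition-variable pattern in poll and offer can be tried out in isolation. The following is only a simplified sketch, not the JDK code: it uses a single ReentrantLock with two Condition objects, whereas LinkedBlockingQueue uses separate takeLock and putLock plus an AtomicInteger count. The class name SimpleBoundedQueue is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleBoundedQueue<E> {
    private final ArrayDeque<E> items = new ArrayDeque<E>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // waited on by take()
    private final Condition notFull = lock.newCondition();  // waited on by put()

    public SimpleBoundedQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    /** Non-blocking insert: returns false if the queue is full. */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        lock.lock();
        try {
            if (items.size() == capacity) return false;
            items.addLast(e);
            notEmpty.signal();             // wake one thread blocked in take()
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Non-blocking remove: returns null if the queue is empty. */
    public E poll() {
        lock.lock();
        try {
            E x = items.pollFirst();
            if (x != null) notFull.signal(); // wake one thread blocked in put()
            return x;
        } finally {
            lock.unlock();
        }
    }

    /** Blocking insert: waits on notFull until there is room. */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        lock.lock();
        try {
            while (items.size() == capacity) notFull.await();
            items.addLast(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Blocking remove: waits on notEmpty until an item arrives. */
    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) notEmpty.await();
            E x = items.pollFirst();
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}
```

The await calls sit inside while loops rather than if statements, so a woken thread rechecks the condition before proceeding. Using two locks, as the JDK does, additionally lets one producer and one consumer work on opposite ends of the queue at the same time.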

