Implementation of Java Parallel Processing

  • 2021-10-27 07:20:44
  • OfStack

Directory 1. Background
2. Knowledge 3. Parallel processing in Java
STEP 4 Expand
Implementation of Parallel Processing by Thread Pool
Using the fork/join framework
5. Reference:

1. Background

This article is a short article on parallel processing in Java.
More than 10 minutes to read the article I call short articles, suitable for fast reading.

2. Knowledge

Parallel computing (parallel computing) 1 generally refers to a computing mode in which many instructions can be performed simultaneously. On the premise of simultaneous computation, the computation process can be decomposed into small parts, and then solved in a concurrent way.

That is, it is decomposed into several processes:

1. Split a large task into multiple subtasks, and the subtasks can continue to be split.
2. Each subtask is operated and executed at the same time.
3. After the execution, there may be an "induction" task, such as summing and averaging.

To simplify the understanding of one point, it is to split it first- > Calculating at the same time- > Finally, "summarize"
Why do you want to be "parallel"? What are the advantages?

1, in order to get "save time", "fast". Suitable for large-scale computing scenarios. Theoretically, the execution speed of parallel processing on n may be times that of n on a single 1 processor.
2. The former computers are single-core, the modern computers Cpu are multi-core, and the servers are even multi-Cpu. Parallel computing can make full use of the performance of hardware.

3. Parallel processing in Java

Stream API (java. util. stream) added to JDK 8 introduces functional programming of generation environment into Java library, which makes it convenient for developers to write more effective and concise code.

Another value of steam is its creative support for parallel processing (parallel processing). Example:


final Collection< Task > tasks = Arrays.asList(
    new Task( Status.OPEN, 5 ),
    new Task( Status.OPEN, 13 ),
    new Task( Status.CLOSED, 8 ) 
);

//  Perform multiple tasks in parallel, and   Summation 
final double totalPoints = tasks
   .stream()
   .parallel()
   .map( task -> task.getPoints() ) // or map( Task::getPoints ) 
   .reduce( 0, Integer::sum );
 
System.out.println( "Total points (all tasks): " + totalPoints );

For the above tasks set, the above code calculates the sum of points for all tasks.
It uses the parallel method to process all task in parallel, and uses the reduce method to calculate the final result.

STEP 4 Expand

Implementation of Parallel Processing by Thread Pool

jdk 1.5 introduces concurrent contracting, which includes ThreadPoolExecutor, with the following code:


public class ExecutorServiceTest {
 
    public static final int THRESHOLD = 10_000;
    public static long[] numbers;
 
    public static void main(String[] args) throws Exception {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor);
        int taskSize = (int) (numbers.length / THRESHOLD);
        for (int i = 1; i <= taskSize; i++) {
            final int key = i;
            completionService.submit(new Callable<Long>() {
 
                @Override
                public Long call() throws Exception {
                    return sum((key - 1) * THRESHOLD, key * THRESHOLD);
                }
            });
        }
        long sumValue = 0;
        for (int i = 0; i < taskSize; i++) {
            sumValue += completionService.take().get();
        }
        //  All tasks have been completed , Close the thread pool 
        System.out.println("sumValue = " + sumValue);
        executor.shutdown();
    }
 
    private static long sum(int start, int end) {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

Using the fork/join framework

The purpose of branching/merging framework is to recursively split parallel tasks into smaller tasks, and then merge the results of each subtask to generate overall results; The relevant codes are as follows:


public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> {
    
    private static final long serialVersionUID = 1L;
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;
 
    public ForkJoinTest(long[] numbers) {
        this(numbers, 0, numbers.length);
    }
 
    private ForkJoinTest(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
        leftTask.fork();
        ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        //  Note: join Method will block, so it is necessary to execute it after the calculation of both subtasks has started join Method 
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }
 
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
 
    public static void main(String[] args) {
        System.out.println(forkJoinSum(10_000_000));
    }
 
    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinTest(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

The above code implements the recursive splitting task and puts it into the thread pool for execution.

5. Reference:

https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97


Related articles: