Simulating Mass Data Synchronization with Java Multithreaded Programming

  • 2021-06-28 12:51:54
  • OfStack

Background

I have been studying Java multithreading for a while now. I have always believed that what you learn should be applied to real business needs. Otherwise you either fail to understand it in depth, or you end up applying the technology just for show.

That said, I do not yet consider myself skilled enough in multithreading to apply it casually to production code. So, based on problems encountered in day-to-day work, I made up a business scenario that could plausibly exist:

Suppose a company manages 1,000 WeChat service accounts, each with between 10,000 and 500,000 followers. Every day the company needs to call the WeChat API and update the follower data of all of these accounts in a local database.

Requirement Analysis

Analyzing this requirement raises the following main problems:

  • For a single service account, follower IDs can only be pulled 10,000 at a time, in sequence
  • WeChat's API limits the number of concurrent requests from a service provider

For a single service account, follower IDs can only be pulled 10,000 at a time, and the calls must be made in sequence. This means that pulling the follower IDs of one account cannot be split across multiple threads.

WeChat's API limits the number of concurrent requests from a service provider. This is the easiest point to overlook: if we send too many requests at the same time, our access to the interface may be blocked. A semaphore can be used here to cap the number of threads that execute simultaneously.

To finish synchronization as quickly as possible, the work can be split into two parts: reading data and writing data. Reading goes through the API, which involves network IO and is slow. Writing to the database is comparatively fast. So more threads should be allocated to reading and fewer to writing.
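The sequential constraint can be pictured with a small paging loop. This is only a sketch under assumptions: `FollowerPager`, `Page` and `fetchPage` are hypothetical names, and `fetchPage` is an in-memory stand-in for the remote call, not the real WeChat API. It assumes each call returns at most 10,000 IDs plus the position to continue from.

```java
import java.util.ArrayList;
import java.util.List;

class FollowerPager {
  static final int PAGE_SIZE = 10_000; // per-call limit stated in the article

  /** One page of IDs plus the offset to continue from (-1 when there is no more data). */
  static final class Page {
    final List<Integer> ids;
    final int nextOffset;

    Page(List<Integer> ids, int nextOffset) {
      this.ids = ids;
      this.nextOffset = nextOffset;
    }
  }

  /** Hypothetical stand-in for the remote call: returns up to PAGE_SIZE ids starting at offset. */
  static Page fetchPage(int totalFollowers, int offset) {
    int end = Math.min(offset + PAGE_SIZE, totalFollowers);
    List<Integer> ids = new ArrayList<>();
    for (int i = offset; i < end; i++) {
      ids.add(i);
    }
    return new Page(ids, end < totalFollowers ? end : -1);
  }

  /** Pulls every page of one account in order; each call needs the previous call's result. */
  static List<Integer> fetchAll(int totalFollowers) {
    List<Integer> all = new ArrayList<>();
    int offset = 0;
    while (offset >= 0) {
      Page page = fetchPage(totalFollowers, offset);
      all.addAll(page.ids);
      offset = page.nextOffset;
    }
    return all;
  }

  public static void main(String[] args) {
    System.out.println(fetchAll(25_000).size());
  }
}
```

Because every call depends on the position returned by the previous one, the loop for a single account is inherently serial. Parallelism has to come from running different accounts on different threads.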

Design Essentials

First, we need to decide how many threads to start (production code usually uses a thread pool); the number depends on the performance of the server. Here we use 40 read threads (sharing the 1,000 service accounts between them) and 1 write thread (consuming the output of the 40 read threads). (How many threads to start depends on the capacity of the thread pool and on how many threads can be allotted to this job. The exact figure should be determined by testing under real conditions and set a little below the server's limit. Of course, the more the configuration allows, the better.)

Second, given WeChat's limit on concurrent API requests, we need to cap the number of threads that execute at the same time. We use java.util.concurrent.Semaphore for this, with 20 permits here (the right number of permits depends on the API limit and on server performance).

Then, we need to know when reading and writing have finished, so that we can control the program logic and terminate the program. We use java.util.concurrent.CountDownLatch for this.

Finally, we need a data structure for sharing the data being processed between threads. A queue suits this synchronization scenario well, and here we use the thread-safe java.util.concurrent.ConcurrentLinkedQueue. (Note that in real development the queue must not be allowed to grow without bound, since that consumes memory very quickly. The queue length needs to be controlled according to the actual situation, for example by adjusting the ratio of read threads to write threads.)
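Since production code usually relies on a thread pool, here is a minimal sketch of what that might look like with a fixed pool from java.util.concurrent.Executors. The class name `ReaderPool`, the method `syncAll` and the one-minute timeout are made up for illustration, and each task only increments a counter where a real task would pull one account's data.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ReaderPool {
  /** Runs one task per account on a pool of `readers` threads; returns how many tasks completed. */
  static int syncAll(int accounts, int readers) {
    ExecutorService pool = Executors.newFixedThreadPool(readers);
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < accounts; i++) {
      pool.execute(() -> {
        // a real task would pull one account's follower data here
        completed.incrementAndGet();
      });
    }
    pool.shutdown(); // accept no new tasks, let the queued ones finish
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        pool.shutdownNow(); // give up on whatever is still running
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt();
    }
    return completed.get();
  }

  public static void main(String[] args) {
    System.out.println(syncAll(1000, 40));
  }
}
```

With a pool, the 1,000 accounts become 1,000 small tasks queued in front of 40 worker threads, so no thread sits idle while others still have accounts left to process.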
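To see the semaphore's effect in isolation, the following sketch starts more threads than there are permits and records the highest number of threads that were ever inside the guarded section at once. `ApiThrottle` and `maxInFlight` are illustrative names, and the 20 ms sleep merely stands in for a remote call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class ApiThrottle {
  /** Runs `threads` simulated callers and returns the highest number seen in flight at once. */
  static int maxInFlight(int threads, int permits) {
    Semaphore semaphore = new Semaphore(permits);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger max = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(threads);

    Runnable caller = () -> {
      try {
        semaphore.acquire(); // wait for a free permit
        try {
          int now = inFlight.incrementAndGet();
          max.accumulateAndGet(now, Math::max); // remember the peak
          Thread.sleep(20); // stand-in for the remote call
        } finally {
          inFlight.decrementAndGet();
          semaphore.release(); // only reached if acquire() succeeded
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        done.countDown();
      }
    };

    for (int i = 0; i < threads; i++) {
      new Thread(caller).start();
    }
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return max.get();
  }

  public static void main(String[] args) {
    System.out.println(maxInFlight(40, 20) <= 20);
  }
}
```

Placing `release()` in a `finally` that is nested inside the successful `acquire()` means a thread interrupted while waiting never hands back a permit it did not obtain.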
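The waiting pattern on its own might look like the sketch below. `LatchDemo`, `runAndWait` and the 10-second timeout are assumptions for illustration. The timeout is optional, but it keeps a stuck worker from blocking the caller forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
  /** Starts `workers` threads, waits for all of them, and returns how many ran (-1 on timeout). */
  static int runAndWait(int workers) {
    CountDownLatch latch = new CountDownLatch(workers);
    AtomicInteger finished = new AtomicInteger();
    for (int i = 0; i < workers; i++) {
      new Thread(() -> {
        try {
          finished.incrementAndGet(); // stand-in for real work
        } finally {
          latch.countDown(); // always signal completion, even after an exception
        }
      }).start();
    }
    try {
      if (!latch.await(10, TimeUnit.SECONDS)) {
        return -1; // not every worker finished in time
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return -1;
    }
    return finished.get();
  }

  public static void main(String[] args) {
    System.out.println(runAndWait(40));
  }
}
```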
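One way to keep the queue from growing without limit is to swap in a bounded blocking queue. Note that this is a different technique from the ConcurrentLinkedQueue used in this article. The sketch below uses java.util.concurrent.LinkedBlockingQueue with a fixed capacity, a single producer for simplicity, and a sentinel value of -1 to mark the end of the data. The names `BoundedQueueDemo` and `transfer` and the sentinel convention are assumptions of this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BoundedQueueDemo {
  /** One producer pushes `items` values through a queue of size `capacity`; returns the number consumed. */
  static int transfer(int items, int capacity) {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);

    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < items; i++) {
          queue.put(i); // blocks while the queue is full
        }
        queue.put(-1); // sentinel: tells the consumer there is no more data
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    int consumed = 0;
    try {
      while (true) {
        int value = queue.take(); // blocks while the queue is empty
        if (value < 0) {
          break;
        }
        consumed++;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return consumed;
  }

  public static void main(String[] args) {
    System.out.println(transfer(1000, 10));
  }
}
```

Because `put` blocks when the queue is full, fast readers are slowed to the writer's pace and memory use stays bounded by the capacity. With several producers, one sentinel would not be enough, and a latch like the one in this article is the simpler way to signal completion.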

Simulation Code

Since this article focuses on the use of multithreading, the simulation code only demonstrates the multithreaded operations. Plenty of comments have been added to make the code easier to read and understand.

JDK: 1.8


import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * N threads add data to the queue;
 * 1 thread consumes data from the queue.
 */
public class QueueTest {
  private static List<String> data = Arrays.asList("a", "b", "c", "d", "e");

  private static final int OFFER_COUNT = 40; // number of offer (producer) threads to start

  private static Semaphore semaphore = new Semaphore(20); // number of threads allowed to run at the same time (typically used to cap API calls or database connections)

  public static void main(String[] args) throws InterruptedException {
    Queue<String> queue = new ConcurrentLinkedQueue<>(); // work queue: data to be processed is placed here

    CountDownLatch offerLatch = new CountDownLatch(OFFER_COUNT); // latch for the offer threads: counted down once per finished thread; a count of 0 means all offers are done
    CountDownLatch pollLatch = new CountDownLatch(1); // latch for the poll thread: a count of 0 means polling is done

    Runnable offerRunnable = () -> {
      try {
        semaphore.acquire(); // limit concurrency via the semaphore
        try {
          for (String datum : data) {
            queue.offer(datum);
            TimeUnit.SECONDS.sleep(2); // simulate a slow data fetch
          }
        } finally {
          // release in finally, and only after a successful acquire(),
          // so an interrupt while waiting cannot add an extra permit
          semaphore.release();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
      } finally {
        // count down in finally so an exception cannot leave the latch waiting forever
        offerLatch.countDown();
      }
    };

    Runnable pollRunnable = () -> {
      int count = 0;
      try {
        // keep looping while any offer thread is still running or the queue still holds data
        while (offerLatch.getCount() > 0 || !queue.isEmpty()) {
          String poll = queue.poll();
          if (poll != null) {
            System.out.println(poll);
            count++;
          }
          // pause briefly whether or not data was polled, to reduce CPU usage
          TimeUnit.MILLISECONDS.sleep(100);
        }
        System.out.println("total count:" + count);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // count down in finally so an exception cannot leave the latch waiting forever
        pollLatch.countDown();
      }
    };

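    // Start the threads (a thread pool is recommended in production)
    new Thread(pollRunnable).start(); // start 1 poll thread
    for (int i = 0; i < OFFER_COUNT; i++) {
      new Thread(offerRunnable).start();
    } // fetching is simulated as slow, so 40 threads are started to handle it

    // Wait on the latches; this blocks the main thread until each count reaches 0
    offerLatch.await();
    pollLatch.await();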

    System.out.println("===the end===");
  }
}

That concludes the article. The above is my solution to one fairly common requirement.

Note: Multithreaded programming depends heavily on the actual environment and requirements, and every parameter needs to be tuned accordingly. In practice, test against data that mimics the production environment as closely as possible, and observe CPU, memory, network IO and disk IO on the server at each level of concurrency. Then reduce the concurrency somewhat so that the server keeps some headroom to handle other requests.

