java USES delayedQueue to implement local delay queues

  • 2020-07-21 08:16:01
  • OfStack

1. Understand the DelayQueue

What is DelayQueue?

DelayQueue is an unbounded BlockingQueue for placing objects that implement the Delayed interface, where objects can only be removed from the queue when they expire. This queue is ordered, that is, queued header objects have the longest latency to expire.

Note: The null element cannot be placed in such a queue.

What can DelayQueue do?

There are usually 1 requirements in our business that look like this:

Taobao order business: if no payment is made within 310 minutes after placing an order, the order will be cancelled automatically. Order notification: Send a text message to the user after 60s successfully placed the order.

Then we can summarize one characteristic of this kind of business: the need to delay the work.
This is where our DelayQueue application requirements come in.

2. How to use DelayQueue to solve such problems

Declare 1 object for Delayed


import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * <p>
 * [ Task scheduling system ]
 * <br>
 * [ The tasks to be performed in the queue ]
 * </p>
 *
 * @author wangguangdong
 * @version 1.0
 * @Date 2015 years 11 month 22 day 19:46:39
 */
public class Task<T extends Runnable> implements Delayed {
 /**
  *  Due to the time 
  */
 private final long time;

 /**
  *  The problem object 
  */
 private final T task;
 private static final AtomicLong atomic = new AtomicLong(0);

 private final long n;

 public Task(long timeout, T t) {
  this.time = System.nanoTime() + timeout;
  this.task = t;
  this.n = atomic.getAndIncrement();
 }

 /**
  *  Returns the remaining latency associated with this object in a given unit of time 
  */
 @Override
 public long getDelay(TimeUnit unit) {
  return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
 }

 @Override
 public int compareTo(Delayed other) {
  // TODO Auto-generated method stub
  if (other == this) // compare zero ONLY if same object
   return 0;
  if (other instanceof Task) {
   Task x = (Task) other;
   long diff = time - x.time;
   if (diff < 0)
    return -1;
   else if (diff > 0)
    return 1;
   else if (n < x.n)
    return -1;
   else
    return 1;
  }
  long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
  return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
 }

 public T getTask() {
  return this.task;
 }

 @Override
 public int hashCode() {
  return task.hashCode();
 }

 @Override
 public boolean equals(Object object) {
  if (object instanceof Task) {
   return object.hashCode() == hashCode() ? true : false;
  }
  return false;
 }


}

Implement another class to manage deferred tasks


import org.apache.log4j.Logger;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * [ Task scheduling system ]
 * <br>
 * [ The daemon thread is constantly performing the detection work ]
 * </p>
 *
 * @author wangguangdong
 * @version 1.0
 * @Date 2015 years 11 month 23 day 14:19:40
 */
public class TaskQueueDaemonThread {

 private static final Logger LOG = Logger.getLogger(TaskQueueDaemonThread.class);

 private TaskQueueDaemonThread() {
 }

 private static class LazyHolder {
  private static TaskQueueDaemonThread taskQueueDaemonThread = new TaskQueueDaemonThread();
 }

 public static TaskQueueDaemonThread getInstance() {
  return LazyHolder.taskQueueDaemonThread;
 }

 Executor executor = Executors.newFixedThreadPool(20);
 /**
  *  Daemon thread 
  */
 private Thread daemonThread;

 /**
  *  Initializes the daemon thread 
  */
 public void init() {
  daemonThread = new Thread(() -> execute());
  daemonThread.setDaemon(true);
  daemonThread.setName("Task Queue Daemon Thread");
  daemonThread.start();
 }

 private void execute() {
  System.out.println("start:" + System.currentTimeMillis());
  while (true) {
   try {
    // Value from the delay queue , If no object expires, the queue 1 Waiting for, 
    Task t1 = t.take();
    if (t1 != null) {
     // Modify the state of the problem 
     Runnable task = t1.getTask();
     if (task == null) {
      continue;
     }
     executor.execute(task);
     LOG.info("[at task:" + task + "] [Time:" + System.currentTimeMillis() + "]");
    }
   } catch (Exception e) {
    e.printStackTrace();
    break;
   }
  }
 }

 /**
  *  create 1 A new one that was originally empty  DelayQueue
  */
 private DelayQueue<Task> t = new DelayQueue<>();

 /**
  *  Add tasks, 
  * time  Delay time 
  * task  task 
  *  The user sets the delay time for the problem 
  */
 public void put(long time, Runnable task) {
  // Converted to ns
  long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS);
  // create 1 A task 
  Task k = new Task(nanoTime, task);
  // Put the task on a deferred queue 
  t.put(k);
 }

 /**
  *  End of the order 
  * @param task
  */
 public boolean endTask(Task<Runnable> task){
  return t.remove(task);
 }
}

Method of use

The init method is called when the container is initialized. A class that implements an runnable interface and calls the put method of TaskQueueDaemonThread into it. If you need to implement dynamic cancellations, the class that needs the task task redoes the hashcode method, preferably with a business limit hashcode conflict.

conclusion


Related articles: