Using DelayQueue in Java to implement a local delay queue
- 2020-07-21 08:16:01
- OfStack
1. Understand the DelayQueue
What is DelayQueue?
DelayQueue is an unbounded BlockingQueue that holds objects implementing the Delayed interface. An object can be taken from the queue only after its delay has expired. The queue is ordered: the head of the queue is the element whose delay expired furthest in the past.
Note: null elements cannot be placed in this queue.
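The behaviour described above can be seen in a minimal sketch. The Item class and the delays used here are hypothetical and exist only for this illustration; DelayQueue, Delayed and TimeUnit are the standard java.util.concurrent types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueBasics {

    /** Hypothetical element type used only for this illustration. */
    static final class Item implements Delayed {
        final String name;
        final long expiresAtNanos;

        Item(String name, long delayMillis) {
            this.name = name;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay, converted to the unit the caller asked for
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Adds items out of order and returns their names in the order take() releases them. */
    static List<String> takeOrder() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("slow", 400));
        queue.put(new Item("fast", 200));
        // A non-blocking queue.poll() at this point would return null: no delay has expired yet.

        List<String> order = new ArrayList<>();
        try {
            // take() blocks until the head of the queue has expired
            order.add(queue.take().name);
            order.add(queue.take().name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(takeOrder()); // prints [fast, slow]
    }
}
```

Although "slow" is added first, "fast" is released first because its delay expires earlier.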
What can DelayQueue do?
Business requirements like the following are common:
Taobao order business: if no payment is made within 30 minutes of placing an order, the order is cancelled automatically.
Order notification: send the user a text message 60 seconds after the order has been placed successfully.
What these cases have in common is that some work has to be done after a delay.
This is where DelayQueue comes in.
2. How to use DelayQueue to solve such problems
Declare a class that implements Delayed
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * <p>
 * [Task scheduling system]
 * <br>
 * [A task waiting in the queue to be executed]
 * </p>
 *
 * @author wangguangdong
 * @version 1.0
 * @Date 2015-11-22 19:46:39
 */
public class Task<T extends Runnable> implements Delayed {
    /**
     * Expiry time, in nanoseconds on the System.nanoTime() clock
     */
    private final long time;
    /**
     * The task to run
     */
    private final T task;
    /**
     * Sequence generator; n breaks ties between tasks with the same expiry time
     */
    private static final AtomicLong atomic = new AtomicLong(0);
    private final long n;

    /**
     * @param timeout delay in nanoseconds
     * @param t the task to run when the delay has elapsed
     */
    public Task(long timeout, T t) {
        this.time = System.nanoTime() + timeout;
        this.task = t;
        this.n = atomic.getAndIncrement();
    }

    /**
     * Returns the remaining delay of this object in the given time unit
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof Task) {
            Task<?> x = (Task<?>) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (n < x.n) // same expiry time: the task created first goes first
                return -1;
            else
                return 1;
        }
        // Fall back to comparing remaining delays for other Delayed implementations
        long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    public T getTask() {
        return this.task;
    }

    @Override
    public int hashCode() {
        return task.hashCode();
    }

    /**
     * Two Task objects are equal when their wrapped tasks have the same hash code,
     * so the wrapped class should derive hashCode() from a business key.
     */
    @Override
    public boolean equals(Object object) {
        if (object instanceof Task) {
            return object.hashCode() == hashCode();
        }
        return false;
    }
}
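The sequence number n in compareTo above matters when several tasks share the same expiry time: without a tie-breaker, the order in which equal elements leave the queue is unspecified. The following sketch illustrates the idea with a hypothetical Entry class (not part of the original code) whose entries all share one deadline that has already been reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class TieBreakDemo {

    /** Hypothetical element: an absolute deadline plus a creation sequence number. */
    static final class Entry implements Delayed {
        private static final AtomicLong SEQUENCE = new AtomicLong();

        final String name;
        final long deadlineNanos;
        final long seq = SEQUENCE.getAndIncrement();

        Entry(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other == this) {
                return 0;
            }
            // Assumption for this sketch: the queue only ever holds Entry objects
            Entry that = (Entry) other;
            long diff = deadlineNanos - that.deadlineNanos;
            if (diff != 0) {
                return diff < 0 ? -1 : 1;
            }
            // Same deadline: the entry created first is released first
            return Long.compare(seq, that.seq);
        }
    }

    /** Queues four entries with an identical deadline and drains them. */
    static List<String> drainInOrder() {
        long deadline = System.nanoTime(); // shared deadline, already reached
        DelayQueue<Entry> queue = new DelayQueue<>();
        for (String name : new String[] {"first", "second", "third", "fourth"}) {
            queue.add(new Entry(name, deadline));
        }
        List<String> names = new ArrayList<>();
        Entry e;
        while ((e = queue.poll()) != null) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder()); // prints [first, second, third, fourth]
    }
}
```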
Implement another class to manage the delayed tasks
import org.apache.log4j.Logger;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * [Task scheduling system]
 * <br>
 * [A daemon thread that keeps checking the queue for expired tasks]
 * </p>
 *
 * @author wangguangdong
 * @version 1.0
 * @Date 2015-11-23 14:19:40
 */
public class TaskQueueDaemonThread {
    private static final Logger LOG = Logger.getLogger(TaskQueueDaemonThread.class);

    private TaskQueueDaemonThread() {
    }

    // Lazy singleton: the instance is created when LazyHolder is first loaded
    private static class LazyHolder {
        private static final TaskQueueDaemonThread taskQueueDaemonThread = new TaskQueueDaemonThread();
    }

    public static TaskQueueDaemonThread getInstance() {
        return LazyHolder.taskQueueDaemonThread;
    }

    /**
     * Thread pool that runs the expired tasks
     */
    private final Executor executor = Executors.newFixedThreadPool(20);
    /**
     * The delay queue, initially empty
     */
    private final DelayQueue<Task<Runnable>> t = new DelayQueue<>();
    /**
     * Daemon thread
     */
    private Thread daemonThread;

    /**
     * Initializes the daemon thread
     */
    public void init() {
        daemonThread = new Thread(() -> execute());
        daemonThread.setDaemon(true);
        daemonThread.setName("Task Queue Daemon Thread");
        daemonThread.start();
    }

    private void execute() {
        LOG.info("start:" + System.currentTimeMillis());
        while (true) {
            try {
                // Take from the delay queue; blocks until a task has expired
                Task<Runnable> t1 = t.take();
                Runnable task = t1.getTask();
                if (task == null) {
                    continue;
                }
                // Hand the expired task to the thread pool
                executor.execute(task);
                LOG.info("[at task:" + task + "] [Time:" + System.currentTimeMillis() + "]");
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the loop
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // Log and keep going so that one failed task does not stop the queue
                LOG.error("Failed to dispatch task", e);
            }
        }
    }

    /**
     * Adds a task.
     *
     * @param time delay in milliseconds, set by the caller
     * @param task the task to run once the delay has elapsed
     */
    public void put(long time, Runnable task) {
        // Convert the delay to nanoseconds
        long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS);
        // Create a task
        Task<Runnable> k = new Task<>(nanoTime, task);
        // Put the task on the delay queue
        t.put(k);
    }

    /**
     * Ends (cancels) a task that is still waiting in the queue.
     *
     * @param task the task to remove
     * @return true if the task was in the queue and has been removed
     */
    public boolean endTask(Task<Runnable> task) {
        return t.remove(task);
    }
}
How to use it
Call the init method when the container is initialized. Then write a class that implements the Runnable interface and pass an instance of it to the put method of TaskQueueDaemonThread. If you need to cancel tasks dynamically, the task class must override the hashCode method, because Task.equals compares the hash codes of the wrapped tasks. Preferably derive the hash code from a business key so that collisions are limited.
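Cancellation by business key can be sketched as follows. This is a self-contained illustration, not the classes above: OrderTimeout and its orderId field are hypothetical, and equality is defined on the order ID with equals rather than on the hash code alone. It relies on the fact that DelayQueue.remove(Object) matches elements with equals, which is also what endTask depends on.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class CancelDemo {

    /** Hypothetical delayed element identified by a business key (the order ID). */
    static final class OrderTimeout implements Delayed {
        final String orderId;
        final long deadlineNanos;

        OrderTimeout(String orderId, long delayMillis) {
            this.orderId = orderId;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }

        // Equality and hash code depend only on the business key
        @Override
        public boolean equals(Object o) {
            return o instanceof OrderTimeout && orderId.equals(((OrderTimeout) o).orderId);
        }

        @Override
        public int hashCode() {
            return orderId.hashCode();
        }
    }

    /** Queues two order timeouts, cancels one by ID, and returns how many remain. */
    static int cancelOne() {
        DelayQueue<OrderTimeout> queue = new DelayQueue<>();
        queue.put(new OrderTimeout("order-1", 60_000));
        queue.put(new OrderTimeout("order-2", 60_000));

        // Suppose order-1 has been paid: remove its pending timeout.
        // A probe object with the same ID is enough, because remove() matches with equals().
        boolean removed = queue.remove(new OrderTimeout("order-1", 0));
        return removed ? queue.size() : -1;
    }

    public static void main(String[] args) {
        System.out.println(cancelOne()); // prints 1
    }
}
```

Keying equality on a stable business identifier means the caller does not have to keep a reference to the originally queued object in order to cancel it.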