public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
@Data
public class DelayedUser implements Delayed {
private String name;
private long avaibleTime;
public DelayedUser(String name, long delayTime){
this.name=name;
//avaibleTime = 当前时间+ delayTime
this.avaibleTime=delayTime + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
//判断avaibleTime是否大于当前系统时间,并将结果转换成MILLISECONDS
long diffTime= avaibleTime- System.currentTimeMillis();
return unit.convert(diffTime,TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
//compareTo用在DelayedUser的排序
return (int)(this.avaibleTime - ((DelayedUser) o).getAvaibleTime());
}
}
@Slf4j
@Data
@AllArgsConstructor
class DelayedQueueProducer implements Runnable {
private DelayQueue<DelayedUser> delayQueue;
private Integer messageCount;
private long delayedTime;
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
DelayedUser delayedUser = new DelayedUser(
new Random().nextInt(1000)+"", delayedTime);
log.info("put delayedUser {}",delayedUser);
delayQueue.put(delayedUser);
Thread.sleep(500);
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
}
}
@Slf4j
@Data
@AllArgsConstructor
public class DelayedQueueConsumer implements Runnable {
private DelayQueue<DelayedUser> delayQueue;
private int messageCount;
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
DelayedUser element = delayQueue.take();
log.info("take {}",element );
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
}
}
@Test
public void useDelayedQueue() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
DelayQueue<DelayedUser> queue = new DelayQueue<>();
int messageCount = 2;
long delayTime = 500;
DelayedQueueConsumer consumer = new DelayedQueueConsumer(
queue, messageCount);
DelayedQueueProducer producer = new DelayedQueueProducer(
queue, messageCount, delayTime);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
}
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=917, avaibleTime=1587623188389)
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=487, avaibleTime=1587623188899)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=487, avaibleTime=1587623188899)
欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) | 黑马程序员IT技术论坛 X3.2 |