public interface XcTaskRepository extends JpaRepository<XcTask, String> {
//取出指定时间之前的记录
Page<XcTask> findByUpdateTimeBefore(Pageable pageable,Date updateTime);
}
@Service
public class TaskService {
@Autowired
XcTaskRepository xcTaskRepository;
@Autowired
RabbitTemplate rabbitTemplate;
//取出前n条任务,取出指定时间之前处理的任务
public List<XcTask> findTaskList(Date updateTime,int n){
//设置分页参数,取出前n 条记录
Pageable pageable = new PageRequest(0, n);
Page<XcTask> xcTasks = xcTaskRepository.findByUpdateTimeBefore(pageable,updateTime);
return xcTasks.getContent();
}
}
package com.xuecheng.order.mq;
@Component
public class ChooseCourseTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ChooseCourseTask.class);
@Autowired
TaskService taskService;
//每隔1分钟扫描消息表,向mq发送消息
@Scheduled(fixedDelay = 60000)
public void sendChoosecourseTask(){
//取出当前时间1分钟之前的时间
Calendar calendar =new GregorianCalendar();
calendar.setTime(new Date());
calendar.add(GregorianCalendar.MINUTE,‐1);
Date time = calendar.getTime();
List<XcTask> taskList = taskService.findTaskList(time, 1000);
}
}
添加更新任务方法:
//更新任务处理时间
@Modifying
@Query("update XcTask t set t.updateTime = :updateTime where t.id = :id ")
public int updateTaskTime(@Param(value = "id") String id,@Param(value = "updateTime")Date
updateTime);
添加发送消息方法:
/**
* //发送消息
* @param xcTask 任务对象
* @param ex 交换机id
* @param routingKey
*/
@Transactional
public void publish(XcTask xcTask,String ex,String routingKey){
//查询任务
Optional<XcTask> taskOptional = xcTaskRepository.findById(taskId);
if(taskOptional.isPresent()){
XcTask xcTask = taskOptional.get();
//String exchange, String routingKey, Object object
rabbitTemplate.convertAndSend(ex,routingKey,xcTask);
//更新任务时间为当前时间
xcTask.setUpdateTime(new Date());
xcTaskRepository.save(xcTask);
}
}
package com.xuecheng.order.mq;
@Component
public class ChooseCourseTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ChooseCourseTask.class);
@Autowired
TaskService taskService;
//每隔1分钟扫描消息表,向mq发送消息
@Scheduled(fixedDelay = 60000)
public void sendChoosecourseTask(){
//取出当前时间1分钟之前的时间
Calendar calendar =new GregorianCalendar();
calendar.setTime(new Date());
calendar.add(GregorianCalendar.MINUTE,‐1);
Date time = calendar.getTime();
List<XcTask> taskList = taskService.findTaskList(time, 1000);
//遍历任务列表
for(XcTask xcTask:taskList){
//发送选课消息
taskService.publish(xcTask, xcTask.getMqExchange(),xcTask.getMqRoutingkey());
LOGGER.info("send choose course task id:{}",taskId);
}
}
}
欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) | 黑马程序员IT技术论坛 X3.2 |