假设有一个分布式应用程序,其中请求在到达时进行处理。我们还要考虑这些请求是耗时的任务,它是由用户对应用程序的操作触发的。由于任务需要花费时间,因此最好将这些请求排入队列,我们可以按顺序处理它们,并在任务完成后通知用户。这样我们的系统可以松散地耦合在一起,并提供更好的用户体验,因为不会阻塞用户。
有时,我们的业务规则可能会说我们应该根据某些类别对任务进行优先级排序。例如,在处理任何普通用户的请求之前,应先处理高级用户的请求。我们可以在运输在线产品等业务流程中看到这种行为。例如,在下图中,红色是高优先级任务,而蓝色是低优先级任务。即使队列中第二个位置有蓝色/低优先级任务,在处理任何低优先级任务之前,也应处理第四个位置的红色/高优先级任务。
让我们看看如何使用Redis实现这一目标!
应用:
- 和往常一样,为了简单起见,我假设
- 任务将在斐波那契数列中找到第N个位置!我将使用2 ^ N算法使过程非常缓慢。
- 将有一些类别(例如LOW,HIGH,URGENT)对任务进行优先级排序。显然,应该首先执行紧急任务!
- 我正在创建一个多模块的Spring Boot Maven项目。
public enum Priority implements Serializable { LOW, HIGH, URGENT } @Getter @AllArgsConstructor public class Task implements Comparable<Task>, Serializable {
private final Priority priority; private final int number;
@Override public int compareTo(Task o) { return o.getPriority().compareTo(this.getPriority()); }
}
|
任务执行器:
- task-executor是一个微服务,它将继续轮询Redis的任务。Redis在这里就像一个任务队列!
- 服务:
@Service public class FibService {
// intentional - 2^N public long calculate(int n){ if(n < 2) return n; return calculate(n - 1) + calculate(n - 2); }
}
|
@EnableScheduling @SpringBootApplication public class TaskExecutorApplication {
@Autowired private RedissonClient redissonClient;
public static void main(String[] args) { SpringApplication.run(TaskExecutorApplication.class, args); }
@Bean public RPriorityBlockingQueue<Task> getQueue(){ return this.redissonClient.getPriorityBlockingQueue("task"); }
}
|
@Service public class Executor {
@Autowired private RPriorityBlockingQueue<Task> priorityQueue;
@Autowired private FibService fibService;
@Scheduled(fixedDelay = 1000) public void runTask() throws InterruptedException { System.out.println("----------------------------------------"); Task task = this.priorityQueue.take(); System.out.println("Priority : " + task.getPriority()); System.out.println("Input : " + task.getNumber()); System.out.println("Result : " + this.fibService.calculate(task.getNumber())); }
}
|
- 以下配置中将Redisson客户端库用于Redis。
singleServerConfig: idleConnectionTimeout: 10000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 password: null subscriptionsPerConnection: 5 clientName: null address: "redis://master:6379" subscriptionConnectionMinimumIdleSize: 1 subscriptionConnectionPoolSize: 50 connectionMinimumIdleSize: 24 connectionPoolSize: 64 database: 0 dnsMonitoringInterval: 5000 threads: 2 nettyThreads: 2 codec: !<org.redisson.codec.FstCodec> {} transportMode: "NIO"
|
任务计划程序:
- 任务计划程序是一个Web应用程序,他们通过该应用程序将任务提交到队列/ Redis。
- 它公开了用于提交任务的REST API!
@RestController @RequestMapping("/task") public class TaskController {
@Autowired private RPriorityBlockingQueue<Task> priorityBlockingQueue;
@GetMapping("/{priority}/{number}") public void schedule(@PathVariable String priority, @PathVariable int number){ this.priorityBlockingQueue.add(this.getTask(priority, number)); }
private Task getTask(final String priority, final int number){ return new Task( Priority.valueOf(priority.toUpperCase()), number ); }
}
|
Docker配置:
version: '3' services: master: container_name: master image: redis ports: - 6379:6379 task-scheduler: build: ./task-scheduler image: vinsdocker/task-scheduler ports: - 8080:8080 task-executor: build: ./task-executor image: vinsdocker/task-executor redis-commander: container_name: redis-commander hostname: redis-commander image: rediscommander/redis-commander:latest restart: always environment: - REDIS_HOSTS=master:master ports: - 8081:8081
|
- 当我们发出docker-compose up命令时,将构建docker映像,并启动并运行基础架构!
- 我用输入46提交了100个低优先级任务。处理每个请求都花费大量时间。
- 当队列中已经有98个请求时,当我为输入33提交一个带有HIGH的请求时,该请求将立即执行,然后继续执行其余的低任务。
task-executor_1 | Priority : LOW task-executor_1 | Input : 46 task-executor_1 | Result : 1836311903 task-executor_1 | ---------------------------------------- task-executor_1 | Priority : LOW task-executor_1 | Input : 46 task-executor_1 | Result : 1836311903 task-executor_1 | ---------------------------------------- task-executor_1 | Priority : HIGH task-executor_1 | Input : 33 task-executor_1 | Result : 3524578 task-executor_1 | ---------------------------------------- task-executor_1 | Priority : LOW task-executor_1 | Input : 46 task-executor_1 | Result : 1836311903 task-executor_1 | ----------------------------------------
|