使用Spring Boot + Redis实现优先级队列的长任务模式 - vinsguru

20-10-24 banq

假设有一个分布式应用程序,其中请求在到达时进行处理。我们还要考虑这些请求是耗时的任务,它是由用户对应用程序的操作触发的。由于任务需要花费时间,因此最好将这些请求排入队列,我们​​可以按顺序处理它们,并在任务完成后通知用户。这样我们的系统可以松散地耦合在一起,并提供更好的用户体验,因为不会阻塞用户。

有时,我们的业务规则可能会说我们应该根据某些类别对任务进行优先级排序。例如,在处理任何普通用户的请求之前,应先处理高级用户的请求。我们可以在运输在线产品等业务流程中看到这种行为。例如,在下图中,红色是高优先级任务,而蓝色是低优先级任务。即使队列中第二个位置有蓝色/低优先级任务,在处理任何低优先级任务之前,也应处理第四个位置的红色/高优先级任务。

让我们看看如何使用Redis实现这一目标!

应用:

  • 和往常一样,为了简单起见,我假设

    • 任务将在斐波那契数列中找到第N个位置!我将使用2 ^ N算法使过程非常缓慢。
    • 将有一些类别(例如LOW,HIGH,URGENT)对任务进行优先级排序。显然,应该首先执行紧急任务!
  • 我正在创建一个多模块的Spring Boot Maven项目。

  • 模型模块包含代表任务和优先级的类。

public enum Priority implements Serializable {
    LOW,
    HIGH,
    URGENT
}
@Getter
@AllArgsConstructor
public class Task implements Comparable<Task>, Serializable {

    private final Priority priority;
    private final int number;

    @Override
    public int compareTo(Task o) {
        return o.getPriority().compareTo(this.getPriority());
    }

}

任务执行器:

  • task-executor是一个微服务,它将继续轮询Redis的任务。Redis在这里就像一个任务队列!
  • 服务:

@Service
public class FibService {

    // intentional - 2^N
    public long calculate(int n){
        if(n < 2)
            return n;
        return calculate(n - 1) + calculate(n - 2);
    }

}

  • 队列Bean

@EnableScheduling
@SpringBootApplication
public class TaskExecutorApplication {

    @Autowired
    private RedissonClient redissonClient;

    public static void main(String[] args) {
        SpringApplication.run(TaskExecutorApplication.class, args);
    }

    @Bean
    public RPriorityBlockingQueue<Task> getQueue(){
        return this.redissonClient.getPriorityBlockingQueue("task");
    }

}

  • 执行者

@Service
public class Executor {

    @Autowired
    private RPriorityBlockingQueue<Task> priorityQueue;

    @Autowired
    private FibService fibService;

    @Scheduled(fixedDelay = 1000)
    public void runTask() throws InterruptedException {
        System.out.println("----------------------------------------");
        Task task = this.priorityQueue.take();
        System.out.println("Priority : " + task.getPriority());
        System.out.println("Input    : " + task.getNumber());
        System.out.println("Result   : " + this.fibService.calculate(task.getNumber()));
    }

}

  • 以下配置中将Redisson客户端库用于Redis。

singleServerConfig:
  idleConnectionTimeout: 10000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  password: null
  subscriptionsPerConnection: 5
  clientName: null
  address: "redis://master:6379"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 24
  connectionPoolSize: 64
  database: 0
  dnsMonitoringInterval: 5000
threads: 2
nettyThreads: 2
codec: !<org.redisson.codec.FstCodec> {}
transportMode: "NIO"

任务计划程序:

  • 任务计划程序是一个Web应用程序,他们通过该应用程序将任务提交到队列/ Redis。
  • 它公开了用于提交任务的REST API!

@RestController
@RequestMapping("/task")
public class TaskController {

    @Autowired
    private RPriorityBlockingQueue<Task> priorityBlockingQueue;

    @GetMapping("/{priority}/{number}")
    public void schedule(@PathVariable String priority, @PathVariable int number){
        this.priorityBlockingQueue.add(this.getTask(priority, number));
    }

    private Task getTask(final String priority, final int number){
        return new Task(
                Priority.valueOf(priority.toUpperCase()),
                number
        );
    }

}

Docker配置:

version: '3'
services:
  master:
    container_name: master
    image: redis
    ports:
      - 6379:6379
  task-scheduler:
    build: ./task-scheduler
    image: vinsdocker/task-scheduler
    ports:
    - 8080:8080
  task-executor:
    build: ./task-executor
    image: vinsdocker/task-executor
  redis-commander:
    container_name: redis-commander
    hostname: redis-commander
    image: rediscommander/redis-commander:latest
    restart: always
    environment:
      - REDIS_HOSTS=master:master
    ports:
      - 8081:8081

  • 当我们发出docker-compose up命令时,将构建docker映像,并启动并运行基础架构!
  • 我用输入46提交了100个低优先级任务。处理每个请求都花费大量时间。
  • 当队列中已经有98个请求时,当我为输入33提交一个带有HIGH的请求时,该请求将立即执行,然后继续执行其余的低任务。

task-executor_1    | Priority : LOW
task-executor_1    | Input    : 46
task-executor_1    | Result   : 1836311903
task-executor_1    | ----------------------------------------
task-executor_1    | Priority : LOW
task-executor_1    | Input    : 46
task-executor_1    | Result   : 1836311903
task-executor_1    | ----------------------------------------
task-executor_1    | Priority : HIGH
task-executor_1    | Input    : 33
task-executor_1    | Result   : 3524578
task-executor_1    | ----------------------------------------
task-executor_1    | Priority : LOW
task-executor_1    | Input    : 46
task-executor_1    | Result   : 1836311903
task-executor_1    | ----------------------------------------

 

                   

1
猜你喜欢