Redisson是使用Redis实现分布式任务执行和调度的Java开源项目,它是通过标准JDK的ExecutorService和ScheduledExecutorService API实现的,被提交的任务在Redisson节点服务器上运行,它们共同连接同样的Redis数据库。

Redisson节点
Redisson节点是标准的Java应用,唯一目标就是执行被提交的任务,每个Redisson节点可以看成是分布式环境中远程worker。
它也可以通过一个Redisson实例在主要应用中跨多个进程。
所有任务都是动态加载,这样你不必将任务放在classpath或者每次任务改变时重启。
任务
一个任务应该实现接口java.util.concurrent.Callable or java.lang.Runnable interface.
下面是Callable 接口实现案例:
public class CallableTask implements Callable<Long> { @RInject private RedissonClient redissonClient; private long anyParam; public CallableTask() { } public CallableTask(long anyParam) { this.anyParam = anyParam; } @Override public Long call() throws Exception { // ... } }
|
下面是Runnable 接口实现:
public class RunnableTask implements Runnable { @RInject private RedissonClient redissonClient; private long anyParam; public RunnableTask() { } public RunnableTask(long anyParam) { this.anyParam = anyParam; } @Override public void run() { // ... } }
|
任务能够通过构造器赋予参数,一个被提交的任务能够通过@RInject访问Redisson实例。
任务提交执行是通过提交给ExecutorService API,RExecutorService 已经实现了 java.util.concurrent.ExecutorService:
RExecutorService executorService = redisson.getExecutorService("myExecutor"); executorService.submit(new RunnableTask()); // or with parameter executorService.submit(new RunnableTask(41)); executorService.submit(new CallableTask()); // or with parameter executorService.submit(new CallableTask(53));
|
使用 java.util.concurrent.ScheduledExecutorService 递交给调度执行:
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor"); executorService.schedule(new CallableTask(), 10, TimeUnit.MINUTES); // or executorService.schedule(new RunnableTask(), 5, TimeUnit.SECONDS); executorService.scheduleAtFixedRate(new RunnableTask(), 10, 25, TimeUnit.HOURS); // or executorService.scheduleWithFixedDelay(new RunnableTask(), 5, 10, TimeUnit.HOURS);
|
递交给CRON定时执行,兼容于 Quartz cron 格式:
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor"); executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?")); // or executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5)); // or executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));
|
任务取消:
Future<?> f = executorService.schedule(...); // or Future<?> f = executorService.submit(...); f.cancel(true);
|
任务取消类似执行Java的线程中断。下面是将所有值聚合在大的Redis map中,这个过程会花费很长时间:
public class CallableTask implements Callable<Long> { @RInject private RedissonClient redissonClient; @Override public Long call() throws Exception { RMap<String, Integer> map = redissonClient.getMap("myMap"); Long result = 0; for (Integer value : map.values()) { // check if task has been canceled if (Thread.currentThread().isInterrupted()) { // task has been canceled return null; } result += value; } return result; } }
|
提交任务给ExecutorService是同步的,处理结果接受可以使用异步的,这是通过标准java.util.concurrent.Future实现。Redisson也提供一系列方法异步提交任务:RExecutorServiceAsync.*Async。
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor"); RFuture<MyResultObject> future = executorService.submitAsync(new CallableTask()); // or RScheduledFuture<MyResultObject> future = executorService.scheduleAsync(new RunnableTask(), 5, TimeUnit.SECONDS); // or RScheduledFuture<MyResultObject> future = executorService.scheduleAtFixedRateAsync(new RunnableTask(), 10, 25, TimeUnit.HOURS); future.addListener(new FutureListener<MyResultObject>() { public void operationComplete(Future<MyResultObject> f) { // ... } }); // cancel task by id String taskId = future.getId(); // ... executorService.cancelScheduledTask(taskId);
|
Redisson构成了对另外一个分布式内存数据网格产品Hazelcast的挑战。
项目地址:
Redisson