反应式Reactor与缓存Caffeine一起使用


假设有一个使用key键并返回值得Mono类型的函数:

Mono<String> get(String key) {
    Random random = ThreadLocalRandom.current();
    return Mono.fromSupplier(() -> key + random.nextInt());
}

而且您想通过key来缓存此Mono类型的检索,一个很好的方法是使用出色的Caffeine库。但它不支持Reactor类型,但是通过以下方式将Caffeine与Reactor一起使用,相当容易:

public static <T> Function<String, Mono<T>> ofMono(@NotNull Duration duration,
                                                    @NotNull Function<String, Mono<T>> fn) {
    final Cache<String, T> cache = Caffeine.newBuilder()
            .expireAfterWrite(duration)
            .recordStats()
            .build();
 
    return key -> {
        T result = cache.getIfPresent(key);
        if (result != null) {
            return Mono.just(result);
        } else {
            return fn.apply(key).doOnNext(n -> cache.put(key, n));
        }
    };
}

它实质上包装了一个返回Mono的函数,并使用Caffeine从通过闭包定义的缓存中获取值。如果缓存中存在值,则在Mono发出值时返回该值,然后从中设置缓存中的值。那么如何使用它呢..这是使用此实用工具的测试:
Function<String, Mono<String>> fn = (k) -> get(k);
Function<String, Mono<String>> wrappedFn = CachingUtils.ofMono(Duration.ofSeconds(10), fn);
StepVerifier.create(wrappedFn.apply("key1"))
        .assertNext(result1 -> {
            StepVerifier.create(wrappedFn.apply(
"key1"))
                    .assertNext(result2 -> {
                        assertThat(result2).isEqualTo(result1);
                    })
                    .verifyComplete();
            StepVerifier.create(wrappedFn.apply(
"key1"))
                    .assertNext(result2 -> {
                        assertThat(result2).isEqualTo(result1);
                    })
                    .verifyComplete();
 
            StepVerifier.create(wrappedFn.apply(
"key2"))
                    .assertNext(result2 -> {
                        assertThat(result2).isNotEqualTo(result1);
                    })
                    .verifyComplete();
        })
        .verifyComplete();


在这里,我使用Project Reactors StepVerifier实用程序对此包装函数进行测试,并确保确实返回缓存值以重复键。完整样本可在此找到源码

以使用以下内容创建异步缓存:https://gist.github.com/glandais/215a43ac68ee2dc930e8df6129e17953
异步代码:

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.checkerframework.checker.nullness.qual.NonNull;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public abstract class AsyncCache<K, V> {

    AsyncLoadingCache<K, V> cache = Caffeine.newBuilder().buildAsync(this::load);

    private CompletableFuture<V> load(K key, @NonNull Executor executor) {
        return doLoad(key).toFuture();
    }

    protected abstract Mono<V> doLoad(K key);

    public Mono<V> get(K key) {
        return Mono.fromCompletionStage(cache.get(key));
    }

}