在编写循环内重复调用数据库或 API 的应用程序时,经常会遇到性能问题。这是一种常见的模式,但效率非常低;每次调用都会增加延迟,并给系统带来不必要的负载。
为了解决这个问题,一个简单而优雅的方法是使用 DataLoader ,它可以批量处理和缓存我们的请求。
在本教程中,我们将通过一个完整的工作示例介绍 DataLoader 是什么、它如何帮助优化数据获取任务,以及如何在 Java 中使用它。
什么是DataLoader?
DataLoader 是 Facebook 开发的一款实用程序,用于优化 GraphQL API 调用。它无需多次调用数据库,而是将多个单独的数据请求合并为一个请求。
虽然 DataLoader 最初是为 GraphQL 创建的,但它可以与 REST API、微服务中的服务到服务调用或任何可以通过批处理优化重复数据提取的情况一起使用。
要将其嵌入到我们的项目中,我们需要包含java-dataloader库。对于 Maven,我们将依赖项添加到pom.xml中:
com.graphql-java
java-dataloader
3.2.0
如前所述,它完全通用,适用于任何需要多次重复调用后端的 Java 应用程序。它只需修改数据检索方式,即可在不改变业务逻辑的情况下提高性能。
我们已经意识到了问题所在。如果没有 DataLoader,循环逐个加载用户数据会导致多次单独的数据库访问,从而牺牲性能:
userRepository.findById(1);
userRepository.findById(2);
userRepository.findById(3);
每一行都会触发自己的 SQL 查询,从而产生三次数据库调用。
使用 DataLoader,我们可以用load()和dispatch()调用替换它们:
dataLoader.load(1);
dataLoader.load(2);
dataLoader.load(3);
dataLoader.dispatch();
dataLoader的dispatch()方法将我们的load()调用合并为单个数据库查询。接下来,我们将了解设置和使用 DataLoader 的具体细节。使用DataLoader进行批处理
现在我们了解了 DataLoader 存在的原因,让我们通过构建一个从真实数据库加载用户的 Java 应用程序来看一下它的实际作用。我们将使用H2(一个内存 SQL 数据库),并使用 JPA 建立一个简单的 Spring Boot 项目。
创建一个简单的用户实体类
首先,我们需要一个简单、轻量级的User类来表示我们的数据。在本例中,我们加载用户的 ID 和名称:
@Entity
@Table(name = "users")
@Getter
@AllArgsConstructor
public class User {
private final String id;
private final String name;
}
我们在这里使用了Lombok。@Getter注解会自动为 ID 和 name 生成 getter 方法,而@AllArgsConstructor则会创建一个接受这两个字段的构造函数。由于这两个字段都被标记为 final,因此它们只能在对象创建期间设置一次。这确保了不变性,使我们的数据模型线程安全,非常适合并发或异步场景,例如使用 DataLoader 进行批量加载。
创建UserRepository类
UserRepository是一个简单的 JPA 存储库,它扩展了JpaRepository
UserRepository负责直接数据库访问,而UserService负责管理业务逻辑和异步执行。通过使用 Spring Data JPA 内置的findAllById()方法,我们可以高效地批量处理数据库查询。
public interface UserRepository extends JpaRepository
}
创建UserService类
现在,我们需要一种方法来一次性获取所有用户,所以我们创建了一个UserService类。它接受一个用户 ID 列表,并使用 Java 的CompletableFuture异步获取相应的用户记录:
@Service
public class UserService {
private final UserRepository userRepository;
public UserService(UserRepository userRepository) {
this.userRepository = userRepository;
}
public CompletableFuture> getUsersByIds(List ids) {
return CompletableFuture.supplyAsync(() -> userRepository.findAllById(ids));
}
}
getUsersByIds ()方法确保数据库调用不会阻塞主线程。它还使服务能够进行非阻塞批量操作,这在使用 DataLoader 时至关重要。
在底层,该服务将实际的数据库交互委托给UserRepository。
DataLoader配置
最后,我们可以使用我们的服务来加载数据了。这个配置类将UserService包装在BatchLoader中,这使得DataLoader能够 高效地批量处理和解析用户请求。
现在,让我们更详细地了解一下BatchLoader。BatchLoader是 DataLoader 库中的核心接口。它定义了在一次调用中获取多个键的数据。我们的函数接收一个键列表,并必须返回一个CompletableFuture>,其中的项与传入键的顺序相同;这对于 DataLoader 正确匹配结果至关重要:
UserDataLoader类接受UserService作为依赖项。它公开createUserLoader()方法 ,该方法构建并返回一个DataLoader对象。然后,我们定义一个BatchLoader
@Component
public class UserDataLoader {
private final UserService userService;
public UserDataLoader(UserService userService) {
this.userService = userService;
}
public DataLoader
BatchLoader
return userService.getUsersByIds(ids)
.thenApply(users -> {
Map
.collect(Collectors.toMap(User::getId, user -> user));
return ids.stream().map(userMap::get).collect(Collectors.toList());
});
};
return DataLoaderFactory.newDataLoader(userBatchLoader);
}
}
每当DataLoader收到多个load(userId)调用时,它都会收集这些调用,等待当前进程完成,然后在此批处理加载器上使用 ID列表调用load()。我们利用UserService.getUsersByIds()和完整的ID列表一次性批量处理所有调用。然后,我们将User对象列表按 ID 转换为Map,以便能够按照传入 ID 列表的顺序返回用户列表。
最后,我们将这个批量加载器传递给DataLoaderFactory.newDataLoader(),这将返回一个功能齐全的DataLoader
其他类型的批量加载器
除了标准的BatchLoader
简而言之,BatchLoader是支持 DataLoader 批处理魔法的引擎,而选择正确的变体取决于我们希望如何构建批处理函数输出。
证明数据库只会被调用一次
现在,有了使用DataLoader执行批处理的所有代码,我们最终可以使用它来加载一批用户。
我们将把 UserService作为侦察 bean 注入:
@SpyBean
private UserService userService;
这使我们能够连接到实时数据库,同时还能够对我们服务的函数调用进行断言。
接下来,让我们将三个User实体预加载到数据库中并创建我们的DataLoader:
@BeforeEach
void setUp() {
userRepository.deleteAll();
User user1 = new User("101", "User_101");
User user2 = new User("102", "User_102");
User user3 = new User("103", "User_103");
userRepository.saveAll(Arrays.asList(user1, user2, user3));
userDataLoader = new DataLoader<>(userService::getUsersByIds);
DataLoaderRegistry registry = new DataLoaderRegistry();
registry.register("userDataLoader", userDataLoader);
}
最后我们实现DataLoader逻辑:
@Test
void whenLoadingUsers_thenBatchLoaderIsInvokedAndResultsReturned() {
CompletableFuture userFuture1 = userDataLoader.load("101");
CompletableFuture userFuture2 = userDataLoader.load("102");
CompletableFuture userFuture3 = userDataLoader.load("103");
userDataLoader.dispatchAndJoin();
verify(userService, times(1)).getUsersByIds(anyList());
assertThat(userFuture1.join().getName()).isEqualTo("User_101");
assertThat(userFuture2.join().getName()).isEqualTo("User_102");
assertThat(userFuture3.join().getName()).isEqualTo("User_103");
}
我们将三个load()调用排入队列,这些调用不会立即执行。一旦我们调用dispatchAndJoin(),DataLoader就会将这些调用批量处理,并使用我们所有的 ID 触发底层的批处理函数。
然后我们使用future.get()来检索每个用户,它会阻塞直到结果准备好。
为了验证DataLoader的批处理能力,我们结合使用verify()和times()来断言getUsersByIds()仅被调用一次。这证明即使我们请求了三个不同的用户,服务方法也只被调用了一次,这意味着所有请求都被一起批处理了。
最后,我们断言返回的用户与预加载的实体匹配,并按照预期的顺序接收。