如何实现Spring boot应用并行调用?

在我们的一个财务项目中,我们遇到了性能问题,其中一些问题是由于多次连续调用造成的,实际上,我们做了很多单独的同步调用。

例如,我们进行了三个调用来获取一些信息:客户信息、账户信息和他的投资选择,在我们的这个例子中,当发生这些调用之后,我们需要使用调用结果,因此我们希望并行化这个三个调用以提高性能,这缩短时间,等于执行时间除以2,每个客户端只要600毫秒,客户的体验得到了改进。

1. 如何在java中进行并行调用?
2. 如何测试异步功能?

本文将帮助您在Spring Boot Java应用程序中实现并行调用并测试这些异步函数。

实现异步调用的先决条件
我们的要求是对第三方API进行两次或更多次独立调用,并且可以同时执行,比如你想要实现一个Spring MVC资源,其目标是筛选出官方语言为法语的欧洲国家列表,这样就需要两个独立调用:一个用于获取所有欧洲国家/地区,另一个用于获取官方语言为法语的所有国家/地区。

使用Swagger可以让我们的资源有一个更好的接口:


CountryResource.java
@Component
@Api(value = "CountryResource")
@RestController
public class CountryResource {

private final CountryClient countryClient;

public CountryResource(
CountryClient countryClient
) {
this.countryClient = countryClient;
}

@ApiOperation(httpMethod =
"GET", value = "Get all European and French speaking countries", response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 404, message =
"Countries not found"),
@ApiResponse(code = 500, message =
"The countries could not be fetched")
}) @GetMapping(
"")
public List<String> getAllEuropeanFrenchSpeakingCountries() {
List<Country> countriesByLanguage = countryClient.getCountriesByLanguage(
"fr");
List<Country> countriesByRegion = countryClient.getCountriesByRegion(
"europe");

List<String> europeanFrenchSpeakingCountries = new ArrayList<>(countriesByLanguage.stream().map(Country::getName).collect(Collectors.toList()));
europeanFrenchSpeakingCountries.retainAll(countriesByRegion.stream().map(Country::getName).collect(Collectors.toList()));

return europeanFrenchSpeakingCountries;
}
}

上面代码提供了API(https://restcountries.eu/rest-countries),下面的客户端能让我们发出HTTP请求以按语言和区域获取国家/地区:


@Service
public class CountryClient {
RestTemplate restTemplate = new RestTemplate();

public List<Country> getCountriesByLanguage(String language) {
String url = "https://restcountries.eu/rest/v2/lang/" + language + "?fields=name";
Country[] response = restTemplate.getForObject(url, Country[].class);

return Arrays.asList(response);
}

public List<Country> getCountriesByRegion(String region) {
String url =
"https://restcountries.eu/rest/v2/region/" + region + "?fields=name";
Country[] response = restTemplate.getForObject(url, Country[].class);

return Arrays.asList(response);
}
}

下面国家对象的代码:

public class Country {
private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

让我们假设调用所有法语国家时间都是2秒长,并且要求调用所有欧洲国家的时间是3秒,如果使用同步调用,需要等待5秒才能获得结果,所以需要并行化这两个独立的调用。为此,必须执行以下步骤:

1. 添加@Async注释到要实现并行化的函数getCountriesByLanguage和getCountriesByRegion上

2. 更改函数的返回类型 CompletableFuture<List<Country>>

3. 更改getCountriesByLanguage和getCountriesByRegion的返回值为: CompletableFuture.completedFuture(Arrays.asList(response)

4. 更改返回getCountriesByLanguage和Region by的类型 CompletableFuture<List<Country>>

5. 在资源中使用completableFuture时添加try-catch

6. 添加a.get()以使用国家/地区列表的元素

7.在getAllEuropeanFrenchSpeakingCountries函数方法添加throws Throwable

8. 添加AsyncConfiguration

try-catch不是必需的,但放上它比较好。回顾一下,您的新代码应该如下所示


@Service
public class CountryClient {
RestTemplate restTemplate = new RestTemplate();

@Async
public CompletableFuture<List<Country>> getCountriesByLanguage(String language) {
String url = "https://restcountries.eu/rest/v2/lang/" + language + "?fields=name";
Country[] response = restTemplate.getForObject(url, Country[].class);

return CompletableFuture.completedFuture(Arrays.asList(response));
}

@Async
public CompletableFuture<List<Country>> getCountriesByRegion(String region) {
String url =
"https://restcountries.eu/rest/v2/region/" + region + "?fields=name";
Country[] response = restTemplate.getForObject(url, Country[].class);

return CompletableFuture.completedFuture(Arrays.asList(response));
}
}

@Component
@Api(value =
"CountryResource")
@RestController
public class CountryResource {

private final CountryClient countryClient;

public CountryResource(
CountryClient countryClient
) {
this.countryClient = countryClient;
}

@ApiOperation(httpMethod =
"GET", value = "Get all European and French speaking countries", response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 404, message =
"Countries not found"),
@ApiResponse(code = 500, message =
"The countries could not be fetched")
})
@GetMapping(
"")
public List<String> getAllEuropeanFrenchSpeakingCountries() throws Throwable {
CompletableFuture<List<Country>> countriesByLanguageFuture = countryClient.getCountriesByLanguage(
"fr");
CompletableFuture<List<Country>> countriesByRegionFuture = countryClient.getCountriesByRegion(
"europe");
List<String> europeanFrenchSpeakingCountries;
try {
europeanFrenchSpeakingCountries = new ArrayList<>(countriesByLanguageFuture.get().stream().map(Country::getName).collect(Collectors.toList()));
europeanFrenchSpeakingCountries.retainAll(countriesByRegionFuture.get().stream().map(Country::getName).collect(Collectors.toList()));
} catch (Throwable e) {
throw e.getCause();
}

return europeanFrenchSpeakingCountries;
}
}

下面配置是激活使用异步函数和@Async注释,如果你想了解更多细节,比如如何增加线程池大小,你可以在这里找到一些(https://docs.spring.io/spring/docs/3.1.x/javadoc-api/org/springframework/scheduling/annotation/ EnableAsync.html)


@Configuration
@EnableAsync
public class AsyncConfiguration {
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
return executor;
}
}

单元测试这些功能?
在这里有两个并行调用,下面创建一些单元测试。

首先,为了测试客户端,就好像我们的函数调用不是异步的,在这个例子中,我们使用Mockito来模拟客户端并获得响应,我们需要在测试值之前使用.get()。


public class CountryClientTest {
private CountryClient countryClient;

@Before
public void setUp() {
countryClient = Mockito.spy(new CountryClient());
}

@Test
public void getCountryByLanguage() throws ExecutionException, InterruptedException {
List<Country> countriesByLanguage = countryClient.getCountriesByLanguage("fr").get();
assertNotNull(countriesByLanguage);
assertEquals(
"Belgium", countriesByLanguage.get(0).getName());
}

@Test
public void getCountryByRegion() throws ExecutionException, InterruptedException {
List<Country> countriesByRegion = countryClient.getCountriesByRegion(
"europe").get();
assertNotNull(countriesByRegion);
assertEquals(
"Åland Islands", countriesByRegion.get(0).getName());
assertEquals(
"Albania", countriesByRegion.get(1).getName());
}
}

为了测试我们的资源,我们可以mock客户端响应,讲法语国家返回法国和比利时,欧洲国家返回法国和德国,这两个结果筛选出最好结果应该是法国。我们需要返回一个CompletableFuture,我们就像函数不是asyn,然后返回一样CompletableFure.completedFuture。


public class CountryResourceTest {
@InjectMocks
private CountryResource countryResource;

private CountryClient countryClient;

@Before
public void setup() {
this.countryClient = mock(CountryClient.class);
this.countryResource = new CountryResource(countryClient);
}

@Test
public void getAllEuropeanFrenchSpeakingCountries() throws Throwable {
//GIVEN
Country country = new Country();
country.setName(
"France");
Country country2 = new Country();
country2.setName(
"Belgium");
Country country3 = new Country();
country3.setName(
"Germany");
List<Country> countriesByLanguage = new ArrayList<>();
countriesByLanguage.add(country);
countriesByLanguage.add(country2);
when(countryClient.getCountriesByLanguage(anyString())).thenReturn(CompletableFuture.completedFuture(countriesByLanguage));
List<Country> countriesByRegion = new ArrayList<>();
countriesByRegion.add(country);
countriesByRegion.add(country3);
when(countryClient.getCountriesByRegion(anyString())).thenReturn(CompletableFuture.completedFuture(countriesByRegion));

List<String> expectedResult = new ArrayList<>();
expectedResult.add(
"France");

//WHEN
List<String> result = countryResource.getAllEuropeanFrenchSpeakingCountries();

//THEN
assertEquals(expectedResult, result);
}
}

就这样,我们将了两个单独同步调用变成了两个异步调用。

Sipios
[该贴被banq于2018-08-31 17:34修改过]