将聚合添加到 Elasticsearch 查询

Elasticsearch是一个搜索和分析引擎,适合需要灵活过滤的场景。有时,我们需要检索所请求的数据及其聚合信息。 在本教程中,我们将探讨如何做到这一点。

Elasticsearch 聚合搜索 让我们首先探索 Elasticsearch 的聚合功能。

一旦我们在 localhost 上运行了 Elasticsearch 实例,我们就创建一个名为store-items 的索引,其中包含一些文档:

POST http://localhost:9200/store-items/_doc
{
    "type": "Multimedia",
    "name": "PC Monitor",
    "price": 1000
}
...
POST http://localhost:9200/store-items/_doc
{
    "type": "Pets",
    "name": "Dog Toy",
    "price": 10
}
现在,让我们在不应用任何过滤器的情况下查询它:

GET http://localhost:9200/store-items/_search 现在让我们看一下响应:

{
...
    "hits": {
        "total": {
            "value": 5,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "store-items",
                "_type": "_doc",
                "_id": "J49VVI8B6ADL84Kpbm8A",
                "_score": 1.0,
                "_source": {
                    "_class": "com.baeldung.model.StoreItem",
                    "type": "Multimedia",
                    "name": "PC Monitor",
                    "price": 1000
                }
            },
            {
                "_index": "store-items",
                "_type": "_doc",
                "_id": "KI9VVI8B6ADL84Kpbm8A",
                "_score": 1.0,
                "_source": {
                    "type": "Pets",
                    "name": "Dog Toy",
                    "price": 10
                }
            },
 ...
        ]
    }
}
我们在回复中提供了一些与商店物品相关的文档。每个文档对应于特定类型的商店项目。

接下来,假设我们想知道每种类型有多少个项目。让我们将聚合部分添加到请求正文并再次搜索索引:

GET http://localhost:9200/store-items/_search
{
    "aggs": {
        "type_aggregation": {
            "terms": {
                "field": "type"
            }
        }
    }
}
我们添加了名为type_aggregation的聚合,它使用术语聚合。

正如我们在响应中看到的,有一个新的聚合部分,我们可以在其中找到有关每种类型的文档数量的信息:

{
...
    "aggregations": {
        "type_aggregation": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
                {
                    "key": "Multimedia",
                    "doc_count": 2
                },
                {
                    "key": "Pets",
                    "doc_count": 2
                },
                {
                    "key": "Home tech",
                    "doc_count": 1
                }
            ]
        }
    }
}

Spring Data Elasticsearch 聚合搜索 让我们使用Spring Data Elasticsearch实现上一节中的功能。让我们首先添加依赖项:

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-elasticsearch</artifactId>
</dependency>
下一步,我们提供一个 Elasticsearch 配置类:

@Configuration
@EnableElasticsearchRepositories(basePackages = "com.baeldung.spring.data.es.aggregation.repository")
@ComponentScan(basePackages = "com.baeldung.spring.data.es.aggregation")
public class ElasticSearchConfig {
    @Bean
    public RestClient elasticsearchRestClient() {
        return RestClient.builder(HttpHost.create("localhost:9200"))
          .setHttpClientConfigCallback(httpClientBuilder -> {
              httpClientBuilder.addInterceptorLast((HttpResponseInterceptor) (response, context) ->
                  response.addHeader("X-Elastic-Product", "Elasticsearch"));
              return httpClientBuilder;
            })
          .build();
    }
    @Bean
    public ElasticsearchClient elasticsearchClient(RestClient restClient) {
        return ElasticsearchClients.createImperative(restClient);
    }
    @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
    public ElasticsearchOperations elasticsearchOperations(
        ElasticsearchClient elasticsearchClient) {
        ElasticsearchTemplate template = new ElasticsearchTemplate(elasticsearchClient);
        template.setRefreshPolicy(null);
        return template;
    }
}
在这里,我们指定了一个低级 Elasticsearch REST 客户端及其实现ElasticsearchOperations接口的包装器 bean。现在,让我们创建一个StoreItem实体:

@Document(indexName = "store-items")
public class StoreItem {
    @Id
    private String id;
    @Field(type = Keyword)
    private String type;
    @Field(type = Keyword)
    private String name;
    @Field(type = Keyword)
    private Long price;
    //getters and setters
}
我们使用了与上一节相同的商店项目索引。由于我们无法使用 Spring Data 存储库的内置功能来检索聚合,因此我们需要创建一个存储库扩展。让我们创建一个扩展接口:

public interface StoreItemRepositoryExtension {
    SearchPage<StoreItem> findAllWithAggregations(Pageable pageable);
}
这里我们有findAllWithAggregations()方法,它使用Pageable接口实现并返回包含我们的项目的SearchPage。接下来,让我们创建该接口的实现:

@Component
public class StoreItemRepositoryExtensionImpl implements StoreItemRepositoryExtension {
    @Autowired
    private ElasticsearchOperations elasticsearchOperations;
    @Override
    public SearchPage<StoreItem> findAllWithAggregations(Pageable pageable) {
        Query query = NativeQuery.builder()
          .withAggregation("type_aggregation",
            Aggregation.of(b -> b.terms(t -> t.field("type"))))
          .build();
        SearchHits<StoreItem> response = elasticsearchOperations.search(query, StoreItem.class);
        return SearchHitSupport.searchPageFor(response, pageable);
    }
}
我们构建了本机查询,并合并了聚合部分。按照上一节的模式,我们使用type_aggregation作为聚合名称。然后,我们利用术语聚合类型来计算响应中每个指定字段的文档数。

最后,让我们创建一个 Spring Data 存储库,在其中扩展ElasticsearchRepository以支持通用 Spring Data 功能,并扩展StoreItemRepositoryExtension以合并我们的自定义方法实现:

@Repository
public interface StoreItemRepository extends ElasticsearchRepository<StoreItem, String>,
  StoreItemRepositoryExtension {
}
之后,让我们为聚合功能创建一个测试:

@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = ElasticSearchConfig.class)
public class ElasticSearchAggregationManualTest {
    private static final List<StoreItem> EXPECTED_ITEMS = List.of(
      new StoreItem("Multimedia", "PC Monitor", 1000L),
      new StoreItem("Multimedia", "Headphones", 200L), 
      new StoreItem("Home tech", "Barbecue Grill", 2000L), 
      new StoreItem("Pets", "Dog Toy", 10L),
      new StoreItem("Pets", "Cat shampoo", 5L));
...
    @BeforeEach
    public void before() {
        repository.saveAll(EXPECTED_ITEMS);
    }
...
} 
我们创建了一个包含五个商品的测试数据集,其中每种类型都有一些商店商品。在测试用例开始执行之前,我们将这些数据填充到 Elasticsearch 中。继续,让我们调用findAllWithAggregations()方法并看看它返回什么:

@Test
void givenFullTitle_whenRunMatchQuery_thenDocIsFound() {
    SearchHits<StoreItem> searchHits = repository.findAllWithAggregations(Pageable.ofSize(2))
      .getSearchHits();
    List<StoreItem> data = searchHits.getSearchHits()
      .stream()
      .map(SearchHit::getContent)
      .toList();
    Assertions.assertThat(data).containsAll(EXPECTED_ITEMS);
    Map<String, Long> aggregatedData = ((ElasticsearchAggregations) searchHits
      .getAggregations())
      .get("type_aggregation")
      .aggregation()
      .getAggregate()
      .sterms()
      .buckets()
      .array()
      .stream()
      .collect(Collectors.toMap(bucket -> bucket.key()
        .stringValue(), MultiBucketBase::docCount));
    Assertions.assertThat(aggregatedData).containsExactlyInAnyOrderEntriesOf(
      Map.of("Multimedia", 2L, "Home tech", 1L, "Pets", 2L));
}
正如我们在响应中看到的,我们已经检索了搜索命中,从中我们可以提取准确的查询结果。此外,我们还检索了聚合数据,其中包含搜索结果的所有预期聚合。