在本教程中,我们将学习如何使用 Spring Boot 将数据从 CSV 文件导入 Elasticsearch。当我们需要从旧系统或外部来源迁移数据,或者准备测试数据集时,从 CSV 文件导入数据是一种常见的用例。
在本文中,我们介绍了如何使用三种方法将 CSV 数据导入 Elasticsearch:手动 for 循环、Spring Batch 和 Logstash。每种方法都有其优点,适用于不同的用例。
使用 Docker 设置 Elasticsearch
要使用 Elasticsearch,我们将使用 Docker 在本地进行设置。请按照以下步骤启动 Elasticsearch 容器:
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.17.0
接下来,我们使用以下命令运行容器:
docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:8.17.0
让我们创建一个包含以下数据的示例 Excel 文件“ products.csv ”:
id,name,category,price,stock |
1、使用手动for循环处理 CSV 数据
第一种方法是使用手动for循环读取 CSV 文件中的记录并将其索引到 Elasticsearch 中。为了实现此方法,我们将使用Apache Commons CSV库来解析 CSV 文件,并使用Elasticsearch Rest High-Level Client与 Elasticsearch 搜索引擎集成。
让我们首先在pom.xml文件中添加所需的依赖项:
<dependency> |
添加依赖项后,我们需要设置 Elasticsearch 配置。让我们创建一个配置类来设置RestHighLevelClient:
@Configuration |
接下来,我们创建一个Product类来表示 CSV 数据:
@Document(indexName = "products") |
之后,我们将在 Spring Boot 应用程序中创建一个服务来处理 CSV 导入过程。在服务中,我们使用for循环遍历 CSV 文件中的每个记录:
@Autowired |
对于每条记录,我们构造一个IndexRequest对象来准备在 Elasticsearch 中索引的数据。然后使用 RestHighLevelClient 对数据进行索引, RestHighLevelClient是与 Elasticsearch 交互的主要客户端库。
我们将 CSV 文件中的数据导入 Elasticsearch 索引:
File csvFile = Paths.get("src", "test", "resources", "products.csv").toFile(); |
接下来,让我们查询第一个索引并根据预期值验证其内容:
IndexRequest firstRequest = captor.getAllValues().get(0); |
这种方法很简单,让我们可以完全控制整个过程。但是,它更适合较小的数据集,因为对于大型文件来说,它可能效率低下且耗时。
2、使用 Spring Batch 进行可扩展数据导入
Spring Batch是一个功能强大的 Java 批处理框架。它通过分块处理数据,非常适合处理大规模数据导入。
要使用 Spring Batch,我们需要将Spring Batch 依赖项添加到我们的pom.xml文件中:
<dependency> |
定义 Spring 配置文件
接下来,让我们创建一个配置类来定义批处理作业。在此配置中,我们使用@EnableBatchProcessing注释来激活允许我们创建和管理批处理作业的 Spring Batch 功能。
我们设置了一个FlatFileItemReader来读取 CSV 文件,并设置了一个ItemWriter来将数据写入 Elasticsearch。我们还在Spring 配置文件中创建并配置了一个RestHighLevelClient bean:
@Configuration |
定义阅读器
要从 CSV 文件读取数据,让我们创建一个方法reader()并定义一个FlatFileItemReader。我们将使用FlatFileItemReaderBuilder为读取器配置各种设置:
@Bean |
我们使用name()方法为读取器指定一个名称,这有助于在批处理作业中识别它。此外,resource()方法使用FileSystemResource指定 CSV 文件“ products.csv ”的位置。该文件应以逗号分隔,这通过delimited()方法指定。
names ()方法列出 CSV 文件中的列标题,并将它们映射到Product类的字段。最后,fieldSetMapper ()方法使用BeanWrapperFieldSetMapper将 CSV 文件的每一行映射到Product对象。
定义一个 Writer
接下来,让我们创建一个writer()方法来处理将处理后的数据写入 Elasticsearch。此方法定义一个ItemWriter ,它接收Product对象列表。它使用RestHighLevelClient与 Elasticsearch 交互:
@Bean |
对于列表中的每个产品,我们创建一个IndexRequest来指定 Elasticsearch 索引和文档结构。id ()方法使用Product对象的ID为每个文档分配一个唯一的 ID 。
source ()方法会将Product对象的字段(例如name、category、price和stock)映射到 Elasticsearch 可以存储的键值格式。配置请求后,我们使用client.index()方法将Product记录发送到 Elasticsearch,确保产品被索引以供搜索和检索。
定义 Spring Batch 作业
最后,让我们创建importJob()方法并使用 Spring Batch 的JobBuilder 和StepBuilder 来配置作业及其步骤:
@Bean
public Job importJob(JobRepository jobRepository, PlatformTransactionManager transactionManager,
RestHighLevelClient restHighLevelClient) {
return new JobBuilder("importJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.
.reader(reader())
.writer(writer(restHighLevelClient))
.build())
.build();
}
在此示例中,我们使用JobBuilder来配置作业。它以作业名称“ importJob ”和JobRepository作为参数。我们还配置了一个名为“ step1 ”的步骤,并指定该作业将一次处理10 条记录。transactionManager确保在处理块期间的数据一致性。
reader ()和writer()方法集成到步骤中,以处理从 CSV 到 Elasticsearch 的数据流。接下来,我们使用start( ) 方法将作业与步骤链接起来。此连接确保步骤作为作业的一部分执行。完成此配置后,我们可以使用 Spring 的JobLauncher运行作业。
运行批处理作业
让我们看一下使用JobLauncher运行 Spring Batch 作业的代码。我们将创建一个CommandLineRunner bean 以在应用程序启动时执行该作业:
@Configuration |
成功运行作业后,我们可以通过使用curl发出请求来测试结果:
curl -X GET "http://localhost:9200/products/_search" \ |
我们来看看预期的结果:
{ |
这种方法的设置比以前的方法更复杂,但为导入数据提供了可扩展性和灵活性。
3、使用Logstash导入CSV数据
Logstash是 Elastic stack 的一部分,专为数据处理和提取而设计。
我们可以使用 Docker 快速设置 Logstash。首先,让我们拉取并运行 Logstash 镜像:
docker pull docker.elastic.co/logstash/logstash:8.17.0
拉取镜像后,我们为 Logstash 创建一个配置文件“ csv-to-es.conf ”。此文件定义 Logstash 如何读取 CSV 文件并将数据发送到 Elasticsearch:
input { |
在此文件中,我们定义数据管道的输入、过滤和输出阶段。输入阶段指定要读取的 CSV 文件,而过滤阶段处理和转换数据。最后,输出阶段将处理后的数据发送到 Elasticsearch。
设置配置文件后,我们需要调用docker run命令来执行 Logstash 管道:
docker run --rm -v $(pwd)/csv-to-es.conf:/usr/share/logstash/pipeline/logstash.conf \ |
此命令将我们的配置和 CSV 文件挂载到 Logstash 容器,并运行数据管道以将数据导入 Elasticsearch。成功运行命令后,我们可以再次运行curl查询来验证结果。
Logstash 可以高效地将 CSV 数据导入 Elasticsearch,而无需自定义代码,使其成为处理大型数据集和设置自动化数据管道的热门选择。