Spring Boot中用Elasticsearch导入CSV

在本教程中,我们将学习如何使用 Spring Boot 将数据从 CSV 文件导入 Elasticsearch。当我们需要从旧系统或外部来源迁移数据,或者准备测试数据集时,从 CSV 文件导入数据是一种常见的用例。

在本文中,我们介绍了如何使用三种方法将 CSV 数据导入 Elasticsearch:手动 for 循环、Spring BatchLogstash。每种方法都有其优点,适用于不同的用例。


使用 Docker 设置 Elasticsearch
要使用 Elasticsearch,我们将使用 Docker 在本地进行设置。请按照以下步骤启动 Elasticsearch 容器:

docker pull docker.elastic.co/elasticsearch/elasticsearch:8.17.0

接下来,我们使用以下命令运行容器:

docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:8.17.0

让我们创建一个包含以下数据的示例 Excel 文件“ products.csv ”:

id,name,category,price,stock
1,Microwave,Appliances,705.77,136
2,Vacuum Cleaner,Appliances,1397.23,92
...

1、使用手动for循环处理 CSV 数据
第一种方法是使用手动for循环读取 CSV 文件中的记录并将其索引到 Elasticsearch 中。为了实现此方法,我们将使用Apache Commons CSV库来解析 CSV 文件,并使用Elasticsearch Rest High-Level Client与 Elasticsearch 搜索引擎集成。

让我们首先在pom.xml文件中添加所需的依赖项:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.11</version>
</dependency>

添加依赖项后,我们需要设置 Elasticsearch 配置。让我们创建一个配置类来设置RestHighLevelClient:

@Configuration
public class ElasticsearchConfig {
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return RestClients.create(ClientConfiguration.builder()
          .connectedTo("localhost:9200")
          .build()).rest();
    }
}

接下来,我们创建一个Product类来表示 CSV 数据:

@Document(indexName = "products")
public class Product {
    @Id
    private String id;
    private String name;
    private String category;
    private double price;
    private int stock;
   
// Getters and setters
}

之后,我们将在 Spring Boot 应用程序中创建一个服务来处理 CSV 导入过程。在服务中,我们使用for循环遍历 CSV 文件中的每个记录:

@Autowired
private RestHighLevelClient restHighLevelClient;
public void importCSV(File file) {
    try (Reader reader = new FileReader(file)) {
        Iterable<CSVRecord> records = CSVFormat.DEFAULT
          .withHeader("id", "name", "category", "price", "stock")
          .withFirstRecordAsHeader()
          .parse(reader);
        for (CSVRecord record : records) {
            IndexRequest request = new IndexRequest(
"products")
              .id(record.get(
"id"))
              .source(Map.of(
               
"name", record.get("name"),
               
"category", record.get("category"),
               
"price", Double.parseDouble(record.get("price")),
               
"stock", Integer.parseInt(record.get("stock"))
              ));
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        }
    } catch (Exception e) {
       
// handle exception
    }
}

对于每条记录,我们构造一个IndexRequest对象来准备在 Elasticsearch 中索引的数据。然后使用 RestHighLevelClient 对数据进行索引, RestHighLevelClient是与 Elasticsearch 交互的主要客户端库。

我们将 CSV 文件中的数据导入 Elasticsearch 索引:

File csvFile = Paths.get("src", "test", "resources", "products.csv").toFile();
importCSV(csvFile);

接下来,让我们查询第一个索引并根据预期值验证其内容:

IndexRequest firstRequest = captor.getAllValues().get(0);
assertEquals(Map.of(
  "name", "Microwave",
 
"category", "Appliances",
 
"price", 705.77,
 
"stock", 136
), firstRequest.sourceAsMap());

这种方法很简单,让我们可以完全控制整个过程。但是,它更适合较小的数据集,因为对于大型文件来说,它可能效率低下且耗时。

2、使用 Spring Batch 进行可扩展数据导入
Spring Batch是一个功能强大的 Java 批处理框架。它通过分块处理数据,非常适合处理大规模数据导入。

要使用 Spring Batch,我们需要将Spring Batch 依赖项添加到我们的pom.xml文件中:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.4.1</version>
</dependency>

定义 Spring 配置文件
接下来,让我们创建一个配置类来定义批处理作业。在此配置中,我们使用@EnableBatchProcessing注释来激活允许我们创建和管理批处理作业的 Spring Batch 功能。

我们设置了一个FlatFileItemReader来读取 CSV 文件,并设置了一个ItemWriter来将数据写入 Elasticsearch。我们还在Spring 配置文件中创建并配置了一个RestHighLevelClient bean:

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    // ...
    @Autowired
    private RestHighLevelClient restHighLevelClient
}

定义阅读器
要从 CSV 文件读取数据,让我们创建一个方法reader()并定义一个FlatFileItemReader。我们将使用FlatFileItemReaderBuilder为读取器配置各种设置:

@Bean
public FlatFileItemReader<Product> reader() {
    return new FlatFileItemReaderBuilder<Product>()
      .name("productReader")
      .resource(new FileSystemResource(
"products.csv"))
      .delimited()
      .names(
"id", "name", "category", "price", "stock")
      .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
          setTargetType(Product.class);
      }})
      .build();
}

我们使用name()方法为读取器指定一个名称,这有助于在批处理作业中识别它。此外,resource()方法使用FileSystemResource指定 CSV 文件“ products.csv ”的位置。该文件应以逗号分隔,这通过delimited()方法指定。

names ()方法列出 CSV 文件中的列标题,并将它们映射到Product类的字段。最后,fieldSetMapper ()方法使用BeanWrapperFieldSetMapper将 CSV 文件的每一行映射到Product对象。

定义一个 Writer
接下来,让我们创建一个writer()方法来处理将处理后的数据写入 Elasticsearch。此方法定义一个ItemWriter ,它接收Product对象列表。它使用RestHighLevelClient与 Elasticsearch 交互:

@Bean
public ItemWriter<Product> writer(RestHighLevelClient restHighLevelClient) {
    return products -> {
        for (Product product : products) {
            IndexRequest request = new IndexRequest("products")
              .id(product.getId())
              .source(Map.of(
               
"name", product.getName(),
               
"category", product.getCategory(),
               
"price", product.getPrice(),
               
"stock", product.getStock()
              ));
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        }
    };
}

对于列表中的每个产品,我们创建一个IndexRequest来指定 Elasticsearch 索引和文档结构。id ()方法使用Product对象的ID为每个文档分配一个唯一的 ID 。

source ()方法会将Product对象的字段(例如name、category、price和stock)映射到 Elasticsearch 可以存储的键值格式。配置请求后,我们使用client.index()方法将Product记录发送到 Elasticsearch,确保产品被索引以供搜索和检索。

定义 Spring Batch 作业
最后,让我们创建importJob()方法并使用 Spring Batch 的JobBuilder 和StepBuilder 来配置作业及其步骤:

@Bean
public Job importJob(JobRepository jobRepository, PlatformTransactionManager transactionManager, 
  RestHighLevelClient restHighLevelClient) {
    return new JobBuilder("importJob", jobRepository)
      .start(new StepBuilder("step1", jobRepository)
        .chunk(10, transactionManager)
        .reader(reader())
        .writer(writer(restHighLevelClient))
        .build())
      .build();
}
在此示例中,我们使用JobBuilder来配置作业。它以作业名称“ importJob ”和JobRepository作为参数。我们还配置了一个名为“ step1 ”的步骤,并指定该作业将一次处理10 条记录。transactionManager确保在处理块期间的数据一致性。

reader ()和writer()方法集成到步骤中,以处理从 CSV 到 Elasticsearch 的数据流。接下来,我们使用start( ) 方法将作业与步骤链接起来。此连接确保步骤作为作业的一部分执行。完成此配置后,我们可以使用 Spring 的JobLauncher运行作业。

运行批处理作业
让我们看一下使用JobLauncher运行 Spring Batch 作业的代码。我们将创建一个CommandLineRunner bean 以在应用程序启动时执行该作业:

@Configuration
public class JobRunnerConfig {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job importJob;
    @Bean
    public CommandLineRunner runJob() {
        return args -> {
            try {
                JobExecution execution = jobLauncher.run(importJob, new JobParameters());
            } catch (Exception e) {
                // handle exception
            }
        };
    }
}

成功运行作业后,我们可以通过使用curl发出请求来测试结果:

curl -X GET "http://localhost:9200/products/_search" \
  -H
"Content-Type: application/json" \
  -d '{
       
"query": {
         
"match_all": {}
        }
      }'

我们来看看预期的结果:

{
  ...
  "hits": {
   
"total": {
     
"value": 25,
     
"relation": "eq"
    },
   
"max_score": 1.0,
   
"hits": [
      {
       
"_index": "products",
       
"_type": "_doc",
       
"_id": "1",
       
"_score": 1.0,
       
"_source": {
         
"id": "1",
         
"name": "Microwave",
         
"category": "Appliances",
         
"price": 705.77,
         
"stock": 136
        }
      },
      {
       
"_index": "products",
       
"_type": "_doc",
       
"_id": "2",
       
"_score": 1.0,
       
"_source": {
         
"id": "1",
         
"name": "Vacuum Cleaner",
         
"category": "Appliances",
         
"price": 1397.23,
         
"stock": 92
        }
      }
      ...
    ]
  }
}

这种方法的设置比以前的方法更复杂,但为导入数据提供了可扩展性和灵活性。

3、使用Logstash导入CSV数据
Logstash是 Elastic stack 的一部分,专为数据处理和提取而设计。

我们可以使用 Docker 快速设置 Logstash。首先,让我们拉取并运行 Logstash 镜像:

docker pull docker.elastic.co/logstash/logstash:8.17.0
拉取镜像后,我们为 Logstash 创建一个配置文件“ csv-to-es.conf ”。此文件定义 Logstash 如何读取 CSV 文件并将数据发送到 Elasticsearch:

input {
    file {
        path => "/path/to/your/products.csv"
        start_position =>
"beginning"
        sincedb_path =>
"/dev/null"
    }
}
filter {
    csv {
        separator =>
","
        columns => [
"id", "name", "category", "price", "stock"]
    }
    mutate {
        convert => {
"price" => "float" }
        convert => {
"stock" => "integer" }
    }
}
output {
    elasticsearch {
        hosts => [
"http://localhost:9200"]
        index =>
"products"
    }
    stdout {
        codec => json_lines
    }
}

在此文件中,我们定义数据管道的输入、过滤和输出阶段。输入阶段指定要读取的 CSV 文件,而过滤阶段处理和转换数据。最后,输出阶段将处理后的数据发送到 Elasticsearch。

设置配置文件后,我们需要调用docker run命令来执行 Logstash 管道:

docker run --rm -v $(pwd)/csv-to-es.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v $(pwd)/products.csv:/usr/share/logstash/products.csv \
  docker.elastic.co/logstash/logstash:8.17.0

此命令将我们的配置和 CSV 文件挂载到 Logstash 容器,并运行数据管道以将数据导入 Elasticsearch。成功运行命令后,我们可以再次运行curl查询来验证结果。

Logstash 可以高效地将 CSV 数据导入 Elasticsearch,而无需自定义代码,使其成为处理大型数据集和设置自动化数据管道的热门选择。