使用Kafka分区扩展Spring Batch大数据调度批处理 – Arnold


假设有一个您需要定期运行的流程,例如一天结束 (EOD)。假设这个流程中需要处理的数据量在不断增加。
最初,你可以做一个非常简单的 Spring 调度(或者 Quartz 或者你有什么),它只执行一个方法,一次加载所有数据,处理所有数据并将结果写回数据库。
如果读取的行数(例如从数据库中)是 10,000 行,它可能工作得很好,但如果突然有 10 000 000 行怎么办?执行可能会失败,因为内存不足错误
或者需要很长的时间才能完成。

远程分区
获取初始数据集时,例如,如果我们从数据库中读取事务(或任何域对象),我们只获取事务 ID。
将它们划分为分区(不是块;块在 Spring Batch 世界中具有不同的含义)并将分区发送给可以处理它们并执行实际业务逻辑的工作人员。

常规分区和远程分区的主要区别在于工作者的位置。

  • 在常规分区的情况下,作为工作者的进程是与正在进行数据分区的进程在同一JVM中的本地线程。
  • 但在远程分区的情况下,工作者不是在同一个JVM中运行,而是完全不同的JVM。当有一些工作需要处理时,会通过消息传递系统通知各个工作者。

局限性
Kafka是基于主题运行的。主题可以有分区。你可以拥有的消费者数量(对于同一个消费者组)取决于你对主题的分区数量。这意味着,你的分区批处理作业的并发系数与主题分区的数量直接相关。
一个主题所使用的分区数量应在创建该主题时设置。后来,我们可以改变现有主题的分区数量,但是你必须注意到某些副作用。

这意味着Kafka不可能根据数据量来动态地扩展工作者的数量。我所说的动态是指,有时你需要10个工人,但假设在圣诞节期间数据量大增,你就需要50个。这就需要一些自定义的脚本了。

毕竟,我认为一个好的经验法则--在Kafka的情况下--是过度扩大主题分区的数量。比方说,如果你在非高峰期需要10个消费者,而在高峰期需要20个,我认为你可以选择两倍/三倍的数量,以确保你有增长的空间,而不会有太多的头痛。因此,我认为60是一个很好的分区数字,最多可以支持60个同时进行的消费者。当然,这取决于你的数据量的增长速度,但你应该明白这个道理。

技术栈

  • Spring Batch
  • Spring Integration
  • Spring for Apache Kafka
  • MySQL
  • Liquibase

Manager
我们将从管理器和它的配置开始。让我们有一个ManagerConfiguration类。我们将需要几个配置的依赖项和两个注释。
@Configuration
@Profile("manager")
public class ManagerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaTemplate kafkaTemplate;
}

@Profile注解是至关重要的,因为我们只想让这个配置在我们试图运行管理器时启动,我们将用Spring的profile来控制它。

JobBuilderFactory将被用来创建我们的分区作业。RemotePartitioningManagerStepBuilderFactory将用于为我们的工作创建步骤,使用这个类而不是普通的StepBuilderFactory非常重要。另外,请注意,有一个非常类似的StepBuilderFactory,叫做RemotePartitioningWorkerStepBuilderFactory,它是用来给工人而不是经理使用的。我们很快就会到那里。

KafkaTemplate是自动为我们配置的,我们将需要它来配置管理器和Kafka之间的通道。

现在,让我们上添加一个通道,我们将把它作为从应用程序到Kafka的输出通道。

@Configuration
@Profile("manager")
public class ManagerConfiguration {
   
// previous content is omitted for simplicity

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }
}

DirectChannel只是Spring Integration中对消息通道的一个抽象。

接下来,让我们创建一个Partitioner,对我们的数据集进行分区。这将是一个新的类,我把它叫做ExamplePartitioner。

public class ExamplePartitioner implements Partitioner {
    public static final String PARTITION_PREFIX = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int partitionCount = 50;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.put(
"data", new ArrayList<Integer>());
            partitions.put(PARTITION_PREFIX + i, executionContext);
        }
        for (int i = 0; i < 1000; i++) {
            String key = PARTITION_PREFIX + (i % partitionCount);
            ExecutionContext executionContext = partitions.get(key);
            List<Integer> data = (List<Integer>) executionContext.get(
"data");
            data.add(i + 1);
        }
        return partitions;
    }
}


这个分区器的实现没有做任何有趣的事情。它创建了50个分区,对于每个分区,它把一些数字放入一个列表中,在关键数据下可以访问。
这意味着,每个分区在列表中会有20个数字。

这就是你可以想象获得交易或任何你想处理的ID的地方,稍后下线,工作者将从数据库中加载相应的行。

很好,让我们创建工作步骤job steps并从分区器中创建一个bean。

@Configuration
@Profile("manager")
public class ManagerConfiguration {
   
// previous content is omitted for simplicity

    @Bean
    public ExamplePartitioner partitioner() {
        return new ExamplePartitioner();
    }

    @Bean
    public Step partitionerStep() {
        return stepBuilderFactory.get(
"partitionerStep")
                .partitioner(Constants.WORKER_STEP_NAME, partitioner())
                .outputChannel(outboundRequests())
                .build();
    }
}


没有什么特别的,我们创建了调用分区器的步骤,以及我们想把分区发送到的输出通道。
另外,这里有一个对常量类的引用,让我给你看看它的内容。

public class Constants {
    public static final String TOPIC_NAME = "work";
    public static final String WORKER_STEP_NAME =
"simpleStep";
    public static final int TOPIC_PARTITION_COUNT = 3;
}

这就是全部。我们将调用Kafka主题工作,它将有3个主题分区,我们想在分区数据集上调用的工作步骤被称为simpleStep。

很好,现在我们来创建分区器工作。

@Configuration
@Profile("manager")
public class ManagerConfiguration {
   
// previous content is omitted for simplicity

    @Bean(name =
"partitionerJob")
    public Job partitionerJob() {
        return jobBuilderFactory.get(
"partitioningJob")
                .start(partitionerStep())
                .incrementer(new RunIdIncrementer())
                .build();
    }
}

同样,没有什么特别的,只是引用了我们之前创建的分区器步骤,并在作业中添加了RunIdIncrementer,这样我们就可以轻松地重新运行作业。

很好。现在,我想说的是最复杂的东西,如何将通道接入Kafka,并确保主题分区被正确利用。

我们也会用Spring Integration来做这个:

@Configuration
@Profile("manager")
public class ManagerConfiguration {
   
// previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow() {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME));
        return IntegrationFlows
                .from(outboundRequests())
                .log()
                .handle(messageHandler)
                .get();
    }
}

首先,我们需要一个KafkaProducerMessageHandler,它将接收到的消息并将其发布到Kafka主题中。
该主题由setTopicExpression方法调用来标记,最后,我们只需将所有东西作为一个集成流来连接。

然而,这还不会利用主题分区,消息将被发布到同一个分区。
让我们通过setPartitionIdExpression方法为其添加一个自定义表达式。

@Configuration
@Profile("manager")
public class ManagerConfiguration {
   
// previous content is omitted for simplicity

    @Bean
    public IntegrationFlow outboundFlow() {
        KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate);
        messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME));
        Function<Message<?>, Long> partitionIdFn = (m) -> {
            StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload();
            return executionRequest.getStepExecutionId() % Constants.TOPIC_PARTITION_COUNT;
        };
        messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionIdFn));
        return IntegrationFlows
                .from(outboundRequests())
                .log()
                .handle(messageHandler)
                .get();
    }
}

我们提供一个FunctionExpression,它将动态地解开消息,获得stepExecutionId属性并与modulo运算符相结合。
分区计数的当前值是3。这意味着分区ID表达式将从[0, 1, 2]范围内返回一个值,这将表示目标主题分区。
这算是在分区之间提供了一种平均分配,但不是100%。
如果你需要一个复杂的分区ID决定器,你肯定可以调整实现。

另外,你也可以同样使用setMessageKeyExpression方法来提供一个类似的FunctionExpression来计算消息key,而不是直接告诉Kafka要使用哪个分区。

还有一点需要注意的是,我在集成流程中加入了log(),所以发送出去的消息会被记录下来;只是为了调试的目的。

这就是管理器的配置。

 
Worker
工作者的配置将是类似的。让我们创建一个WorkerConfiguration类。

@Configuration
@Profile("worker")
public class WorkerConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory<String, String> cf) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(cf, Constants.TOPIC_NAME))
                .channel(inboundRequests())
                .get();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}

几个依赖关系,一个用于入站消息的消息通道,并将其与Spring Integration连接起来。

让我们来创建一个工作步骤。

@Configuration
@Profile("worker")
public class WorkerConfiguration {
   
// previous content is omitted for simplicity

    @Bean
    public Step simpleStep() {
        return stepBuilderFactory.get(Constants.WORKER_STEP_NAME)
                .inputChannel(inboundRequests())
                .<Integer, Customer>chunk(100)
                .reader(itemReader(null))
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }
}

这将创建步骤定义,将其与入站消息通道相连,并引用 ItemReader、ItemProcessor 和 ItemWriter 实例。这些看起来如下。
@Configuration
@Profile("worker")
public class WorkerConfiguration {
   
// previous content is omitted for simplicity
    
    @Bean
    @StepScope
    public ItemReader<Integer> itemReader(@Value(
"#{stepExecutionContext['data']}") List<Integer> data) {
        List<Integer> remainingData = new ArrayList<>(data);
        return new ItemReader<>() {
            @Override
            public Integer read() {
                if (remainingData.size() > 0) {
                    return remainingData.remove(0);
                }

                return null;
            }
        };
    }
}

ItemReader是一个Bean,它将在Spring Batch执行上下文中的数据键下接收分区数据作为一个参数。请注意,必须在Bean定义上使用@StepScope,以便为该步骤启用后期绑定。

实现很简单。我们将把收到的ID存储在一个本地列表中,在每个ItemReader调用期间,我们将从列表中删除一个项目,直到没有剩余。

@Configuration
@Profile("worker")
public class WorkerConfiguration {
   
// previous content is omitted for simplicity
    
    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .beanMapped()
                .dataSource(dataSource)
                .sql(
"INSERT INTO customers (id) VALUES (:id)")
                .build();
    }

    @Bean
    public ItemProcessor<Integer, Customer> itemProcessor() {
        return new ItemProcessor<>() {
            @Override
            public Customer process(Integer item) {
                return new Customer(item);
            }
        };
    }
}

ItemProcessor和ItemWriter则更简单。ItemProcessor只是将ID转换为Customer对象,模拟对DTO的某种处理,ItemWriter只是将Customers写入数据库。

客户类是一个简单的POJO,没有什么特别的。

public class Customer {
    private int id;

    public Customer(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}

 
最后的配置步骤
接下来我们需要做的是用所需的分区数量创建Kafka主题,所以让我们创建一个新的KafkaConfiguration类。

@Configuration
public class KafkaConfiguration {
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(Constants.TOPIC_NAME)
                .partitions(Constants.TOPIC_PARTITION_COUNT)
                .build();
    }
}

如果分区计数还不存在,这将自动创建一个主题。

接下来,我们需要创建数据库结构来存储我们的客户,并允许Spring管理其状态。让我们在 src/main/resources/db/changelog 文件夹下创建一个 db.changelog-master.xml 文件,内容如下。

<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns=
"http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation=
"http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd">
    <changeSet id=
"0001-initial" author="Arnold Galovics">
        <createTable tableName=
"customers">
            <column name=
"id" type="number">
            </column>
        </createTable>
        <sqlFile path=
"classpath:/org/springframework/batch/core/schema-mysql.sql" relativeToChangelogFile="false"/>
    </changeSet>
</databaseChangeLog>

createTable很简单,SQL文件的导入是由Spring Batch的核心模块提供的东西。

让我们在application.properties中添加一些配置。

spring.datasource.url=jdbc:mysql://localhost:3306/db_example?createDatabaseIfNotExist=true
spring.datasource.username=root
spring.datasource.password=mysql
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.xml

用Liquibase配置DataSource。然后是Kafka生产者的配置。

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.group-id=producer-g

这里最重要的是使用JsonSerializer,这样Spring Batch要发送的消息就会被编码成JSON。

同样地,消费者:

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=consumer-g

还有一件事:
spring.kafka.consumer.properties.spring.json.trusted.packages=*
 
 
运行
创建用于启动应用程序的信息库。我将创建一个docker-compose.yml。

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT:
//:9093,EXTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT:
//kafka:9093,EXTERNAL://localhost:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - '8080:8080'
    environment:
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093
    depends_on:
      - kafka
  mysql:
    image: mysql
    ports:
      - '3306:3306'
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: mysql

我就不多说了。它启动了一个Kafka代理,一个位于8080端口的Kafka UI实例,如果你想查看主题的状态,还有一个MySQL服务器。

docker-compose up启动一切。

完整代码可在GitHub 上获得。