如何创建自定义Apache Kafka连接器(Kafka Connectors) - Prashanna

20-07-04 banq

Kafka Connect是一个框架,可帮助Kafka连接外部系统,例如数据库,文件系统等,反之亦然。

我们处理两种类型的连接器:

  1. 源连接器:这种类型的连接器有助于将数据从外部系统简化为Kafka Topic。
  2. 接收器连接器:这种类型的连接器有助于将数据从Kafka Topic简化到外部系统。

连接器通常将给定的工作划分为较小的任务,并在内部将其分配给工人。工作者是执行连接器分配的任务的过程。

市场上有很多这类连接器可供使用,可在Confluent官方网站:https://www.confluent.io/hub/

自定义Kafka连接器的需求在两种情况下出现:

  1. 当现有的Kafka连接器无法满足您的需求时,将其引入用例的目的
  2. 许可问题

因此,引入了“定制Kafka连接器”的概念,以创建可帮助用户满足要求的连接器,并退出复杂的许可策略。

注意原则:

创建一个Maven Java项目,并将以下maven依赖项添加到pom.xml文件中。

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
</dependency>

1.配置类:此类用于描述将用于我们的连接器的配置属性。Java类需要扩展AbstractConfig类。

import org.apache.kafka.common.config.AbstractConfig;

2. SourceConnector或SinkConnector类:此类在连接器的实现逻辑所驻留的抽象级别上。根据创建的连接器的类型,应扩展相应的类。

import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceTask;

这些类的下面方法应该被实现覆盖:

1)start
2)stop
3)taskClass
4)taskConfigs
5)config
6)version

3.SourceTask或SinkTask类:此类是任务功能(连接器用途)所在的位置。根据创建的连接器的类型,应扩展相应的类。

import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;

需要覆盖方法:

1)start
2)stop
3)poll or put(Based on the Connector Type)
4)version

除了这些类之外,该连接器还需要两个属性文件才能在Ka​​fka Environment中运行。

  1. Config.properties:此文件将包含运行连接器所需的配置。连接器的配置类别将使用此数据进行填充。
  2. Standalone.properties或Distributed.properties:Kafka环境通常以Standalone模式或Distributed(Clustered)模式设置。根据模式的类型,各个文件将填充有所需属性,例如bootstrap.servers,rest.port,group.id等。

详细参考:

https://docs.confluent.io/current/connect/devguide.html

 

              

猜你喜欢