在kubernetes上通过Knative服务和FastAPI消费使用 Kafka事件


分享我使用Knative设置事件驱动架构的经验和工作流程。

我现在构建的大多数最近的应用程序都严重依赖于 Kafka 和 Kubernetes,长话短说,这最终会产生一个向主题发送事件的生产者和一个消费该事件的while循环。在 Kafka 中,您可以配置消息的自动提交偏移量,也可以在使用消息后手动提交。在大多数情况下,这些 while 循环是消耗同步/异步消息的大型进程,并且很难扩展。为了改进这个过程,我想介绍和讨论 Knative。

在本文中,我将重点介绍使用 Kafka 作为事件源、FastAPI (Python) 作为 Web 服务的 Knative Broker(Kafka 源和接收器)设置,当然还有 Kubernetes 作为跨多个主机管理容器化应用程序的系统(本文假设您已经对 Kafka 和 Kubernetes 有基本的了解)。

Knative
Knative 是一个用于构建无服务器和事件驱动应用程序的开源解决方案。由 Google 创建并移交给一个不断改进它的优秀团队。受 vmware、Google、RedHat、IBM 等公司信赖的软件。考虑到微服务、Kubernetes 和事件驱动方法的当前趋势,Knative 可能是您的完美选择。

项目分为两个主要模块:事件和服务。

Knative事件
它是一组工具,允许您使用事件驱动的架构来处理您的应用程序。多亏了有许多 API,它创建了将事件生产者路由到事件消费者(称为后来的接收器)的组件。它使用标准的 HTTP POST 请求在生产者和接收器之间发送这些事件。

稍后,这些接收器可以用作“桶”,事件将从这些桶中通过 HTTP 推送到您的应用程序。启用它的组件称为触发器,它们可以将给定的服务订阅到接收器,事件将由您的应用程序生成和使用。

我们所指的应用程序可能是一个简单的 Web 服务器,例如 k8s Deployment + Service,或者在本例中为 Knative Service。

Knative服务
它是一组对象(Kubernetes 自定义资源定义),用于定义和控制无服务器工作负载在集群上的行为方式。它全权负责为您设置、管理流量、Pod、扩展和修订 Kubernetes。主要用于避免耗时的操作资源、快速开发、自动缩放(包括缩放到零个 pod 以节省资源)和无聊的 Kafka 消费者循环。

设置

Knative 服务

Kubernetes 资源
1、为服务模块安装 CRD:

kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.7.1/serving-crds.yaml

安装核心服务组件:

kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.7.1/serving-core.yaml

安装网络层(在这种情况下,我使用的是 kourier)——它需要连接到 kservice:

kubectl apply -f https://github.com/knative/net-kourier/releases/download/knative-v1.7.0/kourier.yaml

通过运行以下命令将 Knative Serving 配置为默认使用 Kourier:

kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress-class":"kourier.ingress.networking.knative.dev"}}'

验证您的外部 IP:

kubectl --namespace kourier-system get service kourier


为了使服务对公共流量关闭并仅启用它私有,我们需要使用特殊设置标记我们的服务:networking.knative.dev/visibility: cluster-local

组件
Knative Service 运行一个普通的 web 应用程序,当然,我将使用一个简单的 FastAPI Python 应用程序来记录我们的通知事件:

# main.py
import logging
from typing import Dict
from fastapi import FastAPI, Request

app = FastAPI()


@app.post("/events/notifications")
async def root(request: Request) -> Dict[str, str]:
    event_data = await request.json()
    logging.info(event_data)
    return {
"message": "ok"}

为 Knative Service 定义一个 Kubernetes 资源:

# my-service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: my-notifications
  labels:
    networking.knative.dev/visibility: cluster-local
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/max-scale: "3"
        autoscaling.knative.dev/scale-to-zero:
"false"
      labels:
        app: my-notifications
    spec:
      containers:
        - name: my-notifications
          resources:
            requests:
              memory:
"200Mi"
              cpu:
"200m"
            limits:
              memory:
"400Mi"
              cpu:
"400m"
          image: my-notifications-image
          imagePullPolicy: Always
          args: [ 'python', 'main.py' ]


记住要注释服务,autoscaling.knative.dev/scale-to-zero: "false"否则在没有流量的情况下,Knative 会杀死所有的 pod,你不会看到它正在运行。


请注意networking.knative.dev/visibility仅定义内部集群连接的标签。要验证您的网络是内部运行命令并检查它是否以svc.cluster.local[ docs ] 结尾:

kubectl get kservice my-notifications

NAME               URL                                                 LATESTCREATED            LATESTREADY              READY   REASON
my-notifications   http://my-notifications.default.svc.cluster.local   my-notifications-1b2ce   my-notifications-1b2ce   True

我还始终建议为您的应用程序设置准备就绪和活跃性探测器,以 ping 服务。由于时间关系,我这里略过。
将您的配置应用到 k8s 中:

kubectl apply -f my-service.yaml

验证服务器是否运行:

kubectl logs -l app=my-notifications

Kubernetes 资源
为事件模块安装 CRD:

kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.7.1/eventing-crds.yaml

安装核心事件组件:

kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.7.1/eventing-core.yaml

安装 apache Kafka 代理(负责事件路由):

kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.7.0/eventing-kafka-controller.yaml
kubectl apply -f https:
//github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.7.0/eventing-kafka-broker.yaml

组件
第一个配置是一个Broker k8s 自定义资源,它定义了一个用于收集事件池的事件网格。我们的 Knative Broker 类是Kafka.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config

正如您在上面可能注意到的,我们使用默认的 ConfigMap 进行定义,我们可以覆盖默认的或使用新的。最重要的是设置bootstrap.servers数据,确保它与运行 Kafka 的 URL 相同:

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: default
data:
  default.topic.partitions: '10'
  default.topic.replication.factor: '1'
  bootstrap.servers: '...kafka.svc.cluster.local:29092'


非常重要的通知是 Kafka 代理将创建一个默认主题(在我们的例子中knative-broker-default-default),来自我们的 Kafka 源的所有事件都将被复制到该主题。如果我们的代理路由来自多个主题的事件,则所有这些事件都将复制到这一主题。确保您的分区号符合您的需要。


如上所述,我们还需要一个KafkaSource资源。它将映射我们已经构建的 Kafka 主题中的事件并将它们推送到我们的代理接收器。

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - ...kafka.svc.cluster.local:29092 # note the kafka namespace
  topics:
    - notifications
    - loggings
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

最后一步是配置一种机制,该机制将订阅通知服务并(仅通过 HTTP)从代理推送通知事件:

# triggers.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: notification-trigger
spec:
  broker: default
  filter:
    attributes:
      source: /apis/v1/namespaces/default/kafkasources/kafka-sourcenotifications
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: my-notifications
    uri: /events/notifications


最好为多个主题使用一个代理(KafkaSource 配置)。您可以使用filter配置根据类型或来源过滤特定事件,并订阅不同的服务或不同的 API 路由。归根结底,经纪人就是经纪人!


如果您关心每个分区的事件顺序,分区中的所有事件在成功消息后按顺序处理,请使用 config 注释您的触发器kafka.eventing.knative.dev/delivery.order: ordered。knative 文档中的更多详细信息


应用触发器资源:

kubectl apply -f triggers.yaml

验证触发器准备情况:

kubectl get Trigger

NAME                            BROKER    SUBSCRIBER_URI                                                                                    AGE     READY   REASON
notification-trigger            default   http://my-notification.default.svc.cluster.local/v1/user-notifications                            0d1h    True


确保 URI 与我们的服务 URI 相同,结尾为: svc.cluster.local。据我所知,您还需要集成像 istio 或 kourier 这样的网络层以供内部使用。


测试
最好的方法是简单地为您的 Kafka 主题生成事件。它们应该被汇集到我们定义的接收器中,然后通过触发器推送到 Knative 服务。如果显示事件日志,只需检查日志。
如果你使用的是 Confluent Kafka,你可以做简单的 Producer:

import uuid, json
from confluent_kafka import Producer


producer = Producer()
producer.produce(
    topic="notifications",
    value=json.dumps({
"message": "test"}),
    key=str(uuid.uuid4()),
)
producer.poll(0)

然后,记录您的应用程序以查看事件是否到达:

kubectl logs -l app=my-notifications


如果您在事件流程中发现任何问题,请应用日志记录 ConfigMap 以获取更多调试信息Knative 文档

概括
您可以决定此工具是否适合您。在我看来,如果您需要快速开发应用程序并且没有太多时间维护 k8s 基础架构,这是一个很好的解决方案。
这是引入“无服务器”并自己维护它的好方法。下一个美好的未来是按需基础设施(自动扩展),包括归零,这可以节省一些钱,特别是对于初创公司和业余爱好者项目。可以肯定的是,它使部署和部署变得更轻松。开箱即用的 Knative 创建具有滚动部署的修订版,允许使用 3 行代码拆分流量/回​​滚。