分享我使用Knative设置事件驱动架构的经验和工作流程。
我现在构建的大多数最近的应用程序都严重依赖于 Kafka 和 Kubernetes,长话短说,这最终会产生一个向主题发送事件的生产者和一个消费该事件的while循环。在 Kafka 中,您可以配置消息的自动提交偏移量,也可以在使用消息后手动提交。在大多数情况下,这些 while 循环是消耗同步/异步消息的大型进程,并且很难扩展。为了改进这个过程,我想介绍和讨论 Knative。
在本文中,我将重点介绍使用 Kafka 作为事件源、FastAPI (Python) 作为 Web 服务的 Knative Broker(Kafka 源和接收器)设置,当然还有 Kubernetes 作为跨多个主机管理容器化应用程序的系统(本文假设您已经对 Kafka 和 Kubernetes 有基本的了解)。
Knative
Knative 是一个用于构建无服务器和事件驱动应用程序的开源解决方案。由 Google 创建并移交给一个不断改进它的优秀团队。受 vmware、Google、RedHat、IBM 等公司信赖的软件。考虑到微服务、Kubernetes 和事件驱动方法的当前趋势,Knative 可能是您的完美选择。
项目分为两个主要模块:事件和服务。
Knative事件
它是一组工具,允许您使用事件驱动的架构来处理您的应用程序。多亏了有许多 API,它创建了将事件生产者路由到事件消费者(称为后来的接收器)的组件。它使用标准的 HTTP POST 请求在生产者和接收器之间发送这些事件。
稍后,这些接收器可以用作“桶”,事件将从这些桶中通过 HTTP 推送到您的应用程序。启用它的组件称为触发器,它们可以将给定的服务订阅到接收器,事件将由您的应用程序生成和使用。
我们所指的应用程序可能是一个简单的 Web 服务器,例如 k8s Deployment + Service,或者在本例中为 Knative Service。
Knative服务
它是一组对象(Kubernetes 自定义资源定义),用于定义和控制无服务器工作负载在集群上的行为方式。它全权负责为您设置、管理流量、Pod、扩展和修订 Kubernetes。主要用于避免耗时的操作资源、快速开发、自动缩放(包括缩放到零个 pod 以节省资源)和无聊的 Kafka 消费者循环。
设置
Knative 服务
Kubernetes 资源
1、为服务模块安装 CRD:
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.7.1/serving-crds.yaml |
安装核心服务组件:
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.7.1/serving-core.yaml |
安装网络层(在这种情况下,我使用的是 kourier)——它需要连接到 kservice:
kubectl apply -f https://github.com/knative/net-kourier/releases/download/knative-v1.7.0/kourier.yaml |
通过运行以下命令将 Knative Serving 配置为默认使用 Kourier:
kubectl patch configmap/config-network \ |
验证您的外部 IP:
kubectl --namespace kourier-system get service kourier |
为了使服务对公共流量关闭并仅启用它私有,我们需要使用特殊设置标记我们的服务:networking.knative.dev/visibility: cluster-local
组件
Knative Service 运行一个普通的 web 应用程序,当然,我将使用一个简单的 FastAPI Python 应用程序来记录我们的通知事件:
# main.py |
为 Knative Service 定义一个 Kubernetes 资源:
# my-service.yaml |
记住要注释服务,autoscaling.knative.dev/scale-to-zero: "false"否则在没有流量的情况下,Knative 会杀死所有的 pod,你不会看到它正在运行。
请注意networking.knative.dev/visibility仅定义内部集群连接的标签。要验证您的网络是内部运行命令并检查它是否以svc.cluster.local[ docs ] 结尾:
kubectl get kservice my-notifications |
我还始终建议为您的应用程序设置准备就绪和活跃性探测器,以 ping 服务。由于时间关系,我这里略过。
将您的配置应用到 k8s 中:
kubectl apply -f my-service.yaml |
验证服务器是否运行:
kubectl logs -l app=my-notifications |
Kubernetes 资源
为事件模块安装 CRD:
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.7.1/eventing-crds.yaml |
安装核心事件组件:
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.7.1/eventing-core.yaml |
安装 apache Kafka 代理(负责事件路由):
kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.7.0/eventing-kafka-controller.yaml |
组件
第一个配置是一个Broker k8s 自定义资源,它定义了一个用于收集事件池的事件网格。我们的 Knative Broker 类是Kafka.
apiVersion: eventing.knative.dev/v1 |
正如您在上面可能注意到的,我们使用默认的 ConfigMap 进行定义,我们可以覆盖默认的或使用新的。最重要的是设置bootstrap.servers数据,确保它与运行 Kafka 的 URL 相同:
apiVersion: v1 |
非常重要的通知是 Kafka 代理将创建一个默认主题(在我们的例子中knative-broker-default-default),来自我们的 Kafka 源的所有事件都将被复制到该主题。如果我们的代理路由来自多个主题的事件,则所有这些事件都将复制到这一主题。确保您的分区号符合您的需要。
如上所述,我们还需要一个KafkaSource资源。它将映射我们已经构建的 Kafka 主题中的事件并将它们推送到我们的代理接收器。
apiVersion: sources.knative.dev/v1beta1 |
最后一步是配置一种机制,该机制将订阅通知服务并(仅通过 HTTP)从代理推送通知事件:
# triggers.yaml |
最好为多个主题使用一个代理(KafkaSource 配置)。您可以使用filter配置根据类型或来源过滤特定事件,并订阅不同的服务或不同的 API 路由。归根结底,经纪人就是经纪人!
如果您关心每个分区的事件顺序,分区中的所有事件在成功消息后按顺序处理,请使用 config 注释您的触发器kafka.eventing.knative.dev/delivery.order: ordered。knative 文档中的更多详细信息
应用触发器资源:
kubectl apply -f triggers.yaml |
验证触发器准备情况:
kubectl get Trigger |
确保 URI 与我们的服务 URI 相同,结尾为: svc.cluster.local。据我所知,您还需要集成像 istio 或 kourier 这样的网络层以供内部使用。
测试
最好的方法是简单地为您的 Kafka 主题生成事件。它们应该被汇集到我们定义的接收器中,然后通过触发器推送到 Knative 服务。如果显示事件日志,只需检查日志。
如果你使用的是 Confluent Kafka,你可以做简单的 Producer:
import uuid, json |
然后,记录您的应用程序以查看事件是否到达:
kubectl logs -l app=my-notifications |
如果您在事件流程中发现任何问题,请应用日志记录 ConfigMap 以获取更多调试信息Knative 文档
概括
您可以决定此工具是否适合您。在我看来,如果您需要快速开发应用程序并且没有太多时间维护 k8s 基础架构,这是一个很好的解决方案。
这是引入“无服务器”并自己维护它的好方法。下一个美好的未来是按需基础设施(自动扩展),包括归零,这可以节省一些钱,特别是对于初创公司和业余爱好者项目。可以肯定的是,它使部署和部署变得更轻松。开箱即用的 Knative 创建具有滚动部署的修订版,允许使用 3 行代码拆分流量/回滚。