用 GoLang 编写类似 Apache Camel 路由引擎


在本文中我们将讨论的是:
  • 用 GoLang 编写的类似 Apache Camel 的路由引擎
  • 嵌入式 WebAssembly 引擎,用于可扩展且安全的消息路由和转换
  • Actors 模型
  • OCI 工件

背景

  • Apache Camel是一个开源的集成框架,提供了一套工具和模式,以简化不同应用程序、系统和技术之间的集成。
  • WebAssembly(Wasm)是一种低级字节码格式,设计为C、C++和Rust等高级语言的可移植编译目标。
  • Actor模型(Actors Model)是一种用于开发并行和分布式系统的并发计算模型。
  • OCI Artifacts是一种使用符合Open Container Initiative规范的容器注册表来存储任意文件的方式。

我们来定义一下首字母缩略词的含义:

  • EIP:企业集成模式 
  • DSL:领域特定语言

让我们从一个简单的例子开始:

- route:
   from:
     uri: 'kafka:iot'
     steps:
       - to: 'log:in'
       - choice:
           when:
             - jq: 'header("kafka.KEY") == "sensor-1"'
               steps:
               - transform:
                   jq: '.foo'
               - to: 'log:s1'
             - jq: 'header(
"kafka.KEY") == "sensor-2"'
               steps:
                 - transform:
                     jq: '.bar'
                 - to: 'log:s2
           otherwise:
             steps:
               - to:
"log:unknown"


在 Apache Camel 中,从高层次上讲,每条路由都被转换为一系列函数( Camel 术语中的处理器),每个函数都实现一个特定的 EIP,形成一个执行管道。当事件触发路由的执行时,管道会将每个函数提交给内部执行器引擎,以便在线程池中执行。 

由于新引擎是用 Go 编写的,我们现在可以利用 Go 的并发构建块(goroutines、channels)来实现类似的模型。我们甚至可以更进一步,将路由引擎实现为一个 Actor 系统(在这个阶段,引擎利用优秀的Proto.Actor库作为基础)。

#Actor 模型
因为Actor 有一些特点,使得他们在我们的用例中非常适合和有趣,举一些例子:

  • 状态:表示参与者的内部状态,取决于具体的参与者;有些参与者是无状态的,但有些可能需要存储某些状态(即节流消息)
  • 行为:在某一时间点针对消息做出反应而采取的行动。
  • 邮箱:连接发送者和接收者;每个参与者都有一个邮箱,所有发送者都会将他们的消息放入其中
  • 子级:演员可以创建子级来委派子任务,在这种情况下,演员将自动监督他们。
  • 主管:  主管将任务委托给下属,因此必须对其故障做出响应。当下属检测到故障(即恐慌)时,它会向其主管发送一条消息,表示故障默认情况下,子 Actor 会将错误转发给他们的主管,直到到达全局主管。
除了 Actor 的常见功能外,Prto.Actor 库还提供了一些附加功能(尚未成为 POC 的一部分):
  • 中间件:允许拦截传入和传出的消息并添加一些特定的行为,例如跟踪、指标和日志记录。
  • 位置透明性:参与者的所有交互都使用纯消息传递,并且一切都是异步的,因此在单个参与者系统或机器集群中运行时,所有功能都应该平等可用,从而可以根据数据亲和性规则安排一些处理逻辑 
  • 持久性:在某些情况下,需要或非常希望永久保存参与者的状态,以便可以在参与者重新启动时恢复,从而使系统能够从其离开的位置进行恢复。

当在 Camel Go 运行时中加载路由时,运行时会创建一个根参与者,该参与者通常由from定义来描述,然后它会生成其所有子参与者,而这些子参与者本身又可以生成其他子参与者。 

这棵Actor 演员树被称为Actor 演员系统。

每个父级 Actor 都知道子级 Actor,并可以访问子级 Actor 的地址,因此父级 Actor 可以向子级 Actor 发送消息。子级 Actor 知道他们收到的每条消息的发送者是谁,并且会在消息处理完毕后以消息的形式回复发送者。

参与者系统的一个重要特性是,由于参与者只能通过消息进行通信,并且它们依赖于邮箱来确保一次只处理一条消息,因此参与者可以在单线程假象中运行,从而保护参与者的状态免受任何正常的并发问题的影响。

Wasm 用于扩展
我一直在致力于为 Apache Kafka 提供托管连接服务,其中最关键但最困难的部分之一是如何以更简单、更安全的方式运行非平凡的处理逻辑。随着时间的推移,我们尝试了许多选项,例如函数、脚本语言、自定义图像等,但没有一个真正令人满意,即 

  • 脚本语言:
    • 要求用户最终学习一门新语言
    • 由于脚本可以访问文件、环境和其他主机资源,因此执行可能会损害主机系统
  • 功能:
    • 使应用程序的部署更加复杂,因为它需要与其他资源一起部署(功能)
    • 成本通常较高,因为必须在主应用程序和功能之间传输数据,这会导致 I/O 并且经常脱离数据局部性  
    • 故障处理变得更加复杂 
因此,我决定尝试一下 Wasm,因为 Wasm 的初衷是:
  • 多语言:许多语言都支持 Wasm 作为编译目标,这吸引了更多可能不熟悉运行时语言的开发人员。
  • 安全:WebAssembly 的主要目标之一是在沙箱内安全地执行不受信任的代码,只有主机可以配置在沙箱中运行的代码可以访问的内容,这使其非常适合插件/扩展。 
  • 可嵌入: Wasm 运行时可以嵌入到主机应用程序中,从而可以使用任何可以编译为 Wasm 的语言进行安全地扩展,而无需离开应用程序的额外基础设施或数据。
目前有许多 Wasm 运行时,但由于 Go 是此 POC 的首选语言,因此我利用Wazero,因为它是唯一零依赖的 WebAssembly 运行时(即它不需要任何本机库绑定)。 

在此阶段,运行时期望的消息处理的伪签名是:
func (inOut Message) error
尽管这是一个非常简单的函数,但从主机程序调用它并不简单,因为您需要跨越主机/客户机内存边界,这可以通过多种方式完成,其中包括:

  1. 通过手动处理 WASM 线性内存中的内存分配和释放
  2. 通过使用 STDIN/OUT 作为交换数据的方式(CGI 任何人)
我的第一次尝试是使用选项A,这导致我进行了非常长时间的研究,以了解如何安全地管理主机和客户机之间的内存,特别是与具有垃圾收集器的语言(例如 Go)相关的内存。通过查看wazero repo 上的分配示例可以看到一些结果(感谢 Adrian Cole 和 Edoardo Vacchi 的耐心和指导),但为了简单起见(请记住,这只是一个 POC)和可移植性,我决定转向选项B,在主机端,它最终与此处的示例类似:

func (p *Plugin) invoke(in any, out any) error {
   fn := p.lookupFunction("process")
   if fn == nil {
       return nil, errors.New(
"process is not exported")
   }

   data, err := json.Marshal(in)
   if err != nil {
       return err
   }

   
// clean up the buffer
   p.stdin.Reset()
   p.stdout.Reset()

   defer func() {
       
// clean up the buffer when the method
       p.stdin.Reset()
       p.stdout.Reset()
   }()

   ws, err := p.stdin.Write(data)
   if err != nil {
       return err
   }

   
// invoke the function with the size of the message
   
// so the guest knows how many bites have to be read
   
// from STDIN
   ptrSize, err := fn.Call(context.Background(), uint64(ws))
   if err != nil {
       return err
   }

   
// since WASM virtual machine supports only 32 bits
   
// we can use 32 bit to hold the response data size
   
// and the remaining for flags, i.e. to indicate
   
// that an error has occurred
   resFlag := uint32(ptrSize[0] >> 32)
   resSize := uint32(ptrSize[0])

   bytes := make([]byte, resSize)
   _, err = p.stdout.Read(bytes)
   if err != nil {
       return err
   }

   switch resFlag {
   case 1:
       return errors.New(string(bytes))
   default:
       return json.Unmarshal(bytes, &out)
   }
}

为了简化编写处理器的过程,已经实现了一个小型 SDK:

type Processor func(context.Context, *Message) (*Message, error)

var processor Processor

func RegisterProcessors(p Processor) {
   processor = p
}

//export process
func _process(size uint32) uint64 {
   b := make([]byte, size)

   _, err := io.ReadAtLeast(os.Stdin, b, int(size))
   if err != nil {
       return 0
   }

   req := Message{}
   if err := json.Unmarshal(b, &req); err != nil {
       return 0
   }
   res, err := processor(context.Background(), &req)
   if err != nil {
       n, err := os.Stdout.WriteString(err.Error())
       if err != nil {
           return 0
       }

       
// Indicate that this is the error string
       return (uint64(1) << uint64(32)) | uint64(n)
   }

   b, err = json.Marshal(res)
   if err != nil {
       return 0
   }

   n, err := os.Stdout.Write(b)
   if err != nil {
       return 0
   }

   return uint64(n)
}

然后可以利用它来编写处理器,而不必处理序列化/反序列化和/或分配: 

func main() {
   // register the processor function
   RegisterProcessors(Process)
}

func Process(_ context.Context, r *Message) (*Message, error) {
   request.Data = []byte(strings.ToLower(string(r.Data)))
   return request, nil
}

要在路由引擎中使用 Wasm 函数,我们可以利用wasm 语言。例如:

- route:
   from:
     uri: 'timer:foo?period=1s'
     steps:
       - transform:
           wasm: 'etc/wasm/fn/to_upper.wasm'
       - to:
           uri: "log:info"


Wasm 分发版的 OCI Artifacts
现在我们有了一个可以以某种方式运行的路由引擎(注意: 现阶段仅支持少数 EIP),它支持 Wasm 作为实现转换逻辑的一种方式,我们必须定义如何让用户轻松运送和使用 Wasm 工件。

当然,有很多方法可以做到这一点,例如使用已编译的 Wasm 模型构建自定义容器镜像,或者使 Wasm 模块在引擎可以读取的文件系统中可用,但是,由于几乎每个云原生系统都必须处理容器镜像注册表,我们可以利用 OCI 注册表和 OCI Artifacts。 

那么,什么是 OCI Artifact?
这是一项正在进行的开放容器倡议,旨在定义允许 OCI Registry 存储任意文件的规范。这并不是什么新鲜事,许多项目已经开始使用 OCI Artifacts,例如:

在我们的案例中,我们利用ORAS项目轻松地将 Wasm 模块打包为 OCI 工件,因此我们需要做的就是设置正确的媒体类型,让路由引擎识别提供 Wasm 模块的层,例如:

oras push \
    quay.io/lburgazzoli/camel-go-wasm:latest \
    etc/wasm/fn/to_upper.wasm:application/vnd.module.wasm.content.layer.v1+wasm

此命令将使用 Go Routing Engine 所需的媒体类型将simple_process.wasm模块文件推送到 quay.io(兼容的 OCI 注册表)。然后,存储 wasm 模块的层将以文件路径命名,因此它将是etc/wasm/fn/simple_process.wasm。

要使用 OCI Artifact,只需指示wasm 语言从图像中查找模块即可:

- route:
   from:
     uri: 'timer:foo?period=1s'
     steps:
       - transform:
           wasm:
             image: 'quay.io/lburgazzoli/camel-go-wasm:latest'
             path: 'etc/wasm/fn/yo_upper.wasm'
       - to:
           uri: "log:info"

此时,Camel Go 的wasm语言会检查已配置的容器镜像,然后下载并加载包含已配置的 Wasm 模块的层。

结论
在第一部分中,我详细介绍了用 GoLang 编写的类似 Apache Camel 的路由引擎的实现,该引擎利用 Wasm 实现可扩展性。在下一篇文章中,我将提供更多实现细节和部署选项。    

代码可以在我的camel-go存储库中找到