OGG:Oracle GoldenGate是一个产品,它允许我们为数据库中发生的每一项活动(Kafka)生成消息 - 更新,插入,删除 - 我们将全部获取。Debezium是一个类似的产品,可以与MySQL以及许多其他数据存储一起使用。
衍生的事件溯源实际上是从Oracle等数据库中获取事件,这与更常见的直接从发生事件的服务中获取事件的做法不同。变更数据捕获(CDC)是当前最常用的派生事件溯源,两者虽然有区别,但是事件日志是共同点。
当将OGG的变更数据写入Kafka以后,例如是写入DB.ORDERS这个主题Topic,使用Kafka Streams创建一个消费DB.ORDERS主题:
KStream<String,JsonNode> baseOrderStream = builder .stream(DB.ORDERS, Consumed.with(stringSerde, jsonSerde));
KStream<String, JsonNode> orderService1 = baseOrderStream .filter(isInsert) .filter(hasNonNullOrderNumber) .map((key,value) -> KeyValue.pair(value.path("after").path("ORDER_NUMBER").asText(), createOrderCreatedEvent(value, ORDER_SERVICE_1)));
KStream<String, JsonNode> orderService5 = baseOrderStream .filter(isUpdate) .filter(hasNewNonNullOrderNumber) .map((key,value) -> KeyValue.pair(value.path("after").path("ORDER_NUMBER").asText(), createOrderCreatedEvent(value, ORDER_SERVICE_5)));
|