使用事件溯源、Kafka和OGG从Oracle内部复制数据

19-09-11 banq
                   

OGG:Oracle GoldenGate是一个产品,它允许我们为数据库中发生的每一项活动(Kafka)生成消息 - 更新,插入,删除 - 我们将全部获取。Debezium是一个类似的产品,可以与MySQL以及许多其他数据存储一起使用。

衍生的事件溯源实际上是从Oracle等数据库中获取事件,这与更常见的直接从发生事件的服务中获取事件的做法不同。变更数据捕获(CDC)是当前最常用的派生事件溯源,两者虽然有区别,但是事件日志是共同点。

当将OGG的变更数据写入Kafka以后,例如是写入DB.ORDERS这个主题Topic,使用Kafka Streams创建一个消费DB.ORDERS主题:

KStream<String,JsonNode> baseOrderStream = builder
                .stream(DB.ORDERS, Consumed.with(stringSerde, jsonSerde));

KStream<String, JsonNode> orderService1 = baseOrderStream
                .filter(isInsert)
                .filter(hasNonNullOrderNumber)
                .map((key,value) ->                    
KeyValue.pair(value.path("after").path("ORDER_NUMBER").asText(), 
createOrderCreatedEvent(value, ORDER_SERVICE_1)));

KStream<String, JsonNode> orderService5 = baseOrderStream
                .filter(isUpdate)
                .filter(hasNewNonNullOrderNumber)
                .map((key,value) ->                      
KeyValue.pair(value.path("after").path("ORDER_NUMBER").asText(), 
createOrderCreatedEvent(value, ORDER_SERVICE_5)));

 

                   

2