/<strong> 按特定用户生成当天的交易。具体步骤如下 * * 1.查询当天的所有执行情况 * 2.按执行顺序分组 * 3.对于每个订单号,从订单详情中获取账号 * 4.将交易分配到客户账户 * 5.存储交易 */ def generateTrades( date: LocalDate, userId: UserId ): ZStream[Any, Throwable, Trade] = queryExecutionsForDate(date) // 1 .groupByKey(_.orderNo): // 2 case (orderNo, executions) => executions .via(getAccountNoFromExecution) // 3 .via(allocateTradeToClientAccount(userId)) // 4 .via(storeTrades) // 5
/</strong> 3、从执行流中获取客户账号 */ def getAccountNoFromExecution: ZPipeline[Any, Throwable, Execution, (Execution, AccountNo)]
/<strong>4、从执行中生成交易并分配到相关客户账户 */ def allocateTradeToClientAccount(userId: UserId): ZPipeline[Any, Throwable, (Execution, AccountNo), Trade]
/</strong>5、将交易持久化到数据库并返回存储的交易 */ def storeTrades: ZPipeline[Any, Throwable, Trade, Trade]
/**1、将交易持久化到数据库并返回存储的交易 */ def queryExecutionsForDate(date: LocalDate): ZStream[Any, Throwable, Execution]
|