An Event Sourcing Use Case in Scala

EventSourced is a library built around an event store. It can persist events and replay them later without relying on locks.

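The code at dwestheide/eventhub-flatmap2013 · GitHub shows how to use the EventSourced library to store and replay commands and events in a CQRS application. Its design centers on state: the current state is the result of the events that have been stored.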
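The store-and-replay idea can be sketched without any library. The following is a minimal illustration only: `Journal`, `append` and `replay` are hypothetical names chosen for this sketch and are not part of the EventSourced API.

```scala
// Hypothetical, library-independent sketch; none of these names come from EventSourced.
object JournalSketch {
  sealed trait Event
  final case class UserRegistered(uniqueName: String) extends Event

  // Immutable, append-only journal: appending returns a new journal value,
  // so an older snapshot can be read without taking a lock.
  final case class Journal(entries: Vector[Event]) {
    def append(event: Event): Journal = copy(entries = entries :+ event)
  }
  object Journal {
    val empty: Journal = Journal(Vector.empty)
  }

  // Replaying folds every stored event, in order, into the current state
  // (here: the set of registered user names).
  def replay(journal: Journal): Set[String] =
    journal.entries.foldLeft(Set.empty[String]) {
      case (names, UserRegistered(name)) => names + name
    }
}
```

Because the state is derived only from the journal, it can be discarded and rebuilt at any time by replaying the same events.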

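Take User.scala as an example. It holds a DDD aggregate root and defines two message types for it: the user registration command and the user registration event: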


// Commands for the user aggregate (here: the user registration command)
sealed trait UserCommand extends Command
case class RegisterUser(
  uniqueName: String,
  firstName: String,
  lastName: String,
  password: String,
  issuedAt: DateTime) extends UserCommand

// Domain events originating from the user aggregate (here: the user registration event)
sealed trait UserEvent extends DomainEvent
case class UserRegistered(
  uniqueName: String,
  firstName: String,
  lastName: String,
  occurredAt: DateTime,
  snr: Long) extends UserEvent
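The two types differ in a way that is easy to miss: the command carries the password, while the event omits it and adds a sequence number (snr). A minimal sketch of that relationship follows. It assumes stand-in `Command` and `DomainEvent` traits, uses `java.time.Instant` in place of the project's `DateTime` so that it is self-contained, and introduces a hypothetical helper `toEvent` that is not part of the original code.

```scala
import java.time.Instant

// Hypothetical sketch; Command, DomainEvent and toEvent are stand-ins for illustration.
object MessagesSketch {
  trait Command
  trait DomainEvent

  final case class RegisterUser(
    uniqueName: String,
    firstName: String,
    lastName: String,
    password: String,
    issuedAt: Instant) extends Command

  final case class UserRegistered(
    uniqueName: String,
    firstName: String,
    lastName: String,
    occurredAt: Instant,
    snr: Long) extends DomainEvent

  // Derive the event from an accepted command. The password is deliberately
  // not copied, so it never ends up in the event log.
  def toEvent(command: RegisterUser, snr: Long): UserRegistered =
    UserRegistered(
      command.uniqueName,
      command.firstName,
      command.lastName,
      command.issuedAt, // the command's issue time is reused as the event's occurrence time
      snr)
}
```

A command expresses an intent that may still be rejected, whereas an event records something that has already happened, which is why only the event needs a position in the log.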

When the UI sends a user registration command, the server receives and handles it in IdentityApiService:


pathPrefix("api") {
  respondWithMediaType(`application/json`) {
    pathPrefix("users" / "commands") {
      post {
        path("register") {
          entity(as[RegisterUser]) { command =>
            complete {
              userService.process(command).map(createCommandResponse(_).userInfo)
            }
          }
        }
      }
    } ~
    // (the remaining routes are not shown in this excerpt)

The route delegates to UserService, defined in user.scala of the bounded context:


import akka.actor.ActorRef

class UserService(processor: ActorRef) {

  import com.danielwestheide.eventhub.iam.domain.user.UserCommand
  import scala.concurrent.Future
  import akka.pattern.ask
  import org.eligosource.eventsourced.core.Message
  import akka.util.Timeout
  import scala.concurrent.duration._

  // Upper bound on how long to wait for the processor's reply
  implicit val timeout: Timeout = Timeout(5.seconds)

  // Wrap the command in a Message, ask the processor, and cast the reply to Result
  def process(command: UserCommand): Future[Result] =
    (processor ? Message(command)).mapTo[Result]
}
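Akka's ask operator (`?`) returns an untyped `Future[Any]`, which is why `process` ends with `mapTo[Result]`. The sketch below shows just that step using only the standard library. The `ask` function and the `Result` case class here are hypothetical stand-ins for the actor round trip and the project's own result type.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical sketch; ask and Result are stand-ins, not the project's definitions.
object AskSketch {
  final case class Result(message: String)

  // Stand-in for `processor ? Message(command)`: the reply type is unknown
  // to the compiler, so the future is typed as Future[Any].
  def ask(command: Any): Future[Any] =
    Future.successful(Result(s"processed $command"))

  // mapTo narrows Future[Any] to Future[Result]; the future fails with a
  // ClassCastException if the reply is of a different type.
  def process(command: Any): Future[Result] =
    ask(command).mapTo[Result]

  // Blocking is for demonstration only; real code would compose the future.
  def demo(): Result = Await.result(process("register"), 5.seconds)
}
```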

UserService, in turn, simply sends the message to an actor, UserProcessor, which does the actual work:

class UserProcessor(userRepository: UserRepository) extends Actor {
  this: Emitter =>
  import com.danielwestheide.eventhub.iam.domain.user.RegisterUser
  import com.danielwestheide.eventhub.iam.domain.user.UserRegistered
  override def receive = {
    case RegisterUser(uniqueName, firstName, lastName, password, issuedAt) =>
      // Reject the command if the name is already in use
      if (userRepository.contains(uniqueName))
        sender ! s"User name $uniqueName is already taken".fail
      else {
        import com.danielwestheide.eventhub.iam.domain.user.UserInformation
        val result = userRepository.saveOrUpdate(
          User(UserInformation(uniqueName, firstName, lastName), password))
        // Emit the domain event only if saving succeeded
        result.foreach { _ =>
          val snr = sequenceNr
          emitter("iam").sendEvent(
            UserRegistered(uniqueName, firstName, lastName, issuedAt, snr))
        }
        sender ! result
      }
  }
}

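Note that UserProcessor checks whether the received message is a user registration command. If it is, the processor performs the corresponding operation on the User aggregate and then emits the domain event:
emitter("iam").sendEvent(
  UserRegistered(uniqueName, firstName, lastName, issuedAt, snr))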
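The decision made by the processor (validate, update state, then emit an event) can also be written as a pure function, which makes it easy to test without actors. This is only a sketch under simplifying assumptions: `State` stands in for the UserRepository, `handle` is a hypothetical name, and `Either` replaces the validation type used in the original code.

```scala
// Hypothetical sketch; State and handle are illustrative, not the project's API.
object ProcessorSketch {
  final case class RegisterUser(uniqueName: String, firstName: String, lastName: String)
  final case class UserRegistered(uniqueName: String, firstName: String, lastName: String, snr: Long)

  // Stand-in for the UserRepository: just the set of names already taken.
  final case class State(takenNames: Set[String])

  // Left: the command is rejected and no event is produced.
  // Right: the updated state together with the event to emit.
  def handle(state: State, command: RegisterUser, snr: Long): Either[String, (State, UserRegistered)] =
    if (state.takenNames.contains(command.uniqueName))
      Left(s"User name ${command.uniqueName} is already taken")
    else {
      val event = UserRegistered(command.uniqueName, command.firstName, command.lastName, snr)
      Right((state.copy(takenNames = state.takenNames + command.uniqueName), event))
    }
}
```

As in the processor, a rejected command leaves no trace in the event log; only a successful registration produces a UserRegistered event.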