Scala的Event Sourcing使用案例

13-05-15 banq
                   

EventSourced使用一个Event Store事件存储的库包,能够无锁控制事件的存储和重新播放。

dwestheide/eventhub-flatmap2013 · GitHub代码展示了如何使用EventSourced库包进行CQRS的命令或事件存储播放。其核心围绕状态。

User.java为案例:这是一个DDD聚合根,,包含两个方法:用户注册命令和用户注册事件:

  // commands for the user aggregate:
//用户注册命令
  sealed trait UserCommand extends Command
  case class RegisterUser(
      uniqueName: String,
      firstName: String,
      lastName: String,
      password: String,
      issuedAt: DateTime) extends UserCommand

  // domain events originating from the user aggregate:
//用户注册事件
  sealed trait UserEvent extends DomainEvent
  case class UserRegistered(
    uniqueName: String,
    firstName: String,
    lastName: String,
    occurredAt: DateTime,
    snr: Long
  ) extends UserEvent

}
<p>

当接收到UI的用户注册命令后,服务器通过IdentityApiService接收处理:

  pathPrefix("api") {
      respondWithMediaType(`application/json`) {
        pathPrefix("users" / "commands") {
          post {
            path("register") {
              entity(as[RegisterUser]) { command =>
                complete {
                  userService.process(command).map(createCommandResponse(_).userInfo)
                }
              }
            }
          }
        } ~
<p>

委托有界上下文的user.scala中的UserService:

class UserService(processor: ActorRef) {

    import com.danielwestheide.eventhub.iam.domain.user.UserCommand
    import concurrent.Future
    import akka.pattern.ask
    import org.eligosource.eventsourced.core.Message
    import akka.util.Timeout
    import scala.concurrent.duration._

    implicit val timeout = Timeout(5.seconds)
    def process(command: UserCommand): Future[Result] =
      (processor ? Message(command)).mapTo[Result]
  }
<p>

userservice其实发送消息给Actor模型userProcessor处理:

class UserProcessor(userRepository: UserRepository) extends Actor {
    this: Emitter =>
    import com.danielwestheide.eventhub.iam.domain.user.RegisterUser
    import com.danielwestheide.eventhub.iam.domain.user.UserRegistered
    override def receive = {
      case RegisterUser(uniqueName, firstName, lastName, password, issuedAt) =>
        if (userRepository.contains(uniqueName))
          sender ! s"User name $uniqueName is already taken".fail
        else {
          import com.danielwestheide.eventhub.iam.domain.user.UserInformation
          val result = userRepository.saveOrUpdate(
            User(UserInformation(uniqueName, firstName, lastName), password))
          result.foreach { _ =>
            val snr = sequenceNr
            emitter("iam").sendEvent(
              UserRegistered(uniqueName, firstName, lastName, issuedAt, snr))
          }
          sender ! result
        }
    }
  }

<p>

注意,UserProcessor中根据接收的消息,判断是否是用户注册命令,如果是执行聚合User的相应命令,然后在再激活领域事件:

emitter("iam").sendEvent(

UserRegistered(uniqueName, firstName, lastName, issuedAt, snr))

                   

1