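EventSourced is a library built around an event store; it manages the storing and replaying of events without locks.
The code in dwestheide/eventhub-flatmap2013 on GitHub shows how to use the EventSourced library to store and replay commands or events in a CQRS application. Its core revolves around state.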
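To make "store and replay" concrete, here is a minimal sketch of the idea, assuming a purely in-memory journal. None of the names below (`InMemoryJournal`, `AccountEvent`, `ReplayDemo`) come from EventSourced or from the eventhub project; they are hypothetical and only illustrate how state can be rebuilt by folding stored events in order.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical event types for illustration; not part of EventSourced.
sealed trait AccountEvent
final case class Registered(uniqueName: String) extends AccountEvent
final case class Renamed(oldName: String, newName: String) extends AccountEvent

// Append-only in-memory journal. A real event store would persist events.
final class InMemoryJournal[E] {
  private val entries = new AtomicReference[Vector[E]](Vector.empty[E])

  // Appends an event without taking a lock (compare-and-set retry loop)
  // and returns the 1-based sequence number assigned to it.
  def append(event: E): Long =
    entries.updateAndGet(v => v :+ event).size.toLong

  // Rebuilds state by folding every stored event, in order, over an initial value.
  def replay[S](initial: S)(applyEvent: (S, E) => S): S =
    entries.get().foldLeft(initial)(applyEvent)
}

object ReplayDemo {
  // The state here is just the set of user names currently taken.
  def applyEvent(names: Set[String], event: AccountEvent): Set[String] =
    event match {
      case Registered(name)  => names + name
      case Renamed(from, to) => names - from + to
    }

  def run(): Set[String] = {
    val journal = new InMemoryJournal[AccountEvent]
    journal.append(Registered("alice"))
    journal.append(Registered("bob"))
    journal.append(Renamed("bob", "robert"))
    journal.replay(Set.empty[String])(applyEvent)
  }
}
```

The state is never stored directly in this sketch; it is always derived from the journal, which is why replaying the same events yields the same state.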
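Take User as an example. It is a DDD aggregate root, and two message types are defined for it: the user registration command and the user registration event: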
// commands for the user aggregate (the user registration command):
sealed trait UserCommand extends Command
case class RegisterUser(
  uniqueName: String,
  firstName: String,
  lastName: String,
  password: String,
  issuedAt: DateTime) extends UserCommand

// domain events originating from the user aggregate (the user registration event):
sealed trait UserEvent extends DomainEvent
case class UserRegistered(
  uniqueName: String,
  firstName: String,
  lastName: String,
  occurredAt: DateTime,
  snr: Long) extends UserEvent
When a user registration command arrives from the UI, the server receives and handles it in IdentityApiService:
pathPrefix("api") {
  respondWithMediaType(`application/json`) {
    pathPrefix("users" / "commands") {
      post {
        path("register") {
          entity(as[RegisterUser]) { command =>
            complete {
              userService.process(command).map(createCommandResponse(_).userInfo)
            }
          }
        }
      }
    } ~
The route delegates to UserService, defined in user.scala of the bounded context:
class UserService(processor: ActorRef) {

  import com.danielwestheide.eventhub.iam.domain.user.UserCommand
  import concurrent.Future
  import akka.pattern.ask
  import org.eligosource.eventsourced.core.Message
  import akka.util.Timeout
  import scala.concurrent.duration._

  implicit val timeout = Timeout(5.seconds)

  def process(command: UserCommand): Future[Result] =
    (processor ? Message(command)).mapTo[Result]
}
UserService does not do the work itself; it sends a message to the UserProcessor actor, which handles it:
class UserProcessor(userRepository: UserRepository) extends Actor { this: Emitter =>

  import com.danielwestheide.eventhub.iam.domain.user.RegisterUser
  import com.danielwestheide.eventhub.iam.domain.user.UserRegistered

  override def receive = {
    case RegisterUser(uniqueName, firstName, lastName, password, issuedAt) =>
      // reject the command if the name is already taken
      if (userRepository.contains(uniqueName))
        sender ! s"User name $uniqueName is already taken".fail
      else {
        import com.danielwestheide.eventhub.iam.domain.user.UserInformation
        // store the new user
        val result = userRepository.saveOrUpdate(
          User(UserInformation(uniqueName, firstName, lastName), password))
        // emit the domain event through the emitter named "iam"
        result.foreach { _ =>
          val snr = sequenceNr
          emitter("iam").sendEvent(
            UserRegistered(uniqueName, firstName, lastName, issuedAt, snr))
        }
        // reply to the caller with the outcome
        sender ! result
      }
  }
}
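Note that UserProcessor inspects the incoming message to decide whether it is a user registration command. If it is, it carries out the corresponding operation for the User aggregate and then emits the domain event: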
emitter("iam").sendEvent(
UserRegistered(uniqueName, firstName, lastName, issuedAt, snr))
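The command-to-event flow described above can be sketched without Akka or EventSourced. This is a simplified illustration under stated assumptions: `RegistrationHandler` is a hypothetical class, the `RegisterUser` and `UserRegistered` case classes below are cut-down stand-ins for the ones in the project (no password or timestamps), and an in-memory set and vector replace the repository and the emitter.

```scala
// Simplified stand-ins for the project's messages; hypothetical, not the real definitions.
final case class RegisterUser(uniqueName: String, firstName: String, lastName: String)
final case class UserRegistered(uniqueName: String, firstName: String, lastName: String, snr: Long)

// Hypothetical handler. Like an actor's receive, it is meant to be called
// from one thread at a time, so plain vars are enough for this sketch.
final class RegistrationHandler {
  private var takenNames = Set.empty[String]        // stands in for the repository
  private var emitted = Vector.empty[UserRegistered] // stands in for the emitter

  // Validates the command, updates state, then records the resulting event.
  def handle(cmd: RegisterUser): Either[String, UserRegistered] =
    if (takenNames.contains(cmd.uniqueName))
      Left(s"User name ${cmd.uniqueName} is already taken")
    else {
      takenNames += cmd.uniqueName
      val event = UserRegistered(
        cmd.uniqueName, cmd.firstName, cmd.lastName, emitted.size + 1L)
      emitted :+= event
      Right(event)
    }

  // Events recorded so far, in emission order.
  def events: Vector[UserRegistered] = emitted
}
```

A rejected command leaves no trace in `events`; only a successful state change produces an event, which matches the order of steps in UserProcessor.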