A starter template for an event sourcing application: https://github.com/stepin/kotlin-event-sourcing-app

The note “Classic event sourcing” covers the basics, and “Inline event sourcing” covers the architecture of this template.

This repository is an example application, not a standalone engine. So far, I am not sure that the engine can be used “as is” in other applications. At the same time, I am confident that it is quite realistic to develop applications starting from this template.

This project is the common part extracted from one of my personal projects. It is already roughly the 5th version of the engine (the first one was actually written in Go). At the same time, this version is new, so some rough edges are possible at first.

The template is based on my basic Kotlin application template: https://github.com/stepin/kotlin-bootstrap-app

Events

We begin by identifying events and entities.

Let’s say we have a simple business entity, a User:

data class User(
  val displayName: String,
  val firstName: String,
  val secondName: String,
  val email: String,
)

And we want to support the following scenarios (events):

  • User registration
  • name change
  • deleting a user

For the sake of simplicity of the example, we will not pay attention to confirmations and authorization.

Example of a user registration event:

data class UserRegistered(  
  val email: String,  
  val firstName: String?,  
  val secondName: String?,  
  val displayName: String,  
  override val accountGuid: AccountGuid,  
  override val aggregatorGuid: UserGuid = UUID.randomUUID(),  
  override val guid: EventGuid = UUID.randomUUID(),  
) : UserEvent(eventTypeVersion = 3)
  • 4 main fields: email, firstName, secondName, displayName
  • guid – the guid of the event itself (random)
  • aggregatorGuid – the user guid; it is inconvenient that there is no synonym, but you can get used to it (and the typealias UserGuid is specified)
  • accountGuid – the engine is designed for multi-account applications
  • a data class is convenient, and it is even more convenient that UserEvent is a sealed class, which allows constructions like this:
when (val e = event as UserEvent) {  
  is UserMetaUpdated -> "updated $e"  
  is UserRegistered -> "user registered with id $id ${meta.createdAt} $e"  
  is UserRemoved -> "user ${e.email} deleted at ${meta.createdAt}"  
}

The base class for the User aggregate events looks like this:

sealed class UserEvent(  
  override val eventTypeVersion: Short = 0,  
) : DomainEvent {  
  override val aggregatorType: String  
    get() = "user"  
  
  override val eventType: String  
    get() = this.javaClass.simpleName  
  
  abstract override val aggregatorGuid: UserGuid  
}
  • the engine's DomainEvent interface is implemented
  • the typealias UserGuid is used for aggregatorGuid – optional, serves as documentation
  • the aggregator type is set ("user")
  • the event type is set – the name of the event class is taken automatically (for example, UserRegistered)
  • the default version of the event is set to 0, but an event can override this value
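
The pieces above can be put together in a self-contained sketch. The shape of DomainEvent below is an assumption inferred from the overrides in the snippets (the real interface lives in the engine and may differ), and the fields of UserRemoved and the describe function are hypothetical:

```kotlin
import java.util.UUID

typealias UserGuid = UUID
typealias AccountGuid = UUID
typealias EventGuid = UUID

// Assumed shape of the engine interface, inferred from the overrides in the article.
interface DomainEvent {
    val guid: EventGuid
    val accountGuid: AccountGuid
    val aggregatorGuid: UUID
    val aggregatorType: String
    val eventType: String
    val eventTypeVersion: Short
}

sealed class UserEvent(
    override val eventTypeVersion: Short = 0,
) : DomainEvent {
    override val aggregatorType: String get() = "user"
    override val eventType: String get() = this.javaClass.simpleName
    abstract override val aggregatorGuid: UserGuid
}

// Overrides the default event version.
data class UserRegistered(
    val email: String,
    val displayName: String,
    override val accountGuid: AccountGuid,
    override val aggregatorGuid: UserGuid = UUID.randomUUID(),
    override val guid: EventGuid = UUID.randomUUID(),
) : UserEvent(eventTypeVersion = 3)

// Hypothetical field set; keeps the default version 0.
data class UserRemoved(
    override val accountGuid: AccountGuid,
    override val aggregatorGuid: UserGuid,
    override val guid: EventGuid = UUID.randomUUID(),
) : UserEvent()

// The compiler checks that every subclass of the sealed class is handled.
fun describe(event: UserEvent): String = when (event) {
    is UserRegistered -> "${event.eventType} v${event.eventTypeVersion}: ${event.email}"
    is UserRemoved -> "${event.eventType} v${event.eventTypeVersion}: ${event.aggregatorGuid}"
}
```

Because UserEvent is sealed, adding a new event class makes every such `when` expression fail to compile until the new case is handled.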

In fact, the engine requires only 2 things from an event:

  • an implementation of the DomainEvent interface
  • correct serialization to and deserialization from JSONB

The rest is at the discretion of the developer. At the same time, the base class for all aggregate events is considered good practice.

About id/guid: this example assumes that commands work with guid, and that joins in SQL queries use id where necessary (because it is faster).

Commands

A command is either a separate Spring service or a method inside a Spring service. The only critical requirement is that events are published through the EventStorePublisher interface; the engine does not restrict anything else.

The registration command:

@Service  
class RegisterUser(  
  private val store: EventStorePublisher,  
  private val userRepository: UserRepository,  
) {
  data class Params(  
    val email: String,  
    val firstName: String?,  
    val secondName: String?,  
    val displayName: String?,  
  )  
  
  sealed class Response {  
    data class Created(val userGuid: UUID) : Response()  
    data class Error(val errorCode: ErrorCode) : Response()  
  }  
  
  suspend fun execute(params: Params): Response = with(params) {  
    val user = userRepository.findByEmail(email)  
    if (user != null) {  
      return Response.Error(ErrorCode.USER_ALREADY_REGISTERED)  
    }  
  
    val accountGuid = UUID.randomUUID()
    val userGuid = UUID.randomUUID()
  
    val userRegistered = UserRegistered(  
      accountGuid = accountGuid,
      aggregatorGuid = userGuid,  
      email = email,  
      firstName = firstName,  
      secondName = secondName,  
      displayName = displayName ?: calcDisplayName(email, firstName, secondName),  
    ) 
    store.publish(userRegistered)  
  
    val accountCreated = AccountCreated(  
      name = "Unknown company",
      accountGuid = accountGuid,  
      userGuid = userGuid,  
    )  
    store.publish(accountCreated)  
  
    return Response.Created(userGuid)
  }
}

The values returned from the commands depend on the business logic: whether there may be business errors, whether the guid needs to be returned, etc. In some cases, nothing may be returned.
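
As a sketch of how a caller might consume such a sealed response, here is a version without Spring. The HttpResult type, the toHttp function, and the chosen status codes are assumptions for illustration and are not part of the template; the two ErrorCode values are the ones that appear in the snippets in this article:

```kotlin
import java.util.UUID

enum class ErrorCode { USER_ALREADY_REGISTERED, USER_NOT_FOUND }

sealed class Response {
    data class Created(val userGuid: UUID) : Response()
    data class Error(val errorCode: ErrorCode) : Response()
}

// Hypothetical transport-level result; not part of the template.
data class HttpResult(val status: Int, val body: String)

// Maps every possible command outcome to a transport result.
fun toHttp(response: Response): HttpResult = when (response) {
    is Response.Created -> HttpResult(201, response.userGuid.toString())
    is Response.Error -> when (response.errorCode) {
        ErrorCode.USER_ALREADY_REGISTERED -> HttpResult(409, response.errorCode.name)
        ErrorCode.USER_NOT_FOUND -> HttpResult(404, response.errorCode.name)
    }
}
```

Returning business errors as values keeps them visible in the command's signature, while exceptions stay reserved for unexpected failures.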

Projectors

Example of 2 projectors in one class:

@Service  
class UserProjector(  
  private val userRepository: UserRepository,  
  private val accountRepository: AccountRepository,  
) {  
  companion object : Logging  
  
  @Projector  
  suspend fun handleUserRegistered(e: UserRegistered, meta: EventMetadata) {  
    val account = accountRepository.findByGuid(e.accountGuid)  
  
    val u = UserEntity()  
    u.accountGuid = e.accountGuid  
    u.accountId = account?.id ?: 0  
    u.guid = e.aggregatorGuid  
    u.email = e.email  
    u.displayName = e.displayName  
    u.firstName = e.firstName  
    u.secondName = e.secondName  
    u.createdAt = meta.createdAt.toInstant(ZoneOffset.UTC)  
  
    val savedUser = userRepository.save(u)  
    logger.debug { "new user id: ${savedUser.id}" }  
  }  
  
  @Projector  
  suspend fun handleUserRemoved(e: UserRemoved) {  
    val user = getUser(e.aggregatorGuid)  
    userRepository.delete(user)  
  }  
  
  private suspend fun getUser(userGuid: UUID) = userRepository.findByGuid(userGuid)  
    ?: throw DomainException(ErrorCode.USER_NOT_FOUND)
}
  • the projector method should be in a Spring bean
  • it should have the @Projector annotation
  • there can be several methods in a class – there are no restrictions
  • the first argument is an event
  • the second (optional) argument is the event metadata
  • the method should be suspend (in principle, this restriction can be removed, but for now it is in the engine, and I do not plan to use it without suspend)
  • an exception in a projector cancels saving the event

Reactors

@Service  
class UserRegisteredEmailReactor(  
  private val emailService: SendEmailService,  
) {  
  companion object : Logging  
  
  @Reactor  
  suspend fun handle(e: UserRegistered) {  
    emailService.sendEmailConfirmationEmail(e.displayName, e.email, e.aggregatorGuid.toString())  
  }  
}
  • the reactor method should be in a Spring bean
  • there should be an annotation @Reactor
  • there can be several methods in a class – there are no restrictions
  • the first argument is an event
  • second (optional) – event metadata
  • the method should be suspend (in principle, this restriction can be removed, but now it is in the engine, and I do not plan to use it without suspend)
  • an exception in the reactor will NOT cancel the saving of the event and the start of other reactors
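
The difference in failure handling between projectors and reactors can be illustrated with a toy in-memory model. This is not the engine's implementation: ToyEventStore, its fields, and the plain (non-suspend) function types are hypothetical and only mimic the rules listed above.

```kotlin
// Toy model only: the real engine uses a database and suspend functions.
class ToyEventStore(
    private val projectors: List<(String) -> Unit>,
    private val reactors: List<(String) -> Unit>,
) {
    val savedEvents = mutableListOf<String>()
    val reactorErrors = mutableListOf<String>()

    fun publish(event: String) {
        // Projectors run before the event counts as saved:
        // an exception here propagates and the event is not stored.
        projectors.forEach { projector -> projector(event) }
        savedEvents.add(event)

        // Reactors run after saving: a failure is only recorded,
        // the event stays saved and the remaining reactors still run.
        reactors.forEach { reactor ->
            try {
                reactor(event)
            } catch (e: Exception) {
                reactorErrors.add(e.message ?: "unknown error")
            }
        }
    }
}
```

The toy model does not roll back side effects of projectors that ran before the failing one; it only shows which failures prevent an event from being stored.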

Reading data

Reading the data of the main projection – no restrictions, as usual.

Event reading is also available:

interface EventStoreReader {
  fun <T : DomainEvent> findEventsSinceId(  
    eventIdFrom: Long,  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>  

  fun <T : DomainEvent> findEventsSinceGuid(  
    eventGuidFrom: UUID,  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>  
 
  fun <T : DomainEvent> findEventsSinceDate(  
    date: LocalDateTime,  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>  

  fun <T : DomainEvent> findEvents(  
    aggregator: String? = null,  
    aggregatorGuid: UUID? = null,  
    accountGuid: AccountGuid? = null,  
    eventTypes: List<String>? = null,  
    maxBatchSize: Int? = null,  
  ): Flow<DomainEventWithIdAndMeta<T>>
}

This API can be used to get a history or to create asynchronous projections.
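
As a rough sketch of the second use case, an asynchronous projection can remember the id of the last event it processed and periodically ask for newer ones. The code below replaces the real EventStoreReader with a hypothetical in-memory list and uses plain lists instead of Flow, so StoredEvent, InMemoryReader, and RegisteredEmailsProjection are all assumptions. It also assumes that eventIdFrom is exclusive, which is not specified here.

```kotlin
// Hypothetical stand-ins for the engine types.
data class StoredEvent(val id: Long, val eventType: String, val email: String)

class InMemoryReader(private val events: List<StoredEvent>) {
    // Mimics findEventsSinceId: events with id greater than eventIdFrom,
    // ordered by id, at most maxBatchSize of them.
    fun findEventsSinceId(eventIdFrom: Long, maxBatchSize: Int): List<StoredEvent> =
        events.filter { it.id > eventIdFrom }.sortedBy { it.id }.take(maxBatchSize)
}

class RegisteredEmailsProjection(private val reader: InMemoryReader) {
    // Checkpoint: id of the last event applied to this projection.
    var lastProcessedId: Long = 0
        private set
    val emails = mutableSetOf<String>()

    // Processes one batch and returns how many events were handled.
    fun catchUp(batchSize: Int = 100): Int {
        val batch = reader.findEventsSinceId(lastProcessedId, batchSize)
        for (event in batch) {
            when (event.eventType) {
                "UserRegistered" -> emails.add(event.email)
                "UserRemoved" -> emails.remove(event.email)
            }
            lastProcessedId = event.id
        }
        return batch.size
    }
}
```

In a real application the checkpoint would be stored alongside the projection so that catching up can resume after a restart.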

You can potentially write your own event reading API; jOOQ has everything needed for this.

You can also do a full or partial database regeneration (via application startup arguments or custom code).

An example of getting a history (of course, you can mix reading from events and from the main projection, because it is all in the same database):

@Service  
class DebugService(  
  private val eventStoreReader: EventStoreReader,  
) {  
  suspend fun getUserAudit(userGuid: UUID): List<String> {  
    return eventStoreReader.findEvents<UserEvent>("user", userGuid, maxBatchSize = 100)
      .map { (id, event, meta) ->
        when (event) {
          is UserMetaUpdated -> "updated $event"
          is UserRegistered -> "user registered with id $id ${meta.createdAt} $event"
          is UserRemoved -> "user deleted at ${meta.createdAt}"
        }
      }
      .toList() // findEvents returns a Flow, so collect it into a List
  }
}

The API is a little ugly here: there is no connection between “user” and UserEvent. It might make sense to pass the base class, but it is abstract. If anyone has ideas on how to make the API better (without the “user” string and without the “as UserEvent” cast), I will be glad to read them.

Limitations

  • An Event Bus (for broadcasting events through something like Kafka or NATS) is not implemented, but nothing prevents adding one if anyone needs it.

The result

There is slightly more code because of a separate abstraction, the event. Time is also spent on the abstraction itself: naming, choosing fields, etc.

CRUD takes more code, but not as much as it might seem. You need to train yourself to think in business domain events rather than in creating or deleting a row in a database table.

Overall, I like it, which is why I decided to share it with the community.