skills/j5ik2o/okite-ai/pekko-cqrs-es-implementation

pekko-cqrs-es-implementation

SKILL.md

Pekko CQRS/ES 実装ガイド

Apache Pekko + Scala 3でCQRS/Event Sourcingを実装する際の具体的なパターン集。

適用条件

このスキルは以下のすべてを満たす場合にのみ使用する:

  1. プログラミング言語がScala(3.x推奨)であること
  2. CQRS/Event Sourcingアーキテクチャを採用していること
  3. Apache Pekko(またはAkka)をアクターフレームワークとして使用していること

上記を満たさない場合は、cqrs-tradeoffscqrs-to-event-sourcingcqrs-aggregate-modelingなど 言語非依存のスキルを使用すること。

アーキテクチャ概要

システム全体のデータフロー

クライアント
コマンドAPI(GraphQL Mutation)
ユースケース層(ZIO)
集約アクター(Pekko Typed)
ドメインモデル(純粋Scala)→ イベント生成
イベントストア(DynamoDB)
DynamoDB Streams
リードモデルアップデータ(AWS Lambda)
リードモデル(PostgreSQL)
クエリAPI(GraphQL Query)

コマンド側とクエリ側の分離

コマンド側 クエリ側
目的 ビジネスルール実行 効率的なデータ取得
データストア DynamoDB(イベントストア) PostgreSQL(リードモデル)
API GraphQL Mutation GraphQL Query
レイヤー ドメイン → ユースケース → インターフェースアダプタ インターフェースアダプタのみ
整合性 強い整合性(集約内) 結果整合性

クエリ側にユースケース層はない。 GraphQL自体がユースケースに相当する。

モジュール構成

modules/
├── command/
│   ├── domain/                          # ドメイン層(純粋Scala)
│   ├── use-case/                        # ユースケース層(ZIO)
│   ├── interface-adapter/               # アクター、GraphQL、シリアライザ
│   ├── interface-adapter-contract/      # コマンド/リプライのプロトコル定義
│   └── interface-adapter-event-serializer/  # Protocol Buffersシリアライザ
├── query/
│   ├── interface-adapter/               # DAO(Slick)、GraphQL
│   └── flyway-migration/               # DBマイグレーション
└── infrastructure/                      # 共有ユーティリティ

apps/
├── command-api/                         # コマンド側HTTPサーバー
├── query-api/                           # クエリ側HTTPサーバー
└── read-model-updater/                  # AWS Lambda

依存方向

domain ← use-case ← interface-adapter ← apps
  │                       ↑
  │               interface-adapter-contract
  │               interface-adapter-event-serializer
  └── Scalaのみに依存。フレームワーク非依存。

コマンド側実装パターン

1. ドメインモデル

原則: ドメインモデルは純粋なScalaコードで、Pekkoに一切依存しない。

集約(trait + private case class)

trait UserAccount extends Entity {
  override type IdType = UserAccountId
  def id: UserAccountId
  def name: UserAccountName
  def emailAddress: EmailAddress
  def createdAt: DateTime
  def updatedAt: DateTime

  def rename(newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)]
  def delete: Either[DeleteError, (UserAccount, UserAccountEvent)]
}

object UserAccount {
  // ファクトリ: 新しい状態とイベントのペアを返す
  def apply(
    id: UserAccountId,
    name: UserAccountName,
    emailAddress: EmailAddress,
    createdAt: DateTime = DateTime.now(),
    updatedAt: DateTime = DateTime.now()
  ): (UserAccount, UserAccountEvent) =
    (
      UserAccountImpl(id, false, name, emailAddress, createdAt, updatedAt),
      UserAccountEvent.Created_V1(
        id = DomainEventId.generate(),
        entityId = id,
        name = name,
        emailAddress = emailAddress,
        occurredAt = DateTime.now()
      ))

  // 実装はprivateで隠蔽
  private final case class UserAccountImpl(
    id: UserAccountId,
    deleted: Boolean,
    name: UserAccountName,
    emailAddress: EmailAddress,
    createdAt: DateTime,
    updatedAt: DateTime
  ) extends UserAccount {

    override def rename(
      newName: UserAccountName): Either[RenameError, (UserAccount, UserAccountEvent)] =
      if (name == newName) {
        Left(RenameError.FamilyNameSame)
      } else {
        val updated = this.copy(name = newName, updatedAt = DateTime.now())
        val event = UserAccountEvent.Renamed_V1(
          id = DomainEventId.generate(),
          entityId = id,
          oldName = name,
          newName = newName,
          occurredAt = DateTime.now()
        )
        Right((updated, event))
      }

    override def delete: Either[DeleteError, (UserAccount, UserAccountEvent)] =
      if (deleted) {
        Left(DeleteError.AlreadyDeleted)
      } else {
        val updated = copy(deleted = true, updatedAt = DateTime.now())
        val event = UserAccountEvent.Deleted_V1(
          id = DomainEventId.generate(),
          entityId = id,
          occurredAt = DateTime.now()
        )
        Right((updated, event))
      }
  }
}

重要なパターン:

  • 状態変更メソッドは Either[Error, (NewState, Event)] を返す
  • ファクトリも (State, Event) のペアを返す
  • 実装クラスは private で外部から直接構築できない
  • ドメインモデル内でPekkoのimportは一切ない

イベント定義(enum + バージョニング)

enum UserAccountEvent extends DomainEvent {
  override type EntityIdType = UserAccountId

  case Created_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    name: UserAccountName,
    emailAddress: EmailAddress,
    occurredAt: DateTime
  )

  case Renamed_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    oldName: UserAccountName,
    newName: UserAccountName,
    occurredAt: DateTime
  )

  case Deleted_V1(
    id: DomainEventId,
    entityId: UserAccountId,
    occurredAt: DateTime
  )
}

イベント設計ルール:

ルール 説明
過去形で命名 「何が起きたか」を表す Created, Renamed, Deleted
_V1サフィックス スキーマ進化に対応 Renamed_V1, Renamed_V2
不変 case classで自動的に保証
自己完結 変更前後の値を含む oldName, newName
必須フィールド id, entityId, occurredAt すべてのイベントに共通

2. 集約の状態(enum)

enum UserAccountAggregateState {
  case NotCreated(id: UserAccountId)
  case Created(user: UserAccount)
  case Deleted(user: UserAccount)

  def applyEvent(event: UserAccountEvent): UserAccountAggregateState = (this, event) match {
    case (NotCreated(id), UserAccountEvent.Created_V1(_, entityId, name, emailAddress, _))
        if id == entityId =>
      Created(UserAccount(entityId, name, emailAddress)._1)

    case (Created(user), UserAccountEvent.Renamed_V1(_, entityId, _, newName, _))
        if user.id == entityId =>
      Created(user.rename(newName) match {
        case Right((u, _)) => u
        case Left(error) =>
          throw new IllegalStateException(s"Failed to rename user: $error")
      })

    case (Created(user), UserAccountEvent.Deleted_V1(_, entityId, _))
        if user.id == entityId =>
      Deleted(user.delete match {
        case Right((deletedUser, _)) => deletedUser
        case Left(error) =>
          throw new IllegalStateException(s"Failed to delete user: $error")
      })

    case _ =>
      throw new IllegalStateException(s"Cannot apply event $event to state $this")
  }
}

状態遷移の型安全性:

  • NotCreatedCreated(Created_V1イベントのみ)
  • CreatedCreated(Renamed_V1)/ Deleted(Deleted_V1)
  • Deleted → 遷移なし(どのイベントも受け付けない)

3. プロトコル定義(コマンド/リプライ)

object UserAccountProtocol {
  // コマンド: すべてidを持つ
  sealed trait Command { def id: UserAccountId }
  final case class Create(
    id: UserAccountId, name: UserAccountName,
    emailAddress: EmailAddress, replyTo: ActorRef[CreateReply]) extends Command
  final case class Rename(
    id: UserAccountId, newName: UserAccountName,
    replyTo: ActorRef[RenameReply]) extends Command
  final case class Delete(
    id: UserAccountId, replyTo: ActorRef[DeleteReply]) extends Command
  final case class Get(
    id: UserAccountId, replyTo: ActorRef[GetReply]) extends Command

  // リプライ: コマンドごとに専用の型
  sealed trait CreateReply
  final case class CreateSucceeded(id: UserAccountId) extends CreateReply

  sealed trait RenameReply
  final case class RenameSucceeded(id: UserAccountId) extends RenameReply
  final case class RenameFailed(id: UserAccountId, reason: RenameError) extends RenameReply

  sealed trait DeleteReply
  final case class DeleteSucceeded(id: UserAccountId) extends DeleteReply
  final case class DeleteFailed(id: UserAccountId, reason: DeleteError) extends DeleteReply

  sealed trait GetReply
  final case class GetSucceeded(value: UserAccount) extends GetReply
  final case class GetNotFoundFailed(id: UserAccountId) extends GetReply
}

プロトコル設計ルール:

  • コマンドはすべて sealed trait Command を継承し、id を持つ
  • リプライはコマンドごとに専用の sealed trait を定義する(CreateReply, RenameReply等)
  • replyTo: ActorRef[XxxReply] で型安全な応答を保証
  • 成功/失敗をcase classで表現し、パターンマッチで網羅性チェック

4. 集約アクター(PersistenceEffector)

object UserAccountAggregate {

  // 状態ごとにハンドラ関数を分離
  private def handleNotCreated(
    state: UserAccountAggregateState.NotCreated,
    effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
  ): Behavior[Command] = Behaviors.receiveMessagePartial {
    case Create(id, name, emailAddress, replyTo) if state.id == id =>
      val (newState, event) = UserAccount(id, name, emailAddress)
      effector.persistEvent(event) { _ =>
        replyTo ! CreateSucceeded(id)
        handleCreated(UserAccountAggregateState.Created(newState), effector)
      }
    case Get(id, replyTo) if state.id == id =>
      replyTo ! GetNotFoundFailed(id)
      Behaviors.same
  }

  private def handleCreated(
    state: UserAccountAggregateState.Created,
    effector: PersistenceEffector[UserAccountAggregateState, UserAccountEvent, Command]
  ): Behavior[Command] = Behaviors.receiveMessagePartial {
    case Rename(id, newName, replyTo) if state.user.id == id =>
      // ドメインモデルに委譲
      state.user.rename(newName) match {
        case Left(reason) =>
          replyTo ! RenameFailed(id, reason)
          Behaviors.same
        case Right((newUser, event)) =>
          effector.persistEvent(event) { _ =>
            replyTo ! RenameSucceeded(id)
            handleCreated(state.copy(user = newUser), effector)
          }
      }
    // ... Delete, Get も同様
  }

  // エントリポイント
  def apply(id: UserAccountId): Behavior[Command] = {
    val config = PersistenceEffectorConfig
      .create[UserAccountAggregateState, UserAccountEvent, Command](
        persistenceId = s"${id.entityTypeName}-${id.asString}",
        initialState = UserAccountAggregateState.NotCreated(id),
        applyEvent = (state, event) => state.applyEvent(event)
      )
      .withPersistenceMode(PersistenceMode.Persisted)
      .withSnapshotCriteria(SnapshotCriteria.every(1000))
      .withRetentionCriteria(RetentionCriteria.snapshotEvery(2))

    Behaviors.setup[Command] { implicit ctx =>
      Behaviors
        .supervise(
          PersistenceEffector.fromConfig(config) {
            case (state: UserAccountAggregateState.NotCreated, effector) =>
              handleNotCreated(state, effector)
            case (state: UserAccountAggregateState.Created, effector) =>
              handleCreated(state, effector)
            case (state: UserAccountAggregateState.Deleted, effector) =>
              handleDeleted(state, effector)
          })
        .onFailure[IllegalArgumentException](SupervisorStrategy.restart)
    }
  }
}

アクター実装ルール:

  • ドメインロジックをアクターに書かない。 アクターは永続化とライフサイクル管理に徹する
  • 状態ごとにハンドラ関数を分離し、受け付けるコマンドを限定する
  • effector.persistEvent(event) { _ => ... } でイベント永続化後にリプライ
  • ドメインモデルが Left を返したらリプライのみ(永続化しない)
  • ドメインモデルが Right を返したらイベントを永続化してからリプライ

5. ユースケース層(ZIO)

private[users] final class UserAccountUseCaseImpl(
  userAccountAggregateRef: ActorRef[UserAccountProtocol.Command]
)(implicit
  timeout: Timeout,
  scheduler: Scheduler,
  ec: ExecutionContext
) extends UserAccountUseCase {

  override def createUserAccount(
    userAccountName: UserAccountName,
    emailAddress: EmailAddress
  ): IO[UserAccountUseCaseError, UserAccountId] =
    for {
      userAccountId <- ZIO.succeed(UserAccountId.generate())
      reply <- askActor[UserAccountProtocol.CreateReply] { replyTo =>
        UserAccountProtocol.Create(
          id = userAccountId,
          name = userAccountName,
          emailAddress = emailAddress,
          replyTo = replyTo
        )
      }.mapError(e => UserAccountUseCaseError.UnexpectedError(e.getMessage, Some(e)))
      result <- reply match {
        case UserAccountProtocol.CreateSucceeded(id) => ZIO.succeed(id)
      }
    } yield result

  private def askActor[R](
    createMessage: ActorRef[R] => UserAccountProtocol.Command
  ): Task[R] =
    PekkoInterop.fromFuture { userAccountAggregateRef.ask(createMessage) }
}

ユースケース層のルール:

  • ビジネスロジックを書く場所ではない。 処理ステップの調整役に徹する
  • ZIOのfor式でアクターとの通信を型安全に記述
  • askActor ヘルパーでPekko Ask → ZIO Task変換を共通化
  • エラーは UserAccountUseCaseError にマッピング

シリアライズ

Protocol Buffersによるイベントシリアライズ

// event.proto
syntax = "proto3";

message UserAccountCreatedV1 {
  string id = 1;
  string entity_id = 2;
  string name = 3;
  string email_address = 4;
  string occurred_at = 5;
}

message UserAccountRenamedV1 {
  string id = 1;
  string entity_id = 2;
  string old_name = 3;
  string new_name = 4;
  string occurred_at = 5;
}

シリアライズ方針:

  • イベントとスナップショットの両方をProtocol Buffersで定義
  • ScalaPBでScalaコードを自動生成
  • カスタムシリアライザでドメインイベント ↔ Protobufメッセージを変換
  • バージョニングはProtobufのフィールド番号で後方互換性を確保

スナップショット戦略

設定 理由
保存頻度 1000イベントごと 起動時間とストレージのバランス
保持数 最新2つ リカバリ安全性の確保

テスト戦略

ドメインモデルテスト(Pekko非依存)

test("ユーザー名を変更できる") {
  val (user, _) = UserAccount(id, name, email)
  val result = user.rename(newName)
  result match {
    case Right((updated, event)) =>
      updated.name shouldBe newName
      event shouldBe a[UserAccountEvent.Renamed_V1]
    case Left(error) => fail(s"Unexpected error: $error")
  }
}

アクターテスト(ActorTestKit)

// PersistenceEffectorのテスト
val probe = testKit.createTestProbe[CreateReply]()
aggregateRef ! Create(id, name, email, probe.ref)
probe.expectMessage(CreateSucceeded(id))

テストの分離

レベル 対象 ツール Pekko依存
ドメイン ビジネスロジック ScalaTest なし
アクター メッセージング、永続化 ActorTestKit あり
シリアライザ イベント/スナップショットの変換 ScalaTest なし
統合 エンドツーエンド LocalStack あり

関連スキルとの使い分け

スキル フォーカス 使うタイミング
本スキル Pekko + Scala 3での具体的実装 CQRS/ESをScalaで実装するとき
cqrs-tradeoffs CQRS採用判断のトレードオフ分析 CQRS導入の是非を検討するとき
cqrs-to-event-sourcing なぜESが必要になるかの論理的説明 ESの必然性を理解したいとき
cqrs-aggregate-modeling CQRS導入時の集約境界再定義 集約の粒度を見直すとき
aggregate-design 集約設計ルール全般 集約の新規設計やレビューのとき
domain-building-blocks VO/Entity/Aggregate等の設計 ドメインモデル全体を設計するとき

参考文献

関連スキル(併読推奨)

このスキルを使用する際は、以下のスキルも併せて参照すること:

  • cqrs-tradeoffs: CQRS/ES採用判断のトレードオフ分析
  • cqrs-to-event-sourcing: CQRSからイベントソーシングへの必然性
  • aggregate-design: ドメインモデルとしての集約設計ルール
Weekly Installs
14
Repository
j5ik2o/okite-ai
GitHub Stars
73
First Seen
12 days ago
Installed on
opencode14
gemini-cli14
github-copilot14
codex14
amp14
cline14