diff --git a/packages/Tempest/src/Config/TempestTenantDatabaseSwitcher.php b/packages/Tempest/src/Config/TempestTenantDatabaseSwitcher.php index 15a039fa7..97d5ae506 100644 --- a/packages/Tempest/src/Config/TempestTenantDatabaseSwitcher.php +++ b/packages/Tempest/src/Config/TempestTenantDatabaseSwitcher.php @@ -11,6 +11,10 @@ use Tempest\Database\Connection\Connection; use Tempest\Database\Connection\PDOConnection; use Tempest\Database\Database; +use Tempest\Database\GenericDatabase; +use Tempest\Database\Transactions\GenericTransactionManager; +use Tempest\EventBus\EventBus; +use Tempest\Mapper\SerializerFactory; /** * licence Apache-2.0 @@ -51,12 +55,14 @@ public function switchOn(string|ConnectionReference $activatedConnection): void $container->singleton(Connection::class, $connection, tag: $configTag); } - // Register the tenant's already-built Connection as the default so IsDatabaseModel - // and Ecotone's DBAL share one PDO — enabling transaction rollback across both + // Promote the tenant's already-built Connection as the default so IsDatabaseModel + // and Ecotone's DBAL share one PDO — enabling transaction rollback across both. + // The default Database is rebuilt from that same Connection and registered as a + // singleton, otherwise Tempest's DatabaseInitializer would resolve it lazily from the + // discovered default DatabaseConfig and clobber the promoted Connection back to it. $tenantConnection = $container->get(Connection::class, tag: $configTag); - $container->singleton(DatabaseConfig::class, $tenantConfig); $container->singleton(Connection::class, $tenantConnection); - $container->unregister(Database::class); + $container->singleton(Database::class, $this->buildDatabaseFor($container, $tenantConnection)); // Close the Doctrine Connection so TempestDynamicDriver reconnects on next use, // picking up the now-promoted default Connection's PDO @@ -73,6 +79,16 @@ public function switchOff(): void $this->closeDoctrineDefaultConnection($container); } + private function buildDatabaseFor(GenericContainer $container, Connection $connection): GenericDatabase + { + return new GenericDatabase( + $connection, + new GenericTransactionManager($connection), + $container->get(SerializerFactory::class), + $container->get(EventBus::class), + ); + } + private function closeDoctrineDefaultConnection(GenericContainer $container): void { if (! $container->has(DbalConnectionFactory::class)) { diff --git a/packages/Tempest/tests/Fixture/TenantAggregate/RegisterTenantProduct.php b/packages/Tempest/tests/Fixture/TenantAggregate/RegisterTenantProduct.php new file mode 100644 index 000000000..2fcda1f7a --- /dev/null +++ b/packages/Tempest/tests/Fixture/TenantAggregate/RegisterTenantProduct.php @@ -0,0 +1,16 @@ + TempestConnectionReference::create('tenant_a'), + 'tenant_b' => TempestConnectionReference::create('tenant_b'), + ], + ); + } +} diff --git a/packages/Tempest/tests/Fixture/TenantAggregate/TenantProduct.php b/packages/Tempest/tests/Fixture/TenantAggregate/TenantProduct.php new file mode 100644 index 000000000..c7b956a44 --- /dev/null +++ b/packages/Tempest/tests/Fixture/TenantAggregate/TenantProduct.php @@ -0,0 +1,44 @@ +name = $command->name; + $product->save(); + + return $product; + } + + #[IdentifierMethod('id')] + public function getId(): string + { + return (string) $this->id->value; + } +} diff --git a/packages/Tempest/tests/MultiTenant/TenantAggregatePersistenceTest.php b/packages/Tempest/tests/MultiTenant/TenantAggregatePersistenceTest.php new file mode 100644 index 000000000..2c16d4d90 --- /dev/null +++ b/packages/Tempest/tests/MultiTenant/TenantAggregatePersistenceTest.php @@ -0,0 +1,112 @@ +setupKernel(); + + $this->container->config(TempestDatabaseConfigFactory::primary('tenant_a')); + $this->container->config(TempestDatabaseConfigFactory::secondary('tenant_b')); + + $this->createTenantProductsTable($this->postgresConnection()); + $this->createTenantProductsTable($this->mysqlConnection()); + + $this->commandBus = $this->container->get(CommandBus::class); + } + + protected function tearDown(): void + { + $this->postgresConnection()->exec('DROP TABLE IF EXISTS tenant_products'); + $this->mysqlConnection()->exec('DROP TABLE IF EXISTS tenant_products'); + parent::tearDown(); + } + + public function test_tempest_model_aggregate_persists_to_the_active_tenant(): void + { + $this->commandBus->send(new RegisterTenantProduct('Alice'), metadata: ['tenant' => 'tenant_a']); + $this->commandBus->send(new RegisterTenantProduct('Bob'), metadata: ['tenant' => 'tenant_a']); + $this->commandBus->send(new RegisterTenantProduct('Carol'), metadata: ['tenant' => 'tenant_b']); + + $this->assertSame(['Alice', 'Bob'], $this->registeredNames($this->postgresConnection())); + $this->assertSame(['Carol'], $this->registeredNames($this->mysqlConnection())); + } + + private function registeredNames(PDO $pdo): array + { + return $pdo->query('SELECT name FROM tenant_products ORDER BY name')->fetchAll(PDO::FETCH_COLUMN); + } + + private function createTenantProductsTable(PDO $pdo): void + { + $pdo->exec('DROP TABLE IF EXISTS tenant_products'); + $pdo->exec( + 'CREATE TABLE tenant_products ( + id VARCHAR(36) NOT NULL, + name VARCHAR(255) NOT NULL, + PRIMARY KEY (id) + )', + ); + } + + private function postgresConnection(): PDO + { + $config = TempestDatabaseConfigFactory::primary(); + + return new PDO($config->dsn, $config->username, $config->password); + } + + private function mysqlConnection(): PDO + { + $config = TempestDatabaseConfigFactory::secondary(); + + return new PDO($config->dsn, $config->username, $config->password); + } +} diff --git a/quickstart-examples/Tempest/Model/.gitignore b/quickstart-examples/Tempest/Model/.gitignore new file mode 100644 index 000000000..779352374 --- /dev/null +++ b/quickstart-examples/Tempest/Model/.gitignore @@ -0,0 +1,3 @@ +/vendor/ +composer.lock +/.tempest/ diff --git a/quickstart-examples/Tempest/Model/README.md b/quickstart-examples/Tempest/Model/README.md new file mode 100644 index 000000000..0ceec6f90 --- /dev/null +++ b/quickstart-examples/Tempest/Model/README.md @@ -0,0 +1,106 @@ +# Tempest Model — Active-Record Model as an Ecotone Aggregate + +## 1. What you'll learn + +This example shows how a **Tempest active-record model** (`use Tempest\Database\IsDatabaseModel`) becomes an **Ecotone `#[Aggregate]`**. The model carries its own `#[CommandHandler]` and `#[QueryHandler]` methods, and Ecotone persists it automatically through the `ecotone/tempest` package's `TempestRepository` (a `StandardRepository` that calls the model's own `save()`). + +The same aggregate is then exercised three ways: + +1. **Command Bus calling the model directly** — commands route to the model's handlers; persistence is automatic. +2. **`#[Repository]` business interface** — an Ecotone gateway that loads (and saves) the aggregate. +3. **`#[DbalQuery]` business interface** — an SQL read-side gateway over the underlying table. + +## 2. How it fits together + +```mermaid +flowchart LR + Client -->|send command| CommandBus + CommandBus -->|route| Product["Product\n#[Aggregate] + IsDatabaseModel"] + Product -->|save| Table[(products table\nPostgreSQL)] + Client -->|getBy / findBy / save| ProductRepository["ProductRepository\n#[Repository] gateway"] + ProductRepository -->|TempestRepository| Table + Client -->|findAll| ProductFinder["ProductFinder\n#[DbalQuery] gateway"] + ProductFinder -->|SELECT| Table +``` + +*Files involved:* +- `app/Domain/Product.php` — the Tempest model annotated with `#[Aggregate]` +- `app/Domain/Command/RegisterProduct.php`, `ChangePrice.php` — command messages +- `app/ProductRepository.php` — `#[Repository]` business-interface gateway (load + save) +- `app/ProductFinder.php` — `#[DbalQuery]` read-side business interface +- `app/Infrastructure/EcotoneConfiguration.php` — registers Tempest's `DatabaseConfig` as Ecotone's default `DbalConnectionFactory` +- `app/Infrastructure/ConnectionFactoryInitializer.php` — exposes the same DBAL connection to Tempest autowiring +- `app/database.config.php` — the Tempest `PostgresConfig` + +## 3. The model as an aggregate + +```php +#[Aggregate] +final class Product +{ + use IsDatabaseModel; + + public PrimaryKey $id; + public string $name; + public int $price; + + #[CommandHandler] + public static function register(RegisterProduct $command): self { /* new self(); ...; $product->save(); */ } + + #[CommandHandler(routingKey: 'product.changePrice')] + public function changePrice(ChangePrice $command): void { $this->price = $command->price; } + + #[QueryHandler('product.getPrice')] + public function getPrice(): int { return $this->price; } + + #[IdentifierMethod('id')] + public function getId(): int { return $this->id->value; } +} +``` + +- The **static** handler is the factory: it creates the row (and calls `save()` so the generated id is returned to the caller). +- The **instance** handler mutates state; `TempestRepository::save()` persists it after the handler returns — no explicit `save()` needed. +- `#[IdentifierMethod('id')]` maps Ecotone's aggregate identifier to the Tempest `PrimaryKey`'s int `value`, so commands/queries target the right row via `metadata: ['aggregate.id' => $id]`. + +## 4. Identifier mapping and table setup + +Ecotone needs a scalar identifier, but the model's identity is a `Tempest\Database\PrimaryKey` object. `#[IdentifierMethod('id')] getId(): int` exposes the underlying int. Loading uses `TempestRepository::findBy()` → `Product::findById($id)`. + +The `products` table is created in `run_example.php` with Tempest's own schema builder (`CreateTableStatement`), matching how the package's integration test sets up its `orders` table — dropped first for idempotency: + +```php +$createSql = (new CreateTableStatement('products')) + ->primary('id')->string('name')->integer('price') + ->compile($database->dialect); +``` + +## 5. Running it + +```bash +docker compose up -d app database +docker compose exec app bash -lc 'cd quickstart-examples/Tempest/Model && composer update && php run_example.php' +``` + +The script prints a six-step ribbon ending with `== Example completed successfully ==`. + +## 6. Tempest-specific wiring + +1. `app/database.config.php` returns a Tempest `PostgresConfig`, auto-discovered as the container's `DatabaseConfig`. +2. `EcotoneConfiguration::databaseConnection()` returns `TempestConnectionReference::defaultConnection()`, registering that config as Ecotone's default `DbalConnectionFactory` (used by the DBAL business interface). +3. `ConnectionFactoryInitializer` resolves `Interop\Queue\ConnectionFactory` to the same Ecotone `DbalConnectionFactory`, so any Tempest-autowired service shares one connection. + +Handlers, the aggregate and the business interfaces are discovered automatically from the `App\` PSR-4 root — **no `ecotone.config.php` is required** (zero-config). + +> Boot order note: resolve `ConfiguredMessagingSystem` once right after `Tempest::boot()` before fetching the buses/gateways. That call compiles Ecotone's services and registers them with Tempest's container so `#[Repository]`/`#[DbalQuery]` gateways and the buses become resolvable via `$container->get(...)`. + +## 7. The three demonstrations + +| # | Mechanism | What it proves | +|---|-----------|----------------| +| 2-4 | Command/Query Bus → model | `RegisterProduct` creates and persists; `product.changePrice` mutates by `aggregate.id`; `product.getPrice` reads reconstituted state | +| 5 | `#[Repository]` gateway | `getBy(int)` / `findBy(int)` load the model; `save(Product)` persists an **already-loaded** aggregate (UPDATE) | +| 6 | `#[DbalQuery]` gateway | `findAll()` reads the underlying `products` table with raw SQL | + +## 8. Known limitation (active-record + `#[Repository]` save) + +The `#[Repository]` `save(Product $product)` gateway works for **existing** aggregates (their `PrimaryKey` is set, so `getId()` resolves). It does **not** work for a brand-new, unsaved `Product`: Ecotone reads the aggregate identifier via `getId()` before persisting, but Tempest only generates the `PrimaryKey` during the INSERT, so `getId()` throws "must not be accessed before initialization". Create new aggregates through the Command Bus path (the static `#[CommandHandler]` factory) — that returns the generated id — and use the repository gateway for loads and updates. diff --git a/quickstart-examples/Tempest/Model/app/Domain/Command/ChangePrice.php b/quickstart-examples/Tempest/Model/app/Domain/Command/ChangePrice.php new file mode 100644 index 000000000..0acfe0840 --- /dev/null +++ b/quickstart-examples/Tempest/Model/app/Domain/Command/ChangePrice.php @@ -0,0 +1,17 @@ +name = $command->name; + $product->price = $command->price; + $product->save(); + + return $product; + } + + #[CommandHandler(routingKey: 'product.changePrice')] + public function changePrice(ChangePrice $command): void + { + $this->price = $command->price; + } + + #[QueryHandler('product.getPrice')] + public function getPrice(): int + { + return $this->price; + } + + #[IdentifierMethod('id')] + public function getId(): int + { + return $this->id->value; + } +} diff --git a/quickstart-examples/Tempest/Model/app/Infrastructure/ConnectionFactoryInitializer.php b/quickstart-examples/Tempest/Model/app/Infrastructure/ConnectionFactoryInitializer.php new file mode 100644 index 000000000..3c68a5e3e --- /dev/null +++ b/quickstart-examples/Tempest/Model/app/Infrastructure/ConnectionFactoryInitializer.php @@ -0,0 +1,26 @@ +get(ConfiguredMessagingSystem::class) + ->getServiceFromContainer(DbalConnectionFactory::class); + } +} diff --git a/quickstart-examples/Tempest/Model/app/Infrastructure/EcotoneConfiguration.php b/quickstart-examples/Tempest/Model/app/Infrastructure/EcotoneConfiguration.php new file mode 100644 index 000000000..efcd6c717 --- /dev/null +++ b/quickstart-examples/Tempest/Model/app/Infrastructure/EcotoneConfiguration.php @@ -0,0 +1,21 @@ +get(ConfiguredMessagingSystem::class); + +/** @var CommandBus $commandBus */ +$commandBus = $container->get(CommandBus::class); +/** @var QueryBus $queryBus */ +$queryBus = $container->get(QueryBus::class); +/** @var ProductRepository $productRepository */ +$productRepository = $container->get(ProductRepository::class); +/** @var ProductFinder $productFinder */ +$productFinder = $container->get(ProductFinder::class); + +echo "== Tempest Model-as-Aggregate Quickstart ==\n\n"; + +echo "1) Create the products table (Tempest Database + CreateTableStatement)\n"; +$database = $container->get(Database::class); +$database->execute(new Query('DROP TABLE IF EXISTS products')); +$createSql = (new CreateTableStatement('products')) + ->primary('id') + ->string('name') + ->integer('price') + ->compile($database->dialect); +$database->execute(new Query($createSql)); +echo " Table 'products' is ready\n\n"; + +echo "2) Command Bus -> model static #[CommandHandler] (creation + automatic persistence)\n"; +$id = $commandBus->send(new RegisterProduct('Milk', 100)); +Assert::assertIsInt($id); +Assert::assertNotNull(Product::findById($id)); +echo " Registered 'Milk' (id=$id), persisted by TempestRepository\n\n"; + +echo "3) Command Bus -> model instance #[CommandHandler] (mutation by aggregate.id)\n"; +$commandBus->sendWithRouting('product.changePrice', new ChangePrice(200), metadata: ['aggregate.id' => $id]); +echo " Price changed to 200\n\n"; + +echo "4) Query Bus -> model #[QueryHandler] (state loaded from the table)\n"; +$price = $queryBus->sendWithRouting('product.getPrice', metadata: ['aggregate.id' => $id]); +Assert::assertSame(200, $price); +echo " product.getPrice = $price\n\n"; + +echo "5) Repository business interface (#[Repository] gateway over the active-record model)\n"; +$product = $productRepository->getBy($id); +Assert::assertInstanceOf(Product::class, $product); +Assert::assertSame('Milk', $product->name); +Assert::assertSame(200, $product->getPrice()); +Assert::assertNull($productRepository->findBy(999999)); +echo " getBy($id) -> Milk @ 200, findBy(999999) -> null\n"; + +$product->price = 250; +$productRepository->save($product); +echo " save(loaded product @ 250) via repository gateway (UPDATE)\n\n"; + +echo "6) Business Interface (DBAL) -> #[DbalQuery] read side\n"; +$rows = $productFinder->findAll(); +Assert::assertCount(1, $rows); +Assert::assertSame('Milk', $rows[0]['name']); +Assert::assertSame(250, (int) $rows[0]['price']); +Assert::assertSame($id, (int) $rows[0]['id']); +echo " findAll() -> [id=$id] Milk @ 250 (" . count($rows) . " row)\n\n"; + +echo "== Example completed successfully ==\n"; diff --git a/quickstart-examples/Tempest/MultiTenant/MessageBus/.gitignore b/quickstart-examples/Tempest/MultiTenant/MessageBus/.gitignore new file mode 100644 index 000000000..779352374 --- /dev/null +++ b/quickstart-examples/Tempest/MultiTenant/MessageBus/.gitignore @@ -0,0 +1,3 @@ +/vendor/ +composer.lock +/.tempest/ diff --git a/quickstart-examples/Tempest/MultiTenant/MessageBus/README.md b/quickstart-examples/Tempest/MultiTenant/MessageBus/README.md new file mode 100644 index 000000000..b9772eca0 --- /dev/null +++ b/quickstart-examples/Tempest/MultiTenant/MessageBus/README.md @@ -0,0 +1,157 @@ +# Tempest Multi-Tenant — One Aggregate, Two Databases + +## 1. What you'll learn + +This example shows how Ecotone routes a single **Tempest active-record aggregate** to +**different physical databases per tenant**, chosen at runtime from a message header. Two +tenants are wired to two separate database engines: + +- `tenant_a` → PostgreSQL (`database`) +- `tenant_b` → MySQL (`database-mysql`) + +The same `Customer` aggregate, the same command, the same handler — only +`metadata: ['tenant' => '...']` decides where the row lands. The example then proves, from +inside a query handler, that each tenant's `#[MultiTenantConnection]` really points at its +own engine, and that neither tenant can see the other's data. + +## 2. How it fits together + +```mermaid +flowchart LR + Client -->|"send(command, metadata[tenant=tenant_a])"| CommandBus + Client -->|"send(command, metadata[tenant=tenant_b])"| CommandBus + CommandBus -->|tenant switch| Customer["Customer\n#[Aggregate] + IsDatabaseModel"] + Customer -->|save → tenant_a| PG[(PostgreSQL\ncustomers)] + Customer -->|save → tenant_b| MY[(MySQL\ncustomers)] + Client -->|sendWithRouting| QueryBus + QueryBus -->|listForActiveTenant / platformForActiveTenant| CustomerFinder["CustomerFinder\n#[MultiTenantConnection]"] + CustomerFinder -->|active tenant connection| PG + CustomerFinder -->|active tenant connection| MY +``` + +*Files involved:* +- `app/Domain/Customer.php` — the Tempest model annotated with `#[Aggregate]` and a `#[Uuid]` id +- `app/Domain/Command/RegisterCustomer.php` — the command message +- `app/ReadModel/CustomerFinder.php` — query handlers that receive the active tenant's connection +- `app/Infrastructure/EcotoneConfiguration.php` — the `MultiTenantConfiguration` mapping +- `app/tenant_a.config.php`, `app/tenant_b.config.php` — the two tagged Tempest database configs + +## 3. The aggregate + +```php +#[Aggregate] +#[Table('customers')] +final class Customer +{ + use IsDatabaseModel; + + #[Uuid] + public PrimaryKey $id; + + public string $name; + + #[CommandHandler] + public static function register(RegisterCustomer $command): self + { + $customer = new self(); + $customer->name = $command->name; + $customer->save(); + + return $customer; + } + + #[IdentifierMethod('id')] + public function getId(): string { return (string) $this->id->value; } +} +``` + +- `#[Uuid] PrimaryKey $id` makes Tempest generate a **UUID v7 at insert time**, so the schema + is identical on PostgreSQL and MySQL (`id VARCHAR(36) PRIMARY KEY`) — no per-engine + auto-increment column. The generated id is returned from the register handler. +- `save()` persists through `TempestRepository`. Because a tenant is active for the duration + of the command, the write lands in that tenant's database (see section 5). + +## 4. Choosing the tenant per message + +```php +#[ServiceContext] +public function multiTenantConfiguration(): MultiTenantConfiguration +{ + return MultiTenantConfiguration::create( + tenantHeaderName: 'tenant', + tenantToConnectionMapping: [ + 'tenant_a' => TempestConnectionReference::create('tenant_a'), + 'tenant_b' => TempestConnectionReference::create('tenant_b'), + ], + ); +} +``` + +Each tenant maps to a **tagged** Tempest config, auto-discovered from `*.config.php`: + +```php +// app/tenant_a.config.php +return new PostgresConfig(host: 'database', ..., tag: 'tenant_a'); + +// app/tenant_b.config.php +return new MysqlConfig(host: 'database-mysql', ..., tag: 'tenant_b'); +``` + +Sending a command with `metadata: ['tenant' => 'tenant_a']` activates that tenant for the +whole message, then deactivates it afterwards. + +## 5. How the tenant switch reaches the Tempest ORM + +When Ecotone activates a tenant it promotes that tenant's connection as Tempest's **default** +connection (via `TempestTenantDatabaseSwitcher`), and rebuilds the default `Database` from it. +That is what makes `IsDatabaseModel::save()` — and therefore the aggregate — write to the +active tenant's database rather than to Tempest's discovered default. The same promoted +connection backs Ecotone's DBAL, so a single PDO serves both the ORM write and the +surrounding transaction. + +## 6. Proving the routing with `#[MultiTenantConnection]` + +`CustomerFinder` does not hardcode a connection. Each query handler receives the **active +tenant's** Doctrine connection through the attribute: + +```php +#[QueryHandler('customer.platformForActiveTenant')] +public function platformForActiveTenant(#[MultiTenantConnection] Connection $connection): string +{ + return $connection->getDatabasePlatform()::class; +} +``` + +`run_example.php` uses this to assert that the connection handed to the `tenant_a` query is a +PostgreSQL platform and the `tenant_b` query is a MySQL platform — direct proof that the +attribute resolves to the correct physical database. It then lists each tenant's customers and +asserts the two sets are disjoint. + +## 7. Running it + +```bash +docker compose up -d app database database-mysql +docker compose exec app bash -lc 'cd quickstart-examples/Tempest/MultiTenant/MessageBus && composer update && php run_example.php' +``` + +The script prints a four-step ribbon ending with `== Example completed successfully ==`: + +``` +tenant_a -> Doctrine\DBAL\Platforms\PostgreSQL120Platform +tenant_b -> Doctrine\DBAL\Platforms\MySQL80Platform +tenant_a -> [Alice, Bob] +tenant_b -> [Carol] +``` + +## 8. Tempest-specific wiring + +1. `app/tenant_a.config.php` / `tenant_b.config.php` return **tagged** `PostgresConfig` / + `MysqlConfig`, auto-discovered as tagged `DatabaseConfig` singletons. +2. `EcotoneConfiguration` maps each tenant header value to a + `TempestConnectionReference::create()`. +3. `#[MultiTenantConnection]` (from `ecotone/dbal`) injects the active tenant's connection into + query handlers — it needs `symfony/expression-language`, which is why that package is in + `composer.json`. + +Handlers, the aggregate and the configuration are discovered automatically from the `App\` +PSR-4 root — **no `ecotone.config.php` is required** (zero-config). diff --git a/quickstart-examples/Tempest/MultiTenant/MessageBus/app/Domain/Command/RegisterCustomer.php b/quickstart-examples/Tempest/MultiTenant/MessageBus/app/Domain/Command/RegisterCustomer.php new file mode 100644 index 000000000..160dc7c7c --- /dev/null +++ b/quickstart-examples/Tempest/MultiTenant/MessageBus/app/Domain/Command/RegisterCustomer.php @@ -0,0 +1,17 @@ +name = $command->name; + $customer->save(); + + return $customer; + } + + #[IdentifierMethod('id')] + public function getId(): string + { + return (string) $this->id->value; + } +} diff --git a/quickstart-examples/Tempest/MultiTenant/MessageBus/app/Infrastructure/EcotoneConfiguration.php b/quickstart-examples/Tempest/MultiTenant/MessageBus/app/Infrastructure/EcotoneConfiguration.php new file mode 100644 index 000000000..b2090c685 --- /dev/null +++ b/quickstart-examples/Tempest/MultiTenant/MessageBus/app/Infrastructure/EcotoneConfiguration.php @@ -0,0 +1,28 @@ + TempestConnectionReference::create('tenant_a'), + 'tenant_b' => TempestConnectionReference::create('tenant_b'), + ], + ); + } +} diff --git a/quickstart-examples/Tempest/MultiTenant/MessageBus/app/ReadModel/CustomerFinder.php b/quickstart-examples/Tempest/MultiTenant/MessageBus/app/ReadModel/CustomerFinder.php new file mode 100644 index 000000000..ef1c5ded9 --- /dev/null +++ b/quickstart-examples/Tempest/MultiTenant/MessageBus/app/ReadModel/CustomerFinder.php @@ -0,0 +1,30 @@ +executeQuery('SELECT id, name FROM customers ORDER BY name') + ->fetchAllAssociative(); + } + + #[QueryHandler('customer.platformForActiveTenant')] + public function platformForActiveTenant(#[MultiTenantConnection] Connection $connection): string + { + return $connection->getDatabasePlatform()::class; + } +} diff --git a/quickstart-examples/Tempest/MultiTenant/MessageBus/app/tenant_a.config.php b/quickstart-examples/Tempest/MultiTenant/MessageBus/app/tenant_a.config.php new file mode 100644 index 000000000..8075b794d --- /dev/null +++ b/quickstart-examples/Tempest/MultiTenant/MessageBus/app/tenant_a.config.php @@ -0,0 +1,18 @@ +get(ConfiguredMessagingSystem::class); +/** @var CommandBus $commandBus */ +$commandBus = $container->get(CommandBus::class); +/** @var QueryBus $queryBus */ +$queryBus = $container->get(QueryBus::class); + +echo "== Tempest Multi-Tenant Quickstart - Message Bus ==\n\n"; + +echo "1) Create the customers table in each tenant database\n"; +$multiTenant = $messagingSystem->getServiceFromContainer(MultiTenantConnectionFactory::class); +foreach (['tenant_a', 'tenant_b'] as $tenant) { + $connection = $multiTenant->getConnection($tenant); + $connection->executeStatement('DROP TABLE IF EXISTS customers'); + $connection->executeStatement('CREATE TABLE customers (id VARCHAR(36) PRIMARY KEY, name VARCHAR(255) NOT NULL)'); +} +echo " Table 'customers' ready in tenant_a (PostgreSQL) and tenant_b (MySQL)\n\n"; + +echo "2) Register customers, routing each write to its tenant via metadata['tenant']\n"; +$commandBus->send(new RegisterCustomer('Alice'), metadata: ['tenant' => 'tenant_a']); +$commandBus->send(new RegisterCustomer('Bob'), metadata: ['tenant' => 'tenant_a']); +$commandBus->send(new RegisterCustomer('Carol'), metadata: ['tenant' => 'tenant_b']); +echo " tenant_a <- Alice, Bob tenant_b <- Carol\n\n"; + +echo "3) #[MultiTenantConnection] resolves to the active tenant's physical database\n"; +$platformA = $queryBus->sendWithRouting('customer.platformForActiveTenant', metadata: ['tenant' => 'tenant_a']); +$platformB = $queryBus->sendWithRouting('customer.platformForActiveTenant', metadata: ['tenant' => 'tenant_b']); +Assert::assertStringContainsStringIgnoringCase('postgre', $platformA); +Assert::assertStringContainsStringIgnoringCase('mysql', $platformB); +echo " tenant_a -> $platformA\n tenant_b -> $platformB\n\n"; + +echo "4) Each tenant sees only its own customers (isolation)\n"; +$tenantA = $queryBus->sendWithRouting('customer.listForActiveTenant', metadata: ['tenant' => 'tenant_a']); +$tenantB = $queryBus->sendWithRouting('customer.listForActiveTenant', metadata: ['tenant' => 'tenant_b']); + +$namesA = array_column($tenantA, 'name'); +$namesB = array_column($tenantB, 'name'); + +Assert::assertSame(['Alice', 'Bob'], $namesA); +Assert::assertSame(['Carol'], $namesB); +Assert::assertNotContains('Carol', $namesA); +Assert::assertNotContains('Alice', $namesB); +Assert::assertNotContains('Bob', $namesB); +echo " tenant_a -> [" . implode(', ', $namesA) . "]\n tenant_b -> [" . implode(', ', $namesB) . "]\n\n"; + +echo "== Example completed successfully ==\n"; diff --git a/quickstart-examples/Tempest/Projection/DatabaseReadModel/.gitignore b/quickstart-examples/Tempest/Projection/DatabaseReadModel/.gitignore new file mode 100644 index 000000000..779352374 --- /dev/null +++ b/quickstart-examples/Tempest/Projection/DatabaseReadModel/.gitignore @@ -0,0 +1,3 @@ +/vendor/ +composer.lock +/.tempest/ diff --git a/quickstart-examples/Tempest/Projection/DatabaseReadModel/README.md b/quickstart-examples/Tempest/Projection/DatabaseReadModel/README.md new file mode 100644 index 000000000..8f7a18e83 --- /dev/null +++ b/quickstart-examples/Tempest/Projection/DatabaseReadModel/README.md @@ -0,0 +1,101 @@ +# Tempest Projection — Database Read Model + +## 1. What you'll learn + +This example shows how to build a **projection** (a read-optimised view) on top of an event-sourced `User` aggregate using Tempest and Ecotone. You will see how the projection's `#[ProjectionInitialization]` hook creates the storage, how `#[EventHandler]` methods react to each domain event, and how the projection lifecycle commands (init, delete, reset) let you wipe and recreate the read model whenever you need to. + +It is the Tempest counterpart of the [Laravel `DatabaseReadModel` example](../../../Laravel/Projection/DatabaseReadModel/README.md). The domain code is identical; only the database wiring differs. + +## 2. The problem this solves + +In a traditional application, if you need a new view on your data — say "all active users ordered by name" — you run a database migration and populate the new table. In an event-sourced system you still have every domain event ever emitted. You can **replay** them into any new shape without touching the write side. This is the projection pattern: the events are the truth; the read model is just a cache you can always discard and rebuild. + +## 3. How it fits together + +```mermaid +flowchart LR + Client -->|send command| CommandBus + CommandBus -->|route| User["User\n#[EventSourcingAggregate]"] + User -->|return events| EventStore[(Event Store\nPostgreSQL)] + EventStore -->|stream| UserListProjection["UserListProjection\n#[ProjectionV2]"] + UserListProjection -->|INSERT / UPDATE| ReadModel[(user_list_database\ntable)] + Client -->|sendWithRouting| QueryBus + QueryBus -->|listActive| UserListProjection + UserListProjection -->|SELECT| ReadModel +``` + +*Files involved:* +- `app/Domain/User.php` — aggregate that produces the events +- `app/Domain/Event/` — `UserWasRegistered`, `UserNameWasChanged`, `UserWasDeactivated` +- `app/ReadModel/UserListProjection.php` — projection that maintains `user_list_database` +- `app/Infrastructure/EcotoneConfiguration.php` — registers the PostgreSQL connection as Ecotone's default `DbalConnectionFactory` +- `app/Infrastructure/ConnectionFactoryInitializer.php` — Tempest initializer that hands the same DBAL connection to the projection +- `app/database.config.php` — the Tempest `PostgresConfig` the connection is derived from + +## 4. Walkthrough of the code + +### 4.1 Domain — User aggregate + +The `User` aggregate is annotated with `#[EventSourcingAggregate]`. Command handlers are `static` for creation (`register`) and instance methods for mutations (`changeName`, `deactivate`). Each handler returns an array of events. `#[EventSourcingHandler]` methods reconstruct aggregate state from stored events — they must have no side effects. + +Each event class is annotated with `#[NamedEvent('user.was_registered')]` (and so on). The name is what Ecotone stores alongside the event payload, so the recorded stream stays readable even if you later move or rename the PHP class. The domain layer is framework-agnostic — these files are byte-for-byte identical to the Laravel example. + +### 4.2 The projection — direct database writes + +`UserListProjection` receives an `Interop\Queue\ConnectionFactory` (Ecotone's DBAL connection) and obtains a Doctrine `Connection` via `createContext()->getDbalConnection()`. Each `#[EventHandler]` method writes directly to the `user_list_database` table with the framework-agnostic Ecotone DBAL idiom. The boolean `active` column is written with `Doctrine\DBAL\ParameterType::BOOLEAN` so PostgreSQL receives a real boolean. + +### 4.3 Lifecycle hooks + +| Hook | Attribute | What it does | +|------|-----------|--------------| +| Initialise | `#[ProjectionInitialization]` | `CREATE TABLE IF NOT EXISTS user_list_database (...)` | +| Delete | `#[ProjectionDelete]` | `DROP TABLE IF EXISTS user_list_database` | + +Resetting the projection is done by deleting and re-initialising it, which clears both the read model table and Ecotone's stored stream position for this projection. + +### 4.4 Querying the read model + +The `#[QueryHandler('user.listActive')]` method runs a `SELECT` and returns rows as associative arrays. Callers use the query bus: + +```php +$rows = $queryBus->sendWithRouting('user.listActive'); +// $rows[0]['name'] === 'Alice Cooper' +``` + +## 5. How the database connection is wired (Tempest specifics) + +Tempest has no Laravel-style query builder, so the connection is wired through the `ecotone/tempest` bridge instead of `LaravelConnectionReference`: + +1. `app/database.config.php` returns a Tempest `PostgresConfig`. Tempest auto-discovers any `*.config.php` inside the app's discovery locations and registers it as the container's `DatabaseConfig`. +2. `EcotoneConfiguration::databaseConnection()` returns `TempestConnectionReference::defaultConnection()`, registering that `DatabaseConfig` as Ecotone's default `DbalConnectionFactory`. The event store, the DBAL module and the projection state storage all use this single connection. +3. `ConnectionFactoryInitializer` is a Tempest `Initializer` that resolves `Interop\Queue\ConnectionFactory` to the very same `DbalConnectionFactory` from Ecotone's container. This lets Tempest autowire the projection's constructor with the shared connection — one PostgreSQL connection for both the write side (event store) and the read side (projection). + +Ecotone discovers the `App\` handlers, aggregates and projections automatically from the composer PSR-4 root — no `ecotone.config.php` is required. + +## 6. Running it + +```bash +# Start services +docker compose up -d app database + +# Install and run inside the container +docker compose exec app bash -lc 'cd quickstart-examples/Tempest/Projection/DatabaseReadModel && composer update && php run_example.php' +``` + +The script exits 0 and prints a six-step ribbon ending with `== Example completed successfully ==`. + +## 7. Reset vs Delete + +| Command | Effect | +|---------|--------| +| `ecotone:projection:init` | Calls `#[ProjectionInitialization]`, records projection as known | +| `ecotone:projection:delete` | Calls `#[ProjectionDelete]`, removes projection tracking | + +**Reset = delete + re-init.** This two-step approach makes the state transitions explicit: you see the table disappear, then reappear empty. + +## 8. Common pitfalls + +1. **Forgetting `CREATE TABLE IF NOT EXISTS`.** Without `IF NOT EXISTS` the `init` hook fails if the table already exists, for example after a partial run. +2. **Querying before init.** If you call `user.listActive` before `ecotone:projection:init` the table does not exist and you get a DB error. Always initialise before querying. +3. **Event store accumulates across runs.** This example cleans up the `User` aggregate stream at the start of `run_example.php`. In production you would never delete the event stream — that is your source of truth. +4. **Booleans on PostgreSQL.** Bind boolean columns with `ParameterType::BOOLEAN` so PostgreSQL receives `true`/`false` rather than an empty string. diff --git a/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/Domain/Command/ChangeUserName.php b/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/Domain/Command/ChangeUserName.php new file mode 100644 index 000000000..b1f9e2b51 --- /dev/null +++ b/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/Domain/Command/ChangeUserName.php @@ -0,0 +1,18 @@ +userId, $command->name, $command->email)]; + } + + #[CommandHandler] + public function changeName(ChangeUserName $command): array + { + if ($command->name === $this->name) { + return []; + } + + return [new UserNameWasChanged($this->userId, $command->name)]; + } + + #[CommandHandler] + public function deactivate(DeactivateUser $command): array + { + if (! $this->active) { + return []; + } + + return [new UserWasDeactivated($this->userId)]; + } + + #[EventSourcingHandler] + public function applyRegistered(UserWasRegistered $event): void + { + $this->userId = $event->userId; + $this->name = $event->name; + $this->active = true; + } + + #[EventSourcingHandler] + public function applyNameChanged(UserNameWasChanged $event): void + { + $this->name = $event->name; + } + + #[EventSourcingHandler] + public function applyDeactivated(UserWasDeactivated $event): void + { + $this->active = false; + } +} diff --git a/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/Infrastructure/ConnectionFactoryInitializer.php b/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/Infrastructure/ConnectionFactoryInitializer.php new file mode 100644 index 000000000..3c68a5e3e --- /dev/null +++ b/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/Infrastructure/ConnectionFactoryInitializer.php @@ -0,0 +1,26 @@ +get(ConfiguredMessagingSystem::class) + ->getServiceFromContainer(DbalConnectionFactory::class); + } +} diff --git a/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/Infrastructure/EcotoneConfiguration.php b/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/Infrastructure/EcotoneConfiguration.php new file mode 100644 index 000000000..efcd6c717 --- /dev/null +++ b/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/Infrastructure/EcotoneConfiguration.php @@ -0,0 +1,21 @@ +getConnection()->executeStatement('CREATE TABLE IF NOT EXISTS user_list_database ( + user_id VARCHAR(36) PRIMARY KEY, + name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL, + active BOOLEAN NOT NULL DEFAULT TRUE + )'); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->getConnection()->executeStatement('DROP TABLE IF EXISTS user_list_database'); + } + + #[EventHandler] + public function onRegistered(UserWasRegistered $event): void + { + $this->getConnection()->insert('user_list_database', [ + 'user_id' => $event->userId, + 'name' => $event->name, + 'email' => $event->email, + 'active' => true, + ], ['active' => ParameterType::BOOLEAN]); + } + + #[EventHandler] + public function onNameChanged(UserNameWasChanged $event): void + { + $this->getConnection()->update( + 'user_list_database', + ['name' => $event->name], + ['user_id' => $event->userId], + ); + } + + #[EventHandler] + public function onDeactivated(UserWasDeactivated $event): void + { + $this->getConnection()->update( + 'user_list_database', + ['active' => false], + ['user_id' => $event->userId], + ['active' => ParameterType::BOOLEAN], + ); + } + + #[QueryHandler('user.listActive')] + public function listActive(): array + { + return $this->getConnection() + ->executeQuery('SELECT user_id, name, email, active FROM user_list_database WHERE active = true ORDER BY name') + ->fetchAllAssociative(); + } + + private function getConnection(): Connection + { + return $this->connectionFactory->createContext()->getDbalConnection(); + } +} diff --git a/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/database.config.php b/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/database.config.php new file mode 100644 index 000000000..2178d36af --- /dev/null +++ b/quickstart-examples/Tempest/Projection/DatabaseReadModel/app/database.config.php @@ -0,0 +1,17 @@ +get(ConfiguredMessagingSystem::class); +/** @var CommandBus $commandBus */ +$commandBus = $container->get(CommandBus::class); +/** @var QueryBus $queryBus */ +$queryBus = $container->get(QueryBus::class); +/** @var EventStore $eventStore */ +$eventStore = $messagingSystem->getServiceFromContainer(EventStore::class); + +echo "== Tempest Projection Quickstart - Database Read Model ==\n\n"; + +if ($eventStore->hasStream(User::class)) { + $eventStore->delete(User::class); +} + +echo "1) Delete projection (clean slate)\n"; +$messagingSystem->runConsoleCommand('ecotone:projection:delete', ['name' => 'user_list_database']); +echo " Projection deleted\n\n"; + +echo "2) Initialise projection (create read model storage)\n"; +$messagingSystem->runConsoleCommand('ecotone:projection:init', ['name' => 'user_list_database']); +echo " Projection initialised\n\n"; + +echo "3) Emit events via commands\n"; +$aliceId = Uuid::uuid4()->toString(); +$bobId = Uuid::uuid4()->toString(); +$commandBus->send(new RegisterUser($aliceId, 'Alice', 'alice@example.com')); +$commandBus->send(new RegisterUser($bobId, 'Bob', 'bob@example.com')); +$commandBus->send(new ChangeUserName($aliceId, 'Alice Cooper')); +$commandBus->send(new DeactivateUser($bobId)); +echo " Registered Alice and Bob, renamed Alice to Alice Cooper, deactivated Bob\n\n"; + +echo "4) Query and assert active users\n"; +$rows = $queryBus->sendWithRouting('user.listActive'); +Assert::assertCount(1, $rows); +Assert::assertSame('Alice Cooper', $rows[0]['name']); +echo " Active users: " . count($rows) . " (Alice Cooper only - Bob is deactivated)\n\n"; + +echo "5) Reset projection (delete + re-initialise = wipe read model + clear position)\n"; +$messagingSystem->runConsoleCommand('ecotone:projection:delete', ['name' => 'user_list_database']); +$messagingSystem->runConsoleCommand('ecotone:projection:init', ['name' => 'user_list_database']); +$rows = $queryBus->sendWithRouting('user.listActive'); +Assert::assertSame([], $rows); +echo " Read model is empty after reset\n\n"; + +echo "6) Delete projection (drop storage)\n"; +$messagingSystem->runConsoleCommand('ecotone:projection:delete', ['name' => 'user_list_database']); +echo " Projection deleted\n\n"; + +echo "== Example completed successfully ==\n";