Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ services:
SQLITE_DATABASE_DSN: sqlite:////tmp/ecotone_test.db
SECONDARY_DATABASE_DSN: mysql://ecotone:secret@database-mysql:3306/ecotone?serverVersion=8.0
DATABASE_MYSQL: mysql://ecotone:secret@database-mysql:3306/ecotone?serverVersion=8.0
DATABASE_MARIADB: mysql://ecotone:secret@database-mariadb:3306/ecotone?serverVersion=11.4.5-MariaDB
SQS_DSN: sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4566&version=latest
REDIS_DSN: redis://redis:6379
KAFKA_DSN: kafka:9092
Expand All @@ -40,6 +41,7 @@ services:
SQLITE_DATABASE_DSN: sqlite:////tmp/ecotone_test.db
SECONDARY_DATABASE_DSN: pgsql://ecotone:secret@database:5432/ecotone?serverVersion=16
DATABASE_MYSQL: mysql://ecotone:secret@database-mysql:3306/ecotone?serverVersion=8.0
DATABASE_MARIADB: mysql://ecotone:secret@database-mariadb:3306/ecotone?serverVersion=11.4.5-MariaDB
SQS_DSN: sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4566&version=latest
REDIS_DSN: redis://redis:6379
KAFKA_DSN: kafka:9092
Expand All @@ -61,6 +63,15 @@ services:
MYSQL_DATABASE: "ecotone"
ports:
- "${MYSQL_PORT:-0}:3306"
database-mariadb:
image: mariadb:11.4
environment:
MARIADB_ROOT_PASSWORD: "secret"
MARIADB_USER: "ecotone"
MARIADB_PASSWORD: "secret"
MARIADB_DATABASE: "ecotone"
ports:
- "${MARIADB_PORT:-0}:3306"
rabbitmq:
build: ./.docker/rabbitmq
environment:
Expand Down
2 changes: 1 addition & 1 deletion packages/DataProtection/src/Encryption/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public static function substr(string $str, int $start, ?int $length = null): boo
// mb_substr($str, 0, NULL, '8bit') returns an empty string on PHP 5.3,
// so we have to find the length ourselves. Also, substr() doesn't
// accept null for the length.
if (! isset($length)) {
if ($length === null) {
if ($start >= 0) {
$length = $input_len - $start;
} else {
Expand Down
4 changes: 2 additions & 2 deletions packages/Dbal/src/DbalTransaction/ImplicitCommit.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
namespace Ecotone\Dbal\DbalTransaction;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
use Throwable;

class ImplicitCommit
{
public static function isImplicitCommitException(Throwable $exception, Connection $connection): bool
{
if (! ($connection->getDriver()->getDatabasePlatform($connection) instanceof MySQLPlatform)) {
if (! ($connection->getDriver()->getDatabasePlatform($connection) instanceof AbstractMySQLPlatform)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public function test_deduplication_inserts_before_handler_when_transaction_is_ac
// Set global lock timeout for the session
if ($platform instanceof \Doctrine\DBAL\Platforms\PostgreSQLPlatform) {
$connection2->executeStatement('SET lock_timeout = 1000'); // 1 second
} elseif ($platform instanceof \Doctrine\DBAL\Platforms\MySQLPlatform) {
} elseif ($platform instanceof \Doctrine\DBAL\Platforms\AbstractMySQLPlatform) {
// For MySQL, we need to set this on the session level
$connection2->executeStatement('SET SESSION innodb_lock_wait_timeout = 1'); // 1 second
}
Expand Down
1 change: 1 addition & 0 deletions packages/Laravel/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"nesbot/carbon": "^2.71|^3.0",
"moneyphp/money": "^4.1.0",
"ecotone/dbal": "~1.314.0",
"doctrine/dbal": "^4.0",
"timacdonald/log-fake": "^2.0"
},
"extra": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Ecotone\EventSourcing\Database;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
use Ecotone\Dbal\Compatibility\SchemaManagerCompatibility;
use Ecotone\Dbal\Database\DbalTableManager;
use Ecotone\Messaging\Config\Container\Definition;
Expand Down Expand Up @@ -46,7 +46,7 @@ public function getTableName(): string

public function getCreateTableSql(Connection $connection): string|array
{
if ($connection->getDatabasePlatform() instanceof MySQLPlatform) {
if ($connection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
return $this->getMysqlCreateSql();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Ecotone\EventSourcing\Projecting\PartitionState;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
use Ecotone\Dbal\AlreadyConnectedDbalConnectionFactory;
use Ecotone\Dbal\MultiTenant\MultiTenantConnectionFactory;
use Ecotone\EventSourcing\Database\ProjectionStateTableManager;
Expand Down Expand Up @@ -115,7 +115,7 @@ public function initPartition(string $projectionName, ?string $partitionKey = nu

// Try to insert the partition state, ignoring if it already exists
$insertQuery = match (true) {
$connection->getDatabasePlatform() instanceof MySQLPlatform => <<<SQL
$connection->getDatabasePlatform() instanceof AbstractMySQLPlatform => <<<SQL
INSERT INTO {$tableName} (projection_name, partition_key, last_position, user_state, metadata)
VALUES (:projectionName, :partitionKey, :lastPosition, :userState, :metadata)
ON DUPLICATE KEY UPDATE projection_name = projection_name -- no-op to ignore
Expand Down Expand Up @@ -155,7 +155,7 @@ public function savePartition(ProjectionPartitionState $projectionState): void
$tableName = $this->getTableName();

$saveStateQuery = match (true) {
$connection->getDatabasePlatform() instanceof MySQLPlatform => <<<SQL
$connection->getDatabasePlatform() instanceof AbstractMySQLPlatform => <<<SQL
INSERT INTO {$tableName} (projection_name, partition_key, last_position, user_state, metadata)
VALUES (:projectionName, :partitionKey, :lastPosition, :userState, :metadata)
ON DUPLICATE KEY UPDATE last_position = :lastPosition, user_state = :userState, metadata = :metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

namespace Test\Ecotone\EventSourcing\Integration;

use Doctrine\DBAL\Driver\PDO\PgSQL\Driver;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Ecotone\EventSourcing\EventSourcingConfiguration;
use Ecotone\EventSourcing\Prooph\FromProophMessageToArrayConverter;
use Ecotone\Lite\EcotoneLite;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ServiceConfiguration;
use Enqueue\Dbal\DbalConnectionFactory;
use Prooph\EventStore\Pdo\PersistenceStrategy;
use Prooph\EventStore\Pdo\PersistenceStrategy\MariaDbSingleStreamStrategy;
use Prooph\EventStore\Pdo\PersistenceStrategy\MySqlSingleStreamStrategy;
use Prooph\EventStore\Pdo\PersistenceStrategy\PostgresSingleStreamStrategy;
use Test\Ecotone\EventSourcing\EventSourcingMessagingTestCase;
Expand Down Expand Up @@ -42,11 +45,7 @@ public function test_handling_custom_event_stream_when_custom_stream_persistence
])
->withExtensionObjects([
EventSourcingConfiguration::createWithDefaults()
->withCustomPersistenceStrategy(
$this->isPostgres()
? new PostgresSingleStreamStrategy(new FromProophMessageToArrayConverter())
: new MySqlSingleStreamStrategy(new FromProophMessageToArrayConverter())
),
->withCustomPersistenceStrategy($this->persistenceStrategy()),
]),
pathToRootCatalog: __DIR__ . '/../../',
runForProductionEventStore: true
Expand All @@ -60,8 +59,14 @@ public function test_handling_custom_event_stream_when_custom_stream_persistence
self::assertEquals(2, $ecotone->sendQueryWithRouting('action_collector.getCount'));
}

private function isPostgres(): bool
private function persistenceStrategy(): PersistenceStrategy
{
return $this->getConnection()->getDriver() instanceof Driver;
$platform = $this->getConnection()->getDatabasePlatform();

return match (true) {
$platform instanceof PostgreSQLPlatform => new PostgresSingleStreamStrategy(new FromProophMessageToArrayConverter()),
$platform instanceof MariaDBPlatform => new MariaDbSingleStreamStrategy(new FromProophMessageToArrayConverter()),
default => new MySqlSingleStreamStrategy(new FromProophMessageToArrayConverter()),
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ protected function setUp(): void

$metadata = json_decode($connection->fetchOne(sprintf('select metadata from %s where no = ?', $streamName), [1]), true);
$metadata['timestamp'] = $initialTimestamp;
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date(DATE_ATOM, $initialTimestamp)], ['no' => 1]);
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date('Y-m-d\TH:i:s', $initialTimestamp)], ['no' => 1]);

$metadata = json_decode($connection->fetchOne(sprintf('select metadata from %s where no = ?', $streamName), [3]), true);
$metadata['timestamp'] = $initialTimestamp + 10;
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date(DATE_ATOM, $initialTimestamp + 10)], ['no' => 3]);
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date('Y-m-d\TH:i:s', $initialTimestamp + 10)], ['no' => 3]);

$metadata = json_decode($connection->fetchOne(sprintf('select metadata from %s where no = ?', $streamName), [4]), true);
$metadata['timestamp'] = $initialTimestamp + 20;
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date(DATE_ATOM, $initialTimestamp + 20)], ['no' => 4]);
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date('Y-m-d\TH:i:s', $initialTimestamp + 20)], ['no' => 4]);
}

public function test_detecting_gaps_without_detection_window(): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ protected function setUp(): void

$metadata = json_decode($connection->fetchOne(sprintf('select metadata from %s where no = ?', $streamName), [1]), true);
$metadata['timestamp'] = $initialTimestamp;
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date(DATE_ATOM, $initialTimestamp)], ['no' => 1]);
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date('Y-m-d\TH:i:s', $initialTimestamp)], ['no' => 1]);

$metadata = json_decode($connection->fetchOne(sprintf('select metadata from %s where no = ?', $streamName), [3]), true);
$metadata['timestamp'] = $initialTimestamp + 10;
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date(DATE_ATOM, $initialTimestamp + 10)], ['no' => 3]);
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date('Y-m-d\TH:i:s', $initialTimestamp + 10)], ['no' => 3]);

$metadata = json_decode($connection->fetchOne(sprintf('select metadata from %s where no = ?', $streamName), [4]), true);
$metadata['timestamp'] = $initialTimestamp + 20;
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date(DATE_ATOM, $initialTimestamp + 20)], ['no' => 4]);
$connection->update($streamName, ['metadata' => json_encode($metadata), 'created_at' => date('Y-m-d\TH:i:s', $initialTimestamp + 20)], ['no' => 4]);
}

public function test_detecting_gaps_without_detection_window(): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Test\Ecotone\EventSourcing\Projecting\Partitioned;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
use Ecotone\EventSourcing\Attribute\FromStream;
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
Expand Down Expand Up @@ -262,7 +262,7 @@ public function getTickets(): array
#[EventHandler]
public function addTicket(TicketWasRegistered $event): void
{
if ($this->connection->getDatabasePlatform() instanceof MySQLPlatform) {
if ($this->connection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
$this->connection->executeStatement('INSERT IGNORE INTO idempotent_projection_tickets VALUES (?,?)', [$event->getTicketId(), $event->getTicketType()]);
} else {
$this->connection->executeStatement('INSERT INTO idempotent_projection_tickets VALUES (?,?) ON CONFLICT (ticket_id) DO NOTHING', [$event->getTicketId(), $event->getTicketType()]);
Expand Down Expand Up @@ -413,7 +413,7 @@ public function whenMeetingCreated(MeetingCreated $event): void
#[ProjectionInitialization]
public function initialization(): void
{
if ($this->connection->getDatabasePlatform() instanceof MySQLPlatform) {
if ($this->connection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
$this->connection->executeStatement('CREATE TABLE IF NOT EXISTS multi_stream_edge_events (id INT AUTO_INCREMENT PRIMARY KEY, event_type VARCHAR(100), aggregate_id VARCHAR(36), stream_name VARCHAR(255))');
} else {
$this->connection->executeStatement('CREATE TABLE IF NOT EXISTS multi_stream_edge_events (id SERIAL PRIMARY KEY, event_type VARCHAR(100), aggregate_id VARCHAR(36), stream_name VARCHAR(255))');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Test\Ecotone\EventSourcing\Projecting\Partitioned;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
use Ecotone\EventSourcing\Attribute\FromStream;
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
Expand Down Expand Up @@ -174,7 +174,7 @@ public function initialization(): void
)
SQL);
$insertQuery = match (true) {
$this->connection->getDatabasePlatform() instanceof MySQLPlatform => <<<SQL
$this->connection->getDatabasePlatform() instanceof AbstractMySQLPlatform => <<<SQL
INSERT INTO ticket_counter_partitioned (id, ticket_count, closed_count) VALUES (1, 0, 0)
ON DUPLICATE KEY UPDATE id = id
SQL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Test\Ecotone\EventSourcing\Projecting;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
use Ecotone\EventSourcing\Attribute\FromStream;
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
Expand Down Expand Up @@ -141,7 +141,7 @@ public function __construct(private Connection $connection)
public function onTicketRegistered(TicketWasRegistered $event): void
{
$this->handleEvent($event->getTicketId());
if ($this->connection->getDatabasePlatform() instanceof MySQLPlatform) {
if ($this->connection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
$this->connection->executeStatement('INSERT IGNORE INTO rebuild_rollback_tickets VALUES (?,?)', [$event->getTicketId(), $event->getTicketType()]);
} else {
$this->connection->executeStatement('INSERT INTO rebuild_rollback_tickets VALUES (?,?) ON CONFLICT(ticket_id) DO NOTHING', [$event->getTicketId(), $event->getTicketType()]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Test\Ecotone\EventSourcing\Projecting;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
use Ecotone\Dbal\DbalBackedMessageChannelBuilder;
use Ecotone\EventSourcing\Attribute\FromStream;
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
Expand Down Expand Up @@ -111,7 +111,7 @@ public function test_partitioned_multi_stream_projection_rolls_back_on_failure_a

$ecotone = $this->bootstrapEcotoneForCalendar([$projection::class], [$projection], $projection::CHANNEL);

if ($this->getConnection()->getDatabasePlatform() instanceof MySQLPlatform) {
if ($this->getConnection()->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
$this->getConnection()->executeStatement('CREATE TABLE IF NOT EXISTS multi_stream_rollback_events (id INT AUTO_INCREMENT PRIMARY KEY, event_type VARCHAR(100), aggregate_id VARCHAR(36), stream_name VARCHAR(255))');
} else {
$this->getConnection()->executeStatement('CREATE TABLE IF NOT EXISTS multi_stream_rollback_events (id SERIAL PRIMARY KEY, event_type VARCHAR(100), aggregate_id VARCHAR(36), stream_name VARCHAR(255))');
Expand Down Expand Up @@ -146,7 +146,7 @@ public function test_partitioned_multi_stream_with_events_in_both_streams_proces

$ecotone = $this->bootstrapEcotoneForCalendar([$projection::class], [$projection], $projection::CHANNEL);

if ($this->getConnection()->getDatabasePlatform() instanceof MySQLPlatform) {
if ($this->getConnection()->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
$this->getConnection()->executeStatement('CREATE TABLE IF NOT EXISTS multi_stream_fail_second_events (id INT AUTO_INCREMENT PRIMARY KEY, event_type VARCHAR(100), aggregate_id VARCHAR(36), stream_name VARCHAR(255))');
} else {
$this->getConnection()->executeStatement('CREATE TABLE IF NOT EXISTS multi_stream_fail_second_events (id SERIAL PRIMARY KEY, event_type VARCHAR(100), aggregate_id VARCHAR(36), stream_name VARCHAR(255))');
Expand Down
Loading