Pattern CQRS and Event Sourcing become more and more popular every day in PHP community. You can easily find lot libraries to store events, to execute commands, to manage repositories, etc. The leader there doesn’t exist yet, but prooph as the most complex solution is close to that position. They have only one problem, documentation.
Ok, they have two problems. Documentation and many not to updated examples in their GitHub repository. Because of that and because prooph is de facto five not related components running everything out-of-the-box is really a pain. Even if some examples contain Docker on board or especially then because a tree of dependencies grows fast.
- Event Sourcing
- Event Store
- Snapshot Store
- Projection
- Service Bus
When would you like to work with CQRS+ES you have to connect those components somehow. Good luck!
I did it. I created the simple example to show how to use all that components, almost without dependencies. You don’t need frameworks here (symfony, laravel, zend), other libraries (uuid, DI), queue systems (rabbitmq, kafka) or external dependencies (docker, vagrant). Everything what do you need is PHP 7.1 and MySQL.
In this tutorial I assume that you know the theory about CQRS+ES, you don’t need any explanation why and when it’s better solution than CRUD. If you are looking for this kind of information, please get back to Google.
[toc]
GIT Repository
I pushed all code into repository on GitHub. You can clone it and run or follow this tutorial.
git clone https://github.com/mmp4k/prooph-in-action.git
cd prooph-in-action
composer install
Prepare MySQL database
You have to create empty database and add there four tables. Three of them (event_streams
, projections
, snapshots
) are part of prooph, last one (read_users
) is used only in this tutorial.
DROP TABLE IF EXISTS `event_streams`;
CREATE TABLE `event_streams` (
`no` bigint(20) NOT NULL AUTO_INCREMENT,
`real_stream_name` varchar(150) COLLATE utf8_bin NOT NULL,
`stream_name` char(41) COLLATE utf8_bin NOT NULL,
`metadata` json DEFAULT NULL,
`category` varchar(150) COLLATE utf8_bin DEFAULT NULL,
PRIMARY KEY (`no`),
UNIQUE KEY `ix_rsn` (`real_stream_name`),
KEY `ix_cat` (`category`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
DROP TABLE IF EXISTS `projections`;
CREATE TABLE `projections` (
`no` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(150) COLLATE utf8_bin NOT NULL,
`position` json DEFAULT NULL,
`state` json DEFAULT NULL,
`status` varchar(28) COLLATE utf8_bin NOT NULL,
`locked_until` char(26) COLLATE utf8_bin DEFAULT NULL,
PRIMARY KEY (`no`),
UNIQUE KEY `ix_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
DROP TABLE IF EXISTS `read_users`;
CREATE TABLE `read_users` (
`id` int(11) NOT NULL,
`email` varchar(45) DEFAULT NULL,
`password` varchar(45) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `snapshots`;
CREATE TABLE `snapshots` (
`aggregate_id` varchar(150) COLLATE utf8_bin NOT NULL,
`aggregate_type` varchar(150) COLLATE utf8_bin NOT NULL,
`last_version` int(11) NOT NULL,
`created_at` char(26) COLLATE utf8_bin NOT NULL,
`aggregate_root` blob,
UNIQUE KEY `ix_aggregate_id` (`aggregate_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Empty prooph project
You need some place to work. Create empty folder, init composer project there and get dependencies:
mkdir prooph-in-action
cd prooph-in-action
composer init -n
composer require prooph/service-bus prooph/event-sourcing prooph/event-store prooph/pdo-event-store prooph/event-store-bus-bridge prooph/snapshotter prooph/pdo-snapshot-store
When you did it, you have to add autoload
to your composer.json
.
"autoload": {
"psr-4": {"App\\": "src/"}
}
Folders
All your code you should store into src
catalog, you can start from creating correct structure.
./src/
./src/Infrastructure/
./src/Projection/
./src/Model/
./src/Model/Command/
./src/Model/Event/
To create that you can use follow command:
mkdir -p ./src/Infrastructure/ ./src/Model/Command/ ./src/Model/Event/ ./src/Projection/
Files
When your folders are ready, then you have to add all necessary files. We do that in few steps.
1. Infrastructure
You have to create UserRepository
. It allows you to save and fetch data from event-store
or/and snapshots
.
<?php
// src/Infrastructure/UserRepository.php
declare(strict_types=1);
namespace App\Infrastructure;
use App\Model\User;
use Prooph\EventSourcing\Aggregate\AggregateRepository;
use Prooph\EventSourcing\Aggregate\AggregateType;
use Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator;
use Prooph\EventStore\EventStore;
use Prooph\SnapshotStore\SnapshotStore;
use App\Model\UserRepository as BaseUserRepository;
class UserRepository extends AggregateRepository implements BaseUserRepository
{
public function __construct(EventStore $eventStore, SnapshotStore $snapshotStore)
{
parent::__construct(
$eventStore,
AggregateType::fromAggregateRootClass(User::class),
new AggregateTranslator(),
$snapshotStore,
null,
true
);
}
public function save(User $user): void
{
$this->saveAggregateRoot($user);
}
public function get(string $id): ?User
{
return $this->getAggregateRoot($id);
}
}
2. Projections
Projections are nice feature they allow you to fill read-only database for queries, which should be independent of event-store
and can be performed to read data faster.
<?php
// ./src/Projection/UserProjector.php
declare(strict_types=1);
namespace App\Projection;
use App\Model\Event\EmailChanged;
use App\Model\Event\UserRegistered;
class UserProjector
{
private $PDO;
public function __construct(\PDO $PDO)
{
$this->PDO = $PDO;
}
public function onUserRegistered(UserRegistered $userRegistered): void
{
$query = $this->PDO->prepare('INSERT INTO `read_users` SET email = ?, password = ?, id = ?');
$query->bindValue(1, $userRegistered->email());
$query->bindValue(2, $userRegistered->password());
$query->bindValue(3, $userRegistered->aggregateId());
$query->execute();
}
public function onEmailChanged(EmailChanged $emailChanged): void
{
$query = $this->PDO->prepare('UPDATE `read_users` SET email = ? WHERE id = ?');
$query->bindValue(1, $emailChanged->email());
$query->bindValue(2, $emailChanged->aggregateId());
$query->execute();
}
}
3. Model/Commands
In this tutorial, you will have access to two commands. A first create a new user, a second change email. You have to create here four files.
<?php
// ./src/Model/Command/ChangeEmail.php
declare(strict_types=1);
namespace App\Model\Command;
use Prooph\Common\Messaging\Command;
use Prooph\Common\Messaging\PayloadTrait;
class ChangeEmail extends Command
{
use PayloadTrait;
public function id(): string
{
return $this->payload()['id'];
}
public function email(): string
{
return $this->payload()['email'];
}
}
<?php
// ./src/Model/Command/ChangeEmailHandler.php
declare(strict_types=1);
namespace App\Model\Command;
use App\Model\UserRepository;
class ChangeEmailHandler
{
private $repository;
public function __construct(UserRepository $repository)
{
$this->repository = $repository;
}
public function __invoke(ChangeEmail $changeEmail): void
{
$user = $this->repository->get($changeEmail->id());
$user->changeEmail($changeEmail->email());
$this->repository->save($user);
}
}
<?php
// ./src/Model/Command/RegisterUser.php
declare(strict_types=1);
namespace App\Model\Command;
use Prooph\Common\Messaging\Command;
use Prooph\Common\Messaging\PayloadTrait;
class RegisterUser extends Command
{
use PayloadTrait;
public function id(): string
{
return $this->payload()['id'];
}
public function email(): string
{
return $this->payload()['email'];
}
public function password(): string
{
return $this->payload()['password'];
}
}
<?php
// ./src/Model/Command/RegisterUserHandler.php
declare(strict_types=1);
namespace App\Model\Command;
use App\Model\User;
use App\Model\UserRepository;
class RegisterUserHandler
{
private $repository;
public function __construct(UserRepository $repository)
{
$this->repository = $repository;
}
public function __invoke(RegisterUser $registerUser): void
{
$user = User::registerWithData($registerUser->id(), $registerUser->email(), $registerUser->password());
$this->repository->save($user);
}
}
4. Model/Events
We have two events there. The first one is notification user created an account, the second one is notification user changed his email.
<?php
// ./src/Model/Event/EmailChanged.php
declare(strict_types=1);
namespace App\Model\Event;
use Prooph\EventSourcing\AggregateChanged;
class EmailChanged extends AggregateChanged
{
public function email(): string
{
return $this->payload['email'];
}
}
<?php
// ./src/Model/Event/UserRegistered.php
declare(strict_types=1);
namespace App\Model\Event;
use Prooph\EventSourcing\AggregateChanged;
class UserRegistered extends AggregateChanged
{
public function email(): string
{
return $this->payload['email'];
}
public function password(): string
{
return $this->payload['password'];
}
}
5. Model
Finally. Our Aggregate Root and interface for repositories.
<?php
// ./src/Model/User.php
declare(strict_types=1);
namespace App\Model;
use App\Model\Event\EmailChanged;
use App\Model\Event\UserRegistered;
use Prooph\EventSourcing\AggregateChanged;
use Prooph\EventSourcing\AggregateRoot;
class User extends AggregateRoot
{
private $id, $email, $password;
public function changeEmail($newEmail): void
{
if ($this->email === $newEmail) {
return;
}
$this->recordThat(EmailChanged::occur($this->id, [
'email' => $newEmail
]));
}
static public function registerWithData(string $id, string $email, string $password): self
{
$obj = new self;
$obj->recordThat(UserRegistered::occur($id, [
'email' => $email,
'password' => $password
]));
return $obj;
}
protected function aggregateId(): string
{
return $this->id;
}
protected function apply(AggregateChanged $event): void
{
switch (get_class($event)) {
case UserRegistered::class:
/** @var UserRegistered $event */
$this->id = $event->aggregateId();
$this->email = $event->email();
$this->password = $event->password();
break;
case EmailChanged::class:
/** @var EmailChanged $event */
$this->id = $event->aggregateId();
$this->email = $event->email();
break;
}
}
}
<?php
// ./src/Model/UserRepository.php
declare(strict_types=1);
namespace App\Model;
interface UserRepository
{
public function save(User $user): void;
public function get(string $id): ?User;
}
./src/ folder
Look at your ./src/
folder, it should look like:
./src/Infrastructure/UserRepository.php
./src/Projection/UserProjector.php
./src/Model/Command/ChangeEmail.php
./src/Model/Command/ChangeEmailHandler.php
./src/Model/Command/RegisterUser.php
./src/Model/Command/RegisterUserHandler.php
./src/Model/Event/EmailChanged.php
./src/Model/Event/UserRegistered.php
./src/Model/User.php
./src/Model/UserRepository.php
Configuration
The biggest file is config.php
. It is because we want to connect all prooph components in single app. In this file, you should update MySQL credential.
<?php
// ./config.php
namespace {
use App\Model\Command\ChangeEmail;
use App\Model\Command\ChangeEmailHandler;
use App\Model\Command\RegisterUser;
use App\Model\Command\RegisterUserHandler;
use App\Model\Event\EmailChanged;
use App\Model\Event\UserRegistered;
use App\Infrastructure\UserRepository;
use App\Projection\UserProjector;
use Prooph\Common\Event\ProophActionEventEmitter;
use Prooph\Common\Messaging\FQCNMessageFactory;
use Prooph\EventStore\ActionEventEmitterEventStore;
use Prooph\EventStore\Pdo\MySqlEventStore;
use Prooph\EventStore\Pdo\PersistenceStrategy\MySqlAggregateStreamStrategy;
use Prooph\EventStore\Pdo\Projection\MySqlProjectionManager;
use Prooph\EventStoreBusBridge\EventPublisher;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use Prooph\SnapshotStore\Pdo\PdoSnapshotStore;
include "./vendor/autoload.php";
$pdo = new PDO('mysql:dbname=prooph;host=localhost', 'root', '');
$eventStore = new MySqlEventStore(new FQCNMessageFactory(), $pdo, new MySqlAggregateStreamStrategy());
$eventEmitter = new ProophActionEventEmitter();
$eventStore = new ActionEventEmitterEventStore($eventStore, $eventEmitter);
$eventBus = new EventBus($eventEmitter);
$eventPublisher = new EventPublisher($eventBus);
$eventPublisher->attachToEventStore($eventStore);
$pdoSnapshotStore = new PdoSnapshotStore($pdo);
$userRepository = new UserRepository($eventStore, $pdoSnapshotStore);
$projectionManager = new MySqlProjectionManager($eventStore, $pdo);
$commandBus = new CommandBus();
$router = new CommandRouter();
$router->route(RegisterUser::class)->to(new RegisterUserHandler($userRepository));
$router->route(ChangeEmail::class)->to(new ChangeEmailHandler($userRepository));
$router->attachToMessageBus($commandBus);
$userProjector = new UserProjector($pdo);
$eventRouter = new EventRouter();
$eventRouter->route(EmailChanged::class)->to([$userProjector, 'onEmailChanged']);
$eventRouter->route(UserRegistered::class)->to([$userProjector, 'onUserRegistered']);
$eventRouter->attachToMessageBus($eventBus);
$userId = '20';
}
And this is it. Your prooph app is ready to fun. In next paragraphs, you will see some familiar scenarios.
Before we start
If you want to understand better how prooph works (and CQRS, ES, etc..), you should log all MySQL queries into file and follow them. Bellow, you find easy instruction how to do that.
MySQL Query logging
Open your MySQL console and paste that queries:
SET GLOBAL general_log = 'ON';
SHOW VARIABLES LIKE "general_log%";
You should see something like that:
mysql> SHOW VARIABLES LIKE "general_log%";
+------------------+-----------------------------------------------+
| Variable_name | Value |
+------------------+-----------------------------------------------+
| general_log | ON |
| general_log_file | /usr/local/var/mysql/MacBook-Pro-Marcin-3.log |
+------------------+-----------------------------------------------+
2 rows in set (0.01 sec)
In that case you just need follow that file /usr/local/var/mysql/MacBook-Pro-Marcin-3.log
i.e.
tail -f /usr/local/var/mysql/MacBook-Pro-Marcin-3.log
This is it. You see all queries that are executed in real-time.
Create User
Do you want to create user and change his email… five times? Be my guest. Just create file create_user_and_change_email.php
<?php
// ./create_user_ang_change_email.php
namespace {
use App\Model\Command\ChangeEmail;
use App\Model\Command\RegisterUser;
include "./config.php";
$commandBus->dispatch(new RegisterUser([
'id' => $userId,
'email' => 'random@email.com',
'password' => 'test'
]));
for ($i = 0; $i < 5; $i++) {
$commandBus->dispatch(new ChangeEmail([
'email' => 'random' . $i . '@email.com',
'id' => $userId
]));
}
}
Fetch information about user
Do you like to know how to fetch information about user from database? Create file get_user.php
<?php
// ./get_user.php
namespace {
include './config.php';
var_dump($userRepository->get($userId));
}
Oh no. Junior Developer cleans up users’ database.
Run that command in your MySQL console:
SELECT * FROM read_users;
TRUNCATE read_users;
Did you lose that data forever? No. Create file restore_read_model.php
<?php
// ./restore_read_model.php
namespace {
use App\Model\User;
use Prooph\Common\Messaging\Message;
use Prooph\EventStore\Projection\Projector;
include "./config.php";
$projection = $projectionManager->createProjection('test', [Projector::OPTION_PCNTL_DISPATCH => true,]);
$projection->reset();
pcntl_signal(SIGQUIT, function () use ($projection) {
$projection->stop();
});
$projection
->fromCategory(User::class)
->whenAny(
function (array $state, Message $event) use ($eventBus): array {
$eventBus->dispatch($event);
return $state;
}
);
$projection->run(false);
}
Run script and magic will happen. All data have been restored.
User changed their email 80 000 times, and app works slower.
Because to fetch information about user prooph has to sum up information from all events that happened it can be slow after some time. You can create snapshots to resolve that problem. Create create_snapshot.php
.
<?php
// ./create_snapshot.php
namespace {
use App\Model\User;
use Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator;
use Prooph\Snapshotter\CategorySnapshotProjection;
use Prooph\Snapshotter\SnapshotReadModel;
include "./config.php";
$snapshotReadModel = new SnapshotReadModel(
$userRepository,
new AggregateTranslator(),
$pdoSnapshotStore,
[User::class]
);
$projection = $projectionManager->createReadModelProjection(
'user_snapshots',
$snapshotReadModel
);
$categoryProjection = new CategorySnapshotProjection($projection, User::class);
$categoryProjection();
$projection->run(false);
}
Run script ./get_user.php
again. Check in console and compare how the queries were changed. On the beginning prooph fetches data from snapshot
and then gets only data which were added after you did snapshot.
The End.
I hope you enjoyed this tutorial. If you have some questions, feel free to ask.