Prooph: CQRS+ES in PHP. How to use.


The CQRS and Event Sourcing patterns become more popular every day in the PHP community. You can easily find plenty of libraries to store events, execute commands, manage repositories, and so on. There is no clear leader yet, but prooph, as the most comprehensive solution, is close to that position. It has only one problem: documentation.

OK, it has two problems: documentation, and many outdated examples in its GitHub repositories. Because of that, and because prooph is de facto five separate components, getting everything running out of the box is a real pain. That holds even when an example ships with Docker, or especially then, because the tree of dependencies grows fast.

  • Event Sourcing
  • Event Store
  • Snapshot Store
  • Projection
  • Service Bus

When you want to work with CQRS+ES, you have to connect those components somehow. Good luck!

I did it. I created a simple example that shows how to use all of those components, almost without dependencies. You don’t need frameworks (Symfony, Laravel, Zend), other libraries (uuid, DI), queue systems (RabbitMQ, Kafka), or external tooling (Docker, Vagrant). All you need is PHP 7.1 and MySQL.

In this tutorial I assume that you know the theory behind CQRS+ES and don’t need an explanation of why and when it is a better solution than CRUD. If you are looking for that kind of information, please go back to Google.

Git repository

I pushed all the code to a repository on GitHub. You can clone it and run it, or follow this tutorial step by step.

git clone https://github.com/mmp4k/prooph-in-action.git
cd prooph-in-action
composer install

Prepare MySQL database

You have to create an empty database and add four tables to it. Three of them (event_streams, projections, snapshots) belong to prooph; the last one (read_users) is used only in this tutorial. The JSON columns require MySQL 5.7.8 or newer.

DROP TABLE IF EXISTS `event_streams`;
CREATE TABLE `event_streams` (
  `no` bigint(20) NOT NULL AUTO_INCREMENT,
  `real_stream_name` varchar(150) COLLATE utf8_bin NOT NULL,
  `stream_name` char(41) COLLATE utf8_bin NOT NULL,
  `metadata` json DEFAULT NULL,
  `category` varchar(150) COLLATE utf8_bin DEFAULT NULL,
  PRIMARY KEY (`no`),
  UNIQUE KEY `ix_rsn` (`real_stream_name`),
  KEY `ix_cat` (`category`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

DROP TABLE IF EXISTS `projections`;
CREATE TABLE `projections` (
  `no` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(150) COLLATE utf8_bin NOT NULL,
  `position` json DEFAULT NULL,
  `state` json DEFAULT NULL,
  `status` varchar(28) COLLATE utf8_bin NOT NULL,
  `locked_until` char(26) COLLATE utf8_bin DEFAULT NULL,
  PRIMARY KEY (`no`),
  UNIQUE KEY `ix_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

DROP TABLE IF EXISTS `read_users`;
CREATE TABLE `read_users` (
  `id` int(11) NOT NULL,
  `email` varchar(45) DEFAULT NULL,
  `password` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `snapshots`;
CREATE TABLE `snapshots` (
  `aggregate_id` varchar(150) COLLATE utf8_bin NOT NULL,
  `aggregate_type` varchar(150) COLLATE utf8_bin NOT NULL,
  `last_version` int(11) NOT NULL,
  `created_at` char(26) COLLATE utf8_bin NOT NULL,
  `aggregate_root` blob,
  UNIQUE KEY `ix_aggregate_id` (`aggregate_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Empty prooph project

You need a place to work. Create an empty folder, initialize a Composer project there, and install the dependencies:

mkdir prooph-in-action
cd prooph-in-action
composer init -n
composer require prooph/service-bus prooph/event-sourcing prooph/event-store prooph/pdo-event-store prooph/event-store-bus-bridge prooph/snapshotter prooph/pdo-snapshot-store

When that is done, add an autoload section to your composer.json and run composer dump-autoload so the mapping takes effect.

    "autoload": {
        "psr-4": {"App\\": "src/"}
    }
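
To see why the folder names under src must match the namespaces exactly, here is a rough sketch of the lookup a PSR-4 autoloader performs for the mapping above. The function name psr4Path is made up for this illustration; Composer’s real autoloader is more involved.

```php
<?php

declare(strict_types=1);

// Hypothetical helper, not part of Composer: maps a class name to the file
// a PSR-4 autoloader would look for, given one prefix => directory pair.
function psr4Path(string $class, string $prefix = 'App\\', string $baseDir = 'src/'): ?string
{
    // The class must live under the registered namespace prefix.
    if (strncmp($class, $prefix, strlen($prefix)) !== 0) {
        return null;
    }

    // Strip the prefix and turn namespace separators into directory separators.
    $relative = substr($class, strlen($prefix));

    return $baseDir . str_replace('\\', '/', $relative) . '.php';
}

echo psr4Path('App\\Infrastructure\\UserRepository'), PHP_EOL; // src/Infrastructure/UserRepository.php
echo psr4Path('App\\Model\\Event\\UserRegistered'), PHP_EOL;   // src/Model/Event/UserRegistered.php
```

A misspelled folder means the computed path does not exist, and PHP reports the class as not found.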

Folders

All your code should live in the src folder, so start by creating the right structure.

./src/
./src/Infrastructure/
./src/Projection/
./src/Model/
./src/Model/Command/
./src/Model/Event/

To create it, you can use the following command:

mkdir -p ./src/Infrastructure/ ./src/Model/Command/ ./src/Model/Event/ ./src/Projection/

Files

When your folders are ready, add all the necessary files. We will do that in a few steps.

1. Infrastructure

You have to create UserRepository. It lets you save aggregates to, and fetch them from, the event store and/or snapshots.

<?php

// ./src/Infrastructure/UserRepository.php

declare(strict_types=1);

namespace App\Infrastructure;

use App\Model\User;
use Prooph\EventSourcing\Aggregate\AggregateRepository;
use Prooph\EventSourcing\Aggregate\AggregateType;
use Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator;
use Prooph\EventStore\EventStore;
use Prooph\SnapshotStore\SnapshotStore;
use App\Model\UserRepository as BaseUserRepository;

class UserRepository extends AggregateRepository implements BaseUserRepository
{
    public function __construct(EventStore $eventStore, SnapshotStore $snapshotStore)
    {
        parent::__construct(
            $eventStore,
            AggregateType::fromAggregateRootClass(User::class),
            new AggregateTranslator(),
            $snapshotStore,
            null, // stream name: null keeps the default
            true  // one stream per aggregate
        );
    }

    public function save(User $user): void
    {
        $this->saveAggregateRoot($user);
    }

    public function get(string $id): ?User
    {
        return $this->getAggregateRoot($id);
    }
}

2. Projections

Projections are a nice feature: they let you fill a read-only database for queries. That database is independent of the event store and can be shaped to make reads faster.

<?php

// ./src/Projection/UserProjector.php

declare(strict_types=1);

namespace App\Projection;

use App\Model\Event\EmailChanged;
use App\Model\Event\UserRegistered;

class UserProjector
{
    private $PDO;

    public function __construct(\PDO $PDO)
    {
        $this->PDO = $PDO;
    }

    public function onUserRegistered(UserRegistered $userRegistered): void
    {
        $query = $this->PDO->prepare('INSERT INTO `read_users` SET email = ?, password = ?, id = ?');
        $query->bindValue(1, $userRegistered->email());
        $query->bindValue(2, $userRegistered->password());
        $query->bindValue(3, $userRegistered->aggregateId());
        $query->execute();

    }

    public function onEmailChanged(EmailChanged $emailChanged): void
    {
        $query = $this->PDO->prepare('UPDATE `read_users` SET email = ? WHERE id = ?');
        $query->bindValue(1, $emailChanged->email());
        $query->bindValue(2, $emailChanged->aggregateId());
        $query->execute();
    }
}

3. Model/Commands

In this tutorial you have two commands: the first creates a new user, the second changes the email. Each command has a message class and a handler, so you have to create four files.

<?php

// ./src/Model/Command/ChangeEmail.php

declare(strict_types=1);

namespace App\Model\Command;

use Prooph\Common\Messaging\Command;
use Prooph\Common\Messaging\PayloadTrait;

class ChangeEmail extends Command
{
    use PayloadTrait;

    public function id(): string
    {
        return $this->payload()['id'];
    }

    public function email(): string
    {
        return $this->payload()['email'];
    }
}

<?php

// ./src/Model/Command/ChangeEmailHandler.php

declare(strict_types=1);

namespace App\Model\Command;

use App\Model\UserRepository;

class ChangeEmailHandler
{
    private $repository;

    public function __construct(UserRepository $repository)
    {
        $this->repository = $repository;
    }

    public function __invoke(ChangeEmail $changeEmail): void
    {
        $user = $this->repository->get($changeEmail->id());
        $user->changeEmail($changeEmail->email());
        $this->repository->save($user);
    }
}

<?php

// ./src/Model/Command/RegisterUser.php

declare(strict_types=1);

namespace App\Model\Command;

use Prooph\Common\Messaging\Command;
use Prooph\Common\Messaging\PayloadTrait;

class RegisterUser extends Command
{
    use PayloadTrait;

    public function id(): string
    {
        return $this->payload()['id'];
    }

    public function email(): string
    {
        return $this->payload()['email'];
    }

    public function password(): string
    {
        return $this->payload()['password'];
    }
}

<?php

// ./src/Model/Command/RegisterUserHandler.php

declare(strict_types=1);

namespace App\Model\Command;

use App\Model\User;
use App\Model\UserRepository;

class RegisterUserHandler
{
    private $repository;

    public function __construct(UserRepository $repository)
    {
        $this->repository = $repository;
    }

    public function __invoke(RegisterUser $registerUser): void
    {
        $user = User::registerWithData($registerUser->id(), $registerUser->email(), $registerUser->password());
        $this->repository->save($user);
    }
}
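
The command classes above are little more than a named array with typed getters. Because the files declare strict_types, a getter declared to return string throws a TypeError if the payload holds, say, an integer id. The sketch below shows one possible way to fail earlier with a clearer message. It is plain PHP and does not extend prooph’s Command class; the class name RegisterUserSketch and the validation rule are assumptions made for this example.

```php
<?php

declare(strict_types=1);

// Simplified stand-in for a payload-backed command (hypothetical class,
// not part of prooph and not used elsewhere in this tutorial).
final class RegisterUserSketch
{
    private $payload;

    public function __construct(array $payload)
    {
        // Reject incomplete or wrongly typed payloads up front.
        foreach (['id', 'email', 'password'] as $key) {
            if (!isset($payload[$key]) || !is_string($payload[$key])) {
                throw new InvalidArgumentException("Payload key '$key' must be a string");
            }
        }
        $this->payload = $payload;
    }

    public function id(): string
    {
        return $this->payload['id'];
    }

    public function email(): string
    {
        return $this->payload['email'];
    }
}

$command = new RegisterUserSketch(['id' => '20', 'email' => 'random@email.com', 'password' => 'test']);
echo $command->id(), PHP_EOL; // 20

try {
    // An integer id is rejected before any getter is called.
    new RegisterUserSketch(['id' => 20, 'email' => 'random@email.com', 'password' => 'test']);
} catch (InvalidArgumentException $e) {
    echo $e->getMessage(), PHP_EOL; // Payload key 'id' must be a string
}
```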

4. Model/Events

We have two events. The first one records that a user created an account; the second one records that a user changed their email.

<?php

// ./src/Model/Event/EmailChanged.php

declare(strict_types=1);

namespace App\Model\Event;

use Prooph\EventSourcing\AggregateChanged;

class EmailChanged extends AggregateChanged
{
    public function email(): string
    {
        return $this->payload['email'];
    }
}

<?php

// ./src/Model/Event/UserRegistered.php

declare(strict_types=1);

namespace App\Model\Event;

use Prooph\EventSourcing\AggregateChanged;

class UserRegistered extends AggregateChanged
{
    public function email(): string
    {
        return $this->payload['email'];
    }

    public function password(): string
    {
        return $this->payload['password'];
    }
}

5. Model

Finally, our aggregate root and the interface for the repository.

<?php

// ./src/Model/User.php

declare(strict_types=1);

namespace App\Model;

use App\Model\Event\EmailChanged;
use App\Model\Event\UserRegistered;
use Prooph\EventSourcing\AggregateChanged;
use Prooph\EventSourcing\AggregateRoot;

class User extends AggregateRoot
{
    private $id, $email, $password;

    public function changeEmail(string $newEmail): void
    {
        if ($this->email === $newEmail) {
            return;
        }

        $this->recordThat(EmailChanged::occur($this->id, [
            'email' => $newEmail
        ]));
    }

    public static function registerWithData(string $id, string $email, string $password): self
    {
        $obj = new self;
        $obj->recordThat(UserRegistered::occur($id, [
            'email' => $email,
            'password' => $password
        ]));

        return $obj;
    }

    protected function aggregateId(): string
    {
        return $this->id;
    }

    protected function apply(AggregateChanged $event): void
    {
        switch (get_class($event)) {
            case UserRegistered::class:
                /** @var UserRegistered $event */
                $this->id = $event->aggregateId();
                $this->email = $event->email();
                $this->password = $event->password();
                break;
            case EmailChanged::class:
                /** @var EmailChanged $event */
                $this->id = $event->aggregateId();
                $this->email = $event->email();
                break;
        }
    }
}

<?php

// ./src/Model/UserRepository.php

declare(strict_types=1);

namespace App\Model;

interface UserRepository
{
    public function save(User $user): void;
    public function get(string $id): ?User;
}
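
The aggregate above never sets its properties inside changeEmail or registerWithData; state changes only when an event is applied. That is what makes it possible to rebuild a user from stored events alone. The following is a minimal plain-PHP sketch of that idea, without prooph. UserSketch, fromHistory, and pendingEvents are names invented for this illustration, and events are simple arrays instead of AggregateChanged objects.

```php
<?php

declare(strict_types=1);

// Plain-PHP stand-in for an event-sourced aggregate (illustrative only).
final class UserSketch
{
    private $id;
    private $email;
    private $recorded = [];

    public static function register(string $id, string $email): self
    {
        $user = new self();
        $user->record(['type' => 'registered', 'id' => $id, 'email' => $email]);

        return $user;
    }

    // Rebuild an instance purely from previously stored events.
    public static function fromHistory(array $events): self
    {
        $user = new self();
        foreach ($events as $event) {
            $user->apply($event);
        }

        return $user;
    }

    public function changeEmail(string $email): void
    {
        if ($this->email === $email) {
            return; // nothing changed, so no event is recorded
        }
        $this->record(['type' => 'email_changed', 'id' => $this->id, 'email' => $email]);
    }

    public function email(): string
    {
        return $this->email;
    }

    // Events recorded since this instance was created.
    public function pendingEvents(): array
    {
        return $this->recorded;
    }

    private function record(array $event): void
    {
        $this->recorded[] = $event;
        $this->apply($event); // the only place where state changes
    }

    private function apply(array $event): void
    {
        $this->id = $event['id'];
        $this->email = $event['email'];
    }
}

$user = UserSketch::register('20', 'random@email.com');
$user->changeEmail('random0@email.com');

// A second instance built only from the events ends up in the same state.
$copy = UserSketch::fromHistory($user->pendingEvents());
echo $copy->email(), PHP_EOL; // random0@email.com
```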

./src/ folder

Look at your ./src/ folder. It should look like this:

./src/Infrastructure/UserRepository.php
./src/Projection/UserProjector.php
./src/Model/Command/ChangeEmail.php
./src/Model/Command/ChangeEmailHandler.php
./src/Model/Command/RegisterUser.php
./src/Model/Command/RegisterUserHandler.php
./src/Model/Event/EmailChanged.php
./src/Model/Event/UserRegistered.php
./src/Model/User.php
./src/Model/UserRepository.php

Configuration

The biggest file is config.php, because it wires all the prooph components together into a single app. Update the MySQL credentials in this file.

<?php

// ./config.php

namespace {

    use App\Model\Command\ChangeEmail;
    use App\Model\Command\ChangeEmailHandler;
    use App\Model\Command\RegisterUser;
    use App\Model\Command\RegisterUserHandler;
    use App\Model\Event\EmailChanged;
    use App\Model\Event\UserRegistered;
    use App\Infrastructure\UserRepository;
    use App\Projection\UserProjector;
    use Prooph\Common\Event\ProophActionEventEmitter;
    use Prooph\Common\Messaging\FQCNMessageFactory;
    use Prooph\EventStore\ActionEventEmitterEventStore;
    use Prooph\EventStore\Pdo\MySqlEventStore;
    use Prooph\EventStore\Pdo\PersistenceStrategy\MySqlAggregateStreamStrategy;
    use Prooph\EventStore\Pdo\Projection\MySqlProjectionManager;
    use Prooph\EventStoreBusBridge\EventPublisher;
    use Prooph\ServiceBus\CommandBus;
    use Prooph\ServiceBus\EventBus;
    use Prooph\ServiceBus\Plugin\Router\CommandRouter;
    use Prooph\ServiceBus\Plugin\Router\EventRouter;
    use Prooph\SnapshotStore\Pdo\PdoSnapshotStore;

    include "./vendor/autoload.php";

    $pdo = new PDO('mysql:dbname=prooph;host=localhost', 'root', '');
    $eventStore = new MySqlEventStore(new FQCNMessageFactory(), $pdo, new MySqlAggregateStreamStrategy());
    $eventEmitter = new ProophActionEventEmitter();
    $eventStore = new ActionEventEmitterEventStore($eventStore, $eventEmitter);

    $eventBus = new EventBus($eventEmitter);
    $eventPublisher = new EventPublisher($eventBus);
    $eventPublisher->attachToEventStore($eventStore);

    $pdoSnapshotStore = new PdoSnapshotStore($pdo);
    $userRepository = new UserRepository($eventStore, $pdoSnapshotStore);

    $projectionManager = new MySqlProjectionManager($eventStore, $pdo);

    $commandBus = new CommandBus();
    $router = new CommandRouter();
    $router->route(RegisterUser::class)->to(new RegisterUserHandler($userRepository));
    $router->route(ChangeEmail::class)->to(new ChangeEmailHandler($userRepository));
    $router->attachToMessageBus($commandBus);

    $userProjector = new UserProjector($pdo);
    $eventRouter = new EventRouter();
    $eventRouter->route(EmailChanged::class)->to([$userProjector, 'onEmailChanged']);
    $eventRouter->route(UserRegistered::class)->to([$userProjector, 'onUserRegistered']);
    $eventRouter->attachToMessageBus($eventBus);

    $userId = '20';
}
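
The routing part of this configuration boils down to a map from a command class to an invokable handler. Here is a stripped-down sketch of that idea in plain PHP. TinyCommandBus, Greet, and GreetHandler are hypothetical names for this example; prooph’s CommandBus and CommandRouter do considerably more (plugins, action events, and so on).

```php
<?php

declare(strict_types=1);

// Illustrative in-memory bus: one handler per command class.
final class TinyCommandBus
{
    private $handlers = [];

    public function route(string $commandClass, callable $handler): void
    {
        $this->handlers[$commandClass] = $handler;
    }

    public function dispatch($command): void
    {
        $class = get_class($command);
        if (!isset($this->handlers[$class])) {
            throw new RuntimeException("No handler registered for $class");
        }

        // Handlers are invokable objects, like the *Handler classes above.
        ($this->handlers[$class])($command);
    }
}

final class Greet
{
    public $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }
}

final class GreetHandler
{
    public $greeted = [];

    public function __invoke(Greet $command): void
    {
        $this->greeted[] = 'Hello, ' . $command->name;
    }
}

$handler = new GreetHandler();
$bus = new TinyCommandBus();
$bus->route(Greet::class, $handler);
$bus->dispatch(new Greet('prooph'));

echo $handler->greeted[0], PHP_EOL; // Hello, prooph
```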

And that is it. Your prooph app is ready for some fun. The next sections walk through a few familiar scenarios.

Before we start

If you want to understand better how prooph works (and CQRS, ES, etc.), log all MySQL queries to a file and follow them. Below you will find simple instructions for doing that.

MySQL Query logging

Open your MySQL console and paste these queries:

SET GLOBAL general_log = 'ON';
SHOW VARIABLES LIKE "general_log%";

You should see something like this:

mysql> SHOW VARIABLES LIKE "general_log%";
+------------------+-----------------------------------------------+
| Variable_name    | Value                                         |
+------------------+-----------------------------------------------+
| general_log      | ON                                            |
| general_log_file | /usr/local/var/mysql/MacBook-Pro-Marcin-3.log |
+------------------+-----------------------------------------------+
2 rows in set (0.01 sec)

In that case you just need to follow the file /usr/local/var/mysql/MacBook-Pro-Marcin-3.log, e.g.:

tail -f /usr/local/var/mysql/MacBook-Pro-Marcin-3.log

That is it. You now see all executed queries in real time.

Create User

Do you want to create a user and change their email… five times? Be my guest. Just create the file create_user_and_change_email.php:

<?php

// ./create_user_and_change_email.php

namespace {

    use App\Model\Command\ChangeEmail;
    use App\Model\Command\RegisterUser;

    include "./config.php";

    $commandBus->dispatch(new RegisterUser([
        'id' => $userId,
        'email' => 'random@email.com',
        'password' => 'test'
    ]));

    for ($i = 0; $i < 5; $i++) {
        $commandBus->dispatch(new ChangeEmail([
            'email' => 'random' . $i . '@email.com',
            'id' => $userId
        ]));
    }
}

Fetch information about user

Would you like to know how to fetch information about the user from the database? Create the file get_user.php:

<?php

// ./get_user.php

namespace {
    include './config.php';

    var_dump($userRepository->get($userId));
}

Oh no. A junior developer wipes the users’ table.

Run these statements in your MySQL console:

SELECT * FROM read_users;
TRUNCATE read_users;

Did you lose that data forever? No. Create the file restore_read_model.php:

<?php

// ./restore_read_model.php

namespace {

    use App\Model\User;
    use Prooph\Common\Messaging\Message;
    use Prooph\EventStore\Projection\Projector;

    include "./config.php";

    $projection = $projectionManager->createProjection('test', [Projector::OPTION_PCNTL_DISPATCH => true,]);
    $projection->reset();

    pcntl_signal(SIGQUIT, function () use ($projection) {
        $projection->stop();
    });
    $projection
        ->fromCategory(User::class)
        ->whenAny(
            function (array $state, Message $event) use ($eventBus): array {
                $eventBus->dispatch($event);
                return $state;
            }
        );
    $projection->run(false);
}

Run the script and magic will happen: all the data has been restored. Note that the script relies on the pcntl extension, so it will not run on Windows.
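
One way to picture what the restore script does: it reads the stored events in order and hands each one to the projector again. In config.php the event bus only routes to the UserProjector methods, so the replay writes to read_users and does not append anything to the event store. The sketch below imitates that with plain arrays. The event shapes and the function names project and rebuild are assumptions for this illustration, not prooph APIs.

```php
<?php

declare(strict_types=1);

// Append-only list standing in for the event store (illustrative data).
$storedEvents = [
    ['type' => 'UserRegistered', 'id' => '20', 'email' => 'random@email.com'],
    ['type' => 'EmailChanged', 'id' => '20', 'email' => 'random0@email.com'],
    ['type' => 'EmailChanged', 'id' => '20', 'email' => 'random1@email.com'],
];

// Projector: turns one event into a change of the read model
// (an array here instead of the read_users table).
function project(array $readModel, array $event): array
{
    switch ($event['type']) {
        case 'UserRegistered':
            $readModel[$event['id']] = ['email' => $event['email']];
            break;
        case 'EmailChanged':
            $readModel[$event['id']]['email'] = $event['email'];
            break;
    }

    return $readModel;
}

// Rebuild: start from an empty "table", as after TRUNCATE, and replay everything.
function rebuild(array $events): array
{
    $readModel = [];
    foreach ($events as $event) {
        $readModel = project($readModel, $event);
    }

    return $readModel;
}

$readModel = rebuild($storedEvents);

echo $readModel['20']['email'], PHP_EOL; // random1@email.com
echo count($storedEvents), PHP_EOL;      // 3, the event list itself is untouched
```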

A user changed their email 80 000 times, and the app got slower.

To fetch information about a user, prooph has to replay all the events that ever happened to that user, which gets slow over time. Snapshots solve that problem. Create create_snapshot.php:

<?php

// ./create_snapshot.php

namespace {

    use App\Model\User;
    use Prooph\EventSourcing\EventStoreIntegration\AggregateTranslator;
    use Prooph\Snapshotter\CategorySnapshotProjection;
    use Prooph\Snapshotter\SnapshotReadModel;

    include "./config.php";

    $snapshotReadModel = new SnapshotReadModel(
        $userRepository,
        new AggregateTranslator(),
        $pdoSnapshotStore,
        [User::class]
    );

    $projection = $projectionManager->createReadModelProjection(
        'user_snapshots',
        $snapshotReadModel
    );
    $categoryProjection = new CategorySnapshotProjection($projection, User::class);
    $categoryProjection();
    $projection->run(false);
}

Run ./get_user.php again. Check the query log and compare how the queries changed. prooph first fetches the data from the snapshot and then loads only the events that were added after the snapshot was taken.
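
The effect of a snapshot can be sketched in a few lines of plain PHP: loading from a snapshot plus the newer events gives the same state as replaying everything, with fewer events to process. The event shape, the replay function, and the version numbers below are assumptions for this example and not how prooph stores its data.

```php
<?php

declare(strict_types=1);

// Fold events into a state array; $state is the starting point.
function replay(array $state, array $events): array
{
    foreach ($events as $event) {
        $state['email'] = $event['email'];
        $state['version'] = $event['version'];
    }

    return $state;
}

// Illustrative stream with versions 1..6.
$events = [];
for ($version = 1; $version <= 6; $version++) {
    $events[] = ['version' => $version, 'email' => 'random' . $version . '@email.com'];
}

$empty = ['email' => null, 'version' => 0];

// Snapshot taken after version 4.
$snapshot = replay($empty, array_slice($events, 0, 4));

// Loading with a snapshot: only events newer than the snapshot are replayed.
$newer = array_values(array_filter($events, function (array $event) use ($snapshot): bool {
    return $event['version'] > $snapshot['version'];
}));

$fromSnapshot = replay($snapshot, $newer);
$fromScratch = replay($empty, $events);

echo count($newer), PHP_EOL;              // 2
var_dump($fromSnapshot === $fromScratch); // bool(true)
```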

The End.

I hope you enjoyed this tutorial. If you have some questions, feel free to ask.


  • Sascha-Oliver Prolic

    Really great post. Just one mistake, do not set properties in the aggregate root action methods, properties are only updated after an event has been recorded by the when…-methods.

    • Thanks! I’ve just fixed it a moment ago, in both the repository and the blog post.

  • Filip Górny

    Thanks, I’m working on something very similar now and it helped.

    • Great! Good luck with your project.

  • Chris Harrison

    I’m slightly confused by restore_read_model.php

    I may be completely wrong, but my reading of that code is that it takes all the previously occurring events and dispatches them again through the event bus. What is stopping duplicate events from being created?

  • Ivan Jaros

    Thanks for this! It was really helpful. Just FYI, restore_read_model.php won’t run on Windows due to pcntl, and create_snapshot.php runs indefinitely.

    • Simon

      Indeed, create_snapshot.php runs indefinitely on my side too.

  • Petr Malina

    There’s a “c” missing in the mkdir command in the “Infrastruture” word.
    Thanks for this article!

  • Just a small tip, unrelated to Prooph, that I’m sharing just because I find that many people don’t seem to know this. The mkdir -p command actually can be shortened. Using slashes and braces (to group under a certain level) you can actually do:

    mkdir -p src/{Infrastructure,Projection,Model/{Command,Event}}

    to create your whole structure! 😉