Async Middleware

Matthew Weier O'Phinney

MidwestPHP 2019

Terminology

  • PHP-FIG (or FIG): Framework Interop Group (standards body)
  • PSR-7: HTTP Message interfaces
  • PSR-15: HTTP Request Handler/Middleware interfaces
  • PSR-17: HTTP Message Factory interfaces
  • PSR-14: Event Dispatcher interfaces

Goals

I will demonstrate how async enables:

  • Application-specific servers
  • Better performance, generally due to...
  • Deferred processing.

Node.js

Node.js

event loop

Event Loop; image copyright Bartolomeo Sorrentino

Why?

PHP: Shared Nothing

Shared Nothing
  • Bootstraps. Every. Request.
  • Long-running processes delay the response.

Message Queues

Message Queue; image copyright Stackify

Parallel Processing

Parallel Processing; image copyright Michael Kloran

Async Programming

Styles of Async Programming

Four general styles:

  • Callbacks
  • Promises
  • async/await
  • Coroutines

Deferment patterns

Callbacks


executeSomeAsyncProcess(
    $withData,
    $andA,
    $callbackToExecuteOnCompletion
);
          

function ($error, $result)
{
    if ($error) {
    }
}
            

Callback Hell


doFirst($payload, function ($err, $result) {
    doSecond($result, function ($err, $result) {
        doThird($result, function ($err, $result) {
            // finally got what we needed
        });
    });
});
          

aka the "Pyramid of Doom"

Promises


$promise = someAsyncOperationReturningAPromise($with, $data);
$promise
    ->then($someCallbackToExecuteOnResolution)
    ->then($someAdditionalCallbackToExecuteOnResolutionOfPrevious)
    ->catch($someCallbackToExecuteOnRejection);
          

where:


function then($successCallback = null, $rejectionCallback = null);
            

and catch() is a shortcut for:


$promise->then(null, $someCallbackToExecuteOnRejection);
            

async/await


async function ping() {
  const res = await fetch('/api/ping');
  return await res.json();
}
          

let ack = ping();
          

Coroutines


$result = $statement->execute($data);
          

Swoole provides...

  • an event loop
  • async HTTP, network, and socket clients
  • network servers

Swoole features

  • Coroutine support for many TCP/UDP and socket operations
  • Spawning multiple workers per server
  • Spawning separate task workers
  • Daemonization of servers

A Basic Web Server


use Swoole\Http\Server as HttpServer;

$server = new HttpServer('127.0.0.1', 9000);
$server->on('start', function ($server) {
    echo "Server started at http://127.0.0.1:9000\n";
});
$server->on('request', function ($request, $response) {
    $response->header('Content-Type', 'text/plain');
    $response->end("Hello World\n");
});
$server->start();
          

Deferment


$server->defer(function () {
    // work to defer
});
          

Task Workers

Prepare the server
for tasks

  • Configure the number of task workers to use (required!)
  • Register a listener to handle incoming tasks.
  • Register a listener to execute on task completion.

Registering task workers


$server->set(['task_worker_num' => 4]);
$server->on('task', function ($server, $taskId, $data) {
    // Handle task

    // Finish task:
    $server->finish('');
});
$server->on('finish', function ($server, $taskId, $returnValue) {
    // Task is complete
});
          

Triggering a task


$server->task($someData);
          

A Basic TaskWorker


class Task {
    public $handler;   // callable
    public $arguments; // array
}

$server->on('task', function ($server, $taskId, $task) {
    if (! $task instanceof Task) {
        $server->finish('');
        return;
    }

    ($task->handler)(...$task->arguments);
    $server->finish('');
});
          

Problems

Code Reloading

or lack thereof

Debugging

Coroutine support in Swoole is incompatible with XDebug and XHProf!

Unit Testing

Mocking Swoole classes is difficult.

One Listener Per Event

$response->end() is Problematic

If you forget to call it:

  • the connection will remain open until a network timeout occurs;
  • the current process will remain open; which means
  • no next tick of the event loop.

Non-Standard Request/Response API

It is none of:

  • PSR-7
  • Symfony HttpKernel
  • zend-http

Swoole Pros and Cons

Pros Cons
Multiple web workers No hot-code reloading
Separate task workers Unmockable classes
Coroutine support Single-listener events
Bootstrap elimination $response->end()
No web server Non-standard request/response API
Performance gains

PSR-15

Request Handlers and Middleware


namespace Psr\Http\Server;
use Psr\Http\Message\ResponseInterface as Response;
use Psr\Http\Message\ServerRequestInterface as Request;

interface RequestHandlerInterface {
    public function handle(Request $request) : Response;
}

interface MiddlewareInterface {
    public function process(
        Request $request,
        RequestHandlerInterface $handler
    ) : Response;
}
          

Middleware Flow

Middleware; image copyright Sergey Zhuk

Middleware + Swoole


$server->on('request', function ($request, $response) use ($app) {
    $appResponse = $app->handle(transformRequest($request));
    transformResponse($response, $appResponse);
});
          

Expressive

A middleware application runner, with...

  • Dependency Injection wiring abstraction
  • Routing abstraction
  • Template abstraction
  • Error handling abstraction
  • Application and per-route pipelines

zend-expressive-swoole

Starting your Expressive+Swoole server


$ ./vendor/bin/zend-expressive-swoole start
          

Configurable Server Features

  • Limited hot-code reloading
  • Static file serving

Async Applications

  • Eliminating bootstrap operations
  • Deferred operations

Deferred Operations

Coroutines


$result = $mysqli->query($sql);
while ($data = $result->fetch_assoc()) {
    // ...
}
          

Deferment


$server->defer($callback);
          

Task Workers


$server->task($someData);
          

phly-swoole-taskworker

phly-swoole-taskworker Usage


use Phly\Swoole\TaskWorker\Task;

$server->task(new Task(function ($event) {
    // handle the event
}, $event));
          

Listener deferment

phly/phly-event-dispatcher


$listener = new DeferredListener($server, $listener);

// Where DeferredListener is equivalent to:
function (object $event) use ($server, $listener) : void {
    $server->task(new Task($listener, $event));
}
          

In your own code:


$this->dispatcher->dispatch($someEvent);
            

Pitfalls

Pitfalls; image copyright Activision

Stateful Services

Stateful Services: Templating


$template->addDefaultParam('*', 'user', $user);
          

$metadata = $resourceGenerator
    ->getMetadataMap()
    ->get(Collection::class);
$metadata->setQueryStringArguments(array_merge(
    $metadata->getQueryStringArguments(),
    ['query' => $query]
));
          

Stateful Services: Validation


if (! $validator->isValid($value)) {
    $messages = $validator->getMessages();
}
          

echo implode("
", $validator->getMessages())

Stateful Services: Auth


return $handler(
    $request->withAttribute('user', $auth->getIdentity())
);
          

Resolving State Issues

Decoration


class StatelessVariant implements SomeInterface
{
    private $proxy;

    public function __construct(SomeInterface $proxy)
    {
        $this->proxy = $proxy;
    }
}
          

public function morphState($data) : void
{
    throw new CannotChangeStateException();
}
          

Extension


class StatelessVariant extends OriginalClass
{
    public function morphState($data) : void
    {
        throw new CannotChangeStateException();
    }
}
          

Factories as Services


public function __construct(SomeClass $dependency) : void
          

becomes:


public function __construct(SomeClassFactory $factory) : void
            

and we then:


$dependency = ($this->factory)();
            

Stateful Messages

Pass stateful data to the service:


$result = $this->router->route(
    $request->getUrl()->getPath()
);
          

or the request:


$result = $this->router->route(
    $request
);
            

Pass State Via Request Attributes


public function process(
    Request $request,
    RequestHandler $handler
) : Response {
    $result = $this->router->route($request);
    return $handler->handle(
        $request->withAttribute(RouteResult::class, $result)
    );
}
          

Using Request Attributes


$routeResult = $request->getAttribute(RouteResult::class);
return new HtmlResponse($this->renderer->render(
    'some::template',
    [ 'routeResult' => $routeResult ]
));
          

Sessions

Use zend-expressive-session

and its zend-expressive-session-cache adapter

Wrap Up

Benefits

  • Eliminating bootstrap operations
  • Deferred operations
  • Crazy fast performance!

Practices

  • Abstract async-specific details.
  • Make container services stateless
  • Aggregate state in the request
  • Avoid the session extension

Resources

Thank You!

Contact me at mwop.net

I tweet @mwop