步骤 18: 使用异步

    理想情况是,你应该存储提交的数据,但先不发布它,并且立刻返回一个应答。检查垃圾信息可以之后进行。

    我们要为评论引入 信息,它可以取以下值之一:submittedspampublished

    state 属性加入 Comment 类:

    创建一个数据库结构迁移:

    1. $ symfony console make:migration

    修改这个迁移,让已有评论的状态默认设为 published

    patch_file

    1. --- a/migrations/Version00000000000000.php
    2. +++ b/migrations/Version00000000000000.php
    3. @@ -20,7 +20,9 @@ final class Version20200714155905 extends AbstractMigration
    4. public function up(Schema $schema) : void
    5. {
    6. // this up() migration is auto-generated, please modify it to your needs
    7. - $this->addSql('ALTER TABLE comment ADD state VARCHAR(255) NOT NULL');
    8. + $this->addSql('ALTER TABLE comment ADD state VARCHAR(255)');
    9. + $this->addSql("UPDATE comment SET state='published'");
    10. + $this->addSql('ALTER TABLE comment ALTER COLUMN state SET NOT NULL');
    11. }
    12. public function down(Schema $schema) : void

    更新数据库结构:

    1. $ symfony console doctrine:migrations:migrate

    我们也要确保 state 的默认值设为 submitted

    patch_file

    1. --- a/src/Entity/Comment.php
    2. +++ b/src/Entity/Comment.php
    3. @@ -55,9 +55,9 @@ class Comment
    4. private $photoFilename;
    5. /**
    6. - * @ORM\Column(type="string", length=255)
    7. + * @ORM\Column(type="string", length=255, options={"default": "submitted"})
    8. */
    9. - private $state;
    10. + private $state = 'submitted';
    11. public function __toString(): string
    12. {

    更新 EasyAdmin 的配置,这样我们可以看到评论的状态:

    patch_file

    1. --- a/src/Controller/Admin/CommentCrudController.php
    2. +++ b/src/Controller/Admin/CommentCrudController.php
    3. @@ -51,6 +51,7 @@ class CommentCrudController extends AbstractCrudController
    4. ->setLabel('Photo')
    5. ->onlyOnIndex()
    6. ;
    7. + yield TextField::new('state');
    8. $createdAt = DateTimeField::new('createdAt')->setFormTypeOptions([
    9. 'html5' => true,

    别忘了也要更新测试,就是在 fixture 里设置 state

    patch_file

    1. --- a/src/DataFixtures/AppFixtures.php
    2. +++ b/src/DataFixtures/AppFixtures.php
    3. @@ -37,8 +37,16 @@ class AppFixtures extends Fixture
    4. $comment1->setAuthor('Fabien');
    5. $comment1->setEmail('[email protected]');
    6. $comment1->setText('This was a great conference.');
    7. + $comment1->setState('published');
    8. $manager->persist($comment1);
    9. + $comment2 = new Comment();
    10. + $comment2->setConference($amsterdam);
    11. + $comment2->setAuthor('Lucas');
    12. + $comment2->setEmail('');
    13. + $comment2->setText('I think this one is going to be moderated.');
    14. + $manager->persist($comment2);
    15. +
    16. $admin = new Admin();
    17. $admin->setRoles(['ROLE_ADMIN']);
    18. $admin->setUsername('admin');

    在控制器的测试里,模拟验证:

    patch_file

    1. --- a/tests/Controller/ConferenceControllerTest.php
    2. +++ b/tests/Controller/ConferenceControllerTest.php
    3. @@ -2,6 +2,8 @@
    4. namespace App\Tests\Controller;
    5. +use App\Repository\CommentRepository;
    6. +use Doctrine\ORM\EntityManagerInterface;
    7. use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
    8. class ConferenceControllerTest extends WebTestCase
    9. @@ -22,10 +24,16 @@ class ConferenceControllerTest extends WebTestCase
    10. $client->submitForm('Submit', [
    11. 'comment_form[author]' => 'Fabien',
    12. 'comment_form[text]' => 'Some feedback from an automated functional test',
    13. - 'comment_form[email]' => '[email protected]',
    14. + 'comment_form[email]' => $email = '',
    15. 'comment_form[photo]' => dirname(__DIR__, 2).'/public/images/under-construction.gif',
    16. ]);
    17. $this->assertResponseRedirects();
    18. +
    19. + // simulate comment validation
    20. + $comment = self::$container->get(CommentRepository::class)->findOneByEmail($email);
    21. + $comment->setState('published');
    22. + self::$container->get(EntityManagerInterface::class)->flush();
    23. +
    24. $client->followRedirect();
    25. $this->assertSelectorExists('div:contains("There are 2 comments")');
    26. }

    在 PHPUnit 测试中,你可以用 self::$container->get() 获取容器里的任何服务;你也可以用它访问非公共服务。

    理解 Messenger

    在 Symfony 中由 Messenger 组件来管理异步代码:

    当需要异步执行一些逻辑时,发送一个 消息消息总线。总线会在一个 队列 中存储消息,并且立即返回,这样执行流程就能尽快恢复。

    一个 消费者 程序会在后台持续运行,它会读取队列里的消息,并且执行相关逻辑。消费者程序可以和 web 应用程序运行在同一个服务器上,也可以运行在另一个服务器上。

    它和 HTTP 请求的处理方式很像,只是它不会返回应答。

    编写一个消息处理器

    一个消息是一个仅包含数据的类,它不含任何逻辑。它会被持久化,然后保存在队列中,所以它只包含“简单”的可序列化数据。

    src/Message/CommentMessage.php

    1. namespace App\Message;
    2. class CommentMessage
    3. {
    4. private $id;
    5. private $context;
    6. {
    7. $this->id = $id;
    8. $this->context = $context;
    9. }
    10. public function getId(): int
    11. {
    12. return $this->id;
    13. public function getContext(): array
    14. {
    15. return $this->context;
    16. }
    17. }

    在 Messenger 的世界里,我们没有控制器,但有消息处理器。

    在新的 App\MessageHandler 命名空间下创建 CommentMessageHandler 类,它知道如何处理 CommentMessage 类的消息:

    src/MessageHandler/CommentMessageHandler.php

    1. namespace App\MessageHandler;
    2. use App\Message\CommentMessage;
    3. use App\Repository\CommentRepository;
    4. use App\SpamChecker;
    5. use Doctrine\ORM\EntityManagerInterface;
    6. use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
    7. class CommentMessageHandler implements MessageHandlerInterface
    8. {
    9. private $spamChecker;
    10. private $entityManager;
    11. private $commentRepository;
    12. public function __construct(EntityManagerInterface $entityManager, SpamChecker $spamChecker, CommentRepository $commentRepository)
    13. {
    14. $this->entityManager = $entityManager;
    15. $this->spamChecker = $spamChecker;
    16. $this->commentRepository = $commentRepository;
    17. }
    18. public function __invoke(CommentMessage $message)
    19. {
    20. $comment = $this->commentRepository->find($message->getId());
    21. if (!$comment) {
    22. return;
    23. }
    24. if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) {
    25. $comment->setState('spam');
    26. } else {
    27. $comment->setState('published');
    28. }
    29. $this->entityManager->flush();
    30. }
    31. }

    MessageHandlerInterface 是一个 标记 接口。它帮助 Symfony 自动注册和自动配置该类为一个消息处理器。根据惯例,处理逻辑放在名为 __invoke() 的方法中。该方法中的 CommentMessage 类型提示告诉 Messenger 它要处理的类是哪一个。

    更新控制器来使用新的系统:

    patch_file

    1. --- a/src/Controller/ConferenceController.php
    2. +++ b/src/Controller/ConferenceController.php
    3. @@ -5,14 +5,15 @@ namespace App\Controller;
    4. use App\Entity\Comment;
    5. use App\Entity\Conference;
    6. use App\Form\CommentFormType;
    7. +use App\Message\CommentMessage;
    8. use App\Repository\CommentRepository;
    9. use App\Repository\ConferenceRepository;
    10. -use App\SpamChecker;
    11. use Doctrine\ORM\EntityManagerInterface;
    12. use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
    13. use Symfony\Component\HttpFoundation\File\Exception\FileException;
    14. use Symfony\Component\HttpFoundation\Request;
    15. use Symfony\Component\HttpFoundation\Response;
    16. +use Symfony\Component\Messenger\MessageBusInterface;
    17. use Symfony\Component\Routing\Annotation\Route;
    18. use Twig\Environment;
    19. @@ -20,11 +21,13 @@ class ConferenceController extends AbstractController
    20. {
    21. private $twig;
    22. private $entityManager;
    23. + private $bus;
    24. - public function __construct(Environment $twig, EntityManagerInterface $entityManager)
    25. + public function __construct(Environment $twig, EntityManagerInterface $entityManager, MessageBusInterface $bus)
    26. {
    27. $this->twig = $twig;
    28. $this->entityManager = $entityManager;
    29. + $this->bus = $bus;
    30. }
    31. #[Route('/', name: 'homepage')]
    32. @@ -36,7 +39,7 @@ class ConferenceController extends AbstractController
    33. }
    34. #[Route('/conference/{slug}', name: 'conference')]
    35. - public function show(Request $request, Conference $conference, CommentRepository $commentRepository, SpamChecker $spamChecker, string $photoDir): Response
    36. + public function show(Request $request, Conference $conference, CommentRepository $commentRepository, string $photoDir): Response
    37. {
    38. $comment = new Comment();
    39. $form = $this->createForm(CommentFormType::class, $comment);
    40. @@ -54,6 +57,7 @@ class ConferenceController extends AbstractController
    41. }
    42. $this->entityManager->persist($comment);
    43. + $this->entityManager->flush();
    44. $context = [
    45. 'user_ip' => $request->getClientIp(),
    46. 'referrer' => $request->headers->get('referer'),
    47. 'permalink' => $request->getUri(),
    48. ];
    49. - if (2 === $spamChecker->getSpamScore($comment, $context)) {
    50. - throw new \RuntimeException('Blatant spam, go away!');
    51. - }
    52. - $this->entityManager->flush();
    53. + $this->bus->dispatch(new CommentMessage($comment->getId(), $context));
    54. return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]);
    55. }

    我们现在分发一个消息到总线上,而不是依赖于垃圾信息检查器。然后消息处理器会决定怎么处理消息。

    这达到了一些意想不到的效果。我们把控制器和垃圾信息检查器进行了解耦,把逻辑移到了消息处理器这个新的类中。这是一个使用总线的完美案例。测试一下代码,它能通过。所有的执行仍然是同步的,但代码大概已经变得“更好”了。

    更新评论的展示逻辑,让未发布的评论不出现在前端:

    patch_file

    1. --- a/src/Repository/CommentRepository.php
    2. @@ -27,7 +27,9 @@ class CommentRepository extends ServiceEntityRepository
    3. {
    4. $query = $this->createQueryBuilder('c')
    5. ->andWhere('c.conference = :conference')
    6. + ->andWhere('c.state = :state')
    7. ->setParameter('conference', $conference)
    8. + ->setParameter('state', 'published')
    9. ->orderBy('c.createdAt', 'DESC')
    10. ->setMaxResults(self::PAGINATOR_PER_PAGE)
    11. ->setFirstResult($offset)

    真正走向异步

    默认情况下,消息处理器都是同步调用。为了使用异步方式,你需要在 config/packages/messenger.yaml 配置文件中为每个消息处理器显式地配置要用到的队列:

    patch_file

    1. --- a/.env
    2. +++ b/.env
    3. @@ -29,7 +29,7 @@ DATABASE_URL="postgresql://127.0.0.1:5432/db?serverVersion=13&charset=utf8"
    4. ###> symfony/messenger ###
    5. # Choose one of the transports below
    6. -# MESSENGER_TRANSPORT_DSN=doctrine://default
    7. +MESSENGER_TRANSPORT_DSN=doctrine://default
    8. # MESSENGER_TRANSPORT_DSN=amqp://guest::5672/%2f/messages
    9. # MESSENGER_TRANSPORT_DSN=redis://localhost:6379/messages
    10. ###< symfony/messenger ###
    11. --- a/config/packages/messenger.yaml
    12. +++ b/config/packages/messenger.yaml
    13. @@ -5,10 +5,15 @@ framework:
    14. transports:
    15. # https://symfony.com/doc/current/messenger.html#transport-configuration
    16. - # async: '%env(MESSENGER_TRANSPORT_DSN)%'
    17. + async:
    18. + dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
    19. + options:
    20. + auto_setup: false
    21. + use_notify: true
    22. + check_delayed_interval: 60000
    23. # failed: 'doctrine://default?queue_name=failed'
    24. # sync: 'sync://'
    25. routing:
    26. # Route your messages to the transports
    27. - # 'App\Message\YourMessage': async
    28. + App\Message\CommentMessage: async

    该配置告诉总线把 App\Message\CommentMessage 类的实例发送到 async 队列,这个队列由 DSN (MESSENGER_TRANSPORT_DSN)变量定义,它指向了 Doctrine,这是在 .env 文件中配置的。用大白话说,我们使用 PostgreSQL 作为消息队列。

    设置 PostgreSQL 表和触发器:

    1. $ symfony console make:migration

    并且迁移数据库:

    1. $ symfony console doctrine:migrations:migrate

    小技巧

    在幕后,Symfony 使用了 PostgreSQL 的发布/订阅系统(LISTEN/NOTIFY),这个系统是 PostgreSQL 内置的、性能优越、具备可升缩性并且支持事务。如果你想要用 RabbitMQ 作为消息代理来取代 PostgreSQL,你可以阅读关于 RabbitMQ 的那一章。

    用消费者程序处理消息

    如果你试着提交一个新评论,垃圾检查器不再被调用。在 getSpamScore() 方法中加上一行 error_log() 调用来进行验证。运行结果是,消息会在消息队列中等待,直到有消费者进程来处理。

    你可以想象得到,Symfony 自带了一个消费者命令。现在来运行它:

    它会立刻处理掉那个表单提交后分发的消息:

    1. [OK] Consuming messages from transports "async".
    2. // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.
    3. // Quit the worker with CONTROL-C.
    4. 11:30:20 INFO [messenger] Received message App\Message\CommentMessage ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]
    5. 11:30:20 INFO [http_client] Request: "POST https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
    6. 11:30:20 INFO [http_client] Response: "200 https://80cea32be1f6.rest.akismet.com/1.1/comment-check"
    7. 11:30:20 INFO [messenger] Message App\Message\CommentMessage handled by App\MessageHandler\CommentMessageHandler::__invoke ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage","handler" => "App\MessageHandler\CommentMessageHandler::__invoke"]
    8. 11:30:20 INFO [messenger] App\Message\CommentMessage was handled successfully (acknowledging to transport). ["message" => App\Message\CommentMessage^ { …},"class" => "App\Message\CommentMessage"]

    Ctrl+C 可以停止消费者程序。

    我们想要在不打开很多终端的情况下就可以让消费者程序持续运行,而不是每次提交一个评论时打开它,提交完后再立刻关闭它。

    symfony 命令在执行 run 命令时,可以通过守护进程选项(-d)来管理这种后台运行的命令或工作进程。

    再执行一次消费者程序,但在后台程序中发送消息:

    1. $ symfony run -d --watch=config,src,templates,vendor symfony console messenger:consume async

    --watch 选项告诉Symfony,每当 config/src/templates/vendor/ 这些目录下有文件系统的改动时,这个命令必须重启。

    注解

    不要使用 -vv 选项,否则你会在执行 server:log 时信息会重复(日志信息和控制台信息)。

    如果消费者程序因为一些原因停止运行了(内存限制,程序错误等待),它会被自动重启。如果运行消费者程序失败得太频繁,symfony 命令就会放弃运行。

    日志通过 symfony server:log 流式输出,它会整合所有来自 PHP、web 服务器和应用程序的其它日志:

    1. $ symfony server:log

    server:status 命令来列出当前项目所管理的全部后台 worker 进程:

    1. $ symfony server:status
    2. Web server listening on https://127.0.0.1:8000
    3. Command symfony console messenger:consume async running with PID 15774 (watching config/, src/, templates/)

    要停止一个 worker 进程,需要停止 web 服务器,或者根据 server:status 命令显示的进程 ID 杀死对应进程:

    1. $ kill 15774

    对失败的消息进行重试

    如果处理消息时 Akismet 下线了怎么办?提交评论的人不会受影响,但这个消息丢失了,垃圾信息检查也没有做。

    Messenger 针对处理消息时抛出异常有一个重试机制。让我们来对它进行配置:

    patch_file

    1. --- a/config/packages/messenger.yaml
    2. +++ b/config/packages/messenger.yaml
    3. @@ -1,7 +1,7 @@
    4. framework:
    5. messenger:
    6. # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
    7. - # failure_transport: failed
    8. + failure_transport: failed
    9. transports:
    10. # https://symfony.com/doc/current/messenger.html#transport-configuration
    11. @@ -10,7 +10,10 @@ framework:
    12. options:
    13. use_notify: true
    14. check_delayed_interval: 60000
    15. - # failed: 'doctrine://default?queue_name=failed'
    16. + retry_strategy:
    17. + max_retries: 3
    18. + multiplier: 2
    19. + failed: 'doctrine://default?queue_name=failed'
    20. # sync: 'sync://'
    21. routing:

    当处理消息时出现了问题,消费者程序会在放弃前重试 3 次。放弃后不会丢掉消息,而是把消息永久存储在 failed 队列中,这个队列使用了另一个数据库表。

    查看处理失败的消息,用以下命令来重试处理:

    1. $ symfony console messenger:failed:show
    2. $ symfony console messenger:failed:retry

    在 SymfonyCloud 上运行 worker 进程

    我们需要持续运行 messenger:consume 命令来处理 PostgreSQL 队列中的消息。在 SymfonyCloud 上,这是 worker 进程 的角色:

    patch_file

    就像本地 命令一样,SymfonyCloud 会管理重启和日志。

    用这个命令来查看 worker 进程中的日志:

    1. $ symfony logs --worker=messages all

    深入学习