5. Multiple queues

Some notification backends (doctrine and rabbitmq) support running multiple queues.

This makes it possible to send different messages to different queues, for example so that messages which take a long time to consume do not block messages which take only a short time.

Note

Depending on the backend used, both the configuration and the message handling can differ.

5.1. RabbitMQ

To enable multiple queues, simply define a queues node in your configuration:

# config/packages/sonata_notification.yaml

sonata_notification:
    backend: sonata.notification.backend.rabbitmq
    queues:
        - { queue: transcoder, routing_key: start.transcode.video }
        - { queue: catchall, default: true }

    backends:
        rabbitmq:
            exchange: router
            connection:
                host: '%rabbitmq_host%'
                user: '%rabbitmq_user%'
                pass: '%rabbitmq_pass%'
                port: '%rabbitmq_port%'
                vhost: '%rabbitmq_vhost%'

This defines two queues, transcoder and catchall. The transcoder queue is bound to one routing key:

  • start.transcode.video

In the above example you will need to start 2 processes, each of which handles the messages from one queue:

bin/console sonata:notification:start --env=prod --iteration=250 --type=start.transcode.video
bin/console sonata:notification:start --env=prod --iteration=250

Messages published with the start.transcode.video type will be handled by the first consumer. Any other message types will be handled by the catchall consumer, as it has been set as the default one.
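The two consumer processes above can also be started from one small wrapper script. The following is only a sketch: the helper names consumer_cmd and start_consumers are hypothetical, and it assumes the script is run from the project root so that bin/console resolves.

```shell
#!/bin/sh
# Hypothetical helper: print the consumer command line for one message type.
# With no argument it prints the command for the catchall consumer
# (the same command, without the --type option).
consumer_cmd() {
    cmd="bin/console sonata:notification:start --env=prod --iteration=250"
    if [ -n "${1:-}" ]; then
        cmd="$cmd --type=$1"
    fi
    printf '%s\n' "$cmd"
}

# Hypothetical helper: start one consumer per queue in the background,
# then block until both processes have exited.
start_consumers() {
    $(consumer_cmd start.transcode.video) &
    $(consumer_cmd) &
    wait
}
```

Calling start_consumers would then run both consumers side by side. In production a process supervisor is usually a better fit than a hand-written script, since it can restart a consumer that has exited.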

5.2. Doctrine

To enable multiple queues, simply define a queues node in your configuration:

# config/packages/sonata_notification.yaml

sonata_notification:
    backend: sonata.notification.backend.doctrine
    backends:
        doctrine:
            max_age:   86400     # max age in seconds
            pause:     500000    # delay in microseconds
            states:              # raising errors level
                in_progress: 10
                error:       20
                open:        100
                done:        10000

    queues:
        - { queue: sonata_page, types: [sonata.page.create_snapshot, sonata.page.create_snapshots]}
        - { queue: catchall, default: true }

This defines two queues, sonata_page and catchall. The sonata_page queue is bound to two message types:

  • sonata.page.create_snapshot
  • sonata.page.create_snapshots

In the above example you will need to start 2 processes, each of which handles the messages from one queue:

bin/console sonata:notification:start --env=prod --iteration=250 --type=sonata.page.create_snapshot
bin/console sonata:notification:start --env=prod --iteration=250

Messages published with the sonata.page.create_snapshot or sonata.page.create_snapshots type will be handled by the first consumer. Messages of any other type will be handled by the catchall consumer, as it has been set as the default one.
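The routing rule described above can be modelled as a simple lookup from message type to queue name. This is an illustrative sketch of the behaviour described here, not the bundle's own code; the function name queue_for_type is hypothetical and the type list is copied by hand from the example configuration.

```shell
#!/bin/sh
# Hypothetical model of the example configuration: the two snapshot types
# map to the sonata_page queue, every other type falls through to the
# default (catchall) queue.
queue_for_type() {
    case "$1" in
        sonata.page.create_snapshot|sonata.page.create_snapshots)
            echo "sonata_page" ;;
        *)
            echo "catchall" ;;
    esac
}
```

For instance, queue_for_type sonata.page.create_snapshots prints sonata_page, while any type that is not listed in the configuration prints catchall.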