How to use RabbitMQ message queues in Magento 2 plugins?

Magento 2 supports MySQL and RabbitMQ as message queue backends. This tutorial shows how to create a custom module that uses RabbitMQ message queues in Magento 2, with examples.

First of all, you need to install the RabbitMQ server for Magento 2. Here is an example of how you can do it on Ubuntu. You need to use the command:

sudo apt install -y rabbitmq-server
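Before moving on, it may help to confirm that the broker is actually running. The following is a minimal sketch, assuming a systemd-based Ubuntu host where the package registered a unit named rabbitmq-server (the usual name, but check it on your system):

```shell
#!/usr/bin/env bash
# Assumption: systemd manages RabbitMQ under the unit name "rabbitmq-server".
SERVICE="rabbitmq-server"

if command -v systemctl >/dev/null 2>&1; then
    # Start the broker now and on every boot.
    sudo systemctl enable --now "$SERVICE" || echo "could not enable $SERVICE"
    # Print "active" when the broker is up, otherwise report the problem.
    systemctl is-active "$SERVICE" || echo "$SERVICE is not active"
else
    echo "systemctl not found; start $SERVICE with your init system instead"
fi
```

If the service is reported as not active, the RabbitMQ log files are the first place to look.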

Then you need to connect RabbitMQ with Magento 2. To do this, create a queue section in the <install_directory>/app/etc/env.php file. Your code can look like:

'queue' => [
    'amqp' => [
        'host' => 'yourhost.yourdomain.com',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'virtualhost' => '/'
    ],
],

Then, apply the changes with the command:

bin/magento setup:upgrade

You can find guides for other systems and for Magento EE and Cloud versions in the official documentation.

Also, we need to enable the rabbitmq_management plugin. It provides a web interface for monitoring and managing the Magento 2 message queues:

rabbitmq-plugins enable rabbitmq_management

Then open http://127.0.0.1:15672/ (the management interface listens on port 15672, while 5672 is the AMQP port) and log in with the default credentials: guest/guest. For remote access, you can open an SSH tunnel:

ssh -L 15672:localhost:15672 <user>@yourhost.yourdomain.com
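Note that, by default, RabbitMQ only lets the guest user connect from localhost. If Magento and RabbitMQ run on different hosts, a dedicated user is usually needed. Here is a minimal sketch of creating one; the user name, password, and tag are placeholders chosen for illustration, not values from this tutorial:

```shell
#!/usr/bin/env bash
# Placeholders: pick your own user name and password.
MQ_USER="magento"
MQ_PASS="change-me"
MQ_VHOST="/"

if command -v rabbitmqctl >/dev/null 2>&1; then
    # Create the user and allow it to log in to the management UI.
    sudo rabbitmqctl add_user "$MQ_USER" "$MQ_PASS" || echo "could not add user"
    sudo rabbitmqctl set_user_tags "$MQ_USER" management || echo "could not set tags"
    # Grant configure/write/read permissions on every resource in the vhost.
    sudo rabbitmqctl set_permissions -p "$MQ_VHOST" "$MQ_USER" ".*" ".*" ".*" \
        || echo "could not set permissions"
else
    echo "rabbitmqctl not found; install rabbitmq-server first"
fi
```

If you create such a user, the 'user' and 'password' values in env.php need to match it.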

Now we move forward and create a new module.xml in app/code/Amasty/QExample/etc with the following code:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
    <module name="Amasty_QExample" setup_version="1.0.0">
    </module>
</config>


To register it, create registration.php in app/code/Amasty/QExample and use this code:

<?php
\Magento\Framework\Component\ComponentRegistrar::register(
    \Magento\Framework\Component\ComponentRegistrar::MODULE,
    'Amasty_QExample',
    __DIR__
);

According to the Magento 2 documentation, if you use an amqp (RabbitMQ) connection, you need to set up the message queue by customizing the following files in app/code/Amasty/QExample/etc:

  1. communication.xml - Determines elements of the message queue system that all communication types use.
  2. queue_consumer.xml - Describes the connection between a queue and its consumer.
  3. queue_topology.xml - Sets the message routing rules and defines queues and exchanges.
  4. queue_publisher.xml - Defines the exchange where a topic is published.

Let’s see a working example of using RabbitMQ in a Magento 2 module and its configurations.

In communication.xml, define the topic and its elements. You can declare as many topics as you need:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="customer.clean" request="Magento\Customer\Api\Data\CustomerInterface"/>
</config>

Then go to queue_consumer.xml and specify the consumer attributes: name, queue, handler, consumerInstance, connection, and maxMessages. In our example, we have used only name, queue, handler, and connection, but you can combine them as you need:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- connection must match the connection configured in env.php (amqp) -->
    <consumer name="AmCustomerClean" queue="customer_clean" connection="amqp"
              handler="Amasty\QExample\Model\Customer\DeleteConsumer::processMessage"/>
</config>

Next, open queue_topology.xml to configure the exchange and its three optional elements: binding, arguments, and binding/arguments. We have used only binding, which routes the topic to the queue that the consumer reads from:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <!-- connection must match the connection configured in env.php (amqp) -->
    <exchange name="am.customer.delete" type="topic" connection="amqp">
        <binding id="AmCustomerClean" topic="customer.clean" destinationType="queue"
                 destination="customer_clean"/>
    </exchange>
</config>


And the last file is queue_publisher.xml. It has only 2 elements: publisher and connection. It defines the exchange to which messages for a topic are published:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="customer.clean">
        <connection name="amqp" exchange="am.customer.delete" />
    </publisher>
</config>

Next, we need to create a plugin. For this, create di.xml in the folder app/code/Amasty/QExample/etc:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <type name="Magento\Customer\Model\ResourceModel\Customer">
        <plugin name="customer_clean_publisher"
                type="Amasty\QExample\Plugin\DeleteCustomerPlugin"/>
    </type>
</config>

Also, for this plugin, you need to create 2 new folders app/code/Amasty/QExample/Model/Customer and app/code/Amasty/QExample/Plugin.

In the Plugin folder, create a PHP file that defines the plugin's behavior. In our case, DeleteCustomerPlugin.php will look like this:

<?php
namespace Amasty\QExample\Plugin;

use Amasty\QExample\Model\Customer\DeletePublisher;
use Magento\Customer\Model\Customer as CustomerModel;
use Magento\Customer\Model\ResourceModel\Customer;

/**
 * Class DeleteCustomerPlugin
 * @package Amasty\QExample\Plugin
 */
class DeleteCustomerPlugin
{
    /**
     * @var DeletePublisher
     */
    private $customerCleanPublisher;

    /**
     * DeleteCustomerPlugin constructor.
     * @param DeletePublisher $customerCleanPublisher
     */
    public function __construct(DeletePublisher $customerCleanPublisher)
    {
        $this->customerCleanPublisher = $customerCleanPublisher;
    }

    /**
     * @param Customer $subject
     * @param Customer $result
     * @param CustomerModel $customer
     * @return Customer
     */
    public function afterDelete(
        Customer $subject,
        Customer $result,
        CustomerModel $customer
    ) {
        // The publisher expects a CustomerInterface. The Customer model does not
        // implement it, so convert the model to its data object first.
        $this->customerCleanPublisher->execute($customer->getDataModel());
        return $result;
    }
}

The very last step is to add the two model classes the module relies on: the consumer, which processes messages from the queue and performs the customer data cleanup, and the publisher, which the plugin calls to send messages to the queue.

DeleteConsumer.php:

<?php
namespace Amasty\QExample\Model\Customer;

use Magento\Customer\Api\Data\CustomerInterface;

/**
 * Class DeleteConsumer
 * @package Amasty\QExample\Model\Customer
 */
class DeleteConsumer
{
    /**
     * DeleteConsumer constructor.
     */
    public function __construct()
    {
        // Inject and store any dependencies the consumer needs here.
    }

    /**
     * @param CustomerInterface $customer
     */
    public function processMessage(CustomerInterface $customer)
    {
        // Perform the cleanup actions for the deleted customer here.
    }
}

DeletePublisher.php:

<?php
namespace Amasty\QExample\Model\Customer;

use Magento\Customer\Api\Data\CustomerInterface;
use Magento\Framework\MessageQueue\PublisherInterface;

class DeletePublisher
{
    const TOPIC_NAME = 'customer.clean';

    /**
     * @var PublisherInterface
     */
    private $publisher;

    /**
     * DeletePublisher constructor.
     * @param PublisherInterface $publisher
     */
    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    /**
     * @param CustomerInterface $customer
     */
    public function execute(CustomerInterface $customer)
    {
        $this->publisher->publish(self::TOPIC_NAME, $customer);
    }
}

When you are ready, don’t forget to run:

bin/magento setup:upgrade
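After the upgrade, the queue, exchange, and binding from queue_topology.xml should exist in RabbitMQ. One way to check, sketched here under the assumption that rabbitmqctl is available on the broker host and that the names match the ones used in this tutorial:

```shell
#!/usr/bin/env bash
# Names taken from queue_topology.xml in this tutorial.
QUEUE="customer_clean"
EXCHANGE="am.customer.delete"

if command -v rabbitmqctl >/dev/null 2>&1; then
    # Each command prints a table; grep -F narrows it to this module's entries.
    sudo rabbitmqctl list_queues name messages \
        | grep -F "$QUEUE" || echo "queue $QUEUE not found"
    sudo rabbitmqctl list_exchanges name type \
        | grep -F "$EXCHANGE" || echo "exchange $EXCHANGE not found"
    sudo rabbitmqctl list_bindings source_name routing_key destination_name \
        | grep -F "$EXCHANGE" || echo "no bindings for $EXCHANGE"
else
    echo "rabbitmqctl not found on this host"
fi
```

The same information is also visible in the management web interface.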

And start your consumer with the command:

bin/magento queue:consumers:start AmCustomerClean
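Started this way, the consumer keeps running and waits for new messages. As a hedged sketch, the commands below first check that the consumer is registered and then run it with the --max-messages option so that it exits after a fixed number of messages. The limit of 100 is an arbitrary example value, and the script assumes it is run from the Magento root directory:

```shell
#!/usr/bin/env bash
CONSUMER="AmCustomerClean"
MAX_MESSAGES=100   # arbitrary example value

if [ -f bin/magento ]; then
    # Confirm that the consumer from queue_consumer.xml is registered.
    bin/magento queue:consumers:list \
        | grep -F "$CONSUMER" || echo "consumer $CONSUMER is not registered"
    # Process up to MAX_MESSAGES messages, then exit.
    bin/magento queue:consumers:start "$CONSUMER" --max-messages="$MAX_MESSAGES"
else
    echo "bin/magento not found; run this from the Magento root directory"
fi
```

A bounded run like this is convenient for testing, since the command returns once the limit is reached.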

This little module can help you to delete some customers’ data that you don’t need to store. You can use Magento Custom Development Services so that our specialists will develop any custom extension your Magento store may need.


© 2009-2024 Amasty. All Rights Reserved.