oro-integration

OroCommerce v6.1 Integration Development

Message Queue: Async Processing

OroCommerce uses message queues to move heavy work out of the HTTP request cycle. Producers send messages to topics; consumers process them asynchronously.

Processor Pattern

A processor implements MessageProcessorInterface and TopicSubscriberInterface. It must return self::ACK (success), self::REJECT (discard), or self::REQUEUE (retry). Never return void or null.

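namespace Acme\Bundle\DemoBundle\Async\Processor;

use Oro\Component\MessageQueue\Client\TopicSubscriberInterface;
use Oro\Component\MessageQueue\Consumption\MessageProcessorInterface;
use Oro\Component\MessageQueue\Transport\MessageInterface;
use Oro\Component\MessageQueue\Transport\SessionInterface;
use Psr\Log\LoggerInterface;

class ProcessDocumentProcessor implements MessageProcessorInterface, TopicSubscriberInterface
{
    // Receives the '@logger' argument from the service definition.
    public function __construct(private LoggerInterface $logger)
    {
    }

    #[\Override]
    public function process(MessageInterface $message, SessionInterface $session): string
    {
        $data = json_decode($message->getBody(), true);
        // Process data...
        return self::ACK;
    }

    #[\Override]
    public static function getSubscribedTopics(): array
    {
        // Topic names this processor consumes.
        return ['acme_demo.document.process'];
    }
}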

Register with the oro_message_queue.client.message_processor tag:

services:
    acme_demo.async.process_document:
        class: Acme\Bundle\DemoBundle\Async\Processor\ProcessDocumentProcessor
        arguments: ['@logger']
        tags:
            - { name: 'oro_message_queue.client.message_processor' }

For the full processor example, producer pattern, and message sending, see integration-patterns.md.
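
A minimal sketch of how a processor might choose between the three statuses. It is written to run without Oro installed, so the STATUS_* constants, the decideStatus helper, and the documentId field are hypothetical stand-ins; a real processor would return self::ACK, self::REJECT, or self::REQUEUE from process(). Treating \RuntimeException as a transient failure is also an assumption made for illustration.

```php
<?php

// Hypothetical stand-ins for the MessageProcessorInterface constants,
// used only so this sketch runs outside an Oro application.
const STATUS_ACK = 'ack';
const STATUS_REJECT = 'reject';
const STATUS_REQUEUE = 'requeue';

/**
 * Maps the outcome of handling one message body to a status.
 * $handler is hypothetical business logic that receives the decoded data.
 */
function decideStatus(string $body, callable $handler): string
{
    $data = json_decode($body, true);
    if (!is_array($data) || !isset($data['documentId'])) {
        // Malformed payload: retrying cannot help, so discard it.
        return STATUS_REJECT;
    }

    try {
        $handler($data);
    } catch (\RuntimeException $e) {
        // Assumed to signal a transient failure (for example a timeout): retry later.
        return STATUS_REQUEUE;
    }

    return STATUS_ACK;
}
```

The point of the split is that a payload that can never succeed is rejected, while a failure that may clear up on its own is requeued.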

Import/Export

Oro's import/export uses a pipeline: Reader -> Processor -> Writer. Each processor handles one item at a time.

  • Export: Entity -> Serializer normalizes -> DataConverter -> flat array (CSV row)
  • Import: Flat array (CSV row) -> DataConverter -> Serializer denormalizes -> Strategy -> Entity
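
To make the two directions concrete, here is a self-contained sketch of the flat-array step only. It does not use Oro's DataConverter classes: the toExportRow and toImportData helpers, the header rules, the ':' nesting marker, and the field names are all hypothetical.

```php
<?php

// Hypothetical header rules: CSV column title => key in the normalized entity data.
const HEADER_RULES = [
    'Document ID' => 'id',
    'Title'       => 'title',
    'Owner Email' => 'owner:email', // ':' marks a nested field in this sketch
];

/** Export direction: nested normalized data -> flat CSV row. */
function toExportRow(array $normalized): array
{
    $row = [];
    foreach (HEADER_RULES as $column => $path) {
        $value = $normalized;
        foreach (explode(':', $path) as $key) {
            $value = is_array($value) ? ($value[$key] ?? null) : null;
        }
        // Missing values become empty cells.
        $row[$column] = $value === null ? '' : (string) $value;
    }

    return $row;
}

/** Import direction: flat CSV row -> nested data ready for denormalization. */
function toImportData(array $row): array
{
    $data = [];
    foreach (HEADER_RULES as $column => $path) {
        if (!array_key_exists($column, $row)) {
            continue; // column absent from the file
        }
        $target = &$data;
        foreach (explode(':', $path) as $key) {
            $target[$key] ??= [];
            $target = &$target[$key];
        }
        $target = $row[$column];
        unset($target); // break the reference before the next column
    }

    return $data;
}
```

Keeping both directions driven by one rule table means an exported file can be re-imported without a second mapping to maintain.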

Most custom import/export work needs only a DataConverter and service config, with no custom processor class. Use oro_importexport.processor.export_abstract / oro_importexport.processor.import_abstract as parent services.

Import processors need two oro_importexport.processor tags: one with type import and one with type import_validation. Each tag takes the attributes type, entity (FQCN), and alias (unique identifier).

For a complete DataConverter example, service registration YAML, a custom processor class, and import strategy setup, see integration-patterns.md.

Cron Jobs

Create a command implementing CronCommandInterface:

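use Oro\Bundle\CronBundle\Command\CronCommandInterface;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[AsCommand(name: 'acme:cron:sync-documents')]
class SyncDocumentsCronCommand extends Command implements CronCommandInterface
{
    // Cron schedule: minute 0 of every fourth hour.
    #[\Override]
    public function getDefaultDefinition(): string { return '0 */4 * * *'; }

    // Return false to skip scheduling, e.g. when the feature is disabled.
    #[\Override]
    public function isActive(): bool { return true; }

    #[\Override]
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        // Perform sync work
        return Command::SUCCESS;
    }
}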

Schedule format is standard Linux cron: minute hour dayOfMonth month dayOfWeek.
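
As a quick illustration of the five fields, the sketch below splits a definition into named parts. The describeCronDefinition helper is hypothetical and purely illustrative; it does not validate ranges or evaluate the schedule.

```php
<?php

/**
 * Splits a five-field cron expression into named parts.
 * Illustrative only: no range validation, no schedule evaluation.
 */
function describeCronDefinition(string $definition): array
{
    $fields = preg_split('/\s+/', trim($definition));
    if ($fields === false || count($fields) !== 5) {
        throw new \InvalidArgumentException('Expected 5 space-separated fields.');
    }

    return array_combine(
        ['minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'],
        $fields
    );
}

$parts = describeCronDefinition('0 */4 * * *');
// minute "0" and hour "*/4": runs at minute 0 of every fourth hour (00:00, 04:00, 08:00, ...).
```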

Key Pitfalls

  1. Processor return values: Processors MUST return self::ACK, self::REJECT, or self::REQUEUE. Returning void or null causes messages to loop indefinitely.

  2. DBAL polling: DBAL transport polls every second. Use AMQP (RabbitMQ) for production workloads.

  3. Message serialization: Message bodies are JSON strings. Always json_decode() on receipt and json_encode() before sending.
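
For the serialization pitfall, a minimal sketch of an encode/decode pair, assuming bodies travel as JSON strings as described above. The encodeBody and decodeBody helpers and the documentId field are hypothetical; the JSON_THROW_ON_ERROR flag is standard PHP and makes malformed bodies fail loudly instead of silently yielding null or false.

```php
<?php

/** Encodes a message body; throws \JsonException instead of returning false. */
function encodeBody(array $payload): string
{
    return json_encode($payload, JSON_THROW_ON_ERROR);
}

/** Decodes a message body into an associative array. */
function decodeBody(string $body): array
{
    $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($data)) {
        // Valid JSON, but a scalar such as "42" is not a usable message body here.
        throw new \UnexpectedValueException('Message body must decode to an array.');
    }

    return $data;
}

$body = encodeBody(['documentId' => 42]);
$data = decodeBody($body);
```

A processor could catch the exceptions from decodeBody and reject the message, since a body that cannot be parsed will not parse on a retry either.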

For integration channels, transports, webhooks, common commands, and additional pitfalls, see integration-patterns.md.

Version Notes

For version-specific details, see v6.1 notes | v7.0 notes.

See also message-queue-config.md for MQ configuration options.
