Example for Drupal 8 Queue API and the Batch API
An example module to help understanding the Queue API and the Batch API in Drupal 8
xml_import_example.info.yml
type: module
name: XML import example
package: Examples
description: "This module helps understanding the Batch API and Queue API with an XML import example"
core: 8.x
xml_import_example.permissions.yml
import content from xml:
title: 'Import content from xml'
description: 'With this permission user can import contents from a XML source'
restrict access: TRUE
xml_import_example.routing.yml
# Get contents from the xml source
xml_import_example.get_contents_from_xml:
path: '/get-contents-from-xml'
defaults: { _controller: '\Drupal\xml_import_example\Controller\ImportContentFromXML::getContentsFromXMLPage' }
requirements:
_permission: 'import content from xml'
# Process all queue items with batch
xml_import_example.process_all_queue_items_with_batch:
path: '/process-all-queue-items'
defaults: { _controller: '\Drupal\xml_import_example\Controller\ImportContentFromXML::processAllQueueItemsWithBatch' }
requirements:
_permission: 'import content from xml'
src/Controller/ImportContentFromXML.php
<?php
/**
* @file
* Contains \Drupal\xml_import_example\Controller\ImportContentFromXML.
*/
namespace Drupal\xml_import_example\Controller;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Queue\QueueWorkerManager;
use Drupal\Core\Queue\QueueFactory;
/**
* You can use this constant to set how many queued items
* you want to be processed in one batch operation
*/
define("IMPORT_XML_BATCH_SIZE", 1);
class ImportContentFromXML extends ControllerBase {
/**
* We add QueueFactory and QueueWorkerManager services with the Dependency Injection solution
*/
/**
* @var QueueFactory
*/
protected $queueFactory;
/**
* @var QueueWorkerManager
*/
protected $queueManager;
/**
* {@inheritdoc}
*/
public function __construct(QueueFactory $queue_factory, QueueWorkerManager $queue_manager) {
$this->queue_factory = $queue_factory;
$this->queue_manager = $queue_manager;
}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container) {
$queue_factory = $container->get('queue');
$queue_manager = $container->get('plugin.manager.queue_worker');
return new static($queue_factory, $queue_manager);
}
/**
* Get XML from the API and convert it to
*/
protected function getContentsFromXML() {
// Here you should get the XML content and convert it to an array of content arrays for example
// I use now an example array of contents:
$contents = array();
for ($i = 1; $i <= 20; $i++) {
$contents[] = array(
'title' => 'Test title ' . $i,
'body' => 'Test body ' . $i,
);
}
// Return with the contents
return $contents;
}
/**
* Page where the xml source is preprocessed
*/
public function getContentsFromXMLPage() {
// Get contents array
$contents = $this->getContentsFromXML();
foreach ($contents as $content) {
// Get the queue implementation for import_content_from_xml queue
$queue = $this->queue_factory->get('import_content_from_xml');
// Create new queue item
$item = new \stdClass();
$item->data = $content;
$queue->createItem($item);
}
return array(
'#type' => 'markup',
'#markup' => $this->t('@count queue items are created.', array('@count' => count($contents))),
);
}
/**
* Process all queue items with batch
*/
public function processAllQueueItemsWithBatch() {
// Create batch which collects all the specified queue items and process them one after another
$batch = array(
'title' => $this->t("Process all XML Import queues with batch"),
'operations' => array(),
'finished' => 'Drupal\xml_import_example\Controller\ImportContentFromXML::batchFinished',
);
// Get the queue implementation for import_content_from_xml queue
$queue_factory = \Drupal::service('queue');
$queue = $queue_factory->get('import_content_from_xml');
// Count number of the items in this queue, and create enough batch operations
for($i = 0; $i < ceil($queue->numberOfItems() / IMPORT_XML_BATCH_SIZE); $i++) {
// Create batch operations
$batch['operations'][] = array('Drupal\xml_import_example\Controller\ImportContentFromXML::batchProcess', array());
}
// Adds the batch sets
batch_set($batch);
// Process the batch and after redirect to the frontpage
return batch_process('<front>');
}
/**
* Common batch processing callback for all operations.
*/
public static function batchProcess(&$context) {
// We can't use here the Dependency Injection solution
// so we load the necessary services in the other way
$queue_factory = \Drupal::service('queue');
$queue_manager = \Drupal::service('plugin.manager.queue_worker');
// Get the queue implementation for import_content_from_xml queue
$queue = $queue_factory->get('import_content_from_xml');
// Get the queue worker
$queue_worker = $queue_manager->createInstance('import_content_from_xml');
// Get the number of items
$number_of_queue = ($queue->numberOfItems() < IMPORT_XML_BATCH_SIZE) ? $queue->numberOfItems() : IMPORT_XML_BATCH_SIZE;
// Repeat $number_of_queue times
for ($i = 0; $i < $number_of_queue; $i++) {
// Get a queued item
if ($item = $queue->claimItem()) {
try {
// Process it
$queue_worker->processItem($item->data);
// If everything was correct, delete the processed item from the queue
$queue->deleteItem($item);
}
catch (SuspendQueueException $e) {
// If there was an Exception trown because of an error
// Releases the item that the worker could not process.
// Another worker can come and process it
$queue->releaseItem($item);
break;
}
}
}
}
/**
* Batch finished callback.
*/
public static function batchFinished($success, $results, $operations) {
if ($success) {
drupal_set_message(t("The contents are successfully imported from the XML source."));
}
else {
$error_operation = reset($operations);
drupal_set_message(t('An error occurred while processing @operation with arguments : @args', array('@operation' => $error_operation[0], '@args' => print_r($error_operation[0], TRUE))));
}
}
}
src/Plugin/QueueWorker/ImportContentFromXMLQueueBase.php
<?php
/**
* @file
* Contains Drupal\xml_import_example\Plugin\QueueWorker\ImportContentFromXMLQueueBase
*/
namespace Drupal\xml_import_example\Plugin\QueueWorker;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Queue\SuspendQueueException;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\node\Entity\Node;
/**
* Provides base functionality for the Import Content From XML Queue Workers.
*/
abstract class ImportContentFromXMLQueueBase extends QueueWorkerBase implements ContainerFactoryPluginInterface {
// Here we don't use the Dependency Injection,
// but the create method and __construct method are necessary to implement
/**
* {@inheritdoc}
*/
public function __construct() {}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static();
}
/**
* {@inheritdoc}
*/
public function processItem($item) {
// Get the content array
$content = $item->data;
// Create node from the array
$this->createContent($content);
}
/**
* Create content
*
* @return int
*/
protected function createContent($content) {
// Create node object from the $content array
$node = Node::create(array(
'type' => 'page',
'title' => $content['title'],
'body' => array(
'value' => $content['body'],
'format' => 'basic_html',
),
));
$node->save();
}
}
src/Plugin/QueueWorker/ImportContentFromXMLQueue.php
<?php
namespace Drupal\xml_import_example\Plugin\QueueWorker;
/**
* Create node object from the imported XML content
*
* @QueueWorker(
* id = "import_content_from_xml",
* title = @Translation("Import Content From XML"),
* cron = {"time" = 60}
* )
*/
class ImportContentFromXMLQueue extends ImportContentFromXMLQueueBase {}
So this is the working module, you can test it in you site.
If you visit the /get-contents-from-xml URL 20 queue items are made from a contents array.
The src/Plugin/QueueWorker/ImportContentFromXMLQueue.php contains this annotation: cron = {“time” = 60}
So if you run cron, the queue items are processed for maximum 60 seconds. You can increase or decrease this time, with that annotation.
If you remove the cron = {“time” = 60} line, cron do nothing with your queue items.
If you would like to process all the queue items in your browser, you have to visit the following url: /process-all-queue-items
It will collect all of your queue items, creates batch operations from them, and after that it process one after another.