From 56c778b4152a1b886353933276ee3626e4e8c004 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Nicolas=20L=C5=93uillet?= Date: Fri, 15 Jan 2016 08:24:32 +0100 Subject: [PATCH] 1st draft for rabbitMQ --- app/AppKernel.php | 1 + app/config/config.yml | 25 ++++++++++ app/config/parameters.yml.dist | 12 +++++ composer.json | 3 +- docs/en/developer/rabbitmq.rst | 49 +++++++++++++++++++ .../Component/AMPQ/EntryConsumer.php | 39 +++++++++++++++ .../ImportBundle/Import/PocketImport.php | 35 ++++++++++--- .../Resources/config/services.yml | 9 ++++ .../ImportBundle/Import/PocketImportTest.php | 11 ++++- 9 files changed, 173 insertions(+), 11 deletions(-) create mode 100644 docs/en/developer/rabbitmq.rst create mode 100644 src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php diff --git a/app/AppKernel.php b/app/AppKernel.php index 96e45da8..52f85558 100644 --- a/app/AppKernel.php +++ b/app/AppKernel.php @@ -38,6 +38,7 @@ class AppKernel extends Kernel new Wallabag\UserBundle\WallabagUserBundle(), new Wallabag\ImportBundle\WallabagImportBundle(), new Wallabag\AnnotationBundle\WallabagAnnotationBundle(), + new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(), ]; if (in_array($this->getEnvironment(), ['dev', 'test'], true)) { diff --git a/app/config/config.yml b/app/config/config.yml index 31bd8a8c..ef5ae0aa 100644 --- a/app/config/config.yml +++ b/app/config/config.yml @@ -215,3 +215,28 @@ lexik_maintenance: response: code: 503 status: "wallabag Service Temporarily Unavailable" + +old_sound_rabbit_mq: + connections: + default: + host: %rabbitmq_host% + port: %rabbitmq_port% + user: %rabbitmq_user% + password: %rabbitmq_password% + vhost: / + lazy: false + producers: + wallabag: + connection: default + exchange_options: + name: 'wallabag_exchange' + type: topic + consumers: + entries: + connection: default + exchange_options: + name: 'wallabag_exchange' + type: topic + queue_options: + name: 'wallabag_queue' + callback: wallabag_import.consumer.entry diff --git a/app/config/parameters.yml.dist b/app/config/parameters.yml.dist index d092e139..e925b412 100644 --- a/app/config/parameters.yml.dist +++ b/app/config/parameters.yml.dist @@ -38,3 +38,15 @@ parameters: fosuser_confirmation: true from_email: no-reply@wallabag.org + + rss_limit: 50 + + # pocket import + pocket_consumer_key: xxxxxxxx + + # RabbitMQ processing + rabbitmq: false + rabbitmq_host: localhost + rabbitmq_port: 5672 + rabbitmq_user: guest + rabbitmq_password: guest diff --git a/composer.json b/composer.json index bdaad601..121a3fd3 100644 --- a/composer.json +++ b/composer.json @@ -81,7 +81,8 @@ "lexik/maintenance-bundle": "~2.1", "ocramius/proxy-manager": "1.*", "white-october/pagerfanta-bundle": "^1.0", - "mouf/nodejs-installer": "~1.0" + "mouf/nodejs-installer": "~1.0", + "php-amqplib/rabbitmq-bundle": "^1.8" }, "require-dev": { "doctrine/doctrine-fixtures-bundle": "~2.2", diff --git a/docs/en/developer/rabbitmq.rst b/docs/en/developer/rabbitmq.rst new file mode 100644 index 00000000..a17e6e4d --- /dev/null +++ b/docs/en/developer/rabbitmq.rst @@ -0,0 +1,49 @@ +Install RabbitMQ for asynchronous tasks +======================================= + +In order to launch asynchronous tasks (useful for huge imports for example), we use RabbitMQ. + +Requirements +------------ + +You need to have RabbitMQ installed on your server. + +Installation +~~~~~~~~~~~~ + +.. code:: bash + + wget https://www.rabbitmq.com/rabbitmq-signing-key-public.asc + apt-key add rabbitmq-signing-key-public.asc + apt-get update + apt-get install rabbitmq-server + +Configuration and launch +~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code:: bash + + rabbitmq-plugins enable rabbitmq_management # (useful to have a web interface, available at http://localhost:15672/ (guest/guest) + rabbitmq-server -detached + +Stop RabbitMQ +~~~~~~~~~~~~~ + +.. code:: bash + + rabbitmqctl stop + + +Configure RabbitMQ in wallabag +------------------------------ + +Edit your ``parameters.yml`` file to edit RabbitMQ configuration. + +Launch RabbitMQ consumer +------------------------ + +Put this command in a cron job: + +.. code:: bash + + bin/console rabbitmq:consumer entries -w \ No newline at end of file diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php new file mode 100644 index 00000000..7775f01c --- /dev/null +++ b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php @@ -0,0 +1,39 @@ +em = $em; + $this->entryRepository = $entryRepository; + $this->contentProxy = $contentProxy; + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = unserialize($msg->body); + $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']); + if ($entry) { + $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl()); + if ($entry) { + $this->em->persist($entry); + $this->em->flush(); + } + } + } +} diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index a6f905b1..b02894f0 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -2,6 +2,8 @@ namespace Wallabag\ImportBundle\Import; +use OldSound\RabbitMqBundle\RabbitMq\Producer; +use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use GuzzleHttp\Client; @@ -20,14 +22,18 @@ class PocketImport extends AbstractImport private $importedEntries = 0; private $markAsRead; protected $accessToken; + private $producer; + private $rabbitMQ; - public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) + public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, $rabbitMQ, Producer $producer) { $this->user = $tokenStorage->getToken()->getUser(); $this->em = $em; $this->contentProxy = $contentProxy; $this->consumerKey = $craueConfig->get('pocket_consumer_key'); $this->logger = new NullLogger(); + $this->rabbitMQ = $rabbitMQ; + $this->producer = $producer; } /** @@ -197,7 +203,7 @@ class PocketImport extends AbstractImport { $i = 1; - foreach ($entries as $pocketEntry) { + foreach ($entries as &$pocketEntry) { $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; $existingEntry = $this->em @@ -210,12 +216,15 @@ class PocketImport extends AbstractImport } $entry = new Entry($this->user); - $entry = $this->fetchContent($entry, $url); - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; - continue; + if (!$this->rabbitMQ) { + $entry = $this->fetchContent($entry, $url); + + // jump to next entry in case of problem while getting content + if (false === $entry) { + ++$this->skippedEntries; + continue; + } } // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted @@ -236,6 +245,7 @@ class PocketImport extends AbstractImport } $entry->setTitle($title); + $entry->setUrl($url); // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { @@ -249,6 +259,9 @@ class PocketImport extends AbstractImport ); } + $pocketEntry['url'] = $url; + $pocketEntry['userId'] = $this->user->getId(); + $this->em->persist($entry); ++$this->importedEntries; @@ -256,10 +269,16 @@ class PocketImport extends AbstractImport if (($i % 20) === 0) { $this->em->flush(); } + ++$i; } $this->em->flush(); - $this->em->clear(); + + if ($this->rabbitMQ) { + foreach ($entries as $entry) { + $this->producer->publish(serialize($entry)); + } + } } } diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index 520d43af..7ea54162 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -1,4 +1,11 @@ services: + wallabag_import.consumer.entry: + class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_core.entry_repository" + - "@wallabag_core.content_proxy" + wallabag_import.chain: class: Wallabag\ImportBundle\Import\ImportChain @@ -18,6 +25,8 @@ services: - "@doctrine.orm.entity_manager" - "@wallabag_core.content_proxy" - "@craue_config" + - %rabbitmq% + - "@old_sound_rabbit_mq.wallabag_producer" calls: - [ setClient, [ "@wallabag_import.pocket.client" ] ] - [ setLogger, [ "@logger" ]] diff --git a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php index 8534e1c8..a0f943ee 100644 --- a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php +++ b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php @@ -27,8 +27,9 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase protected $em; protected $contentProxy; protected $logHandler; + protected $producer; - private function getPocketImport($consumerKey = 'ConsumerKey') + private function getPocketImport($consumerKey = 'ConsumerKey', $rabbitMQ = false) { $this->user = new User(); @@ -65,11 +66,17 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase ->with('pocket_consumer_key') ->willReturn($consumerKey); + $this->producer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\Producer') + ->disableOriginalConstructor() + ->getMock(); + $pocket = new PocketImportMock( $this->tokenStorage, $this->em, $this->contentProxy, - $config + $config, + $rabbitMQ, + $this->producer ); $this->logHandler = new TestHandler(); -- 2.41.0