aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorNicolas LÅ“uillet <nicolas.loeuillet@smile.fr>2016-01-15 08:24:32 +0100
committerJeremy Benoist <jeremy.benoist@gmail.com>2016-09-11 21:57:09 +0200
commit56c778b4152a1b886353933276ee3626e4e8c004 (patch)
tree203a2bc2fc36016b5a51703b04d86b034d9393c3
parent59758d8fe5ad5ff554391515a78d84b0d47bdb76 (diff)
downloadwallabag-56c778b4152a1b886353933276ee3626e4e8c004.tar.gz
wallabag-56c778b4152a1b886353933276ee3626e4e8c004.tar.zst
wallabag-56c778b4152a1b886353933276ee3626e4e8c004.zip
1st draft for rabbitMQ
-rw-r--r--app/AppKernel.php1
-rw-r--r--app/config/config.yml25
-rw-r--r--app/config/parameters.yml.dist12
-rw-r--r--composer.json3
-rw-r--r--docs/en/developer/rabbitmq.rst49
-rw-r--r--src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php39
-rw-r--r--src/Wallabag/ImportBundle/Import/PocketImport.php35
-rw-r--r--src/Wallabag/ImportBundle/Resources/config/services.yml9
-rw-r--r--tests/Wallabag/ImportBundle/Import/PocketImportTest.php11
9 files changed, 173 insertions, 11 deletions
diff --git a/app/AppKernel.php b/app/AppKernel.php
index 96e45da8..52f85558 100644
--- a/app/AppKernel.php
+++ b/app/AppKernel.php
@@ -38,6 +38,7 @@ class AppKernel extends Kernel
38 new Wallabag\UserBundle\WallabagUserBundle(), 38 new Wallabag\UserBundle\WallabagUserBundle(),
39 new Wallabag\ImportBundle\WallabagImportBundle(), 39 new Wallabag\ImportBundle\WallabagImportBundle(),
40 new Wallabag\AnnotationBundle\WallabagAnnotationBundle(), 40 new Wallabag\AnnotationBundle\WallabagAnnotationBundle(),
41 new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
41 ]; 42 ];
42 43
43 if (in_array($this->getEnvironment(), ['dev', 'test'], true)) { 44 if (in_array($this->getEnvironment(), ['dev', 'test'], true)) {
diff --git a/app/config/config.yml b/app/config/config.yml
index 31bd8a8c..ef5ae0aa 100644
--- a/app/config/config.yml
+++ b/app/config/config.yml
@@ -215,3 +215,28 @@ lexik_maintenance:
215 response: 215 response:
216 code: 503 216 code: 503
217 status: "wallabag Service Temporarily Unavailable" 217 status: "wallabag Service Temporarily Unavailable"
218
219old_sound_rabbit_mq:
220 connections:
221 default:
222 host: %rabbitmq_host%
223 port: %rabbitmq_port%
224 user: %rabbitmq_user%
225 password: %rabbitmq_password%
226 vhost: /
227 lazy: false
228 producers:
229 wallabag:
230 connection: default
231 exchange_options:
232 name: 'wallabag_exchange'
233 type: topic
234 consumers:
235 entries:
236 connection: default
237 exchange_options:
238 name: 'wallabag_exchange'
239 type: topic
240 queue_options:
241 name: 'wallabag_queue'
242 callback: wallabag_import.consumer.entry
diff --git a/app/config/parameters.yml.dist b/app/config/parameters.yml.dist
index d092e139..e925b412 100644
--- a/app/config/parameters.yml.dist
+++ b/app/config/parameters.yml.dist
@@ -38,3 +38,15 @@ parameters:
38 fosuser_confirmation: true 38 fosuser_confirmation: true
39 39
40 from_email: no-reply@wallabag.org 40 from_email: no-reply@wallabag.org
41
42 rss_limit: 50
43
44 # pocket import
45 pocket_consumer_key: xxxxxxxx
46
47 # RabbitMQ processing
48 rabbitmq: false
49 rabbitmq_host: localhost
50 rabbitmq_port: 5672
51 rabbitmq_user: guest
52 rabbitmq_password: guest
diff --git a/composer.json b/composer.json
index bdaad601..121a3fd3 100644
--- a/composer.json
+++ b/composer.json
@@ -81,7 +81,8 @@
81 "lexik/maintenance-bundle": "~2.1", 81 "lexik/maintenance-bundle": "~2.1",
82 "ocramius/proxy-manager": "1.*", 82 "ocramius/proxy-manager": "1.*",
83 "white-october/pagerfanta-bundle": "^1.0", 83 "white-october/pagerfanta-bundle": "^1.0",
84 "mouf/nodejs-installer": "~1.0" 84 "mouf/nodejs-installer": "~1.0",
85 "php-amqplib/rabbitmq-bundle": "^1.8"
85 }, 86 },
86 "require-dev": { 87 "require-dev": {
87 "doctrine/doctrine-fixtures-bundle": "~2.2", 88 "doctrine/doctrine-fixtures-bundle": "~2.2",
diff --git a/docs/en/developer/rabbitmq.rst b/docs/en/developer/rabbitmq.rst
new file mode 100644
index 00000000..a17e6e4d
--- /dev/null
+++ b/docs/en/developer/rabbitmq.rst
@@ -0,0 +1,49 @@
1Install RabbitMQ for asynchronous tasks
2=======================================
3
4In order to launch asynchronous tasks (useful for huge imports for example), we use RabbitMQ.
5
6Requirements
7------------
8
9You need to have RabbitMQ installed on your server.
10
11Installation
12~~~~~~~~~~~~
13
14.. code:: bash
15
16 wget https://www.rabbitmq.com/rabbitmq-signing-key-public.asc
17 apt-key add rabbitmq-signing-key-public.asc
18 apt-get update
19 apt-get install rabbitmq-server
20
21Configuration and launch
22~~~~~~~~~~~~~~~~~~~~~~~~
23
24.. code:: bash
25
26 rabbitmq-plugins enable rabbitmq_management # (useful to have a web interface, available at http://localhost:15672/ (guest/guest)
27 rabbitmq-server -detached
28
29Stop RabbitMQ
30~~~~~~~~~~~~~
31
32.. code:: bash
33
34 rabbitmqctl stop
35
36
37Configure RabbitMQ in wallabag
38------------------------------
39
40Edit your ``parameters.yml`` file to edit RabbitMQ configuration.
41
42Launch RabbitMQ consumer
43------------------------
44
45Put this command in a cron job:
46
47.. code:: bash
48
49 bin/console rabbitmq:consumer entries -w \ No newline at end of file
diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php
new file mode 100644
index 00000000..7775f01c
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php
@@ -0,0 +1,39 @@
1<?php
2
3namespace Wallabag\ImportBundle\Component\AMPQ;
4
5use Doctrine\ORM\EntityManager;
6use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
7use PhpAmqpLib\Message\AMQPMessage;
8use Wallabag\CoreBundle\Helper\ContentProxy;
9use Wallabag\CoreBundle\Repository\EntryRepository;
10
11class EntryConsumer implements ConsumerInterface
12{
13 private $em;
14 private $contentProxy;
15 private $entryRepository;
16
17 public function __construct(EntityManager $em, EntryRepository $entryRepository, ContentProxy $contentProxy)
18 {
19 $this->em = $em;
20 $this->entryRepository = $entryRepository;
21 $this->contentProxy = $contentProxy;
22 }
23
24 /**
25 * {@inheritdoc}
26 */
27 public function execute(AMQPMessage $msg)
28 {
29 $storedEntry = unserialize($msg->body);
30 $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']);
31 if ($entry) {
32 $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl());
33 if ($entry) {
34 $this->em->persist($entry);
35 $this->em->flush();
36 }
37 }
38 }
39}
diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php
index a6f905b1..b02894f0 100644
--- a/src/Wallabag/ImportBundle/Import/PocketImport.php
+++ b/src/Wallabag/ImportBundle/Import/PocketImport.php
@@ -2,6 +2,8 @@
2 2
3namespace Wallabag\ImportBundle\Import; 3namespace Wallabag\ImportBundle\Import;
4 4
5use OldSound\RabbitMqBundle\RabbitMq\Producer;
6use Psr\Log\LoggerInterface;
5use Psr\Log\NullLogger; 7use Psr\Log\NullLogger;
6use Doctrine\ORM\EntityManager; 8use Doctrine\ORM\EntityManager;
7use GuzzleHttp\Client; 9use GuzzleHttp\Client;
@@ -20,14 +22,18 @@ class PocketImport extends AbstractImport
20 private $importedEntries = 0; 22 private $importedEntries = 0;
21 private $markAsRead; 23 private $markAsRead;
22 protected $accessToken; 24 protected $accessToken;
25 private $producer;
26 private $rabbitMQ;
23 27
24 public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) 28 public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, $rabbitMQ, Producer $producer)
25 { 29 {
26 $this->user = $tokenStorage->getToken()->getUser(); 30 $this->user = $tokenStorage->getToken()->getUser();
27 $this->em = $em; 31 $this->em = $em;
28 $this->contentProxy = $contentProxy; 32 $this->contentProxy = $contentProxy;
29 $this->consumerKey = $craueConfig->get('pocket_consumer_key'); 33 $this->consumerKey = $craueConfig->get('pocket_consumer_key');
30 $this->logger = new NullLogger(); 34 $this->logger = new NullLogger();
35 $this->rabbitMQ = $rabbitMQ;
36 $this->producer = $producer;
31 } 37 }
32 38
33 /** 39 /**
@@ -197,7 +203,7 @@ class PocketImport extends AbstractImport
197 { 203 {
198 $i = 1; 204 $i = 1;
199 205
200 foreach ($entries as $pocketEntry) { 206 foreach ($entries as &$pocketEntry) {
201 $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; 207 $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url'];
202 208
203 $existingEntry = $this->em 209 $existingEntry = $this->em
@@ -210,12 +216,15 @@ class PocketImport extends AbstractImport
210 } 216 }
211 217
212 $entry = new Entry($this->user); 218 $entry = new Entry($this->user);
213 $entry = $this->fetchContent($entry, $url);
214 219
215 // jump to next entry in case of problem while getting content 220 if (!$this->rabbitMQ) {
216 if (false === $entry) { 221 $entry = $this->fetchContent($entry, $url);
217 ++$this->skippedEntries; 222
218 continue; 223 // jump to next entry in case of problem while getting content
224 if (false === $entry) {
225 ++$this->skippedEntries;
226 continue;
227 }
219 } 228 }
220 229
221 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted 230 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
@@ -236,6 +245,7 @@ class PocketImport extends AbstractImport
236 } 245 }
237 246
238 $entry->setTitle($title); 247 $entry->setTitle($title);
248 $entry->setUrl($url);
239 249
240 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image 250 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
241 if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { 251 if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
@@ -249,6 +259,9 @@ class PocketImport extends AbstractImport
249 ); 259 );
250 } 260 }
251 261
262 $pocketEntry['url'] = $url;
263 $pocketEntry['userId'] = $this->user->getId();
264
252 $this->em->persist($entry); 265 $this->em->persist($entry);
253 ++$this->importedEntries; 266 ++$this->importedEntries;
254 267
@@ -256,10 +269,16 @@ class PocketImport extends AbstractImport
256 if (($i % 20) === 0) { 269 if (($i % 20) === 0) {
257 $this->em->flush(); 270 $this->em->flush();
258 } 271 }
272
259 ++$i; 273 ++$i;
260 } 274 }
261 275
262 $this->em->flush(); 276 $this->em->flush();
263 $this->em->clear(); 277
278 if ($this->rabbitMQ) {
279 foreach ($entries as $entry) {
280 $this->producer->publish(serialize($entry));
281 }
282 }
264 } 283 }
265} 284}
diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml
index 520d43af..7ea54162 100644
--- a/src/Wallabag/ImportBundle/Resources/config/services.yml
+++ b/src/Wallabag/ImportBundle/Resources/config/services.yml
@@ -1,4 +1,11 @@
1services: 1services:
2 wallabag_import.consumer.entry:
3 class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer
4 arguments:
5 - "@doctrine.orm.entity_manager"
6 - "@wallabag_core.entry_repository"
7 - "@wallabag_core.content_proxy"
8
2 wallabag_import.chain: 9 wallabag_import.chain:
3 class: Wallabag\ImportBundle\Import\ImportChain 10 class: Wallabag\ImportBundle\Import\ImportChain
4 11
@@ -18,6 +25,8 @@ services:
18 - "@doctrine.orm.entity_manager" 25 - "@doctrine.orm.entity_manager"
19 - "@wallabag_core.content_proxy" 26 - "@wallabag_core.content_proxy"
20 - "@craue_config" 27 - "@craue_config"
28 - %rabbitmq%
29 - "@old_sound_rabbit_mq.wallabag_producer"
21 calls: 30 calls:
22 - [ setClient, [ "@wallabag_import.pocket.client" ] ] 31 - [ setClient, [ "@wallabag_import.pocket.client" ] ]
23 - [ setLogger, [ "@logger" ]] 32 - [ setLogger, [ "@logger" ]]
diff --git a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php
index 8534e1c8..a0f943ee 100644
--- a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php
+++ b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php
@@ -27,8 +27,9 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase
27 protected $em; 27 protected $em;
28 protected $contentProxy; 28 protected $contentProxy;
29 protected $logHandler; 29 protected $logHandler;
30 protected $producer;
30 31
31 private function getPocketImport($consumerKey = 'ConsumerKey') 32 private function getPocketImport($consumerKey = 'ConsumerKey', $rabbitMQ = false)
32 { 33 {
33 $this->user = new User(); 34 $this->user = new User();
34 35
@@ -65,11 +66,17 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase
65 ->with('pocket_consumer_key') 66 ->with('pocket_consumer_key')
66 ->willReturn($consumerKey); 67 ->willReturn($consumerKey);
67 68
69 $this->producer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\Producer')
70 ->disableOriginalConstructor()
71 ->getMock();
72
68 $pocket = new PocketImportMock( 73 $pocket = new PocketImportMock(
69 $this->tokenStorage, 74 $this->tokenStorage,
70 $this->em, 75 $this->em,
71 $this->contentProxy, 76 $this->contentProxy,
72 $config 77 $config,
78 $rabbitMQ,
79 $this->producer
73 ); 80 );
74 81
75 $this->logHandler = new TestHandler(); 82 $this->logHandler = new TestHandler();