diff options
author | Nicolas LÅ“uillet <nicolas.loeuillet@smile.fr> | 2016-01-15 08:24:32 +0100 |
---|---|---|
committer | Jeremy Benoist <jeremy.benoist@gmail.com> | 2016-09-11 21:57:09 +0200 |
commit | 56c778b4152a1b886353933276ee3626e4e8c004 (patch) | |
tree | 203a2bc2fc36016b5a51703b04d86b034d9393c3 | |
parent | 59758d8fe5ad5ff554391515a78d84b0d47bdb76 (diff) | |
download | wallabag-56c778b4152a1b886353933276ee3626e4e8c004.tar.gz wallabag-56c778b4152a1b886353933276ee3626e4e8c004.tar.zst wallabag-56c778b4152a1b886353933276ee3626e4e8c004.zip |
1st draft for rabbitMQ
-rw-r--r-- | app/AppKernel.php | 1 | ||||
-rw-r--r-- | app/config/config.yml | 25 | ||||
-rw-r--r-- | app/config/parameters.yml.dist | 12 | ||||
-rw-r--r-- | composer.json | 3 | ||||
-rw-r--r-- | docs/en/developer/rabbitmq.rst | 49 | ||||
-rw-r--r-- | src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php | 39 | ||||
-rw-r--r-- | src/Wallabag/ImportBundle/Import/PocketImport.php | 35 | ||||
-rw-r--r-- | src/Wallabag/ImportBundle/Resources/config/services.yml | 9 | ||||
-rw-r--r-- | tests/Wallabag/ImportBundle/Import/PocketImportTest.php | 11 |
9 files changed, 173 insertions, 11 deletions
diff --git a/app/AppKernel.php b/app/AppKernel.php index 96e45da8..52f85558 100644 --- a/app/AppKernel.php +++ b/app/AppKernel.php | |||
@@ -38,6 +38,7 @@ class AppKernel extends Kernel | |||
38 | new Wallabag\UserBundle\WallabagUserBundle(), | 38 | new Wallabag\UserBundle\WallabagUserBundle(), |
39 | new Wallabag\ImportBundle\WallabagImportBundle(), | 39 | new Wallabag\ImportBundle\WallabagImportBundle(), |
40 | new Wallabag\AnnotationBundle\WallabagAnnotationBundle(), | 40 | new Wallabag\AnnotationBundle\WallabagAnnotationBundle(), |
41 | new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(), | ||
41 | ]; | 42 | ]; |
42 | 43 | ||
43 | if (in_array($this->getEnvironment(), ['dev', 'test'], true)) { | 44 | if (in_array($this->getEnvironment(), ['dev', 'test'], true)) { |
diff --git a/app/config/config.yml b/app/config/config.yml index 31bd8a8c..ef5ae0aa 100644 --- a/app/config/config.yml +++ b/app/config/config.yml | |||
@@ -215,3 +215,28 @@ lexik_maintenance: | |||
215 | response: | 215 | response: |
216 | code: 503 | 216 | code: 503 |
217 | status: "wallabag Service Temporarily Unavailable" | 217 | status: "wallabag Service Temporarily Unavailable" |
218 | |||
219 | old_sound_rabbit_mq: | ||
220 | connections: | ||
221 | default: | ||
222 | host: %rabbitmq_host% | ||
223 | port: %rabbitmq_port% | ||
224 | user: %rabbitmq_user% | ||
225 | password: %rabbitmq_password% | ||
226 | vhost: / | ||
227 | lazy: false | ||
228 | producers: | ||
229 | wallabag: | ||
230 | connection: default | ||
231 | exchange_options: | ||
232 | name: 'wallabag_exchange' | ||
233 | type: topic | ||
234 | consumers: | ||
235 | entries: | ||
236 | connection: default | ||
237 | exchange_options: | ||
238 | name: 'wallabag_exchange' | ||
239 | type: topic | ||
240 | queue_options: | ||
241 | name: 'wallabag_queue' | ||
242 | callback: wallabag_import.consumer.entry | ||
diff --git a/app/config/parameters.yml.dist b/app/config/parameters.yml.dist index d092e139..e925b412 100644 --- a/app/config/parameters.yml.dist +++ b/app/config/parameters.yml.dist | |||
@@ -38,3 +38,15 @@ parameters: | |||
38 | fosuser_confirmation: true | 38 | fosuser_confirmation: true |
39 | 39 | ||
40 | from_email: no-reply@wallabag.org | 40 | from_email: no-reply@wallabag.org |
41 | |||
42 | rss_limit: 50 | ||
43 | |||
44 | # pocket import | ||
45 | pocket_consumer_key: xxxxxxxx | ||
46 | |||
47 | # RabbitMQ processing | ||
48 | rabbitmq: false | ||
49 | rabbitmq_host: localhost | ||
50 | rabbitmq_port: 5672 | ||
51 | rabbitmq_user: guest | ||
52 | rabbitmq_password: guest | ||
diff --git a/composer.json b/composer.json index bdaad601..121a3fd3 100644 --- a/composer.json +++ b/composer.json | |||
@@ -81,7 +81,8 @@ | |||
81 | "lexik/maintenance-bundle": "~2.1", | 81 | "lexik/maintenance-bundle": "~2.1", |
82 | "ocramius/proxy-manager": "1.*", | 82 | "ocramius/proxy-manager": "1.*", |
83 | "white-october/pagerfanta-bundle": "^1.0", | 83 | "white-october/pagerfanta-bundle": "^1.0", |
84 | "mouf/nodejs-installer": "~1.0" | 84 | "mouf/nodejs-installer": "~1.0", |
85 | "php-amqplib/rabbitmq-bundle": "^1.8" | ||
85 | }, | 86 | }, |
86 | "require-dev": { | 87 | "require-dev": { |
87 | "doctrine/doctrine-fixtures-bundle": "~2.2", | 88 | "doctrine/doctrine-fixtures-bundle": "~2.2", |
diff --git a/docs/en/developer/rabbitmq.rst b/docs/en/developer/rabbitmq.rst new file mode 100644 index 00000000..a17e6e4d --- /dev/null +++ b/docs/en/developer/rabbitmq.rst | |||
@@ -0,0 +1,49 @@ | |||
1 | Install RabbitMQ for asynchronous tasks | ||
2 | ======================================= | ||
3 | |||
4 | In order to launch asynchronous tasks (useful for huge imports for example), we use RabbitMQ. | ||
5 | |||
6 | Requirements | ||
7 | ------------ | ||
8 | |||
9 | You need to have RabbitMQ installed on your server. | ||
10 | |||
11 | Installation | ||
12 | ~~~~~~~~~~~~ | ||
13 | |||
14 | .. code:: bash | ||
15 | |||
16 | wget https://www.rabbitmq.com/rabbitmq-signing-key-public.asc | ||
17 | apt-key add rabbitmq-signing-key-public.asc | ||
18 | apt-get update | ||
19 | apt-get install rabbitmq-server | ||
20 | |||
21 | Configuration and launch | ||
22 | ~~~~~~~~~~~~~~~~~~~~~~~~ | ||
23 | |||
24 | .. code:: bash | ||
25 | |||
26 | rabbitmq-plugins enable rabbitmq_management # (useful to have a web interface, available at http://localhost:15672/ (guest/guest) | ||
27 | rabbitmq-server -detached | ||
28 | |||
29 | Stop RabbitMQ | ||
30 | ~~~~~~~~~~~~~ | ||
31 | |||
32 | .. code:: bash | ||
33 | |||
34 | rabbitmqctl stop | ||
35 | |||
36 | |||
37 | Configure RabbitMQ in wallabag | ||
38 | ------------------------------ | ||
39 | |||
40 | Edit your ``parameters.yml`` file to edit RabbitMQ configuration. | ||
41 | |||
42 | Launch RabbitMQ consumer | ||
43 | ------------------------ | ||
44 | |||
45 | Put this command in a cron job: | ||
46 | |||
47 | .. code:: bash | ||
48 | |||
49 | bin/console rabbitmq:consumer entries -w \ No newline at end of file | ||
diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php new file mode 100644 index 00000000..7775f01c --- /dev/null +++ b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php | |||
@@ -0,0 +1,39 @@ | |||
1 | <?php | ||
2 | |||
3 | namespace Wallabag\ImportBundle\Component\AMPQ; | ||
4 | |||
5 | use Doctrine\ORM\EntityManager; | ||
6 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; | ||
7 | use PhpAmqpLib\Message\AMQPMessage; | ||
8 | use Wallabag\CoreBundle\Helper\ContentProxy; | ||
9 | use Wallabag\CoreBundle\Repository\EntryRepository; | ||
10 | |||
11 | class EntryConsumer implements ConsumerInterface | ||
12 | { | ||
13 | private $em; | ||
14 | private $contentProxy; | ||
15 | private $entryRepository; | ||
16 | |||
17 | public function __construct(EntityManager $em, EntryRepository $entryRepository, ContentProxy $contentProxy) | ||
18 | { | ||
19 | $this->em = $em; | ||
20 | $this->entryRepository = $entryRepository; | ||
21 | $this->contentProxy = $contentProxy; | ||
22 | } | ||
23 | |||
24 | /** | ||
25 | * {@inheritdoc} | ||
26 | */ | ||
27 | public function execute(AMQPMessage $msg) | ||
28 | { | ||
29 | $storedEntry = unserialize($msg->body); | ||
30 | $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']); | ||
31 | if ($entry) { | ||
32 | $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl()); | ||
33 | if ($entry) { | ||
34 | $this->em->persist($entry); | ||
35 | $this->em->flush(); | ||
36 | } | ||
37 | } | ||
38 | } | ||
39 | } | ||
diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index a6f905b1..b02894f0 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php | |||
@@ -2,6 +2,8 @@ | |||
2 | 2 | ||
3 | namespace Wallabag\ImportBundle\Import; | 3 | namespace Wallabag\ImportBundle\Import; |
4 | 4 | ||
5 | use OldSound\RabbitMqBundle\RabbitMq\Producer; | ||
6 | use Psr\Log\LoggerInterface; | ||
5 | use Psr\Log\NullLogger; | 7 | use Psr\Log\NullLogger; |
6 | use Doctrine\ORM\EntityManager; | 8 | use Doctrine\ORM\EntityManager; |
7 | use GuzzleHttp\Client; | 9 | use GuzzleHttp\Client; |
@@ -20,14 +22,18 @@ class PocketImport extends AbstractImport | |||
20 | private $importedEntries = 0; | 22 | private $importedEntries = 0; |
21 | private $markAsRead; | 23 | private $markAsRead; |
22 | protected $accessToken; | 24 | protected $accessToken; |
25 | private $producer; | ||
26 | private $rabbitMQ; | ||
23 | 27 | ||
24 | public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) | 28 | public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, $rabbitMQ, Producer $producer) |
25 | { | 29 | { |
26 | $this->user = $tokenStorage->getToken()->getUser(); | 30 | $this->user = $tokenStorage->getToken()->getUser(); |
27 | $this->em = $em; | 31 | $this->em = $em; |
28 | $this->contentProxy = $contentProxy; | 32 | $this->contentProxy = $contentProxy; |
29 | $this->consumerKey = $craueConfig->get('pocket_consumer_key'); | 33 | $this->consumerKey = $craueConfig->get('pocket_consumer_key'); |
30 | $this->logger = new NullLogger(); | 34 | $this->logger = new NullLogger(); |
35 | $this->rabbitMQ = $rabbitMQ; | ||
36 | $this->producer = $producer; | ||
31 | } | 37 | } |
32 | 38 | ||
33 | /** | 39 | /** |
@@ -197,7 +203,7 @@ class PocketImport extends AbstractImport | |||
197 | { | 203 | { |
198 | $i = 1; | 204 | $i = 1; |
199 | 205 | ||
200 | foreach ($entries as $pocketEntry) { | 206 | foreach ($entries as &$pocketEntry) { |
201 | $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; | 207 | $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; |
202 | 208 | ||
203 | $existingEntry = $this->em | 209 | $existingEntry = $this->em |
@@ -210,12 +216,15 @@ class PocketImport extends AbstractImport | |||
210 | } | 216 | } |
211 | 217 | ||
212 | $entry = new Entry($this->user); | 218 | $entry = new Entry($this->user); |
213 | $entry = $this->fetchContent($entry, $url); | ||
214 | 219 | ||
215 | // jump to next entry in case of problem while getting content | 220 | if (!$this->rabbitMQ) { |
216 | if (false === $entry) { | 221 | $entry = $this->fetchContent($entry, $url); |
217 | ++$this->skippedEntries; | 222 | |
218 | continue; | 223 | // jump to next entry in case of problem while getting content |
224 | if (false === $entry) { | ||
225 | ++$this->skippedEntries; | ||
226 | continue; | ||
227 | } | ||
219 | } | 228 | } |
220 | 229 | ||
221 | // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted | 230 | // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted |
@@ -236,6 +245,7 @@ class PocketImport extends AbstractImport | |||
236 | } | 245 | } |
237 | 246 | ||
238 | $entry->setTitle($title); | 247 | $entry->setTitle($title); |
248 | $entry->setUrl($url); | ||
239 | 249 | ||
240 | // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image | 250 | // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image |
241 | if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { | 251 | if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { |
@@ -249,6 +259,9 @@ class PocketImport extends AbstractImport | |||
249 | ); | 259 | ); |
250 | } | 260 | } |
251 | 261 | ||
262 | $pocketEntry['url'] = $url; | ||
263 | $pocketEntry['userId'] = $this->user->getId(); | ||
264 | |||
252 | $this->em->persist($entry); | 265 | $this->em->persist($entry); |
253 | ++$this->importedEntries; | 266 | ++$this->importedEntries; |
254 | 267 | ||
@@ -256,10 +269,16 @@ class PocketImport extends AbstractImport | |||
256 | if (($i % 20) === 0) { | 269 | if (($i % 20) === 0) { |
257 | $this->em->flush(); | 270 | $this->em->flush(); |
258 | } | 271 | } |
272 | |||
259 | ++$i; | 273 | ++$i; |
260 | } | 274 | } |
261 | 275 | ||
262 | $this->em->flush(); | 276 | $this->em->flush(); |
263 | $this->em->clear(); | 277 | |
278 | if ($this->rabbitMQ) { | ||
279 | foreach ($entries as $entry) { | ||
280 | $this->producer->publish(serialize($entry)); | ||
281 | } | ||
282 | } | ||
264 | } | 283 | } |
265 | } | 284 | } |
diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index 520d43af..7ea54162 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml | |||
@@ -1,4 +1,11 @@ | |||
1 | services: | 1 | services: |
2 | wallabag_import.consumer.entry: | ||
3 | class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer | ||
4 | arguments: | ||
5 | - "@doctrine.orm.entity_manager" | ||
6 | - "@wallabag_core.entry_repository" | ||
7 | - "@wallabag_core.content_proxy" | ||
8 | |||
2 | wallabag_import.chain: | 9 | wallabag_import.chain: |
3 | class: Wallabag\ImportBundle\Import\ImportChain | 10 | class: Wallabag\ImportBundle\Import\ImportChain |
4 | 11 | ||
@@ -18,6 +25,8 @@ services: | |||
18 | - "@doctrine.orm.entity_manager" | 25 | - "@doctrine.orm.entity_manager" |
19 | - "@wallabag_core.content_proxy" | 26 | - "@wallabag_core.content_proxy" |
20 | - "@craue_config" | 27 | - "@craue_config" |
28 | - %rabbitmq% | ||
29 | - "@old_sound_rabbit_mq.wallabag_producer" | ||
21 | calls: | 30 | calls: |
22 | - [ setClient, [ "@wallabag_import.pocket.client" ] ] | 31 | - [ setClient, [ "@wallabag_import.pocket.client" ] ] |
23 | - [ setLogger, [ "@logger" ]] | 32 | - [ setLogger, [ "@logger" ]] |
diff --git a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php index 8534e1c8..a0f943ee 100644 --- a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php +++ b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php | |||
@@ -27,8 +27,9 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase | |||
27 | protected $em; | 27 | protected $em; |
28 | protected $contentProxy; | 28 | protected $contentProxy; |
29 | protected $logHandler; | 29 | protected $logHandler; |
30 | protected $producer; | ||
30 | 31 | ||
31 | private function getPocketImport($consumerKey = 'ConsumerKey') | 32 | private function getPocketImport($consumerKey = 'ConsumerKey', $rabbitMQ = false) |
32 | { | 33 | { |
33 | $this->user = new User(); | 34 | $this->user = new User(); |
34 | 35 | ||
@@ -65,11 +66,17 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase | |||
65 | ->with('pocket_consumer_key') | 66 | ->with('pocket_consumer_key') |
66 | ->willReturn($consumerKey); | 67 | ->willReturn($consumerKey); |
67 | 68 | ||
69 | $this->producer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\Producer') | ||
70 | ->disableOriginalConstructor() | ||
71 | ->getMock(); | ||
72 | |||
68 | $pocket = new PocketImportMock( | 73 | $pocket = new PocketImportMock( |
69 | $this->tokenStorage, | 74 | $this->tokenStorage, |
70 | $this->em, | 75 | $this->em, |
71 | $this->contentProxy, | 76 | $this->contentProxy, |
72 | $config | 77 | $config, |
78 | $rabbitMQ, | ||
79 | $this->producer | ||
73 | ); | 80 | ); |
74 | 81 | ||
75 | $this->logHandler = new TestHandler(); | 82 | $this->logHandler = new TestHandler(); |