aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/Wallabag/ImportBundle
diff options
context:
space:
mode:
authorJeremy Benoist <jeremy.benoist@gmail.com>2016-09-09 21:02:03 +0200
committerJeremy Benoist <jeremy.benoist@gmail.com>2016-09-11 21:58:56 +0200
commitb3437d58ae224121375c99e9288d8b808524e624 (patch)
tree94ce3446aed4396ba9304b8c97e421eba35e4edf /src/Wallabag/ImportBundle
parent7f7531171f6e49110b5842f869e37c766a682473 (diff)
downloadwallabag-b3437d58ae224121375c99e9288d8b808524e624.tar.gz
wallabag-b3437d58ae224121375c99e9288d8b808524e624.tar.zst
wallabag-b3437d58ae224121375c99e9288d8b808524e624.zip
Enable Redis async import
- using javibravo/simpleue - internal config value are now `import_with_redis` & `import_with_rabbit` which are more clear - if both option are enable rabbit will be choosen - services imports related to async are now splitted into 2 files: `redis.yml` & `rabbit.yml` -
Diffstat (limited to 'src/Wallabag/ImportBundle')
-rw-r--r--src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php41
-rw-r--r--src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php (renamed from src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php)6
-rw-r--r--src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php84
-rw-r--r--src/Wallabag/ImportBundle/Controller/PocketController.php6
-rw-r--r--src/Wallabag/ImportBundle/Controller/ReadabilityController.php6
-rw-r--r--src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php6
-rw-r--r--src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php6
-rw-r--r--src/Wallabag/ImportBundle/Import/AbstractImport.php8
-rw-r--r--src/Wallabag/ImportBundle/Redis/Producer.php36
-rw-r--r--src/Wallabag/ImportBundle/Resources/config/rabbit.yml30
-rw-r--r--src/Wallabag/ImportBundle/Resources/config/redis.yml81
-rw-r--r--src/Wallabag/ImportBundle/Resources/config/services.yml33
12 files changed, 300 insertions, 43 deletions
diff --git a/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php
new file mode 100644
index 00000000..85c5a903
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php
@@ -0,0 +1,41 @@
1<?php
2
3namespace Wallabag\ImportBundle\Command;
4
5use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
6use Symfony\Component\Config\Definition\Exception\Exception;
7use Symfony\Component\Console\Input\InputArgument;
8use Symfony\Component\Console\Input\InputInterface;
9use Symfony\Component\Console\Output\OutputInterface;
10use Simpleue\Worker\QueueWorker;
11
12class RedisWorkerCommand extends ContainerAwareCommand
13{
14 protected function configure()
15 {
16 $this
17 ->setName('wallabag:import:redis-worker')
18 ->setDescription('Launch Redis worker')
19 ->addArgument('serviceName', InputArgument::REQUIRED, 'Service to use: wallabag_v1, wallabag_v2, pocket or readability')
20 ;
21 }
22
23 protected function execute(InputInterface $input, OutputInterface $output)
24 {
25 $output->writeln('Worker started at: '.(new \DateTime())->format('d-m-Y G:i:s'));
26 $output->writeln('Waiting for message ...');
27
28 $serviceName = $input->getArgument('serviceName');
29
30 if (!$this->getContainer()->has('wallabag_import.queue.redis.'.$serviceName) || !$this->getContainer()->has('wallabag_import.consumer.redis.'.$serviceName)) {
31 throw new Exception(sprintf('No queue or consumer found for service name: "%s"', $input->getArgument('serviceName')));
32 }
33
34 $worker = new QueueWorker(
35 $this->getContainer()->get('wallabag_import.queue.redis.'.$serviceName),
36 $this->getContainer()->get('wallabag_import.consumer.redis.'.$serviceName)
37 );
38
39 $worker->start();
40 }
41}
diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php
index 72a3260a..39bb5375 100644
--- a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php
+++ b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php
@@ -1,6 +1,6 @@
1<?php 1<?php
2 2
3namespace Wallabag\ImportBundle\Consumer\AMPQ; 3namespace Wallabag\ImportBundle\Consumer;
4 4
5use Doctrine\ORM\EntityManager; 5use Doctrine\ORM\EntityManager;
6use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; 6use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
@@ -12,7 +12,7 @@ use Wallabag\CoreBundle\Entity\Tag;
12use Psr\Log\LoggerInterface; 12use Psr\Log\LoggerInterface;
13use Psr\Log\NullLogger; 13use Psr\Log\NullLogger;
14 14
15class EntryConsumer implements ConsumerInterface 15class AMPQEntryConsumer implements ConsumerInterface
16{ 16{
17 private $em; 17 private $em;
18 private $userRepository; 18 private $userRepository;
@@ -64,5 +64,7 @@ class EntryConsumer implements ConsumerInterface
64 64
65 return; 65 return;
66 } 66 }
67
68 $this->logger->info('Content with url ('.$entry->getUrl().') imported !');
67 } 69 }
68} 70}
diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php
new file mode 100644
index 00000000..38665b01
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php
@@ -0,0 +1,84 @@
1<?php
2
3namespace Wallabag\ImportBundle\Consumer;
4
5use Simpleue\Job\Job;
6use Doctrine\ORM\EntityManager;
7use Wallabag\ImportBundle\Import\AbstractImport;
8use Wallabag\UserBundle\Repository\UserRepository;
9use Wallabag\CoreBundle\Entity\Entry;
10use Wallabag\CoreBundle\Entity\Tag;
11use Psr\Log\LoggerInterface;
12use Psr\Log\NullLogger;
13
14class RedisEntryConsumer implements Job
15{
16 private $em;
17 private $userRepository;
18 private $import;
19 private $logger;
20
21 public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null)
22 {
23 $this->em = $em;
24 $this->userRepository = $userRepository;
25 $this->import = $import;
26 $this->logger = $logger ?: new NullLogger();
27 }
28
29 /**
30 * Handle one message by one message.
31 *
32 * @param string $job Content of the message (directly from Redis)
33 *
34 * @return bool
35 */
36 public function manage($job)
37 {
38 $storedEntry = json_decode($job, true);
39
40 $user = $this->userRepository->find($storedEntry['userId']);
41
42 // no user? Drop message
43 if (null === $user) {
44 $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]);
45
46 return false;
47 }
48
49 $this->import->setUser($user);
50
51 $entry = $this->import->parseEntry($storedEntry);
52
53 if (null === $entry) {
54 $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]);
55
56 return false;
57 }
58
59 try {
60 $this->em->flush();
61
62 // clear only affected entities
63 $this->em->clear(Entry::class);
64 $this->em->clear(Tag::class);
65 } catch (\Exception $e) {
66 $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);
67
68 return false;
69 }
70
71 $this->logger->info('Content with url ('.$entry->getUrl().') imported !');
72
73 return true;
74 }
75
76 /**
77 * Should tell if the given job will kill the worker.
78 * We don't want to stop it :).
79 */
80 public function isStopJob($job)
81 {
82 return false;
83 }
84}
diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php
index 57c007c3..3d555717 100644
--- a/src/Wallabag/ImportBundle/Controller/PocketController.php
+++ b/src/Wallabag/ImportBundle/Controller/PocketController.php
@@ -20,8 +20,10 @@ class PocketController extends Controller
20 $pocket = $this->get('wallabag_import.pocket.import'); 20 $pocket = $this->get('wallabag_import.pocket.import');
21 $pocket->setUser($this->getUser()); 21 $pocket->setUser($this->getUser());
22 22
23 if ($this->get('craue_config')->get('rabbitmq')) { 23 if ($this->get('craue_config')->get('import_with_rabbitmq')) {
24 $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_pocket_producer')); 24 $pocket->setProducer($this->get('old_sound_rabbit_mq.import_pocket_producer'));
25 } elseif ($this->get('craue_config')->get('import_with_redis')) {
26 $pocket->setProducer($this->get('wallabag_import.producer.redis.pocket'));
25 } 27 }
26 28
27 return $pocket; 29 return $pocket;
diff --git a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php
index ee875a40..61243042 100644
--- a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php
+++ b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php
@@ -20,8 +20,10 @@ class ReadabilityController extends Controller
20 $readability = $this->get('wallabag_import.readability.import'); 20 $readability = $this->get('wallabag_import.readability.import');
21 $readability->setUser($this->getUser()); 21 $readability->setUser($this->getUser());
22 22
23 if ($this->get('craue_config')->get('rabbitmq')) { 23 if ($this->get('craue_config')->get('import_with_rabbitmq')) {
24 $readability->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_readability_producer')); 24 $readability->setProducer($this->get('old_sound_rabbit_mq.import_readability_producer'));
25 } elseif ($this->get('craue_config')->get('import_with_redis')) {
26 $readability->setProducer($this->get('wallabag_import.producer.redis.readability'));
25 } 27 }
26 28
27 if ($form->isValid()) { 29 if ($form->isValid()) {
diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php
index f80aec3a..312c7a35 100644
--- a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php
+++ b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php
@@ -14,8 +14,10 @@ class WallabagV1Controller extends WallabagController
14 { 14 {
15 $service = $this->get('wallabag_import.wallabag_v1.import'); 15 $service = $this->get('wallabag_import.wallabag_v1.import');
16 16
17 if ($this->get('craue_config')->get('rabbitmq')) { 17 if ($this->get('craue_config')->get('import_with_rabbitmq')) {
18 $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v1_producer')); 18 $service->setProducer($this->get('old_sound_rabbit_mq.import_wallabag_v1_producer'));
19 } elseif ($this->get('craue_config')->get('import_with_redis')) {
20 $service->setProducer($this->get('wallabag_import.producer.redis.wallabag_v1'));
19 } 21 }
20 22
21 return $service; 23 return $service;
diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php
index 063cddd9..45211fe6 100644
--- a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php
+++ b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php
@@ -14,8 +14,10 @@ class WallabagV2Controller extends WallabagController
14 { 14 {
15 $service = $this->get('wallabag_import.wallabag_v2.import'); 15 $service = $this->get('wallabag_import.wallabag_v2.import');
16 16
17 if ($this->get('craue_config')->get('rabbitmq')) { 17 if ($this->get('craue_config')->get('import_with_rabbitmq')) {
18 $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v2_producer')); 18 $service->setProducer($this->get('old_sound_rabbit_mq.import_wallabag_v2_producer'));
19 } elseif ($this->get('craue_config')->get('import_with_redis')) {
20 $service->setProducer($this->get('wallabag_import.producer.redis.wallabag_v2'));
19 } 21 }
20 22
21 return $service; 23 return $service;
diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php
index 39befa7b..4cd8e846 100644
--- a/src/Wallabag/ImportBundle/Import/AbstractImport.php
+++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php
@@ -9,7 +9,7 @@ use Wallabag\CoreBundle\Helper\ContentProxy;
9use Wallabag\CoreBundle\Entity\Entry; 9use Wallabag\CoreBundle\Entity\Entry;
10use Wallabag\CoreBundle\Entity\Tag; 10use Wallabag\CoreBundle\Entity\Tag;
11use Wallabag\UserBundle\Entity\User; 11use Wallabag\UserBundle\Entity\User;
12use OldSound\RabbitMqBundle\RabbitMq\Producer; 12use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
13 13
14abstract class AbstractImport implements ImportInterface 14abstract class AbstractImport implements ImportInterface
15{ 15{
@@ -35,12 +35,12 @@ abstract class AbstractImport implements ImportInterface
35 } 35 }
36 36
37 /** 37 /**
38 * Set RabbitMQ Producer to send each entry to a queue. 38 * Set RabbitMQ/Redis Producer to send each entry to a queue.
39 * This method should be called when user has enabled RabbitMQ. 39 * This method should be called when user has enabled RabbitMQ.
40 * 40 *
41 * @param Producer $producer 41 * @param ProducerInterface $producer
42 */ 42 */
43 public function setRabbitmqProducer(Producer $producer) 43 public function setProducer(ProducerInterface $producer)
44 { 44 {
45 $this->producer = $producer; 45 $this->producer = $producer;
46 } 46 }
diff --git a/src/Wallabag/ImportBundle/Redis/Producer.php b/src/Wallabag/ImportBundle/Redis/Producer.php
new file mode 100644
index 00000000..fedc3e57
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Redis/Producer.php
@@ -0,0 +1,36 @@
1<?php
2
3namespace Wallabag\ImportBundle\Redis;
4
5use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
6use Simpleue\Queue\RedisQueue;
7
8/**
9 * This is a proxy class for "Simpleue\Queue\RedisQueue".
10 * It allow us to use the same way to publish a message between RabbitMQ & Redis: publish().
11 *
12 * It implements the ProducerInterface of RabbitMQ (yes it's ugly) so we can have the same
13 * kind of class which implements the same interface.
14 * So we can inject either a RabbitMQ producer or a Redis producer with the same signature
15 */
16class Producer implements ProducerInterface
17{
18 private $queue;
19
20 public function __construct(RedisQueue $queue)
21 {
22 $this->queue = $queue;
23 }
24
25 /**
26 * Publish a message in the Redis queue.
27 *
28 * @param string $msgBody
29 * @param string $routingKey NOT USED
30 * @param array $additionalProperties NOT USED
31 */
32 public function publish($msgBody, $routingKey = '', $additionalProperties = array())
33 {
34 $this->queue->sendJob($msgBody);
35 }
36}
diff --git a/src/Wallabag/ImportBundle/Resources/config/rabbit.yml b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml
new file mode 100644
index 00000000..f09dda0d
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml
@@ -0,0 +1,30 @@
1# RabbitMQ stuff
2services:
3 wallabag_import.consumer.ampq.pocket:
4 class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer
5 arguments:
6 - "@doctrine.orm.entity_manager"
7 - "@wallabag_user.user_repository"
8 - "@wallabag_import.pocket.import"
9 - "@logger"
10 wallabag_import.consumer.ampq.readability:
11 class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer
12 arguments:
13 - "@doctrine.orm.entity_manager"
14 - "@wallabag_user.user_repository"
15 - "@wallabag_import.readability.import"
16 - "@logger"
17 wallabag_import.consumer.ampq.wallabag_v1:
18 class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer
19 arguments:
20 - "@doctrine.orm.entity_manager"
21 - "@wallabag_user.user_repository"
22 - "@wallabag_import.wallabag_v1.import"
23 - "@logger"
24 wallabag_import.consumer.ampq.wallabag_v2:
25 class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer
26 arguments:
27 - "@doctrine.orm.entity_manager"
28 - "@wallabag_user.user_repository"
29 - "@wallabag_import.wallabag_v2.import"
30 - "@logger"
diff --git a/src/Wallabag/ImportBundle/Resources/config/redis.yml b/src/Wallabag/ImportBundle/Resources/config/redis.yml
new file mode 100644
index 00000000..7d3248e5
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Resources/config/redis.yml
@@ -0,0 +1,81 @@
1# Redis stuff
2services:
3 # readability
4 wallabag_import.queue.redis.readability:
5 class: Simpleue\Queue\RedisQueue
6 arguments:
7 - "@wallabag_core.redis.client"
8 - "wallabag.import.readability"
9
10 wallabag_import.producer.redis.readability:
11 class: Wallabag\ImportBundle\Redis\Producer
12 arguments:
13 - "@wallabag_import.queue.redis.readability"
14
15 wallabag_import.consumer.redis.readability:
16 class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer
17 arguments:
18 - "@doctrine.orm.entity_manager"
19 - "@wallabag_user.user_repository"
20 - "@wallabag_import.readability.import"
21 - "@logger"
22
23 # pocket
24 wallabag_import.queue.redis.pocket:
25 class: Simpleue\Queue\RedisQueue
26 arguments:
27 - "@wallabag_core.redis.client"
28 - "wallabag.import.pocket"
29
30 wallabag_import.producer.redis.pocket:
31 class: Wallabag\ImportBundle\Redis\Producer
32 arguments:
33 - "@wallabag_import.queue.redis.pocket"
34
35 wallabag_import.consumer.redis.pocket:
36 class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer
37 arguments:
38 - "@doctrine.orm.entity_manager"
39 - "@wallabag_user.user_repository"
40 - "@wallabag_import.pocket.import"
41 - "@logger"
42
43 # wallabag v1
44 wallabag_import.queue.redis.wallabag_v1:
45 class: Simpleue\Queue\RedisQueue
46 arguments:
47 - "@wallabag_core.redis.client"
48 - "wallabag.import.wallabag_v1"
49
50 wallabag_import.producer.redis.wallabag_v1:
51 class: Wallabag\ImportBundle\Redis\Producer
52 arguments:
53 - "@wallabag_import.queue.redis.wallabag_v1"
54
55 wallabag_import.consumer.redis.wallabag_v1:
56 class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer
57 arguments:
58 - "@doctrine.orm.entity_manager"
59 - "@wallabag_user.user_repository"
60 - "@wallabag_import.wallabag_v1.import"
61 - "@logger"
62
63 # wallabag v2
64 wallabag_import.queue.redis.wallabag_v2:
65 class: Simpleue\Queue\RedisQueue
66 arguments:
67 - "@wallabag_core.redis.client"
68 - "wallabag.import.wallabag_v2"
69
70 wallabag_import.producer.redis.wallabag_v2:
71 class: Wallabag\ImportBundle\Redis\Producer
72 arguments:
73 - "@wallabag_import.queue.redis.wallabag_v2"
74
75 wallabag_import.consumer.redis.wallabag_v2:
76 class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer
77 arguments:
78 - "@doctrine.orm.entity_manager"
79 - "@wallabag_user.user_repository"
80 - "@wallabag_import.wallabag_v2.import"
81 - "@logger"
diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml
index cad44e71..f03404ae 100644
--- a/src/Wallabag/ImportBundle/Resources/config/services.yml
+++ b/src/Wallabag/ImportBundle/Resources/config/services.yml
@@ -1,33 +1,8 @@
1services: 1imports:
2 wallabag_import.consumer.pocket: 2 - { resource: rabbit.yml }
3 class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer 3 - { resource: redis.yml }
4 arguments:
5 - "@doctrine.orm.entity_manager"
6 - "@wallabag_user.user_repository"
7 - "@wallabag_import.pocket.import"
8 - "@logger"
9 wallabag_import.consumer.readability:
10 class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer
11 arguments:
12 - "@doctrine.orm.entity_manager"
13 - "@wallabag_user.user_repository"
14 - "@wallabag_import.readability.import"
15 - "@logger"
16 wallabag_import.consumer.wallabag_v1:
17 class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer
18 arguments:
19 - "@doctrine.orm.entity_manager"
20 - "@wallabag_user.user_repository"
21 - "@wallabag_import.wallabag_v1.import"
22 - "@logger"
23 wallabag_import.consumer.wallabag_v2:
24 class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer
25 arguments:
26 - "@doctrine.orm.entity_manager"
27 - "@wallabag_user.user_repository"
28 - "@wallabag_import.wallabag_v2.import"
29 - "@logger"
30 4
5services:
31 wallabag_import.chain: 6 wallabag_import.chain:
32 class: Wallabag\ImportBundle\Import\ImportChain 7 class: Wallabag\ImportBundle\Import\ImportChain
33 8