From b3437d58ae224121375c99e9288d8b808524e624 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 9 Sep 2016 21:02:03 +0200 Subject: Enable Redis async import - using javibravo/simpleue - internal config value are now `import_with_redis` & `import_with_rabbit` which are more clear - if both option are enable rabbit will be choosen - services imports related to async are now splitted into 2 files: `redis.yml` & `rabbit.yml` - --- .../ImportBundle/Command/RedisWorkerCommand.php | 41 +++++++++++ .../ImportBundle/Consumer/AMPQ/EntryConsumer.php | 68 ------------------ .../ImportBundle/Consumer/AMPQEntryConsumer.php | 70 ++++++++++++++++++ .../ImportBundle/Consumer/RedisEntryConsumer.php | 84 ++++++++++++++++++++++ .../ImportBundle/Controller/PocketController.php | 6 +- .../Controller/ReadabilityController.php | 6 +- .../Controller/WallabagV1Controller.php | 6 +- .../Controller/WallabagV2Controller.php | 6 +- .../ImportBundle/Import/AbstractImport.php | 8 +-- src/Wallabag/ImportBundle/Redis/Producer.php | 36 ++++++++++ .../ImportBundle/Resources/config/rabbit.yml | 30 ++++++++ .../ImportBundle/Resources/config/redis.yml | 81 +++++++++++++++++++++ .../ImportBundle/Resources/config/services.yml | 33 ++------- 13 files changed, 366 insertions(+), 109 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php delete mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Redis/Producer.php create mode 100644 src/Wallabag/ImportBundle/Resources/config/rabbit.yml create mode 100644 src/Wallabag/ImportBundle/Resources/config/redis.yml (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php new file mode 100644 index 00000000..85c5a903 --- /dev/null +++ b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php @@ -0,0 +1,41 @@ +setName('wallabag:import:redis-worker') + ->setDescription('Launch Redis worker') + ->addArgument('serviceName', InputArgument::REQUIRED, 'Service to use: wallabag_v1, wallabag_v2, pocket or readability') + ; + } + + protected function execute(InputInterface $input, OutputInterface $output) + { + $output->writeln('Worker started at: '.(new \DateTime())->format('d-m-Y G:i:s')); + $output->writeln('Waiting for message ...'); + + $serviceName = $input->getArgument('serviceName'); + + if (!$this->getContainer()->has('wallabag_import.queue.redis.'.$serviceName) || !$this->getContainer()->has('wallabag_import.consumer.redis.'.$serviceName)) { + throw new Exception(sprintf('No queue or consumer found for service name: "%s"', $input->getArgument('serviceName'))); + } + + $worker = new QueueWorker( + $this->getContainer()->get('wallabag_import.queue.redis.'.$serviceName), + $this->getContainer()->get('wallabag_import.consumer.redis.'.$serviceName) + ); + + $worker->start(); + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php deleted file mode 100644 index 72a3260a..00000000 --- a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php +++ /dev/null @@ -1,68 +0,0 @@ -em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - - /** - * {@inheritdoc} - */ - public function execute(AMQPMessage $msg) - { - $storedEntry = json_decode($msg->body, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return; - } - } -} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php new file mode 100644 index 00000000..39bb5375 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php @@ -0,0 +1,70 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + + $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php new file mode 100644 index 00000000..38665b01 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php @@ -0,0 +1,84 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * Handle one message by one message. + * + * @param string $job Content of the message (directly from Redis) + * + * @return bool + */ + public function manage($job) + { + $storedEntry = json_decode($job, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return false; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return false; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return false; + } + + $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + + return true; + } + + /** + * Should tell if the given job will kill the worker. + * We don't want to stop it :). + */ + public function isStopJob($job) + { + return false; + } +} diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php index 57c007c3..3d555717 100644 --- a/src/Wallabag/ImportBundle/Controller/PocketController.php +++ b/src/Wallabag/ImportBundle/Controller/PocketController.php @@ -20,8 +20,10 @@ class PocketController extends Controller $pocket = $this->get('wallabag_import.pocket.import'); $pocket->setUser($this->getUser()); - if ($this->get('craue_config')->get('rabbitmq')) { - $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_pocket_producer')); + if ($this->get('craue_config')->get('import_with_rabbitmq')) { + $pocket->setProducer($this->get('old_sound_rabbit_mq.import_pocket_producer')); + } elseif ($this->get('craue_config')->get('import_with_redis')) { + $pocket->setProducer($this->get('wallabag_import.producer.redis.pocket')); } return $pocket; diff --git a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php index ee875a40..61243042 100644 --- a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php +++ b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php @@ -20,8 +20,10 @@ class ReadabilityController extends Controller $readability = $this->get('wallabag_import.readability.import'); $readability->setUser($this->getUser()); - if ($this->get('craue_config')->get('rabbitmq')) { - $readability->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_readability_producer')); + if ($this->get('craue_config')->get('import_with_rabbitmq')) { + $readability->setProducer($this->get('old_sound_rabbit_mq.import_readability_producer')); + } elseif ($this->get('craue_config')->get('import_with_redis')) { + $readability->setProducer($this->get('wallabag_import.producer.redis.readability')); } if ($form->isValid()) { diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php index f80aec3a..312c7a35 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php @@ -14,8 +14,10 @@ class WallabagV1Controller extends WallabagController { $service = $this->get('wallabag_import.wallabag_v1.import'); - if ($this->get('craue_config')->get('rabbitmq')) { - $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v1_producer')); + if ($this->get('craue_config')->get('import_with_rabbitmq')) { + $service->setProducer($this->get('old_sound_rabbit_mq.import_wallabag_v1_producer')); + } elseif ($this->get('craue_config')->get('import_with_redis')) { + $service->setProducer($this->get('wallabag_import.producer.redis.wallabag_v1')); } return $service; diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php index 063cddd9..45211fe6 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php @@ -14,8 +14,10 @@ class WallabagV2Controller extends WallabagController { $service = $this->get('wallabag_import.wallabag_v2.import'); - if ($this->get('craue_config')->get('rabbitmq')) { - $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v2_producer')); + if ($this->get('craue_config')->get('import_with_rabbitmq')) { + $service->setProducer($this->get('old_sound_rabbit_mq.import_wallabag_v2_producer')); + } elseif ($this->get('craue_config')->get('import_with_redis')) { + $service->setProducer($this->get('wallabag_import.producer.redis.wallabag_v2')); } return $service; diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 39befa7b..4cd8e846 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -9,7 +9,7 @@ use Wallabag\CoreBundle\Helper\ContentProxy; use Wallabag\CoreBundle\Entity\Entry; use Wallabag\CoreBundle\Entity\Tag; use Wallabag\UserBundle\Entity\User; -use OldSound\RabbitMqBundle\RabbitMq\Producer; +use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; abstract class AbstractImport implements ImportInterface { @@ -35,12 +35,12 @@ abstract class AbstractImport implements ImportInterface } /** - * Set RabbitMQ Producer to send each entry to a queue. + * Set RabbitMQ/Redis Producer to send each entry to a queue. * This method should be called when user has enabled RabbitMQ. * - * @param Producer $producer + * @param ProducerInterface $producer */ - public function setRabbitmqProducer(Producer $producer) + public function setProducer(ProducerInterface $producer) { $this->producer = $producer; } diff --git a/src/Wallabag/ImportBundle/Redis/Producer.php b/src/Wallabag/ImportBundle/Redis/Producer.php new file mode 100644 index 00000000..fedc3e57 --- /dev/null +++ b/src/Wallabag/ImportBundle/Redis/Producer.php @@ -0,0 +1,36 @@ +queue = $queue; + } + + /** + * Publish a message in the Redis queue. + * + * @param string $msgBody + * @param string $routingKey NOT USED + * @param array $additionalProperties NOT USED + */ + public function publish($msgBody, $routingKey = '', $additionalProperties = array()) + { + $this->queue->sendJob($msgBody); + } +} diff --git a/src/Wallabag/ImportBundle/Resources/config/rabbit.yml b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml new file mode 100644 index 00000000..f09dda0d --- /dev/null +++ b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml @@ -0,0 +1,30 @@ +# RabbitMQ stuff +services: + wallabag_import.consumer.ampq.pocket: + class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.pocket.import" + - "@logger" + wallabag_import.consumer.ampq.readability: + class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.readability.import" + - "@logger" + wallabag_import.consumer.ampq.wallabag_v1: + class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v1.import" + - "@logger" + wallabag_import.consumer.ampq.wallabag_v2: + class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v2.import" + - "@logger" diff --git a/src/Wallabag/ImportBundle/Resources/config/redis.yml b/src/Wallabag/ImportBundle/Resources/config/redis.yml new file mode 100644 index 00000000..7d3248e5 --- /dev/null +++ b/src/Wallabag/ImportBundle/Resources/config/redis.yml @@ -0,0 +1,81 @@ +# Redis stuff +services: + # readability + wallabag_import.queue.redis.readability: + class: Simpleue\Queue\RedisQueue + arguments: + - "@wallabag_core.redis.client" + - "wallabag.import.readability" + + wallabag_import.producer.redis.readability: + class: Wallabag\ImportBundle\Redis\Producer + arguments: + - "@wallabag_import.queue.redis.readability" + + wallabag_import.consumer.redis.readability: + class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.readability.import" + - "@logger" + + # pocket + wallabag_import.queue.redis.pocket: + class: Simpleue\Queue\RedisQueue + arguments: + - "@wallabag_core.redis.client" + - "wallabag.import.pocket" + + wallabag_import.producer.redis.pocket: + class: Wallabag\ImportBundle\Redis\Producer + arguments: + - "@wallabag_import.queue.redis.pocket" + + wallabag_import.consumer.redis.pocket: + class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.pocket.import" + - "@logger" + + # wallabag v1 + wallabag_import.queue.redis.wallabag_v1: + class: Simpleue\Queue\RedisQueue + arguments: + - "@wallabag_core.redis.client" + - "wallabag.import.wallabag_v1" + + wallabag_import.producer.redis.wallabag_v1: + class: Wallabag\ImportBundle\Redis\Producer + arguments: + - "@wallabag_import.queue.redis.wallabag_v1" + + wallabag_import.consumer.redis.wallabag_v1: + class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v1.import" + - "@logger" + + # wallabag v2 + wallabag_import.queue.redis.wallabag_v2: + class: Simpleue\Queue\RedisQueue + arguments: + - "@wallabag_core.redis.client" + - "wallabag.import.wallabag_v2" + + wallabag_import.producer.redis.wallabag_v2: + class: Wallabag\ImportBundle\Redis\Producer + arguments: + - "@wallabag_import.queue.redis.wallabag_v2" + + wallabag_import.consumer.redis.wallabag_v2: + class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v2.import" + - "@logger" diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index cad44e71..f03404ae 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -1,33 +1,8 @@ -services: - wallabag_import.consumer.pocket: - class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer - arguments: - - "@doctrine.orm.entity_manager" - - "@wallabag_user.user_repository" - - "@wallabag_import.pocket.import" - - "@logger" - wallabag_import.consumer.readability: - class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer - arguments: - - "@doctrine.orm.entity_manager" - - "@wallabag_user.user_repository" - - "@wallabag_import.readability.import" - - "@logger" - wallabag_import.consumer.wallabag_v1: - class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer - arguments: - - "@doctrine.orm.entity_manager" - - "@wallabag_user.user_repository" - - "@wallabag_import.wallabag_v1.import" - - "@logger" - wallabag_import.consumer.wallabag_v2: - class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer - arguments: - - "@doctrine.orm.entity_manager" - - "@wallabag_user.user_repository" - - "@wallabag_import.wallabag_v2.import" - - "@logger" +imports: + - { resource: rabbit.yml } + - { resource: redis.yml } +services: wallabag_import.chain: class: Wallabag\ImportBundle\Import\ImportChain -- cgit v1.2.3