From 56c778b4152a1b886353933276ee3626e4e8c004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20L=C5=93uillet?= Date: Fri, 15 Jan 2016 08:24:32 +0100 Subject: 1st draft for rabbitMQ --- .../ImportBundle/Component/AMPQ/EntryConsumer.php | 39 ++++++++++++++++++++++ src/Wallabag/ImportBundle/Import/PocketImport.php | 35 ++++++++++++++----- .../ImportBundle/Resources/config/services.yml | 9 +++++ 3 files changed, 75 insertions(+), 8 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php (limited to 'src/Wallabag') diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php new file mode 100644 index 00000000..7775f01c --- /dev/null +++ b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php @@ -0,0 +1,39 @@ +em = $em; + $this->entryRepository = $entryRepository; + $this->contentProxy = $contentProxy; + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = unserialize($msg->body); + $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']); + if ($entry) { + $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl()); + if ($entry) { + $this->em->persist($entry); + $this->em->flush(); + } + } + } +} diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index a6f905b1..b02894f0 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -2,6 +2,8 @@ namespace Wallabag\ImportBundle\Import; +use OldSound\RabbitMqBundle\RabbitMq\Producer; +use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use GuzzleHttp\Client; @@ -20,14 +22,18 @@ class PocketImport extends AbstractImport private $importedEntries = 0; private $markAsRead; protected $accessToken; + private $producer; + private $rabbitMQ; - public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) + public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, $rabbitMQ, Producer $producer) { $this->user = $tokenStorage->getToken()->getUser(); $this->em = $em; $this->contentProxy = $contentProxy; $this->consumerKey = $craueConfig->get('pocket_consumer_key'); $this->logger = new NullLogger(); + $this->rabbitMQ = $rabbitMQ; + $this->producer = $producer; } /** @@ -197,7 +203,7 @@ class PocketImport extends AbstractImport { $i = 1; - foreach ($entries as $pocketEntry) { + foreach ($entries as &$pocketEntry) { $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; $existingEntry = $this->em @@ -210,12 +216,15 @@ class PocketImport extends AbstractImport } $entry = new Entry($this->user); - $entry = $this->fetchContent($entry, $url); - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; - continue; + if (!$this->rabbitMQ) { + $entry = $this->fetchContent($entry, $url); + + // jump to next entry in case of problem while getting content + if (false === $entry) { + ++$this->skippedEntries; + continue; + } } // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted @@ -236,6 +245,7 @@ class PocketImport extends AbstractImport } $entry->setTitle($title); + $entry->setUrl($url); // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { @@ -249,6 +259,9 @@ class PocketImport extends AbstractImport ); } + $pocketEntry['url'] = $url; + $pocketEntry['userId'] = $this->user->getId(); + $this->em->persist($entry); ++$this->importedEntries; @@ -256,10 +269,16 @@ class PocketImport extends AbstractImport if (($i % 20) === 0) { $this->em->flush(); } + ++$i; } $this->em->flush(); - $this->em->clear(); + + if ($this->rabbitMQ) { + foreach ($entries as $entry) { + $this->producer->publish(serialize($entry)); + } + } } } diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index 520d43af..7ea54162 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -1,4 +1,11 @@ services: + wallabag_import.consumer.entry: + class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_core.entry_repository" + - "@wallabag_core.content_proxy" + wallabag_import.chain: class: Wallabag\ImportBundle\Import\ImportChain @@ -18,6 +25,8 @@ services: - "@doctrine.orm.entity_manager" - "@wallabag_core.content_proxy" - "@craue_config" + - %rabbitmq% + - "@old_sound_rabbit_mq.wallabag_producer" calls: - [ setClient, [ "@wallabag_import.pocket.client" ] ] - [ setLogger, [ "@logger" ]] -- cgit v1.2.3