From ef75e1220ebb76a8df019d946460ad612759f0bb Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sat, 3 Sep 2016 17:36:57 +0200 Subject: Send every imported item to the queue Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format. Then, the worker retrieve that information, find / create the entry and save it. --- .../ImportBundle/Consumer/AMPQ/PocketConsumer.php | 63 ++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php (limited to 'src/Wallabag/ImportBundle/Consumer') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php new file mode 100644 index 00000000..239e7446 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php @@ -0,0 +1,63 @@ +em = $em; + $this->userRepository = $userRepository; + $this->pocketImport = $pocketImport; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->pocketImport->setUser($user); + + $entry = $this->pocketImport->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + $this->em->clear($entry); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + } +} -- cgit v1.2.3 From c98db1b653b5dc8b701422190b02d9fbf10c4e68 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 4 Sep 2016 21:49:21 +0200 Subject: Convert other imports to Rabbit --- .../ImportBundle/Consumer/AMPQ/EntryConsumer.php | 63 ++++++++++++++++++++++ .../ImportBundle/Consumer/AMPQ/PocketConsumer.php | 63 ---------------------- 2 files changed, 63 insertions(+), 63 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php delete mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php (limited to 'src/Wallabag/ImportBundle/Consumer') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php new file mode 100644 index 00000000..8a8cf45d --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php @@ -0,0 +1,63 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + $this->em->clear($entry); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php deleted file mode 100644 index 239e7446..00000000 --- a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php +++ /dev/null @@ -1,63 +0,0 @@ -em = $em; - $this->userRepository = $userRepository; - $this->pocketImport = $pocketImport; - $this->logger = $logger ?: new NullLogger(); - } - - /** - * {@inheritdoc} - */ - public function execute(AMQPMessage $msg) - { - $storedEntry = json_decode($msg->body, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return; - } - - $this->pocketImport->setUser($user); - - $entry = $this->pocketImport->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return; - } - - try { - $this->em->flush(); - $this->em->clear($entry); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return; - } - } -} -- cgit v1.2.3 From 8664069e1aa2fa89e17587308a03f2720c20327a Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 9 Sep 2016 10:12:25 +0200 Subject: Fix DateTime & clear() --- src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'src/Wallabag/ImportBundle/Consumer') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php index 8a8cf45d..72a3260a 100644 --- a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php @@ -7,6 +7,8 @@ use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; use Wallabag\ImportBundle\Import\AbstractImport; use Wallabag\UserBundle\Repository\UserRepository; +use Wallabag\CoreBundle\Entity\Entry; +use Wallabag\CoreBundle\Entity\Tag; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -53,7 +55,10 @@ class EntryConsumer implements ConsumerInterface try { $this->em->flush(); - $this->em->clear($entry); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); } catch (\Exception $e) { $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); -- cgit v1.2.3 From b3437d58ae224121375c99e9288d8b808524e624 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 9 Sep 2016 21:02:03 +0200 Subject: Enable Redis async import - using javibravo/simpleue - internal config value are now `import_with_redis` & `import_with_rabbit` which are more clear - if both option are enable rabbit will be choosen - services imports related to async are now splitted into 2 files: `redis.yml` & `rabbit.yml` - --- .../ImportBundle/Consumer/AMPQ/EntryConsumer.php | 68 ------------------ .../ImportBundle/Consumer/AMPQEntryConsumer.php | 70 ++++++++++++++++++ .../ImportBundle/Consumer/RedisEntryConsumer.php | 84 ++++++++++++++++++++++ 3 files changed, 154 insertions(+), 68 deletions(-) delete mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php (limited to 'src/Wallabag/ImportBundle/Consumer') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php deleted file mode 100644 index 72a3260a..00000000 --- a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php +++ /dev/null @@ -1,68 +0,0 @@ -em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - - /** - * {@inheritdoc} - */ - public function execute(AMQPMessage $msg) - { - $storedEntry = json_decode($msg->body, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return; - } - } -} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php new file mode 100644 index 00000000..39bb5375 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php @@ -0,0 +1,70 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + + $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php new file mode 100644 index 00000000..38665b01 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php @@ -0,0 +1,84 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * Handle one message by one message. + * + * @param string $job Content of the message (directly from Redis) + * + * @return bool + */ + public function manage($job) + { + $storedEntry = json_decode($job, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return false; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return false; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return false; + } + + $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + + return true; + } + + /** + * Should tell if the given job will kill the worker. + * We don't want to stop it :). + */ + public function isStopJob($job) + { + return false; + } +} -- cgit v1.2.3 From 7d862f83b95d24b4f081d73ca7b0bdf4435ae008 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 11 Sep 2016 20:23:17 +0200 Subject: Re-facto EntryConsumer Using an abstract method allow to share code but also can be used it we add a new broker in the future --- .../ImportBundle/Consumer/AMPQEntryConsumer.php | 57 +---------------- .../ImportBundle/Consumer/AbstractConsumer.php | 74 ++++++++++++++++++++++ .../ImportBundle/Consumer/RedisEntryConsumer.php | 59 +---------------- 3 files changed, 78 insertions(+), 112 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php (limited to 'src/Wallabag/ImportBundle/Consumer') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php index 39bb5375..d95a011d 100644 --- a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php @@ -2,69 +2,16 @@ namespace Wallabag\ImportBundle\Consumer; -use Doctrine\ORM\EntityManager; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; -use Wallabag\ImportBundle\Import\AbstractImport; -use Wallabag\UserBundle\Repository\UserRepository; -use Wallabag\CoreBundle\Entity\Entry; -use Wallabag\CoreBundle\Entity\Tag; -use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; -class AMPQEntryConsumer implements ConsumerInterface +class AMPQEntryConsumer extends AbstractConsumer implements ConsumerInterface { - private $em; - private $userRepository; - private $import; - private $logger; - - public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) - { - $this->em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - /** * {@inheritdoc} */ public function execute(AMQPMessage $msg) { - $storedEntry = json_decode($msg->body, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return; - } - - $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + return $this->handleMessage($msg->body); } } diff --git a/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php new file mode 100644 index 00000000..2b85ad76 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php @@ -0,0 +1,74 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * Handle a message and save it. + * + * @param string $body Message from the queue (in json) + * + * @return bool + */ + protected function handleMessage($body) + { + $storedEntry = json_decode($body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return false; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return false; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return false; + } + + $this->logger->info('Content with url imported! ('.$entry->getUrl().')'); + + return true; + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php index 38665b01..450b71ff 100644 --- a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php @@ -3,29 +3,9 @@ namespace Wallabag\ImportBundle\Consumer; use Simpleue\Job\Job; -use Doctrine\ORM\EntityManager; -use Wallabag\ImportBundle\Import\AbstractImport; -use Wallabag\UserBundle\Repository\UserRepository; -use Wallabag\CoreBundle\Entity\Entry; -use Wallabag\CoreBundle\Entity\Tag; -use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; -class RedisEntryConsumer implements Job +class RedisEntryConsumer extends AbstractConsumer implements Job { - private $em; - private $userRepository; - private $import; - private $logger; - - public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) - { - $this->em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - /** * Handle one message by one message. * @@ -35,42 +15,7 @@ class RedisEntryConsumer implements Job */ public function manage($job) { - $storedEntry = json_decode($job, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return false; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return false; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return false; - } - - $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); - - return true; + return $this->handleMessage($job); } /** -- cgit v1.2.3 From ac87e0db2ac5db90f1b0639a2d31c7098b4eaa20 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Wed, 14 Sep 2016 10:17:22 +0200 Subject: AMPQ -> AMQP --- .../ImportBundle/Consumer/AMPQEntryConsumer.php | 17 ----------------- .../ImportBundle/Consumer/AMQPEntryConsumer.php | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 17 deletions(-) delete mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php (limited to 'src/Wallabag/ImportBundle/Consumer') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php deleted file mode 100644 index d95a011d..00000000 --- a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php +++ /dev/null @@ -1,17 +0,0 @@ -handleMessage($msg->body); - } -} diff --git a/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php new file mode 100644 index 00000000..5aaa8c03 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php @@ -0,0 +1,17 @@ +handleMessage($msg->body); + } +} -- cgit v1.2.3