From b3437d58ae224121375c99e9288d8b808524e624 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 9 Sep 2016 21:02:03 +0200 Subject: Enable Redis async import - using javibravo/simpleue - internal config value are now `import_with_redis` & `import_with_rabbit` which are more clear - if both option are enable rabbit will be choosen - services imports related to async are now splitted into 2 files: `redis.yml` & `rabbit.yml` - --- .../ImportBundle/Consumer/RedisEntryConsumer.php | 84 ++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php (limited to 'src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php') diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php new file mode 100644 index 00000000..38665b01 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php @@ -0,0 +1,84 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * Handle one message by one message. + * + * @param string $job Content of the message (directly from Redis) + * + * @return bool + */ + public function manage($job) + { + $storedEntry = json_decode($job, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return false; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return false; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return false; + } + + $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + + return true; + } + + /** + * Should tell if the given job will kill the worker. + * We don't want to stop it :). + */ + public function isStopJob($job) + { + return false; + } +} -- cgit v1.2.3 From 7d862f83b95d24b4f081d73ca7b0bdf4435ae008 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 11 Sep 2016 20:23:17 +0200 Subject: Re-facto EntryConsumer Using an abstract method allow to share code but also can be used it we add a new broker in the future --- .../ImportBundle/Consumer/RedisEntryConsumer.php | 59 +--------------------- 1 file changed, 2 insertions(+), 57 deletions(-) (limited to 'src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php') diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php index 38665b01..450b71ff 100644 --- a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php @@ -3,29 +3,9 @@ namespace Wallabag\ImportBundle\Consumer; use Simpleue\Job\Job; -use Doctrine\ORM\EntityManager; -use Wallabag\ImportBundle\Import\AbstractImport; -use Wallabag\UserBundle\Repository\UserRepository; -use Wallabag\CoreBundle\Entity\Entry; -use Wallabag\CoreBundle\Entity\Tag; -use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; -class RedisEntryConsumer implements Job +class RedisEntryConsumer extends AbstractConsumer implements Job { - private $em; - private $userRepository; - private $import; - private $logger; - - public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) - { - $this->em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - /** * Handle one message by one message. * @@ -35,42 +15,7 @@ class RedisEntryConsumer implements Job */ public function manage($job) { - $storedEntry = json_decode($job, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return false; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return false; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return false; - } - - $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); - - return true; + return $this->handleMessage($job); } /** -- cgit v1.2.3