From 7d862f83b95d24b4f081d73ca7b0bdf4435ae008 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 11 Sep 2016 20:23:17 +0200 Subject: Re-facto EntryConsumer Using an abstract method allow to share code but also can be used it we add a new broker in the future --- .../ImportBundle/Consumer/AMPQEntryConsumer.php | 57 +---------------- .../ImportBundle/Consumer/AbstractConsumer.php | 74 ++++++++++++++++++++++ .../ImportBundle/Consumer/RedisEntryConsumer.php | 59 +---------------- 3 files changed, 78 insertions(+), 112 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php index 39bb5375..d95a011d 100644 --- a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php @@ -2,69 +2,16 @@ namespace Wallabag\ImportBundle\Consumer; -use Doctrine\ORM\EntityManager; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; -use Wallabag\ImportBundle\Import\AbstractImport; -use Wallabag\UserBundle\Repository\UserRepository; -use Wallabag\CoreBundle\Entity\Entry; -use Wallabag\CoreBundle\Entity\Tag; -use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; -class AMPQEntryConsumer implements ConsumerInterface +class AMPQEntryConsumer extends AbstractConsumer implements ConsumerInterface { - private $em; - private $userRepository; - private $import; - private $logger; - - public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) - { - $this->em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - /** * {@inheritdoc} */ public function execute(AMQPMessage $msg) { - $storedEntry = json_decode($msg->body, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return; - } - - $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + return $this->handleMessage($msg->body); } } diff --git a/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php new file mode 100644 index 00000000..2b85ad76 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php @@ -0,0 +1,74 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * Handle a message and save it. + * + * @param string $body Message from the queue (in json) + * + * @return bool + */ + protected function handleMessage($body) + { + $storedEntry = json_decode($body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return false; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return false; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return false; + } + + $this->logger->info('Content with url imported! ('.$entry->getUrl().')'); + + return true; + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php index 38665b01..450b71ff 100644 --- a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php @@ -3,29 +3,9 @@ namespace Wallabag\ImportBundle\Consumer; use Simpleue\Job\Job; -use Doctrine\ORM\EntityManager; -use Wallabag\ImportBundle\Import\AbstractImport; -use Wallabag\UserBundle\Repository\UserRepository; -use Wallabag\CoreBundle\Entity\Entry; -use Wallabag\CoreBundle\Entity\Tag; -use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; -class RedisEntryConsumer implements Job +class RedisEntryConsumer extends AbstractConsumer implements Job { - private $em; - private $userRepository; - private $import; - private $logger; - - public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) - { - $this->em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - /** * Handle one message by one message. * @@ -35,42 +15,7 @@ class RedisEntryConsumer implements Job */ public function manage($job) { - $storedEntry = json_decode($job, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return false; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return false; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return false; - } - - $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); - - return true; + return $this->handleMessage($job); } /** -- cgit v1.2.3