From ef75e1220ebb76a8df019d946460ad612759f0bb Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sat, 3 Sep 2016 17:36:57 +0200 Subject: Send every imported item to the queue Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format. Then, the worker retrieve that information, find / create the entry and save it. --- .../ImportBundle/Consumer/AMPQ/PocketConsumer.php | 63 ++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php (limited to 'src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php new file mode 100644 index 00000000..239e7446 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php @@ -0,0 +1,63 @@ +em = $em; + $this->userRepository = $userRepository; + $this->pocketImport = $pocketImport; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->pocketImport->setUser($user); + + $entry = $this->pocketImport->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + $this->em->clear($entry); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + } +} -- cgit v1.2.3