aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/Wallabag/ImportBundle/Consumer
diff options
context:
space:
mode:
authorJeremy Benoist <jeremy.benoist@gmail.com>2016-09-03 17:36:57 +0200
committerJeremy Benoist <jeremy.benoist@gmail.com>2016-09-11 21:57:46 +0200
commitef75e1220ebb76a8df019d946460ad612759f0bb (patch)
tree100830464851d6834e1e24ccabca1961a862fcd8 /src/Wallabag/ImportBundle/Consumer
parent87c9995b6c61a9f5cde3771bd4f9d44b5da26c43 (diff)
downloadwallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.gz
wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.zst
wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.zip
Send every imported item to the queue
Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format. Then, the worker retrieve that information, find / create the entry and save it.
Diffstat (limited to 'src/Wallabag/ImportBundle/Consumer')
-rw-r--r--src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php63
1 files changed, 63 insertions, 0 deletions
diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php
new file mode 100644
index 00000000..239e7446
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php
@@ -0,0 +1,63 @@
1<?php
2
3namespace Wallabag\ImportBundle\Consumer\AMPQ;
4
5use Doctrine\ORM\EntityManager;
6use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
7use PhpAmqpLib\Message\AMQPMessage;
8use Wallabag\ImportBundle\Import\PocketImport;
9use Wallabag\UserBundle\Repository\UserRepository;
10use Psr\Log\LoggerInterface;
11use Psr\Log\NullLogger;
12
13class PocketConsumer implements ConsumerInterface
14{
15 private $em;
16 private $userRepository;
17 private $pocketImport;
18 private $logger;
19
20 public function __construct(EntityManager $em, UserRepository $userRepository, PocketImport $pocketImport, LoggerInterface $logger = null)
21 {
22 $this->em = $em;
23 $this->userRepository = $userRepository;
24 $this->pocketImport = $pocketImport;
25 $this->logger = $logger ?: new NullLogger();
26 }
27
28 /**
29 * {@inheritdoc}
30 */
31 public function execute(AMQPMessage $msg)
32 {
33 $storedEntry = json_decode($msg->body, true);
34
35 $user = $this->userRepository->find($storedEntry['userId']);
36
37 // no user? Drop message
38 if (null === $user) {
39 $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]);
40
41 return;
42 }
43
44 $this->pocketImport->setUser($user);
45
46 $entry = $this->pocketImport->parseEntry($storedEntry);
47
48 if (null === $entry) {
49 $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]);
50
51 return;
52 }
53
54 try {
55 $this->em->flush();
56 $this->em->clear($entry);
57 } catch (\Exception $e) {
58 $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);
59
60 return;
61 }
62 }
63}