diff options
author | Jeremy Benoist <jeremy.benoist@gmail.com> | 2016-09-03 17:36:57 +0200 |
---|---|---|
committer | Jeremy Benoist <jeremy.benoist@gmail.com> | 2016-09-11 21:57:46 +0200 |
commit | ef75e1220ebb76a8df019d946460ad612759f0bb (patch) | |
tree | 100830464851d6834e1e24ccabca1961a862fcd8 /src/Wallabag/ImportBundle/Consumer | |
parent | 87c9995b6c61a9f5cde3771bd4f9d44b5da26c43 (diff) | |
download | wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.gz wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.zst wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.zip |
Send every imported item to the queue
Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format.
Then, the worker retrieve that information, find / create the entry and save it.
Diffstat (limited to 'src/Wallabag/ImportBundle/Consumer')
-rw-r--r-- | src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php | 63 |
1 files changed, 63 insertions, 0 deletions
diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php new file mode 100644 index 00000000..239e7446 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php | |||
@@ -0,0 +1,63 @@ | |||
1 | <?php | ||
2 | |||
3 | namespace Wallabag\ImportBundle\Consumer\AMPQ; | ||
4 | |||
5 | use Doctrine\ORM\EntityManager; | ||
6 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; | ||
7 | use PhpAmqpLib\Message\AMQPMessage; | ||
8 | use Wallabag\ImportBundle\Import\PocketImport; | ||
9 | use Wallabag\UserBundle\Repository\UserRepository; | ||
10 | use Psr\Log\LoggerInterface; | ||
11 | use Psr\Log\NullLogger; | ||
12 | |||
13 | class PocketConsumer implements ConsumerInterface | ||
14 | { | ||
15 | private $em; | ||
16 | private $userRepository; | ||
17 | private $pocketImport; | ||
18 | private $logger; | ||
19 | |||
20 | public function __construct(EntityManager $em, UserRepository $userRepository, PocketImport $pocketImport, LoggerInterface $logger = null) | ||
21 | { | ||
22 | $this->em = $em; | ||
23 | $this->userRepository = $userRepository; | ||
24 | $this->pocketImport = $pocketImport; | ||
25 | $this->logger = $logger ?: new NullLogger(); | ||
26 | } | ||
27 | |||
28 | /** | ||
29 | * {@inheritdoc} | ||
30 | */ | ||
31 | public function execute(AMQPMessage $msg) | ||
32 | { | ||
33 | $storedEntry = json_decode($msg->body, true); | ||
34 | |||
35 | $user = $this->userRepository->find($storedEntry['userId']); | ||
36 | |||
37 | // no user? Drop message | ||
38 | if (null === $user) { | ||
39 | $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); | ||
40 | |||
41 | return; | ||
42 | } | ||
43 | |||
44 | $this->pocketImport->setUser($user); | ||
45 | |||
46 | $entry = $this->pocketImport->parseEntry($storedEntry); | ||
47 | |||
48 | if (null === $entry) { | ||
49 | $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); | ||
50 | |||
51 | return; | ||
52 | } | ||
53 | |||
54 | try { | ||
55 | $this->em->flush(); | ||
56 | $this->em->clear($entry); | ||
57 | } catch (\Exception $e) { | ||
58 | $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); | ||
59 | |||
60 | return; | ||
61 | } | ||
62 | } | ||
63 | } | ||