]>
Commit | Line | Data |
---|---|---|
ef75e122 JB |
1 | <?php |
2 | ||
3 | namespace Wallabag\ImportBundle\Consumer\AMPQ; | |
4 | ||
5 | use Doctrine\ORM\EntityManager; | |
6 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; | |
7 | use PhpAmqpLib\Message\AMQPMessage; | |
8 | use Wallabag\ImportBundle\Import\PocketImport; | |
9 | use Wallabag\UserBundle\Repository\UserRepository; | |
10 | use Psr\Log\LoggerInterface; | |
11 | use Psr\Log\NullLogger; | |
12 | ||
/**
 * RabbitMQ consumer that turns one queued Pocket item into a stored entry.
 *
 * Each message body is a JSON object describing a single Pocket item plus the
 * `userId` of the user the item is imported for.
 */
class PocketConsumer implements ConsumerInterface
{
    private $em;
    private $userRepository;
    private $pocketImport;
    private $logger;

    /**
     * @param EntityManager        $em             Entity manager used to flush and detach imported entries
     * @param UserRepository       $userRepository Repository used to look up the user owning the entry
     * @param PocketImport         $pocketImport   Pocket importer that parses a decoded item
     * @param LoggerInterface|null $logger         Optional logger; a NullLogger is used when omitted
     */
    public function __construct(EntityManager $em, UserRepository $userRepository, PocketImport $pocketImport, LoggerInterface $logger = null)
    {
        $this->em = $em;
        $this->userRepository = $userRepository;
        $this->pocketImport = $pocketImport;
        $this->logger = $logger ?: new NullLogger();
    }

    /**
     * Handle one queued message: decode it, resolve the user, parse the entry
     * and flush it.
     *
     * Every failure path logs a warning and returns without a value, exactly
     * like the success path, so the return value seen by the caller is the
     * same in all cases.
     *
     * {@inheritdoc}
     */
    public function execute(AMQPMessage $msg)
    {
        $storedEntry = json_decode($msg->body, true);

        // malformed payload or missing user id? Drop message instead of
        // handing a null identifier to the repository
        if (!is_array($storedEntry) || !isset($storedEntry['userId'])) {
            $this->logger->warning('Unable to decode message', ['body' => $msg->body]);

            return;
        }

        $user = $this->userRepository->find($storedEntry['userId']);

        // no user? Drop message
        if (null === $user) {
            $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]);

            return;
        }

        $this->pocketImport->setUser($user);

        $entry = $this->pocketImport->parseEntry($storedEntry);

        if (null === $entry) {
            $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]);

            return;
        }

        try {
            $this->em->flush();
            // clear() expects an entity class name, not an entity instance;
            // passing the object either clears nothing or raises an
            // invalid-entity-name error depending on the ORM version.
            $this->em->clear(get_class($entry));
        } catch (\Exception $e) {
            $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);

            return;
        }
    }
}