3 namespace Wallabag\ImportBundle\Consumer\AMPQ
;
5 use Doctrine\ORM\EntityManager
;
6 use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface
;
7 use PhpAmqpLib\Message\AMQPMessage
;
8 use Wallabag\ImportBundle\Import\AbstractImport
;
9 use Wallabag\UserBundle\Repository\UserRepository
;
10 use Wallabag\CoreBundle\Entity\Entry
;
11 use Wallabag\CoreBundle\Entity\Tag
;
12 use Psr\Log\LoggerInterface
;
13 use Psr\Log\NullLogger
;
/**
 * Message consumer that decodes an AMQP message body as JSON, resolves the
 * user referenced by its 'userId' key and passes the decoded data to the
 * injected import's parseEntry().
 *
 * NOTE(review): this listing is a lossy excerpt. Braces, some property
 * declarations, the assignment of $this->em and parts of the control flow
 * (guards, try opener, returns) are not visible here; confirm the comments
 * below against the complete file.
 */
15 class EntryConsumer
implements ConsumerInterface
// repository used by execute() to look up the user named in the message
18 private $userRepository;
/**
 * Stores the collaborators used by execute().
 *
 * @param EntityManager        $em             entity manager; execute() calls clear() on it for Entry and Tag
 * @param UserRepository       $userRepository queried with the 'userId' value of each message
 * @param AbstractImport       $import         receives the resolved user and parses the decoded message
 * @param LoggerInterface|null $logger         optional; replaced by a NullLogger when omitted or falsy
 */
22 public function __construct(EntityManager
$em, UserRepository
$userRepository, AbstractImport
$import, LoggerInterface
$logger = null)
25 $this->userRepository
= $userRepository;
26 $this->import
= $import;
// fall back to a no-op logger so later calls never need a null check
27 $this->logger
= $logger ?: new NullLogger();
/**
 * Processes a single AMQP message.
 *
 * Every failure path visible here is reported through the logger as a
 * warning that carries the decoded message under the 'entry' key.
 *
 * @param AMQPMessage $msg message whose body is expected to be a JSON document
 *                         containing at least a 'userId' key
 */
33 public function execute(AMQPMessage
$msg)
// decode into an associative array (second argument true)
// NOTE(review): no check for a failed decode is visible in this excerpt
35 $storedEntry = json_decode($msg->body
, true);
37 $user = $this->userRepository
->find($storedEntry['userId']);
39 // no user? Drop message
// NOTE(review): the condition guarding this warning is not visible in this excerpt
41 $this->logger
->warning('Unable to retrieve user', ['entry' => $storedEntry]);
// hand the resolved user to the import before parsing
46 $this->import
->setUser($user);
48 $entry = $this->import
->parseEntry($storedEntry);
// parseEntry() yielded null: log it
// NOTE(review): the remainder of this branch is not visible in this excerpt
50 if (null === $entry) {
51 $this->logger
->warning('Unable to parse entry', ['entry' => $storedEntry]);
59 // clear only affected entities
60 $this->em
->clear(Entry
::class);
61 $this->em
->clear(Tag
::class);
62 } catch (\Exception
$e) {
// log the failure together with the caught exception
// NOTE(review): the statements inside the matching try block, and what happens
// after this warning, are not fully visible in this excerpt
63 $this->logger
->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);