3 namespace Wallabag\ImportBundle\Consumer\AMPQ
;
5 use Doctrine\ORM\EntityManager
;
6 use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface
;
7 use PhpAmqpLib\Message\AMQPMessage
;
8 use Wallabag\ImportBundle\Import\AbstractImport
;
9 use Wallabag\UserBundle\Repository\UserRepository
;
10 use Psr\Log\LoggerInterface
;
11 use Psr\Log\NullLogger
;
13 class EntryConsumer
implements ConsumerInterface
16 private $userRepository;
20 public function __construct(EntityManager
$em, UserRepository
$userRepository, AbstractImport
$import, LoggerInterface
$logger = null)
23 $this->userRepository
= $userRepository;
24 $this->import
= $import;
25 $this->logger
= $logger ?: new NullLogger();
31 public function execute(AMQPMessage
$msg)
33 $storedEntry = json_decode($msg->body
, true);
35 $user = $this->userRepository
->find($storedEntry['userId']);
37 // no user? Drop message
39 $this->logger
->warning('Unable to retrieve user', ['entry' => $storedEntry]);
44 $this->import
->setUser($user);
46 $entry = $this->import
->parseEntry($storedEntry);
48 if (null === $entry) {
49 $this->logger
->warning('Unable to parse entry', ['entry' => $storedEntry]);
56 $this->em
->clear($entry);
57 } catch (\Exception
$e) {
58 $this->logger
->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);