3 namespace Wallabag\ImportBundle\Consumer
;
5 use Doctrine\ORM\EntityManager
;
6 use Wallabag\ImportBundle\Import\AbstractImport
;
7 use Wallabag\UserBundle\Repository\UserRepository
;
8 use Wallabag\CoreBundle\Entity\Entry
;
9 use Wallabag\CoreBundle\Entity\Tag
;
10 use Psr\Log\LoggerInterface
;
11 use Psr\Log\NullLogger
;
12 use Symfony\Component\EventDispatcher\EventDispatcherInterface
;
13 use Wallabag\CoreBundle\Event\EntrySavedEvent
;
15 abstract class AbstractConsumer
18 protected $userRepository;
20 protected $eventDispatcher;
23 public function __construct(EntityManager
$em, UserRepository
$userRepository, AbstractImport
$import, EventDispatcherInterface
$eventDispatcher, LoggerInterface
$logger = null)
26 $this->userRepository
= $userRepository;
27 $this->import
= $import;
28 $this->eventDispatcher
= $eventDispatcher;
29 $this->logger
= $logger ?: new NullLogger();
33 * Handle a message and save it.
35 * @param string $body Message from the queue (in json)
39 protected function handleMessage($body)
41 $storedEntry = json_decode($body, true);
43 $user = $this->userRepository
->find($storedEntry['userId']);
45 // no user? Drop message
47 $this->logger
->warning('Unable to retrieve user', ['entry' => $storedEntry]);
49 // return true to skip message
53 $this->import
->setUser($user);
55 $entry = $this->import
->parseEntry($storedEntry);
57 if (null === $entry) {
58 $this->logger
->warning('Entry already exists', ['entry' => $storedEntry]);
60 // return true to skip message
67 // entry saved, dispatch event about it!
68 $this->eventDispatcher
->dispatch(EntrySavedEvent
::NAME
, new EntrySavedEvent($entry));
70 // clear only affected entities
71 $this->em
->clear(Entry
::class);
72 $this->em
->clear(Tag
::class);
73 } catch (\Exception
$e) {
74 $this->logger
->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);
79 $this->logger
->info('Content with url imported! ('.$entry->getUrl().')');