aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/Wallabag/ImportBundle/Consumer
diff options
context:
space:
mode:
Diffstat (limited to 'src/Wallabag/ImportBundle/Consumer')
-rw-r--r--src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php17
-rw-r--r--src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php74
-rw-r--r--src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php29
3 files changed, 120 insertions, 0 deletions
diff --git a/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php
new file mode 100644
index 00000000..5aaa8c03
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php
@@ -0,0 +1,17 @@
1<?php
2
3namespace Wallabag\ImportBundle\Consumer;
4
5use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6use PhpAmqpLib\Message\AMQPMessage;
7
8class AMQPEntryConsumer extends AbstractConsumer implements ConsumerInterface
9{
10 /**
11 * {@inheritdoc}
12 */
13 public function execute(AMQPMessage $msg)
14 {
15 return $this->handleMessage($msg->body);
16 }
17}
diff --git a/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php
new file mode 100644
index 00000000..2b85ad76
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php
@@ -0,0 +1,74 @@
1<?php
2
3namespace Wallabag\ImportBundle\Consumer;
4
5use Doctrine\ORM\EntityManager;
6use Wallabag\ImportBundle\Import\AbstractImport;
7use Wallabag\UserBundle\Repository\UserRepository;
8use Wallabag\CoreBundle\Entity\Entry;
9use Wallabag\CoreBundle\Entity\Tag;
10use Psr\Log\LoggerInterface;
11use Psr\Log\NullLogger;
12
13abstract class AbstractConsumer
14{
15 protected $em;
16 protected $userRepository;
17 protected $import;
18 protected $logger;
19
20 public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null)
21 {
22 $this->em = $em;
23 $this->userRepository = $userRepository;
24 $this->import = $import;
25 $this->logger = $logger ?: new NullLogger();
26 }
27
28 /**
29 * Handle a message and save it.
30 *
31 * @param string $body Message from the queue (in json)
32 *
33 * @return bool
34 */
35 protected function handleMessage($body)
36 {
37 $storedEntry = json_decode($body, true);
38
39 $user = $this->userRepository->find($storedEntry['userId']);
40
41 // no user? Drop message
42 if (null === $user) {
43 $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]);
44
45 return false;
46 }
47
48 $this->import->setUser($user);
49
50 $entry = $this->import->parseEntry($storedEntry);
51
52 if (null === $entry) {
53 $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]);
54
55 return false;
56 }
57
58 try {
59 $this->em->flush();
60
61 // clear only affected entities
62 $this->em->clear(Entry::class);
63 $this->em->clear(Tag::class);
64 } catch (\Exception $e) {
65 $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);
66
67 return false;
68 }
69
70 $this->logger->info('Content with url imported! ('.$entry->getUrl().')');
71
72 return true;
73 }
74}
diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php
new file mode 100644
index 00000000..450b71ff
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php
@@ -0,0 +1,29 @@
1<?php
2
3namespace Wallabag\ImportBundle\Consumer;
4
5use Simpleue\Job\Job;
6
7class RedisEntryConsumer extends AbstractConsumer implements Job
8{
9 /**
10 * Handle one message by one message.
11 *
12 * @param string $job Content of the message (directly from Redis)
13 *
14 * @return bool
15 */
16 public function manage($job)
17 {
18 return $this->handleMessage($job);
19 }
20
21 /**
22 * Should tell if the given job will kill the worker.
23 * We don't want to stop it :).
24 */
25 public function isStopJob($job)
26 {
27 return false;
28 }
29}