diff options
author | Jeremy Benoist <jeremy.benoist@gmail.com> | 2016-09-09 21:02:03 +0200 |
---|---|---|
committer | Jeremy Benoist <jeremy.benoist@gmail.com> | 2016-09-11 21:58:56 +0200 |
commit | b3437d58ae224121375c99e9288d8b808524e624 (patch) | |
tree | 94ce3446aed4396ba9304b8c97e421eba35e4edf /src/Wallabag/ImportBundle/Consumer | |
parent | 7f7531171f6e49110b5842f869e37c766a682473 (diff) | |
download | wallabag-b3437d58ae224121375c99e9288d8b808524e624.tar.gz wallabag-b3437d58ae224121375c99e9288d8b808524e624.tar.zst wallabag-b3437d58ae224121375c99e9288d8b808524e624.zip |
Enable Redis async import
- using javibravo/simpleue
- internal config value are now `import_with_redis` & `import_with_rabbit` which are more clear
- if both option are enable rabbit will be choosen
- services imports related to async are now splitted into 2 files: `redis.yml` & `rabbit.yml`
-
Diffstat (limited to 'src/Wallabag/ImportBundle/Consumer')
-rw-r--r-- | src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php (renamed from src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php) | 6 | ||||
-rw-r--r-- | src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php | 84 |
2 files changed, 88 insertions, 2 deletions
diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php index 72a3260a..39bb5375 100644 --- a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php | |||
@@ -1,6 +1,6 @@ | |||
1 | <?php | 1 | <?php |
2 | 2 | ||
3 | namespace Wallabag\ImportBundle\Consumer\AMPQ; | 3 | namespace Wallabag\ImportBundle\Consumer; |
4 | 4 | ||
5 | use Doctrine\ORM\EntityManager; | 5 | use Doctrine\ORM\EntityManager; |
6 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; | 6 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; |
@@ -12,7 +12,7 @@ use Wallabag\CoreBundle\Entity\Tag; | |||
12 | use Psr\Log\LoggerInterface; | 12 | use Psr\Log\LoggerInterface; |
13 | use Psr\Log\NullLogger; | 13 | use Psr\Log\NullLogger; |
14 | 14 | ||
15 | class EntryConsumer implements ConsumerInterface | 15 | class AMPQEntryConsumer implements ConsumerInterface |
16 | { | 16 | { |
17 | private $em; | 17 | private $em; |
18 | private $userRepository; | 18 | private $userRepository; |
@@ -64,5 +64,7 @@ class EntryConsumer implements ConsumerInterface | |||
64 | 64 | ||
65 | return; | 65 | return; |
66 | } | 66 | } |
67 | |||
68 | $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); | ||
67 | } | 69 | } |
68 | } | 70 | } |
diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php new file mode 100644 index 00000000..38665b01 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php | |||
@@ -0,0 +1,84 @@ | |||
1 | <?php | ||
2 | |||
3 | namespace Wallabag\ImportBundle\Consumer; | ||
4 | |||
5 | use Simpleue\Job\Job; | ||
6 | use Doctrine\ORM\EntityManager; | ||
7 | use Wallabag\ImportBundle\Import\AbstractImport; | ||
8 | use Wallabag\UserBundle\Repository\UserRepository; | ||
9 | use Wallabag\CoreBundle\Entity\Entry; | ||
10 | use Wallabag\CoreBundle\Entity\Tag; | ||
11 | use Psr\Log\LoggerInterface; | ||
12 | use Psr\Log\NullLogger; | ||
13 | |||
14 | class RedisEntryConsumer implements Job | ||
15 | { | ||
16 | private $em; | ||
17 | private $userRepository; | ||
18 | private $import; | ||
19 | private $logger; | ||
20 | |||
21 | public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) | ||
22 | { | ||
23 | $this->em = $em; | ||
24 | $this->userRepository = $userRepository; | ||
25 | $this->import = $import; | ||
26 | $this->logger = $logger ?: new NullLogger(); | ||
27 | } | ||
28 | |||
29 | /** | ||
30 | * Handle one message by one message. | ||
31 | * | ||
32 | * @param string $job Content of the message (directly from Redis) | ||
33 | * | ||
34 | * @return bool | ||
35 | */ | ||
36 | public function manage($job) | ||
37 | { | ||
38 | $storedEntry = json_decode($job, true); | ||
39 | |||
40 | $user = $this->userRepository->find($storedEntry['userId']); | ||
41 | |||
42 | // no user? Drop message | ||
43 | if (null === $user) { | ||
44 | $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); | ||
45 | |||
46 | return false; | ||
47 | } | ||
48 | |||
49 | $this->import->setUser($user); | ||
50 | |||
51 | $entry = $this->import->parseEntry($storedEntry); | ||
52 | |||
53 | if (null === $entry) { | ||
54 | $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); | ||
55 | |||
56 | return false; | ||
57 | } | ||
58 | |||
59 | try { | ||
60 | $this->em->flush(); | ||
61 | |||
62 | // clear only affected entities | ||
63 | $this->em->clear(Entry::class); | ||
64 | $this->em->clear(Tag::class); | ||
65 | } catch (\Exception $e) { | ||
66 | $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); | ||
67 | |||
68 | return false; | ||
69 | } | ||
70 | |||
71 | $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); | ||
72 | |||
73 | return true; | ||
74 | } | ||
75 | |||
76 | /** | ||
77 | * Should tell if the given job will kill the worker. | ||
78 | * We don't want to stop it :). | ||
79 | */ | ||
80 | public function isStopJob($job) | ||
81 | { | ||
82 | return false; | ||
83 | } | ||
84 | } | ||