diff options
author | Jeremy Benoist <jeremy.benoist@gmail.com> | 2016-09-11 20:23:17 +0200 |
---|---|---|
committer | Jeremy Benoist <jeremy.benoist@gmail.com> | 2016-09-11 22:15:31 +0200 |
commit | 7d862f83b95d24b4f081d73ca7b0bdf4435ae008 (patch) | |
tree | 334383a1fdbd651247116cd54cb9d85e6300fc19 /src/Wallabag/ImportBundle/Consumer | |
parent | dc69e25f97c357fdfdff5225f4f65cc55a6770b0 (diff) | |
download | wallabag-7d862f83b95d24b4f081d73ca7b0bdf4435ae008.tar.gz wallabag-7d862f83b95d24b4f081d73ca7b0bdf4435ae008.tar.zst wallabag-7d862f83b95d24b4f081d73ca7b0bdf4435ae008.zip |
Re-facto EntryConsumer
Using an abstract method allow to share code but also can be used it we add a new broker in the future
Diffstat (limited to 'src/Wallabag/ImportBundle/Consumer')
3 files changed, 78 insertions, 112 deletions
diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php index 39bb5375..d95a011d 100644 --- a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php | |||
@@ -2,69 +2,16 @@ | |||
2 | 2 | ||
3 | namespace Wallabag\ImportBundle\Consumer; | 3 | namespace Wallabag\ImportBundle\Consumer; |
4 | 4 | ||
5 | use Doctrine\ORM\EntityManager; | ||
6 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; | 5 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; |
7 | use PhpAmqpLib\Message\AMQPMessage; | 6 | use PhpAmqpLib\Message\AMQPMessage; |
8 | use Wallabag\ImportBundle\Import\AbstractImport; | ||
9 | use Wallabag\UserBundle\Repository\UserRepository; | ||
10 | use Wallabag\CoreBundle\Entity\Entry; | ||
11 | use Wallabag\CoreBundle\Entity\Tag; | ||
12 | use Psr\Log\LoggerInterface; | ||
13 | use Psr\Log\NullLogger; | ||
14 | 7 | ||
15 | class AMPQEntryConsumer implements ConsumerInterface | 8 | class AMPQEntryConsumer extends AbstractConsumer implements ConsumerInterface |
16 | { | 9 | { |
17 | private $em; | ||
18 | private $userRepository; | ||
19 | private $import; | ||
20 | private $logger; | ||
21 | |||
22 | public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) | ||
23 | { | ||
24 | $this->em = $em; | ||
25 | $this->userRepository = $userRepository; | ||
26 | $this->import = $import; | ||
27 | $this->logger = $logger ?: new NullLogger(); | ||
28 | } | ||
29 | |||
30 | /** | 10 | /** |
31 | * {@inheritdoc} | 11 | * {@inheritdoc} |
32 | */ | 12 | */ |
33 | public function execute(AMQPMessage $msg) | 13 | public function execute(AMQPMessage $msg) |
34 | { | 14 | { |
35 | $storedEntry = json_decode($msg->body, true); | 15 | return $this->handleMessage($msg->body); |
36 | |||
37 | $user = $this->userRepository->find($storedEntry['userId']); | ||
38 | |||
39 | // no user? Drop message | ||
40 | if (null === $user) { | ||
41 | $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); | ||
42 | |||
43 | return; | ||
44 | } | ||
45 | |||
46 | $this->import->setUser($user); | ||
47 | |||
48 | $entry = $this->import->parseEntry($storedEntry); | ||
49 | |||
50 | if (null === $entry) { | ||
51 | $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); | ||
52 | |||
53 | return; | ||
54 | } | ||
55 | |||
56 | try { | ||
57 | $this->em->flush(); | ||
58 | |||
59 | // clear only affected entities | ||
60 | $this->em->clear(Entry::class); | ||
61 | $this->em->clear(Tag::class); | ||
62 | } catch (\Exception $e) { | ||
63 | $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); | ||
64 | |||
65 | return; | ||
66 | } | ||
67 | |||
68 | $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); | ||
69 | } | 16 | } |
70 | } | 17 | } |
diff --git a/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php new file mode 100644 index 00000000..2b85ad76 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php | |||
@@ -0,0 +1,74 @@ | |||
1 | <?php | ||
2 | |||
3 | namespace Wallabag\ImportBundle\Consumer; | ||
4 | |||
5 | use Doctrine\ORM\EntityManager; | ||
6 | use Wallabag\ImportBundle\Import\AbstractImport; | ||
7 | use Wallabag\UserBundle\Repository\UserRepository; | ||
8 | use Wallabag\CoreBundle\Entity\Entry; | ||
9 | use Wallabag\CoreBundle\Entity\Tag; | ||
10 | use Psr\Log\LoggerInterface; | ||
11 | use Psr\Log\NullLogger; | ||
12 | |||
13 | abstract class AbstractConsumer | ||
14 | { | ||
15 | protected $em; | ||
16 | protected $userRepository; | ||
17 | protected $import; | ||
18 | protected $logger; | ||
19 | |||
20 | public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) | ||
21 | { | ||
22 | $this->em = $em; | ||
23 | $this->userRepository = $userRepository; | ||
24 | $this->import = $import; | ||
25 | $this->logger = $logger ?: new NullLogger(); | ||
26 | } | ||
27 | |||
28 | /** | ||
29 | * Handle a message and save it. | ||
30 | * | ||
31 | * @param string $body Message from the queue (in json) | ||
32 | * | ||
33 | * @return bool | ||
34 | */ | ||
35 | protected function handleMessage($body) | ||
36 | { | ||
37 | $storedEntry = json_decode($body, true); | ||
38 | |||
39 | $user = $this->userRepository->find($storedEntry['userId']); | ||
40 | |||
41 | // no user? Drop message | ||
42 | if (null === $user) { | ||
43 | $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); | ||
44 | |||
45 | return false; | ||
46 | } | ||
47 | |||
48 | $this->import->setUser($user); | ||
49 | |||
50 | $entry = $this->import->parseEntry($storedEntry); | ||
51 | |||
52 | if (null === $entry) { | ||
53 | $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); | ||
54 | |||
55 | return false; | ||
56 | } | ||
57 | |||
58 | try { | ||
59 | $this->em->flush(); | ||
60 | |||
61 | // clear only affected entities | ||
62 | $this->em->clear(Entry::class); | ||
63 | $this->em->clear(Tag::class); | ||
64 | } catch (\Exception $e) { | ||
65 | $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); | ||
66 | |||
67 | return false; | ||
68 | } | ||
69 | |||
70 | $this->logger->info('Content with url imported! ('.$entry->getUrl().')'); | ||
71 | |||
72 | return true; | ||
73 | } | ||
74 | } | ||
diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php index 38665b01..450b71ff 100644 --- a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php | |||
@@ -3,29 +3,9 @@ | |||
3 | namespace Wallabag\ImportBundle\Consumer; | 3 | namespace Wallabag\ImportBundle\Consumer; |
4 | 4 | ||
5 | use Simpleue\Job\Job; | 5 | use Simpleue\Job\Job; |
6 | use Doctrine\ORM\EntityManager; | ||
7 | use Wallabag\ImportBundle\Import\AbstractImport; | ||
8 | use Wallabag\UserBundle\Repository\UserRepository; | ||
9 | use Wallabag\CoreBundle\Entity\Entry; | ||
10 | use Wallabag\CoreBundle\Entity\Tag; | ||
11 | use Psr\Log\LoggerInterface; | ||
12 | use Psr\Log\NullLogger; | ||
13 | 6 | ||
14 | class RedisEntryConsumer implements Job | 7 | class RedisEntryConsumer extends AbstractConsumer implements Job |
15 | { | 8 | { |
16 | private $em; | ||
17 | private $userRepository; | ||
18 | private $import; | ||
19 | private $logger; | ||
20 | |||
21 | public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) | ||
22 | { | ||
23 | $this->em = $em; | ||
24 | $this->userRepository = $userRepository; | ||
25 | $this->import = $import; | ||
26 | $this->logger = $logger ?: new NullLogger(); | ||
27 | } | ||
28 | |||
29 | /** | 9 | /** |
30 | * Handle one message by one message. | 10 | * Handle one message by one message. |
31 | * | 11 | * |
@@ -35,42 +15,7 @@ class RedisEntryConsumer implements Job | |||
35 | */ | 15 | */ |
36 | public function manage($job) | 16 | public function manage($job) |
37 | { | 17 | { |
38 | $storedEntry = json_decode($job, true); | 18 | return $this->handleMessage($job); |
39 | |||
40 | $user = $this->userRepository->find($storedEntry['userId']); | ||
41 | |||
42 | // no user? Drop message | ||
43 | if (null === $user) { | ||
44 | $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); | ||
45 | |||
46 | return false; | ||
47 | } | ||
48 | |||
49 | $this->import->setUser($user); | ||
50 | |||
51 | $entry = $this->import->parseEntry($storedEntry); | ||
52 | |||
53 | if (null === $entry) { | ||
54 | $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); | ||
55 | |||
56 | return false; | ||
57 | } | ||
58 | |||
59 | try { | ||
60 | $this->em->flush(); | ||
61 | |||
62 | // clear only affected entities | ||
63 | $this->em->clear(Entry::class); | ||
64 | $this->em->clear(Tag::class); | ||
65 | } catch (\Exception $e) { | ||
66 | $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); | ||
67 | |||
68 | return false; | ||
69 | } | ||
70 | |||
71 | $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); | ||
72 | |||
73 | return true; | ||
74 | } | 19 | } |
75 | 20 | ||
76 | /** | 21 | /** |