aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php39
-rw-r--r--src/Wallabag/ImportBundle/Import/PocketImport.php35
-rw-r--r--src/Wallabag/ImportBundle/Resources/config/services.yml9
3 files changed, 75 insertions, 8 deletions
diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php
new file mode 100644
index 00000000..7775f01c
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php
@@ -0,0 +1,39 @@
1<?php
2
3namespace Wallabag\ImportBundle\Component\AMPQ;
4
5use Doctrine\ORM\EntityManager;
6use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
7use PhpAmqpLib\Message\AMQPMessage;
8use Wallabag\CoreBundle\Helper\ContentProxy;
9use Wallabag\CoreBundle\Repository\EntryRepository;
10
11class EntryConsumer implements ConsumerInterface
12{
13 private $em;
14 private $contentProxy;
15 private $entryRepository;
16
17 public function __construct(EntityManager $em, EntryRepository $entryRepository, ContentProxy $contentProxy)
18 {
19 $this->em = $em;
20 $this->entryRepository = $entryRepository;
21 $this->contentProxy = $contentProxy;
22 }
23
24 /**
25 * {@inheritdoc}
26 */
27 public function execute(AMQPMessage $msg)
28 {
29 $storedEntry = unserialize($msg->body);
30 $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']);
31 if ($entry) {
32 $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl());
33 if ($entry) {
34 $this->em->persist($entry);
35 $this->em->flush();
36 }
37 }
38 }
39}
diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php
index a6f905b1..b02894f0 100644
--- a/src/Wallabag/ImportBundle/Import/PocketImport.php
+++ b/src/Wallabag/ImportBundle/Import/PocketImport.php
@@ -2,6 +2,8 @@
2 2
3namespace Wallabag\ImportBundle\Import; 3namespace Wallabag\ImportBundle\Import;
4 4
5use OldSound\RabbitMqBundle\RabbitMq\Producer;
6use Psr\Log\LoggerInterface;
5use Psr\Log\NullLogger; 7use Psr\Log\NullLogger;
6use Doctrine\ORM\EntityManager; 8use Doctrine\ORM\EntityManager;
7use GuzzleHttp\Client; 9use GuzzleHttp\Client;
@@ -20,14 +22,18 @@ class PocketImport extends AbstractImport
20 private $importedEntries = 0; 22 private $importedEntries = 0;
21 private $markAsRead; 23 private $markAsRead;
22 protected $accessToken; 24 protected $accessToken;
25 private $producer;
26 private $rabbitMQ;
23 27
24 public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) 28 public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, $rabbitMQ, Producer $producer)
25 { 29 {
26 $this->user = $tokenStorage->getToken()->getUser(); 30 $this->user = $tokenStorage->getToken()->getUser();
27 $this->em = $em; 31 $this->em = $em;
28 $this->contentProxy = $contentProxy; 32 $this->contentProxy = $contentProxy;
29 $this->consumerKey = $craueConfig->get('pocket_consumer_key'); 33 $this->consumerKey = $craueConfig->get('pocket_consumer_key');
30 $this->logger = new NullLogger(); 34 $this->logger = new NullLogger();
35 $this->rabbitMQ = $rabbitMQ;
36 $this->producer = $producer;
31 } 37 }
32 38
33 /** 39 /**
@@ -197,7 +203,7 @@ class PocketImport extends AbstractImport
197 { 203 {
198 $i = 1; 204 $i = 1;
199 205
200 foreach ($entries as $pocketEntry) { 206 foreach ($entries as &$pocketEntry) {
201 $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; 207 $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url'];
202 208
203 $existingEntry = $this->em 209 $existingEntry = $this->em
@@ -210,12 +216,15 @@ class PocketImport extends AbstractImport
210 } 216 }
211 217
212 $entry = new Entry($this->user); 218 $entry = new Entry($this->user);
213 $entry = $this->fetchContent($entry, $url);
214 219
215 // jump to next entry in case of problem while getting content 220 if (!$this->rabbitMQ) {
216 if (false === $entry) { 221 $entry = $this->fetchContent($entry, $url);
217 ++$this->skippedEntries; 222
218 continue; 223 // jump to next entry in case of problem while getting content
224 if (false === $entry) {
225 ++$this->skippedEntries;
226 continue;
227 }
219 } 228 }
220 229
221 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted 230 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
@@ -236,6 +245,7 @@ class PocketImport extends AbstractImport
236 } 245 }
237 246
238 $entry->setTitle($title); 247 $entry->setTitle($title);
248 $entry->setUrl($url);
239 249
240 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image 250 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
241 if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { 251 if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
@@ -249,6 +259,9 @@ class PocketImport extends AbstractImport
249 ); 259 );
250 } 260 }
251 261
262 $pocketEntry['url'] = $url;
263 $pocketEntry['userId'] = $this->user->getId();
264
252 $this->em->persist($entry); 265 $this->em->persist($entry);
253 ++$this->importedEntries; 266 ++$this->importedEntries;
254 267
@@ -256,10 +269,16 @@ class PocketImport extends AbstractImport
256 if (($i % 20) === 0) { 269 if (($i % 20) === 0) {
257 $this->em->flush(); 270 $this->em->flush();
258 } 271 }
272
259 ++$i; 273 ++$i;
260 } 274 }
261 275
262 $this->em->flush(); 276 $this->em->flush();
263 $this->em->clear(); 277
278 if ($this->rabbitMQ) {
279 foreach ($entries as $entry) {
280 $this->producer->publish(serialize($entry));
281 }
282 }
264 } 283 }
265} 284}
diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml
index 520d43af..7ea54162 100644
--- a/src/Wallabag/ImportBundle/Resources/config/services.yml
+++ b/src/Wallabag/ImportBundle/Resources/config/services.yml
@@ -1,4 +1,11 @@
1services: 1services:
2 wallabag_import.consumer.entry:
3 class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer
4 arguments:
5 - "@doctrine.orm.entity_manager"
6 - "@wallabag_core.entry_repository"
7 - "@wallabag_core.content_proxy"
8
2 wallabag_import.chain: 9 wallabag_import.chain:
3 class: Wallabag\ImportBundle\Import\ImportChain 10 class: Wallabag\ImportBundle\Import\ImportChain
4 11
@@ -18,6 +25,8 @@ services:
18 - "@doctrine.orm.entity_manager" 25 - "@doctrine.orm.entity_manager"
19 - "@wallabag_core.content_proxy" 26 - "@wallabag_core.content_proxy"
20 - "@craue_config" 27 - "@craue_config"
28 - %rabbitmq%
29 - "@old_sound_rabbit_mq.wallabag_producer"
21 calls: 30 calls:
22 - [ setClient, [ "@wallabag_import.pocket.client" ] ] 31 - [ setClient, [ "@wallabag_import.pocket.client" ] ]
23 - [ setLogger, [ "@logger" ]] 32 - [ setLogger, [ "@logger" ]]