aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/Wallabag/ImportBundle
diff options
context:
space:
mode:
authorJeremy Benoist <jeremy.benoist@gmail.com>2016-09-03 17:36:57 +0200
committerJeremy Benoist <jeremy.benoist@gmail.com>2016-09-11 21:57:46 +0200
commitef75e1220ebb76a8df019d946460ad612759f0bb (patch)
tree100830464851d6834e1e24ccabca1961a862fcd8 /src/Wallabag/ImportBundle
parent87c9995b6c61a9f5cde3771bd4f9d44b5da26c43 (diff)
downloadwallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.gz
wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.zst
wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.zip
Send every imported item to the queue
Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format. Then, the worker retrieve that information, find / create the entry and save it.
Diffstat (limited to 'src/Wallabag/ImportBundle')
-rw-r--r--src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php39
-rw-r--r--src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php63
-rw-r--r--src/Wallabag/ImportBundle/Controller/PocketController.php25
-rw-r--r--src/Wallabag/ImportBundle/Import/PocketImport.php177
-rw-r--r--src/Wallabag/ImportBundle/Resources/config/services.yml11
5 files changed, 201 insertions, 114 deletions
diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php
deleted file mode 100644
index 7775f01c..00000000
--- a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php
+++ /dev/null
@@ -1,39 +0,0 @@
1<?php
2
3namespace Wallabag\ImportBundle\Component\AMPQ;
4
5use Doctrine\ORM\EntityManager;
6use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
7use PhpAmqpLib\Message\AMQPMessage;
8use Wallabag\CoreBundle\Helper\ContentProxy;
9use Wallabag\CoreBundle\Repository\EntryRepository;
10
11class EntryConsumer implements ConsumerInterface
12{
13 private $em;
14 private $contentProxy;
15 private $entryRepository;
16
17 public function __construct(EntityManager $em, EntryRepository $entryRepository, ContentProxy $contentProxy)
18 {
19 $this->em = $em;
20 $this->entryRepository = $entryRepository;
21 $this->contentProxy = $contentProxy;
22 }
23
24 /**
25 * {@inheritdoc}
26 */
27 public function execute(AMQPMessage $msg)
28 {
29 $storedEntry = unserialize($msg->body);
30 $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']);
31 if ($entry) {
32 $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl());
33 if ($entry) {
34 $this->em->persist($entry);
35 $this->em->flush();
36 }
37 }
38 }
39}
diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php
new file mode 100644
index 00000000..239e7446
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php
@@ -0,0 +1,63 @@
1<?php
2
3namespace Wallabag\ImportBundle\Consumer\AMPQ;
4
5use Doctrine\ORM\EntityManager;
6use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
7use PhpAmqpLib\Message\AMQPMessage;
8use Wallabag\ImportBundle\Import\PocketImport;
9use Wallabag\UserBundle\Repository\UserRepository;
10use Psr\Log\LoggerInterface;
11use Psr\Log\NullLogger;
12
13class PocketConsumer implements ConsumerInterface
14{
15 private $em;
16 private $userRepository;
17 private $pocketImport;
18 private $logger;
19
20 public function __construct(EntityManager $em, UserRepository $userRepository, PocketImport $pocketImport, LoggerInterface $logger = null)
21 {
22 $this->em = $em;
23 $this->userRepository = $userRepository;
24 $this->pocketImport = $pocketImport;
25 $this->logger = $logger ?: new NullLogger();
26 }
27
28 /**
29 * {@inheritdoc}
30 */
31 public function execute(AMQPMessage $msg)
32 {
33 $storedEntry = json_decode($msg->body, true);
34
35 $user = $this->userRepository->find($storedEntry['userId']);
36
37 // no user? Drop message
38 if (null === $user) {
39 $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]);
40
41 return;
42 }
43
44 $this->pocketImport->setUser($user);
45
46 $entry = $this->pocketImport->parseEntry($storedEntry);
47
48 if (null === $entry) {
49 $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]);
50
51 return;
52 }
53
54 try {
55 $this->em->flush();
56 $this->em->clear($entry);
57 } catch (\Exception $e) {
58 $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);
59
60 return;
61 }
62 }
63}
diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php
index 36ee25bf..a2dcd8a7 100644
--- a/src/Wallabag/ImportBundle/Controller/PocketController.php
+++ b/src/Wallabag/ImportBundle/Controller/PocketController.php
@@ -11,11 +11,28 @@ use Symfony\Component\Form\Extension\Core\Type\CheckboxType;
11class PocketController extends Controller 11class PocketController extends Controller
12{ 12{
13 /** 13 /**
14 * Return Pocket Import Service with or without RabbitMQ enabled.
15 *
16 * @return \Wallabag\ImportBundle\Import\PocketImport
17 */
18 private function getPocketImportService()
19 {
20 $pocket = $this->get('wallabag_import.pocket.import');
21 $pocket->setUser($this->getUser());
22
23 if ($this->get('craue_config')->get('rabbitmq')) {
24 $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.wallabag_pocket_producer'));
25 }
26
27 return $pocket;
28 }
29
30 /**
14 * @Route("/pocket", name="import_pocket") 31 * @Route("/pocket", name="import_pocket")
15 */ 32 */
16 public function indexAction() 33 public function indexAction()
17 { 34 {
18 $pocket = $this->get('wallabag_import.pocket.import'); 35 $pocket = $this->getPocketImportService();
19 $form = $this->createFormBuilder($pocket) 36 $form = $this->createFormBuilder($pocket)
20 ->add('mark_as_read', CheckboxType::class, [ 37 ->add('mark_as_read', CheckboxType::class, [
21 'label' => 'import.form.mark_as_read_label', 38 'label' => 'import.form.mark_as_read_label',
@@ -24,7 +41,7 @@ class PocketController extends Controller
24 ->getForm(); 41 ->getForm();
25 42
26 return $this->render('WallabagImportBundle:Pocket:index.html.twig', [ 43 return $this->render('WallabagImportBundle:Pocket:index.html.twig', [
27 'import' => $this->get('wallabag_import.pocket.import'), 44 'import' => $this->getPocketImportService(),
28 'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true, 45 'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true,
29 'form' => $form->createView(), 46 'form' => $form->createView(),
30 ]); 47 ]);
@@ -35,7 +52,7 @@ class PocketController extends Controller
35 */ 52 */
36 public function authAction(Request $request) 53 public function authAction(Request $request)
37 { 54 {
38 $requestToken = $this->get('wallabag_import.pocket.import') 55 $requestToken = $this->getPocketImportService()
39 ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL)); 56 ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL));
40 57
41 if (false === $requestToken) { 58 if (false === $requestToken) {
@@ -62,7 +79,7 @@ class PocketController extends Controller
62 public function callbackAction() 79 public function callbackAction()
63 { 80 {
64 $message = 'flashes.import.notice.failed'; 81 $message = 'flashes.import.notice.failed';
65 $pocket = $this->get('wallabag_import.pocket.import'); 82 $pocket = $this->getPocketImportService();
66 83
67 $markAsRead = $this->get('session')->get('mark_as_read'); 84 $markAsRead = $this->get('session')->get('mark_as_read');
68 $this->get('session')->remove('mark_as_read'); 85 $this->get('session')->remove('mark_as_read');
diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php
index 7d1c0c61..27df4917 100644
--- a/src/Wallabag/ImportBundle/Import/PocketImport.php
+++ b/src/Wallabag/ImportBundle/Import/PocketImport.php
@@ -3,12 +3,11 @@
3namespace Wallabag\ImportBundle\Import; 3namespace Wallabag\ImportBundle\Import;
4 4
5use OldSound\RabbitMqBundle\RabbitMq\Producer; 5use OldSound\RabbitMqBundle\RabbitMq\Producer;
6use Psr\Log\LoggerInterface;
7use Psr\Log\NullLogger; 6use Psr\Log\NullLogger;
8use Doctrine\ORM\EntityManager; 7use Doctrine\ORM\EntityManager;
9use GuzzleHttp\Client; 8use GuzzleHttp\Client;
10use GuzzleHttp\Exception\RequestException; 9use GuzzleHttp\Exception\RequestException;
11use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; 10use Symfony\Component\Security\Core\User\UserInterface;
12use Wallabag\CoreBundle\Entity\Entry; 11use Wallabag\CoreBundle\Entity\Entry;
13use Wallabag\CoreBundle\Helper\ContentProxy; 12use Wallabag\CoreBundle\Helper\ContentProxy;
14use Craue\ConfigBundle\Util\Config; 13use Craue\ConfigBundle\Util\Config;
@@ -21,22 +20,40 @@ class PocketImport extends AbstractImport
21 private $skippedEntries = 0; 20 private $skippedEntries = 0;
22 private $importedEntries = 0; 21 private $importedEntries = 0;
23 private $markAsRead; 22 private $markAsRead;
24 protected $accessToken;
25 private $producer; 23 private $producer;
26 private $rabbitMQ; 24 protected $accessToken;
27 25
28 public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, Producer $producer) 26 public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig)
29 { 27 {
30 $this->user = $tokenStorage->getToken()->getUser();
31 $this->em = $em; 28 $this->em = $em;
32 $this->contentProxy = $contentProxy; 29 $this->contentProxy = $contentProxy;
33 $this->consumerKey = $craueConfig->get('pocket_consumer_key'); 30 $this->consumerKey = $craueConfig->get('pocket_consumer_key');
34 $this->logger = new NullLogger(); 31 $this->logger = new NullLogger();
35 $this->rabbitMQ = $craueConfig->get('rabbitmq'); 32 }
33
34 /**
35 * Set RabbitMQ Producer to send each entry to a queue.
36 * This method should be called when user has enabled RabbitMQ.
37 *
38 * @param Producer $producer
39 */
40 public function setRabbitmqProducer(Producer $producer)
41 {
36 $this->producer = $producer; 42 $this->producer = $producer;
37 } 43 }
38 44
39 /** 45 /**
46 * Set current user.
47 * Could the current *connected* user or one retrieve by the consumer.
48 *
49 * @param UserInterface $user
50 */
51 public function setUser(UserInterface $user)
52 {
53 $this->user = $user;
54 }
55
56 /**
40 * {@inheritdoc} 57 * {@inheritdoc}
41 */ 58 */
42 public function getName() 59 public function getName()
@@ -168,6 +185,12 @@ class PocketImport extends AbstractImport
168 185
169 $entries = $response->json(); 186 $entries = $response->json();
170 187
188 if ($this->producer) {
189 $this->parseEntriesForProducer($entries['list']);
190
191 return true;
192 }
193
171 $this->parseEntries($entries['list']); 194 $this->parseEntries($entries['list']);
172 195
173 return true; 196 return true;
@@ -197,88 +220,112 @@ class PocketImport extends AbstractImport
197 /** 220 /**
198 * @see https://getpocket.com/developer/docs/v3/retrieve 221 * @see https://getpocket.com/developer/docs/v3/retrieve
199 * 222 *
200 * @param $entries 223 * @param array $entries
201 */ 224 */
202 private function parseEntries($entries) 225 private function parseEntries(array $entries)
203 { 226 {
204 $i = 1; 227 $i = 1;
205 228
206 foreach ($entries as &$pocketEntry) { 229 foreach ($entries as $pocketEntry) {
207 $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; 230 $entry = $this->parseEntry($pocketEntry);
208
209 $existingEntry = $this->em
210 ->getRepository('WallabagCoreBundle:Entry')
211 ->findByUrlAndUserId($url, $this->user->getId());
212 231
213 if (false !== $existingEntry) { 232 if (null === $entry) {
214 ++$this->skippedEntries;
215 continue; 233 continue;
216 } 234 }
217 235
218 $entry = new Entry($this->user); 236 // flush every 20 entries
237 if (($i % 20) === 0) {
238 $this->em->flush();
239 $this->em->clear($entry);
240 }
219 241
220 if (!$this->rabbitMQ) { 242 ++$i;
221 $entry = $this->fetchContent($entry, $url); 243 }
222 244
223 // jump to next entry in case of problem while getting content 245 $this->em->flush();
224 if (false === $entry) { 246 }
225 ++$this->skippedEntries;
226 continue;
227 }
228 }
229 247
230 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted 248 public function parseEntry(array $pocketEntry)
231 if ($pocketEntry['status'] == 1 || $this->markAsRead) { 249 {
232 $entry->setArchived(true); 250 $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url'];
233 }
234 251
235 // 0 or 1 - 1 If the item is starred 252 $existingEntry = $this->em
236 if ($pocketEntry['favorite'] == 1) { 253 ->getRepository('WallabagCoreBundle:Entry')
237 $entry->setStarred(true); 254 ->findByUrlAndUserId($url, $this->user->getId());
238 }
239 255
240 $title = 'Untitled'; 256 if (false !== $existingEntry) {
241 if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { 257 ++$this->skippedEntries;
242 $title = $pocketEntry['resolved_title'];
243 } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
244 $title = $pocketEntry['given_title'];
245 }
246 258
247 $entry->setTitle($title); 259 return;
248 $entry->setUrl($url); 260 }
249 261
250 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image 262 $entry = new Entry($this->user);
251 if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { 263 $entry = $this->fetchContent($entry, $url);
252 $entry->setPreviewPicture($pocketEntry['images'][1]['src']);
253 }
254 264
255 if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { 265 // jump to next entry in case of problem while getting content
256 $this->contentProxy->assignTagsToEntry( 266 if (false === $entry) {
257 $entry, 267 ++$this->skippedEntries;
258 array_keys($pocketEntry['tags'])
259 );
260 }
261 268
262 $pocketEntry['url'] = $url; 269 return;
263 $pocketEntry['userId'] = $this->user->getId(); 270 }
264 271
265 $this->em->persist($entry); 272 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
266 ++$this->importedEntries; 273 if ($pocketEntry['status'] == 1 || $this->markAsRead) {
274 $entry->setArchived(true);
275 }
267 276
268 // flush every 20 entries 277 // 0 or 1 - 1 If the item is starred
269 if (($i % 20) === 0) { 278 if ($pocketEntry['favorite'] == 1) {
270 $this->em->flush(); 279 $entry->setStarred(true);
271 } 280 }
272 281
273 ++$i; 282 $title = 'Untitled';
283 if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') {
284 $title = $pocketEntry['resolved_title'];
285 } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
286 $title = $pocketEntry['given_title'];
274 } 287 }
275 288
276 $this->em->flush(); 289 $entry->setTitle($title);
290 $entry->setUrl($url);
291
292 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
293 if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
294 $entry->setPreviewPicture($pocketEntry['images'][1]['src']);
295 }
296
297 if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) {
298 $this->contentProxy->assignTagsToEntry(
299 $entry,
300 array_keys($pocketEntry['tags'])
301 );
302 }
303
304 $this->em->persist($entry);
305 ++$this->importedEntries;
277 306
278 if ($this->rabbitMQ) { 307 return $entry;
279 foreach ($entries as $entry) { 308 }
280 $this->producer->publish(serialize($entry)); 309
310 /**
311 * Faster parse entries for Producer.
312 * We don't care to make check at this time. They'll be done by the consumer.
313 *
314 * @param array $entries
315 */
316 public function parseEntriesForProducer($entries)
317 {
318 foreach ($entries as $pocketEntry) {
319 // set userId for the producer (it won't know which user is connected)
320 $pocketEntry['userId'] = $this->user->getId();
321
322 if ($this->markAsRead) {
323 $pocketEntry['status'] = 1;
281 } 324 }
325
326 ++$this->importedEntries;
327
328 $this->producer->publish(json_encode($pocketEntry));
282 } 329 }
283 } 330 }
284} 331}
diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml
index 60eb4e18..fe388b26 100644
--- a/src/Wallabag/ImportBundle/Resources/config/services.yml
+++ b/src/Wallabag/ImportBundle/Resources/config/services.yml
@@ -1,10 +1,11 @@
1services: 1services:
2 wallabag_import.consumer.entry: 2 wallabag_import.consumer.pocket:
3 class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer 3 class: Wallabag\ImportBundle\Consumer\AMPQ\PocketConsumer
4 arguments: 4 arguments:
5 - "@doctrine.orm.entity_manager" 5 - "@doctrine.orm.entity_manager"
6 - "@wallabag_core.entry_repository" 6 - "@wallabag_user.user_repository"
7 - "@wallabag_core.content_proxy" 7 - "@wallabag_import.pocket.import"
8 - "@logger"
8 9
9 wallabag_import.chain: 10 wallabag_import.chain:
10 class: Wallabag\ImportBundle\Import\ImportChain 11 class: Wallabag\ImportBundle\Import\ImportChain
@@ -21,11 +22,9 @@ services:
21 wallabag_import.pocket.import: 22 wallabag_import.pocket.import:
22 class: Wallabag\ImportBundle\Import\PocketImport 23 class: Wallabag\ImportBundle\Import\PocketImport
23 arguments: 24 arguments:
24 - "@security.token_storage"
25 - "@doctrine.orm.entity_manager" 25 - "@doctrine.orm.entity_manager"
26 - "@wallabag_core.content_proxy" 26 - "@wallabag_core.content_proxy"
27 - "@craue_config" 27 - "@craue_config"
28 - "@old_sound_rabbit_mq.wallabag_producer"
29 calls: 28 calls:
30 - [ setClient, [ "@wallabag_import.pocket.client" ] ] 29 - [ setClient, [ "@wallabag_import.pocket.client" ] ]
31 - [ setLogger, [ "@logger" ]] 30 - [ setLogger, [ "@logger" ]]