From 56c778b4152a1b886353933276ee3626e4e8c004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20L=C5=93uillet?= Date: Fri, 15 Jan 2016 08:24:32 +0100 Subject: 1st draft for rabbitMQ --- .../ImportBundle/Component/AMPQ/EntryConsumer.php | 39 ++++++++++++++++++++++ src/Wallabag/ImportBundle/Import/PocketImport.php | 35 ++++++++++++++----- .../ImportBundle/Resources/config/services.yml | 9 +++++ 3 files changed, 75 insertions(+), 8 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php new file mode 100644 index 00000000..7775f01c --- /dev/null +++ b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php @@ -0,0 +1,39 @@ +em = $em; + $this->entryRepository = $entryRepository; + $this->contentProxy = $contentProxy; + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = unserialize($msg->body); + $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']); + if ($entry) { + $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl()); + if ($entry) { + $this->em->persist($entry); + $this->em->flush(); + } + } + } +} diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index a6f905b1..b02894f0 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -2,6 +2,8 @@ namespace Wallabag\ImportBundle\Import; +use OldSound\RabbitMqBundle\RabbitMq\Producer; +use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use GuzzleHttp\Client; @@ -20,14 +22,18 @@ class PocketImport extends AbstractImport private $importedEntries = 0; private $markAsRead; protected $accessToken; + private $producer; + private $rabbitMQ; - public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) + public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, $rabbitMQ, Producer $producer) { $this->user = $tokenStorage->getToken()->getUser(); $this->em = $em; $this->contentProxy = $contentProxy; $this->consumerKey = $craueConfig->get('pocket_consumer_key'); $this->logger = new NullLogger(); + $this->rabbitMQ = $rabbitMQ; + $this->producer = $producer; } /** @@ -197,7 +203,7 @@ class PocketImport extends AbstractImport { $i = 1; - foreach ($entries as $pocketEntry) { + foreach ($entries as &$pocketEntry) { $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; $existingEntry = $this->em @@ -210,12 +216,15 @@ class PocketImport extends AbstractImport } $entry = new Entry($this->user); - $entry = $this->fetchContent($entry, $url); - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; - continue; + if (!$this->rabbitMQ) { + $entry = $this->fetchContent($entry, $url); + + // jump to next entry in case of problem while getting content + if (false === $entry) { + ++$this->skippedEntries; + continue; + } } // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted @@ -236,6 +245,7 @@ class PocketImport extends AbstractImport } $entry->setTitle($title); + $entry->setUrl($url); // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { @@ -249,6 +259,9 @@ class PocketImport extends AbstractImport ); } + $pocketEntry['url'] = $url; + $pocketEntry['userId'] = $this->user->getId(); + $this->em->persist($entry); ++$this->importedEntries; @@ -256,10 +269,16 @@ class PocketImport extends AbstractImport if (($i % 20) === 0) { $this->em->flush(); } + ++$i; } $this->em->flush(); - $this->em->clear(); + + if ($this->rabbitMQ) { + foreach ($entries as $entry) { + $this->producer->publish(serialize($entry)); + } + } } } diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index 520d43af..7ea54162 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -1,4 +1,11 @@ services: + wallabag_import.consumer.entry: + class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_core.entry_repository" + - "@wallabag_core.content_proxy" + wallabag_import.chain: class: Wallabag\ImportBundle\Import\ImportChain @@ -18,6 +25,8 @@ services: - "@doctrine.orm.entity_manager" - "@wallabag_core.content_proxy" - "@craue_config" + - %rabbitmq% + - "@old_sound_rabbit_mq.wallabag_producer" calls: - [ setClient, [ "@wallabag_import.pocket.client" ] ] - [ setLogger, [ "@logger" ]] -- cgit v1.2.3 From 40d2a29443df8ef6fdf1f2d09b5ba8808543c245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20L=C5=93uillet?= Date: Mon, 15 Feb 2016 21:30:55 +0100 Subject: Replace RabbitMQ injection with CraueConfiguration --- src/Wallabag/ImportBundle/Import/PocketImport.php | 4 ++-- src/Wallabag/ImportBundle/Resources/config/services.yml | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index b02894f0..7d1c0c61 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -25,14 +25,14 @@ class PocketImport extends AbstractImport private $producer; private $rabbitMQ; - public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, $rabbitMQ, Producer $producer) + public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, Producer $producer) { $this->user = $tokenStorage->getToken()->getUser(); $this->em = $em; $this->contentProxy = $contentProxy; $this->consumerKey = $craueConfig->get('pocket_consumer_key'); $this->logger = new NullLogger(); - $this->rabbitMQ = $rabbitMQ; + $this->rabbitMQ = $craueConfig->get('rabbitmq'); $this->producer = $producer; } diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index 7ea54162..60eb4e18 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -25,7 +25,6 @@ services: - "@doctrine.orm.entity_manager" - "@wallabag_core.content_proxy" - "@craue_config" - - %rabbitmq% - "@old_sound_rabbit_mq.wallabag_producer" calls: - [ setClient, [ "@wallabag_import.pocket.client" ] ] -- cgit v1.2.3 From ef75e1220ebb76a8df019d946460ad612759f0bb Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sat, 3 Sep 2016 17:36:57 +0200 Subject: Send every imported item to the queue Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format. Then, the worker retrieve that information, find / create the entry and save it. --- .../ImportBundle/Component/AMPQ/EntryConsumer.php | 39 ----- .../ImportBundle/Consumer/AMPQ/PocketConsumer.php | 63 ++++++++ .../ImportBundle/Controller/PocketController.php | 25 ++- src/Wallabag/ImportBundle/Import/PocketImport.php | 177 +++++++++++++-------- .../ImportBundle/Resources/config/services.yml | 11 +- 5 files changed, 201 insertions(+), 114 deletions(-) delete mode 100644 src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php deleted file mode 100644 index 7775f01c..00000000 --- a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php +++ /dev/null @@ -1,39 +0,0 @@ -em = $em; - $this->entryRepository = $entryRepository; - $this->contentProxy = $contentProxy; - } - - /** - * {@inheritdoc} - */ - public function execute(AMQPMessage $msg) - { - $storedEntry = unserialize($msg->body); - $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']); - if ($entry) { - $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl()); - if ($entry) { - $this->em->persist($entry); - $this->em->flush(); - } - } - } -} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php new file mode 100644 index 00000000..239e7446 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php @@ -0,0 +1,63 @@ +em = $em; + $this->userRepository = $userRepository; + $this->pocketImport = $pocketImport; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->pocketImport->setUser($user); + + $entry = $this->pocketImport->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + $this->em->clear($entry); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + } +} diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php index 36ee25bf..a2dcd8a7 100644 --- a/src/Wallabag/ImportBundle/Controller/PocketController.php +++ b/src/Wallabag/ImportBundle/Controller/PocketController.php @@ -10,12 +10,29 @@ use Symfony\Component\Form\Extension\Core\Type\CheckboxType; class PocketController extends Controller { + /** + * Return Pocket Import Service with or without RabbitMQ enabled. + * + * @return \Wallabag\ImportBundle\Import\PocketImport + */ + private function getPocketImportService() + { + $pocket = $this->get('wallabag_import.pocket.import'); + $pocket->setUser($this->getUser()); + + if ($this->get('craue_config')->get('rabbitmq')) { + $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.wallabag_pocket_producer')); + } + + return $pocket; + } + /** * @Route("/pocket", name="import_pocket") */ public function indexAction() { - $pocket = $this->get('wallabag_import.pocket.import'); + $pocket = $this->getPocketImportService(); $form = $this->createFormBuilder($pocket) ->add('mark_as_read', CheckboxType::class, [ 'label' => 'import.form.mark_as_read_label', @@ -24,7 +41,7 @@ class PocketController extends Controller ->getForm(); return $this->render('WallabagImportBundle:Pocket:index.html.twig', [ - 'import' => $this->get('wallabag_import.pocket.import'), + 'import' => $this->getPocketImportService(), 'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true, 'form' => $form->createView(), ]); @@ -35,7 +52,7 @@ class PocketController extends Controller */ public function authAction(Request $request) { - $requestToken = $this->get('wallabag_import.pocket.import') + $requestToken = $this->getPocketImportService() ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL)); if (false === $requestToken) { @@ -62,7 +79,7 @@ class PocketController extends Controller public function callbackAction() { $message = 'flashes.import.notice.failed'; - $pocket = $this->get('wallabag_import.pocket.import'); + $pocket = $this->getPocketImportService(); $markAsRead = $this->get('session')->get('mark_as_read'); $this->get('session')->remove('mark_as_read'); diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 7d1c0c61..27df4917 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -3,12 +3,11 @@ namespace Wallabag\ImportBundle\Import; use OldSound\RabbitMqBundle\RabbitMq\Producer; -use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use GuzzleHttp\Client; use GuzzleHttp\Exception\RequestException; -use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; +use Symfony\Component\Security\Core\User\UserInterface; use Wallabag\CoreBundle\Entity\Entry; use Wallabag\CoreBundle\Helper\ContentProxy; use Craue\ConfigBundle\Util\Config; @@ -21,21 +20,39 @@ class PocketImport extends AbstractImport private $skippedEntries = 0; private $importedEntries = 0; private $markAsRead; - protected $accessToken; private $producer; - private $rabbitMQ; + protected $accessToken; - public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, Producer $producer) + public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) { - $this->user = $tokenStorage->getToken()->getUser(); $this->em = $em; $this->contentProxy = $contentProxy; $this->consumerKey = $craueConfig->get('pocket_consumer_key'); $this->logger = new NullLogger(); - $this->rabbitMQ = $craueConfig->get('rabbitmq'); + } + + /** + * Set RabbitMQ Producer to send each entry to a queue. + * This method should be called when user has enabled RabbitMQ. + * + * @param Producer $producer + */ + public function setRabbitmqProducer(Producer $producer) + { $this->producer = $producer; } + /** + * Set current user. + * Could the current *connected* user or one retrieve by the consumer. + * + * @param UserInterface $user + */ + public function setUser(UserInterface $user) + { + $this->user = $user; + } + /** * {@inheritdoc} */ @@ -168,6 +185,12 @@ class PocketImport extends AbstractImport $entries = $response->json(); + if ($this->producer) { + $this->parseEntriesForProducer($entries['list']); + + return true; + } + $this->parseEntries($entries['list']); return true; @@ -197,88 +220,112 @@ class PocketImport extends AbstractImport /** * @see https://getpocket.com/developer/docs/v3/retrieve * - * @param $entries + * @param array $entries */ - private function parseEntries($entries) + private function parseEntries(array $entries) { $i = 1; - foreach ($entries as &$pocketEntry) { - $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; - - $existingEntry = $this->em - ->getRepository('WallabagCoreBundle:Entry') - ->findByUrlAndUserId($url, $this->user->getId()); + foreach ($entries as $pocketEntry) { + $entry = $this->parseEntry($pocketEntry); - if (false !== $existingEntry) { - ++$this->skippedEntries; + if (null === $entry) { continue; } - $entry = new Entry($this->user); + // flush every 20 entries + if (($i % 20) === 0) { + $this->em->flush(); + $this->em->clear($entry); + } - if (!$this->rabbitMQ) { - $entry = $this->fetchContent($entry, $url); + ++$i; + } - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; - continue; - } - } + $this->em->flush(); + } - // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted - if ($pocketEntry['status'] == 1 || $this->markAsRead) { - $entry->setArchived(true); - } + public function parseEntry(array $pocketEntry) + { + $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; - // 0 or 1 - 1 If the item is starred - if ($pocketEntry['favorite'] == 1) { - $entry->setStarred(true); - } + $existingEntry = $this->em + ->getRepository('WallabagCoreBundle:Entry') + ->findByUrlAndUserId($url, $this->user->getId()); - $title = 'Untitled'; - if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { - $title = $pocketEntry['resolved_title']; - } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') { - $title = $pocketEntry['given_title']; - } + if (false !== $existingEntry) { + ++$this->skippedEntries; - $entry->setTitle($title); - $entry->setUrl($url); + return; + } - // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image - if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { - $entry->setPreviewPicture($pocketEntry['images'][1]['src']); - } + $entry = new Entry($this->user); + $entry = $this->fetchContent($entry, $url); - if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { - $this->contentProxy->assignTagsToEntry( - $entry, - array_keys($pocketEntry['tags']) - ); - } + // jump to next entry in case of problem while getting content + if (false === $entry) { + ++$this->skippedEntries; - $pocketEntry['url'] = $url; - $pocketEntry['userId'] = $this->user->getId(); + return; + } - $this->em->persist($entry); - ++$this->importedEntries; + // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted + if ($pocketEntry['status'] == 1 || $this->markAsRead) { + $entry->setArchived(true); + } - // flush every 20 entries - if (($i % 20) === 0) { - $this->em->flush(); - } + // 0 or 1 - 1 If the item is starred + if ($pocketEntry['favorite'] == 1) { + $entry->setStarred(true); + } - ++$i; + $title = 'Untitled'; + if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { + $title = $pocketEntry['resolved_title']; + } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') { + $title = $pocketEntry['given_title']; } - $this->em->flush(); + $entry->setTitle($title); + $entry->setUrl($url); + + // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image + if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { + $entry->setPreviewPicture($pocketEntry['images'][1]['src']); + } + + if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { + $this->contentProxy->assignTagsToEntry( + $entry, + array_keys($pocketEntry['tags']) + ); + } + + $this->em->persist($entry); + ++$this->importedEntries; - if ($this->rabbitMQ) { - foreach ($entries as $entry) { - $this->producer->publish(serialize($entry)); + return $entry; + } + + /** + * Faster parse entries for Producer. + * We don't care to make check at this time. They'll be done by the consumer. + * + * @param array $entries + */ + public function parseEntriesForProducer($entries) + { + foreach ($entries as $pocketEntry) { + // set userId for the producer (it won't know which user is connected) + $pocketEntry['userId'] = $this->user->getId(); + + if ($this->markAsRead) { + $pocketEntry['status'] = 1; } + + ++$this->importedEntries; + + $this->producer->publish(json_encode($pocketEntry)); } } } diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index 60eb4e18..fe388b26 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -1,10 +1,11 @@ services: - wallabag_import.consumer.entry: - class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer + wallabag_import.consumer.pocket: + class: Wallabag\ImportBundle\Consumer\AMPQ\PocketConsumer arguments: - "@doctrine.orm.entity_manager" - - "@wallabag_core.entry_repository" - - "@wallabag_core.content_proxy" + - "@wallabag_user.user_repository" + - "@wallabag_import.pocket.import" + - "@logger" wallabag_import.chain: class: Wallabag\ImportBundle\Import\ImportChain @@ -21,11 +22,9 @@ services: wallabag_import.pocket.import: class: Wallabag\ImportBundle\Import\PocketImport arguments: - - "@security.token_storage" - "@doctrine.orm.entity_manager" - "@wallabag_core.content_proxy" - "@craue_config" - - "@old_sound_rabbit_mq.wallabag_producer" calls: - [ setClient, [ "@wallabag_import.pocket.client" ] ] - [ setLogger, [ "@logger" ]] -- cgit v1.2.3 From c98db1b653b5dc8b701422190b02d9fbf10c4e68 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 4 Sep 2016 21:49:21 +0200 Subject: Convert other imports to Rabbit --- .../ImportBundle/Consumer/AMPQ/EntryConsumer.php | 63 ++++++++++ .../ImportBundle/Consumer/AMPQ/PocketConsumer.php | 63 ---------- .../ImportBundle/Controller/PocketController.php | 2 +- .../Controller/ReadabilityController.php | 6 +- .../ImportBundle/Controller/WallabagController.php | 2 +- .../Controller/WallabagV1Controller.php | 8 +- .../Controller/WallabagV2Controller.php | 8 +- .../ImportBundle/Import/AbstractImport.php | 83 ++++++++++++++ src/Wallabag/ImportBundle/Import/PocketImport.php | 107 +++-------------- .../ImportBundle/Import/ReadabilityImport.php | 127 +++++++++------------ .../ImportBundle/Import/WallabagImport.php | 118 +++++++------------ .../ImportBundle/Import/WallabagV1Import.php | 20 +++- .../ImportBundle/Import/WallabagV2Import.php | 20 +++- .../ImportBundle/Resources/config/services.yml | 23 +++- 14 files changed, 341 insertions(+), 309 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php delete mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php new file mode 100644 index 00000000..8a8cf45d --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php @@ -0,0 +1,63 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + $this->em->clear($entry); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php deleted file mode 100644 index 239e7446..00000000 --- a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php +++ /dev/null @@ -1,63 +0,0 @@ -em = $em; - $this->userRepository = $userRepository; - $this->pocketImport = $pocketImport; - $this->logger = $logger ?: new NullLogger(); - } - - /** - * {@inheritdoc} - */ - public function execute(AMQPMessage $msg) - { - $storedEntry = json_decode($msg->body, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return; - } - - $this->pocketImport->setUser($user); - - $entry = $this->pocketImport->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return; - } - - try { - $this->em->flush(); - $this->em->clear($entry); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return; - } - } -} diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php index a2dcd8a7..57c007c3 100644 --- a/src/Wallabag/ImportBundle/Controller/PocketController.php +++ b/src/Wallabag/ImportBundle/Controller/PocketController.php @@ -21,7 +21,7 @@ class PocketController extends Controller $pocket->setUser($this->getUser()); if ($this->get('craue_config')->get('rabbitmq')) { - $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.wallabag_pocket_producer')); + $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_pocket_producer')); } return $pocket; diff --git a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php index b61aa99c..ee875a40 100644 --- a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php +++ b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php @@ -18,6 +18,11 @@ class ReadabilityController extends Controller $form->handleRequest($request); $readability = $this->get('wallabag_import.readability.import'); + $readability->setUser($this->getUser()); + + if ($this->get('craue_config')->get('rabbitmq')) { + $readability->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_readability_producer')); + } if ($form->isValid()) { $file = $form->get('file')->getData(); @@ -26,7 +31,6 @@ class ReadabilityController extends Controller if (in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { $res = $readability - ->setUser($this->getUser()) ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name) ->setMarkAsRead($markAsRead) ->import(); diff --git a/src/Wallabag/ImportBundle/Controller/WallabagController.php b/src/Wallabag/ImportBundle/Controller/WallabagController.php index 76ced0d2..1e6114c5 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagController.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagController.php @@ -38,6 +38,7 @@ abstract class WallabagController extends Controller $form->handleRequest($request); $wallabag = $this->getImportService(); + $wallabag->setUser($this->getUser()); if ($form->isValid()) { $file = $form->get('file')->getData(); @@ -46,7 +47,6 @@ abstract class WallabagController extends Controller if (in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { $res = $wallabag - ->setUser($this->getUser()) ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name) ->setMarkAsRead($markAsRead) ->import(); diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php index 3e748d57..f80aec3a 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php @@ -12,7 +12,13 @@ class WallabagV1Controller extends WallabagController */ protected function getImportService() { - return $this->get('wallabag_import.wallabag_v1.import'); + $service = $this->get('wallabag_import.wallabag_v1.import'); + + if ($this->get('craue_config')->get('rabbitmq')) { + $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v1_producer')); + } + + return $service; } /** diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php index c2a42165..063cddd9 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php @@ -12,7 +12,13 @@ class WallabagV2Controller extends WallabagController */ protected function getImportService() { - return $this->get('wallabag_import.wallabag_v2.import'); + $service = $this->get('wallabag_import.wallabag_v2.import'); + + if ($this->get('craue_config')->get('rabbitmq')) { + $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v2_producer')); + } + + return $service; } /** diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 14377a35..5b9d65d7 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -7,12 +7,17 @@ use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use Wallabag\CoreBundle\Helper\ContentProxy; use Wallabag\CoreBundle\Entity\Entry; +use Symfony\Component\Security\Core\User\UserInterface; +use OldSound\RabbitMqBundle\RabbitMq\Producer; abstract class AbstractImport implements ImportInterface { protected $em; protected $logger; protected $contentProxy; + protected $producer; + protected $user; + protected $markAsRead; public function __construct(EntityManager $em, ContentProxy $contentProxy) { @@ -26,6 +31,48 @@ abstract class AbstractImport implements ImportInterface $this->logger = $logger; } + /** + * Set RabbitMQ Producer to send each entry to a queue. + * This method should be called when user has enabled RabbitMQ. + * + * @param Producer $producer + */ + public function setRabbitmqProducer(Producer $producer) + { + $this->producer = $producer; + } + + /** + * Set current user. + * Could the current *connected* user or one retrieve by the consumer. + * + * @param UserInterface $user + */ + public function setUser(UserInterface $user) + { + $this->user = $user; + } + + /** + * Set whether articles must be all marked as read. + * + * @param bool $markAsRead + */ + public function setMarkAsRead($markAsRead) + { + $this->markAsRead = $markAsRead; + + return $this; + } + + /** + * Get whether articles must be all marked as read. + */ + public function getMarkAsRead() + { + return $this->markAsRead; + } + /** * Fetch content from the ContentProxy (using graby). * If it fails return false instead of the updated entry. @@ -44,4 +91,40 @@ abstract class AbstractImport implements ImportInterface return false; } } + + /** + * Parse and insert all given entries. + * + * @param $entries + */ + protected function parseEntries($entries) + { + $i = 1; + + foreach ($entries as $importedEntry) { + $entry = $this->parseEntry($importedEntry); + + if (null === $entry) { + continue; + } + + // flush every 20 entries + if (($i % 20) === 0) { + $this->em->flush(); + $this->em->clear($entry); + } + ++$i; + } + + $this->em->flush(); + } + + /** + * Parse one entry. + * + * @param array $importedEntry + * + * @return Entry + */ + abstract public function parseEntry(array $importedEntry); } diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 27df4917..dd0ddd72 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -2,25 +2,20 @@ namespace Wallabag\ImportBundle\Import; -use OldSound\RabbitMqBundle\RabbitMq\Producer; use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use GuzzleHttp\Client; use GuzzleHttp\Exception\RequestException; -use Symfony\Component\Security\Core\User\UserInterface; use Wallabag\CoreBundle\Entity\Entry; use Wallabag\CoreBundle\Helper\ContentProxy; use Craue\ConfigBundle\Util\Config; class PocketImport extends AbstractImport { - private $user; private $client; private $consumerKey; private $skippedEntries = 0; private $importedEntries = 0; - private $markAsRead; - private $producer; protected $accessToken; public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) @@ -31,28 +26,6 @@ class PocketImport extends AbstractImport $this->logger = new NullLogger(); } - /** - * Set RabbitMQ Producer to send each entry to a queue. - * This method should be called when user has enabled RabbitMQ. - * - * @param Producer $producer - */ - public function setRabbitmqProducer(Producer $producer) - { - $this->producer = $producer; - } - - /** - * Set current user. - * Could the current *connected* user or one retrieve by the consumer. - * - * @param UserInterface $user - */ - public function setUser(UserInterface $user) - { - $this->user = $user; - } - /** * {@inheritdoc} */ @@ -138,26 +111,6 @@ class PocketImport extends AbstractImport return true; } - /** - * Set whether articles must be all marked as read. - * - * @param bool $markAsRead - */ - public function setMarkAsRead($markAsRead) - { - $this->markAsRead = $markAsRead; - - return $this; - } - - /** - * Get whether articles must be all marked as read. - */ - public function getMarkAsRead() - { - return $this->markAsRead; - } - /** * {@inheritdoc} */ @@ -217,37 +170,9 @@ class PocketImport extends AbstractImport $this->client = $client; } - /** - * @see https://getpocket.com/developer/docs/v3/retrieve - * - * @param array $entries - */ - private function parseEntries(array $entries) - { - $i = 1; - - foreach ($entries as $pocketEntry) { - $entry = $this->parseEntry($pocketEntry); - - if (null === $entry) { - continue; - } - - // flush every 20 entries - if (($i % 20) === 0) { - $this->em->flush(); - $this->em->clear($entry); - } - - ++$i; - } - - $this->em->flush(); - } - - public function parseEntry(array $pocketEntry) + public function parseEntry(array $importedEntry) { - $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; + $url = isset($importedEntry['resolved_url']) && $importedEntry['resolved_url'] != '' ? $importedEntry['resolved_url'] : $importedEntry['given_url']; $existingEntry = $this->em ->getRepository('WallabagCoreBundle:Entry') @@ -270,34 +195,34 @@ class PocketImport extends AbstractImport } // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted - if ($pocketEntry['status'] == 1 || $this->markAsRead) { + if ($importedEntry['status'] == 1 || $this->markAsRead) { $entry->setArchived(true); } // 0 or 1 - 1 If the item is starred - if ($pocketEntry['favorite'] == 1) { + if ($importedEntry['favorite'] == 1) { $entry->setStarred(true); } $title = 'Untitled'; - if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { - $title = $pocketEntry['resolved_title']; - } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') { - $title = $pocketEntry['given_title']; + if (isset($importedEntry['resolved_title']) && $importedEntry['resolved_title'] != '') { + $title = $importedEntry['resolved_title']; + } elseif (isset($importedEntry['given_title']) && $importedEntry['given_title'] != '') { + $title = $importedEntry['given_title']; } $entry->setTitle($title); $entry->setUrl($url); // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image - if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { - $entry->setPreviewPicture($pocketEntry['images'][1]['src']); + if (isset($importedEntry['has_image']) && $importedEntry['has_image'] > 0 && isset($importedEntry['images'][1])) { + $entry->setPreviewPicture($importedEntry['images'][1]['src']); } - if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { + if (isset($importedEntry['tags']) && !empty($importedEntry['tags'])) { $this->contentProxy->assignTagsToEntry( $entry, - array_keys($pocketEntry['tags']) + array_keys($importedEntry['tags']) ); } @@ -315,17 +240,17 @@ class PocketImport extends AbstractImport */ public function parseEntriesForProducer($entries) { - foreach ($entries as $pocketEntry) { + foreach ($entries as $importedEntry) { // set userId for the producer (it won't know which user is connected) - $pocketEntry['userId'] = $this->user->getId(); + $importedEntry['userId'] = $this->user->getId(); if ($this->markAsRead) { - $pocketEntry['status'] = 1; + $importedEntry['status'] = 1; } ++$this->importedEntries; - $this->producer->publish(json_encode($pocketEntry)); + $this->producer->publish(json_encode($importedEntry)); } } } diff --git a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php index c7cfe15d..18a6631a 100644 --- a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php +++ b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php @@ -7,24 +7,9 @@ use Wallabag\UserBundle\Entity\User; class ReadabilityImport extends AbstractImport { - private $user; private $skippedEntries = 0; private $importedEntries = 0; private $filepath; - private $markAsRead; - - /** - * We define the user in a custom call because on the import command there is no logged in user. - * So we can't retrieve user from the `security.token_storage` service. - * - * @param User $user - */ - public function setUser(User $user) - { - $this->user = $user; - - return $this; - } /** * {@inheritdoc} @@ -62,26 +47,6 @@ class ReadabilityImport extends AbstractImport return $this; } - /** - * Set whether articles must be all marked as read. - * - * @param bool $markAsRead - */ - public function setMarkAsRead($markAsRead) - { - $this->markAsRead = $markAsRead; - - return $this; - } - - /** - * Get whether articles must be all marked as read. - */ - public function getMarkAsRead() - { - return $this->markAsRead; - } - /** * {@inheritdoc} */ @@ -116,54 +81,76 @@ class ReadabilityImport extends AbstractImport return false; } + if ($this->producer) { + $this->parseEntriesForProducer($data['bookmarks']); + + return true; + } + $this->parseEntries($data['bookmarks']); return true; } + public function parseEntry(array $importedEntry) + { + $existingEntry = $this->em + ->getRepository('WallabagCoreBundle:Entry') + ->findByUrlAndUserId($importedEntry['article__url'], $this->user->getId()); + + if (false !== $existingEntry) { + ++$this->skippedEntries; + + return; + } + + $data = [ + 'title' => $importedEntry['article__title'], + 'url' => $importedEntry['article__url'], + 'content_type' => '', + 'language' => '', + 'is_archived' => $importedEntry['archive'] || $this->markAsRead, + 'is_starred' => $importedEntry['favorite'], + ]; + + $entry = $this->fetchContent( + new Entry($this->user), + $data['url'], + $data + ); + + // jump to next entry in case of problem while getting content + if (false === $entry) { + ++$this->skippedEntries; + + return; + } + + $entry->setArchived($data['is_archived']); + $entry->setStarred($data['is_starred']); + + $this->em->persist($entry); + ++$this->importedEntries; + + return $entry; + } + /** - * Parse and insert all given entries. + * Faster parse entries for Producer. + * We don't care to make check at this time. They'll be done by the consumer. * - * @param $entries + * @param array $entries */ - protected function parseEntries($entries) + protected function parseEntriesForProducer($entries) { - $i = 1; - foreach ($entries as $importedEntry) { - $existingEntry = $this->em - ->getRepository('WallabagCoreBundle:Entry') - ->findByUrlAndUserId($importedEntry['article__url'], $this->user->getId()); - - if (false !== $existingEntry) { - ++$this->skippedEntries; - continue; - } + // set userId for the producer (it won't know which user is connected) + $importedEntry['userId'] = $this->user->getId(); - $data = [ - 'title' => $importedEntry['article__title'], - 'url' => $importedEntry['article__url'], - 'content_type' => '', - 'language' => '', - 'is_archived' => $importedEntry['archive'] || $this->markAsRead, - 'is_starred' => $importedEntry['favorite'], - ]; - - $entry = $this->fetchContent( - new Entry($this->user), - $data['url'], - $data - ); - - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; - continue; + if ($this->markAsRead) { + $importedEntry['archive'] = 1; } - $entry->setArchived($data['is_archived']); - $entry->setStarred($data['is_starred']); - $this->em->persist($entry); ++$this->importedEntries; // flush every 20 entries diff --git a/src/Wallabag/ImportBundle/Import/WallabagImport.php b/src/Wallabag/ImportBundle/Import/WallabagImport.php index 581ec178..6ad14e8c 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagImport.php +++ b/src/Wallabag/ImportBundle/Import/WallabagImport.php @@ -11,7 +11,6 @@ abstract class WallabagImport extends AbstractImport protected $skippedEntries = 0; protected $importedEntries = 0; protected $filepath; - protected $markAsRead; // untitled in all languages from v1 protected $untitled = [ 'Untitled', @@ -28,19 +27,6 @@ abstract class WallabagImport extends AbstractImport '', ]; - /** - * We define the user in a custom call because on the import command there is no logged in user. - * So we can't retrieve user from the `security.token_storage` service. - * - * @param User $user - */ - public function setUser(User $user) - { - $this->user = $user; - - return $this; - } - /** * {@inheritdoc} */ @@ -79,6 +65,12 @@ abstract class WallabagImport extends AbstractImport return false; } + if ($this->producer) { + $this->parseEntriesForProducer($data); + + return true; + } + $this->parseEntries($data); return true; @@ -108,85 +100,61 @@ abstract class WallabagImport extends AbstractImport } /** - * Set whether articles must be all marked as read. - * - * @param bool $markAsRead + * {@inheritdoc} */ - public function setMarkAsRead($markAsRead) + public function parseEntry(array $importedEntry) { - $this->markAsRead = $markAsRead; + $existingEntry = $this->em + ->getRepository('WallabagCoreBundle:Entry') + ->findByUrlAndUserId($importedEntry['url'], $this->user->getId()); - return $this; - } + if (false !== $existingEntry) { + ++$this->skippedEntries; - /** - * Parse and insert all given entries. - * - * @param $entries - */ - protected function parseEntries($entries) - { - $i = 1; + return; + } + + $data = $this->prepareEntry($importedEntry); - foreach ($entries as $importedEntry) { - $existingEntry = $this->em - ->getRepository('WallabagCoreBundle:Entry') - ->findByUrlAndUserId($importedEntry['url'], $this->user->getId()); + $entry = $this->fetchContent( + new Entry($this->user), + $importedEntry['url'], + $data + ); - if (false !== $existingEntry) { - ++$this->skippedEntries; - continue; - } + // jump to next entry in case of problem while getting content + if (false === $entry) { + ++$this->skippedEntries; - $data = $this->prepareEntry($importedEntry, $this->markAsRead); + return; + } - $entry = $this->fetchContent( - new Entry($this->user), - $importedEntry['url'], - $data + if (array_key_exists('tags', $data)) { + $this->contentProxy->assignTagsToEntry( + $entry, + $data['tags'] ); + } - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; - continue; - } - - if (array_key_exists('tags', $data)) { - $this->contentProxy->assignTagsToEntry( - $entry, - $data['tags'] - ); - } - - if (isset($importedEntry['preview_picture'])) { - $entry->setPreviewPicture($importedEntry['preview_picture']); - } - - $entry->setArchived($data['is_archived']); - $entry->setStarred($data['is_starred']); - - $this->em->persist($entry); - ++$this->importedEntries; - - // flush every 20 entries - if (($i % 20) === 0) { - $this->em->flush(); - } - ++$i; + if (isset($importedEntry['preview_picture'])) { + $entry->setPreviewPicture($importedEntry['preview_picture']); } - $this->em->flush(); - $this->em->clear(); + $entry->setArchived($data['is_archived']); + $entry->setStarred($data['is_starred']); + + $this->em->persist($entry); + ++$this->importedEntries; + + return $entry; } /** * This should return a cleaned array for a given entry to be given to `updateEntry`. * - * @param array $entry Data from the imported file - * @param bool $markAsRead Should we mark as read content? + * @param array $entry Data from the imported file * * @return array */ - abstract protected function prepareEntry($entry = [], $markAsRead = false); + abstract protected function prepareEntry($entry = []); } diff --git a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php index 6cf3467a..86734652 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php +++ b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php @@ -31,7 +31,7 @@ class WallabagV1Import extends WallabagImport /** * {@inheritdoc} */ - protected function prepareEntry($entry = [], $markAsRead = false) + protected function prepareEntry($entry = []) { $data = [ 'title' => $entry['title'], @@ -39,7 +39,7 @@ class WallabagV1Import extends WallabagImport 'url' => $entry['url'], 'content_type' => '', 'language' => '', - 'is_archived' => $entry['is_read'] || $markAsRead, + 'is_archived' => $entry['is_read'] || $this->markAsRead, 'is_starred' => $entry['is_fav'], 'tags' => '', ]; @@ -56,4 +56,20 @@ class WallabagV1Import extends WallabagImport return $data; } + + protected function parseEntriesForProducer($entries) + { + foreach ($entries as $importedEntry) { + // set userId for the producer (it won't know which user is connected) + $importedEntry['userId'] = $this->user->getId(); + + if ($this->markAsRead) { + $importedEntry['is_read'] = 1; + } + + ++$this->importedEntries; + + $this->producer->publish(json_encode($importedEntry)); + } + } } diff --git a/src/Wallabag/ImportBundle/Import/WallabagV2Import.php b/src/Wallabag/ImportBundle/Import/WallabagV2Import.php index d0035b63..faf4236f 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagV2Import.php +++ b/src/Wallabag/ImportBundle/Import/WallabagV2Import.php @@ -31,12 +31,28 @@ class WallabagV2Import extends WallabagImport /** * {@inheritdoc} */ - protected function prepareEntry($entry = [], $markAsRead = false) + protected function prepareEntry($entry = []) { return [ 'html' => $entry['content'], 'content_type' => $entry['mimetype'], - 'is_archived' => ($entry['is_archived'] || $markAsRead), + 'is_archived' => ($entry['is_archived'] || $this->markAsRead), ] + $entry; } + + protected function parseEntriesForProducer($entries) + { + foreach ($entries as $importedEntry) { + // set userId for the producer (it won't know which user is connected) + $importedEntry['userId'] = $this->user->getId(); + + if ($this->markAsRead) { + $importedEntry['is_archived'] = 1; + } + + ++$this->importedEntries; + + $this->producer->publish(json_encode($importedEntry)); + } + } } diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index fe388b26..cad44e71 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -1,11 +1,32 @@ services: wallabag_import.consumer.pocket: - class: Wallabag\ImportBundle\Consumer\AMPQ\PocketConsumer + class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer arguments: - "@doctrine.orm.entity_manager" - "@wallabag_user.user_repository" - "@wallabag_import.pocket.import" - "@logger" + wallabag_import.consumer.readability: + class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.readability.import" + - "@logger" + wallabag_import.consumer.wallabag_v1: + class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v1.import" + - "@logger" + wallabag_import.consumer.wallabag_v2: + class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v2.import" + - "@logger" wallabag_import.chain: class: Wallabag\ImportBundle\Import\ImportChain -- cgit v1.2.3 From 02f64895728fe9aee2c696a627e0bbe27a24faf2 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Mon, 5 Sep 2016 07:13:09 +0200 Subject: Retrieve all items from Pocket 5000 by 5000. Also, retrieve newest item first. --- src/Wallabag/ImportBundle/Import/PocketImport.php | 37 +++++++++++++++++++---- 1 file changed, 31 insertions(+), 6 deletions(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index dd0ddd72..06a31813 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -16,7 +16,9 @@ class PocketImport extends AbstractImport private $consumerKey; private $skippedEntries = 0; private $importedEntries = 0; - protected $accessToken; + private $accessToken; + + const NB_ELEMENTS = 5000; public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) { @@ -26,6 +28,16 @@ class PocketImport extends AbstractImport $this->logger = new NullLogger(); } + /** + * Only used for test purpose + * + * @return string + */ + public function getAccessToken() + { + return $this->accessToken; + } + /** * {@inheritdoc} */ @@ -114,8 +126,10 @@ class PocketImport extends AbstractImport /** * {@inheritdoc} */ - public function import() + public function import($offset = 0) { + static $run = 0; + $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/get', [ 'body' => json_encode([ @@ -123,7 +137,9 @@ class PocketImport extends AbstractImport 'access_token' => $this->accessToken, 'detailType' => 'complete', 'state' => 'all', - 'sort' => 'oldest', + 'sort' => 'newest', + 'count' => self::NB_ELEMENTS, + 'offset' => $offset, ]), ] ); @@ -140,11 +156,20 @@ class PocketImport extends AbstractImport if ($this->producer) { $this->parseEntriesForProducer($entries['list']); - - return true; + } else { + $this->parseEntries($entries['list']); } - $this->parseEntries($entries['list']); + // if we retrieve exactly the amount of items requested it means we can get more + // re-call import and offset item by the amount previous received: + // - first call get 5k offset 0 + // - second call get 5k offset 5k + // - and so on + if (count($entries['list']) === self::NB_ELEMENTS) { + ++$run; + + return $this->import(self::NB_ELEMENTS * $run); + } return true; } -- cgit v1.2.3 From 3849a9f3231c0109c87af085452c3ac5e4aed303 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Mon, 5 Sep 2016 07:50:10 +0200 Subject: Some cleanup & refactor --- .../ImportBundle/Import/AbstractImport.php | 42 ++++++++++++++++++++-- src/Wallabag/ImportBundle/Import/PocketImport.php | 22 +++--------- .../ImportBundle/Import/ReadabilityImport.php | 28 +++------------ .../ImportBundle/Import/WallabagImport.php | 2 -- .../ImportBundle/Import/WallabagV1Import.php | 18 ++++------ .../ImportBundle/Import/WallabagV2Import.php | 18 ++++------ 6 files changed, 60 insertions(+), 70 deletions(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 5b9d65d7..b085dc3a 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -7,7 +7,7 @@ use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use Wallabag\CoreBundle\Helper\ContentProxy; use Wallabag\CoreBundle\Entity\Entry; -use Symfony\Component\Security\Core\User\UserInterface; +use Wallabag\UserBundle\Entity\User; use OldSound\RabbitMqBundle\RabbitMq\Producer; abstract class AbstractImport implements ImportInterface @@ -46,9 +46,9 @@ abstract class AbstractImport implements ImportInterface * Set current user. * Could the current *connected* user or one retrieve by the consumer. * - * @param UserInterface $user + * @param User $user */ - public function setUser(UserInterface $user) + public function setUser(User $user) { $this->user = $user; } @@ -119,6 +119,32 @@ abstract class AbstractImport implements ImportInterface $this->em->flush(); } + /** + * Parse entries and send them to the queue. + * It should just be a simple loop on all item, no call to the database should be done + * to speedup queuing. + * + * Faster parse entries for Producer. + * We don't care to make check at this time. They'll be done by the consumer. + * + * @param array $entries + */ + protected function parseEntriesForProducer(array $entries) + { + foreach ($entries as $importedEntry) { + // set userId for the producer (it won't know which user is connected) + $importedEntry['userId'] = $this->user->getId(); + + if ($this->markAsRead) { + $importedEntry = $this->setEntryAsRead($importedEntry); + } + + ++$this->importedEntries; + + $this->producer->publish(json_encode($importedEntry)); + } + } + /** * Parse one entry. * @@ -127,4 +153,14 @@ abstract class AbstractImport implements ImportInterface * @return Entry */ abstract public function parseEntry(array $importedEntry); + + /** + * Set current imported entry to archived / read. + * Implementation is different accross all imports. + * + * @param array $importedEntry + * + * @return array + */ + abstract protected function setEntryAsRead(array $importedEntry); } diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 06a31813..5850deba 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -29,7 +29,7 @@ class PocketImport extends AbstractImport } /** - * Only used for test purpose + * Only used for test purpose. * * @return string */ @@ -258,24 +258,12 @@ class PocketImport extends AbstractImport } /** - * Faster parse entries for Producer. - * We don't care to make check at this time. They'll be done by the consumer. - * - * @param array $entries + * {@inheritdoc} */ - public function parseEntriesForProducer($entries) + protected function setEntryAsRead(array $importedEntry) { - foreach ($entries as $importedEntry) { - // set userId for the producer (it won't know which user is connected) - $importedEntry['userId'] = $this->user->getId(); - - if ($this->markAsRead) { - $importedEntry['status'] = 1; - } + $importedEntry['status'] = 1; - ++$this->importedEntries; - - $this->producer->publish(json_encode($importedEntry)); - } + return $importedEntry; } } diff --git a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php index 18a6631a..64ef62bf 100644 --- a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php +++ b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php @@ -3,7 +3,6 @@ namespace Wallabag\ImportBundle\Import; use Wallabag\CoreBundle\Entity\Entry; -use Wallabag\UserBundle\Entity\User; class ReadabilityImport extends AbstractImport { @@ -136,31 +135,12 @@ class ReadabilityImport extends AbstractImport } /** - * Faster parse entries for Producer. - * We don't care to make check at this time. They'll be done by the consumer. - * - * @param array $entries + * {@inheritdoc} */ - protected function parseEntriesForProducer($entries) + protected function setEntryAsRead(array $importedEntry) { - foreach ($entries as $importedEntry) { - // set userId for the producer (it won't know which user is connected) - $importedEntry['userId'] = $this->user->getId(); - - if ($this->markAsRead) { - $importedEntry['archive'] = 1; - } - - ++$this->importedEntries; - - // flush every 20 entries - if (($i % 20) === 0) { - $this->em->flush(); - } - ++$i; - } + $importedEntry['archive'] = 1; - $this->em->flush(); - $this->em->clear(); + return $importedEntry; } } diff --git a/src/Wallabag/ImportBundle/Import/WallabagImport.php b/src/Wallabag/ImportBundle/Import/WallabagImport.php index 6ad14e8c..8e18e0ef 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagImport.php +++ b/src/Wallabag/ImportBundle/Import/WallabagImport.php @@ -3,11 +3,9 @@ namespace Wallabag\ImportBundle\Import; use Wallabag\CoreBundle\Entity\Entry; -use Wallabag\UserBundle\Entity\User; abstract class WallabagImport extends AbstractImport { - protected $user; protected $skippedEntries = 0; protected $importedEntries = 0; protected $filepath; diff --git a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php index 86734652..292b72a7 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php +++ b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php @@ -57,19 +57,13 @@ class WallabagV1Import extends WallabagImport return $data; } - protected function parseEntriesForProducer($entries) + /** + * {@inheritdoc} + */ + protected function setEntryAsRead(array $importedEntry) { - foreach ($entries as $importedEntry) { - // set userId for the producer (it won't know which user is connected) - $importedEntry['userId'] = $this->user->getId(); + $importedEntry['is_read'] = 1; - if ($this->markAsRead) { - $importedEntry['is_read'] = 1; - } - - ++$this->importedEntries; - - $this->producer->publish(json_encode($importedEntry)); - } + return $importedEntry; } } diff --git a/src/Wallabag/ImportBundle/Import/WallabagV2Import.php b/src/Wallabag/ImportBundle/Import/WallabagV2Import.php index faf4236f..37c8ca14 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagV2Import.php +++ b/src/Wallabag/ImportBundle/Import/WallabagV2Import.php @@ -40,19 +40,13 @@ class WallabagV2Import extends WallabagImport ] + $entry; } - protected function parseEntriesForProducer($entries) + /** + * {@inheritdoc} + */ + protected function setEntryAsRead(array $importedEntry) { - foreach ($entries as $importedEntry) { - // set userId for the producer (it won't know which user is connected) - $importedEntry['userId'] = $this->user->getId(); - - if ($this->markAsRead) { - $importedEntry['is_archived'] = 1; - } - - ++$this->importedEntries; + $importedEntry['is_archived'] = 1; - $this->producer->publish(json_encode($importedEntry)); - } + return $importedEntry; } } -- cgit v1.2.3 From 3aca0a9f00417b64203a660dee0a2b4c0fe22ac8 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Mon, 5 Sep 2016 09:35:42 +0200 Subject: CS --- src/Wallabag/ImportBundle/Import/AbstractImport.php | 2 ++ src/Wallabag/ImportBundle/Import/PocketImport.php | 2 -- src/Wallabag/ImportBundle/Import/ReadabilityImport.php | 2 -- src/Wallabag/ImportBundle/Import/WallabagImport.php | 2 -- 4 files changed, 2 insertions(+), 6 deletions(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index b085dc3a..8610062d 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -18,6 +18,8 @@ abstract class AbstractImport implements ImportInterface protected $producer; protected $user; protected $markAsRead; + protected $skippedEntries = 0; + protected $importedEntries = 0; public function __construct(EntityManager $em, ContentProxy $contentProxy) { diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 5850deba..845380b7 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -14,8 +14,6 @@ class PocketImport extends AbstractImport { private $client; private $consumerKey; - private $skippedEntries = 0; - private $importedEntries = 0; private $accessToken; const NB_ELEMENTS = 5000; diff --git a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php index 64ef62bf..915d4cd3 100644 --- a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php +++ b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php @@ -6,8 +6,6 @@ use Wallabag\CoreBundle\Entity\Entry; class ReadabilityImport extends AbstractImport { - private $skippedEntries = 0; - private $importedEntries = 0; private $filepath; /** diff --git a/src/Wallabag/ImportBundle/Import/WallabagImport.php b/src/Wallabag/ImportBundle/Import/WallabagImport.php index 8e18e0ef..026567b0 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagImport.php +++ b/src/Wallabag/ImportBundle/Import/WallabagImport.php @@ -6,8 +6,6 @@ use Wallabag\CoreBundle\Entity\Entry; abstract class WallabagImport extends AbstractImport { - protected $skippedEntries = 0; - protected $importedEntries = 0; protected $filepath; // untitled in all languages from v1 protected $untitled = [ -- cgit v1.2.3 From 6d65c0a8b089d3caa6f8e20d7935a9fe2f87d926 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 9 Sep 2016 09:36:07 +0200 Subject: Add ability to define created_at for all import At the moment only Readability & wallabag v2 import allow created_at import. Pocket removed `time_added` field from their API v2 to v3... And wallabag v1 doesn't export that value. --- src/Wallabag/ImportBundle/Import/PocketImport.php | 5 +++++ src/Wallabag/ImportBundle/Import/ReadabilityImport.php | 5 +++++ src/Wallabag/ImportBundle/Import/WallabagImport.php | 4 ++++ src/Wallabag/ImportBundle/Import/WallabagV1Import.php | 1 + 4 files changed, 15 insertions(+) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 845380b7..92dcdd40 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -193,6 +193,11 @@ class PocketImport extends AbstractImport $this->client = $client; } + /** + * {@inheritdoc} + * + * @see https://getpocket.com/developer/docs/v3/retrieve + */ public function parseEntry(array $importedEntry) { $url = isset($importedEntry['resolved_url']) && $importedEntry['resolved_url'] != '' ? $importedEntry['resolved_url'] : $importedEntry['given_url']; diff --git a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php index 915d4cd3..8f080d38 100644 --- a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php +++ b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php @@ -89,6 +89,9 @@ class ReadabilityImport extends AbstractImport return true; } + /** + * {@inheritdoc} + */ public function parseEntry(array $importedEntry) { $existingEntry = $this->em @@ -108,6 +111,7 @@ class ReadabilityImport extends AbstractImport 'language' => '', 'is_archived' => $importedEntry['archive'] || $this->markAsRead, 'is_starred' => $importedEntry['favorite'], + 'created_at' => $importedEntry['date_added'], ]; $entry = $this->fetchContent( @@ -125,6 +129,7 @@ class ReadabilityImport extends AbstractImport $entry->setArchived($data['is_archived']); $entry->setStarred($data['is_starred']); + $entry->setCreatedAt(new \DateTime($data['created_at'])); $this->em->persist($entry); ++$this->importedEntries; diff --git a/src/Wallabag/ImportBundle/Import/WallabagImport.php b/src/Wallabag/ImportBundle/Import/WallabagImport.php index 026567b0..8e50b135 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagImport.php +++ b/src/Wallabag/ImportBundle/Import/WallabagImport.php @@ -139,6 +139,10 @@ abstract class WallabagImport extends AbstractImport $entry->setArchived($data['is_archived']); $entry->setStarred($data['is_starred']); + if (!empty($data['created_at'])) { + $entry->setCreatedAt(new \DateTime($data['created_at'])); + } + $this->em->persist($entry); ++$this->importedEntries; diff --git a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php index 292b72a7..4f001062 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php +++ b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php @@ -42,6 +42,7 @@ class WallabagV1Import extends WallabagImport 'is_archived' => $entry['is_read'] || $this->markAsRead, 'is_starred' => $entry['is_fav'], 'tags' => '', + 'created_at' => '', ]; // force content to be refreshed in case on bad fetch in the v1 installation -- cgit v1.2.3 From 8664069e1aa2fa89e17587308a03f2720c20327a Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 9 Sep 2016 10:12:25 +0200 Subject: Fix DateTime & clear() --- src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php | 7 ++++++- src/Wallabag/ImportBundle/Import/AbstractImport.php | 6 +++++- 2 files changed, 11 insertions(+), 2 deletions(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php index 8a8cf45d..72a3260a 100644 --- a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php @@ -7,6 +7,8 @@ use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; use Wallabag\ImportBundle\Import\AbstractImport; use Wallabag\UserBundle\Repository\UserRepository; +use Wallabag\CoreBundle\Entity\Entry; +use Wallabag\CoreBundle\Entity\Tag; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -53,7 +55,10 @@ class EntryConsumer implements ConsumerInterface try { $this->em->flush(); - $this->em->clear($entry); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); } catch (\Exception $e) { $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 8610062d..39befa7b 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -7,6 +7,7 @@ use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use Wallabag\CoreBundle\Helper\ContentProxy; use Wallabag\CoreBundle\Entity\Entry; +use Wallabag\CoreBundle\Entity\Tag; use Wallabag\UserBundle\Entity\User; use OldSound\RabbitMqBundle\RabbitMq\Producer; @@ -113,7 +114,10 @@ abstract class AbstractImport implements ImportInterface // flush every 20 entries if (($i % 20) === 0) { $this->em->flush(); - $this->em->clear($entry); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); } ++$i; } -- cgit v1.2.3 From 13470c3596d0b1490bbf18b39128a05bbb3c7f3e Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 9 Sep 2016 18:02:29 +0200 Subject: Add test for RabbitMQ Also update Symfony deps --- src/Wallabag/ImportBundle/Import/PocketImport.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 92dcdd40..d76a3a08 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -265,7 +265,7 @@ class PocketImport extends AbstractImport */ protected function setEntryAsRead(array $importedEntry) { - $importedEntry['status'] = 1; + $importedEntry['status'] = '1'; return $importedEntry; } -- cgit v1.2.3 From 7f7531171f6e49110b5842f869e37c766a682473 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 9 Sep 2016 20:45:30 +0200 Subject: Retrieve created date from Pocket --- src/Wallabag/ImportBundle/Import/PocketImport.php | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index d76a3a08..fe39d33f 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -254,6 +254,10 @@ class PocketImport extends AbstractImport ); } + if (!empty($importedEntry['time_added'])) { + $entry->setCreatedAt((new \DateTime())->setTimestamp($importedEntry['time_added'])); + } + $this->em->persist($entry); ++$this->importedEntries; -- cgit v1.2.3 From b3437d58ae224121375c99e9288d8b808524e624 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 9 Sep 2016 21:02:03 +0200 Subject: Enable Redis async import - using javibravo/simpleue - internal config value are now `import_with_redis` & `import_with_rabbit` which are more clear - if both option are enable rabbit will be choosen - services imports related to async are now splitted into 2 files: `redis.yml` & `rabbit.yml` - --- .../ImportBundle/Command/RedisWorkerCommand.php | 41 +++++++++++ .../ImportBundle/Consumer/AMPQ/EntryConsumer.php | 68 ------------------ .../ImportBundle/Consumer/AMPQEntryConsumer.php | 70 ++++++++++++++++++ .../ImportBundle/Consumer/RedisEntryConsumer.php | 84 ++++++++++++++++++++++ .../ImportBundle/Controller/PocketController.php | 6 +- .../Controller/ReadabilityController.php | 6 +- .../Controller/WallabagV1Controller.php | 6 +- .../Controller/WallabagV2Controller.php | 6 +- .../ImportBundle/Import/AbstractImport.php | 8 +-- src/Wallabag/ImportBundle/Redis/Producer.php | 36 ++++++++++ .../ImportBundle/Resources/config/rabbit.yml | 30 ++++++++ .../ImportBundle/Resources/config/redis.yml | 81 +++++++++++++++++++++ .../ImportBundle/Resources/config/services.yml | 33 ++------- 13 files changed, 366 insertions(+), 109 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php delete mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Redis/Producer.php create mode 100644 src/Wallabag/ImportBundle/Resources/config/rabbit.yml create mode 100644 src/Wallabag/ImportBundle/Resources/config/redis.yml (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php new file mode 100644 index 00000000..85c5a903 --- /dev/null +++ b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php @@ -0,0 +1,41 @@ +setName('wallabag:import:redis-worker') + ->setDescription('Launch Redis worker') + ->addArgument('serviceName', InputArgument::REQUIRED, 'Service to use: wallabag_v1, wallabag_v2, pocket or readability') + ; + } + + protected function execute(InputInterface $input, OutputInterface $output) + { + $output->writeln('Worker started at: '.(new \DateTime())->format('d-m-Y G:i:s')); + $output->writeln('Waiting for message ...'); + + $serviceName = $input->getArgument('serviceName'); + + if (!$this->getContainer()->has('wallabag_import.queue.redis.'.$serviceName) || !$this->getContainer()->has('wallabag_import.consumer.redis.'.$serviceName)) { + throw new Exception(sprintf('No queue or consumer found for service name: "%s"', $input->getArgument('serviceName'))); + } + + $worker = new QueueWorker( + $this->getContainer()->get('wallabag_import.queue.redis.'.$serviceName), + $this->getContainer()->get('wallabag_import.consumer.redis.'.$serviceName) + ); + + $worker->start(); + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php deleted file mode 100644 index 72a3260a..00000000 --- a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php +++ /dev/null @@ -1,68 +0,0 @@ -em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - - /** - * {@inheritdoc} - */ - public function execute(AMQPMessage $msg) - { - $storedEntry = json_decode($msg->body, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return; - } - } -} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php new file mode 100644 index 00000000..39bb5375 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php @@ -0,0 +1,70 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + + $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php new file mode 100644 index 00000000..38665b01 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php @@ -0,0 +1,84 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * Handle one message by one message. + * + * @param string $job Content of the message (directly from Redis) + * + * @return bool + */ + public function manage($job) + { + $storedEntry = json_decode($job, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return false; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return false; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return false; + } + + $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + + return true; + } + + /** + * Should tell if the given job will kill the worker. + * We don't want to stop it :). + */ + public function isStopJob($job) + { + return false; + } +} diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php index 57c007c3..3d555717 100644 --- a/src/Wallabag/ImportBundle/Controller/PocketController.php +++ b/src/Wallabag/ImportBundle/Controller/PocketController.php @@ -20,8 +20,10 @@ class PocketController extends Controller $pocket = $this->get('wallabag_import.pocket.import'); $pocket->setUser($this->getUser()); - if ($this->get('craue_config')->get('rabbitmq')) { - $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_pocket_producer')); + if ($this->get('craue_config')->get('import_with_rabbitmq')) { + $pocket->setProducer($this->get('old_sound_rabbit_mq.import_pocket_producer')); + } elseif ($this->get('craue_config')->get('import_with_redis')) { + $pocket->setProducer($this->get('wallabag_import.producer.redis.pocket')); } return $pocket; diff --git a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php index ee875a40..61243042 100644 --- a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php +++ b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php @@ -20,8 +20,10 @@ class ReadabilityController extends Controller $readability = $this->get('wallabag_import.readability.import'); $readability->setUser($this->getUser()); - if ($this->get('craue_config')->get('rabbitmq')) { - $readability->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_readability_producer')); + if ($this->get('craue_config')->get('import_with_rabbitmq')) { + $readability->setProducer($this->get('old_sound_rabbit_mq.import_readability_producer')); + } elseif ($this->get('craue_config')->get('import_with_redis')) { + $readability->setProducer($this->get('wallabag_import.producer.redis.readability')); } if ($form->isValid()) { diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php index f80aec3a..312c7a35 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php @@ -14,8 +14,10 @@ class WallabagV1Controller extends WallabagController { $service = $this->get('wallabag_import.wallabag_v1.import'); - if ($this->get('craue_config')->get('rabbitmq')) { - $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v1_producer')); + if ($this->get('craue_config')->get('import_with_rabbitmq')) { + $service->setProducer($this->get('old_sound_rabbit_mq.import_wallabag_v1_producer')); + } elseif ($this->get('craue_config')->get('import_with_redis')) { + $service->setProducer($this->get('wallabag_import.producer.redis.wallabag_v1')); } return $service; diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php index 063cddd9..45211fe6 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php @@ -14,8 +14,10 @@ class WallabagV2Controller extends WallabagController { $service = $this->get('wallabag_import.wallabag_v2.import'); - if ($this->get('craue_config')->get('rabbitmq')) { - $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v2_producer')); + if ($this->get('craue_config')->get('import_with_rabbitmq')) { + $service->setProducer($this->get('old_sound_rabbit_mq.import_wallabag_v2_producer')); + } elseif ($this->get('craue_config')->get('import_with_redis')) { + $service->setProducer($this->get('wallabag_import.producer.redis.wallabag_v2')); } return $service; diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 39befa7b..4cd8e846 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -9,7 +9,7 @@ use Wallabag\CoreBundle\Helper\ContentProxy; use Wallabag\CoreBundle\Entity\Entry; use Wallabag\CoreBundle\Entity\Tag; use Wallabag\UserBundle\Entity\User; -use OldSound\RabbitMqBundle\RabbitMq\Producer; +use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; abstract class AbstractImport implements ImportInterface { @@ -35,12 +35,12 @@ abstract class AbstractImport implements ImportInterface } /** - * Set RabbitMQ Producer to send each entry to a queue. + * Set RabbitMQ/Redis Producer to send each entry to a queue. * This method should be called when user has enabled RabbitMQ. * - * @param Producer $producer + * @param ProducerInterface $producer */ - public function setRabbitmqProducer(Producer $producer) + public function setProducer(ProducerInterface $producer) { $this->producer = $producer; } diff --git a/src/Wallabag/ImportBundle/Redis/Producer.php b/src/Wallabag/ImportBundle/Redis/Producer.php new file mode 100644 index 00000000..fedc3e57 --- /dev/null +++ b/src/Wallabag/ImportBundle/Redis/Producer.php @@ -0,0 +1,36 @@ +queue = $queue; + } + + /** + * Publish a message in the Redis queue. + * + * @param string $msgBody + * @param string $routingKey NOT USED + * @param array $additionalProperties NOT USED + */ + public function publish($msgBody, $routingKey = '', $additionalProperties = array()) + { + $this->queue->sendJob($msgBody); + } +} diff --git a/src/Wallabag/ImportBundle/Resources/config/rabbit.yml b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml new file mode 100644 index 00000000..f09dda0d --- /dev/null +++ b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml @@ -0,0 +1,30 @@ +# RabbitMQ stuff +services: + wallabag_import.consumer.ampq.pocket: + class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.pocket.import" + - "@logger" + wallabag_import.consumer.ampq.readability: + class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.readability.import" + - "@logger" + wallabag_import.consumer.ampq.wallabag_v1: + class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v1.import" + - "@logger" + wallabag_import.consumer.ampq.wallabag_v2: + class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v2.import" + - "@logger" diff --git a/src/Wallabag/ImportBundle/Resources/config/redis.yml b/src/Wallabag/ImportBundle/Resources/config/redis.yml new file mode 100644 index 00000000..7d3248e5 --- /dev/null +++ b/src/Wallabag/ImportBundle/Resources/config/redis.yml @@ -0,0 +1,81 @@ +# Redis stuff +services: + # readability + wallabag_import.queue.redis.readability: + class: Simpleue\Queue\RedisQueue + arguments: + - "@wallabag_core.redis.client" + - "wallabag.import.readability" + + wallabag_import.producer.redis.readability: + class: Wallabag\ImportBundle\Redis\Producer + arguments: + - "@wallabag_import.queue.redis.readability" + + wallabag_import.consumer.redis.readability: + class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.readability.import" + - "@logger" + + # pocket + wallabag_import.queue.redis.pocket: + class: Simpleue\Queue\RedisQueue + arguments: + - "@wallabag_core.redis.client" + - "wallabag.import.pocket" + + wallabag_import.producer.redis.pocket: + class: Wallabag\ImportBundle\Redis\Producer + arguments: + - "@wallabag_import.queue.redis.pocket" + + wallabag_import.consumer.redis.pocket: + class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.pocket.import" + - "@logger" + + # wallabag v1 + wallabag_import.queue.redis.wallabag_v1: + class: Simpleue\Queue\RedisQueue + arguments: + - "@wallabag_core.redis.client" + - "wallabag.import.wallabag_v1" + + wallabag_import.producer.redis.wallabag_v1: + class: Wallabag\ImportBundle\Redis\Producer + arguments: + - "@wallabag_import.queue.redis.wallabag_v1" + + wallabag_import.consumer.redis.wallabag_v1: + class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v1.import" + - "@logger" + + # wallabag v2 + wallabag_import.queue.redis.wallabag_v2: + class: Simpleue\Queue\RedisQueue + arguments: + - "@wallabag_core.redis.client" + - "wallabag.import.wallabag_v2" + + wallabag_import.producer.redis.wallabag_v2: + class: Wallabag\ImportBundle\Redis\Producer + arguments: + - "@wallabag_import.queue.redis.wallabag_v2" + + wallabag_import.consumer.redis.wallabag_v2: + class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v2.import" + - "@logger" diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index cad44e71..f03404ae 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -1,33 +1,8 @@ -services: - wallabag_import.consumer.pocket: - class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer - arguments: - - "@doctrine.orm.entity_manager" - - "@wallabag_user.user_repository" - - "@wallabag_import.pocket.import" - - "@logger" - wallabag_import.consumer.readability: - class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer - arguments: - - "@doctrine.orm.entity_manager" - - "@wallabag_user.user_repository" - - "@wallabag_import.readability.import" - - "@logger" - wallabag_import.consumer.wallabag_v1: - class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer - arguments: - - "@doctrine.orm.entity_manager" - - "@wallabag_user.user_repository" - - "@wallabag_import.wallabag_v1.import" - - "@logger" - wallabag_import.consumer.wallabag_v2: - class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer - arguments: - - "@doctrine.orm.entity_manager" - - "@wallabag_user.user_repository" - - "@wallabag_import.wallabag_v2.import" - - "@logger" +imports: + - { resource: rabbit.yml } + - { resource: redis.yml } +services: wallabag_import.chain: class: Wallabag\ImportBundle\Import\ImportChain -- cgit v1.2.3 From dc69e25f97c357fdfdff5225f4f65cc55a6770b0 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 11 Sep 2016 20:22:38 +0200 Subject: Display a message when async is enabled --- .../ImportBundle/Resources/views/Import/_workerEnabled.html.twig | 8 ++++++++ src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig | 4 +++- .../ImportBundle/Resources/views/Readability/index.html.twig | 2 ++ .../ImportBundle/Resources/views/WallabagV1/index.html.twig | 2 ++ 4 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 src/Wallabag/ImportBundle/Resources/views/Import/_workerEnabled.html.twig (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Resources/views/Import/_workerEnabled.html.twig b/src/Wallabag/ImportBundle/Resources/views/Import/_workerEnabled.html.twig new file mode 100644 index 00000000..2390a41f --- /dev/null +++ b/src/Wallabag/ImportBundle/Resources/views/Import/_workerEnabled.html.twig @@ -0,0 +1,8 @@ +{% set redis = craue_setting('import_with_redis') %} +{% set rabbit = craue_setting('import_with_rabbitmq') %} + +{% if redis or rabbit %} +
+ {{ 'import.worker.enabled'|trans }} {% if rabbit %}RabbitMQ{% elseif redis %}Redis{% endif %} +
+{% endif %} diff --git a/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig b/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig index 401dbd33..aa5941b7 100644 --- a/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig +++ b/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig @@ -6,8 +6,10 @@
+ {% include 'WallabagImportBundle:Import:_workerEnabled.html.twig' %} + {% if not has_consumer_key %} -
+
{{ 'import.pocket.config_missing.description'|trans }} {% if is_granted('ROLE_SUPER_ADMIN') %} diff --git a/src/Wallabag/ImportBundle/Resources/views/Readability/index.html.twig b/src/Wallabag/ImportBundle/Resources/views/Readability/index.html.twig index f527d309..74653b0f 100644 --- a/src/Wallabag/ImportBundle/Resources/views/Readability/index.html.twig +++ b/src/Wallabag/ImportBundle/Resources/views/Readability/index.html.twig @@ -6,6 +6,8 @@
+ {% include 'WallabagImportBundle:Import:_workerEnabled.html.twig' %} +
{{ import.description|trans }}

{{ 'import.readability.how_to'|trans }}

diff --git a/src/Wallabag/ImportBundle/Resources/views/WallabagV1/index.html.twig b/src/Wallabag/ImportBundle/Resources/views/WallabagV1/index.html.twig index 13e24c8c..0b19bc34 100644 --- a/src/Wallabag/ImportBundle/Resources/views/WallabagV1/index.html.twig +++ b/src/Wallabag/ImportBundle/Resources/views/WallabagV1/index.html.twig @@ -6,6 +6,8 @@
+ {% include 'WallabagImportBundle:Import:_workerEnabled.html.twig' %} +
{{ import.description|trans }}

{{ 'import.wallabag_v1.how_to'|trans }}

-- cgit v1.2.3 From 7d862f83b95d24b4f081d73ca7b0bdf4435ae008 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 11 Sep 2016 20:23:17 +0200 Subject: Re-facto EntryConsumer Using an abstract method allow to share code but also can be used it we add a new broker in the future --- .../ImportBundle/Consumer/AMPQEntryConsumer.php | 57 +---------------- .../ImportBundle/Consumer/AbstractConsumer.php | 74 ++++++++++++++++++++++ .../ImportBundle/Consumer/RedisEntryConsumer.php | 59 +---------------- 3 files changed, 78 insertions(+), 112 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php index 39bb5375..d95a011d 100644 --- a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php @@ -2,69 +2,16 @@ namespace Wallabag\ImportBundle\Consumer; -use Doctrine\ORM\EntityManager; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; -use Wallabag\ImportBundle\Import\AbstractImport; -use Wallabag\UserBundle\Repository\UserRepository; -use Wallabag\CoreBundle\Entity\Entry; -use Wallabag\CoreBundle\Entity\Tag; -use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; -class AMPQEntryConsumer implements ConsumerInterface +class AMPQEntryConsumer extends AbstractConsumer implements ConsumerInterface { - private $em; - private $userRepository; - private $import; - private $logger; - - public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) - { - $this->em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - /** * {@inheritdoc} */ public function execute(AMQPMessage $msg) { - $storedEntry = json_decode($msg->body, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return; - } - - $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); + return $this->handleMessage($msg->body); } } diff --git a/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php new file mode 100644 index 00000000..2b85ad76 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php @@ -0,0 +1,74 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * Handle a message and save it. + * + * @param string $body Message from the queue (in json) + * + * @return bool + */ + protected function handleMessage($body) + { + $storedEntry = json_decode($body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return false; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return false; + } + + try { + $this->em->flush(); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return false; + } + + $this->logger->info('Content with url imported! ('.$entry->getUrl().')'); + + return true; + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php index 38665b01..450b71ff 100644 --- a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php +++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php @@ -3,29 +3,9 @@ namespace Wallabag\ImportBundle\Consumer; use Simpleue\Job\Job; -use Doctrine\ORM\EntityManager; -use Wallabag\ImportBundle\Import\AbstractImport; -use Wallabag\UserBundle\Repository\UserRepository; -use Wallabag\CoreBundle\Entity\Entry; -use Wallabag\CoreBundle\Entity\Tag; -use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; -class RedisEntryConsumer implements Job +class RedisEntryConsumer extends AbstractConsumer implements Job { - private $em; - private $userRepository; - private $import; - private $logger; - - public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null) - { - $this->em = $em; - $this->userRepository = $userRepository; - $this->import = $import; - $this->logger = $logger ?: new NullLogger(); - } - /** * Handle one message by one message. * @@ -35,42 +15,7 @@ class RedisEntryConsumer implements Job */ public function manage($job) { - $storedEntry = json_decode($job, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return false; - } - - $this->import->setUser($user); - - $entry = $this->import->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return false; - } - - try { - $this->em->flush(); - - // clear only affected entities - $this->em->clear(Entry::class); - $this->em->clear(Tag::class); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return false; - } - - $this->logger->info('Content with url ('.$entry->getUrl().') imported !'); - - return true; + return $this->handleMessage($job); } /** -- cgit v1.2.3 From 015c7a8359c950f9621b38b11c3973860a981da8 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 11 Sep 2016 20:24:04 +0200 Subject: Add more tests And ability to define how many messages can be hanle by the redis worker before stopping (usefull for tests) --- src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php | 5 ++++- src/Wallabag/ImportBundle/Controller/ReadabilityController.php | 2 +- src/Wallabag/ImportBundle/Controller/WallabagController.php | 2 +- src/Wallabag/ImportBundle/Form/Type/UploadImportType.php | 1 + 4 files changed, 7 insertions(+), 3 deletions(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php index 85c5a903..5f90e00f 100644 --- a/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php +++ b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php @@ -5,6 +5,7 @@ namespace Wallabag\ImportBundle\Command; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Config\Definition\Exception\Exception; use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; use Simpleue\Worker\QueueWorker; @@ -17,6 +18,7 @@ class RedisWorkerCommand extends ContainerAwareCommand ->setName('wallabag:import:redis-worker') ->setDescription('Launch Redis worker') ->addArgument('serviceName', InputArgument::REQUIRED, 'Service to use: wallabag_v1, wallabag_v2, pocket or readability') + ->addOption('maxIterations', '', InputOption::VALUE_OPTIONAL, 'Number of iterations before stoping', false) ; } @@ -33,7 +35,8 @@ class RedisWorkerCommand extends ContainerAwareCommand $worker = new QueueWorker( $this->getContainer()->get('wallabag_import.queue.redis.'.$serviceName), - $this->getContainer()->get('wallabag_import.consumer.redis.'.$serviceName) + $this->getContainer()->get('wallabag_import.consumer.redis.'.$serviceName), + $input->getOption('maxIterations') ); $worker->start(); diff --git a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php index 61243042..8775e8a3 100644 --- a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php +++ b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php @@ -31,7 +31,7 @@ class ReadabilityController extends Controller $markAsRead = $form->get('mark_as_read')->getData(); $name = 'readability_'.$this->getUser()->getId().'.json'; - if (in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { + if (null !== $file && in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { $res = $readability ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name) ->setMarkAsRead($markAsRead) diff --git a/src/Wallabag/ImportBundle/Controller/WallabagController.php b/src/Wallabag/ImportBundle/Controller/WallabagController.php index 1e6114c5..3c2dd6d1 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagController.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagController.php @@ -45,7 +45,7 @@ abstract class WallabagController extends Controller $markAsRead = $form->get('mark_as_read')->getData(); $name = $this->getUser()->getId().'.json'; - if (in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { + if (null !== $file && in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { $res = $wallabag ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name) ->setMarkAsRead($markAsRead) diff --git a/src/Wallabag/ImportBundle/Form/Type/UploadImportType.php b/src/Wallabag/ImportBundle/Form/Type/UploadImportType.php index 92a167d9..f50424c1 100644 --- a/src/Wallabag/ImportBundle/Form/Type/UploadImportType.php +++ b/src/Wallabag/ImportBundle/Form/Type/UploadImportType.php @@ -15,6 +15,7 @@ class UploadImportType extends AbstractType $builder ->add('file', FileType::class, [ 'label' => 'import.form.file_label', + 'required' => true, ]) ->add('mark_as_read', CheckboxType::class, [ 'label' => 'import.form.mark_as_read_label', -- cgit v1.2.3 From ebf5e5087d2f79ece42a660ee7bddaa3ff3ebe1e Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 11 Sep 2016 21:40:08 +0200 Subject: Add tests on ImportCommand --- src/Wallabag/ImportBundle/Command/ImportCommand.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Command/ImportCommand.php b/src/Wallabag/ImportBundle/Command/ImportCommand.php index a4aa8531..20ecc6e1 100644 --- a/src/Wallabag/ImportBundle/Command/ImportCommand.php +++ b/src/Wallabag/ImportBundle/Command/ImportCommand.php @@ -26,6 +26,10 @@ class ImportCommand extends ContainerAwareCommand { $output->writeln('Start : '.(new \DateTime())->format('d-m-Y G:i:s').' ---'); + if (!file_exists($input->getArgument('filepath'))) { + throw new Exception(sprintf('File "%s" not found', $input->getArgument('filepath'))); + } + $em = $this->getContainer()->get('doctrine')->getManager(); // Turning off doctrine default logs queries for saving memory $em->getConnection()->getConfiguration()->setSQLLogger(null); @@ -43,9 +47,9 @@ class ImportCommand extends ContainerAwareCommand } $wallabag->setMarkAsRead($input->getOption('markAsRead')); + $wallabag->setUser($user); $res = $wallabag - ->setUser($user) ->setFilepath($input->getArgument('filepath')) ->import(); -- cgit v1.2.3 From e01a3c98d6908e95121b5ade0161f40af1b05ca6 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Tue, 13 Sep 2016 20:31:32 +0200 Subject: Display how many messages are queue - update the docker-composer to add Redis - add migrations --- .../ImportBundle/Controller/ImportController.php | 54 ++++++++++++++++++++++ .../Resources/views/Import/check_queue.html.twig | 11 +++++ 2 files changed, 65 insertions(+) create mode 100644 src/Wallabag/ImportBundle/Resources/views/Import/check_queue.html.twig (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Controller/ImportController.php b/src/Wallabag/ImportBundle/Controller/ImportController.php index c1486e38..d670746c 100644 --- a/src/Wallabag/ImportBundle/Controller/ImportController.php +++ b/src/Wallabag/ImportBundle/Controller/ImportController.php @@ -16,4 +16,58 @@ class ImportController extends Controller 'imports' => $this->get('wallabag_import.chain')->getAll(), ]); } + + /** + * Display how many messages are queue (both in Redis and RabbitMQ). + */ + public function checkQueueAction() + { + $nbRedisMessages = null; + $nbRabbitMessages = null; + + if ($this->get('craue_config')->get('import_with_rabbitmq')) { + $nbRabbitMessages = $this->getTotalMessageInRabbitQueue('pocket') + + $this->getTotalMessageInRabbitQueue('readability') + + $this->getTotalMessageInRabbitQueue('wallabag_v1') + + $this->getTotalMessageInRabbitQueue('wallabag_v2') + ; + } elseif ($this->get('craue_config')->get('import_with_redis')) { + $redis = $this->get('wallabag_core.redis.client'); + + $nbRedisMessages = $redis->llen('wallabag.import.pocket') + + $redis->llen('wallabag.import.readability') + + $redis->llen('wallabag.import.wallabag_v1') + + $redis->llen('wallabag.import.wallabag_v2') + ; + } + + return $this->render('WallabagImportBundle:Import:check_queue.html.twig', [ + 'nbRedisMessages' => $nbRedisMessages, + 'nbRabbitMessages' => $nbRabbitMessages, + ]); + } + + /** + * Count message in RabbitMQ queue. + * It get one message without acking it (so it'll stay in the queue) + * which will include the total of *other* messages in the queue. + * Adding one to that messages will result in the full total message. + * + * @param string $importService The import service related: pocket, readability, wallabag_v1 or wallabag_v2 + * + * @return int + */ + private function getTotalMessageInRabbitQueue($importService) + { + $message = $this + ->get('old_sound_rabbit_mq.import_'.$importService.'_consumer') + ->getChannel() + ->basic_get('wallabag.import.'.$importService); + + if (null === $message) { + return 0; + } + + return $message->delivery_info['message_count'] + 1; + } } diff --git a/src/Wallabag/ImportBundle/Resources/views/Import/check_queue.html.twig b/src/Wallabag/ImportBundle/Resources/views/Import/check_queue.html.twig new file mode 100644 index 00000000..7168ea35 --- /dev/null +++ b/src/Wallabag/ImportBundle/Resources/views/Import/check_queue.html.twig @@ -0,0 +1,11 @@ +{% if nbRedisMessages > 0 %} + +{% endif %} + +{% if nbRabbitMessages > 0 %} + +{% endif %} -- cgit v1.2.3 From c80cc01afa315dcfa38b2a01c5b05d4516659c24 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Tue, 13 Sep 2016 21:09:05 +0200 Subject: Change flash message for queued articles --- src/Wallabag/ImportBundle/Controller/PocketController.php | 6 ++++++ .../ImportBundle/Controller/ReadabilityController.php | 6 ++++++ .../ImportBundle/Controller/WallabagController.php | 6 ++++++ src/Wallabag/ImportBundle/Import/AbstractImport.php | 15 ++++++++++++++- src/Wallabag/ImportBundle/Import/PocketImport.php | 11 ----------- src/Wallabag/ImportBundle/Import/ReadabilityImport.php | 11 ----------- src/Wallabag/ImportBundle/Import/WallabagImport.php | 11 ----------- 7 files changed, 32 insertions(+), 34 deletions(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php index 3d555717..1f92c182 100644 --- a/src/Wallabag/ImportBundle/Controller/PocketController.php +++ b/src/Wallabag/ImportBundle/Controller/PocketController.php @@ -102,6 +102,12 @@ class PocketController extends Controller '%imported%' => $summary['imported'], '%skipped%' => $summary['skipped'], ]); + + if (0 < $summary['queued']) { + $message = $this->get('translator')->trans('flashes.import.notice.summary_with_queue', [ + '%queued%' => $summary['queued'], + ]); + } } $this->get('session')->getFlashBag()->add( diff --git a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php index 8775e8a3..d00e22c2 100644 --- a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php +++ b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php @@ -46,6 +46,12 @@ class ReadabilityController extends Controller '%skipped%' => $summary['skipped'], ]); + if (0 < $summary['queued']) { + $message = $this->get('translator')->trans('flashes.import.notice.summary_with_queue', [ + '%queued%' => $summary['queued'], + ]); + } + unlink($this->getParameter('wallabag_import.resource_dir').'/'.$name); } diff --git a/src/Wallabag/ImportBundle/Controller/WallabagController.php b/src/Wallabag/ImportBundle/Controller/WallabagController.php index 3c2dd6d1..9c0cde80 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagController.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagController.php @@ -60,6 +60,12 @@ abstract class WallabagController extends Controller '%skipped%' => $summary['skipped'], ]); + if (0 < $summary['queued']) { + $message = $this->get('translator')->trans('flashes.import.notice.summary_with_queue', [ + '%queued%' => $summary['queued'], + ]); + } + unlink($this->getParameter('wallabag_import.resource_dir').'/'.$name); } diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 4cd8e846..2af0e69b 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -21,6 +21,7 @@ abstract class AbstractImport implements ImportInterface protected $markAsRead; protected $skippedEntries = 0; protected $importedEntries = 0; + protected $queuedEntries = 0; public function __construct(EntityManager $em, ContentProxy $contentProxy) { @@ -145,12 +146,24 @@ abstract class AbstractImport implements ImportInterface $importedEntry = $this->setEntryAsRead($importedEntry); } - ++$this->importedEntries; + ++$this->queuedEntries; $this->producer->publish(json_encode($importedEntry)); } } + /** + * {@inheritdoc} + */ + public function getSummary() + { + return [ + 'skipped' => $this->skippedEntries, + 'imported' => $this->importedEntries, + 'queued' => $this->queuedEntries, + ]; + } + /** * Parse one entry. * diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index fe39d33f..cc6faf1f 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -172,17 +172,6 @@ class PocketImport extends AbstractImport return true; } - /** - * {@inheritdoc} - */ - public function getSummary() - { - return [ - 'skipped' => $this->skippedEntries, - 'imported' => $this->importedEntries, - ]; - } - /** * Set the Guzzle client. * diff --git a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php index 8f080d38..b852f8f0 100644 --- a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php +++ b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php @@ -44,17 +44,6 @@ class ReadabilityImport extends AbstractImport return $this; } - /** - * {@inheritdoc} - */ - public function getSummary() - { - return [ - 'skipped' => $this->skippedEntries, - 'imported' => $this->importedEntries, - ]; - } - /** * {@inheritdoc} */ diff --git a/src/Wallabag/ImportBundle/Import/WallabagImport.php b/src/Wallabag/ImportBundle/Import/WallabagImport.php index 8e50b135..969a6a04 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagImport.php +++ b/src/Wallabag/ImportBundle/Import/WallabagImport.php @@ -72,17 +72,6 @@ abstract class WallabagImport extends AbstractImport return true; } - /** - * {@inheritdoc} - */ - public function getSummary() - { - return [ - 'skipped' => $this->skippedEntries, - 'imported' => $this->importedEntries, - ]; - } - /** * Set file path to the json file. * -- cgit v1.2.3 From ac87e0db2ac5db90f1b0639a2d31c7098b4eaa20 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Wed, 14 Sep 2016 10:17:22 +0200 Subject: AMPQ -> AMQP --- .../ImportBundle/Consumer/AMPQEntryConsumer.php | 17 ----------------- .../ImportBundle/Consumer/AMQPEntryConsumer.php | 17 +++++++++++++++++ src/Wallabag/ImportBundle/Resources/config/rabbit.yml | 16 ++++++++-------- 3 files changed, 25 insertions(+), 25 deletions(-) delete mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php deleted file mode 100644 index d95a011d..00000000 --- a/src/Wallabag/ImportBundle/Consumer/AMPQEntryConsumer.php +++ /dev/null @@ -1,17 +0,0 @@ -handleMessage($msg->body); - } -} diff --git a/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php new file mode 100644 index 00000000..5aaa8c03 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php @@ -0,0 +1,17 @@ +handleMessage($msg->body); + } +} diff --git a/src/Wallabag/ImportBundle/Resources/config/rabbit.yml b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml index f09dda0d..aa049749 100644 --- a/src/Wallabag/ImportBundle/Resources/config/rabbit.yml +++ b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml @@ -1,28 +1,28 @@ # RabbitMQ stuff services: - wallabag_import.consumer.ampq.pocket: - class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + wallabag_import.consumer.amqp.pocket: + class: Wallabag\ImportBundle\Consumer\AMQPEntryConsumer arguments: - "@doctrine.orm.entity_manager" - "@wallabag_user.user_repository" - "@wallabag_import.pocket.import" - "@logger" - wallabag_import.consumer.ampq.readability: - class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + wallabag_import.consumer.amqp.readability: + class: Wallabag\ImportBundle\Consumer\AMQPEntryConsumer arguments: - "@doctrine.orm.entity_manager" - "@wallabag_user.user_repository" - "@wallabag_import.readability.import" - "@logger" - wallabag_import.consumer.ampq.wallabag_v1: - class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + wallabag_import.consumer.amqp.wallabag_v1: + class: Wallabag\ImportBundle\Consumer\AMQPEntryConsumer arguments: - "@doctrine.orm.entity_manager" - "@wallabag_user.user_repository" - "@wallabag_import.wallabag_v1.import" - "@logger" - wallabag_import.consumer.ampq.wallabag_v2: - class: Wallabag\ImportBundle\Consumer\AMPQEntryConsumer + wallabag_import.consumer.amqp.wallabag_v2: + class: Wallabag\ImportBundle\Consumer\AMQPEntryConsumer arguments: - "@doctrine.orm.entity_manager" - "@wallabag_user.user_repository" -- cgit v1.2.3 From 4fc998245c56ad95c1e753ab52b0c702d4a8a59d Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 16 Sep 2016 20:08:07 +0200 Subject: Only display message in queue for admin Instead of for EVERYONE --- src/Wallabag/ImportBundle/Controller/ImportController.php | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Controller/ImportController.php b/src/Wallabag/ImportBundle/Controller/ImportController.php index d670746c..ea4f7d7b 100644 --- a/src/Wallabag/ImportBundle/Controller/ImportController.php +++ b/src/Wallabag/ImportBundle/Controller/ImportController.php @@ -19,12 +19,20 @@ class ImportController extends Controller /** * Display how many messages are queue (both in Redis and RabbitMQ). + * Only for admins. */ public function checkQueueAction() { $nbRedisMessages = null; $nbRabbitMessages = null; + if (!$this->get('security.authorization_checker')->isGranted('ROLE_SUPER_ADMIN')) { + return $this->render('WallabagImportBundle:Import:check_queue.html.twig', [ + 'nbRedisMessages' => $nbRedisMessages, + 'nbRabbitMessages' => $nbRabbitMessages, + ]); + } + if ($this->get('craue_config')->get('import_with_rabbitmq')) { $nbRabbitMessages = $this->getTotalMessageInRabbitQueue('pocket') + $this->getTotalMessageInRabbitQueue('readability') -- cgit v1.2.3 From ebe0787e093f4f2934430033015d6ebad1c64dca Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 16 Sep 2016 22:22:25 +0200 Subject: Moved Pocket token to user config --- src/Wallabag/ImportBundle/Controller/PocketController.php | 2 +- src/Wallabag/ImportBundle/Import/PocketImport.php | 10 ++++------ .../ImportBundle/Resources/views/Pocket/index.html.twig | 8 ++------ 3 files changed, 7 insertions(+), 13 deletions(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php index 1f92c182..56be5cbf 100644 --- a/src/Wallabag/ImportBundle/Controller/PocketController.php +++ b/src/Wallabag/ImportBundle/Controller/PocketController.php @@ -44,7 +44,7 @@ class PocketController extends Controller return $this->render('WallabagImportBundle:Pocket:index.html.twig', [ 'import' => $this->getPocketImportService(), - 'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true, + 'has_consumer_key' => '' === trim($this->getUser()->getConfig()->getPocketConsumerKey()) ? false : true, 'form' => $form->createView(), ]); } diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index cc6faf1f..40603c90 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -13,16 +13,14 @@ use Craue\ConfigBundle\Util\Config; class PocketImport extends AbstractImport { private $client; - private $consumerKey; private $accessToken; const NB_ELEMENTS = 5000; - public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) + public function __construct(EntityManager $em, ContentProxy $contentProxy) { $this->em = $em; $this->contentProxy = $contentProxy; - $this->consumerKey = $craueConfig->get('pocket_consumer_key'); $this->logger = new NullLogger(); } @@ -72,7 +70,7 @@ class PocketImport extends AbstractImport $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/oauth/request', [ 'body' => json_encode([ - 'consumer_key' => $this->consumerKey, + 'consumer_key' => $this->user->getConfig()->getPocketConsumerKey(), 'redirect_uri' => $redirectUri, ]), ] @@ -102,7 +100,7 @@ class PocketImport extends AbstractImport $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/oauth/authorize', [ 'body' => json_encode([ - 'consumer_key' => $this->consumerKey, + 'consumer_key' => $this->user->getConfig()->getPocketConsumerKey(), 'code' => $code, ]), ] @@ -131,7 +129,7 @@ class PocketImport extends AbstractImport $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/get', [ 'body' => json_encode([ - 'consumer_key' => $this->consumerKey, + 'consumer_key' => $this->user->getConfig()->getPocketConsumerKey(), 'access_token' => $this->accessToken, 'detailType' => 'complete', 'state' => 'all', diff --git a/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig b/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig index aa5941b7..6195fa07 100644 --- a/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig +++ b/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig @@ -12,11 +12,7 @@
{{ 'import.pocket.config_missing.description'|trans }} - {% if is_granted('ROLE_SUPER_ADMIN') %} - {{ 'import.pocket.config_missing.admin_message'|trans({'%keyurls%': '', '%keyurle%':''})|raw }} - {% else %} - {{ 'import.pocket.config_missing.user_message'|trans }} - {% endif %} + {{ 'import.pocket.config_missing.admin_message'|trans({'%keyurls%': '', '%keyurle%':''})|raw }}
{% endif %} @@ -31,7 +27,7 @@ {{ form_label(form.mark_as_read) }}
- -- cgit v1.2.3 From fbb319f064e6336a3b44bda12cdc51c93c51f379 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Fri, 16 Sep 2016 22:58:33 +0200 Subject: Missing some migrations and CS --- src/Wallabag/ImportBundle/Import/PocketImport.php | 1 - 1 file changed, 1 deletion(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 40603c90..1bf22d68 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -8,7 +8,6 @@ use GuzzleHttp\Client; use GuzzleHttp\Exception\RequestException; use Wallabag\CoreBundle\Entity\Entry; use Wallabag\CoreBundle\Helper\ContentProxy; -use Craue\ConfigBundle\Util\Config; class PocketImport extends AbstractImport { -- cgit v1.2.3 From 59b97fae996d8307b9d957d210d46200f6d206bf Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sat, 17 Sep 2016 07:40:56 +0200 Subject: Avoid losing entry when fetching fail MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of just say “Failed to save entry” we’ll save the entry at all cost and try to fetch content. If fetching content failed, the entry will still be saved at least, but without content. --- src/Wallabag/ImportBundle/Import/AbstractImport.php | 6 +++--- src/Wallabag/ImportBundle/Import/PocketImport.php | 19 +++++-------------- .../ImportBundle/Import/ReadabilityImport.php | 16 +++++----------- src/Wallabag/ImportBundle/Import/WallabagImport.php | 16 +++++----------- 4 files changed, 18 insertions(+), 39 deletions(-) (limited to 'src/Wallabag/ImportBundle') diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 2af0e69b..a1a14576 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -79,20 +79,20 @@ abstract class AbstractImport implements ImportInterface /** * Fetch content from the ContentProxy (using graby). - * If it fails return false instead of the updated entry. + * If it fails return the given entry to be saved in all case (to avoid user to loose the content). * * @param Entry $entry Entry to update * @param string $url Url to grab content for * @param array $content An array with AT LEAST keys title, html, url, language & content_type to skip the fetchContent from the url * - * @return Entry|false + * @return Entry */ protected function fetchContent(Entry $entry, $url, array $content = []) { try { return $this->contentProxy->updateEntry($entry, $url, $content); } catch (\Exception $e) { - return false; + return $entry; } } diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 1bf22d68..e00eb44b 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -199,24 +199,16 @@ class PocketImport extends AbstractImport } $entry = new Entry($this->user); - $entry = $this->fetchContent($entry, $url); - - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; + $entry->setUrl($url); - return; - } + // update entry with content (in case fetching failed, the given entry will be return) + $entry = $this->fetchContent($entry, $url); // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted - if ($importedEntry['status'] == 1 || $this->markAsRead) { - $entry->setArchived(true); - } + $entry->setArchived($importedEntry['status'] == 1 || $this->markAsRead); // 0 or 1 - 1 If the item is starred - if ($importedEntry['favorite'] == 1) { - $entry->setStarred(true); - } + $entry->setStarred($importedEntry['favorite'] == 1); $title = 'Untitled'; if (isset($importedEntry['resolved_title']) && $importedEntry['resolved_title'] != '') { @@ -226,7 +218,6 @@ class PocketImport extends AbstractImport } $entry->setTitle($title); - $entry->setUrl($url); // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image if (isset($importedEntry['has_image']) && $importedEntry['has_image'] > 0 && isset($importedEntry['images'][1])) { diff --git a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php index b852f8f0..fa2b7053 100644 --- a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php +++ b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php @@ -103,18 +103,12 @@ class ReadabilityImport extends AbstractImport 'created_at' => $importedEntry['date_added'], ]; - $entry = $this->fetchContent( - new Entry($this->user), - $data['url'], - $data - ); - - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; + $entry = new Entry($this->user); + $entry->setUrl($data['url']); + $entry->setTitle($data['title']); - return; - } + // update entry with content (in case fetching failed, the given entry will be return) + $entry = $this->fetchContent($entry, $data['url'], $data); $entry->setArchived($data['is_archived']); $entry->setStarred($data['is_starred']); diff --git a/src/Wallabag/ImportBundle/Import/WallabagImport.php b/src/Wallabag/ImportBundle/Import/WallabagImport.php index 969a6a04..043bb0a2 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagImport.php +++ b/src/Wallabag/ImportBundle/Import/WallabagImport.php @@ -101,18 +101,12 @@ abstract class WallabagImport extends AbstractImport $data = $this->prepareEntry($importedEntry); - $entry = $this->fetchContent( - new Entry($this->user), - $importedEntry['url'], - $data - ); - - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; + $entry = new Entry($this->user); + $entry->setUrl($data['url']); + $entry->setTitle($data['title']); - return; - } + // update entry with content (in case fetching failed, the given entry will be return) + $entry = $this->fetchContent($entry, $data['url'], $data); if (array_key_exists('tags', $data)) { $this->contentProxy->assignTagsToEntry( -- cgit v1.2.3