From ef75e1220ebb76a8df019d946460ad612759f0bb Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sat, 3 Sep 2016 17:36:57 +0200 Subject: [PATCH] Send every imported item to the queue Instead of queueing real Entry objects to process, we queue all the items to import from Pocket in a raw format. Then, the worker retrieves that information, finds / creates the entry and saves it. --- app/config/config.yml | 22 +-- app/config/parameters.yml.dist | 4 - .../Component/AMPQ/EntryConsumer.php | 39 ---- .../Consumer/AMPQ/PocketConsumer.php | 63 +++++++ .../Controller/PocketController.php | 25 ++- .../ImportBundle/Import/PocketImport.php | 177 +++++++++++------- .../Resources/config/services.yml | 11 +- .../UserBundle/Resources/config/services.yml | 6 + .../ImportBundle/Import/PocketImportTest.php | 28 +-- 9 files changed, 221 insertions(+), 154 deletions(-) delete mode 100644 src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php diff --git a/app/config/config.yml b/app/config/config.yml index ef5ae0aa..fa829637 100644 --- a/app/config/config.yml +++ b/app/config/config.yml @@ -219,24 +219,24 @@ lexik_maintenance: old_sound_rabbit_mq: connections: default: - host: %rabbitmq_host% - port: %rabbitmq_port% - user: %rabbitmq_user% - password: %rabbitmq_password% + host: "%rabbitmq_host%" + port: "%rabbitmq_port%" + user: "%rabbitmq_user%" + password: "%rabbitmq_password%" vhost: / - lazy: false + lazy: true producers: - wallabag: + wallabag_pocket: connection: default exchange_options: - name: 'wallabag_exchange' + name: 'wallabag.import.pocket' type: topic consumers: - entries: + wallabag_pocket: connection: default exchange_options: - name: 'wallabag_exchange' + name: 'wallabag.import.pocket' type: topic queue_options: - name: 'wallabag_queue' - callback: wallabag_import.consumer.entry + name: 'wallabag.import.pocket' + callback: wallabag_import.consumer.pocket diff --git a/app/config/parameters.yml.dist 
b/app/config/parameters.yml.dist index e925b412..a59dc02c 100644 --- a/app/config/parameters.yml.dist +++ b/app/config/parameters.yml.dist @@ -41,11 +41,7 @@ parameters: rss_limit: 50 - # pocket import - pocket_consumer_key: xxxxxxxx - # RabbitMQ processing - rabbitmq: false rabbitmq_host: localhost rabbitmq_port: 5672 rabbitmq_user: guest diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php deleted file mode 100644 index 7775f01c..00000000 --- a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php +++ /dev/null @@ -1,39 +0,0 @@ -em = $em; - $this->entryRepository = $entryRepository; - $this->contentProxy = $contentProxy; - } - - /** - * {@inheritdoc} - */ - public function execute(AMQPMessage $msg) - { - $storedEntry = unserialize($msg->body); - $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']); - if ($entry) { - $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl()); - if ($entry) { - $this->em->persist($entry); - $this->em->flush(); - } - } - } -} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php new file mode 100644 index 00000000..239e7446 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php @@ -0,0 +1,63 @@ +em = $em; + $this->userRepository = $userRepository; + $this->pocketImport = $pocketImport; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? 
Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->pocketImport->setUser($user); + + $entry = $this->pocketImport->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + $this->em->clear($entry); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + } +} diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php index 36ee25bf..a2dcd8a7 100644 --- a/src/Wallabag/ImportBundle/Controller/PocketController.php +++ b/src/Wallabag/ImportBundle/Controller/PocketController.php @@ -10,12 +10,29 @@ use Symfony\Component\Form\Extension\Core\Type\CheckboxType; class PocketController extends Controller { + /** + * Return Pocket Import Service with or without RabbitMQ enabled. 
+ * + * @return \Wallabag\ImportBundle\Import\PocketImport + */ + private function getPocketImportService() + { + $pocket = $this->get('wallabag_import.pocket.import'); + $pocket->setUser($this->getUser()); + + if ($this->get('craue_config')->get('rabbitmq')) { + $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.wallabag_pocket_producer')); + } + + return $pocket; + } + /** * @Route("/pocket", name="import_pocket") */ public function indexAction() { - $pocket = $this->get('wallabag_import.pocket.import'); + $pocket = $this->getPocketImportService(); $form = $this->createFormBuilder($pocket) ->add('mark_as_read', CheckboxType::class, [ 'label' => 'import.form.mark_as_read_label', @@ -24,7 +41,7 @@ class PocketController extends Controller ->getForm(); return $this->render('WallabagImportBundle:Pocket:index.html.twig', [ - 'import' => $this->get('wallabag_import.pocket.import'), + 'import' => $this->getPocketImportService(), 'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? 
false : true, 'form' => $form->createView(), ]); @@ -35,7 +52,7 @@ class PocketController extends Controller */ public function authAction(Request $request) { - $requestToken = $this->get('wallabag_import.pocket.import') + $requestToken = $this->getPocketImportService() ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL)); if (false === $requestToken) { @@ -62,7 +79,7 @@ class PocketController extends Controller public function callbackAction() { $message = 'flashes.import.notice.failed'; - $pocket = $this->get('wallabag_import.pocket.import'); + $pocket = $this->getPocketImportService(); $markAsRead = $this->get('session')->get('mark_as_read'); $this->get('session')->remove('mark_as_read'); diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 7d1c0c61..27df4917 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -3,12 +3,11 @@ namespace Wallabag\ImportBundle\Import; use OldSound\RabbitMqBundle\RabbitMq\Producer; -use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use GuzzleHttp\Client; use GuzzleHttp\Exception\RequestException; -use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; +use Symfony\Component\Security\Core\User\UserInterface; use Wallabag\CoreBundle\Entity\Entry; use Wallabag\CoreBundle\Helper\ContentProxy; use Craue\ConfigBundle\Util\Config; @@ -21,21 +20,39 @@ class PocketImport extends AbstractImport private $skippedEntries = 0; private $importedEntries = 0; private $markAsRead; - protected $accessToken; private $producer; - private $rabbitMQ; + protected $accessToken; - public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, Producer $producer) + public function __construct(EntityManager $em, ContentProxy $contentProxy, Config 
$craueConfig) { - $this->user = $tokenStorage->getToken()->getUser(); $this->em = $em; $this->contentProxy = $contentProxy; $this->consumerKey = $craueConfig->get('pocket_consumer_key'); $this->logger = new NullLogger(); - $this->rabbitMQ = $craueConfig->get('rabbitmq'); + } + + /** + * Set RabbitMQ Producer to send each entry to a queue. + * This method should be called when user has enabled RabbitMQ. + * + * @param Producer $producer + */ + public function setRabbitmqProducer(Producer $producer) + { $this->producer = $producer; } + /** + * Set current user. + * Could the current *connected* user or one retrieve by the consumer. + * + * @param UserInterface $user + */ + public function setUser(UserInterface $user) + { + $this->user = $user; + } + /** * {@inheritdoc} */ @@ -168,6 +185,12 @@ class PocketImport extends AbstractImport $entries = $response->json(); + if ($this->producer) { + $this->parseEntriesForProducer($entries['list']); + + return true; + } + $this->parseEntries($entries['list']); return true; @@ -197,88 +220,112 @@ class PocketImport extends AbstractImport /** * @see https://getpocket.com/developer/docs/v3/retrieve * - * @param $entries + * @param array $entries */ - private function parseEntries($entries) + private function parseEntries(array $entries) { $i = 1; - foreach ($entries as &$pocketEntry) { - $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? 
$pocketEntry['resolved_url'] : $pocketEntry['given_url']; - - $existingEntry = $this->em - ->getRepository('WallabagCoreBundle:Entry') - ->findByUrlAndUserId($url, $this->user->getId()); + foreach ($entries as $pocketEntry) { + $entry = $this->parseEntry($pocketEntry); - if (false !== $existingEntry) { - ++$this->skippedEntries; + if (null === $entry) { continue; } - $entry = new Entry($this->user); + // flush every 20 entries + if (($i % 20) === 0) { + $this->em->flush(); + $this->em->clear($entry); + } - if (!$this->rabbitMQ) { - $entry = $this->fetchContent($entry, $url); + ++$i; + } - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; - continue; - } - } + $this->em->flush(); + } - // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted - if ($pocketEntry['status'] == 1 || $this->markAsRead) { - $entry->setArchived(true); - } + public function parseEntry(array $pocketEntry) + { + $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? 
$pocketEntry['resolved_url'] : $pocketEntry['given_url']; - // 0 or 1 - 1 If the item is starred - if ($pocketEntry['favorite'] == 1) { - $entry->setStarred(true); - } + $existingEntry = $this->em + ->getRepository('WallabagCoreBundle:Entry') + ->findByUrlAndUserId($url, $this->user->getId()); - $title = 'Untitled'; - if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { - $title = $pocketEntry['resolved_title']; - } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') { - $title = $pocketEntry['given_title']; - } + if (false !== $existingEntry) { + ++$this->skippedEntries; - $entry->setTitle($title); - $entry->setUrl($url); + return; + } - // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image - if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { - $entry->setPreviewPicture($pocketEntry['images'][1]['src']); - } + $entry = new Entry($this->user); + $entry = $this->fetchContent($entry, $url); - if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { - $this->contentProxy->assignTagsToEntry( - $entry, - array_keys($pocketEntry['tags']) - ); - } + // jump to next entry in case of problem while getting content + if (false === $entry) { + ++$this->skippedEntries; - $pocketEntry['url'] = $url; - $pocketEntry['userId'] = $this->user->getId(); + return; + } - $this->em->persist($entry); - ++$this->importedEntries; + // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted + if ($pocketEntry['status'] == 1 || $this->markAsRead) { + $entry->setArchived(true); + } - // flush every 20 entries - if (($i % 20) === 0) { - $this->em->flush(); - } + // 0 or 1 - 1 If the item is starred + if ($pocketEntry['favorite'] == 1) { + $entry->setStarred(true); + } - ++$i; + $title = 'Untitled'; + if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { + $title = $pocketEntry['resolved_title']; + } 
elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') { + $title = $pocketEntry['given_title']; } - $this->em->flush(); + $entry->setTitle($title); + $entry->setUrl($url); + + // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image + if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { + $entry->setPreviewPicture($pocketEntry['images'][1]['src']); + } + + if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { + $this->contentProxy->assignTagsToEntry( + $entry, + array_keys($pocketEntry['tags']) + ); + } + + $this->em->persist($entry); + ++$this->importedEntries; - if ($this->rabbitMQ) { - foreach ($entries as $entry) { - $this->producer->publish(serialize($entry)); + return $entry; + } + + /** + * Faster parse entries for Producer. + * We don't care to make check at this time. They'll be done by the consumer. + * + * @param array $entries + */ + public function parseEntriesForProducer($entries) + { + foreach ($entries as $pocketEntry) { + // set userId for the producer (it won't know which user is connected) + $pocketEntry['userId'] = $this->user->getId(); + + if ($this->markAsRead) { + $pocketEntry['status'] = 1; } + + ++$this->importedEntries; + + $this->producer->publish(json_encode($pocketEntry)); } } } diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index 60eb4e18..fe388b26 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -1,10 +1,11 @@ services: - wallabag_import.consumer.entry: - class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer + wallabag_import.consumer.pocket: + class: Wallabag\ImportBundle\Consumer\AMPQ\PocketConsumer arguments: - "@doctrine.orm.entity_manager" - - "@wallabag_core.entry_repository" - - "@wallabag_core.content_proxy" + - "@wallabag_user.user_repository" + - 
"@wallabag_import.pocket.import" + - "@logger" wallabag_import.chain: class: Wallabag\ImportBundle\Import\ImportChain @@ -21,11 +22,9 @@ services: wallabag_import.pocket.import: class: Wallabag\ImportBundle\Import\PocketImport arguments: - - "@security.token_storage" - "@doctrine.orm.entity_manager" - "@wallabag_core.content_proxy" - "@craue_config" - - "@old_sound_rabbit_mq.wallabag_producer" calls: - [ setClient, [ "@wallabag_import.pocket.client" ] ] - [ setLogger, [ "@logger" ]] diff --git a/src/Wallabag/UserBundle/Resources/config/services.yml b/src/Wallabag/UserBundle/Resources/config/services.yml index d79d8fa2..05830555 100644 --- a/src/Wallabag/UserBundle/Resources/config/services.yml +++ b/src/Wallabag/UserBundle/Resources/config/services.yml @@ -14,3 +14,9 @@ services: - "@router" tags: - { name: kernel.event_subscriber } + + wallabag_user.user_repository: + class: Wallabag\UserBundle\Repository\UserRepository + factory: [ "@doctrine.orm.default_entity_manager", getRepository ] + arguments: + - WallabagUserBundle:User diff --git a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php index 5bf47d96..d6b9617e 100644 --- a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php +++ b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php @@ -27,32 +27,15 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase protected $em; protected $contentProxy; protected $logHandler; - protected $producer; - private function getPocketImport($consumerKey = 'ConsumerKey', $rabbitMQ = false) + private function getPocketImport($consumerKey = 'ConsumerKey') { $this->user = new User(); - $this->tokenStorage = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface') - ->disableOriginalConstructor() - ->getMock(); - - $token = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\TokenInterface') - ->disableOriginalConstructor() - 
->getMock(); - $this->contentProxy = $this->getMockBuilder('Wallabag\CoreBundle\Helper\ContentProxy') ->disableOriginalConstructor() ->getMock(); - $token->expects($this->once()) - ->method('getUser') - ->willReturn($this->user); - - $this->tokenStorage->expects($this->once()) - ->method('getToken') - ->willReturn($token); - $this->em = $this->getMockBuilder('Doctrine\ORM\EntityManager') ->disableOriginalConstructor() ->getMock(); @@ -66,17 +49,12 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase ->with('pocket_consumer_key') ->willReturn($consumerKey); - $this->producer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\Producer') - ->disableOriginalConstructor() - ->getMock(); - $pocket = new PocketImportMock( - $this->tokenStorage, $this->em, $this->contentProxy, - $config, - $this->producer + $config ); + $pocket->setUser($this->user); $this->logHandler = new TestHandler(); $logger = new Logger('test', [$this->logHandler]); -- 2.41.0