]> git.immae.eu Git - github/wallabag/wallabag.git/commitdiff
Send every imported item to the queue
authorJeremy Benoist <jeremy.benoist@gmail.com>
Sat, 3 Sep 2016 15:36:57 +0000 (17:36 +0200)
committerJeremy Benoist <jeremy.benoist@gmail.com>
Sun, 11 Sep 2016 19:57:46 +0000 (21:57 +0200)
Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format.
Then, the worker retrieve that information, find / create the entry and save it.

app/config/config.yml
app/config/parameters.yml.dist
src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php [deleted file]
src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php [new file with mode: 0644]
src/Wallabag/ImportBundle/Controller/PocketController.php
src/Wallabag/ImportBundle/Import/PocketImport.php
src/Wallabag/ImportBundle/Resources/config/services.yml
src/Wallabag/UserBundle/Resources/config/services.yml
tests/Wallabag/ImportBundle/Import/PocketImportTest.php

index ef5ae0aa2e64d63ee5dac4ac8a54d57315b65680..fa8296375f9ff0d3761d526c92fd6f5f8b7a96ab 100644 (file)
@@ -219,24 +219,24 @@ lexik_maintenance:
 old_sound_rabbit_mq:
     connections:
         default:
-            host:     %rabbitmq_host%
-            port:     %rabbitmq_port%
-            user:     %rabbitmq_user%
-            password: %rabbitmq_password%
+            host:     "%rabbitmq_host%"
+            port:     "%rabbitmq_port%"
+            user:     "%rabbitmq_user%"
+            password: "%rabbitmq_password%"
             vhost:    /
-            lazy:     false
+            lazy:     true
     producers:
-        wallabag:
+        wallabag_pocket:
             connection: default
             exchange_options:
-                name: 'wallabag_exchange'
+                name: 'wallabag.import.pocket'
                 type: topic
     consumers:
-        entries:
+        wallabag_pocket:
             connection: default
             exchange_options:
-                name: 'wallabag_exchange'
+                name: 'wallabag.import.pocket'
                 type: topic
             queue_options:
-                name: 'wallabag_queue'
-            callback: wallabag_import.consumer.entry
+                name: 'wallabag.import.pocket'
+            callback: wallabag_import.consumer.pocket
index e925b41261266c466226373d8cc67ecef1270799..a59dc02c9514ba230be3eafe14ad5f8c04263141 100644 (file)
@@ -41,11 +41,7 @@ parameters:
 
     rss_limit: 50
 
-    # pocket import
-    pocket_consumer_key: xxxxxxxx
-
     # RabbitMQ processing
-    rabbitmq: false
     rabbitmq_host: localhost
     rabbitmq_port: 5672
     rabbitmq_user: guest
diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php
deleted file mode 100644 (file)
index 7775f01..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-<?php
-
-namespace Wallabag\ImportBundle\Component\AMPQ;
-
-use Doctrine\ORM\EntityManager;
-use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
-use PhpAmqpLib\Message\AMQPMessage;
-use Wallabag\CoreBundle\Helper\ContentProxy;
-use Wallabag\CoreBundle\Repository\EntryRepository;
-
-class EntryConsumer implements ConsumerInterface
-{
-    private $em;
-    private $contentProxy;
-    private $entryRepository;
-
-    public function __construct(EntityManager $em, EntryRepository $entryRepository, ContentProxy $contentProxy)
-    {
-        $this->em = $em;
-        $this->entryRepository = $entryRepository;
-        $this->contentProxy = $contentProxy;
-    }
-
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(AMQPMessage $msg)
-    {
-        $storedEntry = unserialize($msg->body);
-        $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']);
-        if ($entry) {
-            $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl());
-            if ($entry) {
-                $this->em->persist($entry);
-                $this->em->flush();
-            }
-        }
-    }
-}
diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php
new file mode 100644 (file)
index 0000000..239e744
--- /dev/null
@@ -0,0 +1,63 @@
+<?php
+
+namespace Wallabag\ImportBundle\Consumer\AMPQ;
+
+use Doctrine\ORM\EntityManager;
+use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
+use PhpAmqpLib\Message\AMQPMessage;
+use Wallabag\ImportBundle\Import\PocketImport;
+use Wallabag\UserBundle\Repository\UserRepository;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+
+class PocketConsumer implements ConsumerInterface
+{
+    private $em;
+    private $userRepository;
+    private $pocketImport;
+    private $logger;
+
+    public function __construct(EntityManager $em, UserRepository $userRepository, PocketImport $pocketImport, LoggerInterface $logger = null)
+    {
+        $this->em = $em;
+        $this->userRepository = $userRepository;
+        $this->pocketImport = $pocketImport;
+        $this->logger = $logger ?: new NullLogger();
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function execute(AMQPMessage $msg)
+    {
+        $storedEntry = json_decode($msg->body, true);
+
+        $user = $this->userRepository->find($storedEntry['userId']);
+
+        // no user? Drop message
+        if (null === $user) {
+            $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]);
+
+            return;
+        }
+
+        $this->pocketImport->setUser($user);
+
+        $entry = $this->pocketImport->parseEntry($storedEntry);
+
+        if (null === $entry) {
+            $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]);
+
+            return;
+        }
+
+        try {
+            $this->em->flush();
+            $this->em->clear($entry);
+        } catch (\Exception $e) {
+            $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);
+
+            return;
+        }
+    }
+}
index 36ee25bf9e015a1acc5710b94673f4af767600a1..a2dcd8a7f0ff5df767197780e03b73f3c84feeea 100644 (file)
@@ -10,12 +10,29 @@ use Symfony\Component\Form\Extension\Core\Type\CheckboxType;
 
 class PocketController extends Controller
 {
+    /**
+     * Return Pocket Import Service with or without RabbitMQ enabled.
+     *
+     * @return \Wallabag\ImportBundle\Import\PocketImport
+     */
+    private function getPocketImportService()
+    {
+        $pocket = $this->get('wallabag_import.pocket.import');
+        $pocket->setUser($this->getUser());
+
+        if ($this->get('craue_config')->get('rabbitmq')) {
+            $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.wallabag_pocket_producer'));
+        }
+
+        return $pocket;
+    }
+
     /**
      * @Route("/pocket", name="import_pocket")
      */
     public function indexAction()
     {
-        $pocket = $this->get('wallabag_import.pocket.import');
+        $pocket = $this->getPocketImportService();
         $form = $this->createFormBuilder($pocket)
             ->add('mark_as_read', CheckboxType::class, [
                 'label' => 'import.form.mark_as_read_label',
@@ -24,7 +41,7 @@ class PocketController extends Controller
             ->getForm();
 
         return $this->render('WallabagImportBundle:Pocket:index.html.twig', [
-            'import' => $this->get('wallabag_import.pocket.import'),
+            'import' => $this->getPocketImportService(),
             'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true,
             'form' => $form->createView(),
         ]);
@@ -35,7 +52,7 @@ class PocketController extends Controller
      */
     public function authAction(Request $request)
     {
-        $requestToken = $this->get('wallabag_import.pocket.import')
+        $requestToken = $this->getPocketImportService()
             ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL));
 
         if (false === $requestToken) {
@@ -62,7 +79,7 @@ class PocketController extends Controller
     public function callbackAction()
     {
         $message = 'flashes.import.notice.failed';
-        $pocket = $this->get('wallabag_import.pocket.import');
+        $pocket = $this->getPocketImportService();
 
         $markAsRead = $this->get('session')->get('mark_as_read');
         $this->get('session')->remove('mark_as_read');
index 7d1c0c617bbc78dc36a58d731e8252bc81925978..27df4917ae4d6b10e3041a6b574ce5aefeedd647 100644 (file)
@@ -3,12 +3,11 @@
 namespace Wallabag\ImportBundle\Import;
 
 use OldSound\RabbitMqBundle\RabbitMq\Producer;
-use Psr\Log\LoggerInterface;
 use Psr\Log\NullLogger;
 use Doctrine\ORM\EntityManager;
 use GuzzleHttp\Client;
 use GuzzleHttp\Exception\RequestException;
-use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface;
+use Symfony\Component\Security\Core\User\UserInterface;
 use Wallabag\CoreBundle\Entity\Entry;
 use Wallabag\CoreBundle\Helper\ContentProxy;
 use Craue\ConfigBundle\Util\Config;
@@ -21,21 +20,39 @@ class PocketImport extends AbstractImport
     private $skippedEntries = 0;
     private $importedEntries = 0;
     private $markAsRead;
-    protected $accessToken;
     private $producer;
-    private $rabbitMQ;
+    protected $accessToken;
 
-    public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, Producer $producer)
+    public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig)
     {
-        $this->user = $tokenStorage->getToken()->getUser();
         $this->em = $em;
         $this->contentProxy = $contentProxy;
         $this->consumerKey = $craueConfig->get('pocket_consumer_key');
         $this->logger = new NullLogger();
-        $this->rabbitMQ = $craueConfig->get('rabbitmq');
+    }
+
+    /**
+     * Set RabbitMQ Producer to send each entry to a queue.
+     * This method should be called when user has enabled RabbitMQ.
+     *
+     * @param Producer $producer
+     */
+    public function setRabbitmqProducer(Producer $producer)
+    {
         $this->producer = $producer;
     }
 
+    /**
+     * Set current user.
+     * Could the current *connected* user or one retrieve by the consumer.
+     *
+     * @param UserInterface $user
+     */
+    public function setUser(UserInterface $user)
+    {
+        $this->user = $user;
+    }
+
     /**
      * {@inheritdoc}
      */
@@ -168,6 +185,12 @@ class PocketImport extends AbstractImport
 
         $entries = $response->json();
 
+        if ($this->producer) {
+            $this->parseEntriesForProducer($entries['list']);
+
+            return true;
+        }
+
         $this->parseEntries($entries['list']);
 
         return true;
@@ -197,88 +220,112 @@ class PocketImport extends AbstractImport
     /**
      * @see https://getpocket.com/developer/docs/v3/retrieve
      *
-     * @param $entries
+     * @param array $entries
      */
-    private function parseEntries($entries)
+    private function parseEntries(array $entries)
     {
         $i = 1;
 
-        foreach ($entries as &$pocketEntry) {
-            $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url'];
-
-            $existingEntry = $this->em
-                ->getRepository('WallabagCoreBundle:Entry')
-                ->findByUrlAndUserId($url, $this->user->getId());
+        foreach ($entries as $pocketEntry) {
+            $entry = $this->parseEntry($pocketEntry);
 
-            if (false !== $existingEntry) {
-                ++$this->skippedEntries;
+            if (null === $entry) {
                 continue;
             }
 
-            $entry = new Entry($this->user);
+            // flush every 20 entries
+            if (($i % 20) === 0) {
+                $this->em->flush();
+                $this->em->clear($entry);
+            }
 
-            if (!$this->rabbitMQ) {
-                $entry = $this->fetchContent($entry, $url);
+            ++$i;
+        }
 
-                // jump to next entry in case of problem while getting content
-                if (false === $entry) {
-                    ++$this->skippedEntries;
-                    continue;
-                }
-            }
+        $this->em->flush();
+    }
 
-            // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
-            if ($pocketEntry['status'] == 1 || $this->markAsRead) {
-                $entry->setArchived(true);
-            }
+    public function parseEntry(array $pocketEntry)
+    {
+        $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url'];
 
-            // 0 or 1 - 1 If the item is starred
-            if ($pocketEntry['favorite'] == 1) {
-                $entry->setStarred(true);
-            }
+        $existingEntry = $this->em
+            ->getRepository('WallabagCoreBundle:Entry')
+            ->findByUrlAndUserId($url, $this->user->getId());
 
-            $title = 'Untitled';
-            if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') {
-                $title = $pocketEntry['resolved_title'];
-            } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
-                $title = $pocketEntry['given_title'];
-            }
+        if (false !== $existingEntry) {
+            ++$this->skippedEntries;
 
-            $entry->setTitle($title);
-            $entry->setUrl($url);
+            return;
+        }
 
-            // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
-            if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
-                $entry->setPreviewPicture($pocketEntry['images'][1]['src']);
-            }
+        $entry = new Entry($this->user);
+        $entry = $this->fetchContent($entry, $url);
 
-            if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) {
-                $this->contentProxy->assignTagsToEntry(
-                    $entry,
-                    array_keys($pocketEntry['tags'])
-                );
-            }
+        // jump to next entry in case of problem while getting content
+        if (false === $entry) {
+            ++$this->skippedEntries;
 
-            $pocketEntry['url'] = $url;
-            $pocketEntry['userId'] = $this->user->getId();
+            return;
+        }
 
-            $this->em->persist($entry);
-            ++$this->importedEntries;
+        // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
+        if ($pocketEntry['status'] == 1 || $this->markAsRead) {
+            $entry->setArchived(true);
+        }
 
-            // flush every 20 entries
-            if (($i % 20) === 0) {
-                $this->em->flush();
-            }
+        // 0 or 1 - 1 If the item is starred
+        if ($pocketEntry['favorite'] == 1) {
+            $entry->setStarred(true);
+        }
 
-            ++$i;
+        $title = 'Untitled';
+        if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') {
+            $title = $pocketEntry['resolved_title'];
+        } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
+            $title = $pocketEntry['given_title'];
         }
 
-        $this->em->flush();
+        $entry->setTitle($title);
+        $entry->setUrl($url);
+
+        // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
+        if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
+            $entry->setPreviewPicture($pocketEntry['images'][1]['src']);
+        }
+
+        if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) {
+            $this->contentProxy->assignTagsToEntry(
+                $entry,
+                array_keys($pocketEntry['tags'])
+            );
+        }
+
+        $this->em->persist($entry);
+        ++$this->importedEntries;
 
-        if ($this->rabbitMQ) {
-            foreach ($entries as $entry) {
-                $this->producer->publish(serialize($entry));
+        return $entry;
+    }
+
+    /**
+     * Faster parse entries for Producer.
+     * We don't care to make check at this time. They'll be done by the consumer.
+     *
+     * @param array $entries
+     */
+    public function parseEntriesForProducer($entries)
+    {
+        foreach ($entries as $pocketEntry) {
+            // set userId for the producer (it won't know which user is connected)
+            $pocketEntry['userId'] = $this->user->getId();
+
+            if ($this->markAsRead) {
+                $pocketEntry['status'] = 1;
             }
+
+            ++$this->importedEntries;
+
+            $this->producer->publish(json_encode($pocketEntry));
         }
     }
 }
index 60eb4e1845be7c74c08c5d7f4d0f203c91298260..fe388b261aed7684d1341b6ae85415f0db6b7223 100644 (file)
@@ -1,10 +1,11 @@
 services:
-    wallabag_import.consumer.entry:
-        class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer
+    wallabag_import.consumer.pocket:
+        class: Wallabag\ImportBundle\Consumer\AMPQ\PocketConsumer
         arguments:
             - "@doctrine.orm.entity_manager"
-            - "@wallabag_core.entry_repository"
-            - "@wallabag_core.content_proxy"
+            - "@wallabag_user.user_repository"
+            - "@wallabag_import.pocket.import"
+            - "@logger"
 
     wallabag_import.chain:
         class: Wallabag\ImportBundle\Import\ImportChain
@@ -21,11 +22,9 @@ services:
     wallabag_import.pocket.import:
         class: Wallabag\ImportBundle\Import\PocketImport
         arguments:
-            - "@security.token_storage"
             - "@doctrine.orm.entity_manager"
             - "@wallabag_core.content_proxy"
             - "@craue_config"
-            - "@old_sound_rabbit_mq.wallabag_producer"
         calls:
             - [ setClient, [ "@wallabag_import.pocket.client" ] ]
             - [ setLogger, [ "@logger" ]]
index d79d8fa2bacf2a5ecc1823e29d5a108ee7ba4b62..05830555ee8cb9eee5818598930e3c14fe8ccba4 100644 (file)
@@ -14,3 +14,9 @@ services:
             - "@router"
         tags:
             - { name: kernel.event_subscriber }
+
+    wallabag_user.user_repository:
+        class: Wallabag\UserBundle\Repository\UserRepository
+        factory: [ "@doctrine.orm.default_entity_manager", getRepository ]
+        arguments:
+            - WallabagUserBundle:User
index 5bf47d964086a1f3cce19b533f5ffe55fa29eb69..d6b9617e4ad5efecd63b496d8838bf36676f6283 100644 (file)
@@ -27,32 +27,15 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase
     protected $em;
     protected $contentProxy;
     protected $logHandler;
-    protected $producer;
 
-    private function getPocketImport($consumerKey = 'ConsumerKey', $rabbitMQ = false)
+    private function getPocketImport($consumerKey = 'ConsumerKey')
     {
         $this->user = new User();
 
-        $this->tokenStorage = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface')
-            ->disableOriginalConstructor()
-            ->getMock();
-
-        $token = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\TokenInterface')
-            ->disableOriginalConstructor()
-            ->getMock();
-
         $this->contentProxy = $this->getMockBuilder('Wallabag\CoreBundle\Helper\ContentProxy')
             ->disableOriginalConstructor()
             ->getMock();
 
-        $token->expects($this->once())
-            ->method('getUser')
-            ->willReturn($this->user);
-
-        $this->tokenStorage->expects($this->once())
-            ->method('getToken')
-            ->willReturn($token);
-
         $this->em = $this->getMockBuilder('Doctrine\ORM\EntityManager')
             ->disableOriginalConstructor()
             ->getMock();
@@ -66,17 +49,12 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase
             ->with('pocket_consumer_key')
             ->willReturn($consumerKey);
 
-        $this->producer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\Producer')
-            ->disableOriginalConstructor()
-            ->getMock();
-
         $pocket = new PocketImportMock(
-            $this->tokenStorage,
             $this->em,
             $this->contentProxy,
-            $config,
-            $this->producer
+            $config
         );
+        $pocket->setUser($this->user);
 
         $this->logHandler = new TestHandler();
         $logger = new Logger('test', [$this->logHandler]);