Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format.
Then, the worker retrieve that information, find / create the entry and save it.
old_sound_rabbit_mq:
connections:
default:
- host: %rabbitmq_host%
- port: %rabbitmq_port%
- user: %rabbitmq_user%
- password: %rabbitmq_password%
+ host: "%rabbitmq_host%"
+ port: "%rabbitmq_port%"
+ user: "%rabbitmq_user%"
+ password: "%rabbitmq_password%"
vhost: /
- lazy: false
+ lazy: true
producers:
- wallabag:
+ wallabag_pocket:
connection: default
exchange_options:
- name: 'wallabag_exchange'
+ name: 'wallabag.import.pocket'
type: topic
consumers:
- entries:
+ wallabag_pocket:
connection: default
exchange_options:
- name: 'wallabag_exchange'
+ name: 'wallabag.import.pocket'
type: topic
queue_options:
- name: 'wallabag_queue'
- callback: wallabag_import.consumer.entry
+ name: 'wallabag.import.pocket'
+ callback: wallabag_import.consumer.pocket
rss_limit: 50
- # pocket import
- pocket_consumer_key: xxxxxxxx
-
# RabbitMQ processing
- rabbitmq: false
rabbitmq_host: localhost
rabbitmq_port: 5672
rabbitmq_user: guest
+++ /dev/null
-<?php
-
-namespace Wallabag\ImportBundle\Component\AMPQ;
-
-use Doctrine\ORM\EntityManager;
-use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
-use PhpAmqpLib\Message\AMQPMessage;
-use Wallabag\CoreBundle\Helper\ContentProxy;
-use Wallabag\CoreBundle\Repository\EntryRepository;
-
-class EntryConsumer implements ConsumerInterface
-{
- private $em;
- private $contentProxy;
- private $entryRepository;
-
- public function __construct(EntityManager $em, EntryRepository $entryRepository, ContentProxy $contentProxy)
- {
- $this->em = $em;
- $this->entryRepository = $entryRepository;
- $this->contentProxy = $contentProxy;
- }
-
- /**
- * {@inheritdoc}
- */
- public function execute(AMQPMessage $msg)
- {
- $storedEntry = unserialize($msg->body);
- $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']);
- if ($entry) {
- $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl());
- if ($entry) {
- $this->em->persist($entry);
- $this->em->flush();
- }
- }
- }
-}
--- /dev/null
+<?php
+
+namespace Wallabag\ImportBundle\Consumer\AMPQ;
+
+use Doctrine\ORM\EntityManager;
+use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
+use PhpAmqpLib\Message\AMQPMessage;
+use Wallabag\ImportBundle\Import\PocketImport;
+use Wallabag\UserBundle\Repository\UserRepository;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+
+class PocketConsumer implements ConsumerInterface
+{
+ private $em;
+ private $userRepository;
+ private $pocketImport;
+ private $logger;
+
+ public function __construct(EntityManager $em, UserRepository $userRepository, PocketImport $pocketImport, LoggerInterface $logger = null)
+ {
+ $this->em = $em;
+ $this->userRepository = $userRepository;
+ $this->pocketImport = $pocketImport;
+ $this->logger = $logger ?: new NullLogger();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function execute(AMQPMessage $msg)
+ {
+ $storedEntry = json_decode($msg->body, true);
+
+ $user = $this->userRepository->find($storedEntry['userId']);
+
+ // no user? Drop message
+ if (null === $user) {
+ $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]);
+
+ return;
+ }
+
+ $this->pocketImport->setUser($user);
+
+ $entry = $this->pocketImport->parseEntry($storedEntry);
+
+ if (null === $entry) {
+ $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]);
+
+ return;
+ }
+
+ try {
+ $this->em->flush();
+ $this->em->clear($entry);
+ } catch (\Exception $e) {
+ $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);
+
+ return;
+ }
+ }
+}
class PocketController extends Controller
{
+ /**
+ * Return Pocket Import Service with or without RabbitMQ enabled.
+ *
+ * @return \Wallabag\ImportBundle\Import\PocketImport
+ */
+ private function getPocketImportService()
+ {
+ $pocket = $this->get('wallabag_import.pocket.import');
+ $pocket->setUser($this->getUser());
+
+ if ($this->get('craue_config')->get('rabbitmq')) {
+ $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.wallabag_pocket_producer'));
+ }
+
+ return $pocket;
+ }
+
/**
* @Route("/pocket", name="import_pocket")
*/
public function indexAction()
{
- $pocket = $this->get('wallabag_import.pocket.import');
+ $pocket = $this->getPocketImportService();
$form = $this->createFormBuilder($pocket)
->add('mark_as_read', CheckboxType::class, [
'label' => 'import.form.mark_as_read_label',
->getForm();
return $this->render('WallabagImportBundle:Pocket:index.html.twig', [
- 'import' => $this->get('wallabag_import.pocket.import'),
+ 'import' => $this->getPocketImportService(),
'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true,
'form' => $form->createView(),
]);
*/
public function authAction(Request $request)
{
- $requestToken = $this->get('wallabag_import.pocket.import')
+ $requestToken = $this->getPocketImportService()
->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL));
if (false === $requestToken) {
public function callbackAction()
{
$message = 'flashes.import.notice.failed';
- $pocket = $this->get('wallabag_import.pocket.import');
+ $pocket = $this->getPocketImportService();
$markAsRead = $this->get('session')->get('mark_as_read');
$this->get('session')->remove('mark_as_read');
namespace Wallabag\ImportBundle\Import;
use OldSound\RabbitMqBundle\RabbitMq\Producer;
-use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Doctrine\ORM\EntityManager;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\RequestException;
-use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface;
+use Symfony\Component\Security\Core\User\UserInterface;
use Wallabag\CoreBundle\Entity\Entry;
use Wallabag\CoreBundle\Helper\ContentProxy;
use Craue\ConfigBundle\Util\Config;
private $skippedEntries = 0;
private $importedEntries = 0;
private $markAsRead;
- protected $accessToken;
private $producer;
- private $rabbitMQ;
+ protected $accessToken;
- public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, Producer $producer)
+ public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig)
{
- $this->user = $tokenStorage->getToken()->getUser();
$this->em = $em;
$this->contentProxy = $contentProxy;
$this->consumerKey = $craueConfig->get('pocket_consumer_key');
$this->logger = new NullLogger();
- $this->rabbitMQ = $craueConfig->get('rabbitmq');
+ }
+
+ /**
+ * Set RabbitMQ Producer to send each entry to a queue.
+ * This method should be called when user has enabled RabbitMQ.
+ *
+ * @param Producer $producer
+ */
+ public function setRabbitmqProducer(Producer $producer)
+ {
$this->producer = $producer;
}
+ /**
+ * Set current user.
+ * Could the current *connected* user or one retrieve by the consumer.
+ *
+ * @param UserInterface $user
+ */
+ public function setUser(UserInterface $user)
+ {
+ $this->user = $user;
+ }
+
/**
* {@inheritdoc}
*/
$entries = $response->json();
+ if ($this->producer) {
+ $this->parseEntriesForProducer($entries['list']);
+
+ return true;
+ }
+
$this->parseEntries($entries['list']);
return true;
/**
* @see https://getpocket.com/developer/docs/v3/retrieve
*
- * @param $entries
+ * @param array $entries
*/
- private function parseEntries($entries)
+ private function parseEntries(array $entries)
{
$i = 1;
- foreach ($entries as &$pocketEntry) {
- $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url'];
-
- $existingEntry = $this->em
- ->getRepository('WallabagCoreBundle:Entry')
- ->findByUrlAndUserId($url, $this->user->getId());
+ foreach ($entries as $pocketEntry) {
+ $entry = $this->parseEntry($pocketEntry);
- if (false !== $existingEntry) {
- ++$this->skippedEntries;
+ if (null === $entry) {
continue;
}
- $entry = new Entry($this->user);
+ // flush every 20 entries
+ if (($i % 20) === 0) {
+ $this->em->flush();
+ $this->em->clear($entry);
+ }
- if (!$this->rabbitMQ) {
- $entry = $this->fetchContent($entry, $url);
+ ++$i;
+ }
- // jump to next entry in case of problem while getting content
- if (false === $entry) {
- ++$this->skippedEntries;
- continue;
- }
- }
+ $this->em->flush();
+ }
- // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
- if ($pocketEntry['status'] == 1 || $this->markAsRead) {
- $entry->setArchived(true);
- }
+ public function parseEntry(array $pocketEntry)
+ {
+ $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url'];
- // 0 or 1 - 1 If the item is starred
- if ($pocketEntry['favorite'] == 1) {
- $entry->setStarred(true);
- }
+ $existingEntry = $this->em
+ ->getRepository('WallabagCoreBundle:Entry')
+ ->findByUrlAndUserId($url, $this->user->getId());
- $title = 'Untitled';
- if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') {
- $title = $pocketEntry['resolved_title'];
- } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
- $title = $pocketEntry['given_title'];
- }
+ if (false !== $existingEntry) {
+ ++$this->skippedEntries;
- $entry->setTitle($title);
- $entry->setUrl($url);
+ return;
+ }
- // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
- if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
- $entry->setPreviewPicture($pocketEntry['images'][1]['src']);
- }
+ $entry = new Entry($this->user);
+ $entry = $this->fetchContent($entry, $url);
- if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) {
- $this->contentProxy->assignTagsToEntry(
- $entry,
- array_keys($pocketEntry['tags'])
- );
- }
+ // jump to next entry in case of problem while getting content
+ if (false === $entry) {
+ ++$this->skippedEntries;
- $pocketEntry['url'] = $url;
- $pocketEntry['userId'] = $this->user->getId();
+ return;
+ }
- $this->em->persist($entry);
- ++$this->importedEntries;
+ // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
+ if ($pocketEntry['status'] == 1 || $this->markAsRead) {
+ $entry->setArchived(true);
+ }
- // flush every 20 entries
- if (($i % 20) === 0) {
- $this->em->flush();
- }
+ // 0 or 1 - 1 If the item is starred
+ if ($pocketEntry['favorite'] == 1) {
+ $entry->setStarred(true);
+ }
- ++$i;
+ $title = 'Untitled';
+ if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') {
+ $title = $pocketEntry['resolved_title'];
+ } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
+ $title = $pocketEntry['given_title'];
}
- $this->em->flush();
+ $entry->setTitle($title);
+ $entry->setUrl($url);
+
+ // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
+ if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
+ $entry->setPreviewPicture($pocketEntry['images'][1]['src']);
+ }
+
+ if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) {
+ $this->contentProxy->assignTagsToEntry(
+ $entry,
+ array_keys($pocketEntry['tags'])
+ );
+ }
+
+ $this->em->persist($entry);
+ ++$this->importedEntries;
- if ($this->rabbitMQ) {
- foreach ($entries as $entry) {
- $this->producer->publish(serialize($entry));
+ return $entry;
+ }
+
+ /**
+ * Faster parse entries for Producer.
+ * We don't care to make check at this time. They'll be done by the consumer.
+ *
+ * @param array $entries
+ */
+ public function parseEntriesForProducer($entries)
+ {
+ foreach ($entries as $pocketEntry) {
+ // set userId for the producer (it won't know which user is connected)
+ $pocketEntry['userId'] = $this->user->getId();
+
+ if ($this->markAsRead) {
+ $pocketEntry['status'] = 1;
}
+
+ ++$this->importedEntries;
+
+ $this->producer->publish(json_encode($pocketEntry));
}
}
}
services:
- wallabag_import.consumer.entry:
- class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer
+ wallabag_import.consumer.pocket:
+ class: Wallabag\ImportBundle\Consumer\AMPQ\PocketConsumer
arguments:
- "@doctrine.orm.entity_manager"
- - "@wallabag_core.entry_repository"
- - "@wallabag_core.content_proxy"
+ - "@wallabag_user.user_repository"
+ - "@wallabag_import.pocket.import"
+ - "@logger"
wallabag_import.chain:
class: Wallabag\ImportBundle\Import\ImportChain
wallabag_import.pocket.import:
class: Wallabag\ImportBundle\Import\PocketImport
arguments:
- - "@security.token_storage"
- "@doctrine.orm.entity_manager"
- "@wallabag_core.content_proxy"
- "@craue_config"
- - "@old_sound_rabbit_mq.wallabag_producer"
calls:
- [ setClient, [ "@wallabag_import.pocket.client" ] ]
- [ setLogger, [ "@logger" ]]
- "@router"
tags:
- { name: kernel.event_subscriber }
+
+ wallabag_user.user_repository:
+ class: Wallabag\UserBundle\Repository\UserRepository
+ factory: [ "@doctrine.orm.default_entity_manager", getRepository ]
+ arguments:
+ - WallabagUserBundle:User
protected $em;
protected $contentProxy;
protected $logHandler;
- protected $producer;
- private function getPocketImport($consumerKey = 'ConsumerKey', $rabbitMQ = false)
+ private function getPocketImport($consumerKey = 'ConsumerKey')
{
$this->user = new User();
- $this->tokenStorage = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface')
- ->disableOriginalConstructor()
- ->getMock();
-
- $token = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\TokenInterface')
- ->disableOriginalConstructor()
- ->getMock();
-
$this->contentProxy = $this->getMockBuilder('Wallabag\CoreBundle\Helper\ContentProxy')
->disableOriginalConstructor()
->getMock();
- $token->expects($this->once())
- ->method('getUser')
- ->willReturn($this->user);
-
- $this->tokenStorage->expects($this->once())
- ->method('getToken')
- ->willReturn($token);
-
$this->em = $this->getMockBuilder('Doctrine\ORM\EntityManager')
->disableOriginalConstructor()
->getMock();
->with('pocket_consumer_key')
->willReturn($consumerKey);
- $this->producer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\Producer')
- ->disableOriginalConstructor()
- ->getMock();
-
$pocket = new PocketImportMock(
- $this->tokenStorage,
$this->em,
$this->contentProxy,
- $config,
- $this->producer
+ $config
);
+ $pocket->setUser($this->user);
$this->logHandler = new TestHandler();
$logger = new Logger('test', [$this->logHandler]);