From c98db1b653b5dc8b701422190b02d9fbf10c4e68 Mon Sep 17 00:00:00 2001 From: Jeremy Benoist Date: Sun, 4 Sep 2016 21:49:21 +0200 Subject: Convert other imports to Rabbit --- app/config/config.yml | 43 ++++++- .../ImportBundle/Consumer/AMPQ/EntryConsumer.php | 63 ++++++++++ .../ImportBundle/Consumer/AMPQ/PocketConsumer.php | 63 ---------- .../ImportBundle/Controller/PocketController.php | 2 +- .../Controller/ReadabilityController.php | 6 +- .../ImportBundle/Controller/WallabagController.php | 2 +- .../Controller/WallabagV1Controller.php | 8 +- .../Controller/WallabagV2Controller.php | 8 +- .../ImportBundle/Import/AbstractImport.php | 83 ++++++++++++++ src/Wallabag/ImportBundle/Import/PocketImport.php | 107 +++-------------- .../ImportBundle/Import/ReadabilityImport.php | 127 +++++++++------------ .../ImportBundle/Import/WallabagImport.php | 118 +++++++------------ .../ImportBundle/Import/WallabagV1Import.php | 20 +++- .../ImportBundle/Import/WallabagV2Import.php | 20 +++- .../ImportBundle/Resources/config/services.yml | 23 +++- 15 files changed, 382 insertions(+), 311 deletions(-) create mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php delete mode 100644 src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php diff --git a/app/config/config.yml b/app/config/config.yml index fa829637..d39bef95 100644 --- a/app/config/config.yml +++ b/app/config/config.yml @@ -226,13 +226,28 @@ old_sound_rabbit_mq: vhost: / lazy: true producers: - wallabag_pocket: + import_pocket: connection: default exchange_options: name: 'wallabag.import.pocket' type: topic + import_readability: + connection: default + exchange_options: + name: 'wallabag.import.readability' + type: topic + import_wallabag_v1: + connection: default + exchange_options: + name: 'wallabag.import.wallabag_v1' + type: topic + import_wallabag_v2: + connection: default + exchange_options: + name: 'wallabag.import.wallabag_v2' + type: topic consumers: - wallabag_pocket: + import_pocket: connection: default exchange_options: name: 'wallabag.import.pocket' @@ -240,3 +255,27 @@ old_sound_rabbit_mq: queue_options: name: 'wallabag.import.pocket' callback: wallabag_import.consumer.pocket + import_readability: + connection: default + exchange_options: + name: 'wallabag.import.readability' + type: topic + queue_options: + name: 'wallabag.import.readability' + callback: wallabag_import.consumer.readability + import_wallabag_v1: + connection: default + exchange_options: + name: 'wallabag.import.wallabag_v1' + type: topic + queue_options: + name: 'wallabag.import.wallabag_v1' + callback: wallabag_import.consumer.wallabag_v1 + import_wallabag_v2: + connection: default + exchange_options: + name: 'wallabag.import.wallabag_v2' + type: topic + queue_options: + name: 'wallabag.import.wallabag_v2' + callback: wallabag_import.consumer.wallabag_v2 diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php new file mode 100644 index 00000000..8a8cf45d --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/EntryConsumer.php @@ -0,0 +1,63 @@ +em = $em; + $this->userRepository = $userRepository; + $this->import = $import; + $this->logger = $logger ?: new NullLogger(); + } + + /** + * {@inheritdoc} + */ + public function execute(AMQPMessage $msg) + { + $storedEntry = json_decode($msg->body, true); + + $user = $this->userRepository->find($storedEntry['userId']); + + // no user? Drop message + if (null === $user) { + $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); + + return; + } + + $this->import->setUser($user); + + $entry = $this->import->parseEntry($storedEntry); + + if (null === $entry) { + $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); + + return; + } + + try { + $this->em->flush(); + $this->em->clear($entry); + } catch (\Exception $e) { + $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); + + return; + } + } +} diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php deleted file mode 100644 index 239e7446..00000000 --- a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php +++ /dev/null @@ -1,63 +0,0 @@ -em = $em; - $this->userRepository = $userRepository; - $this->pocketImport = $pocketImport; - $this->logger = $logger ?: new NullLogger(); - } - - /** - * {@inheritdoc} - */ - public function execute(AMQPMessage $msg) - { - $storedEntry = json_decode($msg->body, true); - - $user = $this->userRepository->find($storedEntry['userId']); - - // no user? Drop message - if (null === $user) { - $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); - - return; - } - - $this->pocketImport->setUser($user); - - $entry = $this->pocketImport->parseEntry($storedEntry); - - if (null === $entry) { - $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); - - return; - } - - try { - $this->em->flush(); - $this->em->clear($entry); - } catch (\Exception $e) { - $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); - - return; - } - } -} diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php index a2dcd8a7..57c007c3 100644 --- a/src/Wallabag/ImportBundle/Controller/PocketController.php +++ b/src/Wallabag/ImportBundle/Controller/PocketController.php @@ -21,7 +21,7 @@ class PocketController extends Controller $pocket->setUser($this->getUser()); if ($this->get('craue_config')->get('rabbitmq')) { - $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.wallabag_pocket_producer')); + $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_pocket_producer')); } return $pocket; diff --git a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php index b61aa99c..ee875a40 100644 --- a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php +++ b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php @@ -18,6 +18,11 @@ class ReadabilityController extends Controller $form->handleRequest($request); $readability = $this->get('wallabag_import.readability.import'); + $readability->setUser($this->getUser()); + + if ($this->get('craue_config')->get('rabbitmq')) { + $readability->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_readability_producer')); + } if ($form->isValid()) { $file = $form->get('file')->getData(); @@ -26,7 +31,6 @@ class ReadabilityController extends Controller if (in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { $res = $readability - ->setUser($this->getUser()) ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name) ->setMarkAsRead($markAsRead) ->import(); diff --git a/src/Wallabag/ImportBundle/Controller/WallabagController.php b/src/Wallabag/ImportBundle/Controller/WallabagController.php index 76ced0d2..1e6114c5 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagController.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagController.php @@ -38,6 +38,7 @@ abstract class WallabagController extends Controller $form->handleRequest($request); $wallabag = $this->getImportService(); + $wallabag->setUser($this->getUser()); if ($form->isValid()) { $file = $form->get('file')->getData(); @@ -46,7 +47,6 @@ abstract class WallabagController extends Controller if (in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { $res = $wallabag - ->setUser($this->getUser()) ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name) ->setMarkAsRead($markAsRead) ->import(); diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php index 3e748d57..f80aec3a 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php @@ -12,7 +12,13 @@ class WallabagV1Controller extends WallabagController */ protected function getImportService() { - return $this->get('wallabag_import.wallabag_v1.import'); + $service = $this->get('wallabag_import.wallabag_v1.import'); + + if ($this->get('craue_config')->get('rabbitmq')) { + $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v1_producer')); + } + + return $service; } /** diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php index c2a42165..063cddd9 100644 --- a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php +++ b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php @@ -12,7 +12,13 @@ class WallabagV2Controller extends WallabagController */ protected function getImportService() { - return $this->get('wallabag_import.wallabag_v2.import'); + $service = $this->get('wallabag_import.wallabag_v2.import'); + + if ($this->get('craue_config')->get('rabbitmq')) { + $service->setRabbitmqProducer($this->get('old_sound_rabbit_mq.import_wallabag_v2_producer')); + } + + return $service; } /** diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 14377a35..5b9d65d7 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -7,12 +7,17 @@ use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use Wallabag\CoreBundle\Helper\ContentProxy; use Wallabag\CoreBundle\Entity\Entry; +use Symfony\Component\Security\Core\User\UserInterface; +use OldSound\RabbitMqBundle\RabbitMq\Producer; abstract class AbstractImport implements ImportInterface { protected $em; protected $logger; protected $contentProxy; + protected $producer; + protected $user; + protected $markAsRead; public function __construct(EntityManager $em, ContentProxy $contentProxy) { @@ -26,6 +31,48 @@ abstract class AbstractImport implements ImportInterface $this->logger = $logger; } + /** + * Set RabbitMQ Producer to send each entry to a queue. + * This method should be called when user has enabled RabbitMQ. + * + * @param Producer $producer + */ + public function setRabbitmqProducer(Producer $producer) + { + $this->producer = $producer; + } + + /** + * Set current user. + * Could the current *connected* user or one retrieve by the consumer. + * + * @param UserInterface $user + */ + public function setUser(UserInterface $user) + { + $this->user = $user; + } + + /** + * Set whether articles must be all marked as read. + * + * @param bool $markAsRead + */ + public function setMarkAsRead($markAsRead) + { + $this->markAsRead = $markAsRead; + + return $this; + } + + /** + * Get whether articles must be all marked as read. + */ + public function getMarkAsRead() + { + return $this->markAsRead; + } + /** * Fetch content from the ContentProxy (using graby). * If it fails return false instead of the updated entry. @@ -44,4 +91,40 @@ abstract class AbstractImport implements ImportInterface return false; } } + + /** + * Parse and insert all given entries. + * + * @param $entries + */ + protected function parseEntries($entries) + { + $i = 1; + + foreach ($entries as $importedEntry) { + $entry = $this->parseEntry($importedEntry); + + if (null === $entry) { + continue; + } + + // flush every 20 entries + if (($i % 20) === 0) { + $this->em->flush(); + $this->em->clear($entry); + } + ++$i; + } + + $this->em->flush(); + } + + /** + * Parse one entry. + * + * @param array $importedEntry + * + * @return Entry + */ + abstract public function parseEntry(array $importedEntry); } diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 27df4917..dd0ddd72 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php @@ -2,25 +2,20 @@ namespace Wallabag\ImportBundle\Import; -use OldSound\RabbitMqBundle\RabbitMq\Producer; use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use GuzzleHttp\Client; use GuzzleHttp\Exception\RequestException; -use Symfony\Component\Security\Core\User\UserInterface; use Wallabag\CoreBundle\Entity\Entry; use Wallabag\CoreBundle\Helper\ContentProxy; use Craue\ConfigBundle\Util\Config; class PocketImport extends AbstractImport { - private $user; private $client; private $consumerKey; private $skippedEntries = 0; private $importedEntries = 0; - private $markAsRead; - private $producer; protected $accessToken; public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) @@ -31,28 +26,6 @@ class PocketImport extends AbstractImport $this->logger = new NullLogger(); } - /** - * Set RabbitMQ Producer to send each entry to a queue. - * This method should be called when user has enabled RabbitMQ. - * - * @param Producer $producer - */ - public function setRabbitmqProducer(Producer $producer) - { - $this->producer = $producer; - } - - /** - * Set current user. - * Could the current *connected* user or one retrieve by the consumer. - * - * @param UserInterface $user - */ - public function setUser(UserInterface $user) - { - $this->user = $user; - } - /** * {@inheritdoc} */ @@ -138,26 +111,6 @@ class PocketImport extends AbstractImport return true; } - /** - * Set whether articles must be all marked as read. - * - * @param bool $markAsRead - */ - public function setMarkAsRead($markAsRead) - { - $this->markAsRead = $markAsRead; - - return $this; - } - - /** - * Get whether articles must be all marked as read. - */ - public function getMarkAsRead() - { - return $this->markAsRead; - } - /** * {@inheritdoc} */ @@ -217,37 +170,9 @@ class PocketImport extends AbstractImport $this->client = $client; } - /** - * @see https://getpocket.com/developer/docs/v3/retrieve - * - * @param array $entries - */ - private function parseEntries(array $entries) - { - $i = 1; - - foreach ($entries as $pocketEntry) { - $entry = $this->parseEntry($pocketEntry); - - if (null === $entry) { - continue; - } - - // flush every 20 entries - if (($i % 20) === 0) { - $this->em->flush(); - $this->em->clear($entry); - } - - ++$i; - } - - $this->em->flush(); - } - - public function parseEntry(array $pocketEntry) + public function parseEntry(array $importedEntry) { - $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; + $url = isset($importedEntry['resolved_url']) && $importedEntry['resolved_url'] != '' ? $importedEntry['resolved_url'] : $importedEntry['given_url']; $existingEntry = $this->em ->getRepository('WallabagCoreBundle:Entry') @@ -270,34 +195,34 @@ class PocketImport extends AbstractImport } // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted - if ($pocketEntry['status'] == 1 || $this->markAsRead) { + if ($importedEntry['status'] == 1 || $this->markAsRead) { $entry->setArchived(true); } // 0 or 1 - 1 If the item is starred - if ($pocketEntry['favorite'] == 1) { + if ($importedEntry['favorite'] == 1) { $entry->setStarred(true); } $title = 'Untitled'; - if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { - $title = $pocketEntry['resolved_title']; - } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') { - $title = $pocketEntry['given_title']; + if (isset($importedEntry['resolved_title']) && $importedEntry['resolved_title'] != '') { + $title = $importedEntry['resolved_title']; + } elseif (isset($importedEntry['given_title']) && $importedEntry['given_title'] != '') { + $title = $importedEntry['given_title']; } $entry->setTitle($title); $entry->setUrl($url); // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image - if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { - $entry->setPreviewPicture($pocketEntry['images'][1]['src']); + if (isset($importedEntry['has_image']) && $importedEntry['has_image'] > 0 && isset($importedEntry['images'][1])) { + $entry->setPreviewPicture($importedEntry['images'][1]['src']); } - if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { + if (isset($importedEntry['tags']) && !empty($importedEntry['tags'])) { $this->contentProxy->assignTagsToEntry( $entry, - array_keys($pocketEntry['tags']) + array_keys($importedEntry['tags']) ); } @@ -315,17 +240,17 @@ class PocketImport extends AbstractImport */ public function parseEntriesForProducer($entries) { - foreach ($entries as $pocketEntry) { + foreach ($entries as $importedEntry) { // set userId for the producer (it won't know which user is connected) - $pocketEntry['userId'] = $this->user->getId(); + $importedEntry['userId'] = $this->user->getId(); if ($this->markAsRead) { - $pocketEntry['status'] = 1; + $importedEntry['status'] = 1; } ++$this->importedEntries; - $this->producer->publish(json_encode($pocketEntry)); + $this->producer->publish(json_encode($importedEntry)); } } } diff --git a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php index c7cfe15d..18a6631a 100644 --- a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php +++ b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php @@ -7,24 +7,9 @@ use Wallabag\UserBundle\Entity\User; class ReadabilityImport extends AbstractImport { - private $user; private $skippedEntries = 0; private $importedEntries = 0; private $filepath; - private $markAsRead; - - /** - * We define the user in a custom call because on the import command there is no logged in user. - * So we can't retrieve user from the `security.token_storage` service. - * - * @param User $user - */ - public function setUser(User $user) - { - $this->user = $user; - - return $this; - } /** * {@inheritdoc} @@ -62,26 +47,6 @@ class ReadabilityImport extends AbstractImport return $this; } - /** - * Set whether articles must be all marked as read. - * - * @param bool $markAsRead - */ - public function setMarkAsRead($markAsRead) - { - $this->markAsRead = $markAsRead; - - return $this; - } - - /** - * Get whether articles must be all marked as read. - */ - public function getMarkAsRead() - { - return $this->markAsRead; - } - /** * {@inheritdoc} */ @@ -116,54 +81,76 @@ class ReadabilityImport extends AbstractImport return false; } + if ($this->producer) { + $this->parseEntriesForProducer($data['bookmarks']); + + return true; + } + $this->parseEntries($data['bookmarks']); return true; } + public function parseEntry(array $importedEntry) + { + $existingEntry = $this->em + ->getRepository('WallabagCoreBundle:Entry') + ->findByUrlAndUserId($importedEntry['article__url'], $this->user->getId()); + + if (false !== $existingEntry) { + ++$this->skippedEntries; + + return; + } + + $data = [ + 'title' => $importedEntry['article__title'], + 'url' => $importedEntry['article__url'], + 'content_type' => '', + 'language' => '', + 'is_archived' => $importedEntry['archive'] || $this->markAsRead, + 'is_starred' => $importedEntry['favorite'], + ]; + + $entry = $this->fetchContent( + new Entry($this->user), + $data['url'], + $data + ); + + // jump to next entry in case of problem while getting content + if (false === $entry) { + ++$this->skippedEntries; + + return; + } + + $entry->setArchived($data['is_archived']); + $entry->setStarred($data['is_starred']); + + $this->em->persist($entry); + ++$this->importedEntries; + + return $entry; + } + /** - * Parse and insert all given entries. + * Faster parse entries for Producer. + * We don't care to make check at this time. They'll be done by the consumer. * - * @param $entries + * @param array $entries */ - protected function parseEntries($entries) + protected function parseEntriesForProducer($entries) { - $i = 1; - foreach ($entries as $importedEntry) { - $existingEntry = $this->em - ->getRepository('WallabagCoreBundle:Entry') - ->findByUrlAndUserId($importedEntry['article__url'], $this->user->getId()); - - if (false !== $existingEntry) { - ++$this->skippedEntries; - continue; - } + // set userId for the producer (it won't know which user is connected) + $importedEntry['userId'] = $this->user->getId(); - $data = [ - 'title' => $importedEntry['article__title'], - 'url' => $importedEntry['article__url'], - 'content_type' => '', - 'language' => '', - 'is_archived' => $importedEntry['archive'] || $this->markAsRead, - 'is_starred' => $importedEntry['favorite'], - ]; - - $entry = $this->fetchContent( - new Entry($this->user), - $data['url'], - $data - ); - - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; - continue; + if ($this->markAsRead) { + $importedEntry['archive'] = 1; } - $entry->setArchived($data['is_archived']); - $entry->setStarred($data['is_starred']); - $this->em->persist($entry); ++$this->importedEntries; // flush every 20 entries diff --git a/src/Wallabag/ImportBundle/Import/WallabagImport.php b/src/Wallabag/ImportBundle/Import/WallabagImport.php index 581ec178..6ad14e8c 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagImport.php +++ b/src/Wallabag/ImportBundle/Import/WallabagImport.php @@ -11,7 +11,6 @@ abstract class WallabagImport extends AbstractImport protected $skippedEntries = 0; protected $importedEntries = 0; protected $filepath; - protected $markAsRead; // untitled in all languages from v1 protected $untitled = [ 'Untitled', @@ -28,19 +27,6 @@ abstract class WallabagImport extends AbstractImport '', ]; - /** - * We define the user in a custom call because on the import command there is no logged in user. - * So we can't retrieve user from the `security.token_storage` service. - * - * @param User $user - */ - public function setUser(User $user) - { - $this->user = $user; - - return $this; - } - /** * {@inheritdoc} */ @@ -79,6 +65,12 @@ abstract class WallabagImport extends AbstractImport return false; } + if ($this->producer) { + $this->parseEntriesForProducer($data); + + return true; + } + $this->parseEntries($data); return true; @@ -108,85 +100,61 @@ abstract class WallabagImport extends AbstractImport } /** - * Set whether articles must be all marked as read. - * - * @param bool $markAsRead + * {@inheritdoc} */ - public function setMarkAsRead($markAsRead) + public function parseEntry(array $importedEntry) { - $this->markAsRead = $markAsRead; + $existingEntry = $this->em + ->getRepository('WallabagCoreBundle:Entry') + ->findByUrlAndUserId($importedEntry['url'], $this->user->getId()); - return $this; - } + if (false !== $existingEntry) { + ++$this->skippedEntries; - /** - * Parse and insert all given entries. - * - * @param $entries - */ - protected function parseEntries($entries) - { - $i = 1; + return; + } + + $data = $this->prepareEntry($importedEntry); - foreach ($entries as $importedEntry) { - $existingEntry = $this->em - ->getRepository('WallabagCoreBundle:Entry') - ->findByUrlAndUserId($importedEntry['url'], $this->user->getId()); + $entry = $this->fetchContent( + new Entry($this->user), + $importedEntry['url'], + $data + ); - if (false !== $existingEntry) { - ++$this->skippedEntries; - continue; - } + // jump to next entry in case of problem while getting content + if (false === $entry) { + ++$this->skippedEntries; - $data = $this->prepareEntry($importedEntry, $this->markAsRead); + return; + } - $entry = $this->fetchContent( - new Entry($this->user), - $importedEntry['url'], - $data + if (array_key_exists('tags', $data)) { + $this->contentProxy->assignTagsToEntry( + $entry, + $data['tags'] ); + } - // jump to next entry in case of problem while getting content - if (false === $entry) { - ++$this->skippedEntries; - continue; - } - - if (array_key_exists('tags', $data)) { - $this->contentProxy->assignTagsToEntry( - $entry, - $data['tags'] - ); - } - - if (isset($importedEntry['preview_picture'])) { - $entry->setPreviewPicture($importedEntry['preview_picture']); - } - - $entry->setArchived($data['is_archived']); - $entry->setStarred($data['is_starred']); - - $this->em->persist($entry); - ++$this->importedEntries; - - // flush every 20 entries - if (($i % 20) === 0) { - $this->em->flush(); - } - ++$i; + if (isset($importedEntry['preview_picture'])) { + $entry->setPreviewPicture($importedEntry['preview_picture']); } - $this->em->flush(); - $this->em->clear(); + $entry->setArchived($data['is_archived']); + $entry->setStarred($data['is_starred']); + + $this->em->persist($entry); + ++$this->importedEntries; + + return $entry; } /** * This should return a cleaned array for a given entry to be given to `updateEntry`. * - * @param array $entry Data from the imported file - * @param bool $markAsRead Should we mark as read content? + * @param array $entry Data from the imported file * * @return array */ - abstract protected function prepareEntry($entry = [], $markAsRead = false); + abstract protected function prepareEntry($entry = []); } diff --git a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php index 6cf3467a..86734652 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php +++ b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php @@ -31,7 +31,7 @@ class WallabagV1Import extends WallabagImport /** * {@inheritdoc} */ - protected function prepareEntry($entry = [], $markAsRead = false) + protected function prepareEntry($entry = []) { $data = [ 'title' => $entry['title'], @@ -39,7 +39,7 @@ class WallabagV1Import extends WallabagImport 'url' => $entry['url'], 'content_type' => '', 'language' => '', - 'is_archived' => $entry['is_read'] || $markAsRead, + 'is_archived' => $entry['is_read'] || $this->markAsRead, 'is_starred' => $entry['is_fav'], 'tags' => '', ]; @@ -56,4 +56,20 @@ class WallabagV1Import extends WallabagImport return $data; } + + protected function parseEntriesForProducer($entries) + { + foreach ($entries as $importedEntry) { + // set userId for the producer (it won't know which user is connected) + $importedEntry['userId'] = $this->user->getId(); + + if ($this->markAsRead) { + $importedEntry['is_read'] = 1; + } + + ++$this->importedEntries; + + $this->producer->publish(json_encode($importedEntry)); + } + } } diff --git a/src/Wallabag/ImportBundle/Import/WallabagV2Import.php b/src/Wallabag/ImportBundle/Import/WallabagV2Import.php index d0035b63..faf4236f 100644 --- a/src/Wallabag/ImportBundle/Import/WallabagV2Import.php +++ b/src/Wallabag/ImportBundle/Import/WallabagV2Import.php @@ -31,12 +31,28 @@ class WallabagV2Import extends WallabagImport /** * {@inheritdoc} */ - protected function prepareEntry($entry = [], $markAsRead = false) + protected function prepareEntry($entry = []) { return [ 'html' => $entry['content'], 'content_type' => $entry['mimetype'], - 'is_archived' => ($entry['is_archived'] || $markAsRead), + 'is_archived' => ($entry['is_archived'] || $this->markAsRead), ] + $entry; } + + protected function parseEntriesForProducer($entries) + { + foreach ($entries as $importedEntry) { + // set userId for the producer (it won't know which user is connected) + $importedEntry['userId'] = $this->user->getId(); + + if ($this->markAsRead) { + $importedEntry['is_archived'] = 1; + } + + ++$this->importedEntries; + + $this->producer->publish(json_encode($importedEntry)); + } + } } diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index fe388b26..cad44e71 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml @@ -1,11 +1,32 @@ services: wallabag_import.consumer.pocket: - class: Wallabag\ImportBundle\Consumer\AMPQ\PocketConsumer + class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer arguments: - "@doctrine.orm.entity_manager" - "@wallabag_user.user_repository" - "@wallabag_import.pocket.import" - "@logger" + wallabag_import.consumer.readability: + class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.readability.import" + - "@logger" + wallabag_import.consumer.wallabag_v1: + class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v1.import" + - "@logger" + wallabag_import.consumer.wallabag_v2: + class: Wallabag\ImportBundle\Consumer\AMPQ\EntryConsumer + arguments: + - "@doctrine.orm.entity_manager" + - "@wallabag_user.user_repository" + - "@wallabag_import.wallabag_v2.import" + - "@logger" wallabag_import.chain: class: Wallabag\ImportBundle\Import\ImportChain -- cgit v1.2.3