X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;ds=sidebyside;f=src%2FWallabag%2FImportBundle%2FImport%2FAbstractImport.php;h=764b390a830428fbdc60ddf6a793d8d632fb75b0;hb=ff1a5362f7254d686864ea53994da6c517b3d3e8;hp=5b9d65d7255d8eac6a3599c138b02c8161131735;hpb=c98db1b653b5dc8b701422190b02d9fbf10c4e68;p=github%2Fwallabag%2Fwallabag.git diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 5b9d65d7..764b390a 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php @@ -7,8 +7,9 @@ use Psr\Log\NullLogger; use Doctrine\ORM\EntityManager; use Wallabag\CoreBundle\Helper\ContentProxy; use Wallabag\CoreBundle\Entity\Entry; -use Symfony\Component\Security\Core\User\UserInterface; -use OldSound\RabbitMqBundle\RabbitMq\Producer; +use Wallabag\CoreBundle\Entity\Tag; +use Wallabag\UserBundle\Entity\User; +use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; abstract class AbstractImport implements ImportInterface { @@ -18,6 +19,9 @@ abstract class AbstractImport implements ImportInterface protected $producer; protected $user; protected $markAsRead; + protected $skippedEntries = 0; + protected $importedEntries = 0; + protected $queuedEntries = 0; public function __construct(EntityManager $em, ContentProxy $contentProxy) { @@ -32,12 +36,12 @@ abstract class AbstractImport implements ImportInterface } /** - * Set RabbitMQ Producer to send each entry to a queue. + * Set RabbitMQ/Redis Producer to send each entry to a queue. * This method should be called when user has enabled RabbitMQ. * - * @param Producer $producer + * @param ProducerInterface $producer */ - public function setRabbitmqProducer(Producer $producer) + public function setProducer(ProducerInterface $producer) { $this->producer = $producer; } @@ -46,9 +50,9 @@ abstract class AbstractImport implements ImportInterface * Set current user. * Could the current *connected* user or one retrieve by the consumer. * - * @param UserInterface $user + * @param User $user */ - public function setUser(UserInterface $user) + public function setUser(User $user) { $this->user = $user; } @@ -75,20 +79,20 @@ abstract class AbstractImport implements ImportInterface /** * Fetch content from the ContentProxy (using graby). - * If it fails return false instead of the updated entry. + * If it fails return the given entry to be saved in all case (to avoid user to loose the content). * * @param Entry $entry Entry to update * @param string $url Url to grab content for * @param array $content An array with AT LEAST keys title, html, url, language & content_type to skip the fetchContent from the url * - * @return Entry|false + * @return Entry */ protected function fetchContent(Entry $entry, $url, array $content = []) { try { return $this->contentProxy->updateEntry($entry, $url, $content); } catch (\Exception $e) { - return false; + return $entry; } } @@ -102,6 +106,10 @@ abstract class AbstractImport implements ImportInterface $i = 1; foreach ($entries as $importedEntry) { + if ($this->markAsRead) { + $importedEntry = $this->setEntryAsRead($importedEntry); + } + $entry = $this->parseEntry($importedEntry); if (null === $entry) { @@ -111,7 +119,10 @@ abstract class AbstractImport implements ImportInterface // flush every 20 entries if (($i % 20) === 0) { $this->em->flush(); - $this->em->clear($entry); + + // clear only affected entities + $this->em->clear(Entry::class); + $this->em->clear(Tag::class); } ++$i; } @@ -119,6 +130,44 @@ abstract class AbstractImport implements ImportInterface $this->em->flush(); } + /** + * Parse entries and send them to the queue. + * It should just be a simple loop on all item, no call to the database should be done + * to speedup queuing. + * + * Faster parse entries for Producer. + * We don't care to make check at this time. They'll be done by the consumer. + * + * @param array $entries + */ + protected function parseEntriesForProducer(array $entries) + { + foreach ($entries as $importedEntry) { + // set userId for the producer (it won't know which user is connected) + $importedEntry['userId'] = $this->user->getId(); + + if ($this->markAsRead) { + $importedEntry = $this->setEntryAsRead($importedEntry); + } + + ++$this->queuedEntries; + + $this->producer->publish(json_encode($importedEntry)); + } + } + + /** + * {@inheritdoc} + */ + public function getSummary() + { + return [ + 'skipped' => $this->skippedEntries, + 'imported' => $this->importedEntries, + 'queued' => $this->queuedEntries, + ]; + } + /** * Parse one entry. * @@ -127,4 +176,14 @@ abstract class AbstractImport implements ImportInterface * @return Entry */ abstract public function parseEntry(array $importedEntry); + + /** + * Set current imported entry to archived / read. + * Implementation is different accross all imports. + * + * @param array $importedEntry + * + * @return array + */ + abstract protected function setEntryAsRead(array $importedEntry); }