aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/Wallabag/ImportBundle
diff options
context:
space:
mode:
authorJeremy Benoist <j0k3r@users.noreply.github.com>2016-09-19 07:15:40 +0200
committerGitHub <noreply@github.com>2016-09-19 07:15:40 +0200
commitda18a4682f124b02278860d23ac1d59dee995277 (patch)
treeeabbe9da7203eea41e0cb0ec3c26b9b6599cf58f /src/Wallabag/ImportBundle
parent0ed8ce55b5caf2c88e8330afa83abef6c4aac9a4 (diff)
parent59b97fae996d8307b9d957d210d46200f6d206bf (diff)
downloadwallabag-da18a4682f124b02278860d23ac1d59dee995277.tar.gz
wallabag-da18a4682f124b02278860d23ac1d59dee995277.tar.zst
wallabag-da18a4682f124b02278860d23ac1d59dee995277.zip
Merge pull request #1941 from wallabag/v2-asynchronous-jobs
Use asynchronous jobs for imports
Diffstat (limited to 'src/Wallabag/ImportBundle')
-rw-r--r--src/Wallabag/ImportBundle/Command/ImportCommand.php6
-rw-r--r--src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php44
-rw-r--r--src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php17
-rw-r--r--src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php74
-rw-r--r--src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php29
-rw-r--r--src/Wallabag/ImportBundle/Controller/ImportController.php62
-rw-r--r--src/Wallabag/ImportBundle/Controller/PocketController.php35
-rw-r--r--src/Wallabag/ImportBundle/Controller/ReadabilityController.php16
-rw-r--r--src/Wallabag/ImportBundle/Controller/WallabagController.php10
-rw-r--r--src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php10
-rw-r--r--src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php10
-rw-r--r--src/Wallabag/ImportBundle/Form/Type/UploadImportType.php1
-rw-r--r--src/Wallabag/ImportBundle/Import/AbstractImport.php144
-rw-r--r--src/Wallabag/ImportBundle/Import/PocketImport.php218
-rw-r--r--src/Wallabag/ImportBundle/Import/ReadabilityImport.php149
-rw-r--r--src/Wallabag/ImportBundle/Import/WallabagImport.php131
-rw-r--r--src/Wallabag/ImportBundle/Import/WallabagV1Import.php15
-rw-r--r--src/Wallabag/ImportBundle/Import/WallabagV2Import.php14
-rw-r--r--src/Wallabag/ImportBundle/Redis/Producer.php36
-rw-r--r--src/Wallabag/ImportBundle/Resources/config/rabbit.yml30
-rw-r--r--src/Wallabag/ImportBundle/Resources/config/redis.yml81
-rw-r--r--src/Wallabag/ImportBundle/Resources/config/services.yml5
-rw-r--r--src/Wallabag/ImportBundle/Resources/views/Import/_workerEnabled.html.twig8
-rw-r--r--src/Wallabag/ImportBundle/Resources/views/Import/check_queue.html.twig11
-rw-r--r--src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig12
-rw-r--r--src/Wallabag/ImportBundle/Resources/views/Readability/index.html.twig2
-rw-r--r--src/Wallabag/ImportBundle/Resources/views/WallabagV1/index.html.twig2
27 files changed, 842 insertions, 330 deletions
diff --git a/src/Wallabag/ImportBundle/Command/ImportCommand.php b/src/Wallabag/ImportBundle/Command/ImportCommand.php
index a4aa8531..20ecc6e1 100644
--- a/src/Wallabag/ImportBundle/Command/ImportCommand.php
+++ b/src/Wallabag/ImportBundle/Command/ImportCommand.php
@@ -26,6 +26,10 @@ class ImportCommand extends ContainerAwareCommand
26 { 26 {
27 $output->writeln('Start : '.(new \DateTime())->format('d-m-Y G:i:s').' ---'); 27 $output->writeln('Start : '.(new \DateTime())->format('d-m-Y G:i:s').' ---');
28 28
29 if (!file_exists($input->getArgument('filepath'))) {
30 throw new Exception(sprintf('File "%s" not found', $input->getArgument('filepath')));
31 }
32
29 $em = $this->getContainer()->get('doctrine')->getManager(); 33 $em = $this->getContainer()->get('doctrine')->getManager();
30 // Turning off doctrine default logs queries for saving memory 34 // Turning off doctrine default logs queries for saving memory
31 $em->getConnection()->getConfiguration()->setSQLLogger(null); 35 $em->getConnection()->getConfiguration()->setSQLLogger(null);
@@ -43,9 +47,9 @@ class ImportCommand extends ContainerAwareCommand
43 } 47 }
44 48
45 $wallabag->setMarkAsRead($input->getOption('markAsRead')); 49 $wallabag->setMarkAsRead($input->getOption('markAsRead'));
50 $wallabag->setUser($user);
46 51
47 $res = $wallabag 52 $res = $wallabag
48 ->setUser($user)
49 ->setFilepath($input->getArgument('filepath')) 53 ->setFilepath($input->getArgument('filepath'))
50 ->import(); 54 ->import();
51 55
diff --git a/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php
new file mode 100644
index 00000000..5f90e00f
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Command/RedisWorkerCommand.php
@@ -0,0 +1,44 @@
1<?php
2
3namespace Wallabag\ImportBundle\Command;
4
5use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
6use Symfony\Component\Config\Definition\Exception\Exception;
7use Symfony\Component\Console\Input\InputArgument;
8use Symfony\Component\Console\Input\InputOption;
9use Symfony\Component\Console\Input\InputInterface;
10use Symfony\Component\Console\Output\OutputInterface;
11use Simpleue\Worker\QueueWorker;
12
13class RedisWorkerCommand extends ContainerAwareCommand
14{
15 protected function configure()
16 {
17 $this
18 ->setName('wallabag:import:redis-worker')
19 ->setDescription('Launch Redis worker')
20 ->addArgument('serviceName', InputArgument::REQUIRED, 'Service to use: wallabag_v1, wallabag_v2, pocket or readability')
21 ->addOption('maxIterations', '', InputOption::VALUE_OPTIONAL, 'Number of iterations before stoping', false)
22 ;
23 }
24
25 protected function execute(InputInterface $input, OutputInterface $output)
26 {
27 $output->writeln('Worker started at: '.(new \DateTime())->format('d-m-Y G:i:s'));
28 $output->writeln('Waiting for message ...');
29
30 $serviceName = $input->getArgument('serviceName');
31
32 if (!$this->getContainer()->has('wallabag_import.queue.redis.'.$serviceName) || !$this->getContainer()->has('wallabag_import.consumer.redis.'.$serviceName)) {
33 throw new Exception(sprintf('No queue or consumer found for service name: "%s"', $input->getArgument('serviceName')));
34 }
35
36 $worker = new QueueWorker(
37 $this->getContainer()->get('wallabag_import.queue.redis.'.$serviceName),
38 $this->getContainer()->get('wallabag_import.consumer.redis.'.$serviceName),
39 $input->getOption('maxIterations')
40 );
41
42 $worker->start();
43 }
44}
diff --git a/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php
new file mode 100644
index 00000000..5aaa8c03
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Consumer/AMQPEntryConsumer.php
@@ -0,0 +1,17 @@
1<?php
2
3namespace Wallabag\ImportBundle\Consumer;
4
5use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6use PhpAmqpLib\Message\AMQPMessage;
7
8class AMQPEntryConsumer extends AbstractConsumer implements ConsumerInterface
9{
10 /**
11 * {@inheritdoc}
12 */
13 public function execute(AMQPMessage $msg)
14 {
15 return $this->handleMessage($msg->body);
16 }
17}
diff --git a/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php
new file mode 100644
index 00000000..2b85ad76
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Consumer/AbstractConsumer.php
@@ -0,0 +1,74 @@
1<?php
2
3namespace Wallabag\ImportBundle\Consumer;
4
5use Doctrine\ORM\EntityManager;
6use Wallabag\ImportBundle\Import\AbstractImport;
7use Wallabag\UserBundle\Repository\UserRepository;
8use Wallabag\CoreBundle\Entity\Entry;
9use Wallabag\CoreBundle\Entity\Tag;
10use Psr\Log\LoggerInterface;
11use Psr\Log\NullLogger;
12
13abstract class AbstractConsumer
14{
15 protected $em;
16 protected $userRepository;
17 protected $import;
18 protected $logger;
19
20 public function __construct(EntityManager $em, UserRepository $userRepository, AbstractImport $import, LoggerInterface $logger = null)
21 {
22 $this->em = $em;
23 $this->userRepository = $userRepository;
24 $this->import = $import;
25 $this->logger = $logger ?: new NullLogger();
26 }
27
28 /**
29 * Handle a message and save it.
30 *
31 * @param string $body Message from the queue (in json)
32 *
33 * @return bool
34 */
35 protected function handleMessage($body)
36 {
37 $storedEntry = json_decode($body, true);
38
39 $user = $this->userRepository->find($storedEntry['userId']);
40
41 // no user? Drop message
42 if (null === $user) {
43 $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]);
44
45 return false;
46 }
47
48 $this->import->setUser($user);
49
50 $entry = $this->import->parseEntry($storedEntry);
51
52 if (null === $entry) {
53 $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]);
54
55 return false;
56 }
57
58 try {
59 $this->em->flush();
60
61 // clear only affected entities
62 $this->em->clear(Entry::class);
63 $this->em->clear(Tag::class);
64 } catch (\Exception $e) {
65 $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]);
66
67 return false;
68 }
69
70 $this->logger->info('Content with url imported! ('.$entry->getUrl().')');
71
72 return true;
73 }
74}
diff --git a/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php
new file mode 100644
index 00000000..450b71ff
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Consumer/RedisEntryConsumer.php
@@ -0,0 +1,29 @@
1<?php
2
3namespace Wallabag\ImportBundle\Consumer;
4
5use Simpleue\Job\Job;
6
7class RedisEntryConsumer extends AbstractConsumer implements Job
8{
9 /**
10 * Handle one message by one message.
11 *
12 * @param string $job Content of the message (directly from Redis)
13 *
14 * @return bool
15 */
16 public function manage($job)
17 {
18 return $this->handleMessage($job);
19 }
20
21 /**
22 * Should tell if the given job will kill the worker.
23 * We don't want to stop it :).
24 */
25 public function isStopJob($job)
26 {
27 return false;
28 }
29}
diff --git a/src/Wallabag/ImportBundle/Controller/ImportController.php b/src/Wallabag/ImportBundle/Controller/ImportController.php
index c1486e38..ea4f7d7b 100644
--- a/src/Wallabag/ImportBundle/Controller/ImportController.php
+++ b/src/Wallabag/ImportBundle/Controller/ImportController.php
@@ -16,4 +16,66 @@ class ImportController extends Controller
16 'imports' => $this->get('wallabag_import.chain')->getAll(), 16 'imports' => $this->get('wallabag_import.chain')->getAll(),
17 ]); 17 ]);
18 } 18 }
19
20 /**
21 * Display how many messages are queue (both in Redis and RabbitMQ).
22 * Only for admins.
23 */
24 public function checkQueueAction()
25 {
26 $nbRedisMessages = null;
27 $nbRabbitMessages = null;
28
29 if (!$this->get('security.authorization_checker')->isGranted('ROLE_SUPER_ADMIN')) {
30 return $this->render('WallabagImportBundle:Import:check_queue.html.twig', [
31 'nbRedisMessages' => $nbRedisMessages,
32 'nbRabbitMessages' => $nbRabbitMessages,
33 ]);
34 }
35
36 if ($this->get('craue_config')->get('import_with_rabbitmq')) {
37 $nbRabbitMessages = $this->getTotalMessageInRabbitQueue('pocket')
38 + $this->getTotalMessageInRabbitQueue('readability')
39 + $this->getTotalMessageInRabbitQueue('wallabag_v1')
40 + $this->getTotalMessageInRabbitQueue('wallabag_v2')
41 ;
42 } elseif ($this->get('craue_config')->get('import_with_redis')) {
43 $redis = $this->get('wallabag_core.redis.client');
44
45 $nbRedisMessages = $redis->llen('wallabag.import.pocket')
46 + $redis->llen('wallabag.import.readability')
47 + $redis->llen('wallabag.import.wallabag_v1')
48 + $redis->llen('wallabag.import.wallabag_v2')
49 ;
50 }
51
52 return $this->render('WallabagImportBundle:Import:check_queue.html.twig', [
53 'nbRedisMessages' => $nbRedisMessages,
54 'nbRabbitMessages' => $nbRabbitMessages,
55 ]);
56 }
57
58 /**
59 * Count message in RabbitMQ queue.
60 * It get one message without acking it (so it'll stay in the queue)
61 * which will include the total of *other* messages in the queue.
62 * Adding one to that messages will result in the full total message.
63 *
64 * @param string $importService The import service related: pocket, readability, wallabag_v1 or wallabag_v2
65 *
66 * @return int
67 */
68 private function getTotalMessageInRabbitQueue($importService)
69 {
70 $message = $this
71 ->get('old_sound_rabbit_mq.import_'.$importService.'_consumer')
72 ->getChannel()
73 ->basic_get('wallabag.import.'.$importService);
74
75 if (null === $message) {
76 return 0;
77 }
78
79 return $message->delivery_info['message_count'] + 1;
80 }
19} 81}
diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php
index 36ee25bf..56be5cbf 100644
--- a/src/Wallabag/ImportBundle/Controller/PocketController.php
+++ b/src/Wallabag/ImportBundle/Controller/PocketController.php
@@ -11,11 +11,30 @@ use Symfony\Component\Form\Extension\Core\Type\CheckboxType;
11class PocketController extends Controller 11class PocketController extends Controller
12{ 12{
13 /** 13 /**
14 * Return Pocket Import Service with or without RabbitMQ enabled.
15 *
16 * @return \Wallabag\ImportBundle\Import\PocketImport
17 */
18 private function getPocketImportService()
19 {
20 $pocket = $this->get('wallabag_import.pocket.import');
21 $pocket->setUser($this->getUser());
22
23 if ($this->get('craue_config')->get('import_with_rabbitmq')) {
24 $pocket->setProducer($this->get('old_sound_rabbit_mq.import_pocket_producer'));
25 } elseif ($this->get('craue_config')->get('import_with_redis')) {
26 $pocket->setProducer($this->get('wallabag_import.producer.redis.pocket'));
27 }
28
29 return $pocket;
30 }
31
32 /**
14 * @Route("/pocket", name="import_pocket") 33 * @Route("/pocket", name="import_pocket")
15 */ 34 */
16 public function indexAction() 35 public function indexAction()
17 { 36 {
18 $pocket = $this->get('wallabag_import.pocket.import'); 37 $pocket = $this->getPocketImportService();
19 $form = $this->createFormBuilder($pocket) 38 $form = $this->createFormBuilder($pocket)
20 ->add('mark_as_read', CheckboxType::class, [ 39 ->add('mark_as_read', CheckboxType::class, [
21 'label' => 'import.form.mark_as_read_label', 40 'label' => 'import.form.mark_as_read_label',
@@ -24,8 +43,8 @@ class PocketController extends Controller
24 ->getForm(); 43 ->getForm();
25 44
26 return $this->render('WallabagImportBundle:Pocket:index.html.twig', [ 45 return $this->render('WallabagImportBundle:Pocket:index.html.twig', [
27 'import' => $this->get('wallabag_import.pocket.import'), 46 'import' => $this->getPocketImportService(),
28 'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true, 47 'has_consumer_key' => '' === trim($this->getUser()->getConfig()->getPocketConsumerKey()) ? false : true,
29 'form' => $form->createView(), 48 'form' => $form->createView(),
30 ]); 49 ]);
31 } 50 }
@@ -35,7 +54,7 @@ class PocketController extends Controller
35 */ 54 */
36 public function authAction(Request $request) 55 public function authAction(Request $request)
37 { 56 {
38 $requestToken = $this->get('wallabag_import.pocket.import') 57 $requestToken = $this->getPocketImportService()
39 ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL)); 58 ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL));
40 59
41 if (false === $requestToken) { 60 if (false === $requestToken) {
@@ -62,7 +81,7 @@ class PocketController extends Controller
62 public function callbackAction() 81 public function callbackAction()
63 { 82 {
64 $message = 'flashes.import.notice.failed'; 83 $message = 'flashes.import.notice.failed';
65 $pocket = $this->get('wallabag_import.pocket.import'); 84 $pocket = $this->getPocketImportService();
66 85
67 $markAsRead = $this->get('session')->get('mark_as_read'); 86 $markAsRead = $this->get('session')->get('mark_as_read');
68 $this->get('session')->remove('mark_as_read'); 87 $this->get('session')->remove('mark_as_read');
@@ -83,6 +102,12 @@ class PocketController extends Controller
83 '%imported%' => $summary['imported'], 102 '%imported%' => $summary['imported'],
84 '%skipped%' => $summary['skipped'], 103 '%skipped%' => $summary['skipped'],
85 ]); 104 ]);
105
106 if (0 < $summary['queued']) {
107 $message = $this->get('translator')->trans('flashes.import.notice.summary_with_queue', [
108 '%queued%' => $summary['queued'],
109 ]);
110 }
86 } 111 }
87 112
88 $this->get('session')->getFlashBag()->add( 113 $this->get('session')->getFlashBag()->add(
diff --git a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php
index b61aa99c..d00e22c2 100644
--- a/src/Wallabag/ImportBundle/Controller/ReadabilityController.php
+++ b/src/Wallabag/ImportBundle/Controller/ReadabilityController.php
@@ -18,15 +18,21 @@ class ReadabilityController extends Controller
18 $form->handleRequest($request); 18 $form->handleRequest($request);
19 19
20 $readability = $this->get('wallabag_import.readability.import'); 20 $readability = $this->get('wallabag_import.readability.import');
21 $readability->setUser($this->getUser());
22
23 if ($this->get('craue_config')->get('import_with_rabbitmq')) {
24 $readability->setProducer($this->get('old_sound_rabbit_mq.import_readability_producer'));
25 } elseif ($this->get('craue_config')->get('import_with_redis')) {
26 $readability->setProducer($this->get('wallabag_import.producer.redis.readability'));
27 }
21 28
22 if ($form->isValid()) { 29 if ($form->isValid()) {
23 $file = $form->get('file')->getData(); 30 $file = $form->get('file')->getData();
24 $markAsRead = $form->get('mark_as_read')->getData(); 31 $markAsRead = $form->get('mark_as_read')->getData();
25 $name = 'readability_'.$this->getUser()->getId().'.json'; 32 $name = 'readability_'.$this->getUser()->getId().'.json';
26 33
27 if (in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { 34 if (null !== $file && in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) {
28 $res = $readability 35 $res = $readability
29 ->setUser($this->getUser())
30 ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name) 36 ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name)
31 ->setMarkAsRead($markAsRead) 37 ->setMarkAsRead($markAsRead)
32 ->import(); 38 ->import();
@@ -40,6 +46,12 @@ class ReadabilityController extends Controller
40 '%skipped%' => $summary['skipped'], 46 '%skipped%' => $summary['skipped'],
41 ]); 47 ]);
42 48
49 if (0 < $summary['queued']) {
50 $message = $this->get('translator')->trans('flashes.import.notice.summary_with_queue', [
51 '%queued%' => $summary['queued'],
52 ]);
53 }
54
43 unlink($this->getParameter('wallabag_import.resource_dir').'/'.$name); 55 unlink($this->getParameter('wallabag_import.resource_dir').'/'.$name);
44 } 56 }
45 57
diff --git a/src/Wallabag/ImportBundle/Controller/WallabagController.php b/src/Wallabag/ImportBundle/Controller/WallabagController.php
index 76ced0d2..9c0cde80 100644
--- a/src/Wallabag/ImportBundle/Controller/WallabagController.php
+++ b/src/Wallabag/ImportBundle/Controller/WallabagController.php
@@ -38,15 +38,15 @@ abstract class WallabagController extends Controller
38 $form->handleRequest($request); 38 $form->handleRequest($request);
39 39
40 $wallabag = $this->getImportService(); 40 $wallabag = $this->getImportService();
41 $wallabag->setUser($this->getUser());
41 42
42 if ($form->isValid()) { 43 if ($form->isValid()) {
43 $file = $form->get('file')->getData(); 44 $file = $form->get('file')->getData();
44 $markAsRead = $form->get('mark_as_read')->getData(); 45 $markAsRead = $form->get('mark_as_read')->getData();
45 $name = $this->getUser()->getId().'.json'; 46 $name = $this->getUser()->getId().'.json';
46 47
47 if (in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) { 48 if (null !== $file && in_array($file->getClientMimeType(), $this->getParameter('wallabag_import.allow_mimetypes')) && $file->move($this->getParameter('wallabag_import.resource_dir'), $name)) {
48 $res = $wallabag 49 $res = $wallabag
49 ->setUser($this->getUser())
50 ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name) 50 ->setFilepath($this->getParameter('wallabag_import.resource_dir').'/'.$name)
51 ->setMarkAsRead($markAsRead) 51 ->setMarkAsRead($markAsRead)
52 ->import(); 52 ->import();
@@ -60,6 +60,12 @@ abstract class WallabagController extends Controller
60 '%skipped%' => $summary['skipped'], 60 '%skipped%' => $summary['skipped'],
61 ]); 61 ]);
62 62
63 if (0 < $summary['queued']) {
64 $message = $this->get('translator')->trans('flashes.import.notice.summary_with_queue', [
65 '%queued%' => $summary['queued'],
66 ]);
67 }
68
63 unlink($this->getParameter('wallabag_import.resource_dir').'/'.$name); 69 unlink($this->getParameter('wallabag_import.resource_dir').'/'.$name);
64 } 70 }
65 71
diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php
index 3e748d57..312c7a35 100644
--- a/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php
+++ b/src/Wallabag/ImportBundle/Controller/WallabagV1Controller.php
@@ -12,7 +12,15 @@ class WallabagV1Controller extends WallabagController
12 */ 12 */
13 protected function getImportService() 13 protected function getImportService()
14 { 14 {
15 return $this->get('wallabag_import.wallabag_v1.import'); 15 $service = $this->get('wallabag_import.wallabag_v1.import');
16
17 if ($this->get('craue_config')->get('import_with_rabbitmq')) {
18 $service->setProducer($this->get('old_sound_rabbit_mq.import_wallabag_v1_producer'));
19 } elseif ($this->get('craue_config')->get('import_with_redis')) {
20 $service->setProducer($this->get('wallabag_import.producer.redis.wallabag_v1'));
21 }
22
23 return $service;
16 } 24 }
17 25
18 /** 26 /**
diff --git a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php
index c2a42165..45211fe6 100644
--- a/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php
+++ b/src/Wallabag/ImportBundle/Controller/WallabagV2Controller.php
@@ -12,7 +12,15 @@ class WallabagV2Controller extends WallabagController
12 */ 12 */
13 protected function getImportService() 13 protected function getImportService()
14 { 14 {
15 return $this->get('wallabag_import.wallabag_v2.import'); 15 $service = $this->get('wallabag_import.wallabag_v2.import');
16
17 if ($this->get('craue_config')->get('import_with_rabbitmq')) {
18 $service->setProducer($this->get('old_sound_rabbit_mq.import_wallabag_v2_producer'));
19 } elseif ($this->get('craue_config')->get('import_with_redis')) {
20 $service->setProducer($this->get('wallabag_import.producer.redis.wallabag_v2'));
21 }
22
23 return $service;
16 } 24 }
17 25
18 /** 26 /**
diff --git a/src/Wallabag/ImportBundle/Form/Type/UploadImportType.php b/src/Wallabag/ImportBundle/Form/Type/UploadImportType.php
index 92a167d9..f50424c1 100644
--- a/src/Wallabag/ImportBundle/Form/Type/UploadImportType.php
+++ b/src/Wallabag/ImportBundle/Form/Type/UploadImportType.php
@@ -15,6 +15,7 @@ class UploadImportType extends AbstractType
15 $builder 15 $builder
16 ->add('file', FileType::class, [ 16 ->add('file', FileType::class, [
17 'label' => 'import.form.file_label', 17 'label' => 'import.form.file_label',
18 'required' => true,
18 ]) 19 ])
19 ->add('mark_as_read', CheckboxType::class, [ 20 ->add('mark_as_read', CheckboxType::class, [
20 'label' => 'import.form.mark_as_read_label', 21 'label' => 'import.form.mark_as_read_label',
diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php
index 14377a35..a1a14576 100644
--- a/src/Wallabag/ImportBundle/Import/AbstractImport.php
+++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php
@@ -7,12 +7,21 @@ use Psr\Log\NullLogger;
7use Doctrine\ORM\EntityManager; 7use Doctrine\ORM\EntityManager;
8use Wallabag\CoreBundle\Helper\ContentProxy; 8use Wallabag\CoreBundle\Helper\ContentProxy;
9use Wallabag\CoreBundle\Entity\Entry; 9use Wallabag\CoreBundle\Entity\Entry;
10use Wallabag\CoreBundle\Entity\Tag;
11use Wallabag\UserBundle\Entity\User;
12use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
10 13
11abstract class AbstractImport implements ImportInterface 14abstract class AbstractImport implements ImportInterface
12{ 15{
13 protected $em; 16 protected $em;
14 protected $logger; 17 protected $logger;
15 protected $contentProxy; 18 protected $contentProxy;
19 protected $producer;
20 protected $user;
21 protected $markAsRead;
22 protected $skippedEntries = 0;
23 protected $importedEntries = 0;
24 protected $queuedEntries = 0;
16 25
17 public function __construct(EntityManager $em, ContentProxy $contentProxy) 26 public function __construct(EntityManager $em, ContentProxy $contentProxy)
18 { 27 {
@@ -27,21 +36,150 @@ abstract class AbstractImport implements ImportInterface
27 } 36 }
28 37
29 /** 38 /**
39 * Set RabbitMQ/Redis Producer to send each entry to a queue.
40 * This method should be called when user has enabled RabbitMQ.
41 *
42 * @param ProducerInterface $producer
43 */
44 public function setProducer(ProducerInterface $producer)
45 {
46 $this->producer = $producer;
47 }
48
49 /**
50 * Set current user.
51 * Could the current *connected* user or one retrieve by the consumer.
52 *
53 * @param User $user
54 */
55 public function setUser(User $user)
56 {
57 $this->user = $user;
58 }
59
60 /**
61 * Set whether articles must be all marked as read.
62 *
63 * @param bool $markAsRead
64 */
65 public function setMarkAsRead($markAsRead)
66 {
67 $this->markAsRead = $markAsRead;
68
69 return $this;
70 }
71
72 /**
73 * Get whether articles must be all marked as read.
74 */
75 public function getMarkAsRead()
76 {
77 return $this->markAsRead;
78 }
79
80 /**
30 * Fetch content from the ContentProxy (using graby). 81 * Fetch content from the ContentProxy (using graby).
31 * If it fails return false instead of the updated entry. 82 * If it fails return the given entry to be saved in all case (to avoid user to loose the content).
32 * 83 *
33 * @param Entry $entry Entry to update 84 * @param Entry $entry Entry to update
34 * @param string $url Url to grab content for 85 * @param string $url Url to grab content for
35 * @param array $content An array with AT LEAST keys title, html, url, language & content_type to skip the fetchContent from the url 86 * @param array $content An array with AT LEAST keys title, html, url, language & content_type to skip the fetchContent from the url
36 * 87 *
37 * @return Entry|false 88 * @return Entry
38 */ 89 */
39 protected function fetchContent(Entry $entry, $url, array $content = []) 90 protected function fetchContent(Entry $entry, $url, array $content = [])
40 { 91 {
41 try { 92 try {
42 return $this->contentProxy->updateEntry($entry, $url, $content); 93 return $this->contentProxy->updateEntry($entry, $url, $content);
43 } catch (\Exception $e) { 94 } catch (\Exception $e) {
44 return false; 95 return $entry;
96 }
97 }
98
99 /**
100 * Parse and insert all given entries.
101 *
102 * @param $entries
103 */
104 protected function parseEntries($entries)
105 {
106 $i = 1;
107
108 foreach ($entries as $importedEntry) {
109 $entry = $this->parseEntry($importedEntry);
110
111 if (null === $entry) {
112 continue;
113 }
114
115 // flush every 20 entries
116 if (($i % 20) === 0) {
117 $this->em->flush();
118
119 // clear only affected entities
120 $this->em->clear(Entry::class);
121 $this->em->clear(Tag::class);
122 }
123 ++$i;
124 }
125
126 $this->em->flush();
127 }
128
129 /**
130 * Parse entries and send them to the queue.
131 * It should just be a simple loop on all item, no call to the database should be done
132 * to speedup queuing.
133 *
134 * Faster parse entries for Producer.
135 * We don't care to make check at this time. They'll be done by the consumer.
136 *
137 * @param array $entries
138 */
139 protected function parseEntriesForProducer(array $entries)
140 {
141 foreach ($entries as $importedEntry) {
142 // set userId for the producer (it won't know which user is connected)
143 $importedEntry['userId'] = $this->user->getId();
144
145 if ($this->markAsRead) {
146 $importedEntry = $this->setEntryAsRead($importedEntry);
147 }
148
149 ++$this->queuedEntries;
150
151 $this->producer->publish(json_encode($importedEntry));
45 } 152 }
46 } 153 }
154
155 /**
156 * {@inheritdoc}
157 */
158 public function getSummary()
159 {
160 return [
161 'skipped' => $this->skippedEntries,
162 'imported' => $this->importedEntries,
163 'queued' => $this->queuedEntries,
164 ];
165 }
166
167 /**
168 * Parse one entry.
169 *
170 * @param array $importedEntry
171 *
172 * @return Entry
173 */
174 abstract public function parseEntry(array $importedEntry);
175
176 /**
177 * Set current imported entry to archived / read.
178 * Implementation is different accross all imports.
179 *
180 * @param array $importedEntry
181 *
182 * @return array
183 */
184 abstract protected function setEntryAsRead(array $importedEntry);
47} 185}
diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php
index a6f905b1..e00eb44b 100644
--- a/src/Wallabag/ImportBundle/Import/PocketImport.php
+++ b/src/Wallabag/ImportBundle/Import/PocketImport.php
@@ -6,31 +6,34 @@ use Psr\Log\NullLogger;
6use Doctrine\ORM\EntityManager; 6use Doctrine\ORM\EntityManager;
7use GuzzleHttp\Client; 7use GuzzleHttp\Client;
8use GuzzleHttp\Exception\RequestException; 8use GuzzleHttp\Exception\RequestException;
9use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface;
10use Wallabag\CoreBundle\Entity\Entry; 9use Wallabag\CoreBundle\Entity\Entry;
11use Wallabag\CoreBundle\Helper\ContentProxy; 10use Wallabag\CoreBundle\Helper\ContentProxy;
12use Craue\ConfigBundle\Util\Config;
13 11
14class PocketImport extends AbstractImport 12class PocketImport extends AbstractImport
15{ 13{
16 private $user;
17 private $client; 14 private $client;
18 private $consumerKey; 15 private $accessToken;
19 private $skippedEntries = 0;
20 private $importedEntries = 0;
21 private $markAsRead;
22 protected $accessToken;
23 16
24 public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) 17 const NB_ELEMENTS = 5000;
18
19 public function __construct(EntityManager $em, ContentProxy $contentProxy)
25 { 20 {
26 $this->user = $tokenStorage->getToken()->getUser();
27 $this->em = $em; 21 $this->em = $em;
28 $this->contentProxy = $contentProxy; 22 $this->contentProxy = $contentProxy;
29 $this->consumerKey = $craueConfig->get('pocket_consumer_key');
30 $this->logger = new NullLogger(); 23 $this->logger = new NullLogger();
31 } 24 }
32 25
33 /** 26 /**
27 * Only used for test purpose.
28 *
29 * @return string
30 */
31 public function getAccessToken()
32 {
33 return $this->accessToken;
34 }
35
36 /**
34 * {@inheritdoc} 37 * {@inheritdoc}
35 */ 38 */
36 public function getName() 39 public function getName()
@@ -66,7 +69,7 @@ class PocketImport extends AbstractImport
66 $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/oauth/request', 69 $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/oauth/request',
67 [ 70 [
68 'body' => json_encode([ 71 'body' => json_encode([
69 'consumer_key' => $this->consumerKey, 72 'consumer_key' => $this->user->getConfig()->getPocketConsumerKey(),
70 'redirect_uri' => $redirectUri, 73 'redirect_uri' => $redirectUri,
71 ]), 74 ]),
72 ] 75 ]
@@ -96,7 +99,7 @@ class PocketImport extends AbstractImport
96 $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/oauth/authorize', 99 $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/oauth/authorize',
97 [ 100 [
98 'body' => json_encode([ 101 'body' => json_encode([
99 'consumer_key' => $this->consumerKey, 102 'consumer_key' => $this->user->getConfig()->getPocketConsumerKey(),
100 'code' => $code, 103 'code' => $code,
101 ]), 104 ]),
102 ] 105 ]
@@ -116,38 +119,22 @@ class PocketImport extends AbstractImport
116 } 119 }
117 120
118 /** 121 /**
119 * Set whether articles must be all marked as read.
120 *
121 * @param bool $markAsRead
122 */
123 public function setMarkAsRead($markAsRead)
124 {
125 $this->markAsRead = $markAsRead;
126
127 return $this;
128 }
129
130 /**
131 * Get whether articles must be all marked as read.
132 */
133 public function getMarkAsRead()
134 {
135 return $this->markAsRead;
136 }
137
138 /**
139 * {@inheritdoc} 122 * {@inheritdoc}
140 */ 123 */
141 public function import() 124 public function import($offset = 0)
142 { 125 {
126 static $run = 0;
127
143 $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/get', 128 $request = $this->client->createRequest('POST', 'https://getpocket.com/v3/get',
144 [ 129 [
145 'body' => json_encode([ 130 'body' => json_encode([
146 'consumer_key' => $this->consumerKey, 131 'consumer_key' => $this->user->getConfig()->getPocketConsumerKey(),
147 'access_token' => $this->accessToken, 132 'access_token' => $this->accessToken,
148 'detailType' => 'complete', 133 'detailType' => 'complete',
149 'state' => 'all', 134 'state' => 'all',
150 'sort' => 'oldest', 135 'sort' => 'newest',
136 'count' => self::NB_ELEMENTS,
137 'offset' => $offset,
151 ]), 138 ]),
152 ] 139 ]
153 ); 140 );
@@ -162,20 +149,24 @@ class PocketImport extends AbstractImport
162 149
163 $entries = $response->json(); 150 $entries = $response->json();
164 151
165 $this->parseEntries($entries['list']); 152 if ($this->producer) {
153 $this->parseEntriesForProducer($entries['list']);
154 } else {
155 $this->parseEntries($entries['list']);
156 }
166 157
167 return true; 158 // if we retrieve exactly the amount of items requested it means we can get more
168 } 159 // re-call import and offset item by the amount previous received:
160 // - first call get 5k offset 0
161 // - second call get 5k offset 5k
162 // - and so on
163 if (count($entries['list']) === self::NB_ELEMENTS) {
164 ++$run;
169 165
170 /** 166 return $this->import(self::NB_ELEMENTS * $run);
171 * {@inheritdoc} 167 }
172 */ 168
173 public function getSummary() 169 return true;
174 {
175 return [
176 'skipped' => $this->skippedEntries,
177 'imported' => $this->importedEntries,
178 ];
179 } 170 }
180 171
181 /** 172 /**
@@ -189,77 +180,74 @@ class PocketImport extends AbstractImport
189 } 180 }
190 181
191 /** 182 /**
192 * @see https://getpocket.com/developer/docs/v3/retrieve 183 * {@inheritdoc}
193 * 184 *
194 * @param $entries 185 * @see https://getpocket.com/developer/docs/v3/retrieve
195 */ 186 */
196 private function parseEntries($entries) 187 public function parseEntry(array $importedEntry)
197 { 188 {
198 $i = 1; 189 $url = isset($importedEntry['resolved_url']) && $importedEntry['resolved_url'] != '' ? $importedEntry['resolved_url'] : $importedEntry['given_url'];
199 190
200 foreach ($entries as $pocketEntry) { 191 $existingEntry = $this->em
201 $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; 192 ->getRepository('WallabagCoreBundle:Entry')
202 193 ->findByUrlAndUserId($url, $this->user->getId());
203 $existingEntry = $this->em 194
204 ->getRepository('WallabagCoreBundle:Entry') 195 if (false !== $existingEntry) {
205 ->findByUrlAndUserId($url, $this->user->getId()); 196 ++$this->skippedEntries;
206 197
207 if (false !== $existingEntry) { 198 return;
208 ++$this->skippedEntries; 199 }
209 continue; 200
210 } 201 $entry = new Entry($this->user);
211 202 $entry->setUrl($url);
212 $entry = new Entry($this->user); 203
213 $entry = $this->fetchContent($entry, $url); 204 // update entry with content (in case fetching failed, the given entry will be return)
214 205 $entry = $this->fetchContent($entry, $url);
215 // jump to next entry in case of problem while getting content 206
216 if (false === $entry) { 207 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
217 ++$this->skippedEntries; 208 $entry->setArchived($importedEntry['status'] == 1 || $this->markAsRead);
218 continue; 209
219 } 210 // 0 or 1 - 1 If the item is starred
220 211 $entry->setStarred($importedEntry['favorite'] == 1);
221 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted 212
222 if ($pocketEntry['status'] == 1 || $this->markAsRead) { 213 $title = 'Untitled';
223 $entry->setArchived(true); 214 if (isset($importedEntry['resolved_title']) && $importedEntry['resolved_title'] != '') {
224 } 215 $title = $importedEntry['resolved_title'];
225 216 } elseif (isset($importedEntry['given_title']) && $importedEntry['given_title'] != '') {
226 // 0 or 1 - 1 If the item is starred 217 $title = $importedEntry['given_title'];
227 if ($pocketEntry['favorite'] == 1) { 218 }
228 $entry->setStarred(true); 219
229 } 220 $entry->setTitle($title);
230 221
231 $title = 'Untitled'; 222 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
232 if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { 223 if (isset($importedEntry['has_image']) && $importedEntry['has_image'] > 0 && isset($importedEntry['images'][1])) {
233 $title = $pocketEntry['resolved_title']; 224 $entry->setPreviewPicture($importedEntry['images'][1]['src']);
234 } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') { 225 }
235 $title = $pocketEntry['given_title']; 226
236 } 227 if (isset($importedEntry['tags']) && !empty($importedEntry['tags'])) {
237 228 $this->contentProxy->assignTagsToEntry(
238 $entry->setTitle($title); 229 $entry,
239 230 array_keys($importedEntry['tags'])
240 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image 231 );
241 if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
242 $entry->setPreviewPicture($pocketEntry['images'][1]['src']);
243 }
244
245 if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) {
246 $this->contentProxy->assignTagsToEntry(
247 $entry,
248 array_keys($pocketEntry['tags'])
249 );
250 }
251
252 $this->em->persist($entry);
253 ++$this->importedEntries;
254
255 // flush every 20 entries
256 if (($i % 20) === 0) {
257 $this->em->flush();
258 }
259 ++$i;
260 } 232 }
261 233
262 $this->em->flush(); 234 if (!empty($importedEntry['time_added'])) {
263 $this->em->clear(); 235 $entry->setCreatedAt((new \DateTime())->setTimestamp($importedEntry['time_added']));
236 }
237
238 $this->em->persist($entry);
239 ++$this->importedEntries;
240
241 return $entry;
242 }
243
244 /**
245 * {@inheritdoc}
246 */
247 protected function setEntryAsRead(array $importedEntry)
248 {
249 $importedEntry['status'] = '1';
250
251 return $importedEntry;
264 } 252 }
265} 253}
diff --git a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php
index c7cfe15d..fa2b7053 100644
--- a/src/Wallabag/ImportBundle/Import/ReadabilityImport.php
+++ b/src/Wallabag/ImportBundle/Import/ReadabilityImport.php
@@ -3,28 +3,10 @@
3namespace Wallabag\ImportBundle\Import; 3namespace Wallabag\ImportBundle\Import;
4 4
5use Wallabag\CoreBundle\Entity\Entry; 5use Wallabag\CoreBundle\Entity\Entry;
6use Wallabag\UserBundle\Entity\User;
7 6
8class ReadabilityImport extends AbstractImport 7class ReadabilityImport extends AbstractImport
9{ 8{
10 private $user;
11 private $skippedEntries = 0;
12 private $importedEntries = 0;
13 private $filepath; 9 private $filepath;
14 private $markAsRead;
15
16 /**
17 * We define the user in a custom call because on the import command there is no logged in user.
18 * So we can't retrieve user from the `security.token_storage` service.
19 *
20 * @param User $user
21 */
22 public function setUser(User $user)
23 {
24 $this->user = $user;
25
26 return $this;
27 }
28 10
29 /** 11 /**
30 * {@inheritdoc} 12 * {@inheritdoc}
@@ -63,37 +45,6 @@ class ReadabilityImport extends AbstractImport
63 } 45 }
64 46
65 /** 47 /**
66 * Set whether articles must be all marked as read.
67 *
68 * @param bool $markAsRead
69 */
70 public function setMarkAsRead($markAsRead)
71 {
72 $this->markAsRead = $markAsRead;
73
74 return $this;
75 }
76
77 /**
78 * Get whether articles must be all marked as read.
79 */
80 public function getMarkAsRead()
81 {
82 return $this->markAsRead;
83 }
84
85 /**
86 * {@inheritdoc}
87 */
88 public function getSummary()
89 {
90 return [
91 'skipped' => $this->skippedEntries,
92 'imported' => $this->importedEntries,
93 ];
94 }
95
96 /**
97 * {@inheritdoc} 48 * {@inheritdoc}
98 */ 49 */
99 public function import() 50 public function import()
@@ -116,64 +67,66 @@ class ReadabilityImport extends AbstractImport
116 return false; 67 return false;
117 } 68 }
118 69
70 if ($this->producer) {
71 $this->parseEntriesForProducer($data['bookmarks']);
72
73 return true;
74 }
75
119 $this->parseEntries($data['bookmarks']); 76 $this->parseEntries($data['bookmarks']);
120 77
121 return true; 78 return true;
122 } 79 }
123 80
124 /** 81 /**
125 * Parse and insert all given entries. 82 * {@inheritdoc}
126 *
127 * @param $entries
128 */ 83 */
129 protected function parseEntries($entries) 84 public function parseEntry(array $importedEntry)
130 { 85 {
131 $i = 1; 86 $existingEntry = $this->em
132 87 ->getRepository('WallabagCoreBundle:Entry')
133 foreach ($entries as $importedEntry) { 88 ->findByUrlAndUserId($importedEntry['article__url'], $this->user->getId());
134 $existingEntry = $this->em 89
135 ->getRepository('WallabagCoreBundle:Entry') 90 if (false !== $existingEntry) {
136 ->findByUrlAndUserId($importedEntry['article__url'], $this->user->getId()); 91 ++$this->skippedEntries;
137 92
138 if (false !== $existingEntry) { 93 return;
139 ++$this->skippedEntries;
140 continue;
141 }
142
143 $data = [
144 'title' => $importedEntry['article__title'],
145 'url' => $importedEntry['article__url'],
146 'content_type' => '',
147 'language' => '',
148 'is_archived' => $importedEntry['archive'] || $this->markAsRead,
149 'is_starred' => $importedEntry['favorite'],
150 ];
151
152 $entry = $this->fetchContent(
153 new Entry($this->user),
154 $data['url'],
155 $data
156 );
157
158 // jump to next entry in case of problem while getting content
159 if (false === $entry) {
160 ++$this->skippedEntries;
161 continue;
162 }
163 $entry->setArchived($data['is_archived']);
164 $entry->setStarred($data['is_starred']);
165
166 $this->em->persist($entry);
167 ++$this->importedEntries;
168
169 // flush every 20 entries
170 if (($i % 20) === 0) {
171 $this->em->flush();
172 }
173 ++$i;
174 } 94 }
175 95
176 $this->em->flush(); 96 $data = [
177 $this->em->clear(); 97 'title' => $importedEntry['article__title'],
98 'url' => $importedEntry['article__url'],
99 'content_type' => '',
100 'language' => '',
101 'is_archived' => $importedEntry['archive'] || $this->markAsRead,
102 'is_starred' => $importedEntry['favorite'],
103 'created_at' => $importedEntry['date_added'],
104 ];
105
106 $entry = new Entry($this->user);
107 $entry->setUrl($data['url']);
108 $entry->setTitle($data['title']);
109
110 // update entry with content (in case fetching failed, the given entry will be return)
111 $entry = $this->fetchContent($entry, $data['url'], $data);
112
113 $entry->setArchived($data['is_archived']);
114 $entry->setStarred($data['is_starred']);
115 $entry->setCreatedAt(new \DateTime($data['created_at']));
116
117 $this->em->persist($entry);
118 ++$this->importedEntries;
119
120 return $entry;
121 }
122
123 /**
124 * {@inheritdoc}
125 */
126 protected function setEntryAsRead(array $importedEntry)
127 {
128 $importedEntry['archive'] = 1;
129
130 return $importedEntry;
178 } 131 }
179} 132}
diff --git a/src/Wallabag/ImportBundle/Import/WallabagImport.php b/src/Wallabag/ImportBundle/Import/WallabagImport.php
index 581ec178..043bb0a2 100644
--- a/src/Wallabag/ImportBundle/Import/WallabagImport.php
+++ b/src/Wallabag/ImportBundle/Import/WallabagImport.php
@@ -3,15 +3,10 @@
3namespace Wallabag\ImportBundle\Import; 3namespace Wallabag\ImportBundle\Import;
4 4
5use Wallabag\CoreBundle\Entity\Entry; 5use Wallabag\CoreBundle\Entity\Entry;
6use Wallabag\UserBundle\Entity\User;
7 6
8abstract class WallabagImport extends AbstractImport 7abstract class WallabagImport extends AbstractImport
9{ 8{
10 protected $user;
11 protected $skippedEntries = 0;
12 protected $importedEntries = 0;
13 protected $filepath; 9 protected $filepath;
14 protected $markAsRead;
15 // untitled in all languages from v1 10 // untitled in all languages from v1
16 protected $untitled = [ 11 protected $untitled = [
17 'Untitled', 12 'Untitled',
@@ -29,19 +24,6 @@ abstract class WallabagImport extends AbstractImport
29 ]; 24 ];
30 25
31 /** 26 /**
32 * We define the user in a custom call because on the import command there is no logged in user.
33 * So we can't retrieve user from the `security.token_storage` service.
34 *
35 * @param User $user
36 */
37 public function setUser(User $user)
38 {
39 $this->user = $user;
40
41 return $this;
42 }
43
44 /**
45 * {@inheritdoc} 27 * {@inheritdoc}
46 */ 28 */
47 abstract public function getName(); 29 abstract public function getName();
@@ -79,23 +61,18 @@ abstract class WallabagImport extends AbstractImport
79 return false; 61 return false;
80 } 62 }
81 63
64 if ($this->producer) {
65 $this->parseEntriesForProducer($data);
66
67 return true;
68 }
69
82 $this->parseEntries($data); 70 $this->parseEntries($data);
83 71
84 return true; 72 return true;
85 } 73 }
86 74
87 /** 75 /**
88 * {@inheritdoc}
89 */
90 public function getSummary()
91 {
92 return [
93 'skipped' => $this->skippedEntries,
94 'imported' => $this->importedEntries,
95 ];
96 }
97
98 /**
99 * Set file path to the json file. 76 * Set file path to the json file.
100 * 77 *
101 * @param string $filepath 78 * @param string $filepath
@@ -108,85 +85,59 @@ abstract class WallabagImport extends AbstractImport
108 } 85 }
109 86
110 /** 87 /**
111 * Set whether articles must be all marked as read. 88 * {@inheritdoc}
112 *
113 * @param bool $markAsRead
114 */ 89 */
115 public function setMarkAsRead($markAsRead) 90 public function parseEntry(array $importedEntry)
116 { 91 {
117 $this->markAsRead = $markAsRead; 92 $existingEntry = $this->em
93 ->getRepository('WallabagCoreBundle:Entry')
94 ->findByUrlAndUserId($importedEntry['url'], $this->user->getId());
118 95
119 return $this; 96 if (false !== $existingEntry) {
120 } 97 ++$this->skippedEntries;
121 98
122 /** 99 return;
123 * Parse and insert all given entries. 100 }
124 *
125 * @param $entries
126 */
127 protected function parseEntries($entries)
128 {
129 $i = 1;
130 101
131 foreach ($entries as $importedEntry) { 102 $data = $this->prepareEntry($importedEntry);
132 $existingEntry = $this->em
133 ->getRepository('WallabagCoreBundle:Entry')
134 ->findByUrlAndUserId($importedEntry['url'], $this->user->getId());
135 103
136 if (false !== $existingEntry) { 104 $entry = new Entry($this->user);
137 ++$this->skippedEntries; 105 $entry->setUrl($data['url']);
138 continue; 106 $entry->setTitle($data['title']);
139 }
140 107
141 $data = $this->prepareEntry($importedEntry, $this->markAsRead); 108 // update entry with content (in case fetching failed, the given entry will be return)
109 $entry = $this->fetchContent($entry, $data['url'], $data);
142 110
143 $entry = $this->fetchContent( 111 if (array_key_exists('tags', $data)) {
144 new Entry($this->user), 112 $this->contentProxy->assignTagsToEntry(
145 $importedEntry['url'], 113 $entry,
146 $data 114 $data['tags']
147 ); 115 );
116 }
148 117
149 // jump to next entry in case of problem while getting content 118 if (isset($importedEntry['preview_picture'])) {
150 if (false === $entry) { 119 $entry->setPreviewPicture($importedEntry['preview_picture']);
151 ++$this->skippedEntries;
152 continue;
153 }
154
155 if (array_key_exists('tags', $data)) {
156 $this->contentProxy->assignTagsToEntry(
157 $entry,
158 $data['tags']
159 );
160 }
161
162 if (isset($importedEntry['preview_picture'])) {
163 $entry->setPreviewPicture($importedEntry['preview_picture']);
164 }
165
166 $entry->setArchived($data['is_archived']);
167 $entry->setStarred($data['is_starred']);
168
169 $this->em->persist($entry);
170 ++$this->importedEntries;
171
172 // flush every 20 entries
173 if (($i % 20) === 0) {
174 $this->em->flush();
175 }
176 ++$i;
177 } 120 }
178 121
179 $this->em->flush(); 122 $entry->setArchived($data['is_archived']);
180 $this->em->clear(); 123 $entry->setStarred($data['is_starred']);
124
125 if (!empty($data['created_at'])) {
126 $entry->setCreatedAt(new \DateTime($data['created_at']));
127 }
128
129 $this->em->persist($entry);
130 ++$this->importedEntries;
131
132 return $entry;
181 } 133 }
182 134
183 /** 135 /**
184 * This should return a cleaned array for a given entry to be given to `updateEntry`. 136 * This should return a cleaned array for a given entry to be given to `updateEntry`.
185 * 137 *
186 * @param array $entry Data from the imported file 138 * @param array $entry Data from the imported file
187 * @param bool $markAsRead Should we mark as read content?
188 * 139 *
189 * @return array 140 * @return array
190 */ 141 */
191 abstract protected function prepareEntry($entry = [], $markAsRead = false); 142 abstract protected function prepareEntry($entry = []);
192} 143}
diff --git a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php
index 6cf3467a..4f001062 100644
--- a/src/Wallabag/ImportBundle/Import/WallabagV1Import.php
+++ b/src/Wallabag/ImportBundle/Import/WallabagV1Import.php
@@ -31,7 +31,7 @@ class WallabagV1Import extends WallabagImport
31 /** 31 /**
32 * {@inheritdoc} 32 * {@inheritdoc}
33 */ 33 */
34 protected function prepareEntry($entry = [], $markAsRead = false) 34 protected function prepareEntry($entry = [])
35 { 35 {
36 $data = [ 36 $data = [
37 'title' => $entry['title'], 37 'title' => $entry['title'],
@@ -39,9 +39,10 @@ class WallabagV1Import extends WallabagImport
39 'url' => $entry['url'], 39 'url' => $entry['url'],
40 'content_type' => '', 40 'content_type' => '',
41 'language' => '', 41 'language' => '',
42 'is_archived' => $entry['is_read'] || $markAsRead, 42 'is_archived' => $entry['is_read'] || $this->markAsRead,
43 'is_starred' => $entry['is_fav'], 43 'is_starred' => $entry['is_fav'],
44 'tags' => '', 44 'tags' => '',
45 'created_at' => '',
45 ]; 46 ];
46 47
47 // force content to be refreshed in case on bad fetch in the v1 installation 48 // force content to be refreshed in case on bad fetch in the v1 installation
@@ -56,4 +57,14 @@ class WallabagV1Import extends WallabagImport
56 57
57 return $data; 58 return $data;
58 } 59 }
60
61 /**
62 * {@inheritdoc}
63 */
64 protected function setEntryAsRead(array $importedEntry)
65 {
66 $importedEntry['is_read'] = 1;
67
68 return $importedEntry;
69 }
59} 70}
diff --git a/src/Wallabag/ImportBundle/Import/WallabagV2Import.php b/src/Wallabag/ImportBundle/Import/WallabagV2Import.php
index d0035b63..37c8ca14 100644
--- a/src/Wallabag/ImportBundle/Import/WallabagV2Import.php
+++ b/src/Wallabag/ImportBundle/Import/WallabagV2Import.php
@@ -31,12 +31,22 @@ class WallabagV2Import extends WallabagImport
31 /** 31 /**
32 * {@inheritdoc} 32 * {@inheritdoc}
33 */ 33 */
34 protected function prepareEntry($entry = [], $markAsRead = false) 34 protected function prepareEntry($entry = [])
35 { 35 {
36 return [ 36 return [
37 'html' => $entry['content'], 37 'html' => $entry['content'],
38 'content_type' => $entry['mimetype'], 38 'content_type' => $entry['mimetype'],
39 'is_archived' => ($entry['is_archived'] || $markAsRead), 39 'is_archived' => ($entry['is_archived'] || $this->markAsRead),
40 ] + $entry; 40 ] + $entry;
41 } 41 }
42
43 /**
44 * {@inheritdoc}
45 */
46 protected function setEntryAsRead(array $importedEntry)
47 {
48 $importedEntry['is_archived'] = 1;
49
50 return $importedEntry;
51 }
42} 52}
diff --git a/src/Wallabag/ImportBundle/Redis/Producer.php b/src/Wallabag/ImportBundle/Redis/Producer.php
new file mode 100644
index 00000000..fedc3e57
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Redis/Producer.php
@@ -0,0 +1,36 @@
1<?php
2
3namespace Wallabag\ImportBundle\Redis;
4
5use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
6use Simpleue\Queue\RedisQueue;
7
8/**
9 * This is a proxy class for "Simpleue\Queue\RedisQueue".
10 * It allow us to use the same way to publish a message between RabbitMQ & Redis: publish().
11 *
12 * It implements the ProducerInterface of RabbitMQ (yes it's ugly) so we can have the same
13 * kind of class which implements the same interface.
14 * So we can inject either a RabbitMQ producer or a Redis producer with the same signature
15 */
16class Producer implements ProducerInterface
17{
18 private $queue;
19
20 public function __construct(RedisQueue $queue)
21 {
22 $this->queue = $queue;
23 }
24
25 /**
26 * Publish a message in the Redis queue.
27 *
28 * @param string $msgBody
29 * @param string $routingKey NOT USED
30 * @param array $additionalProperties NOT USED
31 */
32 public function publish($msgBody, $routingKey = '', $additionalProperties = array())
33 {
34 $this->queue->sendJob($msgBody);
35 }
36}
diff --git a/src/Wallabag/ImportBundle/Resources/config/rabbit.yml b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml
new file mode 100644
index 00000000..aa049749
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Resources/config/rabbit.yml
@@ -0,0 +1,30 @@
1# RabbitMQ stuff
2services:
3 wallabag_import.consumer.amqp.pocket:
4 class: Wallabag\ImportBundle\Consumer\AMQPEntryConsumer
5 arguments:
6 - "@doctrine.orm.entity_manager"
7 - "@wallabag_user.user_repository"
8 - "@wallabag_import.pocket.import"
9 - "@logger"
10 wallabag_import.consumer.amqp.readability:
11 class: Wallabag\ImportBundle\Consumer\AMQPEntryConsumer
12 arguments:
13 - "@doctrine.orm.entity_manager"
14 - "@wallabag_user.user_repository"
15 - "@wallabag_import.readability.import"
16 - "@logger"
17 wallabag_import.consumer.amqp.wallabag_v1:
18 class: Wallabag\ImportBundle\Consumer\AMQPEntryConsumer
19 arguments:
20 - "@doctrine.orm.entity_manager"
21 - "@wallabag_user.user_repository"
22 - "@wallabag_import.wallabag_v1.import"
23 - "@logger"
24 wallabag_import.consumer.amqp.wallabag_v2:
25 class: Wallabag\ImportBundle\Consumer\AMQPEntryConsumer
26 arguments:
27 - "@doctrine.orm.entity_manager"
28 - "@wallabag_user.user_repository"
29 - "@wallabag_import.wallabag_v2.import"
30 - "@logger"
diff --git a/src/Wallabag/ImportBundle/Resources/config/redis.yml b/src/Wallabag/ImportBundle/Resources/config/redis.yml
new file mode 100644
index 00000000..7d3248e5
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Resources/config/redis.yml
@@ -0,0 +1,81 @@
1# Redis stuff
2services:
3 # readability
4 wallabag_import.queue.redis.readability:
5 class: Simpleue\Queue\RedisQueue
6 arguments:
7 - "@wallabag_core.redis.client"
8 - "wallabag.import.readability"
9
10 wallabag_import.producer.redis.readability:
11 class: Wallabag\ImportBundle\Redis\Producer
12 arguments:
13 - "@wallabag_import.queue.redis.readability"
14
15 wallabag_import.consumer.redis.readability:
16 class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer
17 arguments:
18 - "@doctrine.orm.entity_manager"
19 - "@wallabag_user.user_repository"
20 - "@wallabag_import.readability.import"
21 - "@logger"
22
23 # pocket
24 wallabag_import.queue.redis.pocket:
25 class: Simpleue\Queue\RedisQueue
26 arguments:
27 - "@wallabag_core.redis.client"
28 - "wallabag.import.pocket"
29
30 wallabag_import.producer.redis.pocket:
31 class: Wallabag\ImportBundle\Redis\Producer
32 arguments:
33 - "@wallabag_import.queue.redis.pocket"
34
35 wallabag_import.consumer.redis.pocket:
36 class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer
37 arguments:
38 - "@doctrine.orm.entity_manager"
39 - "@wallabag_user.user_repository"
40 - "@wallabag_import.pocket.import"
41 - "@logger"
42
43 # wallabag v1
44 wallabag_import.queue.redis.wallabag_v1:
45 class: Simpleue\Queue\RedisQueue
46 arguments:
47 - "@wallabag_core.redis.client"
48 - "wallabag.import.wallabag_v1"
49
50 wallabag_import.producer.redis.wallabag_v1:
51 class: Wallabag\ImportBundle\Redis\Producer
52 arguments:
53 - "@wallabag_import.queue.redis.wallabag_v1"
54
55 wallabag_import.consumer.redis.wallabag_v1:
56 class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer
57 arguments:
58 - "@doctrine.orm.entity_manager"
59 - "@wallabag_user.user_repository"
60 - "@wallabag_import.wallabag_v1.import"
61 - "@logger"
62
63 # wallabag v2
64 wallabag_import.queue.redis.wallabag_v2:
65 class: Simpleue\Queue\RedisQueue
66 arguments:
67 - "@wallabag_core.redis.client"
68 - "wallabag.import.wallabag_v2"
69
70 wallabag_import.producer.redis.wallabag_v2:
71 class: Wallabag\ImportBundle\Redis\Producer
72 arguments:
73 - "@wallabag_import.queue.redis.wallabag_v2"
74
75 wallabag_import.consumer.redis.wallabag_v2:
76 class: Wallabag\ImportBundle\Consumer\RedisEntryConsumer
77 arguments:
78 - "@doctrine.orm.entity_manager"
79 - "@wallabag_user.user_repository"
80 - "@wallabag_import.wallabag_v2.import"
81 - "@logger"
diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml
index 520d43af..f03404ae 100644
--- a/src/Wallabag/ImportBundle/Resources/config/services.yml
+++ b/src/Wallabag/ImportBundle/Resources/config/services.yml
@@ -1,3 +1,7 @@
1imports:
2 - { resource: rabbit.yml }
3 - { resource: redis.yml }
4
1services: 5services:
2 wallabag_import.chain: 6 wallabag_import.chain:
3 class: Wallabag\ImportBundle\Import\ImportChain 7 class: Wallabag\ImportBundle\Import\ImportChain
@@ -14,7 +18,6 @@ services:
14 wallabag_import.pocket.import: 18 wallabag_import.pocket.import:
15 class: Wallabag\ImportBundle\Import\PocketImport 19 class: Wallabag\ImportBundle\Import\PocketImport
16 arguments: 20 arguments:
17 - "@security.token_storage"
18 - "@doctrine.orm.entity_manager" 21 - "@doctrine.orm.entity_manager"
19 - "@wallabag_core.content_proxy" 22 - "@wallabag_core.content_proxy"
20 - "@craue_config" 23 - "@craue_config"
diff --git a/src/Wallabag/ImportBundle/Resources/views/Import/_workerEnabled.html.twig b/src/Wallabag/ImportBundle/Resources/views/Import/_workerEnabled.html.twig
new file mode 100644
index 00000000..2390a41f
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Resources/views/Import/_workerEnabled.html.twig
@@ -0,0 +1,8 @@
1{% set redis = craue_setting('import_with_redis') %}
2{% set rabbit = craue_setting('import_with_rabbitmq') %}
3
4{% if redis or rabbit %}
5 <div class="card-panel yellow darken-1 black-text">
6 {{ 'import.worker.enabled'|trans }} <strong>{% if rabbit %}RabbitMQ{% elseif redis %}Redis{% endif %}</strong>
7 </div>
8{% endif %}
diff --git a/src/Wallabag/ImportBundle/Resources/views/Import/check_queue.html.twig b/src/Wallabag/ImportBundle/Resources/views/Import/check_queue.html.twig
new file mode 100644
index 00000000..7168ea35
--- /dev/null
+++ b/src/Wallabag/ImportBundle/Resources/views/Import/check_queue.html.twig
@@ -0,0 +1,11 @@
1{% if nbRedisMessages > 0 %}
2 <script>
3 Materialize.toast('Messages in queue: {{ nbRedisMessages }}', 4000);
4 </script>
5{% endif %}
6
7{% if nbRabbitMessages > 0 %}
8 <script>
9 Materialize.toast('Messages in queue: {{ nbRabbitMessages }}', 4000);
10 </script>
11{% endif %}
diff --git a/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig b/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig
index 401dbd33..6195fa07 100644
--- a/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig
+++ b/src/Wallabag/ImportBundle/Resources/views/Pocket/index.html.twig
@@ -6,15 +6,13 @@
6<div class="row"> 6<div class="row">
7 <div class="col s12"> 7 <div class="col s12">
8 <div class="card-panel settings"> 8 <div class="card-panel settings">
9 {% include 'WallabagImportBundle:Import:_workerEnabled.html.twig' %}
10
9 {% if not has_consumer_key %} 11 {% if not has_consumer_key %}
10 <div class="card-panel red darken-1"> 12 <div class="card-panel red white-text">
11 {{ 'import.pocket.config_missing.description'|trans }} 13 {{ 'import.pocket.config_missing.description'|trans }}
12 14
13 {% if is_granted('ROLE_SUPER_ADMIN') %} 15 {{ 'import.pocket.config_missing.admin_message'|trans({'%keyurls%': '<a href="' ~ path('config') ~ '">', '%keyurle%':'</a>'})|raw }}
14 {{ 'import.pocket.config_missing.admin_message'|trans({'%keyurls%': '<a href="' ~ path('craue_config_settings_modify') ~ '#set-import">', '%keyurle%':'</a>'})|raw }}
15 {% else %}
16 {{ 'import.pocket.config_missing.user_message'|trans }}
17 {% endif %}
18 </div> 16 </div>
19 {% endif %} 17 {% endif %}
20 18
@@ -29,7 +27,7 @@
29 {{ form_label(form.mark_as_read) }} 27 {{ form_label(form.mark_as_read) }}
30 </div> 28 </div>
31 </div> 29 </div>
32 <button class="btn waves-effect waves-light" type="submit" name="action"> 30 <button class="btn waves-effect waves-light" type="submit" name="action" {% if not has_consumer_key %}disabled="disabled"{% endif %}>
33 {{ 'import.pocket.connect_to_pocket'|trans }} 31 {{ 'import.pocket.connect_to_pocket'|trans }}
34 </button> 32 </button>
35 </form> 33 </form>
diff --git a/src/Wallabag/ImportBundle/Resources/views/Readability/index.html.twig b/src/Wallabag/ImportBundle/Resources/views/Readability/index.html.twig
index f527d309..74653b0f 100644
--- a/src/Wallabag/ImportBundle/Resources/views/Readability/index.html.twig
+++ b/src/Wallabag/ImportBundle/Resources/views/Readability/index.html.twig
@@ -6,6 +6,8 @@
6<div class="row"> 6<div class="row">
7 <div class="col s12"> 7 <div class="col s12">
8 <div class="card-panel settings"> 8 <div class="card-panel settings">
9 {% include 'WallabagImportBundle:Import:_workerEnabled.html.twig' %}
10
9 <div class="row"> 11 <div class="row">
10 <blockquote>{{ import.description|trans }}</blockquote> 12 <blockquote>{{ import.description|trans }}</blockquote>
11 <p>{{ 'import.readability.how_to'|trans }}</p> 13 <p>{{ 'import.readability.how_to'|trans }}</p>
diff --git a/src/Wallabag/ImportBundle/Resources/views/WallabagV1/index.html.twig b/src/Wallabag/ImportBundle/Resources/views/WallabagV1/index.html.twig
index 13e24c8c..0b19bc34 100644
--- a/src/Wallabag/ImportBundle/Resources/views/WallabagV1/index.html.twig
+++ b/src/Wallabag/ImportBundle/Resources/views/WallabagV1/index.html.twig
@@ -6,6 +6,8 @@
6<div class="row"> 6<div class="row">
7 <div class="col s12"> 7 <div class="col s12">
8 <div class="card-panel settings"> 8 <div class="card-panel settings">
9 {% include 'WallabagImportBundle:Import:_workerEnabled.html.twig' %}
10
9 <div class="row"> 11 <div class="row">
10 <blockquote>{{ import.description|trans }}</blockquote> 12 <blockquote>{{ import.description|trans }}</blockquote>
11 <p>{{ 'import.wallabag_v1.how_to'|trans }}</p> 13 <p>{{ 'import.wallabag_v1.how_to'|trans }}</p>