diff options
author | Jeremy Benoist <jeremy.benoist@gmail.com> | 2016-09-03 17:36:57 +0200 |
---|---|---|
committer | Jeremy Benoist <jeremy.benoist@gmail.com> | 2016-09-11 21:57:46 +0200 |
commit | ef75e1220ebb76a8df019d946460ad612759f0bb (patch) | |
tree | 100830464851d6834e1e24ccabca1961a862fcd8 | |
parent | 87c9995b6c61a9f5cde3771bd4f9d44b5da26c43 (diff) | |
download | wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.gz wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.zst wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.zip |
Send every imported item to the queue
Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format.
Then, the worker retrieve that information, find / create the entry and save it.
-rw-r--r-- | app/config/config.yml | 22 | ||||
-rw-r--r-- | app/config/parameters.yml.dist | 4 | ||||
-rw-r--r-- | src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php | 39 | ||||
-rw-r--r-- | src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php | 63 | ||||
-rw-r--r-- | src/Wallabag/ImportBundle/Controller/PocketController.php | 25 | ||||
-rw-r--r-- | src/Wallabag/ImportBundle/Import/PocketImport.php | 177 | ||||
-rw-r--r-- | src/Wallabag/ImportBundle/Resources/config/services.yml | 11 | ||||
-rw-r--r-- | src/Wallabag/UserBundle/Resources/config/services.yml | 6 | ||||
-rw-r--r-- | tests/Wallabag/ImportBundle/Import/PocketImportTest.php | 28 |
9 files changed, 221 insertions, 154 deletions
diff --git a/app/config/config.yml b/app/config/config.yml index ef5ae0aa..fa829637 100644 --- a/app/config/config.yml +++ b/app/config/config.yml | |||
@@ -219,24 +219,24 @@ lexik_maintenance: | |||
219 | old_sound_rabbit_mq: | 219 | old_sound_rabbit_mq: |
220 | connections: | 220 | connections: |
221 | default: | 221 | default: |
222 | host: %rabbitmq_host% | 222 | host: "%rabbitmq_host%" |
223 | port: %rabbitmq_port% | 223 | port: "%rabbitmq_port%" |
224 | user: %rabbitmq_user% | 224 | user: "%rabbitmq_user%" |
225 | password: %rabbitmq_password% | 225 | password: "%rabbitmq_password%" |
226 | vhost: / | 226 | vhost: / |
227 | lazy: false | 227 | lazy: true |
228 | producers: | 228 | producers: |
229 | wallabag: | 229 | wallabag_pocket: |
230 | connection: default | 230 | connection: default |
231 | exchange_options: | 231 | exchange_options: |
232 | name: 'wallabag_exchange' | 232 | name: 'wallabag.import.pocket' |
233 | type: topic | 233 | type: topic |
234 | consumers: | 234 | consumers: |
235 | entries: | 235 | wallabag_pocket: |
236 | connection: default | 236 | connection: default |
237 | exchange_options: | 237 | exchange_options: |
238 | name: 'wallabag_exchange' | 238 | name: 'wallabag.import.pocket' |
239 | type: topic | 239 | type: topic |
240 | queue_options: | 240 | queue_options: |
241 | name: 'wallabag_queue' | 241 | name: 'wallabag.import.pocket' |
242 | callback: wallabag_import.consumer.entry | 242 | callback: wallabag_import.consumer.pocket |
diff --git a/app/config/parameters.yml.dist b/app/config/parameters.yml.dist index e925b412..a59dc02c 100644 --- a/app/config/parameters.yml.dist +++ b/app/config/parameters.yml.dist | |||
@@ -41,11 +41,7 @@ parameters: | |||
41 | 41 | ||
42 | rss_limit: 50 | 42 | rss_limit: 50 |
43 | 43 | ||
44 | # pocket import | ||
45 | pocket_consumer_key: xxxxxxxx | ||
46 | |||
47 | # RabbitMQ processing | 44 | # RabbitMQ processing |
48 | rabbitmq: false | ||
49 | rabbitmq_host: localhost | 45 | rabbitmq_host: localhost |
50 | rabbitmq_port: 5672 | 46 | rabbitmq_port: 5672 |
51 | rabbitmq_user: guest | 47 | rabbitmq_user: guest |
diff --git a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php b/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php deleted file mode 100644 index 7775f01c..00000000 --- a/src/Wallabag/ImportBundle/Component/AMPQ/EntryConsumer.php +++ /dev/null | |||
@@ -1,39 +0,0 @@ | |||
1 | <?php | ||
2 | |||
3 | namespace Wallabag\ImportBundle\Component\AMPQ; | ||
4 | |||
5 | use Doctrine\ORM\EntityManager; | ||
6 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; | ||
7 | use PhpAmqpLib\Message\AMQPMessage; | ||
8 | use Wallabag\CoreBundle\Helper\ContentProxy; | ||
9 | use Wallabag\CoreBundle\Repository\EntryRepository; | ||
10 | |||
11 | class EntryConsumer implements ConsumerInterface | ||
12 | { | ||
13 | private $em; | ||
14 | private $contentProxy; | ||
15 | private $entryRepository; | ||
16 | |||
17 | public function __construct(EntityManager $em, EntryRepository $entryRepository, ContentProxy $contentProxy) | ||
18 | { | ||
19 | $this->em = $em; | ||
20 | $this->entryRepository = $entryRepository; | ||
21 | $this->contentProxy = $contentProxy; | ||
22 | } | ||
23 | |||
24 | /** | ||
25 | * {@inheritdoc} | ||
26 | */ | ||
27 | public function execute(AMQPMessage $msg) | ||
28 | { | ||
29 | $storedEntry = unserialize($msg->body); | ||
30 | $entry = $this->entryRepository->findByUrlAndUserId($storedEntry['url'], $storedEntry['userId']); | ||
31 | if ($entry) { | ||
32 | $entry = $this->contentProxy->updateEntry($entry, $entry->getUrl()); | ||
33 | if ($entry) { | ||
34 | $this->em->persist($entry); | ||
35 | $this->em->flush(); | ||
36 | } | ||
37 | } | ||
38 | } | ||
39 | } | ||
diff --git a/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php new file mode 100644 index 00000000..239e7446 --- /dev/null +++ b/src/Wallabag/ImportBundle/Consumer/AMPQ/PocketConsumer.php | |||
@@ -0,0 +1,63 @@ | |||
1 | <?php | ||
2 | |||
3 | namespace Wallabag\ImportBundle\Consumer\AMPQ; | ||
4 | |||
5 | use Doctrine\ORM\EntityManager; | ||
6 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; | ||
7 | use PhpAmqpLib\Message\AMQPMessage; | ||
8 | use Wallabag\ImportBundle\Import\PocketImport; | ||
9 | use Wallabag\UserBundle\Repository\UserRepository; | ||
10 | use Psr\Log\LoggerInterface; | ||
11 | use Psr\Log\NullLogger; | ||
12 | |||
13 | class PocketConsumer implements ConsumerInterface | ||
14 | { | ||
15 | private $em; | ||
16 | private $userRepository; | ||
17 | private $pocketImport; | ||
18 | private $logger; | ||
19 | |||
20 | public function __construct(EntityManager $em, UserRepository $userRepository, PocketImport $pocketImport, LoggerInterface $logger = null) | ||
21 | { | ||
22 | $this->em = $em; | ||
23 | $this->userRepository = $userRepository; | ||
24 | $this->pocketImport = $pocketImport; | ||
25 | $this->logger = $logger ?: new NullLogger(); | ||
26 | } | ||
27 | |||
28 | /** | ||
29 | * {@inheritdoc} | ||
30 | */ | ||
31 | public function execute(AMQPMessage $msg) | ||
32 | { | ||
33 | $storedEntry = json_decode($msg->body, true); | ||
34 | |||
35 | $user = $this->userRepository->find($storedEntry['userId']); | ||
36 | |||
37 | // no user? Drop message | ||
38 | if (null === $user) { | ||
39 | $this->logger->warning('Unable to retrieve user', ['entry' => $storedEntry]); | ||
40 | |||
41 | return; | ||
42 | } | ||
43 | |||
44 | $this->pocketImport->setUser($user); | ||
45 | |||
46 | $entry = $this->pocketImport->parseEntry($storedEntry); | ||
47 | |||
48 | if (null === $entry) { | ||
49 | $this->logger->warning('Unable to parse entry', ['entry' => $storedEntry]); | ||
50 | |||
51 | return; | ||
52 | } | ||
53 | |||
54 | try { | ||
55 | $this->em->flush(); | ||
56 | $this->em->clear($entry); | ||
57 | } catch (\Exception $e) { | ||
58 | $this->logger->warning('Unable to save entry', ['entry' => $storedEntry, 'exception' => $e]); | ||
59 | |||
60 | return; | ||
61 | } | ||
62 | } | ||
63 | } | ||
diff --git a/src/Wallabag/ImportBundle/Controller/PocketController.php b/src/Wallabag/ImportBundle/Controller/PocketController.php index 36ee25bf..a2dcd8a7 100644 --- a/src/Wallabag/ImportBundle/Controller/PocketController.php +++ b/src/Wallabag/ImportBundle/Controller/PocketController.php | |||
@@ -11,11 +11,28 @@ use Symfony\Component\Form\Extension\Core\Type\CheckboxType; | |||
11 | class PocketController extends Controller | 11 | class PocketController extends Controller |
12 | { | 12 | { |
13 | /** | 13 | /** |
14 | * Return Pocket Import Service with or without RabbitMQ enabled. | ||
15 | * | ||
16 | * @return \Wallabag\ImportBundle\Import\PocketImport | ||
17 | */ | ||
18 | private function getPocketImportService() | ||
19 | { | ||
20 | $pocket = $this->get('wallabag_import.pocket.import'); | ||
21 | $pocket->setUser($this->getUser()); | ||
22 | |||
23 | if ($this->get('craue_config')->get('rabbitmq')) { | ||
24 | $pocket->setRabbitmqProducer($this->get('old_sound_rabbit_mq.wallabag_pocket_producer')); | ||
25 | } | ||
26 | |||
27 | return $pocket; | ||
28 | } | ||
29 | |||
30 | /** | ||
14 | * @Route("/pocket", name="import_pocket") | 31 | * @Route("/pocket", name="import_pocket") |
15 | */ | 32 | */ |
16 | public function indexAction() | 33 | public function indexAction() |
17 | { | 34 | { |
18 | $pocket = $this->get('wallabag_import.pocket.import'); | 35 | $pocket = $this->getPocketImportService(); |
19 | $form = $this->createFormBuilder($pocket) | 36 | $form = $this->createFormBuilder($pocket) |
20 | ->add('mark_as_read', CheckboxType::class, [ | 37 | ->add('mark_as_read', CheckboxType::class, [ |
21 | 'label' => 'import.form.mark_as_read_label', | 38 | 'label' => 'import.form.mark_as_read_label', |
@@ -24,7 +41,7 @@ class PocketController extends Controller | |||
24 | ->getForm(); | 41 | ->getForm(); |
25 | 42 | ||
26 | return $this->render('WallabagImportBundle:Pocket:index.html.twig', [ | 43 | return $this->render('WallabagImportBundle:Pocket:index.html.twig', [ |
27 | 'import' => $this->get('wallabag_import.pocket.import'), | 44 | 'import' => $this->getPocketImportService(), |
28 | 'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true, | 45 | 'has_consumer_key' => '' == trim($this->get('craue_config')->get('pocket_consumer_key')) ? false : true, |
29 | 'form' => $form->createView(), | 46 | 'form' => $form->createView(), |
30 | ]); | 47 | ]); |
@@ -35,7 +52,7 @@ class PocketController extends Controller | |||
35 | */ | 52 | */ |
36 | public function authAction(Request $request) | 53 | public function authAction(Request $request) |
37 | { | 54 | { |
38 | $requestToken = $this->get('wallabag_import.pocket.import') | 55 | $requestToken = $this->getPocketImportService() |
39 | ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL)); | 56 | ->getRequestToken($this->generateUrl('import', [], UrlGeneratorInterface::ABSOLUTE_URL)); |
40 | 57 | ||
41 | if (false === $requestToken) { | 58 | if (false === $requestToken) { |
@@ -62,7 +79,7 @@ class PocketController extends Controller | |||
62 | public function callbackAction() | 79 | public function callbackAction() |
63 | { | 80 | { |
64 | $message = 'flashes.import.notice.failed'; | 81 | $message = 'flashes.import.notice.failed'; |
65 | $pocket = $this->get('wallabag_import.pocket.import'); | 82 | $pocket = $this->getPocketImportService(); |
66 | 83 | ||
67 | $markAsRead = $this->get('session')->get('mark_as_read'); | 84 | $markAsRead = $this->get('session')->get('mark_as_read'); |
68 | $this->get('session')->remove('mark_as_read'); | 85 | $this->get('session')->remove('mark_as_read'); |
diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php index 7d1c0c61..27df4917 100644 --- a/src/Wallabag/ImportBundle/Import/PocketImport.php +++ b/src/Wallabag/ImportBundle/Import/PocketImport.php | |||
@@ -3,12 +3,11 @@ | |||
3 | namespace Wallabag\ImportBundle\Import; | 3 | namespace Wallabag\ImportBundle\Import; |
4 | 4 | ||
5 | use OldSound\RabbitMqBundle\RabbitMq\Producer; | 5 | use OldSound\RabbitMqBundle\RabbitMq\Producer; |
6 | use Psr\Log\LoggerInterface; | ||
7 | use Psr\Log\NullLogger; | 6 | use Psr\Log\NullLogger; |
8 | use Doctrine\ORM\EntityManager; | 7 | use Doctrine\ORM\EntityManager; |
9 | use GuzzleHttp\Client; | 8 | use GuzzleHttp\Client; |
10 | use GuzzleHttp\Exception\RequestException; | 9 | use GuzzleHttp\Exception\RequestException; |
11 | use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; | 10 | use Symfony\Component\Security\Core\User\UserInterface; |
12 | use Wallabag\CoreBundle\Entity\Entry; | 11 | use Wallabag\CoreBundle\Entity\Entry; |
13 | use Wallabag\CoreBundle\Helper\ContentProxy; | 12 | use Wallabag\CoreBundle\Helper\ContentProxy; |
14 | use Craue\ConfigBundle\Util\Config; | 13 | use Craue\ConfigBundle\Util\Config; |
@@ -21,22 +20,40 @@ class PocketImport extends AbstractImport | |||
21 | private $skippedEntries = 0; | 20 | private $skippedEntries = 0; |
22 | private $importedEntries = 0; | 21 | private $importedEntries = 0; |
23 | private $markAsRead; | 22 | private $markAsRead; |
24 | protected $accessToken; | ||
25 | private $producer; | 23 | private $producer; |
26 | private $rabbitMQ; | 24 | protected $accessToken; |
27 | 25 | ||
28 | public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, Producer $producer) | 26 | public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig) |
29 | { | 27 | { |
30 | $this->user = $tokenStorage->getToken()->getUser(); | ||
31 | $this->em = $em; | 28 | $this->em = $em; |
32 | $this->contentProxy = $contentProxy; | 29 | $this->contentProxy = $contentProxy; |
33 | $this->consumerKey = $craueConfig->get('pocket_consumer_key'); | 30 | $this->consumerKey = $craueConfig->get('pocket_consumer_key'); |
34 | $this->logger = new NullLogger(); | 31 | $this->logger = new NullLogger(); |
35 | $this->rabbitMQ = $craueConfig->get('rabbitmq'); | 32 | } |
33 | |||
34 | /** | ||
35 | * Set RabbitMQ Producer to send each entry to a queue. | ||
36 | * This method should be called when user has enabled RabbitMQ. | ||
37 | * | ||
38 | * @param Producer $producer | ||
39 | */ | ||
40 | public function setRabbitmqProducer(Producer $producer) | ||
41 | { | ||
36 | $this->producer = $producer; | 42 | $this->producer = $producer; |
37 | } | 43 | } |
38 | 44 | ||
39 | /** | 45 | /** |
46 | * Set current user. | ||
47 | * Could the current *connected* user or one retrieve by the consumer. | ||
48 | * | ||
49 | * @param UserInterface $user | ||
50 | */ | ||
51 | public function setUser(UserInterface $user) | ||
52 | { | ||
53 | $this->user = $user; | ||
54 | } | ||
55 | |||
56 | /** | ||
40 | * {@inheritdoc} | 57 | * {@inheritdoc} |
41 | */ | 58 | */ |
42 | public function getName() | 59 | public function getName() |
@@ -168,6 +185,12 @@ class PocketImport extends AbstractImport | |||
168 | 185 | ||
169 | $entries = $response->json(); | 186 | $entries = $response->json(); |
170 | 187 | ||
188 | if ($this->producer) { | ||
189 | $this->parseEntriesForProducer($entries['list']); | ||
190 | |||
191 | return true; | ||
192 | } | ||
193 | |||
171 | $this->parseEntries($entries['list']); | 194 | $this->parseEntries($entries['list']); |
172 | 195 | ||
173 | return true; | 196 | return true; |
@@ -197,88 +220,112 @@ class PocketImport extends AbstractImport | |||
197 | /** | 220 | /** |
198 | * @see https://getpocket.com/developer/docs/v3/retrieve | 221 | * @see https://getpocket.com/developer/docs/v3/retrieve |
199 | * | 222 | * |
200 | * @param $entries | 223 | * @param array $entries |
201 | */ | 224 | */ |
202 | private function parseEntries($entries) | 225 | private function parseEntries(array $entries) |
203 | { | 226 | { |
204 | $i = 1; | 227 | $i = 1; |
205 | 228 | ||
206 | foreach ($entries as &$pocketEntry) { | 229 | foreach ($entries as $pocketEntry) { |
207 | $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; | 230 | $entry = $this->parseEntry($pocketEntry); |
208 | |||
209 | $existingEntry = $this->em | ||
210 | ->getRepository('WallabagCoreBundle:Entry') | ||
211 | ->findByUrlAndUserId($url, $this->user->getId()); | ||
212 | 231 | ||
213 | if (false !== $existingEntry) { | 232 | if (null === $entry) { |
214 | ++$this->skippedEntries; | ||
215 | continue; | 233 | continue; |
216 | } | 234 | } |
217 | 235 | ||
218 | $entry = new Entry($this->user); | 236 | // flush every 20 entries |
237 | if (($i % 20) === 0) { | ||
238 | $this->em->flush(); | ||
239 | $this->em->clear($entry); | ||
240 | } | ||
219 | 241 | ||
220 | if (!$this->rabbitMQ) { | 242 | ++$i; |
221 | $entry = $this->fetchContent($entry, $url); | 243 | } |
222 | 244 | ||
223 | // jump to next entry in case of problem while getting content | 245 | $this->em->flush(); |
224 | if (false === $entry) { | 246 | } |
225 | ++$this->skippedEntries; | ||
226 | continue; | ||
227 | } | ||
228 | } | ||
229 | 247 | ||
230 | // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted | 248 | public function parseEntry(array $pocketEntry) |
231 | if ($pocketEntry['status'] == 1 || $this->markAsRead) { | 249 | { |
232 | $entry->setArchived(true); | 250 | $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; |
233 | } | ||
234 | 251 | ||
235 | // 0 or 1 - 1 If the item is starred | 252 | $existingEntry = $this->em |
236 | if ($pocketEntry['favorite'] == 1) { | 253 | ->getRepository('WallabagCoreBundle:Entry') |
237 | $entry->setStarred(true); | 254 | ->findByUrlAndUserId($url, $this->user->getId()); |
238 | } | ||
239 | 255 | ||
240 | $title = 'Untitled'; | 256 | if (false !== $existingEntry) { |
241 | if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { | 257 | ++$this->skippedEntries; |
242 | $title = $pocketEntry['resolved_title']; | ||
243 | } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') { | ||
244 | $title = $pocketEntry['given_title']; | ||
245 | } | ||
246 | 258 | ||
247 | $entry->setTitle($title); | 259 | return; |
248 | $entry->setUrl($url); | 260 | } |
249 | 261 | ||
250 | // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image | 262 | $entry = new Entry($this->user); |
251 | if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { | 263 | $entry = $this->fetchContent($entry, $url); |
252 | $entry->setPreviewPicture($pocketEntry['images'][1]['src']); | ||
253 | } | ||
254 | 264 | ||
255 | if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { | 265 | // jump to next entry in case of problem while getting content |
256 | $this->contentProxy->assignTagsToEntry( | 266 | if (false === $entry) { |
257 | $entry, | 267 | ++$this->skippedEntries; |
258 | array_keys($pocketEntry['tags']) | ||
259 | ); | ||
260 | } | ||
261 | 268 | ||
262 | $pocketEntry['url'] = $url; | 269 | return; |
263 | $pocketEntry['userId'] = $this->user->getId(); | 270 | } |
264 | 271 | ||
265 | $this->em->persist($entry); | 272 | // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted |
266 | ++$this->importedEntries; | 273 | if ($pocketEntry['status'] == 1 || $this->markAsRead) { |
274 | $entry->setArchived(true); | ||
275 | } | ||
267 | 276 | ||
268 | // flush every 20 entries | 277 | // 0 or 1 - 1 If the item is starred |
269 | if (($i % 20) === 0) { | 278 | if ($pocketEntry['favorite'] == 1) { |
270 | $this->em->flush(); | 279 | $entry->setStarred(true); |
271 | } | 280 | } |
272 | 281 | ||
273 | ++$i; | 282 | $title = 'Untitled'; |
283 | if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { | ||
284 | $title = $pocketEntry['resolved_title']; | ||
285 | } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') { | ||
286 | $title = $pocketEntry['given_title']; | ||
274 | } | 287 | } |
275 | 288 | ||
276 | $this->em->flush(); | 289 | $entry->setTitle($title); |
290 | $entry->setUrl($url); | ||
291 | |||
292 | // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image | ||
293 | if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { | ||
294 | $entry->setPreviewPicture($pocketEntry['images'][1]['src']); | ||
295 | } | ||
296 | |||
297 | if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { | ||
298 | $this->contentProxy->assignTagsToEntry( | ||
299 | $entry, | ||
300 | array_keys($pocketEntry['tags']) | ||
301 | ); | ||
302 | } | ||
303 | |||
304 | $this->em->persist($entry); | ||
305 | ++$this->importedEntries; | ||
277 | 306 | ||
278 | if ($this->rabbitMQ) { | 307 | return $entry; |
279 | foreach ($entries as $entry) { | 308 | } |
280 | $this->producer->publish(serialize($entry)); | 309 | |
310 | /** | ||
311 | * Faster parse entries for Producer. | ||
312 | * We don't care to make check at this time. They'll be done by the consumer. | ||
313 | * | ||
314 | * @param array $entries | ||
315 | */ | ||
316 | public function parseEntriesForProducer($entries) | ||
317 | { | ||
318 | foreach ($entries as $pocketEntry) { | ||
319 | // set userId for the producer (it won't know which user is connected) | ||
320 | $pocketEntry['userId'] = $this->user->getId(); | ||
321 | |||
322 | if ($this->markAsRead) { | ||
323 | $pocketEntry['status'] = 1; | ||
281 | } | 324 | } |
325 | |||
326 | ++$this->importedEntries; | ||
327 | |||
328 | $this->producer->publish(json_encode($pocketEntry)); | ||
282 | } | 329 | } |
283 | } | 330 | } |
284 | } | 331 | } |
diff --git a/src/Wallabag/ImportBundle/Resources/config/services.yml b/src/Wallabag/ImportBundle/Resources/config/services.yml index 60eb4e18..fe388b26 100644 --- a/src/Wallabag/ImportBundle/Resources/config/services.yml +++ b/src/Wallabag/ImportBundle/Resources/config/services.yml | |||
@@ -1,10 +1,11 @@ | |||
1 | services: | 1 | services: |
2 | wallabag_import.consumer.entry: | 2 | wallabag_import.consumer.pocket: |
3 | class: Wallabag\ImportBundle\Component\AMPQ\EntryConsumer | 3 | class: Wallabag\ImportBundle\Consumer\AMPQ\PocketConsumer |
4 | arguments: | 4 | arguments: |
5 | - "@doctrine.orm.entity_manager" | 5 | - "@doctrine.orm.entity_manager" |
6 | - "@wallabag_core.entry_repository" | 6 | - "@wallabag_user.user_repository" |
7 | - "@wallabag_core.content_proxy" | 7 | - "@wallabag_import.pocket.import" |
8 | - "@logger" | ||
8 | 9 | ||
9 | wallabag_import.chain: | 10 | wallabag_import.chain: |
10 | class: Wallabag\ImportBundle\Import\ImportChain | 11 | class: Wallabag\ImportBundle\Import\ImportChain |
@@ -21,11 +22,9 @@ services: | |||
21 | wallabag_import.pocket.import: | 22 | wallabag_import.pocket.import: |
22 | class: Wallabag\ImportBundle\Import\PocketImport | 23 | class: Wallabag\ImportBundle\Import\PocketImport |
23 | arguments: | 24 | arguments: |
24 | - "@security.token_storage" | ||
25 | - "@doctrine.orm.entity_manager" | 25 | - "@doctrine.orm.entity_manager" |
26 | - "@wallabag_core.content_proxy" | 26 | - "@wallabag_core.content_proxy" |
27 | - "@craue_config" | 27 | - "@craue_config" |
28 | - "@old_sound_rabbit_mq.wallabag_producer" | ||
29 | calls: | 28 | calls: |
30 | - [ setClient, [ "@wallabag_import.pocket.client" ] ] | 29 | - [ setClient, [ "@wallabag_import.pocket.client" ] ] |
31 | - [ setLogger, [ "@logger" ]] | 30 | - [ setLogger, [ "@logger" ]] |
diff --git a/src/Wallabag/UserBundle/Resources/config/services.yml b/src/Wallabag/UserBundle/Resources/config/services.yml index d79d8fa2..05830555 100644 --- a/src/Wallabag/UserBundle/Resources/config/services.yml +++ b/src/Wallabag/UserBundle/Resources/config/services.yml | |||
@@ -14,3 +14,9 @@ services: | |||
14 | - "@router" | 14 | - "@router" |
15 | tags: | 15 | tags: |
16 | - { name: kernel.event_subscriber } | 16 | - { name: kernel.event_subscriber } |
17 | |||
18 | wallabag_user.user_repository: | ||
19 | class: Wallabag\UserBundle\Repository\UserRepository | ||
20 | factory: [ "@doctrine.orm.default_entity_manager", getRepository ] | ||
21 | arguments: | ||
22 | - WallabagUserBundle:User | ||
diff --git a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php index 5bf47d96..d6b9617e 100644 --- a/tests/Wallabag/ImportBundle/Import/PocketImportTest.php +++ b/tests/Wallabag/ImportBundle/Import/PocketImportTest.php | |||
@@ -27,32 +27,15 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase | |||
27 | protected $em; | 27 | protected $em; |
28 | protected $contentProxy; | 28 | protected $contentProxy; |
29 | protected $logHandler; | 29 | protected $logHandler; |
30 | protected $producer; | ||
31 | 30 | ||
32 | private function getPocketImport($consumerKey = 'ConsumerKey', $rabbitMQ = false) | 31 | private function getPocketImport($consumerKey = 'ConsumerKey') |
33 | { | 32 | { |
34 | $this->user = new User(); | 33 | $this->user = new User(); |
35 | 34 | ||
36 | $this->tokenStorage = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface') | ||
37 | ->disableOriginalConstructor() | ||
38 | ->getMock(); | ||
39 | |||
40 | $token = $this->getMockBuilder('Symfony\Component\Security\Core\Authentication\Token\TokenInterface') | ||
41 | ->disableOriginalConstructor() | ||
42 | ->getMock(); | ||
43 | |||
44 | $this->contentProxy = $this->getMockBuilder('Wallabag\CoreBundle\Helper\ContentProxy') | 35 | $this->contentProxy = $this->getMockBuilder('Wallabag\CoreBundle\Helper\ContentProxy') |
45 | ->disableOriginalConstructor() | 36 | ->disableOriginalConstructor() |
46 | ->getMock(); | 37 | ->getMock(); |
47 | 38 | ||
48 | $token->expects($this->once()) | ||
49 | ->method('getUser') | ||
50 | ->willReturn($this->user); | ||
51 | |||
52 | $this->tokenStorage->expects($this->once()) | ||
53 | ->method('getToken') | ||
54 | ->willReturn($token); | ||
55 | |||
56 | $this->em = $this->getMockBuilder('Doctrine\ORM\EntityManager') | 39 | $this->em = $this->getMockBuilder('Doctrine\ORM\EntityManager') |
57 | ->disableOriginalConstructor() | 40 | ->disableOriginalConstructor() |
58 | ->getMock(); | 41 | ->getMock(); |
@@ -66,17 +49,12 @@ class PocketImportTest extends \PHPUnit_Framework_TestCase | |||
66 | ->with('pocket_consumer_key') | 49 | ->with('pocket_consumer_key') |
67 | ->willReturn($consumerKey); | 50 | ->willReturn($consumerKey); |
68 | 51 | ||
69 | $this->producer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\Producer') | ||
70 | ->disableOriginalConstructor() | ||
71 | ->getMock(); | ||
72 | |||
73 | $pocket = new PocketImportMock( | 52 | $pocket = new PocketImportMock( |
74 | $this->tokenStorage, | ||
75 | $this->em, | 53 | $this->em, |
76 | $this->contentProxy, | 54 | $this->contentProxy, |
77 | $config, | 55 | $config |
78 | $this->producer | ||
79 | ); | 56 | ); |
57 | $pocket->setUser($this->user); | ||
80 | 58 | ||
81 | $this->logHandler = new TestHandler(); | 59 | $this->logHandler = new TestHandler(); |
82 | $logger = new Logger('test', [$this->logHandler]); | 60 | $logger = new Logger('test', [$this->logHandler]); |