aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/Wallabag/ImportBundle/Import/PocketImport.php
diff options
context:
space:
mode:
authorJeremy Benoist <jeremy.benoist@gmail.com>2016-09-03 17:36:57 +0200
committerJeremy Benoist <jeremy.benoist@gmail.com>2016-09-11 21:57:46 +0200
commitef75e1220ebb76a8df019d946460ad612759f0bb (patch)
tree100830464851d6834e1e24ccabca1961a862fcd8 /src/Wallabag/ImportBundle/Import/PocketImport.php
parent87c9995b6c61a9f5cde3771bd4f9d44b5da26c43 (diff)
downloadwallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.gz
wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.tar.zst
wallabag-ef75e1220ebb76a8df019d946460ad612759f0bb.zip
Send every imported item to the queue
Instead of queing real Entry to process, we queue all the item to import from Pocket in a raw format. Then, the worker retrieve that information, find / create the entry and save it.
Diffstat (limited to 'src/Wallabag/ImportBundle/Import/PocketImport.php')
-rw-r--r--src/Wallabag/ImportBundle/Import/PocketImport.php177
1 files changed, 112 insertions, 65 deletions
diff --git a/src/Wallabag/ImportBundle/Import/PocketImport.php b/src/Wallabag/ImportBundle/Import/PocketImport.php
index 7d1c0c61..27df4917 100644
--- a/src/Wallabag/ImportBundle/Import/PocketImport.php
+++ b/src/Wallabag/ImportBundle/Import/PocketImport.php
@@ -3,12 +3,11 @@
3namespace Wallabag\ImportBundle\Import; 3namespace Wallabag\ImportBundle\Import;
4 4
5use OldSound\RabbitMqBundle\RabbitMq\Producer; 5use OldSound\RabbitMqBundle\RabbitMq\Producer;
6use Psr\Log\LoggerInterface;
7use Psr\Log\NullLogger; 6use Psr\Log\NullLogger;
8use Doctrine\ORM\EntityManager; 7use Doctrine\ORM\EntityManager;
9use GuzzleHttp\Client; 8use GuzzleHttp\Client;
10use GuzzleHttp\Exception\RequestException; 9use GuzzleHttp\Exception\RequestException;
11use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; 10use Symfony\Component\Security\Core\User\UserInterface;
12use Wallabag\CoreBundle\Entity\Entry; 11use Wallabag\CoreBundle\Entity\Entry;
13use Wallabag\CoreBundle\Helper\ContentProxy; 12use Wallabag\CoreBundle\Helper\ContentProxy;
14use Craue\ConfigBundle\Util\Config; 13use Craue\ConfigBundle\Util\Config;
@@ -21,22 +20,40 @@ class PocketImport extends AbstractImport
21 private $skippedEntries = 0; 20 private $skippedEntries = 0;
22 private $importedEntries = 0; 21 private $importedEntries = 0;
23 private $markAsRead; 22 private $markAsRead;
24 protected $accessToken;
25 private $producer; 23 private $producer;
26 private $rabbitMQ; 24 protected $accessToken;
27 25
28 public function __construct(TokenStorageInterface $tokenStorage, EntityManager $em, ContentProxy $contentProxy, Config $craueConfig, Producer $producer) 26 public function __construct(EntityManager $em, ContentProxy $contentProxy, Config $craueConfig)
29 { 27 {
30 $this->user = $tokenStorage->getToken()->getUser();
31 $this->em = $em; 28 $this->em = $em;
32 $this->contentProxy = $contentProxy; 29 $this->contentProxy = $contentProxy;
33 $this->consumerKey = $craueConfig->get('pocket_consumer_key'); 30 $this->consumerKey = $craueConfig->get('pocket_consumer_key');
34 $this->logger = new NullLogger(); 31 $this->logger = new NullLogger();
35 $this->rabbitMQ = $craueConfig->get('rabbitmq'); 32 }
33
34 /**
35 * Set RabbitMQ Producer to send each entry to a queue.
36 * This method should be called when user has enabled RabbitMQ.
37 *
38 * @param Producer $producer
39 */
40 public function setRabbitmqProducer(Producer $producer)
41 {
36 $this->producer = $producer; 42 $this->producer = $producer;
37 } 43 }
38 44
39 /** 45 /**
46 * Set current user.
47 * Could the current *connected* user or one retrieve by the consumer.
48 *
49 * @param UserInterface $user
50 */
51 public function setUser(UserInterface $user)
52 {
53 $this->user = $user;
54 }
55
56 /**
40 * {@inheritdoc} 57 * {@inheritdoc}
41 */ 58 */
42 public function getName() 59 public function getName()
@@ -168,6 +185,12 @@ class PocketImport extends AbstractImport
168 185
169 $entries = $response->json(); 186 $entries = $response->json();
170 187
188 if ($this->producer) {
189 $this->parseEntriesForProducer($entries['list']);
190
191 return true;
192 }
193
171 $this->parseEntries($entries['list']); 194 $this->parseEntries($entries['list']);
172 195
173 return true; 196 return true;
@@ -197,88 +220,112 @@ class PocketImport extends AbstractImport
197 /** 220 /**
198 * @see https://getpocket.com/developer/docs/v3/retrieve 221 * @see https://getpocket.com/developer/docs/v3/retrieve
199 * 222 *
200 * @param $entries 223 * @param array $entries
201 */ 224 */
202 private function parseEntries($entries) 225 private function parseEntries(array $entries)
203 { 226 {
204 $i = 1; 227 $i = 1;
205 228
206 foreach ($entries as &$pocketEntry) { 229 foreach ($entries as $pocketEntry) {
207 $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url']; 230 $entry = $this->parseEntry($pocketEntry);
208
209 $existingEntry = $this->em
210 ->getRepository('WallabagCoreBundle:Entry')
211 ->findByUrlAndUserId($url, $this->user->getId());
212 231
213 if (false !== $existingEntry) { 232 if (null === $entry) {
214 ++$this->skippedEntries;
215 continue; 233 continue;
216 } 234 }
217 235
218 $entry = new Entry($this->user); 236 // flush every 20 entries
237 if (($i % 20) === 0) {
238 $this->em->flush();
239 $this->em->clear($entry);
240 }
219 241
220 if (!$this->rabbitMQ) { 242 ++$i;
221 $entry = $this->fetchContent($entry, $url); 243 }
222 244
223 // jump to next entry in case of problem while getting content 245 $this->em->flush();
224 if (false === $entry) { 246 }
225 ++$this->skippedEntries;
226 continue;
227 }
228 }
229 247
230 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted 248 public function parseEntry(array $pocketEntry)
231 if ($pocketEntry['status'] == 1 || $this->markAsRead) { 249 {
232 $entry->setArchived(true); 250 $url = isset($pocketEntry['resolved_url']) && $pocketEntry['resolved_url'] != '' ? $pocketEntry['resolved_url'] : $pocketEntry['given_url'];
233 }
234 251
235 // 0 or 1 - 1 If the item is starred 252 $existingEntry = $this->em
236 if ($pocketEntry['favorite'] == 1) { 253 ->getRepository('WallabagCoreBundle:Entry')
237 $entry->setStarred(true); 254 ->findByUrlAndUserId($url, $this->user->getId());
238 }
239 255
240 $title = 'Untitled'; 256 if (false !== $existingEntry) {
241 if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') { 257 ++$this->skippedEntries;
242 $title = $pocketEntry['resolved_title'];
243 } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
244 $title = $pocketEntry['given_title'];
245 }
246 258
247 $entry->setTitle($title); 259 return;
248 $entry->setUrl($url); 260 }
249 261
250 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image 262 $entry = new Entry($this->user);
251 if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) { 263 $entry = $this->fetchContent($entry, $url);
252 $entry->setPreviewPicture($pocketEntry['images'][1]['src']);
253 }
254 264
255 if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) { 265 // jump to next entry in case of problem while getting content
256 $this->contentProxy->assignTagsToEntry( 266 if (false === $entry) {
257 $entry, 267 ++$this->skippedEntries;
258 array_keys($pocketEntry['tags'])
259 );
260 }
261 268
262 $pocketEntry['url'] = $url; 269 return;
263 $pocketEntry['userId'] = $this->user->getId(); 270 }
264 271
265 $this->em->persist($entry); 272 // 0, 1, 2 - 1 if the item is archived - 2 if the item should be deleted
266 ++$this->importedEntries; 273 if ($pocketEntry['status'] == 1 || $this->markAsRead) {
274 $entry->setArchived(true);
275 }
267 276
268 // flush every 20 entries 277 // 0 or 1 - 1 If the item is starred
269 if (($i % 20) === 0) { 278 if ($pocketEntry['favorite'] == 1) {
270 $this->em->flush(); 279 $entry->setStarred(true);
271 } 280 }
272 281
273 ++$i; 282 $title = 'Untitled';
283 if (isset($pocketEntry['resolved_title']) && $pocketEntry['resolved_title'] != '') {
284 $title = $pocketEntry['resolved_title'];
285 } elseif (isset($pocketEntry['given_title']) && $pocketEntry['given_title'] != '') {
286 $title = $pocketEntry['given_title'];
274 } 287 }
275 288
276 $this->em->flush(); 289 $entry->setTitle($title);
290 $entry->setUrl($url);
291
292 // 0, 1, or 2 - 1 if the item has images in it - 2 if the item is an image
293 if (isset($pocketEntry['has_image']) && $pocketEntry['has_image'] > 0 && isset($pocketEntry['images'][1])) {
294 $entry->setPreviewPicture($pocketEntry['images'][1]['src']);
295 }
296
297 if (isset($pocketEntry['tags']) && !empty($pocketEntry['tags'])) {
298 $this->contentProxy->assignTagsToEntry(
299 $entry,
300 array_keys($pocketEntry['tags'])
301 );
302 }
303
304 $this->em->persist($entry);
305 ++$this->importedEntries;
277 306
278 if ($this->rabbitMQ) { 307 return $entry;
279 foreach ($entries as $entry) { 308 }
280 $this->producer->publish(serialize($entry)); 309
310 /**
311 * Faster parse entries for Producer.
312 * We don't care to make check at this time. They'll be done by the consumer.
313 *
314 * @param array $entries
315 */
316 public function parseEntriesForProducer($entries)
317 {
318 foreach ($entries as $pocketEntry) {
319 // set userId for the producer (it won't know which user is connected)
320 $pocketEntry['userId'] = $this->user->getId();
321
322 if ($this->markAsRead) {
323 $pocketEntry['status'] = 1;
281 } 324 }
325
326 ++$this->importedEntries;
327
328 $this->producer->publish(json_encode($pocketEntry));
282 } 329 }
283 } 330 }
284} 331}