diff options
Diffstat (limited to 'src/Wallabag/ImportBundle/Import/AbstractImport.php')
-rw-r--r-- | src/Wallabag/ImportBundle/Import/AbstractImport.php | 144 |
1 files changed, 141 insertions, 3 deletions
diff --git a/src/Wallabag/ImportBundle/Import/AbstractImport.php b/src/Wallabag/ImportBundle/Import/AbstractImport.php index 14377a35..a1a14576 100644 --- a/src/Wallabag/ImportBundle/Import/AbstractImport.php +++ b/src/Wallabag/ImportBundle/Import/AbstractImport.php | |||
@@ -7,12 +7,21 @@ use Psr\Log\NullLogger; | |||
7 | use Doctrine\ORM\EntityManager; | 7 | use Doctrine\ORM\EntityManager; |
8 | use Wallabag\CoreBundle\Helper\ContentProxy; | 8 | use Wallabag\CoreBundle\Helper\ContentProxy; |
9 | use Wallabag\CoreBundle\Entity\Entry; | 9 | use Wallabag\CoreBundle\Entity\Entry; |
10 | use Wallabag\CoreBundle\Entity\Tag; | ||
11 | use Wallabag\UserBundle\Entity\User; | ||
12 | use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; | ||
10 | 13 | ||
11 | abstract class AbstractImport implements ImportInterface | 14 | abstract class AbstractImport implements ImportInterface |
12 | { | 15 | { |
13 | protected $em; | 16 | protected $em; |
14 | protected $logger; | 17 | protected $logger; |
15 | protected $contentProxy; | 18 | protected $contentProxy; |
19 | protected $producer; | ||
20 | protected $user; | ||
21 | protected $markAsRead; | ||
22 | protected $skippedEntries = 0; | ||
23 | protected $importedEntries = 0; | ||
24 | protected $queuedEntries = 0; | ||
16 | 25 | ||
17 | public function __construct(EntityManager $em, ContentProxy $contentProxy) | 26 | public function __construct(EntityManager $em, ContentProxy $contentProxy) |
18 | { | 27 | { |
@@ -27,21 +36,150 @@ abstract class AbstractImport implements ImportInterface | |||
27 | } | 36 | } |
28 | 37 | ||
29 | /** | 38 | /** |
39 | * Set RabbitMQ/Redis Producer to send each entry to a queue. | ||
40 | * This method should be called when user has enabled RabbitMQ. | ||
41 | * | ||
42 | * @param ProducerInterface $producer | ||
43 | */ | ||
44 | public function setProducer(ProducerInterface $producer) | ||
45 | { | ||
46 | $this->producer = $producer; | ||
47 | } | ||
48 | |||
49 | /** | ||
50 | * Set current user. | ||
51 | * Could the current *connected* user or one retrieve by the consumer. | ||
52 | * | ||
53 | * @param User $user | ||
54 | */ | ||
55 | public function setUser(User $user) | ||
56 | { | ||
57 | $this->user = $user; | ||
58 | } | ||
59 | |||
60 | /** | ||
61 | * Set whether articles must be all marked as read. | ||
62 | * | ||
63 | * @param bool $markAsRead | ||
64 | */ | ||
65 | public function setMarkAsRead($markAsRead) | ||
66 | { | ||
67 | $this->markAsRead = $markAsRead; | ||
68 | |||
69 | return $this; | ||
70 | } | ||
71 | |||
72 | /** | ||
73 | * Get whether articles must be all marked as read. | ||
74 | */ | ||
75 | public function getMarkAsRead() | ||
76 | { | ||
77 | return $this->markAsRead; | ||
78 | } | ||
79 | |||
80 | /** | ||
30 | * Fetch content from the ContentProxy (using graby). | 81 | * Fetch content from the ContentProxy (using graby). |
31 | * If it fails return false instead of the updated entry. | 82 | * If it fails return the given entry to be saved in all case (to avoid user to loose the content). |
32 | * | 83 | * |
33 | * @param Entry $entry Entry to update | 84 | * @param Entry $entry Entry to update |
34 | * @param string $url Url to grab content for | 85 | * @param string $url Url to grab content for |
35 | * @param array $content An array with AT LEAST keys title, html, url, language & content_type to skip the fetchContent from the url | 86 | * @param array $content An array with AT LEAST keys title, html, url, language & content_type to skip the fetchContent from the url |
36 | * | 87 | * |
37 | * @return Entry|false | 88 | * @return Entry |
38 | */ | 89 | */ |
39 | protected function fetchContent(Entry $entry, $url, array $content = []) | 90 | protected function fetchContent(Entry $entry, $url, array $content = []) |
40 | { | 91 | { |
41 | try { | 92 | try { |
42 | return $this->contentProxy->updateEntry($entry, $url, $content); | 93 | return $this->contentProxy->updateEntry($entry, $url, $content); |
43 | } catch (\Exception $e) { | 94 | } catch (\Exception $e) { |
44 | return false; | 95 | return $entry; |
96 | } | ||
97 | } | ||
98 | |||
99 | /** | ||
100 | * Parse and insert all given entries. | ||
101 | * | ||
102 | * @param $entries | ||
103 | */ | ||
104 | protected function parseEntries($entries) | ||
105 | { | ||
106 | $i = 1; | ||
107 | |||
108 | foreach ($entries as $importedEntry) { | ||
109 | $entry = $this->parseEntry($importedEntry); | ||
110 | |||
111 | if (null === $entry) { | ||
112 | continue; | ||
113 | } | ||
114 | |||
115 | // flush every 20 entries | ||
116 | if (($i % 20) === 0) { | ||
117 | $this->em->flush(); | ||
118 | |||
119 | // clear only affected entities | ||
120 | $this->em->clear(Entry::class); | ||
121 | $this->em->clear(Tag::class); | ||
122 | } | ||
123 | ++$i; | ||
124 | } | ||
125 | |||
126 | $this->em->flush(); | ||
127 | } | ||
128 | |||
129 | /** | ||
130 | * Parse entries and send them to the queue. | ||
131 | * It should just be a simple loop on all item, no call to the database should be done | ||
132 | * to speedup queuing. | ||
133 | * | ||
134 | * Faster parse entries for Producer. | ||
135 | * We don't care to make check at this time. They'll be done by the consumer. | ||
136 | * | ||
137 | * @param array $entries | ||
138 | */ | ||
139 | protected function parseEntriesForProducer(array $entries) | ||
140 | { | ||
141 | foreach ($entries as $importedEntry) { | ||
142 | // set userId for the producer (it won't know which user is connected) | ||
143 | $importedEntry['userId'] = $this->user->getId(); | ||
144 | |||
145 | if ($this->markAsRead) { | ||
146 | $importedEntry = $this->setEntryAsRead($importedEntry); | ||
147 | } | ||
148 | |||
149 | ++$this->queuedEntries; | ||
150 | |||
151 | $this->producer->publish(json_encode($importedEntry)); | ||
45 | } | 152 | } |
46 | } | 153 | } |
154 | |||
155 | /** | ||
156 | * {@inheritdoc} | ||
157 | */ | ||
158 | public function getSummary() | ||
159 | { | ||
160 | return [ | ||
161 | 'skipped' => $this->skippedEntries, | ||
162 | 'imported' => $this->importedEntries, | ||
163 | 'queued' => $this->queuedEntries, | ||
164 | ]; | ||
165 | } | ||
166 | |||
167 | /** | ||
168 | * Parse one entry. | ||
169 | * | ||
170 | * @param array $importedEntry | ||
171 | * | ||
172 | * @return Entry | ||
173 | */ | ||
174 | abstract public function parseEntry(array $importedEntry); | ||
175 | |||
176 | /** | ||
177 | * Set current imported entry to archived / read. | ||
178 | * Implementation is different accross all imports. | ||
179 | * | ||
180 | * @param array $importedEntry | ||
181 | * | ||
182 | * @return array | ||
183 | */ | ||
184 | abstract protected function setEntryAsRead(array $importedEntry); | ||
47 | } | 185 | } |