]>
Commit | Line | Data |
---|---|---|
1 | <?php | |
2 | ||
3 | namespace Wallabag\ImportBundle\Import; | |
4 | ||
5 | use Doctrine\ORM\EntityManager; | |
6 | use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; | |
7 | use Psr\Log\LoggerInterface; | |
8 | use Psr\Log\NullLogger; | |
9 | use Symfony\Component\EventDispatcher\EventDispatcherInterface; | |
10 | use Wallabag\CoreBundle\Entity\Entry; | |
11 | use Wallabag\CoreBundle\Entity\Tag; | |
12 | use Wallabag\CoreBundle\Event\EntrySavedEvent; | |
13 | use Wallabag\CoreBundle\Helper\ContentProxy; | |
14 | use Wallabag\CoreBundle\Helper\TagsAssigner; | |
15 | use Wallabag\UserBundle\Entity\User; | |
16 | ||
/**
 * Base class shared by all importers.
 *
 * It holds the collaborators every import needs (entity manager, content
 * proxy, event dispatcher, optional queue producer), the import options
 * (mark as read, disable content update) and the counters reported by
 * getSummary(). Concrete imports provide the format-specific parts through
 * parseEntry(), validateEntry() and setEntryAsRead().
 */
abstract class AbstractImport implements ImportInterface
{
    /** @var EntityManager Used to flush and clear entities while importing */
    protected $em;

    /** @var LoggerInterface Defaults to a NullLogger until setLogger() is called */
    protected $logger;

    /** @var ContentProxy Used by fetchContent() to fill an entry */
    protected $contentProxy;

    /** @var TagsAssigner Not used by this class itself; kept available for concrete imports */
    protected $tagsAssigner;

    /** @var EventDispatcherInterface Used to dispatch EntrySavedEvent after each flush */
    protected $eventDispatcher;

    /** @var ProducerInterface|null Only defined once setProducer() has been called */
    protected $producer;

    /** @var User|null Only defined once setUser() has been called */
    protected $user;

    /** @var bool|null Stays null until setMarkAsRead() is called */
    protected $markAsRead;

    /** @var bool Forwarded as-is to ContentProxy::updateEntry() */
    protected $disableContentUpdate = false;

    // Counters exposed through getSummary(). Within this class only
    // $queuedEntries (and $skippedEntries, on encoding failure) are updated;
    // the rest is presumably maintained by concrete imports.
    protected $skippedEntries = 0;
    protected $importedEntries = 0;
    protected $queuedEntries = 0;

    public function __construct(EntityManager $em, ContentProxy $contentProxy, TagsAssigner $tagsAssigner, EventDispatcherInterface $eventDispatcher)
    {
        $this->em = $em;
        // silent logger by default so logging calls never need a null check
        $this->logger = new NullLogger();
        $this->contentProxy = $contentProxy;
        $this->tagsAssigner = $tagsAssigner;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Replace the default (silent) logger.
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Set RabbitMQ/Redis Producer to send each entry to a queue.
     * This method should be called when user has enabled RabbitMQ.
     *
     * @param ProducerInterface $producer
     */
    public function setProducer(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    /**
     * Set current user.
     * Could be the current *connected* user or one retrieved by the consumer.
     *
     * @param User $user
     */
    public function setUser(User $user)
    {
        $this->user = $user;
    }

    /**
     * Set whether articles must be all marked as read.
     *
     * @param bool $markAsRead
     *
     * @return $this
     */
    public function setMarkAsRead($markAsRead)
    {
        $this->markAsRead = $markAsRead;

        return $this;
    }

    /**
     * Get whether articles must be all marked as read.
     *
     * @return bool|null The value given to setMarkAsRead(), null if it was never called
     */
    public function getMarkAsRead()
    {
        return $this->markAsRead;
    }

    /**
     * Set whether articles should be fetched for updated content.
     *
     * @param bool $disableContentUpdate
     *
     * @return $this
     */
    public function setDisableContentUpdate($disableContentUpdate)
    {
        $this->disableContentUpdate = $disableContentUpdate;

        return $this;
    }

    /**
     * {@inheritdoc}
     */
    public function getSummary()
    {
        return [
            'skipped' => $this->skippedEntries,
            'imported' => $this->importedEntries,
            'queued' => $this->queuedEntries,
        ];
    }

    /**
     * Parse one entry.
     *
     * @param array $importedEntry
     *
     * @return Entry|null Null is tolerated: parseEntries() ignores such entries
     */
    abstract public function parseEntry(array $importedEntry);

    /**
     * Validate that an entry is valid (like has some required keys, etc.).
     *
     * @param array $importedEntry
     *
     * @return bool
     */
    abstract public function validateEntry(array $importedEntry);

    /**
     * Fetch content from the ContentProxy (using graby).
     * The given entry is updated in place and nothing is returned. If the fetch
     * fails, the error is only logged so the caller can still save the entry
     * as it is (to avoid the user losing the content).
     *
     * @param Entry  $entry   Entry to update
     * @param string $url     Url to grab content for
     * @param array  $content An array with AT LEAST keys title, html, url, language & content_type to skip the fetchContent from the url
     */
    protected function fetchContent(Entry $entry, $url, array $content = [])
    {
        try {
            $this->contentProxy->updateEntry($entry, $url, $content, $this->disableContentUpdate);
        } catch (\Exception $e) {
            // deliberate best effort: a failing fetch must not abort the whole import
            $this->logger->error('Error trying to import an entry.', [
                'entry_url' => $url,
                'error_msg' => $e->getMessage(),
            ]);
        }
    }

    /**
     * Parse and insert all given entries.
     *
     * Entries that fail validateEntry() or for which parseEntry() returns null
     * are ignored. The entity manager is flushed every 20 parsed entries (and
     * once more at the end); an EntrySavedEvent is dispatched for each entry
     * right after the flush that covers it.
     *
     * @param array $entries
     */
    protected function parseEntries(array $entries)
    {
        // entries parsed since the last flush, waiting for their entry.saved event
        $pendingEntries = [];

        foreach ($entries as $importedEntry) {
            if ($this->markAsRead) {
                $importedEntry = $this->setEntryAsRead($importedEntry);
            }

            if (false === $this->validateEntry($importedEntry)) {
                continue;
            }

            $entry = $this->parseEntry($importedEntry);

            if (null === $entry) {
                continue;
            }

            // store each entry to be flushed so we can trigger the entry.saved event for each of them
            // entry.saved needs the entry to be persisted in db because it needs its id to generate
            // images (at least)
            $pendingEntries[] = $entry;

            // flush every 20 entries
            if (count($pendingEntries) >= 20) {
                $this->em->flush();
                $this->dispatchEntrySavedEvents($pendingEntries);
                $pendingEntries = [];

                // clear only affected entities
                $this->em->clear(Entry::class);
                $this->em->clear(Tag::class);
            }
        }

        // flush the last (incomplete) batch
        $this->em->flush();
        $this->dispatchEntrySavedEvents($pendingEntries);
    }

    /**
     * Parse entries and send them to the queue.
     * It should just be a simple loop on all items, no call to the database should be done
     * to speed up queuing.
     *
     * Faster parse entries for Producer.
     * We don't care to make checks at this time. They'll be done by the consumer.
     *
     * setUser() and setProducer() must have been called before, since both
     * are dereferenced here without any check.
     *
     * An entry that can't be encoded as JSON is logged and counted as skipped
     * instead of being published as an empty message.
     *
     * @param array $entries
     */
    protected function parseEntriesForProducer(array $entries)
    {
        foreach ($entries as $importedEntry) {
            // set userId for the producer (it won't know which user is connected)
            $importedEntry['userId'] = $this->user->getId();

            if ($this->markAsRead) {
                $importedEntry = $this->setEntryAsRead($importedEntry);
            }

            $message = json_encode($importedEntry);

            // json_encode returns false on failure (malformed UTF-8, for instance)
            if (false === $message) {
                $this->logger->error('Error trying to queue an entry: unable to encode it as JSON.', [
                    'error_msg' => json_last_error_msg(),
                ]);

                ++$this->skippedEntries;

                continue;
            }

            ++$this->queuedEntries;

            $this->producer->publish($message);
        }
    }

    /**
     * Set current imported entry to archived / read.
     * Implementation is different across all imports.
     *
     * @param array $importedEntry
     *
     * @return array
     */
    abstract protected function setEntryAsRead(array $importedEntry);

    /**
     * Dispatch the entry.saved event for each given entry.
     * Must be called after the flush, because listeners need the entry id.
     *
     * @param Entry[] $entries Entries that have just been flushed
     */
    private function dispatchEntrySavedEvents(array $entries)
    {
        foreach ($entries as $entry) {
            $this->eventDispatcher->dispatch(EntrySavedEvent::NAME, new EntrySavedEvent($entry));
        }
    }
}