diff options
Diffstat (limited to 'server/helpers/stream-replacer.ts')
-rw-r--r-- | server/helpers/stream-replacer.ts | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/server/helpers/stream-replacer.ts b/server/helpers/stream-replacer.ts new file mode 100644 index 000000000..4babab418 --- /dev/null +++ b/server/helpers/stream-replacer.ts | |||
@@ -0,0 +1,58 @@ | |||
1 | import { Transform, TransformCallback } from 'stream' | ||
2 | |||
3 | // Thanks: https://stackoverflow.com/a/45126242 | ||
4 | class StreamReplacer extends Transform { | ||
5 | private pendingChunk: Buffer | ||
6 | |||
7 | constructor (private readonly replacer: (line: string) => string) { | ||
8 | super() | ||
9 | } | ||
10 | |||
11 | _transform (chunk: Buffer, _encoding: BufferEncoding, done: TransformCallback) { | ||
12 | try { | ||
13 | this.pendingChunk = this.pendingChunk?.length | ||
14 | ? Buffer.concat([ this.pendingChunk, chunk ]) | ||
15 | : chunk | ||
16 | |||
17 | let index: number | ||
18 | |||
19 | // As long as we keep finding newlines, keep making slices of the buffer and push them to the | ||
20 | // readable side of the transform stream | ||
21 | while ((index = this.pendingChunk.indexOf('\n')) !== -1) { | ||
22 | // The `end` parameter is non-inclusive, so increase it to include the newline we found | ||
23 | const line = this.pendingChunk.slice(0, ++index) | ||
24 | |||
25 | // `start` is inclusive, but we are already one char ahead of the newline -> all good | ||
26 | this.pendingChunk = this.pendingChunk.slice(index) | ||
27 | |||
28 | // We have a single line here! Prepend the string we want | ||
29 | this.push(this.doReplace(line)) | ||
30 | } | ||
31 | |||
32 | return done() | ||
33 | } catch (err) { | ||
34 | return done(err) | ||
35 | } | ||
36 | } | ||
37 | |||
38 | _flush (done: TransformCallback) { | ||
39 | // If we have any remaining data in the cache, send it out | ||
40 | if (!this.pendingChunk?.length) return done() | ||
41 | |||
42 | try { | ||
43 | return done(null, this.doReplace(this.pendingChunk)) | ||
44 | } catch (err) { | ||
45 | return done(err) | ||
46 | } | ||
47 | } | ||
48 | |||
49 | private doReplace (buffer: Buffer) { | ||
50 | const line = this.replacer(buffer.toString('utf8')) | ||
51 | |||
52 | return Buffer.from(line, 'utf8') | ||
53 | } | ||
54 | } | ||
55 | |||
56 | export { | ||
57 | StreamReplacer | ||
58 | } | ||