]>
Commit | Line | Data |
---|---|---|
1 | import { Transform, TransformCallback } from 'stream' | |
2 | ||
3 | // Thanks: https://stackoverflow.com/a/45126242 | |
/**
 * Transform stream that rewrites its input one line at a time.
 *
 * Incoming bytes are buffered until a newline (`\n`) byte is found. Every complete line
 * (trailing newline included) is decoded as UTF-8, handed to `replacer`, and the string
 * it returns is re-encoded as UTF-8 and pushed to the readable side. Bytes that follow
 * the last newline are kept until the next chunk arrives, or are run through `replacer`
 * once when the stream is flushed.
 */
class StreamReplacer extends Transform {
  // Bytes received after the most recent newline that have not been emitted yet.
  // Stays undefined until the first chunk arrives.
  private pendingChunk: Buffer | undefined

  /**
   * @param replacer Called with each decoded line (including its `\n`, when present);
   * its return value is what gets emitted in place of that line.
   */
  constructor (private readonly replacer: (line: string) => string) {
    super()
  }

  /**
   * Appends `chunk` to the buffered remainder and emits every complete line found.
   *
   * @param chunk Bytes written to the stream.
   * @param _encoding Unused; chunks are handled as raw buffers.
   * @param done Invoked without arguments on success, or with an `Error` if `replacer` throws.
   */
  _transform (chunk: Buffer, _encoding: BufferEncoding, done: TransformCallback) {
    try {
      let pending: Buffer = this.pendingChunk?.length
        ? Buffer.concat([ this.pendingChunk, chunk ])
        : chunk
      this.pendingChunk = pending

      let newlineAt = pending.indexOf('\n')

      // Keep cutting lines off the front of the buffer for as long as newlines are found
      while (newlineAt !== -1) {
        // `end` is exclusive in subarray(), so step past the newline to keep it in the line
        const lineEnd = newlineAt + 1
        const line = pending.subarray(0, lineEnd)

        // Record the remainder before calling the replacer, so the line counts as consumed
        // even when the replacer throws
        pending = pending.subarray(lineEnd)
        this.pendingChunk = pending

        this.push(this.doReplace(line))

        newlineAt = pending.indexOf('\n')
      }

      return done()
    } catch (err) {
      return done(StreamReplacer.toError(err))
    }
  }

  /**
   * Emits whatever is still buffered (a final line without trailing newline), if anything.
   *
   * @param done Invoked with the replaced remainder as data, with no arguments when nothing
   * is buffered, or with an `Error` if `replacer` throws.
   */
  _flush (done: TransformCallback) {
    const remainder = this.pendingChunk

    if (!remainder?.length) return done()

    // The remainder is emitted exactly once; do not keep it around afterwards
    this.pendingChunk = undefined

    try {
      return done(null, this.doReplace(remainder))
    } catch (err) {
      return done(StreamReplacer.toError(err))
    }
  }

  // Decodes the buffer as UTF-8, applies the replacer and re-encodes the result as UTF-8.
  private doReplace (buffer: Buffer) {
    const line = this.replacer(buffer.toString('utf8'))

    return Buffer.from(line, 'utf8')
  }

  // Stream callbacks expect an Error; wrap anything else a replacer may have thrown.
  private static toError (err: unknown): Error {
    return err instanceof Error ? err : new Error(String(err))
  }
}
55 | ||
56 | export { | |
57 | StreamReplacer | |
58 | } |