]>
Commit | Line | Data |
---|---|---|
9b12e4fe JC |
1 | package stdcopy |
2 | ||
3 | import ( | |
4 | "encoding/binary" | |
5 | "errors" | |
6 | "io" | |
7 | ||
8 | "github.com/fsouza/go-dockerclient/external/github.com/Sirupsen/logrus" | |
9 | ) | |
10 | ||
11 | const ( | |
12 | stdWriterPrefixLen = 8 | |
13 | stdWriterFdIndex = 0 | |
14 | stdWriterSizeIndex = 4 | |
15 | ||
16 | startingBufLen = 32*1024 + stdWriterPrefixLen + 1 | |
17 | ) | |
18 | ||
19 | // StdType prefixes type and length to standard stream. | |
20 | type StdType [stdWriterPrefixLen]byte | |
21 | ||
22 | var ( | |
23 | // Stdin represents standard input stream type. | |
24 | Stdin = StdType{0: 0} | |
25 | // Stdout represents standard output stream type. | |
26 | Stdout = StdType{0: 1} | |
27 | // Stderr represents standard error steam type. | |
28 | Stderr = StdType{0: 2} | |
29 | ) | |
30 | ||
31 | // StdWriter is wrapper of io.Writer with extra customized info. | |
32 | type StdWriter struct { | |
33 | io.Writer | |
34 | prefix StdType | |
35 | sizeBuf []byte | |
36 | } | |
37 | ||
38 | func (w *StdWriter) Write(buf []byte) (n int, err error) { | |
39 | var n1, n2 int | |
40 | if w == nil || w.Writer == nil { | |
41 | return 0, errors.New("Writer not instantiated") | |
42 | } | |
43 | binary.BigEndian.PutUint32(w.prefix[4:], uint32(len(buf))) | |
44 | n1, err = w.Writer.Write(w.prefix[:]) | |
45 | if err != nil { | |
46 | n = n1 - stdWriterPrefixLen | |
47 | } else { | |
48 | n2, err = w.Writer.Write(buf) | |
49 | n = n1 + n2 - stdWriterPrefixLen | |
50 | } | |
51 | if n < 0 { | |
52 | n = 0 | |
53 | } | |
54 | return | |
55 | } | |
56 | ||
57 | // NewStdWriter instantiates a new Writer. | |
58 | // Everything written to it will be encapsulated using a custom format, | |
59 | // and written to the underlying `w` stream. | |
60 | // This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection. | |
61 | // `t` indicates the id of the stream to encapsulate. | |
62 | // It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr. | |
63 | func NewStdWriter(w io.Writer, t StdType) *StdWriter { | |
64 | return &StdWriter{ | |
65 | Writer: w, | |
66 | prefix: t, | |
67 | sizeBuf: make([]byte, 4), | |
68 | } | |
69 | } | |
70 | ||
71 | var errInvalidStdHeader = errors.New("Unrecognized input header") | |
72 | ||
73 | // StdCopy is a modified version of io.Copy. | |
74 | // | |
75 | // StdCopy will demultiplex `src`, assuming that it contains two streams, | |
76 | // previously multiplexed together using a StdWriter instance. | |
77 | // As it reads from `src`, StdCopy will write to `dstout` and `dsterr`. | |
78 | // | |
79 | // StdCopy will read until it hits EOF on `src`. It will then return a nil error. | |
80 | // In other words: if `err` is non nil, it indicates a real underlying error. | |
81 | // | |
82 | // `written` will hold the total number of bytes written to `dstout` and `dsterr`. | |
83 | func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) { | |
84 | var ( | |
85 | buf = make([]byte, startingBufLen) | |
86 | bufLen = len(buf) | |
87 | nr, nw int | |
88 | er, ew error | |
89 | out io.Writer | |
90 | frameSize int | |
91 | ) | |
92 | ||
93 | for { | |
94 | // Make sure we have at least a full header | |
95 | for nr < stdWriterPrefixLen { | |
96 | var nr2 int | |
97 | nr2, er = src.Read(buf[nr:]) | |
98 | nr += nr2 | |
99 | if er == io.EOF { | |
100 | if nr < stdWriterPrefixLen { | |
101 | logrus.Debugf("Corrupted prefix: %v", buf[:nr]) | |
102 | return written, nil | |
103 | } | |
104 | break | |
105 | } | |
106 | if er != nil { | |
107 | logrus.Debugf("Error reading header: %s", er) | |
108 | return 0, er | |
109 | } | |
110 | } | |
111 | ||
112 | // Check the first byte to know where to write | |
113 | switch buf[stdWriterFdIndex] { | |
114 | case 0: | |
115 | fallthrough | |
116 | case 1: | |
117 | // Write on stdout | |
118 | out = dstout | |
119 | case 2: | |
120 | // Write on stderr | |
121 | out = dsterr | |
122 | default: | |
123 | logrus.Debugf("Error selecting output fd: (%d)", buf[stdWriterFdIndex]) | |
124 | return 0, errInvalidStdHeader | |
125 | } | |
126 | ||
127 | // Retrieve the size of the frame | |
128 | frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) | |
129 | logrus.Debugf("framesize: %d", frameSize) | |
130 | ||
131 | // Check if the buffer is big enough to read the frame. | |
132 | // Extend it if necessary. | |
133 | if frameSize+stdWriterPrefixLen > bufLen { | |
134 | logrus.Debugf("Extending buffer cap by %d (was %d)", frameSize+stdWriterPrefixLen-bufLen+1, len(buf)) | |
135 | buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) | |
136 | bufLen = len(buf) | |
137 | } | |
138 | ||
139 | // While the amount of bytes read is less than the size of the frame + header, we keep reading | |
140 | for nr < frameSize+stdWriterPrefixLen { | |
141 | var nr2 int | |
142 | nr2, er = src.Read(buf[nr:]) | |
143 | nr += nr2 | |
144 | if er == io.EOF { | |
145 | if nr < frameSize+stdWriterPrefixLen { | |
146 | logrus.Debugf("Corrupted frame: %v", buf[stdWriterPrefixLen:nr]) | |
147 | return written, nil | |
148 | } | |
149 | break | |
150 | } | |
151 | if er != nil { | |
152 | logrus.Debugf("Error reading frame: %s", er) | |
153 | return 0, er | |
154 | } | |
155 | } | |
156 | ||
157 | // Write the retrieved frame (without header) | |
158 | nw, ew = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen]) | |
159 | if ew != nil { | |
160 | logrus.Debugf("Error writing frame: %s", ew) | |
161 | return 0, ew | |
162 | } | |
163 | // If the frame has not been fully written: error | |
164 | if nw != frameSize { | |
165 | logrus.Debugf("Error Short Write: (%d on %d)", nw, frameSize) | |
166 | return 0, io.ErrShortWrite | |
167 | } | |
168 | written += int64(nw) | |
169 | ||
170 | // Move the rest of the buffer to the beginning | |
171 | copy(buf, buf[frameSize+stdWriterPrefixLen:]) | |
172 | // Move the index | |
173 | nr -= frameSize + stdWriterPrefixLen | |
174 | } | |
175 | } |