diff options
Diffstat (limited to 'vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/decode.go')
-rw-r--r-- | vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/decode.go | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/decode.go b/vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/decode.go new file mode 100644 index 0000000..4b972b2 --- /dev/null +++ b/vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/decode.go | |||
@@ -0,0 +1,199 @@ | |||
1 | package eventstream | ||
2 | |||
3 | import ( | ||
4 | "bytes" | ||
5 | "encoding/binary" | ||
6 | "encoding/hex" | ||
7 | "encoding/json" | ||
8 | "fmt" | ||
9 | "hash" | ||
10 | "hash/crc32" | ||
11 | "io" | ||
12 | |||
13 | "github.com/aws/aws-sdk-go/aws" | ||
14 | ) | ||
15 | |||
16 | // Decoder provides decoding of an Event Stream messages. | ||
17 | type Decoder struct { | ||
18 | r io.Reader | ||
19 | logger aws.Logger | ||
20 | } | ||
21 | |||
22 | // NewDecoder initializes and returns a Decoder for decoding event | ||
23 | // stream messages from the reader provided. | ||
24 | func NewDecoder(r io.Reader) *Decoder { | ||
25 | return &Decoder{ | ||
26 | r: r, | ||
27 | } | ||
28 | } | ||
29 | |||
30 | // Decode attempts to decode a single message from the event stream reader. | ||
31 | // Will return the event stream message, or error if Decode fails to read | ||
32 | // the message from the stream. | ||
33 | func (d *Decoder) Decode(payloadBuf []byte) (m Message, err error) { | ||
34 | reader := d.r | ||
35 | if d.logger != nil { | ||
36 | debugMsgBuf := bytes.NewBuffer(nil) | ||
37 | reader = io.TeeReader(reader, debugMsgBuf) | ||
38 | defer func() { | ||
39 | logMessageDecode(d.logger, debugMsgBuf, m, err) | ||
40 | }() | ||
41 | } | ||
42 | |||
43 | crc := crc32.New(crc32IEEETable) | ||
44 | hashReader := io.TeeReader(reader, crc) | ||
45 | |||
46 | prelude, err := decodePrelude(hashReader, crc) | ||
47 | if err != nil { | ||
48 | return Message{}, err | ||
49 | } | ||
50 | |||
51 | if prelude.HeadersLen > 0 { | ||
52 | lr := io.LimitReader(hashReader, int64(prelude.HeadersLen)) | ||
53 | m.Headers, err = decodeHeaders(lr) | ||
54 | if err != nil { | ||
55 | return Message{}, err | ||
56 | } | ||
57 | } | ||
58 | |||
59 | if payloadLen := prelude.PayloadLen(); payloadLen > 0 { | ||
60 | buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen))) | ||
61 | if err != nil { | ||
62 | return Message{}, err | ||
63 | } | ||
64 | m.Payload = buf | ||
65 | } | ||
66 | |||
67 | msgCRC := crc.Sum32() | ||
68 | if err := validateCRC(reader, msgCRC); err != nil { | ||
69 | return Message{}, err | ||
70 | } | ||
71 | |||
72 | return m, nil | ||
73 | } | ||
74 | |||
75 | // UseLogger specifies the Logger that that the decoder should use to log the | ||
76 | // message decode to. | ||
77 | func (d *Decoder) UseLogger(logger aws.Logger) { | ||
78 | d.logger = logger | ||
79 | } | ||
80 | |||
81 | func logMessageDecode(logger aws.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) { | ||
82 | w := bytes.NewBuffer(nil) | ||
83 | defer func() { logger.Log(w.String()) }() | ||
84 | |||
85 | fmt.Fprintf(w, "Raw message:\n%s\n", | ||
86 | hex.Dump(msgBuf.Bytes())) | ||
87 | |||
88 | if decodeErr != nil { | ||
89 | fmt.Fprintf(w, "Decode error: %v\n", decodeErr) | ||
90 | return | ||
91 | } | ||
92 | |||
93 | rawMsg, err := msg.rawMessage() | ||
94 | if err != nil { | ||
95 | fmt.Fprintf(w, "failed to create raw message, %v\n", err) | ||
96 | return | ||
97 | } | ||
98 | |||
99 | decodedMsg := decodedMessage{ | ||
100 | rawMessage: rawMsg, | ||
101 | Headers: decodedHeaders(msg.Headers), | ||
102 | } | ||
103 | |||
104 | fmt.Fprintf(w, "Decoded message:\n") | ||
105 | encoder := json.NewEncoder(w) | ||
106 | if err := encoder.Encode(decodedMsg); err != nil { | ||
107 | fmt.Fprintf(w, "failed to generate decoded message, %v\n", err) | ||
108 | } | ||
109 | } | ||
110 | |||
111 | func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) { | ||
112 | var p messagePrelude | ||
113 | |||
114 | var err error | ||
115 | p.Length, err = decodeUint32(r) | ||
116 | if err != nil { | ||
117 | return messagePrelude{}, err | ||
118 | } | ||
119 | |||
120 | p.HeadersLen, err = decodeUint32(r) | ||
121 | if err != nil { | ||
122 | return messagePrelude{}, err | ||
123 | } | ||
124 | |||
125 | if err := p.ValidateLens(); err != nil { | ||
126 | return messagePrelude{}, err | ||
127 | } | ||
128 | |||
129 | preludeCRC := crc.Sum32() | ||
130 | if err := validateCRC(r, preludeCRC); err != nil { | ||
131 | return messagePrelude{}, err | ||
132 | } | ||
133 | |||
134 | p.PreludeCRC = preludeCRC | ||
135 | |||
136 | return p, nil | ||
137 | } | ||
138 | |||
139 | func decodePayload(buf []byte, r io.Reader) ([]byte, error) { | ||
140 | w := bytes.NewBuffer(buf[0:0]) | ||
141 | |||
142 | _, err := io.Copy(w, r) | ||
143 | return w.Bytes(), err | ||
144 | } | ||
145 | |||
146 | func decodeUint8(r io.Reader) (uint8, error) { | ||
147 | type byteReader interface { | ||
148 | ReadByte() (byte, error) | ||
149 | } | ||
150 | |||
151 | if br, ok := r.(byteReader); ok { | ||
152 | v, err := br.ReadByte() | ||
153 | return uint8(v), err | ||
154 | } | ||
155 | |||
156 | var b [1]byte | ||
157 | _, err := io.ReadFull(r, b[:]) | ||
158 | return uint8(b[0]), err | ||
159 | } | ||
160 | func decodeUint16(r io.Reader) (uint16, error) { | ||
161 | var b [2]byte | ||
162 | bs := b[:] | ||
163 | _, err := io.ReadFull(r, bs) | ||
164 | if err != nil { | ||
165 | return 0, err | ||
166 | } | ||
167 | return binary.BigEndian.Uint16(bs), nil | ||
168 | } | ||
169 | func decodeUint32(r io.Reader) (uint32, error) { | ||
170 | var b [4]byte | ||
171 | bs := b[:] | ||
172 | _, err := io.ReadFull(r, bs) | ||
173 | if err != nil { | ||
174 | return 0, err | ||
175 | } | ||
176 | return binary.BigEndian.Uint32(bs), nil | ||
177 | } | ||
178 | func decodeUint64(r io.Reader) (uint64, error) { | ||
179 | var b [8]byte | ||
180 | bs := b[:] | ||
181 | _, err := io.ReadFull(r, bs) | ||
182 | if err != nil { | ||
183 | return 0, err | ||
184 | } | ||
185 | return binary.BigEndian.Uint64(bs), nil | ||
186 | } | ||
187 | |||
188 | func validateCRC(r io.Reader, expect uint32) error { | ||
189 | msgCRC, err := decodeUint32(r) | ||
190 | if err != nil { | ||
191 | return err | ||
192 | } | ||
193 | |||
194 | if msgCRC != expect { | ||
195 | return ChecksumError{} | ||
196 | } | ||
197 | |||
198 | return nil | ||
199 | } | ||