]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | package eventstream |
2 | ||
3 | import ( | |
4 | "bytes" | |
5 | "encoding/binary" | |
6 | "hash" | |
7 | "hash/crc32" | |
8 | "io" | |
9 | ) | |
10 | ||
11 | // Encoder provides EventStream message encoding. | |
12 | type Encoder struct { | |
13 | w io.Writer | |
14 | ||
15 | headersBuf *bytes.Buffer | |
16 | } | |
17 | ||
18 | // NewEncoder initializes and returns an Encoder to encode Event Stream | |
19 | // messages to an io.Writer. | |
20 | func NewEncoder(w io.Writer) *Encoder { | |
21 | return &Encoder{ | |
22 | w: w, | |
23 | headersBuf: bytes.NewBuffer(nil), | |
24 | } | |
25 | } | |
26 | ||
27 | // Encode encodes a single EventStream message to the io.Writer the Encoder | |
28 | // was created with. An error is returned if writing the message fails. | |
29 | func (e *Encoder) Encode(msg Message) error { | |
30 | e.headersBuf.Reset() | |
31 | ||
32 | err := encodeHeaders(e.headersBuf, msg.Headers) | |
33 | if err != nil { | |
34 | return err | |
35 | } | |
36 | ||
37 | crc := crc32.New(crc32IEEETable) | |
38 | hashWriter := io.MultiWriter(e.w, crc) | |
39 | ||
40 | headersLen := uint32(e.headersBuf.Len()) | |
41 | payloadLen := uint32(len(msg.Payload)) | |
42 | ||
43 | if err := encodePrelude(hashWriter, crc, headersLen, payloadLen); err != nil { | |
44 | return err | |
45 | } | |
46 | ||
47 | if headersLen > 0 { | |
48 | if _, err := io.Copy(hashWriter, e.headersBuf); err != nil { | |
49 | return err | |
50 | } | |
51 | } | |
52 | ||
53 | if payloadLen > 0 { | |
54 | if _, err := hashWriter.Write(msg.Payload); err != nil { | |
55 | return err | |
56 | } | |
57 | } | |
58 | ||
59 | msgCRC := crc.Sum32() | |
60 | return binary.Write(e.w, binary.BigEndian, msgCRC) | |
61 | } | |
62 | ||
63 | func encodePrelude(w io.Writer, crc hash.Hash32, headersLen, payloadLen uint32) error { | |
64 | p := messagePrelude{ | |
65 | Length: minMsgLen + headersLen + payloadLen, | |
66 | HeadersLen: headersLen, | |
67 | } | |
68 | if err := p.ValidateLens(); err != nil { | |
69 | return err | |
70 | } | |
71 | ||
72 | err := binaryWriteFields(w, binary.BigEndian, | |
73 | p.Length, | |
74 | p.HeadersLen, | |
75 | ) | |
76 | if err != nil { | |
77 | return err | |
78 | } | |
79 | ||
80 | p.PreludeCRC = crc.Sum32() | |
81 | err = binary.Write(w, binary.BigEndian, p.PreludeCRC) | |
82 | if err != nil { | |
83 | return err | |
84 | } | |
85 | ||
86 | return nil | |
87 | } | |
88 | ||
89 | func encodeHeaders(w io.Writer, headers Headers) error { | |
90 | for _, h := range headers { | |
91 | hn := headerName{ | |
92 | Len: uint8(len(h.Name)), | |
93 | } | |
94 | copy(hn.Name[:hn.Len], h.Name) | |
95 | if err := hn.encode(w); err != nil { | |
96 | return err | |
97 | } | |
98 | ||
99 | if err := h.Value.encode(w); err != nil { | |
100 | return err | |
101 | } | |
102 | } | |
103 | ||
104 | return nil | |
105 | } | |
106 | ||
107 | func binaryWriteFields(w io.Writer, order binary.ByteOrder, vs ...interface{}) error { | |
108 | for _, v := range vs { | |
109 | if err := binary.Write(w, order, v); err != nil { | |
110 | return err | |
111 | } | |
112 | } | |
113 | return nil | |
114 | } |