// decodedMessage is the JSON-facing representation of a message. Its Headers
// field uses decodedHeaders so that the custom header JSON decoding defined on
// that type is applied.
//
// NOTE(review): the methods on this type also read and write Length,
// HeadersLen, PreludeCRC, Payload and CRC; those must come from a field or
// embedded type declared on a line that is not visible in this excerpt.
11 type decodedMessage struct {
// Headers is read from / written to the "headers" JSON key.
13 Headers decodedHeaders `json:"headers"`
// jsonMessage is the intermediate JSON shape used by decodedMessage's
// UnmarshalJSON and MarshalJSON. Numeric fields are held as json.Number so
// they can be converted explicitly instead of passing through float64.
15 type jsonMessage struct {
// Length is the "total_length" JSON value.
16 Length json.Number `json:"total_length"`
// HeadersLen is the "headers_length" JSON value.
17 HeadersLen json.Number `json:"headers_length"`
// PreludeCRC is the "prelude_crc" JSON value.
18 PreludeCRC json.Number `json:"prelude_crc"`
// Headers is decoded through decodedHeaders' custom UnmarshalJSON.
19 Headers decodedHeaders `json:"headers"`
// Payload is a []byte, which encoding/json represents as a base64 string.
20 Payload []byte `json:"payload"`
// CRC is the "message_crc" JSON value.
21 CRC json.Number `json:"message_crc"`
// UnmarshalJSON decodes b into a jsonMessage and copies the result into d.
// The numeric fields (Length, HeadersLen, PreludeCRC, CRC) are converted from
// json.Number with numAsUint32; Headers and Payload are copied as decoded.
//
// NOTE(review): the error checks that presumably follow each conversion, and
// the final return, are on lines not visible in this excerpt — confirm that
// every err assigned below is checked before the next assignment overwrites it.
24 func (d *decodedMessage) UnmarshalJSON(b []byte) (err error) {
25 var jsonMsg jsonMessage
26 if err = json.Unmarshal(b, &jsonMsg); err != nil {
30 d.Length, err = numAsUint32(jsonMsg.Length)
34 d.HeadersLen, err = numAsUint32(jsonMsg.HeadersLen)
38 d.PreludeCRC, err = numAsUint32(jsonMsg.PreludeCRC)
// Headers and Payload need no numeric conversion and are taken as-is.
42 d.Headers = jsonMsg.Headers
43 d.Payload = jsonMsg.Payload
44 d.CRC, err = numAsUint32(jsonMsg.CRC)
// MarshalJSON encodes d by building a jsonMessage whose numeric fields are
// the decimal string form of d's values, then marshaling that struct.
//
// NOTE(review): these fields receive uint32 values in UnmarshalJSON. Converting
// a uint32 above math.MaxInt32 with int(...) yields a negative number where int
// is 32 bits wide; strconv.FormatUint(uint64(v), 10) would avoid that. Confirm
// whether 32-bit targets matter here.
// NOTE(review): two literal fields between PreludeCRC and CRC are not visible
// in this excerpt.
52 func (d *decodedMessage) MarshalJSON() ([]byte, error) {
53 jsonMsg := jsonMessage{
54 Length: json.Number(strconv.Itoa(int(d.Length))),
55 HeadersLen: json.Number(strconv.Itoa(int(d.HeadersLen))),
56 PreludeCRC: json.Number(strconv.Itoa(int(d.PreludeCRC))),
59 CRC: json.Number(strconv.Itoa(int(d.CRC))),
62 return json.Marshal(jsonMsg)
// numAsUint32 converts a json.Number into a uint32. On the visible failure
// path it returns 0 and an error that wraps the underlying cause as text.
//
// NOTE(review): the conversion itself is on lines not visible here; the error
// message suggests it goes through an int64 first — confirm, and check whether
// negative or >32-bit values are rejected rather than silently truncated.
65 func numAsUint32(n json.Number) (uint32, error) {
68 return 0, fmt.Errorf("failed to get int64 json number, %v", err)
// Message returns d as a Message. The visible part of the literal sets
// Headers by converting d.Headers (a decodedHeaders) to the Headers type.
//
// NOTE(review): the remaining fields of the returned Message are populated on
// lines not visible in this excerpt.
74 func (d decodedMessage) Message() Message {
76 Headers: Headers(d.Headers),
// decodedHeaders is a defined type with Headers as its underlying type. It
// exists so a custom UnmarshalJSON can be attached, and it converts to and
// from Headers with a plain type conversion.
81 type decodedHeaders Headers
// UnmarshalJSON decodes b as a JSON array of {name, type, value} objects and
// stores the resulting headers in *hs. Each entry's value is turned into a
// typed Value by valueFromType and added with headers.Set.
//
// NOTE(review): valueFromType asserts numeric values to json.Number, which
// only holds if the decoder has UseNumber enabled. That call is presumably on
// a line not visible here (between creating the decoder and Decode) — confirm,
// otherwise numbers arrive as float64 and the assertions panic.
83 func (hs *decodedHeaders) UnmarshalJSON(b []byte) error {
// Value is left untyped because its JSON form depends on Type.
84 var jsonHeaders []struct {
85 Name string `json:"name"`
86 Type valueType `json:"type"`
87 Value interface{} `json:"value"`
90 decoder := json.NewDecoder(bytes.NewReader(b))
92 if err := decoder.Decode(&jsonHeaders); err != nil {
97 for _, h := range jsonHeaders {
98 value, err := valueFromType(h.Type, h.Value)
102 headers.Set(h.Name, value)
// Replace the receiver's contents with the newly built header set.
104 (*hs) = decodedHeaders(headers)
// valueFromType converts val, a value decoded from JSON into an interface{},
// into the typed Value selected by typ.
//
// Visible behavior:
//   - two branches return the constants BoolValue(true) and BoolValue(false);
//   - integer branches read val as a json.Number, take its Int64, and convert
//     to Int8Value / Int16Value / Int32Value / Int64Value;
//   - bytes and string branches read val as a base64 (StdEncoding) string;
//   - the timestamp branch reads an int64 and passes it to timeFromEpochMilli;
//   - an unmatched typ panics.
//
// NOTE(review): every val.(T) below is the single-value assertion form and
// panics if val's dynamic type differs (e.g. float64 when the decoder does not
// use json.Number, or nil for a missing "value").
// NOTE(review): int8(v)/int16(v)/int32(v) truncate out-of-range values without
// reporting an error.
// NOTE(review): each branch returns the converted value together with err, so
// on failure the returned Value is built from a zero or partial result.
// NOTE(review): most case labels are on lines not visible in this excerpt.
109 func valueFromType(typ valueType, val interface{}) (Value, error) {
112 return BoolValue(true), nil
114 return BoolValue(false), nil
116 v, err := val.(json.Number).Int64()
117 return Int8Value(int8(v)), err
119 v, err := val.(json.Number).Int64()
120 return Int16Value(int16(v)), err
122 v, err := val.(json.Number).Int64()
123 return Int32Value(int32(v)), err
125 v, err := val.(json.Number).Int64()
126 return Int64Value(v), err
128 v, err := base64.StdEncoding.DecodeString(val.(string))
129 return BytesValue(v), err
// String header values are base64-encoded in the JSON, like bytes values.
130 case stringValueType:
131 v, err := base64.StdEncoding.DecodeString(val.(string))
132 return StringValue(string(v)), err
// presumably v is milliseconds since the Unix epoch, going by the helper's
// name — verify against timeFromEpochMilli.
133 case timestampValueType:
134 v, err := val.(json.Number).Int64()
135 return TimestampValue(timeFromEpochMilli(v)), err
// NOTE(review): the case label and the use of this decoded value are not
// visible in this excerpt.
137 v, err := base64.StdEncoding.DecodeString(val.(string))
// Unknown header types are treated as a programming error, not a returned error.
142 panic(fmt.Sprintf("unknown type, %s, %T", typ.String(), val))