Skip to content

Commit 8a7a61b

Browse files
pkedydmitsh
andauthored
Decode CloudEvent Data consistently between HTTP and gRPC (#230)
* Decode CloudEvent Data consistently between HTTP and gRPC Signed-off-by: Phil Kedy <[email protected]> * linter issue Signed-off-by: Phil Kedy <[email protected]> * Added parsing media type Signed-off-by: Phil Kedy <[email protected]> * Adding extension json media type handling Signed-off-by: Phil Kedy <[email protected]> * add TTL to actor timer/reminder requests (#225) Signed-off-by: Phil Kedy <[email protected]> * Adding more tests Signed-off-by: Phil Kedy <[email protected]> * Tweak Signed-off-by: Phil Kedy <[email protected]> Co-authored-by: Dmitry Shmulevich <[email protected]>
1 parent 15fc672 commit 8a7a61b

File tree

5 files changed

+331
-4
lines changed

5 files changed

+331
-4
lines changed

service/common/type.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package common
22

3+
import (
4+
"encoding/json"
5+
)
6+
37
// TopicEvent is the content of the inbound topic message.
48
type TopicEvent struct {
59
// ID identifies the event.
@@ -15,8 +19,12 @@ type TopicEvent struct {
1519
// The content of the event.
1620
// Note, this is why the gRPC and HTTP implementations need separate structs for cloud events.
1721
Data interface{} `json:"data"`
22+
// The content of the event represented as raw bytes.
23+
// This can be deserialized into a Go struct using `Struct`.
24+
RawData []byte `json:"-"`
1825
// The base64 encoding content of the event.
19-
// Note, this is processing rawPayload and binary content types .
26+
// Note, this is processing rawPayload and binary content types.
27+
// This field is deprecated and will be removed in the future.
2028
DataBase64 string `json:"data_base64,omitempty"`
2129
// Cloud event subject
2230
Subject string `json:"subject"`
@@ -26,6 +34,12 @@ type TopicEvent struct {
2634
PubsubName string `json:"pubsubname"`
2735
}
2836

37+
func (e *TopicEvent) Struct(target interface{}) error {
38+
// TODO: Enhance to inspect DataContentType for the best
39+
// deserialization method.
40+
return json.Unmarshal(e.RawData, target)
41+
}
42+
2943
// InvocationEvent represents the input and output of binding invocation.
3044
type InvocationEvent struct {
3145
// Data is the payload that the input bindings sent.

service/grpc/topic.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package grpc
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
7+
"mime"
8+
"strings"
69

710
"github.com/golang/protobuf/ptypes/empty"
811
"github.com/pkg/errors"
@@ -62,13 +65,38 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p
6265
}
6366
key := fmt.Sprintf("%s-%s", in.PubsubName, in.Topic)
6467
if h, ok := s.topicSubscriptions[key]; ok {
68+
data := interface{}(in.Data)
69+
if len(in.Data) > 0 {
70+
mediaType, _, err := mime.ParseMediaType(in.DataContentType)
71+
if err == nil {
72+
var v interface{}
73+
switch mediaType {
74+
case "application/json":
75+
if err := json.Unmarshal(in.Data, &v); err == nil {
76+
data = v
77+
}
78+
case "text/plain":
79+
// Assume UTF-8 encoded string.
80+
data = string(in.Data)
81+
default:
82+
if strings.HasPrefix(mediaType, "application/") &&
83+
strings.HasSuffix(mediaType, "+json") {
84+
if err := json.Unmarshal(in.Data, &v); err == nil {
85+
data = v
86+
}
87+
}
88+
}
89+
}
90+
}
91+
6592
e := &common.TopicEvent{
6693
ID: in.Id,
6794
Source: in.Source,
6895
Type: in.Type,
6996
SpecVersion: in.SpecVersion,
7097
DataContentType: in.DataContentType,
71-
Data: in.Data,
98+
Data: data,
99+
RawData: in.Data,
72100
Topic: in.Topic,
73101
PubsubName: in.PubsubName,
74102
}

service/grpc/topic_test.go

+76
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,79 @@ func eventHandlerWithRetryError(ctx context.Context, event *common.TopicEvent) (
161161
func eventHandlerWithError(ctx context.Context, event *common.TopicEvent) (retry bool, err error) {
162162
return false, errors.New("nil event")
163163
}
164+
165+
func TestEventDataHandling(t *testing.T) {
166+
ctx := context.Background()
167+
168+
tests := map[string]struct {
169+
contentType string
170+
data string
171+
value interface{}
172+
}{
173+
"JSON bytes": {
174+
contentType: "application/json",
175+
data: `{"message":"hello"}`,
176+
value: map[string]interface{}{
177+
"message": "hello",
178+
},
179+
},
180+
"JSON entension media type bytes": {
181+
contentType: "application/extension+json",
182+
data: `{"message":"hello"}`,
183+
value: map[string]interface{}{
184+
"message": "hello",
185+
},
186+
},
187+
"Test": {
188+
contentType: "text/plain",
189+
data: `message = hello`,
190+
value: `message = hello`,
191+
},
192+
"Other": {
193+
contentType: "application/octet-stream",
194+
data: `message = hello`,
195+
value: []byte(`message = hello`),
196+
},
197+
}
198+
199+
s := getTestServer()
200+
201+
sub := &common.Subscription{
202+
PubsubName: "messages",
203+
Topic: "test",
204+
Route: "/test",
205+
Metadata: map[string]string{},
206+
}
207+
208+
recv := make(chan struct{}, 1)
209+
var topicEvent *common.TopicEvent
210+
handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
211+
topicEvent = e
212+
recv <- struct{}{}
213+
214+
return false, nil
215+
}
216+
err := s.AddTopicEventHandler(sub, handler)
217+
assert.NoErrorf(t, err, "error adding event handler")
218+
219+
startTestServer(s)
220+
221+
for name, tt := range tests {
222+
t.Run(name, func(t *testing.T) {
223+
in := runtime.TopicEventRequest{
224+
Id: "a123",
225+
Source: "test",
226+
Type: "test",
227+
SpecVersion: "v1.0",
228+
DataContentType: tt.contentType,
229+
Data: []byte(tt.data),
230+
Topic: sub.Topic,
231+
PubsubName: sub.PubsubName,
232+
}
233+
234+
s.OnTopicEvent(ctx, &in)
235+
<-recv
236+
assert.Equal(t, tt.value, topicEvent.Data)
237+
})
238+
}
239+
}

service/http/topic.go

+88-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package http
22

33
import (
44
"context"
5+
"encoding/base64"
56
"encoding/json"
67
"fmt"
78
"io/ioutil"
@@ -29,6 +30,34 @@ const (
2930
PubSubHandlerDropStatusCode int = http.StatusSeeOther
3031
)
3132

33+
// topicEventJSON is identical to `common.TopicEvent`
34+
// except for it treats `data` as a json.RawMessage so it can
35+
// be used as bytes or interface{}.
36+
type topicEventJSON struct {
37+
// ID identifies the event.
38+
ID string `json:"id"`
39+
// The version of the CloudEvents specification.
40+
SpecVersion string `json:"specversion"`
41+
// The type of event related to the originating occurrence.
42+
Type string `json:"type"`
43+
// Source identifies the context in which an event happened.
44+
Source string `json:"source"`
45+
// The content type of data value.
46+
DataContentType string `json:"datacontenttype"`
47+
// The content of the event.
48+
// Note, this is why the gRPC and HTTP implementations need separate structs for cloud events.
49+
Data json.RawMessage `json:"data"`
50+
// The base64 encoding content of the event.
51+
// Note, this is processing rawPayload and binary content types.
52+
DataBase64 string `json:"data_base64,omitempty"`
53+
// Cloud event subject
54+
Subject string `json:"subject"`
55+
// The pubsub topic which publisher sent to.
56+
Topic string `json:"topic"`
57+
// PubsubName is name of the pub/sub this message came from
58+
PubsubName string `json:"pubsubname"`
59+
}
60+
3261
func (s *Server) registerBaseHandler() {
3362
// register subscribe handler
3463
f := func(w http.ResponseWriter, r *http.Request) {
@@ -168,21 +197,78 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn func(ctx cont
168197
}
169198

170199
// deserialize the event
171-
var in common.TopicEvent
200+
var in topicEventJSON
172201
if err := json.NewDecoder(r.Body).Decode(&in); err != nil {
173202
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
174203
return
175204
}
176205

206+
if in.PubsubName == "" {
207+
in.Topic = sub.PubsubName
208+
}
177209
if in.Topic == "" {
178210
in.Topic = sub.Topic
179211
}
180212

213+
var data interface{}
214+
var rawData []byte
215+
if len(in.Data) > 0 {
216+
rawData = []byte(in.Data)
217+
data = rawData
218+
var v interface{}
219+
// We can assume that rawData is valid JSON
220+
// without checking in.DataContentType == "application/json".
221+
if err := json.Unmarshal(rawData, &v); err == nil {
222+
data = v
223+
// Handling of JSON base64 encoded or escaped in a string.
224+
if str, ok := v.(string); ok {
225+
// This is the path that will most likely succeed.
226+
var vString interface{}
227+
if err := json.Unmarshal([]byte(str), &vString); err == nil {
228+
data = vString
229+
} else if decoded, err := base64.StdEncoding.DecodeString(str); err == nil {
230+
// Decoded Base64 encoded JSON does not seem to be in the spec
231+
// but it is in existing unit tests so this handles that case.
232+
var vBase64 interface{}
233+
if err := json.Unmarshal(decoded, &vBase64); err == nil {
234+
data = vBase64
235+
}
236+
}
237+
}
238+
}
239+
} else if in.DataBase64 != "" {
240+
var err error
241+
rawData, err = base64.StdEncoding.DecodeString(in.DataBase64)
242+
if err == nil {
243+
data = rawData
244+
if in.DataContentType == "application/json" {
245+
var v interface{}
246+
if err := json.Unmarshal(rawData, &v); err == nil {
247+
data = v
248+
}
249+
}
250+
}
251+
}
252+
253+
te := common.TopicEvent{
254+
ID: in.ID,
255+
SpecVersion: in.SpecVersion,
256+
Type: in.Type,
257+
Source: in.Source,
258+
DataContentType: in.DataContentType,
259+
Data: data,
260+
RawData: rawData,
261+
DataBase64: in.DataBase64,
262+
Subject: in.Subject,
263+
PubsubName: in.PubsubName,
264+
Topic: in.Topic,
265+
}
266+
181267
w.Header().Add("Content-Type", "application/json")
182268
w.WriteHeader(http.StatusOK)
183269

184270
// execute user handler
185-
retry, err := fn(r.Context(), &in)
271+
retry, err := fn(r.Context(), &te)
186272
if err == nil {
187273
writeStatus(w, common.SubscriptionResponseStatusSuccess)
188274
return

0 commit comments

Comments
 (0)