Skip to content

Commit f19a505

Browse files
committed
reformat
Signed-off-by: sadath-12 <[email protected]>
1 parent 5d4e498 commit f19a505

File tree

5 files changed

+49
-105
lines changed

5 files changed

+49
-105
lines changed

examples/pubsub/pub/pub.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ var (
2929

3030
func main() {
3131
ctx := context.Background()
32-
publishEventData := []byte("ping")
32+
// publishEventData := []byte("ping")
3333
publishEventsData := []interface{}{"multi-ping", "multi-pong"}
3434

3535
client, err := dapr.NewClient()
@@ -39,9 +39,9 @@ func main() {
3939
defer client.Close()
4040

4141
// Publish a single event
42-
if err := client.PublishEvent(ctx, pubsubName, topicName, publishEventData); err != nil {
43-
panic(err)
44-
}
42+
// if err := client.PublishEvent(ctx, pubsubName, topicName, publishEventData); err != nil {
43+
// panic(err)
44+
// }
4545

4646
// Publish multiple events
4747
if res := client.PublishEvents(ctx, pubsubName, topicName, publishEventsData); res.Error != nil {

examples/pubsub/sub/sub.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
6666
return false, nil
6767
}
6868

69-
func bulkeventHandler(ctx context.Context, e []common.BulkTopic) (retry bool, err error) {
69+
func bulkeventHandler(ctx context.Context, e []common.TopicEvent) (retry bool, err error) {
7070
for _, event := range e {
7171
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", event.PubsubName, event.Topic, event.ID, event.Data)
7272
}

service/common/service.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type Service interface {
5555
type (
5656
ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error)
5757
TopicEventHandler func(ctx context.Context, e *TopicEvent) (retry bool, err error)
58-
BulkTopicEventHandler func(ctx context.Context, e []BulkTopic) (retry bool, err error)
58+
BulkTopicEventHandler func(ctx context.Context, e []TopicEvent) (retry bool, err error)
5959
BindingInvocationHandler func(ctx context.Context, in *BindingEvent) (out []byte, err error)
6060
HealthCheckHandler func(context.Context) error
6161
)

service/http/topic.go

+41-97
Original file line numberDiff line numberDiff line change
@@ -321,58 +321,20 @@ func (in topicEventJSON) getData() (data any, rawData []byte) {
321321
return data, rawData
322322
}
323323

324-
func (in BulkTopicJson) getData() (data any, rawData []byte) {
325-
var (
326-
err error
327-
v any
328-
)
329-
if len(in.Data) > 0 {
330-
rawData = []byte(in.Data)
331-
data = rawData
332-
// We can assume that rawData is valid JSON
333-
// without checking in.DataContentType == "application/json".
334-
if err = json.Unmarshal(rawData, &v); err == nil {
335-
data = v
336-
// Handling of JSON base64 encoded or escaped in a string.
337-
if str, ok := v.(string); ok {
338-
// This is the path that will most likely succeed.
339-
var (
340-
vString any
341-
decoded []byte
342-
)
343-
if err = json.Unmarshal([]byte(str), &vString); err == nil {
344-
data = vString
345-
} else if decoded, err = base64.StdEncoding.DecodeString(str); err == nil {
346-
// Decoded Base64 encoded JSON does not seem to be in the spec
347-
// but it is in existing unit tests so this handles that case.
348-
var vBase64 any
349-
if err = json.Unmarshal(decoded, &vBase64); err == nil {
350-
data = vBase64
351-
}
352-
}
353-
}
354-
}
355-
}
356-
357-
return data, rawData
324+
type BulkSubscribeMessageItem struct {
325+
EntryId string `json:"entryId"` //nolint:stylecheck
326+
Event interface{} `json:"event"`
327+
Metadata map[string]string `json:"metadata"`
328+
ContentType string `json:"contentType,omitempty"`
358329
}
359330

360-
type BulkTopicJson struct {
361-
ContentType string `json:"contentType"`
362-
EntryID string `json:"entryId"`
363-
Event map[string]string `json:"event"`
364-
Data json.RawMessage `json:"data"`
365-
DataContentType string `json:"datacontenttype"`
366-
ID string `json:"id"`
367-
PubsubName string `json:"pubsubname"`
368-
Source string `json:"source"`
369-
SpecVersion string `json:"specversion"`
370-
Time string `json:"time"`
371-
Topic string `json:"topic"`
372-
TraceID string `json:"traceid"`
373-
TraceParent string `json:"traceparent"`
374-
TraceState string `json:"tracestate"`
375-
Metadata interface{} `json:"metadata"`
331+
type BulkSubscribeEnvelope struct {
332+
ID string
333+
Entries []BulkSubscribeMessageItem
334+
Metadata map[string]string
335+
Topic string
336+
Pubsub string
337+
EventType string
376338
}
377339

378340
// == APP == the item is {application/cloudevents+json 5e582fd2-f1c4-47bd-81b1-803fb7e86552 map[data:multi-pong datacontenttype:text/plain id:92fc5348-097d-4b9f-b093-7e6fcda77add pubsubname:messages source:pub specversion:1.0 time:2023-12-02T11:42:31+05:30 topic:neworder traceid:00-a0373ef078e14e8db358c06e0ec18b27-d8dae6b5080eb9da-01 traceparent:00-a0373ef078e14e8db358c06e0ec18b27-d8dae6b5080eb9da-01 tracestate: type:com.dapr.event.sent] [] <nil>}
@@ -410,88 +372,70 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
410372
}
411373

412374
// deserialize the event
413-
var ins map[string]interface{}
375+
var ins BulkSubscribeEnvelope
414376
if err = json.Unmarshal(body, &ins); err != nil {
415377
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
416378
return
417379
}
418380

419-
entriesInterface, ok := ins["entries"].([]interface{})
420-
if !ok {
421-
// Handle the error or return an error response
422-
http.Error(w, "Entries format error", PubSubHandlerDropStatusCode)
423-
return
424-
}
425-
426-
statuses := make([]BulkSubscribeResponseEntry, 0, len(entriesInterface))
427-
428-
var messages []common.BulkTopic
429-
for _, entry := range entriesInterface {
430-
itemMap, ok := entry.(map[string]interface{})
431-
if !ok {
432-
http.Error(w, "Entry format error", PubSubHandlerDropStatusCode)
433-
return
434-
}
381+
statuses := make([]BulkSubscribeResponseEntry, 0, len(ins.Entries))
435382

436-
itemJSON, err := json.Marshal(itemMap["event"])
383+
var messages []common.TopicEvent
384+
for _, entry := range ins.Entries {
385+
itemJSON, err := json.Marshal(entry.Event)
437386
if err != nil {
438387
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
439388
return
440389
}
441-
var item BulkTopicJson
390+
var in topicEventJSON
442391

443-
if err := json.Unmarshal(itemJSON, &item); err != nil {
392+
if err := json.Unmarshal(itemJSON, &in); err != nil {
444393
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
445394
return
446395
}
447-
data, rawData := item.getData()
448-
449-
450-
if item.PubsubName == "" {
451-
item.PubsubName = sub.PubsubName
396+
if in.PubsubName == "" {
397+
in.Topic = sub.PubsubName
452398
}
453-
454-
if item.Topic == "" {
455-
item.Topic = sub.Topic
399+
if in.Topic == "" {
400+
in.Topic = sub.Topic
456401
}
402+
data, rawData := in.getData()
457403

458404
statuses = append(statuses, BulkSubscribeResponseEntry{
459-
entryId: item.EntryID,
405+
entryId: in.ID,
460406
status: SubscriptionResponseStatusSuccess,
461407
},
462408
)
463409

464-
newItem := common.BulkTopic{
465-
ContentType: item.ContentType,
466-
EntryID: item.EntryID,
467-
Event: item.Event,
410+
te := common.TopicEvent{
411+
ID: in.ID,
412+
SpecVersion: in.SpecVersion,
413+
Type: in.Type,
414+
Source: in.Source,
415+
DataContentType: in.DataContentType,
468416
Data: data,
469417
RawData: rawData,
470-
DataContentType: item.DataContentType,
471-
ID: item.EntryID,
472-
PubsubName: item.PubsubName,
473-
Source: item.Source,
474-
SpecVersion: item.SpecVersion,
475-
Metadata: item.Metadata,
476-
Time: item.Time,
477-
Topic: item.Topic,
478-
TraceID: item.TraceID,
479-
TraceParent: item.TraceParent,
480-
TraceState: item.TraceState,
418+
DataBase64: in.DataBase64,
419+
Subject: in.Subject,
420+
PubsubName: in.PubsubName,
421+
Topic: in.Topic,
481422
}
482423

483-
messages = append(messages, newItem)
424+
messages = append(messages, te)
484425
}
485426
resp := BulkSubscribeResponse{
486427
statuses: statuses,
487428
}
488-
responseJSON, err := json.Marshal(resp)
489429
if err != nil {
490430
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
491431
return
492432
}
493433
w.Header().Add("Content-Type", "application/json")
494-
w.Write(responseJSON)
434+
if err := json.NewEncoder(w).Encode(resp); err != nil {
435+
http.Error(w, err.Error(), http.StatusInternalServerError)
436+
return
437+
}
438+
w.WriteHeader(http.StatusOK)
495439

496440
retry, err := fn(r.Context(), messages)
497441
if err == nil {

service/internal/topicregistrar.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,14 @@ func (m TopicRegistrar) AddBulkSubscription(sub *common.Subscription, fn common.
107107
if err := ts.Subscription.SetDefaultRoute(sub.Route); err != nil {
108108
return err
109109
}
110-
ts.DefaultBulkHandler = func(ctx context.Context, e []common.BulkTopic) (retry bool, err error) {
110+
ts.DefaultBulkHandler = func(ctx context.Context, e []common.TopicEvent) (retry bool, err error) {
111111
return false,nil
112112
}
113113
ts.DefaultHandler = func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
114114
return false,nil
115115
}
116116
}
117-
ts.BulkRouteHandlers[sub.Route] = func(ctx context.Context, e []common.BulkTopic) (retry bool, err error) {
117+
ts.BulkRouteHandlers[sub.Route] = func(ctx context.Context, e []common.TopicEvent) (retry bool, err error) {
118118
return false,nil
119119
}
120120
ts.RouteHandlers[sub.Route] = func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {

0 commit comments

Comments
 (0)