diff --git a/mq/interface.go b/mq/interface.go index e173a2e..74f8931 100644 --- a/mq/interface.go +++ b/mq/interface.go @@ -130,7 +130,7 @@ func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{ reader := defaultInstanceManager.getReader(ctx, conf) if reader == nil { slog.Errorf(ctx, "%s getReader err, topic: %s", fun, topic) - return nil, fmt.Errorf("%s, getReader err, topic: %s", fun, topic) + return ctx, fmt.Errorf("%s, getReader err, topic: %s", fun, topic) } var payload Payload @@ -145,11 +145,11 @@ func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{ if err != nil { slog.Errorf(ctx, "%s ReadMsg err: %v, topic: %s", fun, err, topic) - return nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic) + return ctx, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic) } if len(payload.Value) == 0 { - return context.TODO(), nil + return ctx, nil } mctx, err := parsePayload(&payload, "mq.ReadMsgByGroup", value) @@ -181,7 +181,7 @@ func ReadMsgByPartition(ctx context.Context, topic string, partition int, value reader := defaultInstanceManager.getReader(ctx, conf) if reader == nil { slog.Errorf(ctx, "%s getReader err, topic: %s", fun, topic) - return nil, fmt.Errorf("%s, getReader err, topic: %s", fun, topic) + return ctx, fmt.Errorf("%s, getReader err, topic: %s", fun, topic) } var payload Payload @@ -196,11 +196,11 @@ func ReadMsgByPartition(ctx context.Context, topic string, partition int, value if err != nil { slog.Errorf(ctx, "%s ReadMsg err: %v, topic: %s", fun, err, topic) - return nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic) + return ctx, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic) } if len(payload.Value) == 0 { - return context.TODO(), nil + return ctx, nil } mctx, err := parsePayload(&payload, "mq.ReadMsgByPartition", value) @@ -232,7 +232,7 @@ func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface reader := defaultInstanceManager.getReader(ctx, conf) if reader == nil { slog.Errorf(ctx, "%s getReader err, topic: %s", fun, topic) - return nil, nil, fmt.Errorf("%s, getReader err, topic: %s", fun, topic) + return ctx, nil, fmt.Errorf("%s, getReader err, topic: %s", fun, topic) } var payload Payload @@ -247,11 +247,11 @@ func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface if err != nil { slog.Errorf(ctx, "%s ReadMsg err: %v, topic: %s", fun, err, topic) - return nil, nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic) + return ctx, nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic) } if len(payload.Value) == 0 { - return context.TODO(), handler, nil + return ctx, handler, nil } mctx, err := parsePayload(&payload, "mq.FetchMsgByGroup", value) @@ -324,7 +324,7 @@ func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (contex if client == nil { slog.Errorf(ctx, "%s getDelayClient nil, topic: %s", fun, topic) err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic) - return nil, nil, err + return ctx, nil, err } var payload Payload @@ -333,16 +333,16 @@ func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (contex job, err := client.Read(ctx, client.ttrSeconds) if err != nil { slog.Errorf(ctx, "%s Read err: %v, topic: %s", fun, err, topic) - return nil, nil, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic) + return ctx, nil, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic) } err = json.Unmarshal(job.Body, &payload) if err != nil { slog.Errorf(ctx, "%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) - return nil, nil, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) + return ctx, nil, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) } err = json.Unmarshal(job.Body, &value) if err != nil { - return nil, nil, err + return ctx, nil, err } handler := NewDelayHandler(client, job.ID) @@ -353,7 +353,7 @@ func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (contex } if len(payload.Value) == 0 { - return context.TODO(), handler, nil + return ctx, handler, nil } mctx, err := parsePayload(&payload, "mq.FetchDelayMsg", value) mspan := opentracing.SpanFromContext(mctx) @@ -385,7 +385,7 @@ func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context if client == nil { slog.Errorf(ctx, "%s getDelayClient nil, topic: %s", fun, topic) err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic) - return nil, err + return ctx, err } var payload Payload @@ -394,16 +394,16 @@ func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context job, err := client.Read(ctx, client.ttrSeconds) if err != nil { slog.Errorf(ctx, "%s Read err: %v, topic: %s", fun, err, topic) - return nil, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic) + return ctx, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic) } err = json.Unmarshal(job.Body, &payload) if err != nil { slog.Errorf(ctx, "%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) - return nil, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) + return ctx, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic) } err = json.Unmarshal(job.Body, &value) if err != nil { - return nil, err + return ctx, err } dur := st.Duration() @@ -413,10 +413,10 @@ func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context err = client.Ack(ctx, job.ID) if err != nil { slog.Errorf(ctx, "%s, delay Ack err: %v, jobID", fun, err, job.ID) - return nil, err + return ctx, err } if len(payload.Value) == 0 { - return context.TODO(), nil + return ctx, nil } mctx, err := parsePayload(&payload, "mq.ReadDelayMsg", value) mspan := opentracing.SpanFromContext(mctx) @@ -458,3 +458,11 @@ func init() { } WatchUpdate(ctx) } + +func wrapTopicFromContext(ctx context.Context, topic string) string { + group, ok := scontext.GetControlRouteGroup(ctx) + if !ok { + return topic + } + return fmt.Sprintf("%s_%s", topic, group) +} diff --git a/mq/reader.go b/mq/reader.go index d1bd1ca..ddfe034 100644 --- a/mq/reader.go +++ b/mq/reader.go @@ -33,7 +33,7 @@ func NewGroupReader(ctx context.Context, topic, groupId string) (Reader, error) mqType := config.MQType switch mqType { case MQTypeKafka: - return NewKafkaReader(config.MQAddr, topic, groupId, 0, 1, 10e6, config.CommitInterval), nil + return NewKafkaReader(config.MQAddr, wrapTopicFromContext(ctx, topic), groupId, 0, 1, 10e6, config.CommitInterval), nil default: return nil, fmt.Errorf("mqType %d error", mqType) @@ -55,7 +55,7 @@ func NewPartitionReader(ctx context.Context, topic string, partition int) (Reade mqType := config.MQType switch mqType { case MQTypeKafka: - reader := NewKafkaReader(config.MQAddr, topic, "", partition, 1, 10e6, 0) + reader := NewKafkaReader(config.MQAddr, wrapTopicFromContext(ctx, topic), "", partition, 1, 10e6, 0) if len(offsetAt) == 0 { return nil, fmt.Errorf("no offsetAt config found") } diff --git a/mq/writer.go b/mq/writer.go index 4d3ad0f..64b0a76 100644 --- a/mq/writer.go +++ b/mq/writer.go @@ -24,7 +24,7 @@ func NewWriter(ctx context.Context, topic string) (Writer, error) { mqType := config.MQType switch mqType { case MQTypeKafka: - return NewKafkaWriter(config.MQAddr, topic), nil + return NewKafkaWriter(config.MQAddr, wrapTopicFromContext(ctx, topic)), nil default: return nil, fmt.Errorf("mqType %d error", mqType)