Skip to content

Commit

Permalink
Merge pull request #155 from brzyangg/feature-sql-breaker
Browse files Browse the repository at this point in the history
mq:区分泳道隔离消费  修复readmsg返回nil的问题
  • Loading branch information
niubell authored Mar 18, 2020
2 parents 20b3618 + 28bc515 commit c4f71bd
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 23 deletions.
48 changes: 28 additions & 20 deletions mq/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{
reader := defaultInstanceManager.getReader(ctx, conf)
if reader == nil {
slog.Errorf(ctx, "%s getReader err, topic: %s", fun, topic)
return nil, fmt.Errorf("%s, getReader err, topic: %s", fun, topic)
return ctx, fmt.Errorf("%s, getReader err, topic: %s", fun, topic)
}

var payload Payload
Expand All @@ -145,11 +145,11 @@ func ReadMsgByGroup(ctx context.Context, topic, groupId string, value interface{

if err != nil {
slog.Errorf(ctx, "%s ReadMsg err: %v, topic: %s", fun, err, topic)
return nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic)
return ctx, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic)
}

if len(payload.Value) == 0 {
return context.TODO(), nil
return ctx, nil
}

mctx, err := parsePayload(&payload, "mq.ReadMsgByGroup", value)
Expand Down Expand Up @@ -181,7 +181,7 @@ func ReadMsgByPartition(ctx context.Context, topic string, partition int, value
reader := defaultInstanceManager.getReader(ctx, conf)
if reader == nil {
slog.Errorf(ctx, "%s getReader err, topic: %s", fun, topic)
return nil, fmt.Errorf("%s, getReader err, topic: %s", fun, topic)
return ctx, fmt.Errorf("%s, getReader err, topic: %s", fun, topic)
}

var payload Payload
Expand All @@ -196,11 +196,11 @@ func ReadMsgByPartition(ctx context.Context, topic string, partition int, value

if err != nil {
slog.Errorf(ctx, "%s ReadMsg err: %v, topic: %s", fun, err, topic)
return nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic)
return ctx, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic)
}

if len(payload.Value) == 0 {
return context.TODO(), nil
return ctx, nil
}

mctx, err := parsePayload(&payload, "mq.ReadMsgByPartition", value)
Expand Down Expand Up @@ -232,7 +232,7 @@ func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface
reader := defaultInstanceManager.getReader(ctx, conf)
if reader == nil {
slog.Errorf(ctx, "%s getReader err, topic: %s", fun, topic)
return nil, nil, fmt.Errorf("%s, getReader err, topic: %s", fun, topic)
return ctx, nil, fmt.Errorf("%s, getReader err, topic: %s", fun, topic)
}

var payload Payload
Expand All @@ -247,11 +247,11 @@ func FetchMsgByGroup(ctx context.Context, topic, groupId string, value interface

if err != nil {
slog.Errorf(ctx, "%s ReadMsg err: %v, topic: %s", fun, err, topic)
return nil, nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic)
return ctx, nil, fmt.Errorf("%s, ReadMsg err: %v, topic: %s", fun, err, topic)
}

if len(payload.Value) == 0 {
return context.TODO(), handler, nil
return ctx, handler, nil
}

mctx, err := parsePayload(&payload, "mq.FetchMsgByGroup", value)
Expand Down Expand Up @@ -324,7 +324,7 @@ func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (contex
if client == nil {
slog.Errorf(ctx, "%s getDelayClient nil, topic: %s", fun, topic)
err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic)
return nil, nil, err
return ctx, nil, err
}

var payload Payload
Expand All @@ -333,16 +333,16 @@ func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (contex
job, err := client.Read(ctx, client.ttrSeconds)
if err != nil {
slog.Errorf(ctx, "%s Read err: %v, topic: %s", fun, err, topic)
return nil, nil, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic)
return ctx, nil, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic)
}
err = json.Unmarshal(job.Body, &payload)
if err != nil {
slog.Errorf(ctx, "%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
return nil, nil, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
return ctx, nil, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
}
err = json.Unmarshal(job.Body, &value)
if err != nil {
return nil, nil, err
return ctx, nil, err
}

handler := NewDelayHandler(client, job.ID)
Expand All @@ -353,7 +353,7 @@ func FetchDelayMsg(ctx context.Context, topic string, value interface{}) (contex
}

if len(payload.Value) == 0 {
return context.TODO(), handler, nil
return ctx, handler, nil
}
mctx, err := parsePayload(&payload, "mq.FetchDelayMsg", value)
mspan := opentracing.SpanFromContext(mctx)
Expand Down Expand Up @@ -385,7 +385,7 @@ func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context
if client == nil {
slog.Errorf(ctx, "%s getDelayClient nil, topic: %s", fun, topic)
err := fmt.Errorf("%s, getDelayClient nil, topic: %s", fun, topic)
return nil, err
return ctx, err
}

var payload Payload
Expand All @@ -394,16 +394,16 @@ func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context
job, err := client.Read(ctx, client.ttrSeconds)
if err != nil {
slog.Errorf(ctx, "%s Read err: %v, topic: %s", fun, err, topic)
return nil, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic)
return ctx, fmt.Errorf("%s, Read err: %v, topic: %s", fun, err, topic)
}
err = json.Unmarshal(job.Body, &payload)
if err != nil {
slog.Errorf(ctx, "%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
return nil, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
return ctx, fmt.Errorf("%s, Unmarshal payload err: %v, topic: %s", fun, err, topic)
}
err = json.Unmarshal(job.Body, &value)
if err != nil {
return nil, err
return ctx, err
}

dur := st.Duration()
Expand All @@ -413,10 +413,10 @@ func ReadDelayMsg(ctx context.Context, topic string, value interface{}) (context
err = client.Ack(ctx, job.ID)
if err != nil {
slog.Errorf(ctx, "%s, delay Ack err: %v, jobID", fun, err, job.ID)
return nil, err
return ctx, err
}
if len(payload.Value) == 0 {
return context.TODO(), nil
return ctx, nil
}
mctx, err := parsePayload(&payload, "mq.ReadDelayMsg", value)
mspan := opentracing.SpanFromContext(mctx)
Expand Down Expand Up @@ -458,3 +458,11 @@ func init() {
}
WatchUpdate(ctx)
}

func wrapTopicFromContext(ctx context.Context, topic string) string {
group, ok := scontext.GetControlRouteGroup(ctx)
if !ok {
return topic
}
return fmt.Sprintf("%s_%s", topic, group)
}
4 changes: 2 additions & 2 deletions mq/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewGroupReader(ctx context.Context, topic, groupId string) (Reader, error)
mqType := config.MQType
switch mqType {
case MQTypeKafka:
return NewKafkaReader(config.MQAddr, topic, groupId, 0, 1, 10e6, config.CommitInterval), nil
return NewKafkaReader(config.MQAddr, wrapTopicFromContext(ctx, topic), groupId, 0, 1, 10e6, config.CommitInterval), nil

default:
return nil, fmt.Errorf("mqType %d error", mqType)
Expand All @@ -55,7 +55,7 @@ func NewPartitionReader(ctx context.Context, topic string, partition int) (Reade
mqType := config.MQType
switch mqType {
case MQTypeKafka:
reader := NewKafkaReader(config.MQAddr, topic, "", partition, 1, 10e6, 0)
reader := NewKafkaReader(config.MQAddr, wrapTopicFromContext(ctx, topic), "", partition, 1, 10e6, 0)
if len(offsetAt) == 0 {
return nil, fmt.Errorf("no offsetAt config found")
}
Expand Down
2 changes: 1 addition & 1 deletion mq/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewWriter(ctx context.Context, topic string) (Writer, error) {
mqType := config.MQType
switch mqType {
case MQTypeKafka:
return NewKafkaWriter(config.MQAddr, topic), nil
return NewKafkaWriter(config.MQAddr, wrapTopicFromContext(ctx, topic)), nil

default:
return nil, fmt.Errorf("mqType %d error", mqType)
Expand Down

0 comments on commit c4f71bd

Please sign in to comment.