diff --git a/pkg/source/pulsar.go b/pkg/source/pulsar.go
index 002c171..7dab3e5 100644
--- a/pkg/source/pulsar.go
+++ b/pkg/source/pulsar.go
@@ -3,17 +3,22 @@ package source
 import (
 	"context"
 	"encoding/hex"
+	"fmt"
 	"os"
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar"
+	"github.com/bits-and-blooms/bitset"
 	"github.com/replicase/pgcapture/pkg/cursor"
 	"github.com/replicase/pgcapture/pkg/pb"
 	"github.com/sirupsen/logrus"
 	"google.golang.org/protobuf/proto"
 )
 
-var ReceiverQueueSize = 5000
+const (
+	ReceiverQueueSize = 5000
+	AckTrackerSize    = 1000
+)
 
 type PulsarReaderSource struct {
 	BaseSource
@@ -160,9 +165,10 @@ type PulsarConsumerSource struct {
 	PulsarReplicateState bool
 	PulsarMaxReconnect   *uint
 
-	client   pulsar.Client
-	consumer pulsar.Consumer
-	log      *logrus.Entry
+	client      pulsar.Client
+	consumer    pulsar.Consumer
+	log         *logrus.Entry
+	ackTrackers map[string]*ackTracker
 }
 
 func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Change, err error) {
@@ -189,6 +195,8 @@ func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Chang
 		return nil, err
 	}
 
+	p.ackTrackers = make(map[string]*ackTracker, AckTrackerSize)
+
 	p.log = logrus.WithFields(logrus.Fields{
 		"From":  "PulsarConsumerSource",
 		"Topic": p.PulsarTopic,
@@ -222,9 +230,11 @@ func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Chang
 			first = true
 		}
 
-		if m.GetChange() == nil {
-			p.consumer.Ack(msg)
-			return
+		if msg.ID().BatchSize() > 1 {
+			key := p.ackTrackerKey(msg.ID())
+			if _, ok := p.ackTrackers[key]; !ok {
+				p.ackTrackers[key] = newAckTracker(uint(msg.ID().BatchSize()))
+			}
 		}
 
 		change = Change{Checkpoint: checkpoint, Message: m}
@@ -239,7 +249,13 @@ func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Chang
 
 func (p *PulsarConsumerSource) Commit(cp cursor.Checkpoint) {
 	if mid, err := pulsar.DeserializeMessageID(cp.Data); err == nil {
-		p.consumer.AckID(mid)
+		tracker, ok := p.ackTrackers[p.ackTrackerKey(mid)]
+		if ok && tracker.ack(int(mid.BatchIdx())) {
+			_ = p.consumer.AckID(mid)
+			delete(p.ackTrackers, p.ackTrackerKey(mid))
+		} else if !ok {
+			_ = p.consumer.AckID(mid)
+		}
 	}
 }
 
@@ -248,3 +264,31 @@ func (p *PulsarConsumerSource) Requeue(cp cursor.Checkpoint, reason string) {
 		p.consumer.NackID(mid)
 	}
 }
+
+func (p *PulsarConsumerSource) ackTrackerKey(id pulsar.MessageID) string {
+	return fmt.Sprintf("%d:%d", id.LedgerID(), id.EntryID())
+}
+
+type ackTracker struct {
+	size     uint
+	batchIDs *bitset.BitSet
+}
+
+func newAckTracker(size uint) *ackTracker {
+	batchIDs := bitset.New(size)
+	for i := uint(0); i < size; i++ {
+		batchIDs.Set(i)
+	}
+	return &ackTracker{
+		size:     size,
+		batchIDs: batchIDs,
+	}
+}
+
+func (t *ackTracker) ack(batchID int) bool {
+	if batchID < 0 {
+		return true
+	}
+	t.batchIDs.Clear(uint(batchID))
+	return t.batchIDs.None()
+}
diff --git a/pkg/source/pulsar_test.go b/pkg/source/pulsar_test.go
index b9d2caf..731c446 100644
--- a/pkg/source/pulsar_test.go
+++ b/pkg/source/pulsar_test.go
@@ -132,8 +132,10 @@ func TestPulsarConsumerSource(t *testing.T) {
 	defer client.Close()
 
 	producer, err := client.CreateProducer(pulsar.ProducerOptions{
-		Topic: topic,
-		Name:  topic,
+		Topic:                   topic,
+		Name:                    topic,
+		BatchingMaxPublishDelay: 3 * time.Second,
+		BatchingMaxMessages:     3,
 	})
 	if err != nil {
 		t.Fatal(err)
@@ -155,58 +157,72 @@ func TestPulsarConsumerSource(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// begin, commit message should be ignored
-	cp := cursor.Checkpoint{LSN: 1}
-	bs, _ := proto.Marshal(&pb.Message{Type: &pb.Message_Begin{Begin: &pb.Begin{}}})
-	if _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
-		Key:     cp.ToKey(),
-		Payload: bs,
-	}); err != nil {
-		t.Fatal(err)
-	}
-	cp = cursor.Checkpoint{LSN: 1}
-	bs, _ = proto.Marshal(&pb.Message{Type: &pb.Message_Commit{Commit: &pb.Commit{}}})
-	if _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
-		Key:     cp.ToKey(),
-		Payload: bs,
-	}); err != nil {
-		t.Fatal(err)
-	}
-
-	lsn := 0
-	for ; lsn < 3; lsn++ {
-		cp := cursor.Checkpoint{LSN: uint64(lsn)}
-		bs, _ := proto.Marshal(&pb.Message{Type: &pb.Message_Change{Change: &pb.Change{Table: strconv.Itoa(lsn)}}})
-		if _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
+	messages := []*pb.Message{
+		{
+			Type: &pb.Message_Begin{Begin: &pb.Begin{FinalLsn: 0}},
+		},
+		{
+			Type: &pb.Message_Change{Change: &pb.Change{Table: "1"}},
+		},
+		{
+			Type: &pb.Message_Change{Change: &pb.Change{Table: "2"}},
+		},
+		{
+			Type: &pb.Message_Commit{Commit: &pb.Commit{CommitLsn: 3}},
+		},
+	}
+
+	for i, m := range messages {
+		cp := cursor.Checkpoint{LSN: uint64(i)}
+		bs, _ := proto.Marshal(m)
+		producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
 			Key:     cp.ToKey(),
 			Payload: bs,
-		}); err != nil {
-			t.Fatal(err)
-		}
+		}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
+			if err != nil {
+				t.Fatal(err)
+			}
+		})
 	}
 	producer.Flush()
-	for i := 0; i < lsn; i++ {
+	for i := 0; i < len(messages); i++ {
 		change := <-changes
-		if c := change.Message.GetChange(); c == nil || c.Table != strconv.Itoa(i) {
-			t.Fatalf("unexpected %v", change.Message.String())
+		msgID, err := pulsar.DeserializeMessageID(change.Checkpoint.Data)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if msgID.BatchSize() > 1 && msgID.BatchSize() != int32(len(messages)-1) {
+			t.Fatalf("unexpected pulsar message id batch size %v %v", msgID.BatchSize(), len(messages))
+		}
+
+		if i == 0 {
+			if b := change.Message.GetBegin(); b == nil || b.FinalLsn != uint64(i) {
+				t.Fatalf("unexpected begin message %v", change.Message.String())
+			}
+		} else if i == len(messages)-1 {
+			if c := change.Message.GetCommit(); c == nil || c.CommitLsn != uint64(len(messages)-1) {
+				t.Fatalf("unexpected commit message %v", change.Message.String())
+			}
+		} else {
+			if c := change.Message.GetChange(); c == nil || c.Table != strconv.Itoa(i) {
+				t.Fatalf("unexpected change message %v", change.Message.String())
+			}
 		}
 	}
 
 	// stop without ack
 	src.Stop()
 
-	// restart to receive same messages, and commit '0' and '2', but abort '1'
+	// restart to receive same messages, and only abort the last message of the batch
 	src = newPulsarConsumerSource()
 	changes, err = src.Capture(cursor.Checkpoint{})
 	if err != nil {
 		t.Fatal(err)
 	}
-	for i := 0; i < lsn; i++ {
+
+	for i := 0; i < len(messages); i++ {
 		change := <-changes
-		if c := change.Message.GetChange(); c == nil || c.Table != strconv.Itoa(i) {
-			t.Fatalf("unexpected %v", change.Message.String())
-		}
-		if i == 1 {
+		if i == 2 {
 			src.Requeue(change.Checkpoint, "")
 		} else {
 			src.Commit(change.Checkpoint)
@@ -214,16 +230,19 @@ func TestPulsarConsumerSource(t *testing.T) {
 	}
 	src.Stop()
 
-	// the '1' message should be redelivered
+	// restart to receive same batch messages
 	src = newPulsarConsumerSource()
 	changes, err = src.Capture(cursor.Checkpoint{})
 	if err != nil {
 		t.Fatal(err)
 	}
-	change := <-changes
-	if c := change.Message.GetChange(); c == nil || c.Table != strconv.Itoa(1) {
-		t.Fatalf("unexpected %v", change.Message.String())
+
+	// should only redeliver same batch messages
+	for i := 0; i < len(messages)-1; i++ {
+		change := <-changes
+		src.Commit(change.Checkpoint)
 	}
+
 	select {
 	case <-changes:
 		t.Fatal("unexpected message")