From fd7de3346cfe3b954369af094692a31372745430 Mon Sep 17 00:00:00 2001 From: wenliang zhu <73632785+juniaoshaonian@users.noreply.github.com> Date: Wed, 8 May 2024 11:50:04 +0800 Subject: [PATCH] =?UTF-8?q?feat(memory)=EF=BC=9A=E6=B7=BB=E5=8A=A0MQ?= =?UTF-8?q?=E5=86=85=E5=AD=98=E5=AE=9E=E7=8E=B0=20(#14)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/e2e/base_test.go | 6 +- internal/e2e/memory_test.go | 44 +++ memory/consumer.go | 174 ++++++++++ memory/consumergroup.go | 296 ++++++++++++++++++ memory/consumergroup_test.go | 84 +++++ .../equaldivide/balancer.go | 50 +++ .../equaldivide/balancer_test.go | 73 +++++ memory/mq.go | 166 ++++++++++ memory/mq_test.go | 39 +++ memory/partition.go | 54 ++++ memory/partition_test.go | 133 ++++++++ memory/producer.go | 63 ++++ memory/produceridgetter/hash/get.go | 33 ++ memory/produceridgetter/hash/get_test.go | 32 ++ memory/topic.go | 97 ++++++ memory/topic_test.go | 55 ++++ memory/type.go | 27 ++ 17 files changed, 1425 insertions(+), 1 deletion(-) create mode 100644 internal/e2e/memory_test.go create mode 100644 memory/consumer.go create mode 100644 memory/consumergroup.go create mode 100644 memory/consumergroup_test.go create mode 100644 memory/consumerpartitionassigner/equaldivide/balancer.go create mode 100644 memory/consumerpartitionassigner/equaldivide/balancer_test.go create mode 100644 memory/mq.go create mode 100644 memory/mq_test.go create mode 100644 memory/partition.go create mode 100644 memory/partition_test.go create mode 100644 memory/producer.go create mode 100644 memory/produceridgetter/hash/get.go create mode 100644 memory/produceridgetter/hash/get_test.go create mode 100644 memory/topic.go create mode 100644 memory/topic_test.go create mode 100644 memory/type.go diff --git a/internal/e2e/base_test.go b/internal/e2e/base_test.go index 9d063c3..8fb8c96 100644 --- a/internal/e2e/base_test.go +++ b/internal/e2e/base_test.go @@ -260,6 +260,10 @@ func (b *TestSuite) TestMQ_Producer() { err := b.messageQueue.CreateTopic(context.Background(), unknownTopic, 1) require.NoError(t, err) require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), unknownTopic)) + // 如果topic不存在会默认创建,不会报错 + notExistTopic := "notExistTopic" + _, err = b.messageQueue.Producer(notExistTopic) + require.NoError(t, err) } func (b *TestSuite) TestMQ_Consumer() { @@ -270,7 +274,7 @@ func (b *TestSuite) TestMQ_Consumer() { err := b.messageQueue.CreateTopic(context.Background(), topic, 1) require.NoError(t, err) require.NoError(t, b.messageQueue.DeleteTopics(context.Background(), topic)) - + // 如果topic不存在会默认创建,不会报错 _, err = b.messageQueue.Consumer(topic, groupID) require.NoError(t, err) } diff --git a/internal/e2e/memory_test.go b/internal/e2e/memory_test.go new file mode 100644 index 0000000..8f51c82 --- /dev/null +++ b/internal/e2e/memory_test.go @@ -0,0 +1,44 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build e2e + +package e2e + +import ( + "context" + "testing" + + "github.com/ecodeclub/mq-api" + "github.com/ecodeclub/mq-api/memory" + "github.com/stretchr/testify/suite" +) + +func TestMemory(t *testing.T) { + suite.Run(t, NewTestSuite( + &MemoryTestSuite{}, + )) +} + +type MemoryTestSuite struct{} + +func (k *MemoryTestSuite) Create() mq.MQ { + memoryMq := memory.NewMQ() + + return memoryMq +} + +func (k *MemoryTestSuite) Ping(ctx context.Context) error { + return nil +} diff --git a/memory/consumer.go b/memory/consumer.go new file mode 100644 index 0000000..3d3c2ec --- /dev/null +++ b/memory/consumer.go @@ -0,0 +1,174 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "context" + "log" + "sync" + "time" + + "github.com/ecodeclub/mq-api" + "github.com/ecodeclub/mq-api/internal/errs" +) + +const ( + interval = 1 * time.Second + defaultMessageChannelSize = 1000 + // 每个分区取数据的上限 + limit = 25 +) + +type Consumer struct { + locker sync.RWMutex + name string + closed bool + // 用于存放分区号,每个元素就是一个分区号 + partitions []*Partition + partitionRecords []PartitionRecord + closeCh chan struct{} + msgCh chan *mq.Message + once sync.Once + reportCh chan *Event + receiveCh chan *Event +} + +func (c *Consumer) Consume(ctx context.Context) (*mq.Message, error) { + if c.isClosed() { + return nil, errs.ErrConsumerIsClosed + } + select { + case val, ok := <-c.msgCh: + if !ok { + return nil, errs.ErrConsumerIsClosed + } + return val, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// 启动Consume +func (c *Consumer) eventLoop() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + log.Printf("消费者 %s 开始消费数据", c.name) + c.consumeAndReport() + log.Printf("消费者 %s 结束消费数据", c.name) + case event, ok := <-c.receiveCh: + if !ok { + return + } + // 处理各种事件 + c.handle(event) + } + } +} + +func (c *Consumer) consumeAndReport() { + for idx, record := range c.partitionRecords { + msgs := c.partitions[record.Index].getBatch(record.Offset, limit) + for _, msg := range msgs { + log.Printf("消费者 %s 消费数据 %v", c.name, msg) + c.msgCh <- msg + } + record.Offset += len(msgs) + errCh := make(chan error, 1) + c.reportCh <- &Event{ + Type: ReportOffsetEvent, + Data: ReportData{ + Records: []PartitionRecord{record}, + ErrChan: errCh, + }, + } + err := <-errCh + if err != nil { + log.Printf("上报偏移量失败:%v", err) + return + } + close(errCh) + c.partitionRecords[idx] = record + } +} + +func (c *Consumer) handle(event *Event) { + switch event.Type { + // 服务端发起的重新加入事件 + case RejoinEvent: + // 消费者上报消费进度 + log.Printf("消费者 %s开始上报消费进度", c.name) + c.reportCh <- &Event{ + Type: RejoinAckEvent, + Data: c.partitionRecords, + } + // 设置消费进度 + partitionInfo := <-c.receiveCh + log.Printf("消费者 %s接收到分区信息 %v", c.name, partitionInfo) + c.partitionRecords, _ = partitionInfo.Data.([]PartitionRecord) + // 返回设置完成的信号 + c.reportCh <- &Event{ + Type: PartitionNotifyAckEvent, + } + case CloseEvent: + // 未返回错误不做处理 + _ = c.Close() + ch, ok := event.Data.(chan struct{}) + if !ok { + return + } + ch <- struct{}{} + + } +} + +func (c *Consumer) ConsumeChan(ctx context.Context) (<-chan *mq.Message, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + if c.isClosed() { + return nil, errs.ErrConsumerIsClosed + } + return c.msgCh, nil +} + +func (c *Consumer) Close() error { + c.locker.Lock() + defer c.locker.Unlock() + c.once.Do(func() { + c.closed = true + c.reportCh <- &Event{ + Type: ExitGroupEvent, + Data: c.closeCh, + } + log.Printf("消费者 %s 准备关闭", c.name) + // 等待服务端退出完成 + <-c.closeCh + // 关闭资源 + close(c.receiveCh) + close(c.msgCh) + log.Printf("消费者 %s 关闭成功", c.name) + }) + + return nil +} + +func (c *Consumer) isClosed() bool { + c.locker.RLock() + defer c.locker.RUnlock() + return c.closed +} diff --git a/memory/consumergroup.go b/memory/consumergroup.go new file mode 100644 index 0000000..365fa74 --- /dev/null +++ b/memory/consumergroup.go @@ -0,0 +1,296 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "fmt" + "log" + "sync" + "sync/atomic" + "time" + + "github.com/ecodeclub/mq-api" + + "github.com/ecodeclub/ekit/syncx" + "github.com/pkg/errors" +) + +var ( + ErrReportOffsetFail = errors.New("非平衡状态,无法上报偏移量") + ErrConsumerGroupClosed = errors.New("消费组已经关闭") +) + +const ( + consumerCap = 16 + defaultEventCap = 16 + msgChannelLength = 1000 + defaultSleepTime = 100 * time.Millisecond + + // ExitGroupEvent consumer=>consumer_group 表示消费者退出消费组的事件 + ExitGroupEvent = "exit_group" + // ReportOffsetEvent consumer=>consumer_group 表示消费者向消费组上报消费进度事件 + ReportOffsetEvent = "report_offset" + // RejoinEvent consumer_group=>consumer 表示消费组通知消费者重新加入消费组 + RejoinEvent = "rejoin" + // RejoinAckEvent consumer=>consumer_group 表示消费者收到重新加入消费组的指令并将offset进行上报 + RejoinAckEvent = "rejoin_ack" + // CloseEvent consumer_group=>consumer 表示消费组关闭所有消费者,向所有消费者发出关闭事件 + CloseEvent = "close" + // PartitionNotifyEvent consumer_group=>consumer 表示消费组向消费者下发分区情况 + PartitionNotifyEvent = "partition_notify" + // PartitionNotifyAckEvent consumer=>consumer_group 表示消费者对消费组下发分区情况事件的确认 + PartitionNotifyAckEvent = "partition_notify_ack" + + StatusStable = 1 // 稳定状态,可以正常的进行消费数据 + StatusBalancing = 2 + // 消费组关闭 + StatusStop = 3 + // 一个消费者正在退出消费组 + StatusStopping = 4 +) + +// ConsumerGroup 表示消费组是并发安全的 +type ConsumerGroup struct { + name string + // 存储消费者元数据,键为消费者的名称 + consumers syncx.Map[string, *Consumer] + // 消费者平衡器 + consumerPartitionAssigner ConsumerPartitionAssigner + // 分区消费记录 + partitionRecords *syncx.Map[int, PartitionRecord] + // 分区 + partitions []*Partition + status int32 + balanceCh chan struct{} + once sync.Once +} + +type PartitionRecord struct { + // 属于哪个分区 + Index int + // 消费进度 + Offset int +} +type ReportData struct { + Records []PartitionRecord + ErrChan chan error +} + +type Event struct { + // 事件类型 + Type string + // 事件所需要处理的数据 + Data any +} + +func (c *ConsumerGroup) eventHandler(name string, event *Event) { + switch event.Type { + case ExitGroupEvent: + closeCh, _ := event.Data.(chan struct{}) + c.exitGroup(name, closeCh) + case ReportOffsetEvent: + data, _ := event.Data.(ReportData) + var err error + err = c.reportOffset(data.Records) + data.ErrChan <- err + log.Printf("消费者%s上报offset成功", name) + case RejoinAckEvent: + // consumer响应重平衡信号返回的数据,返回的是当前所有分区的偏移量 + records, _ := event.Data.([]PartitionRecord) + // 不管上报成不成功 + _ = c.reportOffset(records) + log.Printf("消费者%s成功接受到重平衡信号,并上报offset", name) + c.balanceCh <- struct{}{} + case PartitionNotifyAckEvent: + log.Printf("消费者%s 成功设置分区信息", name) + c.balanceCh <- struct{}{} + } +} + +// ExitGroupEvent 退出消费组 +func (c *ConsumerGroup) exitGroup(name string, closeCh chan struct{}) { + // 把自己从消费组内摘除 + for { + if !atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) && + !atomic.CompareAndSwapInt32(&c.status, StatusStop, StatusStopping) { + time.Sleep(defaultSleepTime) + continue + } + log.Printf("消费者 %s 准备退出消费组", name) + c.consumers.Delete(name) + c.reBalance() + log.Printf("给消费者 %s 发送退出确认信号", name) + close(closeCh) + log.Printf("消费者 %s 成功退出消费组", name) + if !atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable) { + atomic.CompareAndSwapInt32(&c.status, StatusStopping, StatusStop) + } + return + } +} + +// ReportOffsetEvent 上报偏移量 +func (c *ConsumerGroup) reportOffset(records []PartitionRecord) error { + status := atomic.LoadInt32(&c.status) + if status != StatusStable && status != StatusStop { + return ErrReportOffsetFail + } + for _, record := range records { + c.partitionRecords.Store(record.Index, record) + } + return nil +} + +func (c *ConsumerGroup) Close() { + c.once.Do(func() { + for { + if !atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusStop) { + time.Sleep(defaultSleepTime) + continue + } + c.close() + return + } + }) +} + +func (c *ConsumerGroup) close() { + c.consumers.Range(func(key string, value *Consumer) bool { + ch := make(chan struct{}) + value.receiveCh <- &Event{ + Type: CloseEvent, + Data: ch, + } + <-ch + return true + }) +} + +// reBalance 单独使用该方法是并发不安全的 +func (c *ConsumerGroup) reBalance() { + log.Println("开始重平衡") + // 通知每一个消费者进行偏移量的上报 + length := 0 + consumers := make([]string, 0, consumerCap) + log.Println("开始给每个消费者,重平衡信号") + c.consumers.Range(func(key string, value *Consumer) bool { + log.Printf("开始通知消费者%s", key) + value.receiveCh <- &Event{ + Type: RejoinEvent, + } + consumers = append(consumers, key) + length++ + log.Printf("通知消费者%s成功", key) + return true + }) + number := 0 + log.Println("xxxxxxxxxx长度", length) + // 等待所有消费者都接收到信号,并上报自己offset + for length > 0 { + <-c.balanceCh + number++ + if number != length { + log.Println("xxxxxxxxxx number", number) + continue + } + // 接收到所有信号 + log.Println("所有消费者已经接受到重平衡请求,并上报了消费进度") + consumerMap := c.consumerPartitionAssigner.AssignPartition(consumers, len(c.partitions)) + // 通知所有消费者分配 + log.Println("开始分配分区") + for consumerName, partitions := range consumerMap { + // 查找消费者所属的channel + log.Printf("消费者 %s 消费 %v 分区", consumerName, partitions) + consumer, ok := c.consumers.Load(consumerName) + if ok { + // 往每个消费者的receive_channel发送partition的信息 + records := make([]PartitionRecord, 0, len(partitions)) + for _, p := range partitions { + record, ok := c.partitionRecords.Load(p) + if ok { + records = append(records, record) + } + } + consumer.receiveCh <- &Event{ + Type: PartitionNotifyEvent, + Data: records, + } + // 等待消费者接收到并保存 + <-c.balanceCh + + } + } + log.Println("重平衡结束") + return + } +} + +// JoinGroup 加入消费组 +func (c *ConsumerGroup) JoinGroup() (*Consumer, error) { + for { + + if atomic.LoadInt32(&c.status) > StatusBalancing { + return nil, ErrConsumerGroupClosed + } + if !atomic.CompareAndSwapInt32(&c.status, StatusStable, StatusBalancing) { + time.Sleep(defaultSleepTime) + continue + } + + var length int + c.consumers.Range(func(key string, value *Consumer) bool { + length++ + return true + }) + name := fmt.Sprintf("%s_%d", c.name, length) + reportCh := make(chan *Event, defaultEventCap) + receiveCh := make(chan *Event, defaultEventCap) + consumer := &Consumer{ + partitions: c.partitions, + receiveCh: receiveCh, + reportCh: reportCh, + name: name, + msgCh: make(chan *mq.Message, msgChannelLength), + partitionRecords: []PartitionRecord{}, + closeCh: make(chan struct{}), + } + c.consumers.Store(name, consumer) + go c.consumerEventsHandler(name, reportCh) + go consumer.eventLoop() + log.Printf("新建消费者 %s", name) + // 重平衡分配分区 + c.reBalance() + atomic.CompareAndSwapInt32(&c.status, StatusBalancing, StatusStable) + return consumer, nil + } +} + +// consumerEventsHandler 处理消费者上报的事件 +func (c *ConsumerGroup) consumerEventsHandler(name string, reportCh chan *Event) { + for event := range reportCh { + c.eventHandler(name, event) + if event.Type == ExitGroupEvent { + close(reportCh) + return + } + } +} + +func min(i, j int) int { + if i < j { + return i + } + return j +} diff --git a/memory/consumergroup_test.go b/memory/consumergroup_test.go new file mode 100644 index 0000000..a0730d2 --- /dev/null +++ b/memory/consumergroup_test.go @@ -0,0 +1,84 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ecodeclub/ekit/syncx" + "github.com/ecodeclub/mq-api/memory/consumerpartitionassigner/equaldivide" + "github.com/stretchr/testify/assert" +) + +// 测试场景: 不断有 消费者加入 消费组,最后达成的效果,调用consumerGroup的close方法成功之后,consumerGroup里面没有consumer存在且所有的consumer都是关闭的状态 + +func TestConsumerGroup_Close(t *testing.T) { + t.Parallel() + cg := &ConsumerGroup{ + name: "test_group", + consumers: syncx.Map[string, *Consumer]{}, + consumerPartitionAssigner: equaldivide.NewAssigner(), + partitions: []*Partition{ + NewPartition(), + NewPartition(), + NewPartition(), + }, + balanceCh: make(chan struct{}, defaultBalanceChLen), + status: StatusStable, + } + partitionRecords := syncx.Map[int, PartitionRecord]{} + for idx := range cg.partitions { + partitionRecords.Store(idx, PartitionRecord{ + Index: idx, + Offset: 0, + }) + } + cg.partitionRecords = &partitionRecords + var wg sync.WaitGroup + mu := &sync.RWMutex{} + consumerGroups := make([]*Consumer, 0, 100) + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + c, err := cg.JoinGroup() + if err != nil { + assert.Equal(t, ErrConsumerGroupClosed, err) + return + } + mu.Lock() + consumerGroups = append(consumerGroups, c) + mu.Unlock() + }() + } + time.Sleep(100 * time.Millisecond) + cg.Close() + wg.Wait() + // consumerGroup中没有消费者 + var flag atomic.Bool + cg.consumers.Range(func(key string, value *Consumer) bool { + flag.Store(true) + return true + }) + assert.False(t, flag.Load()) + // 所有加入的消费者都是关闭状态 + cg.consumers.Range(func(key string, value *Consumer) bool { + assert.True(t, value.closed) + return true + }) +} diff --git a/memory/consumerpartitionassigner/equaldivide/balancer.go b/memory/consumerpartitionassigner/equaldivide/balancer.go new file mode 100644 index 0000000..63bbbbd --- /dev/null +++ b/memory/consumerpartitionassigner/equaldivide/balancer.go @@ -0,0 +1,50 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package equaldivide + +type Assigner struct{} + +func (b *Assigner) AssignPartition(consumers []string, partitions int) map[string][]int { + result := make(map[string][]int) + consumerCount := len(consumers) + partitionPerConsumer := partitions / consumerCount + remainingPartitions := partitions % consumerCount + + // 初始化每个 consumer 对应的 partitions + for _, consumer := range consumers { + result[consumer] = make([]int, 0) + } + // 平均分配 partitions + partitionIndex := 0 + for i := 0; i < consumerCount; i++ { + consumer := consumers[i] + numPartitions := partitionPerConsumer + // 如果还有剩余的 partitions,则将其分配给当前 consumer + if remainingPartitions > 0 { + numPartitions++ + remainingPartitions-- + } + // 分配 partitions + for j := 0; j < numPartitions; j++ { + result[consumer] = append(result[consumer], partitionIndex) + partitionIndex++ + } + } + return result +} + +func NewAssigner() *Assigner { + return &Assigner{} +} diff --git a/memory/consumerpartitionassigner/equaldivide/balancer_test.go b/memory/consumerpartitionassigner/equaldivide/balancer_test.go new file mode 100644 index 0000000..844b50c --- /dev/null +++ b/memory/consumerpartitionassigner/equaldivide/balancer_test.go @@ -0,0 +1,73 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package equaldivide + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBalancer_AssignPartition(t *testing.T) { + t.Parallel() + balancer := NewAssigner() + testcases := []struct { + name string + consumers []string + partition int + wantAnswer map[string][]int + }{ + { + name: "分区数超过consumer个数", + consumers: []string{"c1", "c2", "c3", "c4"}, + partition: 5, + wantAnswer: map[string][]int{ + "c1": {0, 1}, + "c2": {2}, + "c3": {3}, + "c4": {4}, + }, + }, + { + name: "分区数小于consumer个数", + consumers: []string{"c1", "c2", "c3", "c4"}, + partition: 3, + wantAnswer: map[string][]int{ + "c1": {0}, + "c2": {1}, + "c3": {2}, + "c4": {}, + }, + }, + { + name: "分区数等于consumer个数", + consumers: []string{"c1", "c2", "c3"}, + partition: 3, + wantAnswer: map[string][]int{ + "c1": {0}, + "c2": {1}, + "c3": {2}, + }, + }, + } + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + actualVal := balancer.AssignPartition(tc.consumers, tc.partition) + assert.Equal(t, tc.wantAnswer, actualVal) + }) + } +} diff --git a/memory/mq.go b/memory/mq.go new file mode 100644 index 0000000..03ec30b --- /dev/null +++ b/memory/mq.go @@ -0,0 +1,166 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "context" + "fmt" + "log" + "sync" + + "github.com/ecodeclub/mq-api/internal/pkg/validator" + + "github.com/ecodeclub/ekit/syncx" + "github.com/ecodeclub/mq-api" + "github.com/ecodeclub/mq-api/internal/errs" +) + +const ( + defaultBalanceChLen = 10 + defaultPartitions = 3 +) + +type MQ struct { + locker sync.RWMutex + closed bool + topics syncx.Map[string, *Topic] +} + +func NewMQ() mq.MQ { + return &MQ{ + topics: syncx.Map[string, *Topic]{}, + } +} + +func (m *MQ) CreateTopic(ctx context.Context, topic string, partitions int) error { + if !validator.IsValidTopic(topic) { + return fmt.Errorf("%w: %s", errs.ErrInvalidTopic, topic) + } + if ctx.Err() != nil { + return ctx.Err() + } + m.locker.Lock() + defer m.locker.Unlock() + if m.closed { + return errs.ErrMQIsClosed + } + if partitions <= 0 { + return errs.ErrInvalidPartition + } + _, ok := m.topics.Load(topic) + if !ok { + m.topics.Store(topic, newTopic(topic, partitions)) + } + return nil +} + +func (m *MQ) Producer(topic string) (mq.Producer, error) { + m.locker.Lock() + defer m.locker.Unlock() + if m.closed { + return nil, errs.ErrMQIsClosed + } + t, ok := m.topics.Load(topic) + if !ok { + t = newTopic(topic, defaultPartitions) + m.topics.Store(topic, t) + } + p := &Producer{ + t: t, + } + err := t.addProducer(p) + if err != nil { + return nil, err + } + return p, nil +} + +func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error) { + m.locker.Lock() + defer m.locker.Unlock() + if m.closed { + return nil, errs.ErrMQIsClosed + } + t, ok := m.topics.Load(topic) + if !ok { + t = newTopic(topic, defaultPartitions) + m.topics.Store(topic, t) + } + group, ok := t.consumerGroups.Load(groupID) + if !ok { + group = &ConsumerGroup{ + name: groupID, + consumers: syncx.Map[string, *Consumer]{}, + consumerPartitionAssigner: t.consumerPartitionAssigner, + partitions: t.partitions, + balanceCh: make(chan struct{}, defaultBalanceChLen), + status: StatusStable, + } + // 初始化分区消费进度 + partitionRecords := syncx.Map[int, PartitionRecord]{} + for idx := range t.partitions { + partitionRecords.Store(idx, PartitionRecord{ + Index: idx, + Offset: 0, + }) + } + group.partitionRecords = &partitionRecords + } + consumer, err := group.JoinGroup() + if err != nil { + return nil, err + } + t.consumerGroups.Store(groupID, group) + return consumer, nil +} + +func (m *MQ) Close() error { + m.locker.Lock() + defer m.locker.Unlock() + m.closed = true + m.topics.Range(func(key string, value *Topic) bool { + err := value.Close() + if err != nil { + log.Printf("topic: %s关闭失败 %v", key, err) + } + return true + }) + + return nil +} + +func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error { + m.locker.Lock() + defer m.locker.Unlock() + if m.closed { + return errs.ErrMQIsClosed + } + if ctx.Err() != nil { + return ctx.Err() + } + for _, t := range topics { + topic, ok := m.topics.Load(t) + if ok { + err := topic.Close() + if err != nil { + log.Printf("topic: %s关闭失败 %v", t, err) + continue + } + m.topics.Delete(t) + } + + } + return nil +} diff --git a/memory/mq_test.go b/memory/mq_test.go new file mode 100644 index 0000000..85a7d42 --- /dev/null +++ b/memory/mq_test.go @@ -0,0 +1,39 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "testing" + + "github.com/ecodeclub/ekit/syncx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMQ(t *testing.T) { + t.Parallel() + // 测试调用consumer 和 producer 如果topic不存在就新建 + testmq := &MQ{ + topics: syncx.Map[string, *Topic]{}, + } + _, err := testmq.Consumer("test_topic", "group1") + require.NoError(t, err) + _, ok := testmq.topics.Load("test_topic") + assert.Equal(t, ok, true) + _, err = testmq.Producer("test_topic1") + require.NoError(t, err) + _, ok = testmq.topics.Load("test_topic1") + assert.Equal(t, ok, true) +} diff --git a/memory/partition.go b/memory/partition.go new file mode 100644 index 0000000..72f2000 --- /dev/null +++ b/memory/partition.go @@ -0,0 +1,54 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "sync" + + "github.com/ecodeclub/ekit/list" + "github.com/ecodeclub/mq-api" +) + +// Partition 表示分区 是并发安全的 +const ( + defaultPartitionCap = 64 +) + +type Partition struct { + locker sync.RWMutex + data *list.ArrayList[*mq.Message] +} + +func NewPartition() *Partition { + return &Partition{ + data: list.NewArrayList[*mq.Message](defaultPartitionCap), + } +} + +func (p *Partition) append(msg *mq.Message) { + p.locker.Lock() + defer p.locker.Unlock() + msg.Offset = int64(p.data.Len()) + _ = p.data.Append(msg) +} + +func (p *Partition) getBatch(offset, limit int) []*mq.Message { + p.locker.RLock() + defer p.locker.RUnlock() + wantLen := offset + limit + length := min(wantLen, p.data.Len()) + res := p.data.AsSlice()[offset:length] + return res +} diff --git a/memory/partition_test.go b/memory/partition_test.go new file mode 100644 index 0000000..37b2e97 --- /dev/null +++ b/memory/partition_test.go @@ -0,0 +1,133 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "strconv" + "sync" + "testing" + + "github.com/ecodeclub/mq-api" + "github.com/stretchr/testify/assert" +) + +func Test_Partition(t *testing.T) { + t.Parallel() + p := NewPartition() + for i := 0; i < 5; i++ { + msg := &mq.Message{Value: []byte(strconv.Itoa(i))} + p.append(msg) + } + msgs := p.getBatch(2, 2) + assert.Equal(t, []*mq.Message{ + { + Value: []byte(strconv.Itoa(2)), + Offset: 2, + }, + { + Value: []byte(strconv.Itoa(3)), + Offset: 3, + }, + }, msgs) + msgs = p.getBatch(2, 5) + assert.Equal(t, []*mq.Message{ + { + Value: []byte(strconv.Itoa(2)), + Offset: 2, + }, + { + Value: []byte(strconv.Itoa(3)), + Offset: 3, + }, + { + Value: []byte(strconv.Itoa(4)), + Offset: 4, + }, + }, msgs) +} + +// 测试多个goroutine往同一个队列中发送消息 +func Test_PartitionConcurrent(t *testing.T) { + t.Parallel() + // 测试多个goroutine往partition里写 + p2 := NewPartition() + wg := &sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + index := i * 5 + go func() { + defer wg.Done() + for j := index; j < index+5; j++ { + p2.append(&mq.Message{ + Value: []byte(strconv.Itoa(j)), + }) + } + }() + } + wg.Wait() + msgs := p2.getBatch(0, 16) + for idx := range msgs { + msgs[idx].Partition = 0 + msgs[idx].Offset = 0 + } + wantVal := []*mq.Message{ + { + Value: []byte("0"), + }, + { + Value: []byte("1"), + }, + { + Value: []byte("2"), + }, + { + Value: []byte("3"), + }, + { + Value: []byte("4"), + }, + { + Value: []byte("5"), + }, + { + Value: []byte("6"), + }, + { + Value: []byte("7"), + }, + { + Value: []byte("8"), + }, + { + Value: []byte("9"), + }, + { + Value: []byte("10"), + }, + { + Value: []byte("11"), + }, + { + Value: []byte("12"), + }, + { + Value: []byte("13"), + }, + { + Value: []byte("14"), + }, + } + assert.ElementsMatch(t, wantVal, msgs) +} diff --git a/memory/producer.go b/memory/producer.go new file mode 100644 index 0000000..6d88723 --- /dev/null +++ b/memory/producer.go @@ -0,0 +1,63 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "context" + "sync" + + "github.com/ecodeclub/mq-api/internal/errs" + + "github.com/ecodeclub/mq-api" +) + +type Producer struct { + mu sync.RWMutex + t *Topic + closed bool +} + +func (p *Producer) Produce(ctx context.Context, m *mq.Message) (*mq.ProducerResult, error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { + return nil, errs.ErrProducerIsClosed + } + if ctx.Err() != nil { + return nil, ctx.Err() + } + err := p.t.addMessage(m) + return &mq.ProducerResult{}, err +} + +func (p *Producer) ProduceWithPartition(ctx context.Context, m *mq.Message, partition int) (*mq.ProducerResult, error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { + return nil, errs.ErrProducerIsClosed + } + if ctx.Err() != nil { + return nil, ctx.Err() + } + err := p.t.addMessageWithPartition(m, int64(partition)) + return &mq.ProducerResult{}, err +} + +func (p *Producer) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + p.closed = true + return nil +} diff --git a/memory/produceridgetter/hash/get.go b/memory/produceridgetter/hash/get.go new file mode 100644 index 0000000..18da3ea --- /dev/null +++ b/memory/produceridgetter/hash/get.go @@ -0,0 +1,33 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package hash + +import "hash/fnv" + +type Getter struct { + Partitions int +} + +// PartitionID 暂时使用hash,保证同一个key的值,在同一个分区。 +func (g *Getter) PartitionID(key string) int64 { + return hashString(key, g.Partitions) +} + +func hashString(s string, numBuckets int) int64 { + h := fnv.New32a() + h.Write([]byte(s)) + hash := h.Sum32() + return int64(hash % uint32(numBuckets)) +} diff --git a/memory/produceridgetter/hash/get_test.go b/memory/produceridgetter/hash/get_test.go new file mode 100644 index 0000000..cab704f --- /dev/null +++ b/memory/produceridgetter/hash/get_test.go @@ -0,0 +1,32 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package hash + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetter(t *testing.T) { + t.Parallel() + // 测试两个相同的key返回的partition是同一个 + getter := Getter{ + 3, + } + partition1 := getter.PartitionID("msg1") + partition2 := getter.PartitionID("msg1") + assert.Equal(t, partition1, partition2) +} diff --git a/memory/topic.go b/memory/topic.go new file mode 100644 index 0000000..e967f76 --- /dev/null +++ b/memory/topic.go @@ -0,0 +1,97 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "log" + "sync" + + "github.com/ecodeclub/ekit/syncx" + "github.com/ecodeclub/mq-api" + "github.com/ecodeclub/mq-api/internal/errs" + "github.com/ecodeclub/mq-api/memory/consumerpartitionassigner/equaldivide" + "github.com/ecodeclub/mq-api/memory/produceridgetter/hash" +) + +type Topic struct { + locker sync.RWMutex + closed bool + name string + partitions []*Partition + producers []mq.Producer + // 消费组 + consumerGroups syncx.Map[string, *ConsumerGroup] + // 生产消息的时候获取分区号 + producerPartitionIDGetter PartitionIDGetter + consumerPartitionAssigner ConsumerPartitionAssigner +} + +func newTopic(name string, partitions int) *Topic { + t := &Topic{ + name: name, + consumerGroups: syncx.Map[string, *ConsumerGroup]{}, + consumerPartitionAssigner: equaldivide.NewAssigner(), + producerPartitionIDGetter: &hash.Getter{Partitions: partitions}, + } + partitionList := make([]*Partition, 0, partitions) + for i := 0; i < partitions; i++ { + partitionList = append(partitionList, NewPartition()) + } + t.partitions = partitionList + return t +} + +func (t *Topic) addProducer(producer mq.Producer) error { + t.locker.Lock() + defer t.locker.Unlock() + if t.closed { + return errs.ErrMQIsClosed + } + t.producers = append(t.producers, producer) + return nil +} + +// addMessage 往分区里面添加消息 +func (t *Topic) addMessage(msg *mq.Message) error { + partitionID := t.producerPartitionIDGetter.PartitionID(string(msg.Key)) + return t.addMessageWithPartition(msg, partitionID) +} + +func (t *Topic) addMessageWithPartition(msg *mq.Message, partitionID int64) error { + if partitionID < 0 || int(partitionID) >= len(t.partitions) { + return errs.ErrInvalidPartition + } + msg.Topic = t.name + msg.Partition = partitionID + t.partitions[partitionID].append(msg) + log.Printf("生产消息 %s,消息为 %s", t.name, msg.Value) + return nil +} + +func (t *Topic) Close() error { + t.locker.Lock() + defer t.locker.Unlock() + if !t.closed { + t.closed = true + t.consumerGroups.Range(func(key string, value *ConsumerGroup) bool { + value.Close() + return true + }) + for _, producer := range t.producers { + _ = producer.Close() + } + } + return nil +} diff --git a/memory/topic_test.go b/memory/topic_test.go new file mode 100644 index 0000000..272b66e --- /dev/null +++ b/memory/topic_test.go @@ -0,0 +1,55 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "context" + "testing" + + "github.com/ecodeclub/mq-api" + "github.com/ecodeclub/mq-api/internal/errs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTopic_Close(t *testing.T) { + t.Parallel() + topic := newTopic("test_topic", 3) + p1 := &Producer{ + t: topic, + } + p2 := &Producer{ + t: topic, + } + p3 := &Producer{ + t: topic, + } + err := topic.addProducer(p1) + require.NoError(t, err) + err = topic.addProducer(p2) + require.NoError(t, err) + err = topic.Close() + require.NoError(t, err) + require.Equal(t, true, topic.closed) + err = topic.Close() + require.NoError(t, err) + require.Equal(t, true, topic.closed) + err = topic.addProducer(p3) + assert.Equal(t, errs.ErrMQIsClosed, err) + _, err = p1.Produce(context.Background(), &mq.Message{ + Value: []byte("1"), + }) + assert.Equal(t, errs.ErrProducerIsClosed, err) +} diff --git a/memory/type.go b/memory/type.go new file mode 100644 index 0000000..f45bcd7 --- /dev/null +++ b/memory/type.go @@ -0,0 +1,27 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +// PartitionIDGetter 此抽象用于Producer获取对应分区号 +type PartitionIDGetter interface { + // PartitionID 用于Producer获取分区号,返回值就是分区号 + PartitionID(key string) int64 +} + +// ConsumerPartitionAssigner 此抽象是给消费组使用,用于将分区分配给消费组内的消费者。 +type ConsumerPartitionAssigner interface { + // AssignPartition partitions表示分区数,返回值为map[消费者名称][]分区索引 + AssignPartition(consumers []string, partitions int) map[string][]int +}