Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory:实现添加 #14

Merged
merged 13 commits into from
May 8, 2024
Merged

Memory:实现添加 #14

merged 13 commits into from
May 8, 2024

Conversation

juniaoshaonian
Copy link
Collaborator

自查清单

注意: 请完成下列自查清单中的所有自查项,完成一项勾选一项.

  • 当前PR不存在未合并的代码或者其他冲突.
  • 当前PR中的产品代码有恰当的注释、文档及必备的单元、集成及e2e测试代码.

PR概述

关联Issue

其他内容

memory/produceridgetter/hash/get.go Outdated Show resolved Hide resolved
memory/produceridgetter/hash/get_test.go Outdated Show resolved Hide resolved
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Partition是包外可见的, 但是其上的方法确实包内可见的
    2.方法命令应该站在Partiton的角度, append/getBatch(index, limit)/retrieveBatch(index, limit)
  2. 缺少对应的单元测试

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个并发安全的测试你知道如何操作吗

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

暂时还没添加并发安全的测试

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我有点忘了 你说的是哪个并发测试场景

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

就是这个partition 的添加数据获取数据是并发安全的

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

添加上

memory/type.go Outdated Show resolved Hide resolved
memory/topic.go Outdated
Comment on lines 75 to 80
case 1:
partitionID = partition[0]
} else {
default:
return mqerr.ErrInvalidPartition
}
if partitionID < 0 || int(partitionID) >= len(t.partitions) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

确认一下, mqerr.ErrInvalidPartition 是否表示 PartitionID非法, 还是表示分区总数非法?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

分区id非法

memory/consumergroup.go Outdated Show resolved Hide resolved
memory/consumergroup.go Outdated Show resolved Hide resolved
Copy link
Collaborator

@longyue0521 longyue0521 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

补充对Consumer和ConsumerGroup的测试, 尤其是并发相关测试

memory/partition.go Outdated Show resolved Hide resolved
memory/partition_test.go Outdated Show resolved Hide resolved
memory/producer.go Outdated Show resolved Hide resolved
memory/producer.go Outdated Show resolved Hide resolved
memory/topic.go Outdated Show resolved Hide resolved
memory/consumergroup.go Outdated Show resolved Hide resolved
memory/consumergroup.go Outdated Show resolved Hide resolved
memory/consumergroup.go Outdated Show resolved Hide resolved
memory/consumer.go Outdated Show resolved Hide resolved
memory/consumer.go Outdated Show resolved Hide resolved
Comment on lines +27 to +38
// 测试调用consumer 和 producer 如果topic不存在就新建
testmq := &MQ{
topics: syncx.Map[string, *Topic]{},
}
_, err := testmq.Consumer("test_topic", "group1")
require.NoError(t, err)
_, ok := testmq.topics.Load("test_topic")
assert.Equal(t, ok, true)
_, err = testmq.Producer("test_topic1")
require.NoError(t, err)
_, ok = testmq.topics.Load("test_topic1")
assert.Equal(t, ok, true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

写到针对抽象的e2e测试集中取, 分成两个测试

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已经拆分到e2e测试中, 这里就可以删掉了

memory/partition_test.go Show resolved Hide resolved
memory/producer.go Outdated Show resolved Hide resolved
memory/consumer.go Outdated Show resolved Hide resolved
memory/consumergroup.go Outdated Show resolved Hide resolved
memory/consumergroup.go Outdated Show resolved Hide resolved
memory/mq.go Outdated
}
t, ok := m.topics.Load(topic)
if !ok {
t = NewTopic(topic, defaultPartitions)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在针对抽象模型的e2e测试集中添加两个测试:

  1. 未创建topic, 直接创建producer
  2. 未创建topic, 直接创建consumer

运行结果与kafka运行结果对其即可

group.partitionRecords = &partitionRecords
}
consumer := group.JoinGroup()
t.consumerGroups.Store(groupID, group)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

测试在哪里?

memory/topic.go Outdated
Comment on lines 85 to 96
t.locker.Lock()
defer t.locker.Unlock()
if !t.closed {
t.consumerGroups.Range(func(key string, value *ConsumerGroup) bool {
value.Close()
return true
})
for _, producer := range t.producers {
_ = producer.Close()
}
}
return nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t.closed 在哪里设置为true?, 添加测试代码

Copy link
Collaborator Author

@juniaoshaonian juniaoshaonian May 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

针对这个,e2e从抽象层面做调用,只能做到调用mq的Consumer时如果不存在这个topic就不报错。这个topic到底存不存在,现在的抽象还没暴露出来方法,我先将这部分测试放到单元测试中


// addMessage 往分区里面添加消息
func (t *Topic) addMessage(msg *mq.Message) error {
partitionID := t.producerPartitionIDGetter.PartitionID(string(msg.Key))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Topic在closed后 是否还能添加?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接掉可以但我没暴露出去。用户只能通过producer去调用,producer上已经做了限制。

@@ -0,0 +1,50 @@
// Copyright 2021 ecodeclub
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

文件路径中 memory/consumerpartitionassigner/equaldivide/balancer.go, 既有assigner又有balancer.go统一一下, 测试文件也一样

Comment on lines +27 to +38
// 测试调用consumer 和 producer 如果topic不存在就新建
testmq := &MQ{
topics: syncx.Map[string, *Topic]{},
}
_, err := testmq.Consumer("test_topic", "group1")
require.NoError(t, err)
_, ok := testmq.topics.Load("test_topic")
assert.Equal(t, ok, true)
_, err = testmq.Producer("test_topic1")
require.NoError(t, err)
_, ok = testmq.topics.Load("test_topic1")
assert.Equal(t, ok, true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已经拆分到e2e测试中, 这里就可以删掉了

@longyue0521 longyue0521 merged commit fd7de33 into ecodeclub:main May 8, 2024
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants