Skip to content

Commit

Permalink
refactor(logstream): deprecate old replicateTask pool management
Browse files Browse the repository at this point in the history
This PR modifies replicate task pool implementation for future refactoring that
will resolve #843.

- Deprecated `newReplicateTask` and `release` functions in favor of new
  implementations.
- Added `replicateTaskPool` struct for simplified pool management.
- Updated tests to use the new functions and ensure backward compatibility.
  • Loading branch information
ijsong committed Feb 3, 2025
1 parent 45a6d6b commit 7f3f55b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 44 deletions.
2 changes: 1 addition & 1 deletion internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end
// replicate tasks
st.rts = newReplicateTaskSlice()
for i := 0; i < numBackups; i++ {
rt := newReplicateTask(batchletClassIdx)
rt := newReplicateTask(end - begin)
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.dataList = batchletData
Expand Down
6 changes: 3 additions & 3 deletions internal/storagenode/logstream/replicate_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestReplicateClientRPCError(t *testing.T) {
rc.queue = make(chan *replicateTask, 1)
rc.streamClient = mockStreamClient

rt := newReplicateTask(0)
rt := newReplicateTaskDeprecated(0)
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.llsnList = append(rt.llsnList, 1)
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestReplicateClientDrain(t *testing.T) {
rc.queue = make(chan *replicateTask, numTasks)

for i := 0; i < numTasks; i++ {
rt := newReplicateTask(0)
rt := newReplicateTaskDeprecated(0)
err := rc.send(context.Background(), rt)
assert.NoError(t, err)
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestReplicateClient(t *testing.T) {
assert.NoError(t, err)
defer rc.stop()

rt := newReplicateTask(0)
rt := newReplicateTaskDeprecated(0)
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.llsnList = append(rt.llsnList, 1)
Expand Down
121 changes: 85 additions & 36 deletions internal/storagenode/logstream/replicate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,65 +17,114 @@ type replicateTask struct {
poolIdx int
}

// newReplicateTask returns a new replicateTask.
// newReplicateTaskDeprecated returns a new replicateTask.
// The argument poolIdx should be the index of replicateTaskPools, which is returned from batchlet.SelectLengthClass.
func newReplicateTask(poolIdx int) *replicateTask {
//
// Deprecated: Use newReplicateTask.
func newReplicateTaskDeprecated(poolIdx int) *replicateTask {
rt := replicateTaskPools[poolIdx].Get().(*replicateTask)
return rt
}

// release releases the task to the pool.
func (rt *replicateTask) release() {
// releaseDeprecated releases the task to the pool.
//
// Deprecated: Use release.
func (rt *replicateTask) releaseDeprecated() {
rt.tpid = 0
rt.lsid = 0
rt.llsnList = rt.llsnList[0:0]
rt.dataList = nil
replicateTaskPools[rt.poolIdx].Put(rt)
}

// newReplicateTask returns a new replicateTask. The capacity of the returned
// replicateTask's llsnList is equal to or greater than the argument size, and
// its length is zero.
// Since (snpb.ReplicateRequest).LLSN is deprecated, (*replicateTask).llsnList
// will be deprecated soon. Until that, newReplicateTask simplifies the pool
// management of replicateTask.
func newReplicateTask(size int) *replicateTask {
return defaultReplicateTaskPool.get(size)
}

// release relreases the task to the pool.
func (rt *replicateTask) release() {
rt.tpid = 0
rt.lsid = 0
rt.llsnList = rt.llsnList[0:0]
rt.dataList = nil
defaultReplicateTaskPool.put(rt)
}

// releaseReplicateTasks releases all tasks in the list to the pool.
func releaseReplicateTasks(rts []*replicateTask) {
for i := range rts {
rts[i].release()
}
}

var (
replicateTaskPools = [...]sync.Pool{
{
New: func() interface{} {
return &replicateTask{
llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[0]),
poolIdx: 0,
}
},
// replicateTaskPool is a simple pool for replicateTask.
type replicateTaskPool struct {
pool sync.Pool
}

var defaultReplicateTaskPool replicateTaskPool

func (p *replicateTaskPool) get(size int) *replicateTask {
rt, ok := p.pool.Get().(*replicateTask)
if ok && cap(rt.llsnList) >= size {
rt.llsnList = rt.llsnList[0:0]
return rt
}
if ok {
p.pool.Put(rt)
}
return &replicateTask{
llsnList: make([]types.LLSN, 0, size),
poolIdx: 0,
}
}

func (p *replicateTaskPool) put(rt *replicateTask) {
p.pool.Put(rt)
}

// replicateTaskPools is a set of pools for replicateTask.
// Deprecated: Use defaultReplicateTaskPool.
var replicateTaskPools = [...]sync.Pool{
{
New: func() interface{} {
return &replicateTask{
llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[0]),
poolIdx: 0,
}
},
{
New: func() interface{} {
return &replicateTask{
llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[1]),
poolIdx: 1,
}
},
},
{
New: func() interface{} {
return &replicateTask{
llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[1]),
poolIdx: 1,
}
},
{
New: func() interface{} {
return &replicateTask{
llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[2]),
poolIdx: 2,
}
},
},
{
New: func() interface{} {
return &replicateTask{
llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[2]),
poolIdx: 2,
}
},
{
New: func() interface{} {
return &replicateTask{
llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[3]),
poolIdx: 3,
}
},
},
{
New: func() interface{} {
return &replicateTask{
llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[3]),
poolIdx: 3,
}
},
}
)
},
}

const defaultLengthOfReplicationTaskSlice = 3

Expand Down
19 changes: 15 additions & 4 deletions internal/storagenode/logstream/replicate_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,20 @@ import (
)

func TestReplicateTaskPools(t *testing.T) {
for poolIdx, batchletLen := range batchlet.LengthClasses {
rt := newReplicateTask(poolIdx)
assert.Empty(t, rt.llsnList)
assert.Equal(t, batchletLen, cap(rt.llsnList))
const repeatCount = 1000

for range repeatCount {
for poolIdx, batchletLen := range batchlet.LengthClasses {
rt1 := newReplicateTaskDeprecated(poolIdx)
assert.Empty(t, rt1.llsnList)
assert.Equal(t, batchletLen, cap(rt1.llsnList))

rt2 := newReplicateTask(batchletLen)
assert.Empty(t, rt2.llsnList)
assert.GreaterOrEqual(t, cap(rt2.llsnList), batchletLen)

rt1.releaseDeprecated()
rt2.release()
}
}
}

0 comments on commit 7f3f55b

Please sign in to comment.