Skip to content

Commit

Permalink
SegmentPool improvements (#352)
Browse files Browse the repository at this point in the history
- Fixed shared segment leakage from the pool
- Fixed segment leakage / excessive allocation when multiple threads taking / recycling them concurrently
- Supported configurable second-level pool (4 MB by default, could be configured via `kotlinx.io.pool.size.bytes` system property)
  • Loading branch information
fzhinkin authored Jul 12, 2024
1 parent 3eb8117 commit be2bfca
Show file tree
Hide file tree
Showing 11 changed files with 394 additions and 55 deletions.
2 changes: 1 addition & 1 deletion core/api/kotlinx-io-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public abstract interface class kotlinx/io/RawSource : java/lang/AutoCloseable {
}

public final class kotlinx/io/Segment {
public synthetic fun <init> ([BIIZZLkotlin/jvm/internal/DefaultConstructorMarker;)V
public synthetic fun <init> ([BIILkotlinx/io/SegmentCopyTracker;ZLkotlin/jvm/internal/DefaultConstructorMarker;)V
public final synthetic fun dataAsByteArray (Z)[B
public final synthetic fun getLimit ()I
public final synthetic fun getNext ()Lkotlinx/io/Segment;
Expand Down
75 changes: 66 additions & 9 deletions core/common/src/Segment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,47 @@ package kotlinx.io
import kotlin.jvm.JvmField
import kotlin.jvm.JvmSynthetic

/**
* Tracks shared segment copies.
*
* A new [SegmentCopyTracker] instance should be not shared by default (i.e. `shared == false`).
* Any further [addCopy] calls should move the tracker to a shared state (i.e. `shared == true`).
* Once a shared segment copy is recycled, [removeCopy] should be called.
* Depending on implementation, calling [removeCopy] the same number of times as [addCopy] may
* or may not transition the tracked back to unshared stated.
*
* The class is not intended for public use and currently designed to fit the only use case - within JVM SegmentPool
* implementation.
*/
internal abstract class SegmentCopyTracker {
/**
* `true` if a tracker shared by multiple segment copies.
*/
abstract val shared: Boolean

/**
* Track a new copy created by sharing an associated segment.
*/
abstract fun addCopy()

/**
* Records reclamation of a shared segment copy associated with this tracker.
* If a tracker was in unshared state, this call should not affect an internal state.
*
* @return `true` if the segment was not shared *before* this called.
*/
abstract fun removeCopy(): Boolean
}

/**
* Simple [SegmentCopyTracker] that always reports shared state.
*/
internal object AlwaysSharedCopyTracker : SegmentCopyTracker() {
override val shared: Boolean = true
override fun addCopy() = Unit
override fun removeCopy(): Boolean = true
}

/**
* A segment of a buffer.
*
Expand Down Expand Up @@ -59,8 +100,17 @@ public class Segment {
internal var limit: Int = 0

/** True if other segments or byte strings use the same byte array. */
@JvmField
internal var shared: Boolean = false
internal val shared: Boolean
get() = copyTracker?.shared ?: false

/**
* Tracks number shared copies
*
* Note that this reference is not `@Volatile` as segments are not thread-safe and it's an error
* to modify the same segment concurrently.
* At the same time, an object [copyTracker] refers to could be modified concurrently.
*/
internal var copyTracker: SegmentCopyTracker? = null

/** True if this segment owns the byte array and can append to it, extending `limit`. */
@JvmField
Expand All @@ -81,14 +131,14 @@ public class Segment {
private constructor() {
this.data = ByteArray(SIZE)
this.owner = true
this.shared = false
this.copyTracker = null
}

private constructor(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean) {
private constructor(data: ByteArray, pos: Int, limit: Int, shareToken: SegmentCopyTracker?, owner: Boolean) {
this.data = data
this.pos = pos
this.limit = limit
this.shared = shared
this.copyTracker = shareToken
this.owner = owner
}

Expand All @@ -98,8 +148,10 @@ public class Segment {
* prevents it from being pooled.
*/
internal fun sharedCopy(): Segment {
shared = true
return Segment(data, pos, limit, true, false)
val t = copyTracker ?: SegmentPool.tracker().also {
copyTracker = it
}
return Segment(data, pos, limit, t.also { it.addCopy() }, false)
}

/**
Expand Down Expand Up @@ -284,8 +336,13 @@ public class Segment {
internal fun new(): Segment = Segment()

@JvmSynthetic
internal fun new(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean): Segment
= Segment(data, pos, limit, shared, owner)
internal fun new(
data: ByteArray,
pos: Int,
limit: Int,
copyTracker: SegmentCopyTracker?,
owner: Boolean
): Segment = Segment(data, pos, limit, copyTracker, owner)
}
}

Expand Down
7 changes: 7 additions & 0 deletions core/common/src/SegmentPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,11 @@ internal expect object SegmentPool {

/** Recycle a segment that the caller no longer needs. */
fun recycle(segment: Segment)

/**
* Allocates a new copy tracker that'll be associated with a segment from this pool.
* For performance reasons, there's no tracker attached to a segment initially.
* Instead, it's allocated lazily on the first sharing attempt.
*/
fun tracker(): SegmentCopyTracker
}
3 changes: 2 additions & 1 deletion core/common/src/unsafe/UnsafeBufferOperations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public object UnsafeBufferOperations {
public fun moveToTail(buffer: Buffer, bytes: ByteArray, startIndex: Int = 0, endIndex: Int = bytes.size) {
checkBounds(bytes.size, startIndex, endIndex)
val segment = Segment.new(
bytes, startIndex, endIndex, shared = true /* to prevent recycling */,
bytes, startIndex, endIndex,
AlwaysSharedCopyTracker, /* to prevent recycling */
owner = false /* can't append to it */
)
val tail = buffer.tail
Expand Down
26 changes: 26 additions & 0 deletions core/common/test/AlwaysSharedCopyTrackerTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file.
*/

package kotlinx.io

import kotlin.test.Test
import kotlin.test.assertTrue

class AlwaysSharedCopyTrackerTest {
@Test
fun stateTransition() {
val tracker = AlwaysSharedCopyTracker
assertTrue(tracker.shared)

assertTrue(tracker.removeCopy())
assertTrue(tracker.shared)

tracker.addCopy()
assertTrue(tracker.shared)

assertTrue(tracker.removeCopy())
assertTrue(tracker.shared)
}
}
2 changes: 2 additions & 0 deletions core/js/src/SegmentPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ internal actual object SegmentPool {

actual fun recycle(segment: Segment) {
}

actual fun tracker(): SegmentCopyTracker = AlwaysSharedCopyTracker
}
Loading

0 comments on commit be2bfca

Please sign in to comment.