Skip to content

Commit

Permalink
[CELEBORN-805] Immediate shutdown of server upon completion of unit test to prevent potential resource leakage
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

As described in the title: shut down the server immediately upon completion of unit tests to prevent potential resource leakage.

### Why are the changes needed?

Recently, while conducting the sbt build test, it came to my attention that certain resources such as ports and threads were not being released promptly.

This pull request introduces a new method, `shutdown(graceful: Boolean)`, to the `Service` trait. When invoked by `MiniClusterFeature.shutdownMiniCluster`, it calls `worker.shutdown(graceful = false)`. This implementation aims to prevent possible memory leaks during CI processes.

Before this PR, the unit tests in the `client/common/master/service/worker` modules resulted in leaked ports.

```
$ jps
1138131 Jps
1130743 sbt-launch-1.9.0.jar
$ netstat -lntp | grep 1130743
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp        0      0 127.0.0.1:12345         0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:41563           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:42905           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:44419           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:45025           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:44799           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:39053           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:39029           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:39475           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:40153           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:33051           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:33449           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:34073           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:35347           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:35971           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 0.0.0.0:36799           0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 192.168.1.151:40775     0.0.0.0:*               LISTEN      1130743/java
tcp        0      0 192.168.1.151:44457     0.0.0.0:*               LISTEN      1130743/java
```

After this PR:

```
$ jps
1114423 Jps
1107544 sbt-launch-1.9.0.jar
$ netstat -lntp | grep 1107544
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Passes GitHub Actions (GA) CI.

Closes #1727 from cfmcgrady/shutdown.

Authored-by: Fu Chen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 7c6644b)
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
cfmcgrady authored and pan3793 committed Jul 18, 2023
1 parent fc34af5 commit affc8f1
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,28 @@ protected void initChannel(SocketChannel ch) {

@Override
public void close() {
shutdown(true);
}

public void shutdown(boolean graceful) {
if (channelFuture != null) {
// close is a local operation and should finish within milliseconds; timeout just to be safe
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
channelFuture = null;
}
if (bootstrap != null && bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully();
if (graceful) {
bootstrap.config().group().shutdownGracefully();
} else {
bootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
}
if (bootstrap != null && bootstrap.config().childGroup() != null) {
bootstrap.config().childGroup().shutdownGracefully();
if (graceful) {
bootstrap.config().childGroup().shutdownGracefully();
} else {
bootstrap.config().childGroup().shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
}
bootstrap = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,83 +222,85 @@ class WorkerInfoSuite extends CelebornFunSuite {
ResourceConsumption(20971520, 1, 52428800, 1))
val conf = new CelebornConf()
val endpointAddress = new RpcEndpointAddress(new RpcAddress("localhost", 12345), "mockRpc")
val rpcEnv = RpcEnv.create("mockEnv", "localhost", "localhost", 12345, conf, 64)
val worker4 = new WorkerInfo(
"h4",
40001,
40002,
40003,
4000,
disks,
userResourceConsumption)
var rpcEnv: RpcEnv = null
try {
rpcEnv = RpcEnv.create("mockEnv", "localhost", "localhost", 12345, conf, 64)
val worker4 = new WorkerInfo(
"h4",
40001,
40002,
40003,
4000,
disks,
userResourceConsumption)

val placeholder = ""
val exp1 =
s"""
|Host: h1
|RpcPort: 10001
|PushPort: 10002
|FetchPort: 10003
|ReplicatePort: 1000
|SlotsUsed: 0
|LastHeartbeat: 0
|Disks: empty
|UserResourceConsumption: empty
|WorkerRef: null
|""".stripMargin
val placeholder = ""
val exp1 =
s"""
|Host: h1
|RpcPort: 10001
|PushPort: 10002
|FetchPort: 10003
|ReplicatePort: 1000
|SlotsUsed: 0
|LastHeartbeat: 0
|Disks: empty
|UserResourceConsumption: empty
|WorkerRef: null
|""".stripMargin

val exp2 =
"""
|Host: h2
|RpcPort: 20001
|PushPort: 20002
|FetchPort: 20003
|ReplicatePort: 2000
|SlotsUsed: 0
|LastHeartbeat: 0
|Disks: empty
|UserResourceConsumption: empty
|WorkerRef: null
|""".stripMargin
val exp3 =
s"""
|Host: h3
|RpcPort: 30001
|PushPort: 30002
|FetchPort: 30003
|ReplicatePort: 3000
|SlotsUsed: 0
|LastHeartbeat: 0
|Disks: empty
|UserResourceConsumption: empty
|WorkerRef: null
|""".stripMargin
val exp4 =
s"""
|Host: h4
|RpcPort: 40001
|PushPort: 40002
|FetchPort: 40003
|ReplicatePort: 4000
|SlotsUsed: 60
|LastHeartbeat: 0
|Disks: $placeholder
| DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs $placeholder
| DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs $placeholder
| DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs $placeholder
|UserResourceConsumption: $placeholder
| UserIdentifier: `tenant1`.`name1`, ResourceConsumption: ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
|WorkerRef: null
|""".stripMargin;
val exp2 =
"""
|Host: h2
|RpcPort: 20001
|PushPort: 20002
|FetchPort: 20003
|ReplicatePort: 2000
|SlotsUsed: 0
|LastHeartbeat: 0
|Disks: empty
|UserResourceConsumption: empty
|WorkerRef: null
|""".stripMargin
val exp3 =
s"""
|Host: h3
|RpcPort: 30001
|PushPort: 30002
|FetchPort: 30003
|ReplicatePort: 3000
|SlotsUsed: 0
|LastHeartbeat: 0
|Disks: empty
|UserResourceConsumption: empty
|WorkerRef: null
|""".stripMargin
val exp4 =
s"""
|Host: h4
|RpcPort: 40001
|PushPort: 40002
|FetchPort: 40003
|ReplicatePort: 4000
|SlotsUsed: 60
|LastHeartbeat: 0
|Disks: $placeholder
| DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs $placeholder
| DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs $placeholder
| DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs $placeholder
|UserResourceConsumption: $placeholder
| UserIdentifier: `tenant1`.`name1`, ResourceConsumption: ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
|WorkerRef: null
|""".stripMargin;

println(worker1)
println(worker2)
println(worker3)
println(worker4)

assertEquals(exp1, worker1.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
assertEquals(exp2, worker2.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
assertEquals(exp3, worker3.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
assertEquals(exp4, worker4.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
assertEquals(exp1, worker1.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
assertEquals(exp2, worker2.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
assertEquals(exp3, worker3.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
assertEquals(exp4, worker4.toString.replaceAll("HeartbeatElapsedSeconds:.*\n", ""))
} finally {
if (null != rpcEnv) {
rpcEnv.shutdown()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,18 @@ abstract class HttpService extends Service with Logging {
}

override def close(): Unit = {
httpServer.stop()
// may be null when running the unit test
if (null != httpServer) {
httpServer.stop(true)
}
super.close()
}

override def shutdown(graceful: Boolean): Unit = {
// may be null when running the unit test
if (null != httpServer) {
httpServer.stop(graceful)
}
super.shutdown(graceful)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ abstract class Service extends Logging {
}

def close(): Unit = {}

def shutdown(graceful: Boolean): Unit = {}
}

object Service {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class HttpServer(
isStarted = true
}

def stop(): Unit = synchronized {
def stop(graceful: Boolean): Unit = synchronized {
if (isStarted) {
logInfo(s"$role: Stopping HttpServer")
if (bindFuture != null) {
Expand All @@ -66,12 +66,20 @@ class HttpServer(
}
if (bootstrap != null && bootstrap.config.group != null) {
Utils.tryLogNonFatalError {
bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
if (graceful) {
bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
} else {
bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS)
}
}
}
if (bootstrap != null && bootstrap.config.childGroup != null) {
Utils.tryLogNonFatalError {
bootstrap.config.childGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS)
if (graceful) {
bootstrap.config.childGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS)
} else {
bootstrap.config.childGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS)
}
}
}
bootstrap = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,37 +393,62 @@ private[celeborn] class Worker(
}

override def close(): Unit = synchronized {
shutdown(gracefulShutdown)
}

override def shutdown(graceful: Boolean): Unit = {
if (!stopped) {
logInfo("Stopping Worker.")

if (sendHeartbeatTask != null) {
sendHeartbeatTask.cancel(true)
if (graceful) {
sendHeartbeatTask.cancel(false)
} else {
sendHeartbeatTask.cancel(true)
}
sendHeartbeatTask = null
}
if (checkFastFailTask != null) {
checkFastFailTask.cancel(true)
if (graceful) {
checkFastFailTask.cancel(false)
} else {
checkFastFailTask.cancel(true)
}
checkFastFailTask = null
}
forwardMessageScheduler.shutdownNow()
replicateThreadPool.shutdownNow()
commitThreadPool.shutdownNow()
asyncReplyPool.shutdownNow()
partitionsSorter.close()
if (graceful) {
forwardMessageScheduler.shutdown()
replicateThreadPool.shutdown()
commitThreadPool.shutdown()
asyncReplyPool.shutdown()
partitionsSorter.close()
} else {
forwardMessageScheduler.shutdownNow()
replicateThreadPool.shutdownNow()
commitThreadPool.shutdownNow()
asyncReplyPool.shutdownNow()
partitionsSorter.close()
}

if (null != storageManager) {
storageManager.close()
}
memoryManager.close();
memoryManager.close()

masterClient.close()
replicateServer.close()
fetchServer.close()
replicateServer.shutdown(graceful)
fetchServer.shutdown(graceful)
// TODO: `pushServer` was never closed before this PR.
pushServer.shutdown(graceful)

super.close()
super.shutdown(graceful)

logInfo("Worker is stopped.")
stopped = true
}
if (!graceful) {
shutdown.set(true)
}
}

private def registerWithMaster(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ trait MiniClusterFeature extends Logging {
// interrupt threads
Thread.sleep(5000)
workerInfos.foreach {
case (_, thread) =>
case (worker, thread) =>
worker.shutdown(graceful = false)
thread.interrupt()
}
workerInfos.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._

import org.junit.Assert
import org.mockito.MockitoSugar._
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

import org.apache.celeborn.common.CelebornConf
Expand All @@ -33,12 +34,26 @@ import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.util.JavaUtils
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}

class WorkerSuite extends AnyFunSuite {
val conf = new CelebornConf()
val workerArgs = new WorkerArguments(Array(), conf)
class WorkerSuite extends AnyFunSuite with BeforeAndAfterEach {
private var worker: Worker = _
private val conf = new CelebornConf()
private val workerArgs = new WorkerArguments(Array(), conf)

override def beforeEach(): Unit = {
assert(null == worker)
}

override def afterEach(): Unit = {
if (null != worker) {
worker.rpcEnv.shutdown()
worker.shutdown(false)
worker = null
}
}

test("clean up") {
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
val worker = new Worker(conf, workerArgs)
worker = new Worker(conf, workerArgs)

val pl1 = new PartitionLocation(0, 0, "12", 0, 0, 0, 0, PartitionLocation.Mode.PRIMARY)
val pl2 = new PartitionLocation(1, 0, "12", 0, 0, 0, 0, PartitionLocation.Mode.REPLICA)
Expand Down Expand Up @@ -74,7 +89,7 @@ class WorkerSuite extends AnyFunSuite {

test("flush filewriters") {
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
val worker = new Worker(conf, workerArgs)
worker = new Worker(conf, workerArgs)
val dir = new File("/tmp")
val allWriters = new util.HashSet[FileWriter]()
val map = JavaUtils.newConcurrentHashMap[String, FileWriter]()
Expand Down

0 comments on commit affc8f1

Please sign in to comment.