Skip to content

Commit

Permalink
refactor client code
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiibou-chan committed Nov 11, 2024
1 parent 03049a5 commit 37977ff
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 171 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import channels.{Abort, NioTCP, TCP, UDP}
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.{CodecMakerConfig, JsonCodecMaker}
import de.rmgk.options.*
import probench.clients.{ClientCLI, ProBenchClient}
import probench.data.{ClientNodeState, KVOperation, Request}
import rdts.base.Uid
import rdts.datatypes.experiments.protocols.Membership
Expand Down Expand Up @@ -116,17 +117,16 @@ object cli {
}.value

subcommand("client", "starts a client to interact with a node") {
val client = Client(name.value)
val client = ProBenchClient(name.value)

val (ip, port) = clientNode.value

client.addLatentConnection(TCP.connect(TCP.defaultSocket(socketPath(ip, port)), ec))

client.startCLI()
ClientCLI(name.value, client).startCLI()
}.value

subcommand("nio-client", "starts a client to interact with a node") {
val client = Client(name.value)
val client = ProBenchClient(name.value)

val (ip, port) = clientNode.value

Expand All @@ -135,17 +135,17 @@ object cli {

client.addLatentConnection(nioTCP.connect(nioTCP.defaultSocketChannel(socketPath(ip, port))))

client.startCLI()
ClientCLI(name.value, client).startCLI()
}.value

subcommand("udp-client", "starts a client to interact with a node") {
val client = Client(name.value)
val client = ProBenchClient(name.value)

val (ip, port) = clientNode.value

client.addLatentConnection(UDP.connect(InetSocketAddress(ip, port), () => new DatagramSocket(), ec))

client.startCLI()
ClientCLI(name.value, client).startCLI()
}.value

subcommand("benchmark", "") {}.value
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package probench.clients

import probench.benchmark.BenchmarkData
import probench.data.KVOperation
import rdts.base.Uid

import scala.collection.mutable

trait Client(name: Uid) {

var doBenchmark: Boolean = false
val benchmarkData: mutable.ListBuffer[BenchmarkData] = mutable.ListBuffer.empty

def read(key: String): Unit = handleOp(KVOperation.Read(key))
def write(key: String, value: String): Unit = handleOp(KVOperation.Write(key, value))

def multiget(key: String, times: Int): Unit = {
val start = System.nanoTime()
for i <- 1 to times do read(key.replace("%n", i.toString))
println(s"Did $times get queries in ${(System.nanoTime() - start) / 1_000_000}ms")
}

def multiput(key: String, value: String, times: Int): Unit = {
val start = System.nanoTime()
for i <- 1 to times do write(key.replace("%n", i.toString), value.replace("%n", i.toString))
println(s"Did $times put queries in ${(System.nanoTime() - start) / 1_000_000}ms")
}

def handleOp(op: KVOperation[String, String]): Unit = {
val start = if doBenchmark then System.nanoTime() else 0

handleOpImpl(op)

if doBenchmark then {
val end = System.nanoTime()
val opString = op match
case KVOperation.Read(_) => "get"
case KVOperation.Write(_, _) => "put"
val args = op match
case KVOperation.Read(key) => key
case KVOperation.Write(key, value) => s"$key $value"
benchmarkData.append(BenchmarkData(
name.delegate,
opString,
args,
start / 1000,
end / 1000,
(end - start).toDouble / 1000,
"µs"
))
}
}

def handleOpImpl(op: KVOperation[String, String]): Unit

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package probench.clients

import probench.benchmark.{BenchmarkData, CSVWriter}
import rdts.base.Uid

import java.nio.file.Path
import scala.io.StdIn.readLine
import scala.util.matching.Regex

class ClientCLI(name: Uid, client: Client) {

private val commented: Regex = """#.*""".r
private val get: Regex = """get ([\w%]+)""".r
private val put: Regex = """put ([\w%]+) ([\w%]+)""".r
private val multiget: Regex = """multiget ([\w%]+) ([\d_]+)""".r
private val multiput: Regex = """multiput ([\w%]+) ([\w%]+) ([\d_]+)""".r
private val mp: Regex = """mp ([\d_]+)""".r
private val benchmark: Regex = """benchmark""".r
private val saveBenchmark: Regex = """save-benchmark""".r

def startCLI(): Unit = {
var running = true
while running do {
print("client> ")
val line = Option(readLine()).map(_.strip())
line match {
case Some(commented()) => // ignore
case Some(get(key)) => client.read(key)
case Some(put(key, value)) => client.write(key, value)
case Some(multiget(key, times)) => client.multiget(key, times.replace("_", "").toInt)
case Some(multiput(key, value, times)) => client.multiput(key, value, times.replace("_", "").toInt)
case Some(mp(times)) => client.multiput("key%n", "value%n", times.replace("_", "").toInt)
case Some(benchmark()) =>
client.doBenchmark = true
case Some(saveBenchmark()) =>
val env = System.getenv()

val runId = env.getOrDefault("RUN_ID", Uid.gen().delegate)
val benchmarkPath = Path.of(env.getOrDefault("BENCH_RESULTS_DIR", "bench-results")).resolve(runId)
val writer = new CSVWriter(";", benchmarkPath, s"${name.delegate}-$runId", BenchmarkData.header)
client.benchmarkData.foreach { row =>
writer.writeRow(
s"${row.name}",
row.op,
row.args,
row.sendTime.toString,
row.receiveTime.toString,
row.latency.toString,
row.unit
)
}
writer.close()
case None | Some("exit") => running = false
case _ =>
println("assuming put")
client.write("key", "value")
}
}
println(s"ended")
System.exit(1)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package probench.clients

import probench.benchmark.{BenchmarkData, CSVWriter}
import probench.data.*
import rdts.base.{Bottom, LocalUid, Uid}
import rdts.datatypes.contextual.CausalQueue
import rdts.dotted.Dotted
import rdts.syntax.DeltaBuffer

import java.nio.file.Path
import java.util.concurrent.Semaphore
import scala.collection.mutable
import scala.io.StdIn
import scala.io.StdIn.readLine
import scala.util.matching.Regex

class ProBenchClient(val name: Uid) extends Client(name) {
given localUid: LocalUid = LocalUid(name)
private val dataManager = ProDataManager[ClientNodeState](localUid, Bottom[ClientNodeState].empty, onStateChange)

private var currentOp: Option[Request] = None
val requestSemaphore = new Semaphore(0)

private def onStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = {
for {
op <- currentOp
CausalQueue.QueueElement(res @ Response(req, _), _, _) <- newState.responses.data.values if req == op
} {
println(res.payload)

currentOp = None

dataManager.transform(_.mod(state => state.copy(responses = state.responses.mod(_.removeBy(_ == res)))))

requestSemaphore.release(1)
}
}

override def handleOpImpl(op: KVOperation[String, String]): Unit = {
val req = Request(op)
currentOp = Some(req)

// TODO: still not sure that the semaphore use is correct …
// its quite likely possible that some other request is answered after draining, causing the code below to return immediately
// though overall currentOp is not protected at all, so it is triple unclear what is going on
requestSemaphore.drainPermits()

dataManager.transform { current =>
current.mod(it => it.copy(requests = it.requests.mod(_.enqueue(req))))
}

requestSemaphore.acquire(1)
}

export dataManager.addLatentConnection

}

0 comments on commit 37977ff

Please sign in to comment.