Skip to content

Commit

Permalink
further testing
Browse files Browse the repository at this point in the history
  • Loading branch information
JWUST committed Nov 26, 2013
1 parent dea5467 commit 8e14877
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 60 deletions.
21 changes: 11 additions & 10 deletions benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def __init__(self, benchmarkGroupId, benchmarkRunId, buildSettings, **kwargs):
self._scheduler = kwargs["scheduler"] if kwargs.has_key("scheduler") else "CoreBoundQueuesScheduler"
self._serverIP = kwargs["serverIP"] if kwargs.has_key("serverIP") else "127.0.0.1"
self._remoteUser = kwargs["remoteUser"] if kwargs.has_key("remoteUser") else "hyrise"
self._ssh = paramiko.SSHClient()
if self._remote:
self._ssh = paramiko.SSHClient()

self._session.headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"}
if not os.path.isdir(self._dirResults):
Expand All @@ -78,6 +79,7 @@ def run(self):
self._startSSHConnection()

if not self._manual:
# no support for building on remote machine yet
if not self._remote:
self._buildServer()
self._startServer()
Expand Down Expand Up @@ -236,17 +238,15 @@ def _startRemoteServer(self):
if (self._serverThreads > 0):
threadstring = "--threads=%s" % self._serverThreads

# note: apparently there is an issue with large outputs of the server command; either write to /dev/null on the server machine or to a file on the server side
# otherwise, maybe try to get the transport and read from a channel
# note: there is an issue with large outputs of the server command:
# the remote command hangs, probably when the channel buffer is full.
# Either redirect the output to /dev/null or to a file on the server machine;
# otherwise, get the transport and read from a channel.
command_str = "cd " + str(self._dirBinary) + "; env " + env + " " + server + " --port=%s" % self._port + " --logdef=%s" % logdef + " --scheduler=%s" % self._scheduler + " " + threadstring + " &> /dev/null"
stdin, stdout, stderr = self._ssh.exec_command(command_str);

time.sleep(1)
# self._serverProc = subprocess.Popen([server, "--port=%s" % self._port, "--logdef=%s" % logdef, "--scheduler=%s" % self._scheduler, threadstring],
# cwd=self._dirBinary,
# env=env,
# stdout=open("/dev/null") if not self._stdout else None,
# stderr=open("/dev/null") if not self._stderr else None)
print "done"


def _runPrepareQueries(self):
Expand Down Expand Up @@ -304,8 +304,9 @@ def _signalHandler(self, signal, frame):

def _startSSHConnection(self):
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print self._host + " "+ self._remoteUser
self._ssh.connect(self._host, username=self._remoteUser, password='Reacti0n')
# expects key-based authentication to be set up on the remote server
self._ssh.connect(self._host, username=self._remoteUser)
print "connected"

def _stopSSHConnection(self):
self._ssh.close()
Expand Down
36 changes: 28 additions & 8 deletions benchmark/mixedWLPlotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ def __init__(self, benchmarkGroupId):

def printStatistics(self):
logStr = ""
output = {}
for runId, runData in self._runs.iteritems():
stats = runData[runData.keys()[0]]["txStats"]["0"]
log = str(runData[runData.keys()[0]]["numUsers"]) + " " + str(stats["totalRuns"]) + " " + str(stats["rtAvg"]) + " " + str(stats["rtMed"]) + " " + str(stats["rtStd"]) + "\n"
logStr += log
output[runData[runData.keys()[0]]["numUsers"]] = str(stats["totalRuns"]) + " " + str(stats["rtAvg"]) + " " + str(stats["rtMed"]) + " " + str(stats["rtStd"]) + " " + str(stats["srtMin"]) + " " + str(stats["srtMax"]) + " " +str(stats["srtAvg"]) + " " + str(stats["srtMed"]) + " " + str(stats["srtStd"]) + "\n"

for users in sorted(output.iterkeys()):
logStr += "%s %s" % (str(users), output[users])
# numUsers = runData[runData.keys()[0]]["numUsers"]
# print "Run ID: %s [%s users]" % (runId, numUsers)
# print "=============================="
Expand Down Expand Up @@ -201,22 +204,29 @@ def _collect(self):
"userTime": 0.0,
"totalRuns": 0,
"rtTuples": [],
"srtTuples": [],
"rtMin": 0.0,
"rtMax": 0.0,
"rtAvg": 0.0,
"rtMed": 0.0,
"rtStd": 0.0
"rtStd": 0.0,
"srtMin": 0.0,
"srtMax": 0.0,
"srtAvg": 0.0,
"srtMed": 0.0,
"srtStd": 0.0
})
txStats[txId]["totalTime"] += runtime
txStats[txId]["userTime"] += runtime / float(numUsers)
txStats[txId]["totalRuns"] += 1
txStats[txId]["rtTuples"].append((starttime, runtime))

# if len(linedata) > 3:
# opData = ast.literal_eval(linedata[3])
# for op in opData:
# pass
# # opData = opStr.split(",")
if len(linedata) > 3:
opData = ast.literal_eval(linedata[3])
for op in opData:
if op["name"].encode('utf8') == "ResponseTask":
txStats[txId]["srtTuples"].append(float(op["endTime"]))
# opData = opStr.split(",")
# opStats[txId].setdefault(op["name"], {
# "rtTuples": [],
# "avgRuns": 0.0,
Expand All @@ -236,6 +246,16 @@ def _collect(self):
txStats[txId]["rtAvg"] = average(allRuntimes)
txStats[txId]["rtMed"] = median(allRuntimes)
txStats[txId]["rtStd"] = std(allRuntimes)

allSRuntimes = txData["srtTuples"]

if len(allSRuntimes):
txStats[txId]["srtMin"] = amin(allSRuntimes)
txStats[txId]["srtMax"] = amax(allSRuntimes)
txStats[txId]["srtAvg"] = average(allSRuntimes)
txStats[txId]["srtMed"] = median(allSRuntimes)
txStats[txId]["srtStd"] = std(allSRuntimes)

for opId, opData in opStats[txId].iteritems():
opStats[txId][opId]["avgRuns"] = average([a[0] for a in opData["rtTuples"]])
opStats[txId][opId]["rtMin"] = amin([a[1] for a in opData["rtTuples"]])
Expand Down
3 changes: 1 addition & 2 deletions benchmark/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def run(self):


def stop(self):
print "stop " + str(self._userId)
self._stopevent.set()


Expand All @@ -81,7 +80,7 @@ def fireQuery(self, queryString, queryArgs={"papi": "NO_PAPI"}, sessionContext=N
else:
result = self._session.post("http://%s:%s/" % (self._host, self._port), data=data, timeout=100000)
self._totalQueryTime += time.time() - tStart

if result.status_code != 200:
print "Rquest failed. Status code: ", result.status_code
print "Response: ", result.text
Expand Down
102 changes: 67 additions & 35 deletions exp_mixed.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import argparse
import benchmark
import os
import pprint
import time

from benchmark.bench_mixed import MixedWLBenchmark
from benchmark.mixedWLPlotter import MixedWLPlotter

def runbenchmarks(groupId, s1, **kwargs):
output = ""
users = [2] #2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 30, 40, 50]
users = [1, 2, 4, 8, 16]#, 24, 32]#, 48, 64, 96, 128]
# users = [1, 32, 128]
for i in users:
runId = str(i)
kwargs["numUsers"] = i
b1 = MixedWLBenchmark(groupId, runId, s1, **kwargs)
b1.run()
time.sleep(5)
plotter = MixedWLPlotter(groupId)
output += groupId + "\n"
output += plotter.printStatistics()
Expand Down Expand Up @@ -66,8 +70,8 @@ def runbenchmarks(groupId, s1, **kwargs):
kwargs = {
"port" : args["port"],
"manual" : args["manual"],
"warmuptime" : 10,
"runtime" : 30,
"warmuptime" : 2,
"runtime" : 20,
"benchmarkQueries" : ("q7idx_vbak",),
"prepareQueries" : ("create_vbak_index",),
"showStdout" : True,
Expand All @@ -78,70 +82,98 @@ def runbenchmarks(groupId, s1, **kwargs):
"serverThreads" : args["threads"],
"collectPerfData" : args["perfdata"],
"useJson" : args["json"],
"dirBinary" : "/home/Johannes.Wust/hyrise/build/",
#"dirBinary" : "/home/Johannes.Wust/hyrise/build/",
"hyriseDBPath" : "/home/Johannes.Wust/hyrise/test/",
"scheduler" : "CoreBoundQueuesScheduler",
"serverThreads" : 11,
"remote" : True,
"remote" : False,
"remoteUser" : "Johannes.Wust",
"host" : "127.0.0.1"
#"host" : "gaza"
}

output = ""
output += "kwargs\n"
output += str(kwargs)
output += "\n"
output += "\n"
output += "OLTP 11 threads\n"
output += "\n"
output += runbenchmarks(kwargs["scheduler"] + "_OLTP", s1, **kwargs)
#kwargs["scheduler"] = "WSCoreBoundQueuesScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP", s1, **kwargs)
#kwargs["scheduler"] = "CentralScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP", s1, **kwargs)
#kwargs["scheduler"] = "ThreadPerTaskScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP", s1, **kwargs)
#
#kwargs["serverThreads"] = 22
output += runbenchmarks(kwargs["scheduler"] + "_OLTP_" + str(kwargs["serverThreads"]), s1, **kwargs)
kwargs["scheduler"] = "WSCoreBoundQueuesScheduler"
output += runbenchmarks(kwargs["scheduler"] + "_OLTP_" + str(kwargs["serverThreads"]), s1, **kwargs)
kwargs["scheduler"] = "CentralScheduler"
output += runbenchmarks(kwargs["scheduler"] + "_OLTP_" + str(kwargs["serverThreads"]), s1, **kwargs)
kwargs["scheduler"] = "ThreadPerTaskScheduler"
output += runbenchmarks(kwargs["scheduler"] + "_OLTP_" + str(kwargs["serverThreads"]), s1, **kwargs)


#kwargs["serverThreads"] = 62

## write output to file
#filename = "results_" + str(int(time.time()))
#f = open(filename,'w')
#f.write(output) # python will convert \n to os.linesep
#f.close() # you can omit in most cases as the destructor will call if
#
#output = ""
#output += "OLTP 22 threads\n"
#output += ""
#output += "OLTP 62 threads\n"
#output += "\n"
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP", s1, **kwargs)
#kwargs["scheduler"] = "CoreBoundQueuesScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#kwargs["scheduler"] = "WSCoreBoundQueuesScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#kwargs["scheduler"] = "CentralScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#kwargs["scheduler"] = "ThreadPerTaskScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLTP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#
#
## write output to file
#filename = "results_" + str(int(time.time()))
#f = open(filename,'w')
#f.write(output) # python will convert \n to os.linesep
#f.close() # you can omit in most cases as the destructor will call if
#
#kwargs["serverThreads"] = 11
#kwargs["serverThreads"] = 31
#kwargs["benchmarkQueries"] = ("xselling",)
#kwargs["prepareQueries"] = ("preload_vbap",)
#
#output += "\n"
#output += "OLAP 11 threads\n"
#output += "OLAP 31 threads\n"
#output += "\n"
#kwargs["scheduler"] = "CoreBoundQueuesScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#kwargs["scheduler"] = "WSCoreBoundQueuesScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#kwargs["scheduler"] = "CentralScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#kwargs["scheduler"] = "ThreadPerTaskScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#
#
## write output to file
#filename = "results_" + str(int(time.time()))
#f = open(filename,'w')
#f.write(output) # python will convert \n to os.linesep
#f.close() # you can omit in most cases as the destructor will call if
#
#kwargs["serverThreads"] = 22
#
#kwargs["serverThreads"] = 62
#
#output += "\n"
#output += "OLAP 22 threads\n"
#output += "OLAP 62 threads\n"
#output += "\n"
#kwargs["scheduler"] = "CoreBoundQueuesScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#kwargs["scheduler"] = "WSCoreBoundQueuesScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#kwargs["scheduler"] = "CentralScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP", s1, **kwargs)
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP_" + str(kwargs["serverThreads"]), s1, **kwargs)
#kwargs["scheduler"] = "ThreadPerTaskScheduler"
#output += runbenchmarks(kwargs["scheduler"] + "_OLAP", s1, **kwargs)

#output += runbenchmarks(kwargs["scheduler"] + "_OLAP_" + str(kwargs["serverThreads"]), s1, **kwargs)

# write output to file
filename = "results_" + str(int(time.time()))
f = open(filename,'w')
f.write(output) # python will convert \n to os.linesep
f.close() # you can omit in most cases as the destructor will call it
print output
10 changes: 5 additions & 5 deletions queries/mixed/xselling.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"type" : "TableScan",
"expression": "hyrise::Store_FLV_F1_EQ_STRING",
"f1": 3,
"instances": 11,
"instances": 31,
"COMMENT": "value is in the dictionary for 10mill",
"v_f1": "CP_5301"
},
Expand All @@ -21,14 +21,14 @@
"f1": 3,
"COMMENT": "value is in the dictionary for 10mill",
"v_f1": "CP_5305",
"instances": 11
"instances": 31
}, "4": {
"type": "RadixJoin",
"bits1": 10,
"bits2": 5,
"hash_par": 11,
"probe_par": 11,
"join_par": 11,
"hash_par": 20,
"probe_par": 20,
"join_par": 31,
"fields": [1,1] // "field_names": ["VBELN"], // Left and right
},
"5": {
Expand Down

0 comments on commit 8e14877

Please sign in to comment.