Skip to content

Commit

Permalink
Introduce batchManagement (Issue #11)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosmalt committed Feb 2, 2020
1 parent fbb3f35 commit 508e8f2
Showing 1 changed file with 44 additions and 37 deletions.
81 changes: 44 additions & 37 deletions radossim.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def osdClient(env, priority, meanInterArrivalTime, meanReqSize, dstQ):
# Submit request
with dstQ.put(request) as put:
yield put

# Move requests to BlueStore
def osdThread(env, srcQ, dstQ):
while True:
Expand All @@ -38,10 +38,9 @@ def osdThread(env, srcQ, dstQ):
# Submit BlueStore transaction
with dstQ.put(bsTxn) as put:
yield put

# Batch incoming requests and process
def kvThread(env, srcQ):
latMap = {}; cntMap = {}; count = 0; lat = 0
while True:
# Create batch
batch = []
Expand All @@ -61,66 +60,74 @@ def kvThread(env, srcQ):
bsTxn = yield get
batch.append(bsTxn)
# Process batch
#print("batch size =", len(batch))
kvQDispatch = env.now
batchReqSize = 0
for bsTxn in batch:
# Unpack transaction
((priority, reqSize, arrivalOSD), arrivalKV) = bsTxn
# Build request of entire batch (see Issue #6)
batchReqSize += reqSize
# Measure latencies
osdQLat = arrivalKV - arrivalOSD
kvQLat = env.now - arrivalKV
count += 1
lat += osdQLat + kvQLat
# Account latencies
if priority in latMap:
latMap[priority] += osdQLat + kvQLat
cntMap[priority] += 1
else:
latMap[priority] = osdQLat + kvQLat
cntMap[priority] = 1
# Periodically print latencies (averages so far)
if count % 10000 == 0:
for priority in latMap.keys():
print(priority, latMap[priority] / cntMap[priority] / 1000000)
print('total', lat / count / 1000000)
#print(priority, (arrivalKV - arrivalOSD) / (env.now - arrivalKV))
# Process batch
yield env.timeout(latModel(batchReqSize))



kvCommit = env.now
# Diagnose and manage batching
batchManagement(batch, batchReqSize, kvQDispatch, kvCommit)

def batchManagement(batch, batchSize, dispatchTime, commitTime):
latMap = {}; cntMap = {}; count = 0; lat = 0
# Process batch
#print("batch size =", len(batch))
for bsTxn in batch:
# Unpack transaction
((priority, reqSize, arrivalOSD), arrivalKV) = bsTxn
# Measure latencies
osdQLat = arrivalKV - arrivalOSD
kvQLat = dispatchTime - arrivalKV
count += 1
lat += osdQLat + kvQLat
# Account latencies
if priority in latMap:
latMap[priority] += osdQLat + kvQLat
cntMap[priority] += 1
else:
latMap[priority] = osdQLat + kvQLat
cntMap[priority] = 1
# Periodically print latencies (averages so far)
if count % 10000 == 0:
for priority in latMap.keys():
print(priority, latMap[priority] / cntMap[priority] / 1000000)
print('total', lat / count / 1000000)
#print(priority, (arrivalKV - arrivalOSD) / (env.now - arrivalKV))

if __name__ == '__main__':

env = simpy.Environment()
env = simpy.Environment()

# Constants
meanInterArrivalTime = 28500 # micro seconds
meanReqSize = 4096 # bytes
meanReqSize = 4096 # bytes
#meanInterArrivalTime = 4200 # micro seconds
#meanReqSize = 16 * 4096 # bytes

# OSD queue(s)
# Add capacity parameter for max queue lengths
osdQ1 = simpy.PriorityStore(env)
osdQ2 = simpy.PriorityStore(env)
#osdQ = simpy.Store(env) # infinite capacity

# KV queue (capacity translates into batch size)
kvQ = simpy.Store(env, 1)
kvQ = simpy.Store(env, 1)

# OSD client(s), each with a particular priority pushing request into a particular queue
env.process(osdClient(env, 1, meanInterArrivalTime*2, meanReqSize, osdQ1))
env.process(osdClient(env, 2, meanInterArrivalTime*2, meanReqSize, osdQ1))
env.process(osdClient(env, 2, meanInterArrivalTime*2, meanReqSize, osdQ1))

# OSD thread(s) (one per OSD queue)
# env.process(osdThread(env, osdQ, kvQ))
env.process(osdThread(env, osdQ1, kvQ))
env.process(osdThread(env, osdQ2, kvQ))

# KV thread in BlueStore
env.process(kvThread(env, kvQ))

# Run simulation
env.run(60 * 60 * 1000000)

0 comments on commit 508e8f2

Please sign in to comment.