Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python3 compatibility #14

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pytpcc/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
MIN_CARRIER_ID = 1
MAX_CARRIER_ID = 10
# HACK: This is not strictly correct, but it works
NULL_CARRIER_ID = 0L
NULL_CARRIER_ID = 0
# o_id < than this value, carrier != null, >= -> carrier == null
NULL_CARRIER_LOWER_BOUND = 2101
MIN_OL_CNT = 5
Expand Down
2 changes: 1 addition & 1 deletion pytpcc/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def startExecution(scaleParameters, args, config,channels):
## Load Configuration file
if args['config']:
logging.debug("Loading configuration file '%s'" % args['config'])
cparser = SafeConfigParser()
cparser = ConfigParser()
cparser.read(os.path.realpath(args['config'].name))
config = dict(cparser.items(args['system']))
else:
Expand Down
36 changes: 20 additions & 16 deletions pytpcc/drivers/mongodbdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import pymongo

import constants
from abstractdriver import AbstractDriver
from .abstractdriver import AbstractDriver

TABLE_COLUMNS = {
constants.TABLENAME_ITEM: [
Expand Down Expand Up @@ -345,9 +345,9 @@ def loadConfig(self, config):
logging.error("ConnectionFailure %d (%s) when connected to %s: ",
exc.code, exc.details, display_uri)
return
except pymongo.errors.PyMongoError, err:
except pymongo.errors.PyMongoError as err:
logging.error("Some general error (%s) when connected to %s: ", str(err), display_uri)
print "Got some other error: %s" % str(err)
print("Got some other error: %s" % str(err))
return

## ----------------------------------------------
Expand Down Expand Up @@ -408,15 +408,15 @@ def loadTuples(self, tableName, tuples):
tuple_dicts.append(dict([(columns[i], t[i]) for i in num_columns]))
## FOR

self.database[tableName].insert(tuple_dicts)
self.database[tableName].insert_many(tuple_dicts)
## IF

return

def loadFinishDistrict(self, w_id, d_id):
if self.denormalize:
logging.debug("Pushing %d denormalized ORDERS records for WAREHOUSE %d DISTRICT %d into MongoDB", len(self.w_orders), w_id, d_id)
self.database[constants.TABLENAME_ORDERS].insert(self.w_orders.values())
self.database[constants.TABLENAME_ORDERS].insert_many(self.w_orders.values())
self.w_orders.clear()
## IF

Expand Down Expand Up @@ -593,7 +593,7 @@ def _doNewOrderTxn(self, s, params):
session=s)
if not d:
d1 = self.district.find_one({"D_ID": d_id, "D_W_ID": w_id, "$comment": "new order did not find district"})
print d1, w_id, d_id, c_id, i_ids, i_w_ids, s_dist_col
print(d1, w_id, d_id, c_id, i_ids, i_w_ids, s_dist_col)
assert d, "Couldn't find district in new order w_id %d d_id %d" % (w_id, d_id)
else:
d = self.district.find_one({"D_ID": d_id, "D_W_ID": w_id, "$comment": comment},
Expand Down Expand Up @@ -621,7 +621,9 @@ def _doNewOrderTxn(self, s, params):
#print constants.INVALID_ITEM_MESSAGE + ", Aborting transaction (ok for 1%)"
return None
## IF
items = sorted(items, key=lambda x: i_ids.index(x['I_ID']))

xxi_ids = tuple(map(lambda o: o['I_ID'], items))
items = sorted(items, key=lambda x: xxi_ids.index(x['I_ID']))

# getWarehouseTaxRate
w = self.warehouse.find_one({"W_ID": w_id, "$comment": comment}, {"_id":0, "W_TAX": 1}, session=s)
Expand Down Expand Up @@ -676,7 +678,9 @@ def _doNewOrderTxn(self, s, params):
session=s))
## IF
assert len(all_stocks) == ol_cnt, "all_stocks len %d != ol_cnt %d" % (len(all_stocks), ol_cnt)
all_stocks = sorted(all_stocks, key=lambda x: item_w_list.index((x['S_I_ID'], x["S_W_ID"])))

xxxi_ids = tuple(map(lambda o: (o['S_I_ID'], o['S_W_ID']), all_stocks))
all_stocks = sorted(all_stocks, key=lambda x: xxxi_ids.index((x['S_I_ID'], x["S_W_ID"])))

## ----------------
## Insert Order Line, Stock Item Information
Expand Down Expand Up @@ -820,7 +824,7 @@ def _doOrderStatusTxn(self, s, params):
all_customers = list(self.customer.find(search_fields, return_fields, session=s))
namecnt = len(all_customers)
assert namecnt > 0, "No matching customer for last name %s!" % c_last
index = (namecnt-1)/2
index = (namecnt-1)//2
c = all_customers[index]
c_id = c["C_ID"]
## IF
Expand Down Expand Up @@ -891,7 +895,7 @@ def _doPaymentTxn(self, s, params):
session=s)
if not d:
d1 = self.district.find_one({"D_ID": d_id, "D_W_ID": w_id, "$comment": "payment did not find district"})
print d1, w_id, d_id, h_amount, c_w_id, c_d_id, c_id, c_last, h_date
print(d1, w_id, d_id, h_amount, c_w_id, c_d_id, c_id, c_last, h_date)
assert d, "Couldn't find district in payment w_id %d d_id %d" % (w_id, d_id)
else:
d = self.district.find_one({"D_W_ID": w_id, "D_ID": d_id, "$comment": comment},
Expand Down Expand Up @@ -942,7 +946,7 @@ def _doPaymentTxn(self, s, params):
all_customers = list(self.customer.find(search_fields, return_fields, session=s))
namecnt = len(all_customers)
assert namecnt > 0, "No matching customer w %d d %d clast %s" % (w_id, d_id, c_last)
index = (namecnt-1)/2
index = (namecnt-1)//2
c = all_customers[index]
c_id = c["C_ID"]
## IF
Expand Down Expand Up @@ -1075,9 +1079,9 @@ def _doStockLevelTxn(self, s, params):
ol_ids.add(ol["OL_I_ID"])
## FOR

result = self.stock.find({"S_W_ID": w_id,
result = self.stock.count_documents({"S_W_ID": w_id,
"S_I_ID": {"$in": list(ol_ids)},
"S_QUANTITY": {"$lt": threshold}, "$comment": comment}).count()
"S_QUANTITY": {"$lt": threshold}, "$comment": comment})

return int(result)

Expand All @@ -1095,11 +1099,11 @@ def run_transaction(self, txn_callback, session, name, params):
exc.code, exc.details, name)
return (False, None)
logging.error("Failed with unknown OperationFailure: %d", exc.code)
print "Failed with unknown OperationFailure: %d" % exc.code
print exc.details
print("Failed with unknown OperationFailure: %d" % exc.code)
print(exc.details)
raise
except pymongo.errors.ConnectionFailure:
print "ConnectionFailure during %s: " % name
print("ConnectionFailure during %s: " % name)
return (False, None)
## TRY

Expand Down
4 changes: 2 additions & 2 deletions pytpcc/runtime/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ def execute(self, duration):
(val, retries) = self.driver.executeTransaction(txn, params)
except KeyboardInterrupt:
return -1
except (Exception, AssertionError), ex:
except (Exception, AssertionError) as ex:
logging.warn("Failed to execute Transaction '%s': %s" % (txn, ex))
traceback.print_exc(file=sys.stdout)
print "Aborting some transaction with some error %s %s" % (txn, ex)
print("Aborting some transaction with some error %s %s" % (txn, ex))
global_result.abortTransaction(global_txn_id)
batch_result.abortTransaction(batch_txn_id)
if self.stop_on_error: raise
Expand Down
6 changes: 3 additions & 3 deletions pytpcc/runtime/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def execute(self):
## ==============================================
def loadItems(self):
## Select 10% of the rows to be marked "original"
originalRows = rand.selectUniqueIds(self.scaleParameters.items / 10, 1, self.scaleParameters.items)
originalRows = rand.selectUniqueIds(self.scaleParameters.items // 10, 1, self.scaleParameters.items)

## Load all of the items
tuples = [ ]
Expand Down Expand Up @@ -112,7 +112,7 @@ def loadWarehouse(self, w_id):
h_tuples = [ ]

## Select 10% of the customers to have bad credit
selectedRows = rand.selectUniqueIds(self.scaleParameters.customersPerDistrict / 10, 1, self.scaleParameters.customersPerDistrict)
selectedRows = rand.selectUniqueIds(self.scaleParameters.customersPerDistrict // 10, 1, self.scaleParameters.customersPerDistrict)

## TPC-C 4.3.3.1. says that o_c_id should be a permutation of [1, 3000]. But since it
## is a c_id field, it seems to make sense to have it be a permutation of the
Expand Down Expand Up @@ -160,7 +160,7 @@ def loadWarehouse(self, w_id):

## Select 10% of the stock to be marked "original"
s_tuples = [ ]
selectedRows = rand.selectUniqueIds(self.scaleParameters.items / 10, 1, self.scaleParameters.items)
selectedRows = rand.selectUniqueIds(self.scaleParameters.items // 10, 1, self.scaleParameters.items)
total_tuples = 0
for i_id in range(1, self.scaleParameters.items+1):
original = (i_id in selectedRows)
Expand Down
24 changes: 18 additions & 6 deletions pytpcc/tpcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import glob
import time
import multiprocessing
from ConfigParser import SafeConfigParser
from configparser import ConfigParser
from pprint import pprint, pformat

from util import results, scaleparameters
Expand Down Expand Up @@ -84,12 +84,18 @@ def startLoading(driverClass, scaleParameters, args, config):

# Split the warehouses into chunks
w_ids = [[] for _ in range(args['clients'])]

for w_id in range(scaleParameters.starting_warehouse, scaleParameters.ending_warehouse+1):
idx = w_id % args['clients']
w_ids[idx].append(w_id)
## FOR

loader_results = []
try:
del args['config']
except KeyError:
print()

for i in range(args['clients']):
r = pool.apply_async(loaderFunc, (driverClass, scaleParameters, args, config, w_ids[i]))
loader_results.append(r)
Expand All @@ -104,6 +110,7 @@ def startLoading(driverClass, scaleParameters, args, config):
## loaderFunc
## ==============================================
def loaderFunc(driverClass, scaleParameters, args, config, w_ids):

driver = driverClass(args['ddl'])
assert driver != None, "Driver in loadFunc is none!"
logging.debug("Starting client execution: %s [warehouses=%d]", driver, len(w_ids))
Expand All @@ -122,7 +129,7 @@ def loaderFunc(driverClass, scaleParameters, args, config, w_ids):
driver.loadFinish()
except KeyboardInterrupt:
return -1
except (Exception, AssertionError), ex:
except (Exception, AssertionError) as ex:
logging.warn("Failed to load data: %s", ex)
raise

Expand All @@ -136,6 +143,11 @@ def startExecution(driverClass, scaleParameters, args, config):
pool = multiprocessing.Pool(args['clients'])
debug = logging.getLogger().isEnabledFor(logging.DEBUG)

try:
del args['config']
except KeyError:
print()

worker_results = []
for _ in range(args['clients']):
r = pool.apply_async(executorFunc, (driverClass, scaleParameters, args, config, debug,))
Expand Down Expand Up @@ -184,7 +196,7 @@ def executorFunc(driverClass, scaleParameters, args, config, debug):
aparser = argparse.ArgumentParser(description='Python implementation of the TPC-C Benchmark')
aparser.add_argument('system', choices=getDrivers(),
help='Target system driver')
aparser.add_argument('--config', type=file,
aparser.add_argument('--config', type=open,
help='Path to driver configuration file')
aparser.add_argument('--reset', action='store_true',
help='Instruct the driver to reset the contents of the database')
Expand Down Expand Up @@ -221,14 +233,14 @@ def executorFunc(driverClass, scaleParameters, args, config, debug):
assert driver != None, "Failed to create '%s' driver" % args['system']
if args['print_config']:
config = driver.makeDefaultConfig()
print driver.formatConfig(config)
print
print(driver.formatConfig(config))
print()
sys.exit(0)

## Load Configuration file
if args['config']:
logging.debug("Loading configuration file '%s'", args['config'])
cparser = SafeConfigParser()
cparser = ConfigParser()
cparser.read(os.path.realpath(args['config'].name))
config = dict(cparser.items(args['system']))
else:
Expand Down
16 changes: 8 additions & 8 deletions pytpcc/util/nurand.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
# OTHER DEALINGS IN THE SOFTWARE.
# -----------------------------------------------------------------------

import rand
import random

def makeForLoad():
"""Create random NURand constants, appropriate for loading the database."""
cLast = rand.number(0, 255)
cId = rand.number(0, 1023)
orderLineItemId = rand.number(0, 8191)
cLast = random.randint(0, 255)
cId = random.randint(0, 1023)
orderLineItemId = random.randint(0, 8191)
return NURandC(cLast, cId, orderLineItemId)

def validCRun(cRun, cLoad):
Expand All @@ -45,13 +45,13 @@ def validCRun(cRun, cLoad):

def makeForRun(loadC):
"""Create random NURand constants for running TPC-C. TPC-C 2.1.6.1. (page 20) specifies the valid range for these constants."""
cRun = rand.number(0, 255)
cRun = random.randint(0, 255)
while validCRun(cRun, loadC.cLast) == False:
cRun = rand.number(0, 255)
cRun = random.randint(0, 255)
assert validCRun(cRun, loadC.cLast)

cId = rand.number(0, 1023)
orderLineItemId = rand.number(0, 8191)
cId = random.randint(0, 1023)
orderLineItemId = random.randint(0, 8191)
return NURandC(cRun, cId, orderLineItemId)

class NURandC:
Expand Down
4 changes: 2 additions & 2 deletions pytpcc/util/rand.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
# -----------------------------------------------------------------------

import random
import nurand
from . import nurand

SYLLABLES = [ "BAR", "OUGHT", "ABLE", "PRI", "PRES", "ESE", "ANTI", "CALLY", "ATION", "EING" ]

Expand Down Expand Up @@ -129,7 +129,7 @@ def makeLastName(number):
"""A last name as defined by TPC-C 4.3.2.3. Not actually random."""
global SYLLABLES
assert 0 <= number and number <= 999
indicies = [ number/100, (number/10)%10, number%10 ]
indicies = [ number//100, (number//10)%10, number%10 ]
return "".join(map(lambda x: SYLLABLES[x], indicies))
## DEF

Expand Down
2 changes: 1 addition & 1 deletion pytpcc/util/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def append(self, r):
self.txn_times[txn_name] = orig_time + r.txn_times[txn_name]
self.txn_retries[txn_name] = orig_retries + r.txn_retries[txn_name]
self.txn_aborts[txn_name] = orig_aborts + r.txn_aborts[txn_name]
print "%s [cnt=%d, time=%d]" % (txn_name, self.txn_counters[txn_name], self.txn_times[txn_name])
print("%s [cnt=%d, time=%d]" % (txn_name, self.txn_counters[txn_name], self.txn_times[txn_name]))
# logging.debug("%s [cnt=%d, time=%d]" % (txn_name, self.txn_counters[txn_name], self.txn_times[txn_name]))
if txn_name not in self.latencies:
self.latencies[txn_name] = []
Expand Down