Skip to content

Commit

Permalink
netbench: add new --clients and --servers args
Browse files Browse the repository at this point in the history
  • Loading branch information
breuner committed Aug 8, 2023
1 parent 8fee052 commit 11f6b78
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 74 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* Fixed Windows build not listening for TCP/IPv4 connections in service mode.

### Contributors
* Thanks to Glenn K. Lockwood, Avi Drabkin, Michael Bertelson for reporting an issues. Thanks to Jan Heichler, Rob Mallory, Maria Gutierrez for helpful comments and suggestions.
* Thanks to Glenn K. Lockwood, Avi Drabkin, Michael Bertelson for reporting issues. Thanks to Jan Heichler, Rob Mallory, Maria Gutierrez for helpful comments and suggestions.

## v2.3.1 (Apr 10, 2023)

Expand Down
12 changes: 12 additions & 0 deletions dist/etc/bash_completion.d/elbencho
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ _elbencho_opts()
--block
--blockvaralgo
--blockvarpct
--clients
--clientsfile
--configfile
--cores
--cpu
Expand Down Expand Up @@ -116,6 +118,8 @@ _elbencho_opts()
--s3secret
--s3transman
--sendbuf
--servers
--serversfile
--service
--sharesize
--size
Expand Down Expand Up @@ -252,6 +256,8 @@ _elbencho()
# options that take a file/dir argument
-c)
;&
--clientsfile)
;&
--configfile)
;&
--csvfile)
Expand All @@ -262,14 +268,20 @@ _elbencho()
;&
--resfile)
;&
--serversfile)
;&
--treefile)
compopt -o filenames 2>/dev/null
COMPREPLY=( $(compgen -f ${cur}) )
return 0
;;

# options that take a hostname argument
--clients)
;&
--hosts)
;&
--servers)
compopt -o nospace 2>/dev/null
COMPREPLY=( $(compgen -A hostname ${cur}) )
return 0
Expand Down
199 changes: 133 additions & 66 deletions source/ProgArgs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@

#define CSVFILE_EXPECTED_COMMAS 54 // to check if existing csv was written with other version

#define NETBENCH_PORT_OFFSET 1000 // offset from service port for netbench listen socket
#define NETBENCH_PORT_OFFSET_STR STRINGIZE(NETBENCH_PORT_OFFSET)

/**
* Constructor.
*
Expand Down Expand Up @@ -168,12 +171,19 @@ void ProgArgs::defineAllowedArgs()
RANDALGO_STRONG_STR "\" for high CPU cost but strong randomness. "
"(Default: " RANDALGO_FAST_STR ")")
/*bl*/ (ARG_BLOCKVARIANCE_LONG, bpo::value(&this->blockVariancePercent),
"Percentage of each block that will be refilled with random data between writes. "
"This can be used to defeat compression/deduplication. (Default: 100; Range: 0-100)")
"Block variance percentage. Defines the percentage of each block that will be refilled "
"with random data between writes. This can be used to defeat "
"compression/deduplication. (Default: 100; Range: 0-100)")
/*c*/ (ARG_CONFIGFILE_LONG "," ARG_CONFIGFILE_SHORT, bpo::value(&this->configFilePath),
"Path to benchmark configuration file. All command line options starting with "
"double dashes can be used as \"OPTIONNAME=VALUE\" in the config file. Multiple "
"options are newline-separated. Lines starting with \"#\" are ignored.")
/*cl*/ (ARG_CLIENTS_LONG, bpo::value(&this->clientsStr),
"Comma-separated list of service hosts to use as clients in netbench mode. "
"(Format: hostname[:port])")
/*cl*/ (ARG_CLIENTSFILE_LONG, bpo::value(&this->clientsFilePath),
"Path to file containing line-separated service hosts to use as clients in netbench "
"mode. (Format: hostname[:port])")
#ifdef COREBIND_SUPPORT
/*co*/ (ARG_CPUCORES_LONG, bpo::value(&this->cpuCoresStr),
"Comma-separated list of CPU cores to bind this process to. If multiple cores are "
Expand Down Expand Up @@ -265,8 +275,8 @@ void ProgArgs::defineAllowedArgs()
"service mode hosts. The given number of threads, dirs and files is per-host then. "
"(Format: hostname[:port])")
/*ho*/ (ARG_HOSTSFILE_LONG, bpo::value(&this->hostsFilePath),
"Path to file containing line-separated service hosts to use for benchmark. (Format: "
"hostname[:port])")
"Path to file containing line-separated service hosts to use for benchmark. Lines "
"starting with \"#\" will be ignored. (Format: hostname[:port])")
/*i*/ (ARG_ITERATIONS_LONG "," ARG_ITERATIONS_SHORT, bpo::value(&this->iterations),
"Number of iterations to run the benchmark. (Default: 1)")
/*in*/ (ARG_INFINITEIOLOOP_LONG, bpo::bool_switch(&this->doInfiniteIOLoop),
Expand Down Expand Up @@ -335,16 +345,18 @@ void ProgArgs::defineAllowedArgs()
"Number of directories per thread. This can be 0 to disable creation of any subdirs, "
"in which case all workers share the given dir. (Default: 1)")
/*net*/ (ARG_NETBENCH_LONG, bpo::bool_switch(&this->useNetBench),
"Run network benchmarking. This will transfer data from clients to servers using the "
"given filesize as amount to send by each client thread. To simulate a storage access "
"request/response pattern, a client thread waits for a response of length "
"\"--" ARG_RESPSIZE_LONG "\" bytes after each sent blocksized chunk. "
"See \"--" ARG_NUMNETBENCHSERVERS_LONG "\" for how to define servers as subset of the "
"service instances list. Netbench always implies a write phase. To simulate reads, "
"a 1 byte blocksize (\"-" ARG_BLOCK_SHORT " 1\") can be used with an inrecreased "
"\"--" ARG_RESPSIZE_LONG "\". The used network port for data transfer connections will "
"be \"--" ARG_SERVICEPORT_LONG "\" plus 1000. (Netbench mode defaults to disabled "
"block variance.)")
"Run network benchmarking. To simulate the typical storage access request/response "
"pattern, each client thread will send blocksized chunks (\"-" ARG_BLOCK_SHORT "\") to "
"one of the servers and wait for a reponse of length \"--" ARG_RESPSIZE_LONG "\" bytes "
"before transmitting the next block. Client threads will get connected round-robin to "
"the given servers. Blocksize larger than response size simulates writes from clients "
"to servers, blocksize smaller than response size simulates reads. Client threads use "
"filesize as limit for amount of data to send. "
"See \"--" ARG_SERVERS_LONG "\" & \"--" ARG_CLIENTS_LONG "\" for how to define servers "
"and clients. "
"The used network port for data transfer connections will be "
"\"--" ARG_SERVICEPORT_LONG "\" plus " NETBENCH_PORT_OFFSET_STR ". "
"(Netbench mode defaults to zero block variance.)")
/*net*/ (ARG_NETDEVS_LONG, bpo::value(&this->netDevsStr),
"Comma-separated list of network device names (e.g. \"eth0\") for "
"round-robin binding of outgoing (client-side) connections in network benchmark mode. "
Expand All @@ -369,9 +381,6 @@ void ProgArgs::defineAllowedArgs()
/*nu*/ (ARG_NUMHOSTS_LONG, bpo::value(&this->numHosts),
"Number of hosts to use from given hosts list or hosts file. (Default: use all given "
"hosts)")
/*nu*/ (ARG_NUMNETBENCHSERVERS_LONG, bpo::value(&this->numNetBenchServers),
"Number of servers for network benchmark mode, counted from the beginning of the given "
"hosts list or hosts file. The rest will be used as clients. (Default: 1)")
/*po*/ (ARG_SERVICEPORT_LONG, bpo::value(&this->servicePort),
"TCP port of background service. (Default: " ARGDEFAULT_SERVICEPORT_STR ")")
/*qr*/ (ARG_PREALLOCFILE_LONG, bpo::bool_switch(&this->doPreallocFile),
Expand Down Expand Up @@ -465,6 +474,12 @@ void ProgArgs::defineAllowedArgs()
/*se*/ (ARG_SENDBUFSIZE_LONG, bpo::value(&this->sockSendBufSizeOrigStr),
"In netbench mode, this sets the send buffer size of sockets in bytes. "
"(Supports base2 suffixes, e.g. \"2M\")")
/*se*/ (ARG_SERVERS_LONG, bpo::value(&this->serversStr),
"Comma-separated list of service hosts to use as servers in netbench mode. "
"(Format: hostname[:port])")
/*se*/ (ARG_SERVERSFILE_LONG, bpo::value(&this->serversFilePath),
"Path to file containing line-separated service hosts to use as servers in netbench "
"mode. (Format: hostname[:port])")
/*se*/ (ARG_RUNASSERVICE_LONG, bpo::bool_switch(&this->runAsService),
"Run as service for distributed mode, waiting for requests from master.")
/*sh*/ (ARG_FILESHARESIZE_LONG, bpo::value(&this->fileShareSizeOrigStr),
Expand Down Expand Up @@ -636,7 +651,7 @@ void ProgArgs::defineDefaults()
this->useAlternativeHTTPService = false;
this->useHDFS = false;
this->useNetBench = false;
this->numNetBenchServers = 1;
this->numNetBenchServers = 0;
this->netBenchRespSize = 1;
this->netBenchRespSizeOrigStr = "1";
this->sockRecvBufSize = 0;
Expand Down Expand Up @@ -822,14 +837,21 @@ void ProgArgs::checkArgs()
LOGGER(Log_NORMAL, "Note: Block variance is not supported for cuFile/GDS writes.");

if(useNetBench && hostsVec.empty() )
throw ProgException("Missing hosts definition for netbench mode.");
throw ProgException("Missing servers & clients definition for netbench mode.");

if(useNetBench && !numNetBenchServers)
throw ProgException("At least one server needs to be defined for netbench mode.");

if(useNetBench && (numNetBenchServers >= hostsVec.size() ) )
throw ProgException("At least one client needed in hosts list.");
throw ProgException("At least one client needs to be defined for netbench mode.");

if(useNetBench && (!blockSize || !netBenchRespSize || !fileSize) )
throw ProgException(
"Blocksize, response size and file size must not be zero in netbench mode.");

if(useNetBench && (runCreateDirsPhase || runDeleteDirsPhase || runStatFilesPhase ||
runReadPhase || runDeleteFilesPhase) )
throw ProgException("Netbench only supports write/send phase.");
throw ProgException("Netbench mode only run in write phase.");

if(useRandomOffsets && useHDFS && runCreateFilesPhase)
throw ProgException("HDFS does not support random offsets for writes.");
Expand Down Expand Up @@ -1590,90 +1612,135 @@ void ProgArgs::prepareFileSize(int fd, std::string& path)
}

/**
* Parse hosts string to fill hostsVec. Do nothing if hosts string is empty.
* Parse hosts string (and prepended servers string and appended clients string) to fill hostsVec.
* Do nothing if hosts strings and hosts file paths are empty.
*
* hostsVec elements will have default port appended if no port was defined.
* hostsVec is the result of this. elements will have default port appended if no port was defined.
*
* @throw ProgException if a problem is found, e.g. hosts string was not empty, but parsed result
* is empty.
*/
void ProgArgs::parseHosts()
{
if(hostsStr.empty() && hostsFilePath.empty() )
return; // nothing to do

if(!numHosts)
{ // user explicitly selected zero hosts, so ignore any given hosts list or hosts file
hostsStr.clear();
hostsFilePath.clear();
serversStr = hostsStr = clientsStr = "";
serversFilePath = hostsFilePath = clientsFilePath = "";
return;
}

// read service hosts from file and add to hostsStr
if(!hostsFilePath.empty() )
struct ArgStrAndFilePathPair
{
std::ifstream hostsFile(hostsFilePath);
std::string* hostsStr;
std::string* hostsFilePath;
};

if(!hostsFile)
throw ProgException("Unable to read hosts file. Path: " + hostsFilePath);

hostsStr += " "; // add separator to existing hosts
std::vector<ArgStrAndFilePathPair> hostsStrAndPathsVec;
if(!useNetBench)
hostsStrAndPathsVec.push_back( { &hostsStr, &hostsFilePath } );
else
{
// note: the order is important: servers first and clients last (for netbench mode)

std::string lineStr;
hostsStrAndPathsVec.push_back( { &serversStr, &serversFilePath } );
hostsStrAndPathsVec.push_back( { &clientsStr, &clientsFilePath } );
}

while(std::getline(hostsFile, lineStr) )
// read service hosts from file and add to hostsStr, then add all together to hostsVec
for(ArgStrAndFilePathPair& currentPair : hostsStrAndPathsVec)
{
std::string& currentHostsStr = *currentPair.hostsStr;
std::string& currentHostsFilePath = *currentPair.hostsFilePath;

if(currentHostsStr.empty() && currentHostsFilePath.empty() )
continue; // nothing to do for this pair

if(!currentHostsFilePath.empty() )
{
if(lineStr.rfind("#", 0) == 0)
continue; // skip lines starting with "#" as comment char
std::ifstream hostsFile(currentHostsFilePath);

if(!hostsFile)
throw ProgException("Unable to read hosts file. Path: " + currentHostsFilePath);

hostsStr += lineStr + ",";
currentHostsStr += " "; // add separator to existing hosts

std::string lineStr;

while(std::getline(hostsFile, lineStr) )
{
if(lineStr.rfind("#", 0) == 0)
continue; // skip lines starting with "#" as comment char

currentHostsStr += lineStr + ",";
}

hostsFile.close();
}

hostsFile.close();
}
StringVec currentHostsVec;

boost::split(hostsVec, hostsStr, boost::is_any_of(HOSTLIST_DELIMITERS),
boost::token_compress_on);
boost::split(currentHostsVec, currentHostsStr, boost::is_any_of(HOSTLIST_DELIMITERS),
boost::token_compress_on);

// delete empty string elements from vec (they come from delimiter use at beginning or end)
for( ; ; )
{
StringVec::iterator iter = std::find(hostsVec.begin(), hostsVec.end(), "");
if(iter == hostsVec.end() )
break;
// delete empty string elements from vec (they come from delimiter use at beginning or end)
for( ; ; )
{
StringVec::iterator iter = std::find(
currentHostsVec.begin(), currentHostsVec.end(), "");

hostsVec.erase(iter);
}
if(iter == currentHostsVec.end() )
break;

for(std::string& host : hostsVec)
{
std::size_t findRes = host.find(HOST_PORT_SEPARATOR);
currentHostsVec.erase(iter);
}

// add default port to hosts where port is not provided by user
if(findRes == std::string::npos)
host += HOST_PORT_SEPARATOR + std::to_string(servicePort);
}
for(std::string& host : currentHostsVec)
{
std::size_t findRes = host.find(HOST_PORT_SEPARATOR);

if(hostsVec.empty() )
throw ProgException("Hosts defined, but parsing resulted in an empty list: " + hostsStr);
// add default port to hosts where port is not provided by user
if(findRes == std::string::npos)
host += HOST_PORT_SEPARATOR + std::to_string(servicePort);
}

if(currentHostsVec.empty() )
throw ProgException(
"Hosts defined, but parsing resulted in an empty list. "
"Given list: \"" + currentHostsStr + "\"");

// init number of netbench servers
if(currentPair.hostsStr == &serversStr)
numNetBenchServers = currentHostsVec.size();

// append hostsVec of current round to general hostsVec
hostsVec.insert(hostsVec.end(), currentHostsVec.begin(), currentHostsVec.end() );

} // end of for-loop

// check for duplicates
std::set<std::string> hostsSet(hostsVec.begin(), hostsVec.end() );
if(hostsSet.size() != hostsVec.size() )
throw ProgException("List of hosts contains duplicates. "
"Number of duplicates: " + std::to_string(hostsVec.size() - hostsSet.size() ) + "; "
"List: " + hostsStr);
"List: " + TranslatorTk::stringVecToString(hostsVec, ",") );

// reduce to user-defined number of hosts
// ("numHosts==-1" means "use all hosts")
if( (numHosts != -1) && (hostsVec.size() > (unsigned)numHosts) )
hostsVec.resize(numHosts);

LOGGER(Log_DEBUG,
"Finished parsing hosts list. "
"numHosts: " << numHosts << "; "
"numNetBenchServers: " << numNetBenchServers << "; "
"hostsVec: " << TranslatorTk::stringVecToString(hostsVec, ",") << std::endl);
}

/**
* Parse netBenchServersVec from netBenchServersStr. This is only used in service instances.
* The master sends the full service hosts list, so we only keep the configured numNetBenchServers
* and calculate the ports as service port +1 for each service.
* and calculate the ports as service port +NETBENCH_PORT_OFFSET for each service.
*/
void ProgArgs::parseNetBenchServersForService()
{
Expand Down Expand Up @@ -1704,18 +1771,18 @@ void ProgArgs::parseNetBenchServersForService()
NetBenchServerAddr newServerAddr;

// add default port to hosts where port is not provided by user
// note: netbench port is service port + 1000.
// note: netbench port is service port + NETBENCH_PORT_OFFSET.
if(findRes == std::string::npos)
{ // no port given
newServerAddr.host = server;
newServerAddr.port = ARGDEFAULT_SERVICEPORT + 1000;
newServerAddr.port = ARGDEFAULT_SERVICEPORT + NETBENCH_PORT_OFFSET;
}
else
{ // port given by user
newServerAddr.host = server.substr(0, findRes);

std::string portStr = server.substr(findRes+1);
newServerAddr.port = std::stoi(portStr) + 1000;
newServerAddr.port = std::stoi(portStr) + NETBENCH_PORT_OFFSET;
}

netBenchServersVec.push_back(newServerAddr);
Expand Down Expand Up @@ -2923,7 +2990,7 @@ void ProgArgs::getAsPropertyTreeForService(bpt::ptree& outTree, size_t serviceRa
outTree.put(ARG_HDFS_LONG, useHDFS);
outTree.put(ARG_NETBENCH_LONG, useNetBench);
outTree.put(ARG_NUMNETBENCHSERVERS_LONG, numNetBenchServers);
outTree.put(ARG_NETBENCHSERVERSSTR_LONG, hostsStr); // (yes, hostsStr sent as netbench servers)
outTree.put(ARG_NETBENCHSERVERSSTR_LONG, serversStr);
outTree.put(ARG_RESPSIZE_LONG, netBenchRespSize);
outTree.put(ARG_RECVBUFSIZE_LONG, sockRecvBufSize);
outTree.put(ARG_SENDBUFSIZE_LONG, sockSendBufSize);
Expand Down
Loading

0 comments on commit 11f6b78

Please sign in to comment.