diff --git a/CHANGELOG.md b/CHANGELOG.md index a234629..8240fc7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ * Fixed Windows build not listening for TCP/IPv4 connections in service mode. ### Contributors -* Thanks to Glenn K. Lockwood, Avi Drabkin, Michael Bertelson for reporting an issues. Thanks to Jan Heichler, Rob Mallory, Maria Gutierrez for helpful comments and suggestions. +* Thanks to Glenn K. Lockwood, Avi Drabkin, Michael Bertelson for reporting issues. Thanks to Jan Heichler, Rob Mallory, Maria Gutierrez for helpful comments and suggestions. ## v2.3.1 (Apr 10, 2023) diff --git a/dist/etc/bash_completion.d/elbencho b/dist/etc/bash_completion.d/elbencho index 31cdec3..36f8a64 100644 --- a/dist/etc/bash_completion.d/elbencho +++ b/dist/etc/bash_completion.d/elbencho @@ -28,6 +28,8 @@ _elbencho_opts() --block --blockvaralgo --blockvarpct + --clients + --clientsfile --configfile --cores --cpu @@ -116,6 +118,8 @@ _elbencho_opts() --s3secret --s3transman --sendbuf + --servers + --serversfile --service --sharesize --size @@ -252,6 +256,8 @@ _elbencho() # options that take a file/dir argument -c) ;& + --clientsfile) + ;& --configfile) ;& --csvfile) @@ -262,6 +268,8 @@ _elbencho() ;& --resfile) ;& + --serversfile) + ;& --treefile) compopt -o filenames 2>/dev/null COMPREPLY=( $(compgen -f ${cur}) ) @@ -269,7 +277,11 @@ _elbencho() ;; # options that take a hostname argument + --clients) + ;& --hosts) + ;& + --servers) compopt -o nospace 2>/dev/null COMPREPLY=( $(compgen -A hostname ${cur}) ) return 0 diff --git a/source/ProgArgs.cpp b/source/ProgArgs.cpp index e142a0d..75f2926 100644 --- a/source/ProgArgs.cpp +++ b/source/ProgArgs.cpp @@ -37,6 +37,9 @@ #define CSVFILE_EXPECTED_COMMAS 54 // to check if existing csv was written with other version +#define NETBENCH_PORT_OFFSET 1000 // offset from service port for netbench listen socket +#define NETBENCH_PORT_OFFSET_STR STRINGIZE(NETBENCH_PORT_OFFSET) + /** * Constructor. 
* @@ -168,12 +171,19 @@ void ProgArgs::defineAllowedArgs() RANDALGO_STRONG_STR "\" for high CPU cost but strong randomness. " "(Default: " RANDALGO_FAST_STR ")") /*bl*/ (ARG_BLOCKVARIANCE_LONG, bpo::value(&this->blockVariancePercent), - "Percentage of each block that will be refilled with random data between writes. " - "This can be used to defeat compression/deduplication. (Default: 100; Range: 0-100)") + "Block variance percentage. Defines the percentage of each block that will be refilled " + "with random data between writes. This can be used to defeat " + "compression/deduplication. (Default: 100; Range: 0-100)") /*c*/ (ARG_CONFIGFILE_LONG "," ARG_CONFIGFILE_SHORT, bpo::value(&this->configFilePath), "Path to benchmark configuration file. All command line options starting with " "double dashes can be used as \"OPTIONNAME=VALUE\" in the config file. Multiple " "options are newline-separated. Lines starting with \"#\" are ignored.") +/*cl*/ (ARG_CLIENTS_LONG, bpo::value(&this->clientsStr), + "Comma-separated list of service hosts to use as clients in netbench mode. " + "(Format: hostname[:port])") +/*cl*/ (ARG_CLIENTSFILE_LONG, bpo::value(&this->clientsFilePath), + "Path to file containing line-separated service hosts to use as clients in netbench " + "mode. (Format: hostname[:port])") #ifdef COREBIND_SUPPORT /*co*/ (ARG_CPUCORES_LONG, bpo::value(&this->cpuCoresStr), "Comma-separated list of CPU cores to bind this process to. If multiple cores are " @@ -265,8 +275,8 @@ void ProgArgs::defineAllowedArgs() "service mode hosts. The given number of threads, dirs and files is per-host then. " "(Format: hostname[:port])") /*ho*/ (ARG_HOSTSFILE_LONG, bpo::value(&this->hostsFilePath), - "Path to file containing line-separated service hosts to use for benchmark. (Format: " - "hostname[:port])") + "Path to file containing line-separated service hosts to use for benchmark. Lines " + "starting with \"#\" will be ignored. 
(Format: hostname[:port])") /*i*/ (ARG_ITERATIONS_LONG "," ARG_ITERATIONS_SHORT, bpo::value(&this->iterations), "Number of iterations to run the benchmark. (Default: 1)") /*in*/ (ARG_INFINITEIOLOOP_LONG, bpo::bool_switch(&this->doInfiniteIOLoop), @@ -335,16 +345,18 @@ void ProgArgs::defineAllowedArgs() "Number of directories per thread. This can be 0 to disable creation of any subdirs, " "in which case all workers share the given dir. (Default: 1)") /*net*/ (ARG_NETBENCH_LONG, bpo::bool_switch(&this->useNetBench), - "Run network benchmarking. This will transfer data from clients to servers using the " - "given filesize as amount to send by each client thread. To simulate a storage access " - "request/response pattern, a client thread waits for a response of length " - "\"--" ARG_RESPSIZE_LONG "\" bytes after each sent blocksized chunk. " - "See \"--" ARG_NUMNETBENCHSERVERS_LONG "\" for how to define servers as subset of the " - "service instances list. Netbench always implies a write phase. To simulate reads, " - "a 1 byte blocksize (\"-" ARG_BLOCK_SHORT " 1\") can be used with an inrecreased " - "\"--" ARG_RESPSIZE_LONG "\". The used network port for data transfer connections will " - "be \"--" ARG_SERVICEPORT_LONG "\" plus 1000. (Netbench mode defaults to disabled " - "block variance.)") + "Run network benchmarking. To simulate the typical storage access request/response " + "pattern, each client thread will send blocksized chunks (\"-" ARG_BLOCK_SHORT "\") to " + "one of the servers and wait for a response of length \"--" ARG_RESPSIZE_LONG "\" bytes " + "before transmitting the next block. Client threads will get connected round-robin to " + "the given servers. Blocksize larger than response size simulates writes from clients " + "to servers, blocksize smaller than response size simulates reads. Client threads use " + "filesize as limit for amount of data to send. 
" + "See \"--" ARG_SERVERS_LONG "\" & \"--" ARG_CLIENTS_LONG "\" for how to define servers " + "and clients. " + "The used network port for data transfer connections will be " + "\"--" ARG_SERVICEPORT_LONG "\" plus " NETBENCH_PORT_OFFSET_STR ". " + "(Netbench mode defaults to zero block variance.)") /*net*/ (ARG_NETDEVS_LONG, bpo::value(&this->netDevsStr), "Comma-separated list of network device names (e.g. \"eth0\") for " "round-robin binding of outgoing (client-side) connections in network benchmark mode. " @@ -369,9 +381,6 @@ void ProgArgs::defineAllowedArgs() /*nu*/ (ARG_NUMHOSTS_LONG, bpo::value(&this->numHosts), "Number of hosts to use from given hosts list or hosts file. (Default: use all given " "hosts)") -/*nu*/ (ARG_NUMNETBENCHSERVERS_LONG, bpo::value(&this->numNetBenchServers), - "Number of servers for network benchmark mode, counted from the beginning of the given " - "hosts list or hosts file. The rest will be used as clients. (Default: 1)") /*po*/ (ARG_SERVICEPORT_LONG, bpo::value(&this->servicePort), "TCP port of background service. (Default: " ARGDEFAULT_SERVICEPORT_STR ")") /*qr*/ (ARG_PREALLOCFILE_LONG, bpo::bool_switch(&this->doPreallocFile), @@ -465,6 +474,12 @@ void ProgArgs::defineAllowedArgs() /*se*/ (ARG_SENDBUFSIZE_LONG, bpo::value(&this->sockSendBufSizeOrigStr), "In netbench mode, this sets the send buffer size of sockets in bytes. " "(Supports base2 suffixes, e.g. \"2M\")") +/*se*/ (ARG_SERVERS_LONG, bpo::value(&this->serversStr), + "Comma-separated list of service hosts to use as servers in netbench mode. " + "(Format: hostname[:port])") +/*se*/ (ARG_SERVERSFILE_LONG, bpo::value(&this->serversFilePath), + "Path to file containing line-separated service hosts to use as servers in netbench " + "mode. 
(Format: hostname[:port])") /*se*/ (ARG_RUNASSERVICE_LONG, bpo::bool_switch(&this->runAsService), "Run as service for distributed mode, waiting for requests from master.") /*sh*/ (ARG_FILESHARESIZE_LONG, bpo::value(&this->fileShareSizeOrigStr), @@ -636,7 +651,7 @@ void ProgArgs::defineDefaults() this->useAlternativeHTTPService = false; this->useHDFS = false; this->useNetBench = false; - this->numNetBenchServers = 1; + this->numNetBenchServers = 0; this->netBenchRespSize = 1; this->netBenchRespSizeOrigStr = "1"; this->sockRecvBufSize = 0; @@ -822,14 +837,21 @@ void ProgArgs::checkArgs() LOGGER(Log_NORMAL, "Note: Block variance is not supported for cuFile/GDS writes."); if(useNetBench && hostsVec.empty() ) - throw ProgException("Missing hosts definition for netbench mode."); + throw ProgException("Missing servers & clients definition for netbench mode."); + + if(useNetBench && !numNetBenchServers) + throw ProgException("At least one server needs to be defined for netbench mode."); if(useNetBench && (numNetBenchServers >= hostsVec.size() ) ) - throw ProgException("At least one client needed in hosts list."); + throw ProgException("At least one client needs to be defined for netbench mode."); + + if(useNetBench && (!blockSize || !netBenchRespSize || !fileSize) ) + throw ProgException( + "Blocksize, response size and file size must not be zero in netbench mode."); if(useNetBench && (runCreateDirsPhase || runDeleteDirsPhase || runStatFilesPhase || runReadPhase || runDeleteFilesPhase) ) - throw ProgException("Netbench only supports write/send phase."); + throw ProgException("Netbench mode only runs in write phase."); if(useRandomOffsets && useHDFS && runCreateFilesPhase) throw ProgException("HDFS does not support random offsets for writes."); @@ -1590,90 +1612,135 @@ void ProgArgs::prepareFileSize(int fd, std::string& path) } /** - * Parse hosts string to fill hostsVec. Do nothing if hosts string is empty. 
+ * Parse hosts string (and prepended servers string and appended clients string) to fill hostsVec. + * Do nothing if hosts strings and hosts file paths are empty. * - * hostsVec elements will have default port appended if no port was defined. + * hostsVec is the result of this. Elements will have default port appended if no port was defined. * * @throw ProgException if a problem is found, e.g. hosts string was not empty, but parsed result * is empty. */ void ProgArgs::parseHosts() { - if(hostsStr.empty() && hostsFilePath.empty() ) - return; // nothing to do - if(!numHosts) { // user explicitly selected zero hosts, so ignore any given hosts list or hosts file - hostsStr.clear(); - hostsFilePath.clear(); + serversStr = hostsStr = clientsStr = ""; + serversFilePath = hostsFilePath = clientsFilePath = ""; return; } - // read service hosts from file and add to hostsStr - if(!hostsFilePath.empty() ) + struct ArgStrAndFilePathPair { - std::ifstream hostsFile(hostsFilePath); + std::string* hostsStr; + std::string* hostsFilePath; + }; - if(!hostsFile) - throw ProgException("Unable to read hosts file. 
Path: " + hostsFilePath); - hostsStr += " "; // add separator to existing hosts + std::vector hostsStrAndPathsVec; + if(!useNetBench) + hostsStrAndPathsVec.push_back( { &hostsStr, &hostsFilePath } ); + else + { + // note: the order is important: servers first and clients last (for netbench mode) - std::string lineStr; + hostsStrAndPathsVec.push_back( { &serversStr, &serversFilePath } ); + hostsStrAndPathsVec.push_back( { &clientsStr, &clientsFilePath } ); + } - while(std::getline(hostsFile, lineStr) ) + // read service hosts from file and add to hostsStr, then add all together to hostsVec + for(ArgStrAndFilePathPair& currentPair : hostsStrAndPathsVec) + { + std::string& currentHostsStr = *currentPair.hostsStr; + std::string& currentHostsFilePath = *currentPair.hostsFilePath; + + if(currentHostsStr.empty() && currentHostsFilePath.empty() ) + continue; // nothing to do for this pair + + if(!currentHostsFilePath.empty() ) { - if(lineStr.rfind("#", 0) == 0) - continue; // skip lines starting with "#" as comment char + std::ifstream hostsFile(currentHostsFilePath); + + if(!hostsFile) + throw ProgException("Unable to read hosts file. 
Path: " + currentHostsFilePath); - hostsStr += lineStr + ","; + currentHostsStr += " "; // add separator to existing hosts + + std::string lineStr; + + while(std::getline(hostsFile, lineStr) ) + { + if(lineStr.rfind("#", 0) == 0) + continue; // skip lines starting with "#" as comment char + + currentHostsStr += lineStr + ","; + } + + hostsFile.close(); } - hostsFile.close(); - } + StringVec currentHostsVec; - boost::split(hostsVec, hostsStr, boost::is_any_of(HOSTLIST_DELIMITERS), - boost::token_compress_on); + boost::split(currentHostsVec, currentHostsStr, boost::is_any_of(HOSTLIST_DELIMITERS), + boost::token_compress_on); - // delete empty string elements from vec (they come from delimiter use at beginning or end) - for( ; ; ) - { - StringVec::iterator iter = std::find(hostsVec.begin(), hostsVec.end(), ""); - if(iter == hostsVec.end() ) - break; + // delete empty string elements from vec (they come from delimiter use at beginning or end) + for( ; ; ) + { + StringVec::iterator iter = std::find( + currentHostsVec.begin(), currentHostsVec.end(), ""); - hostsVec.erase(iter); - } + if(iter == currentHostsVec.end() ) + break; - for(std::string& host : hostsVec) - { - std::size_t findRes = host.find(HOST_PORT_SEPARATOR); + currentHostsVec.erase(iter); + } - // add default port to hosts where port is not provided by user - if(findRes == std::string::npos) - host += HOST_PORT_SEPARATOR + std::to_string(servicePort); - } + for(std::string& host : currentHostsVec) + { + std::size_t findRes = host.find(HOST_PORT_SEPARATOR); - if(hostsVec.empty() ) - throw ProgException("Hosts defined, but parsing resulted in an empty list: " + hostsStr); + // add default port to hosts where port is not provided by user + if(findRes == std::string::npos) + host += HOST_PORT_SEPARATOR + std::to_string(servicePort); + } + + if(currentHostsVec.empty() ) + throw ProgException( + "Hosts defined, but parsing resulted in an empty list. 
" + "Given list: \"" + currentHostsStr + "\""); + + // init number of netbench servers + if(currentPair.hostsStr == &serversStr) + numNetBenchServers = currentHostsVec.size(); + + // append hostsVec of current round to general hostsVec + hostsVec.insert(hostsVec.end(), currentHostsVec.begin(), currentHostsVec.end() ); + + } // end of for-loop // check for duplicates std::set hostsSet(hostsVec.begin(), hostsVec.end() ); if(hostsSet.size() != hostsVec.size() ) throw ProgException("List of hosts contains duplicates. " "Number of duplicates: " + std::to_string(hostsVec.size() - hostsSet.size() ) + "; " - "List: " + hostsStr); + "List: " + TranslatorTk::stringVecToString(hostsVec, ",") ); // reduce to user-defined number of hosts // ("numHosts==-1" means "use all hosts") if( (numHosts != -1) && (hostsVec.size() > (unsigned)numHosts) ) hostsVec.resize(numHosts); + + LOGGER(Log_DEBUG, + "Finished parsing hosts list. " + "numHosts: " << numHosts << "; " + "numNetBenchServers: " << numNetBenchServers << "; " + "hostsVec: " << TranslatorTk::stringVecToString(hostsVec, ",") << std::endl); } /** * Parse netBenchServersVec from netBenchServersStr. This is only used in service instances. * The master sends the full service hosts list, so we only keep the configured numNetBenchServers - * and calculate the ports as service port +1 for each service. + * and calculate the ports as service port +NETBENCH_PORT_OFFSET for each service. */ void ProgArgs::parseNetBenchServersForService() { @@ -1704,18 +1771,18 @@ void ProgArgs::parseNetBenchServersForService() NetBenchServerAddr newServerAddr; // add default port to hosts where port is not provided by user - // note: netbench port is service port + 1000. + // note: netbench port is service port + NETBENCH_PORT_OFFSET. 
if(findRes == std::string::npos) { // no port given newServerAddr.host = server; - newServerAddr.port = ARGDEFAULT_SERVICEPORT + 1000; + newServerAddr.port = ARGDEFAULT_SERVICEPORT + NETBENCH_PORT_OFFSET; } else { // port given by user newServerAddr.host = server.substr(0, findRes); std::string portStr = server.substr(findRes+1); - newServerAddr.port = std::stoi(portStr) + 1000; + newServerAddr.port = std::stoi(portStr) + NETBENCH_PORT_OFFSET; } netBenchServersVec.push_back(newServerAddr); @@ -2923,7 +2990,7 @@ void ProgArgs::getAsPropertyTreeForService(bpt::ptree& outTree, size_t serviceRa outTree.put(ARG_HDFS_LONG, useHDFS); outTree.put(ARG_NETBENCH_LONG, useNetBench); outTree.put(ARG_NUMNETBENCHSERVERS_LONG, numNetBenchServers); - outTree.put(ARG_NETBENCHSERVERSSTR_LONG, hostsStr); // (yes, hostsStr sent as netbench servers) + outTree.put(ARG_NETBENCHSERVERSSTR_LONG, serversStr); outTree.put(ARG_RESPSIZE_LONG, netBenchRespSize); outTree.put(ARG_RECVBUFSIZE_LONG, sockRecvBufSize); outTree.put(ARG_SENDBUFSIZE_LONG, sockSendBufSize); diff --git a/source/ProgArgs.h b/source/ProgArgs.h index 7e1cced..27eafc6 100644 --- a/source/ProgArgs.h +++ b/source/ProgArgs.h @@ -60,6 +60,10 @@ namespace bpt = boost::property_tree; #define ARG_HOSTS_LONG "hosts" #define ARG_HOSTSFILE_LONG "hostsfile" #define ARG_NUMHOSTS_LONG "numhosts" +#define ARG_SERVERS_LONG "servers" +#define ARG_SERVERSFILE_LONG "serversfile" +#define ARG_CLIENTS_LONG "clients" +#define ARG_CLIENTSFILE_LONG "clientsfile" #define ARG_INTERRUPT_LONG "interrupt" #define ARG_ITERATIONS_LONG "iterations" #define ARG_ITERATIONS_SHORT "i" @@ -287,6 +291,10 @@ class ProgArgs std::string hostsFilePath; // path to file for service hosts StringVec hostsVec; // service hosts broken down into individual hostname[:port] int numHosts; // number of hosts to use from hostsStr/hostsFilePath ("-1" means "all") + std::string serversStr; // prepended to hostsStr in netbench mode + std::string serversFilePath; // path to file for 
prepended service hosts + std::string clientsStr; // appended to hostsStr in netbench mode + std::string clientsFilePath; // path to file for appended service hosts bool interruptServices; // send interrupt msg to given hosts to stop current phase bool quitServices; // send quit (via interrupt msg) to given hosts to exit service bool noSharedServicePath; // true if bench paths not shared between service instances diff --git a/source/Statistics.cpp b/source/Statistics.cpp index 74ab18c..3bb3d5e 100644 --- a/source/Statistics.cpp +++ b/source/Statistics.cpp @@ -804,6 +804,7 @@ void Statistics::printFullScreenLiveStatsWorkerTable(const LiveResults& liveResu (workerDonePerSec *= 1000) /= progArgs.getLiveStatsSleepMS(); + const char* netbenchServiceSuffixStr = ""; // only set in netbench mode size_t workerPercentDoneNum = 0; std::string workerPercentDoneStr = "-"; @@ -820,8 +821,16 @@ void Statistics::printFullScreenLiveStatsWorkerTable(const LiveResults& liveResu workerPercentDoneNum = std::min(workerPercentDoneNum, (size_t)100); workerPercentDoneStr = std::to_string(workerPercentDoneNum); - if(useNetBench && (i < progArgs.getNumNetBenchServers() ) ) - workerPercentDoneStr = "-"; // this is a server, we only have pct done for clients + if(useNetBench) + { // special settings for netbench mode + if(i < progArgs.getNumNetBenchServers() ) + { // this is a netbench server + workerPercentDoneStr = "-"; // this is a server, we only have pct done for clients + netbenchServiceSuffixStr = " [server]"; + } + else // this is a netbench client + netbenchServiceSuffixStr = " [client]"; + } stream << boost::format(tableHeadlineFormat) % i @@ -850,7 +859,7 @@ void Statistics::printFullScreenLiveStatsWorkerTable(const LiveResults& liveResu stream << boost::format(remoteTableHeadlineFormat) % (progArgs.getNumThreads() - remoteWorker->getNumWorkersDone() ) % remoteWorker->getCPUUtilLive() - % progArgs.getHostsVec()[i]; + % (progArgs.getHostsVec()[i] + netbenchServiceSuffixStr); } 
printFullScreenLiveStatsLine(stream, liveResults.winWidth, true); diff --git a/source/toolkits/TranslatorTk.cpp b/source/toolkits/TranslatorTk.cpp index 0ad4a15..64e1d0f 100644 --- a/source/toolkits/TranslatorTk.cpp +++ b/source/toolkits/TranslatorTk.cpp @@ -135,7 +135,7 @@ std::string TranslatorTk::stringVecToString(const StringVec& vec, std::string se for(const std::string& elem : vec) { if(!result.empty() ) - result += separator; // not the first element, so prepend separator + result += separator; // this is not the first element, so add separator result += elem; } diff --git a/source/workers/LocalWorker.cpp b/source/workers/LocalWorker.cpp index e9d772f..f186e5c 100644 --- a/source/workers/LocalWorker.cpp +++ b/source/workers/LocalWorker.cpp @@ -265,7 +265,7 @@ void LocalWorker::run() checkInterruptionRequest(); // for infinite loop workers with no work - } while(doInfiniteIOLoop); // end of infinite loop + } while(doInfiniteIOLoop && workerGotPhaseWork); // end of infinite loop // let coordinator know that we are done finishPhase(); @@ -5203,7 +5203,7 @@ void LocalWorker::netbenchDoTransferServer() LOGGER(Log_DEBUG,"Server: Transfer finished: " << workerSocketVec[i]->getPeername() << std::endl); - try { workerSocketVec[i]->shutdown(); } catch(...) {} + // (note: no sock shutdown() here because of possible infloop) transferredBytesVec.erase(transferredBytesVec.begin() + i); pollFDVec.erase(pollFDVec.begin() + i); @@ -5316,7 +5316,7 @@ void LocalWorker::netbenchDoTransferClient() LOGGER(Log_DEBUG,"Client: Transfer finished: " << clientSocket->getPeername() << std::endl); - try { clientSocket->shutdown(); } catch(...) {} + // (note: no sock shutdown() here because of possible infloop) } } // end of for-loop for each block