Skip to content

Commit

Permalink
Squashed 'lib/mmseqs/' changes from 0898eb9012..b804fbe384
Browse files Browse the repository at this point in the history
b804fbe384 Include CUDA_VISIBLE_DEVICES into shm hash, so same db can be loaded into multiple GPUs
e0957747fd gpu client waits for gpuserver to be ready
492297bd7a Padded check in Sequence class not needed anymore
ddf2e85f88 Move azure to macos-latest
fd37b377d2 Merged taxID larger than any taxid in nodes.dmp could corrupt memory  #931

git-subtree-dir: lib/mmseqs
git-subtree-split: b804fbe384e6f6c9fe96322ec0e92d48bccd0a42
  • Loading branch information
martin-steinegger committed Jan 19, 2025
1 parent 5ee0260 commit 1013c94
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 32 deletions.
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
- job: build_macos
displayName: macOS
pool:
vmImage: 'macos-12'
vmImage: 'macos-latest'
steps:
- checkout: self
submodules: true
Expand Down
12 changes: 12 additions & 0 deletions src/commons/GpuUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define GPUUTIL_H

#include "Debug.h"
#include "FileUtil.h"
#include "PrefilteringIndexReader.h"
#include "marv.h"
#include <atomic>
#include <cstring>
Expand Down Expand Up @@ -37,6 +39,16 @@ struct GPUSharedMemory {
sizeof(int8_t) * 20 * maxSeqLen; // Size for profile data
}

// Builds the name of the shared-memory segment used for database `db`.
// The key is the symlink-resolved database path with any index suffix
// stripped, followed by the value of CUDA_VISIBLE_DEVICES when that
// variable is set, so the same database started under different device
// selections yields different segment names.
// Returns the hash of that key rendered as a string.
static std::string getShmHash(const std::string& db) {
    std::string key = FileUtil::getRealPathFromSymLink(PrefilteringIndexReader::dbPathWithoutIndex(db));
    // Mix in the GPU selection only if the environment provides one.
    if (const char* devices = getenv("CUDA_VISIBLE_DEVICES")) {
        key += devices;
    }
    const size_t digest = Util::hash(key.c_str(), key.length());
    return SSTR(digest);
}

// Allocate and initialize shared memory
static GPUSharedMemory* alloc(const std::string& name, unsigned int maxSeqLen, unsigned int maxResListLen) {
size_t shm_size = calculateSize(maxSeqLen, maxResListLen);
Expand Down
7 changes: 7 additions & 0 deletions src/commons/Parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Parameters::Parameters():
// gpu
PARAM_GPU(PARAM_GPU_ID, "--gpu", "Use GPU", "Use GPU (CUDA) if possible", typeid(int), (void *) &gpu, "^[0-1]{1}$", MMseqsParameter::COMMAND_COMMON),
PARAM_GPU_SERVER(PARAM_GPU_SERVER_ID, "--gpu-server", "Use GPU server", "Use GPU server", typeid(int), (void *) &gpuServer, "^[0-1]{1}$", MMseqsParameter::COMMAND_COMMON),
PARAM_GPU_SERVER_WAIT_TIMEOUT(PARAM_GPU_SERVER_WAIT_TIMEOUT_ID, "--gpu-server-wait-timeout", "Wait for GPU server", "Wait for GPU server for 0: don't wait -1: no wait limit: >0 this many seconds", typeid(int), (void *) &gpuServerWaitTimeout, "^-?[0-9]+", MMseqsParameter::COMMAND_COMMON),
// convertalignments
PARAM_FORMAT_MODE(PARAM_FORMAT_MODE_ID, "--format-mode", "Alignment format", "Output format:\n0: BLAST-TAB\n1: SAM\n2: BLAST-TAB + query/db length\n3: Pretty HTML\n4: BLAST-TAB + column headers\nBLAST-TAB (0) and BLAST-TAB + column headers (4) support custom output formats (--format-output)", typeid(int), (void *) &formatAlignmentMode, "^[0-4]{1}$"),
PARAM_FORMAT_OUTPUT(PARAM_FORMAT_OUTPUT_ID, "--format-output", "Format alignment output", "Choose comma separated list of output columns from: query,target,evalue,gapopen,pident,fident,nident,qstart,qend,qlen\ntstart,tend,tlen,alnlen,raw,bits,cigar,qseq,tseq,qheader,theader,qaln,taln,qframe,tframe,mismatch,qcov,tcov\nqset,qsetid,tset,tsetid,taxid,taxname,taxlineage,qorfstart,qorfend,torfstart,torfend,ppos", typeid(std::string), (void *) &outfmt, ""),
Expand Down Expand Up @@ -455,6 +456,7 @@ Parameters::Parameters():
ungappedprefilter.push_back(&PARAM_PRELOAD_MODE);
ungappedprefilter.push_back(&PARAM_GPU);
ungappedprefilter.push_back(&PARAM_GPU_SERVER);
ungappedprefilter.push_back(&PARAM_GPU_SERVER_WAIT_TIMEOUT);
ungappedprefilter.push_back(&PARAM_PREF_MODE);
ungappedprefilter.push_back(&PARAM_THREADS);
ungappedprefilter.push_back(&PARAM_COMPRESSED);
Expand Down Expand Up @@ -1357,6 +1359,7 @@ Parameters::Parameters():
clusterworkflow = combineList(clusterworkflow, linclustworkflow);
clusterworkflow = removeParameter(clusterworkflow, PARAM_GPU);
clusterworkflow = removeParameter(clusterworkflow, PARAM_GPU_SERVER);
clusterworkflow = removeParameter(clusterworkflow, PARAM_GPU_SERVER_WAIT_TIMEOUT);
// easyclusterworkflow
easyclusterworkflow = combineList(clusterworkflow, createdb);
Expand Down Expand Up @@ -1400,6 +1403,7 @@ Parameters::Parameters():
clusterUpdate.push_back(&PARAM_RECOVER_DELETED);
clusterUpdate = removeParameter(clusterUpdate, PARAM_GPU);
clusterUpdate = removeParameter(clusterUpdate, PARAM_GPU_SERVER);
clusterUpdate = removeParameter(clusterUpdate, PARAM_GPU_SERVER_WAIT_TIMEOUT);
mapworkflow = combineList(prefilter, rescorediagonal);
mapworkflow = combineList(mapworkflow, extractorfs);
Expand All @@ -1410,6 +1414,7 @@ Parameters::Parameters():
mapworkflow.push_back(&PARAM_REMOVE_TMP_FILES);
mapworkflow = removeParameter(mapworkflow, PARAM_GPU);
mapworkflow = removeParameter(mapworkflow, PARAM_GPU_SERVER);
mapworkflow = removeParameter(mapworkflow, PARAM_GPU_SERVER_WAIT_TIMEOUT);
enrichworkflow = combineList(searchworkflow, prefilter);
enrichworkflow = combineList(enrichworkflow, subtractdbs);
Expand All @@ -1418,6 +1423,7 @@ Parameters::Parameters():
enrichworkflow = combineList(enrichworkflow, result2profile);
enrichworkflow = removeParameter(enrichworkflow, PARAM_GPU);
enrichworkflow = removeParameter(enrichworkflow, PARAM_GPU_SERVER);
enrichworkflow = removeParameter(enrichworkflow, PARAM_GPU_SERVER_WAIT_TIMEOUT);
databases.push_back(&PARAM_HELP);
databases.push_back(&PARAM_HELP_LONG);
Expand Down Expand Up @@ -2468,6 +2474,7 @@ void Parameters::setDefaults() {
}
#endif
gpuServer = 0;
gpuServerWaitTimeout = 10 * 60;
#ifdef HAVE_CUDA
char* gpuServerEnv = getenv("MMSEQS_FORCE_GPUSERVER");
if (gpuServerEnv != NULL) {
Expand Down
2 changes: 2 additions & 0 deletions src/commons/Parameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ class Parameters {
int verbosity; // log level
int gpu; // use GPU
int gpuServer; // use the gpu server
int gpuServerWaitTimeout; // wait for this many seconds until GPU server is ready
int threads; // Amounts of threads
int compressed; // compressed writer
bool removeTmpFiles; // Do not delete temp files
Expand Down Expand Up @@ -820,6 +821,7 @@ class Parameters {
// gpu
PARAMETER(PARAM_GPU)
PARAMETER(PARAM_GPU_SERVER)
PARAMETER(PARAM_GPU_SERVER_WAIT_TIMEOUT)
// format alignment
PARAMETER(PARAM_FORMAT_MODE)
PARAMETER(PARAM_FORMAT_OUTPUT)
Expand Down
8 changes: 1 addition & 7 deletions src/commons/Sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "MathUtil.h"
#include "SubstitutionMatrixProfileStates.h"
#include "PSSMCalculator.h"

#include <climits> // short_max
#include <cstddef>

Expand Down Expand Up @@ -206,12 +205,7 @@ void Sequence::mapSequence(size_t id, unsigned int dbKey, const char *sequence,
this->dbKey = dbKey;
this->seqData = sequence;
if (Parameters::isEqualDbtype(this->seqType, Parameters::DBTYPE_AMINO_ACIDS) || Parameters::isEqualDbtype(this->seqType, Parameters::DBTYPE_NUCLEOTIDES)) {
// check for padded database
if(seqLen >= 1 && sequence[0] >= 0 && sequence[0] <= 52){
mapSequence(id, dbKey, std::make_pair((const unsigned char *)sequence, seqLen));
}else{
mapSequence(sequence, seqLen);
}
mapSequence(sequence, seqLen);
} else if (Parameters::isEqualDbtype(this->seqType, Parameters::DBTYPE_HMM_PROFILE)) {
mapProfile(sequence, seqLen);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/prefiltering/PrefilteringIndexReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ std::string PrefilteringIndexReader::searchForIndex(const std::string &pathToDB)
return "";
}

std::string PrefilteringIndexReader::dbPathWithoutIndex(std::string & dbname) {
std::string PrefilteringIndexReader::dbPathWithoutIndex(const std::string& dbname) {
std::string rawname = dbname;
// check for .idx
size_t idxlastpos = dbname.rfind(".idx");
Expand Down
2 changes: 1 addition & 1 deletion src/prefiltering/PrefilteringIndexReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class PrefilteringIndexReader {

static std::string searchForIndex(const std::string &pathToDB);

static std::string dbPathWithoutIndex(std::string &dbname);
static std::string dbPathWithoutIndex(const std::string &dbname);

private:
static void printMeta(int *meta);
Expand Down
56 changes: 46 additions & 10 deletions src/prefiltering/ungappedprefilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include <fcntl.h>
#include <sys/mman.h>
#include <chrono>
#include <thread>

#ifdef OPENMP
#include <omp.h>
Expand Down Expand Up @@ -60,14 +62,48 @@ void runFilterOnGpu(Parameters & par, BaseMatrix * subMat,
memset(compositionBias, 0, compBufferSize);
}

// hash the realpath of par.db2
std::string tdbrName = par.db2;
std::string tdbrRelPath = FileUtil::getRealPathFromSymLink(PrefilteringIndexReader::dbPathWithoutIndex(tdbrName));
size_t hash = Util::hash(tdbrRelPath.c_str(), tdbrRelPath.length());
std::string shmFileInFile = (par.gpuServer == 0) ? "" : "/dev/shm/" + SSTR(hash);
if (shmFileInFile != "" && FileUtil::fileExists(shmFileInFile.c_str()) == false) {
Debug(Debug::ERROR) << "--gpu-server " << shmFileInFile << " does not exist";
EXIT(EXIT_FAILURE);
std::string hash = "";
if (par.gpuServer != 0) {
hash = GPUSharedMemory::getShmHash(par.db2);
std::string path = "/dev/shm/" + hash;
int waitTimeout = par.gpuServerWaitTimeout;
std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
bool statusPrinted = false;
while (true) {
size_t shmSize = FileUtil::getFileSize(path);
// server is ready once the shm file exists and is not 0 byte large
if (shmSize != (size_t)-1 && shmSize > 0) {
break;
}

if (waitTimeout == 0) {
Debug(Debug::ERROR)
<< "gpuserver for database " << par.db2 << " not found.\n"
<< "Please start gpuserver with the same CUDA_VISIBLE_DEVICES\n";
EXIT(EXIT_FAILURE);
}

if (waitTimeout > 0) {
if (statusPrinted == false) {
Debug(Debug::INFO) << "Waiting for `gpuserver`";
statusPrinted = true;
} else {
Debug(Debug::INFO) << ".";
}
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - startTime).count();
if (elapsed >= waitTimeout) {
Debug(Debug::ERROR)
<< "gpuserver for database " << par.db2 << " not found after " << elapsed << "seconds.\n"
<< "Please start gpuserver with the same CUDA_VISIBLE_DEVICES\n";
EXIT(EXIT_FAILURE);
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
if (waitTimeout > 0 && statusPrinted) {
Debug(Debug::INFO) << "\n";
}
}

size_t* offsetData = NULL;
Expand All @@ -76,7 +112,7 @@ void runFilterOnGpu(Parameters & par, BaseMatrix * subMat,
std::vector<int32_t> lengths;
GPUSharedMemory* layout = NULL;
pid_t pid = 0; // current process ID, only for server
if (FileUtil::fileExists(shmFileInFile.c_str()) == false) {
if (hash.empty()) {
offsets.reserve(tdbr->getSize() + 1);
lengths.reserve(tdbr->getSize());
for (size_t id = 0; id < tdbr->getSize(); id++) {
Expand All @@ -88,7 +124,7 @@ void runFilterOnGpu(Parameters & par, BaseMatrix * subMat,
lengthData = lengths.data();
} else {
pid = getpid();
layout = GPUSharedMemory::openSharedMemory(SSTR(hash));
layout = GPUSharedMemory::openSharedMemory(hash);
}

const bool serverMode = par.gpuServer;
Expand Down
34 changes: 29 additions & 5 deletions src/taxonomy/NcbiTaxonomy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,22 +457,46 @@ size_t NcbiTaxonomy::loadMerged(const std::string &mergedFile) {
EXIT(EXIT_FAILURE);
}

std::unordered_map<TaxID, TaxID> mergedMap;
TaxID localMaxTaxID = maxTaxID;
std::string line;
size_t count = 0;
while (std::getline(ss, line)) {
std::vector<std::string> result = splitByDelimiter(line, "\t|\t", 2);
if (result.size() != 2) {
Debug(Debug::ERROR) << "Invalid name entry!\n";
EXIT(EXIT_FAILURE);
}

unsigned int oldId = (unsigned int)strtoul(result[0].c_str(), NULL, 10);
unsigned int mergedId = (unsigned int)strtoul(result[1].c_str(), NULL, 10);
TaxID oldId = (TaxID) strtoul(result[0].c_str(), NULL, 10);
TaxID mergedId = (TaxID) strtoul(result[1].c_str(), NULL, 10);

// Only update if the oldId doesn't exist yet AND the mergedId does exist
if (!nodeExists(oldId) && nodeExists(mergedId)) {
D[oldId] = D[mergedId];
++count;
if (oldId > localMaxTaxID) {
localMaxTaxID = oldId;
}
if (mergedId > localMaxTaxID) {
localMaxTaxID = mergedId;
}
mergedMap[oldId] = mergedId;
}
}

// realloc D if we find a higher maxTaxID
if (localMaxTaxID > maxTaxID) {
int* newD = new int[localMaxTaxID + 1];
std::copy(D, D + maxTaxID + 1, newD);
std::fill(newD + maxTaxID + 1, newD + (localMaxTaxID + 1), -1);
delete[] D;
D = newD;
maxTaxID = localMaxTaxID;
}

size_t count = 0;
for (std::unordered_map<TaxID, TaxID>::iterator it = mergedMap.begin(); it != mergedMap.end(); ++it) {
D[it->first] = D[it->second];
++count;
}
Debug(Debug::INFO) << " Done, added " << count << " merged nodes.\n";
return count;
}
Expand Down
10 changes: 3 additions & 7 deletions src/util/gpuserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ int gpuserver(int argc, const char **argv, const Command& command) {
offsets.emplace_back(offsets.back() + lengths.back());
int32_t maxTargetLength = lengths.back();

std::string dbrName = par.db1;
std::string dbrRelPath = FileUtil::getRealPathFromSymLink(PrefilteringIndexReader::dbPathWithoutIndex(dbrName));
size_t hash = Util::hash(dbrRelPath.c_str(), dbrRelPath.length());

std::string shmFile = SSTR(hash);
GPUSharedMemory* layout = GPUSharedMemory::alloc(shmFile, par.maxSeqLen , par.maxResListLen); // Adjust sizes as necessary

BaseMatrix *subMat;
if (Parameters::isEqualDbtype(dbrIdx.sequenceReader->getDbtype(), Parameters::DBTYPE_NUCLEOTIDES)) {
subMat = new NucleotideMatrix(par.scoringMatrixFile.values.nucleotide().c_str(), 1.0, 0.0);
Expand All @@ -78,6 +71,9 @@ int gpuserver(int argc, const char **argv, const Command& command) {
// Set up the handler for SIGINT and SIGTERM
sigaction(SIGINT, &act, NULL);
sigaction(SIGTERM, &act, NULL);

std::string shmFile = GPUSharedMemory::getShmHash(par.db1);
GPUSharedMemory* layout = GPUSharedMemory::alloc(shmFile, par.maxSeqLen, par.maxResListLen);
Debug(Debug::WARNING) << shmFile << "\n";
while (keepRunning) {
while (layout->serverReady.load(std::memory_order_acquire) == 0 || layout->clientReady.load(std::memory_order_acquire) == 0) {
Expand Down
14 changes: 14 additions & 0 deletions util/build_osx.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ if [ "$(otool -L "src/${BINARY_NAME}" | tail -n +2 | grep -v -E "${ALLOWED_DL_LI
exit 1
fi

# Sanity-check the freshly built binary: take the "minos" line(s) from the
# `vtool -show` output and let awk exit non-zero when their second field is
# greater than MACOSX_DEPLOYMENT_TARGET; in that case abort the build.
# NOTE(review): presumably "minos" is the minimum OS version recorded in the
# binary and MACOSX_DEPLOYMENT_TARGET was exported earlier in this script — confirm.
if ! vtool -show "src/${BINARY_NAME}" | tee | grep minos | \
awk -v version="${MACOSX_DEPLOYMENT_TARGET}" '$2 > version { exit 1 }'
then
echo "macOS deployment target was not set correctly"
exit 1
fi

export MACOSX_DEPLOYMENT_TARGET=11.0

mkdir -p "$BUILD/build_libomp/openmp-${OMPVERSION}.src/build-arm64" && cd "$BUILD/build_libomp/openmp-${OMPVERSION}.src/build-arm64"
Expand Down Expand Up @@ -94,6 +101,13 @@ if [ "$(otool -L "src/${BINARY_NAME}" | tail -n +2 | grep -v -E "${ALLOWED_DL_LI
exit 1
fi

# Verify the binary built under the current MACOSX_DEPLOYMENT_TARGET: awk
# exits 1 if the second field of any "minos" line printed by `vtool -show`
# is greater than that target, which makes the script stop with an error.
# NOTE(review): the comparison is done by awk on the raw field; verify it
# behaves as intended for multi-component version strings.
if ! vtool -show "src/${BINARY_NAME}" | tee | grep minos | \
awk -v version="${MACOSX_DEPLOYMENT_TARGET}" '$2 > version { exit 1 }'
then
echo "macOS deployment target was not set correctly"
exit 1
fi

lipo \
-create \
-arch x86_64 "$BUILD/build_avx2/src/${BINARY_NAME}" \
Expand Down

0 comments on commit 1013c94

Please sign in to comment.