Testing Scripts and Leader Detection #18

Merged
merged 18 commits
Apr 9, 2023
3 changes: 3 additions & 0 deletions .gitignore
@@ -5,3 +5,6 @@ config.csv
tmux_script.sh
manifest.xml
oh-my-db.tar.gz
oh-my-db
tags
monitor_results
2 changes: 1 addition & 1 deletion ohmydb/OhMyReplica.H
@@ -90,7 +90,7 @@ inline void ReplicaManager::initialiseServices(
LogInfo( "Waiting for a majority of peers to be up..." )
// keep pinging until a majority of peers are up
while ( true ) {
auto activePeers = 1; // myself
size_t activePeers = 1; // myself
for ( auto& [i, peer]: peers ) {
if ( peer->Ping(1) > 0 ) {
LogInfo( "ping OK: " + std::to_string(i) );
72 changes: 43 additions & 29 deletions ohmydb/ReplicatedDB.H
@@ -12,7 +12,7 @@ namespace ohmydb {

class ReplicatedDB {
public:
ReplicatedDB( std::string serverAddr );
ReplicatedDB(std::vector<ServerInfo> serverInfo);

std::optional<int32_t> get( int32_t key );
bool put( std::pair<int32_t, int32_t> kvp );
@@ -21,38 +21,51 @@ private:
static constexpr const int32_t MAX_TRIES = 1000;
OhMyDBClient client_;
std::string serverAddr_;
std::vector<ServerInfo> serverInfo_;
void updateChannel(std::string serverAddr);

};

inline ReplicatedDB::ReplicatedDB( std::string serverAddr )
: client_( grpc::CreateChannel( serverAddr, grpc::InsecureChannelCredentials() ) )
, serverAddr_( serverAddr )
inline ReplicatedDB::ReplicatedDB( std::vector<ServerInfo> serverInfo )
: client_( grpc::CreateChannel( serverInfo[0].ip + ":" + std::to_string(serverInfo[0].db_port), grpc::InsecureChannelCredentials() ) )
, serverInfo_( serverInfo )
{ }

// Update the server we are connecting to for RPC.
inline void ReplicatedDB::updateChannel(std::string serverAddr)
{
serverAddr_ = serverAddr;
client_ = OhMyDBClient(
grpc::CreateChannel( serverAddr_, grpc::InsecureChannelCredentials() ));
}

inline std::optional<int32_t> ReplicatedDB::get( int32_t key )
{
uint32_t backupID = 0;
auto iters = MAX_TRIES;
while ( iters-- ) {
// try until you find a leader
auto retOpt = client_.Get( key );

if ( ! retOpt.has_value() ) {
LogError( "Failed to connect to DB server: RPC Failed" );
return {};

// Either failed to connect to the server, or it is not the leader. Try another server.
if ( ! retOpt.has_value() || retOpt.value().errorCode == ErrorCode::NOT_LEADER) {
LogError( "Failed to connect to DB server: RPC Failed, contacting other servers." );
auto serverAddr = serverInfo_[backupID].ip + ":" + std::to_string(serverInfo_[backupID].db_port);
updateChannel(serverAddr);
backupID++;
if(backupID >= serverInfo_.size())
{
backupID = 0;
}
continue;
}


auto ret = retOpt.value();

switch ( ret.errorCode ) {
case ErrorCode::NOT_LEADER: {
if ( ret.leaderAddr == serverAddr_ ) {
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
}
LogWarn( "Contacted server(" + serverAddr_ + ") is not the leader. "
"Switching to: " + ret.leaderAddr );
serverAddr_ = ret.leaderAddr;
client_ = OhMyDBClient(
grpc::CreateChannel( ret.leaderAddr, grpc::InsecureChannelCredentials() )
);
LogError("Hit NOT_LEADER in switch, this should not happen.");
break;
}
case ErrorCode::KEY_NOT_FOUND: {
@@ -70,27 +83,28 @@ inline std::optional<int32_t> ReplicatedDB::get( int32_t key )
inline bool ReplicatedDB::put( std::pair<int32_t, int32_t> kvp )
{
auto iters = MAX_TRIES;
uint32_t backupID = 0;
while ( iters-- ) {
auto retOpt = client_.Put( kvp.first, kvp.second );

if ( ! retOpt.has_value() ) {
LogError( "Failed to connect to DB server: RPC Failed" );
return false;
// Either failed to connect to the server, or it is not the leader. Try another server.
if ( ! retOpt.has_value() || retOpt.value().errorCode == ErrorCode::NOT_LEADER) {
LogError( "Failed to connect to DB server: RPC Failed, contacting other servers." );
auto serverAddr = serverInfo_[backupID].ip + ":" + std::to_string(serverInfo_[backupID].db_port);
updateChannel(serverAddr);
backupID++;
if(backupID >= serverInfo_.size())
{
backupID = 0;
}
continue;
}

auto ret = retOpt.value();

switch ( ret.errorCode ) {
case ErrorCode::NOT_LEADER: {
if ( ret.leaderAddr == serverAddr_ ) {
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
}
LogWarn( "Contacted server(" + serverAddr_ + ") is not the leader. "
"Switching to: " + ret.leaderAddr );
serverAddr_ = ret.leaderAddr;
client_ = OhMyDBClient(
grpc::CreateChannel( ret.leaderAddr, grpc::InsecureChannelCredentials() )
);
LogError("Hit NOT_LEADER in switch, this should not happen.");
break;
}
case ErrorCode::KEY_NOT_FOUND: {
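For context, a minimal sketch of how a caller would drive the reworked client API after this change. The ServerInfo fields (ip, db_port), ParseConfig, and the get/put signatures are taken from the diff; the header paths and the standalone main() are assumptions for illustration only, not code from this PR.

// Sketch only: using the new ReplicatedDB(std::vector<ServerInfo>) constructor.
// The project header paths below are assumed, not taken from this PR.
#include <iostream>
#include <vector>

#include "ohmydb/ReplicatedDB.H"   // assumed location of ReplicatedDB and ServerInfo
#include "ohmyserver/Config.H"     // assumed location of ParseConfig

int main()
{
    std::vector<ServerInfo> servers = ParseConfig( "config.csv" );
    auto db = ohmydb::ReplicatedDB( servers );   // first contacts servers[0].ip:db_port

    db.put( { 42, 7 } );                         // retried across servers until a leader accepts it
    if ( auto val = db.get( 42 ) ) {
        std::cout << "42 -> " << *val << std::endl;
    }
    return 0;
}

With this change, both get() and put() rotate through serverInfo_ whenever an RPC fails or the contacted node reports NOT_LEADER, so the caller no longer needs to know the leader's address up front.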
29 changes: 19 additions & 10 deletions ohmyraft/OhMyRaft.H
@@ -172,8 +172,11 @@ void RaftManager<T>::runLeaderOneIter()
}
state_.Mut.unlock();


std::vector<std::thread*> threadHandles;

for ( auto& [id, peer] : peers_ ) {
auto th = std::thread([id = id, this, savedCurrentTerm]{
auto th = new std::thread([id = id, this, savedCurrentTerm]{
AppendEntriesParams args;

state_.Mut.lock();
@@ -225,23 +228,23 @@ void RaftManager<T>::runLeaderOneIter()
state_.MatchIndex[id] = state_.NextIndex[id] - 1;

auto savedCommitIndex = state_.CommitIndex;
for ( size_t i = state_.CommitIndex + 1; i < state_.Logs.size(); ++i ) {
for ( int32_t i = state_.CommitIndex + 1; i < (int32_t)state_.Logs.size(); ++i ) {
if ( state_.Logs[i].term == state_.CurrentTerm ) {
auto matchCount = 1;
for ( auto& [pid, _] : peers_ ) {
if ( state_.MatchIndex[pid] >= i ) {
matchCount++;
}
}
if ( matchCount * 2 > peers_.size() ) {
if ( matchCount * 2 > (int32_t)peers_.size() ) {
state_.CommitIndex = i;
}
}
}
if ( state_.CommitIndex != savedCommitIndex ) {
LogInfo("New entries being committed" );
std::lock_guard<std::mutex> rom( raftOutMutex_ );
for ( size_t i = state_.LastApplied + 1; i <= state_.CommitIndex; ++i ) {
for ( int32_t i = state_.LastApplied + 1; i <= state_.CommitIndex; ++i ) {
raftOut_.push_back( state_.Logs[i].op );
}
state_.LastApplied = state_.CommitIndex;
@@ -255,8 +258,14 @@ void RaftManager<T>::runLeaderOneIter()
}
}
});
th.detach();
//threadHandles.push_back(th);
th->detach();
}

//for ( auto& th : threadHandles) {
// th->join();
// delete th;
//}
}

template <class T>
@@ -367,14 +376,14 @@ AppendEntriesRet RaftManager<T>::AppendEntries( AppendEntriesParams args )
becomeFollower( args.term );
}
if ( args.prevLogIndex == -1 ||
( args.prevLogIndex < state_.Logs.size() && args.prevLogTerm == state_.Logs[args.prevLogIndex].term) )
( args.prevLogIndex < (int32_t)state_.Logs.size() && args.prevLogTerm == state_.Logs[args.prevLogIndex].term) )
{
reply.success = true;
auto logInsertIndex = args.prevLogIndex + 1;
auto newEntriesIndex = 0;

while ( true ) {
if ( logInsertIndex >= state_.Logs.size() || newEntriesIndex >= args.entries.size() ) {
if ( logInsertIndex >= (int32_t)state_.Logs.size() || newEntriesIndex >= (int32_t)args.entries.size() ) {
break;
}
if ( state_.Logs[logInsertIndex].term != args.entries[newEntriesIndex].term ) {
@@ -384,7 +393,7 @@ AppendEntriesRet RaftManager<T>::AppendEntries( AppendEntriesParams args )
newEntriesIndex++;
}

if ( newEntriesIndex < args.entries.size() ) {
if ( newEntriesIndex < (int32_t)args.entries.size() ) {
for ( size_t i = logInsertIndex; i < state_.Logs.size(); ++i ) {
state_.Logs[i].op.abort(); // release any pending service requests
}
@@ -405,7 +414,7 @@ AppendEntriesRet RaftManager<T>::AppendEntries( AppendEntriesParams args )
state_.CommitIndex = std::min( args.leaderCommit, (int32_t) state_.Logs.size() - 1 );
std::lock_guard<std::mutex> rom(raftOutMutex_);
// queue all jobs that can be committed to be fed to the executor
for ( size_t i = state_.LastApplied + 1; i <= state_.CommitIndex; ++i ) {
for ( int32_t i = state_.LastApplied + 1; i <= state_.CommitIndex; ++i ) {
raftOut_.push_back( state_.Logs[i].op );
}
state_.LastApplied = state_.CommitIndex;
@@ -563,7 +572,7 @@ void RaftManager<T>::startElection()
} else if ( reply.term == savedCurrentTerm ) {
if ( reply.voteGranted ) {
state_.VotesReceived++;
if ( state_.VotesReceived * 2 > peers_.size() ) {
if ( state_.VotesReceived * 2 > (int32_t) peers_.size() ) {
LogInfo("Id=" + std::to_string(id_) + " elected as leader");
becomeLeader();
}
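One note on the thread handling in runLeaderOneIter above: each per-peer AppendEntries sender is now created with new std::thread and detached, and the join-based path is left commented out. As written, the heap-allocated std::thread handles are never deleted (the OS threads themselves finish normally). Below is a small illustrative sketch of the owned-thread alternative, not code from this PR; joining reclaims the handles automatically but blocks the iteration until the slowest peer responds, which is presumably why detach was kept.

// Sketch only (not from the PR): owning the per-peer sender threads instead of new + detach.
#include <cstddef>
#include <thread>
#include <vector>

void runSendersOnce( std::size_t peerCount )
{
    std::vector<std::thread> senders;
    senders.reserve( peerCount );
    for ( std::size_t id = 0; id < peerCount; ++id ) {
        senders.emplace_back( [id] {
            (void) id;  // placeholder: send AppendEntries to peer `id` and process the reply here
        } );
    }
    for ( auto& th : senders ) {
        th.join();   // every handle is joined before the vector is destroyed
    }
}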
13 changes: 5 additions & 8 deletions ohmyserver/client.cpp
@@ -106,9 +106,9 @@ int main(int argc, char **argv)
.default_value("10")
.help("Number of possible keys for testing.");

program.add_argument("--id")
.default_value("0")
.help("Initial node to contact.");
//program.add_argument("--id")
// .default_value("0")
// .help("Initial node to contact.");

try {
program.parse_args( argc, argv );
@@ -121,17 +121,14 @@


// parse arguments
//auto id = std::stoi(program.get<std::string>("--id"));
auto configPath = program.get<std::string>("--config");
auto id = std::stoi(program.get<std::string>("--id"));
auto iter = std::stoi(program.get<std::string>("--iter"));
auto numPairs = std::stoi(program.get<std::string>("--numkeys"));

auto servers = ParseConfig(configPath);
auto serverAddr = servers[id].ip + ":" + std::to_string(servers[id].db_port);

LogInfo("Client: Starting contact on " + serverAddr);

auto repDB = ohmydb::ReplicatedDB(serverAddr);
auto repDB = ohmydb::ReplicatedDB(servers);
writeTest(repDB, numPairs, 1lu<<iter);
readTest(repDB, numPairs, 1lu<<iter);
readWriteTest(repDB, numPairs, 1lu<<iter);
8 changes: 2 additions & 6 deletions scripts/cleanup_test.sh
@@ -6,7 +6,9 @@ start_replica()
id=$2

ssh ${SSH_HOST} << EOF
killall monitor.sh
killall replica
killall client
rm -rf /tmp/db
EOF

@@ -29,7 +31,6 @@ parse_ssh_hosts()
echo ${arr[@]}
}


# here assuming the 4th column is the hostname, and the 5th column is the username
SSH_HOSTS=($(parse_ssh_hosts config.sh \$5\"@\"\$4))

@@ -46,8 +47,3 @@ start_replica ${SSH_HOST} $i &
done

wait


#echo ${SSH_HOSTS[@]}
#echo ${raft_port[@]}
#echo ${db_port[@]}
2 changes: 1 addition & 1 deletion scripts/cluster_setup.sh
@@ -1,5 +1,5 @@
#!/bin/bash
BRANCH="hc_testing"
BRANCH="testing"

setup ()
{
60 changes: 60 additions & 0 deletions scripts/deploy_modifed.sh
@@ -0,0 +1,60 @@
#!/bin/bash

deploy_to_replica()
{
SSH_HOST=$1
file=$2

scp $file ${SSH_HOST}:~/oh-my-db/$file
ssh ${SSH_HOST} << EOF
cd oh-my-db/build
cmake ../ -DCMAKE_INSTALL_PREFIX=.
cmake --build . --parallel 16
exit
EOF
}

parse_ssh_hosts()
{
local filename=$1
local i=0
local arr=()
while read -r line; do
if [[ $i -eq 0 ]]; then
((i=i+1))
continue
fi
arr[i-1]=$(echo $line | awk -F',' "{print ${2}}")
((i=i+1))
done < config.csv
echo ${arr[@]}
}

# Identify files that are modified and need to be copied over--------------

# Use the git command to identify modified files in the directory and store the output in a variable
output=$(git -C . status --porcelain)

# Create an empty array to store modified file paths
modified_files=()

# Iterate over the output and extract the file paths of modified files
while read line; do
if [[ "$line" =~ ^M ]]; then
path=$(echo "$line" | awk '{print $2}')
modified_files+=("$path")
fi
done <<< "$output"

# Retrieve host information and copy files over ------------
SSH_HOSTS=($(parse_ssh_hosts config.sh \$5\"@\"\$4))

for host in "${SSH_HOSTS[@]}"; do

echo $host
for path in "${modified_files[@]}"; do
deploy_to_replica $host $path &
done
done

wait