Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snap 2591 #456

Open
wants to merge 14 commits into
base: snappy/master
Choose a base branch
from
4 changes: 2 additions & 2 deletions native/.settings/language.settings.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<provider copy-of="extension" id="org.eclipse.cdt.ui.UserLanguageSettingsProvider"/>
<provider-reference id="org.eclipse.cdt.core.ReferencedProjectsLanguageSettingsProvider" ref="shared-provider"/>
<provider-reference id="org.eclipse.cdt.managedbuilder.core.MBSLanguageSettingsProvider" ref="shared-provider"/>
<provider class="org.eclipse.cdt.managedbuilder.language.settings.providers.GCCBuiltinSpecsDetector" console="false" env-hash="-303658432693294100" id="org.eclipse.cdt.managedbuilder.core.GCCBuiltinSpecsDetector" keep-relative-paths="false" name="CDT GCC Built-in Compiler Settings" parameter="${COMMAND} ${FLAGS} -E -P -v -dD &quot;${INPUTS}&quot;" prefer-non-shared="true">
<provider class="org.eclipse.cdt.managedbuilder.language.settings.providers.GCCBuiltinSpecsDetector" console="false" env-hash="789826077032083517" id="org.eclipse.cdt.managedbuilder.core.GCCBuiltinSpecsDetector" keep-relative-paths="false" name="CDT GCC Built-in Compiler Settings" parameter="${COMMAND} ${FLAGS} -E -P -v -dD &quot;${INPUTS}&quot;" prefer-non-shared="true">
<language-scope id="org.eclipse.cdt.core.gcc"/>
<language-scope id="org.eclipse.cdt.core.g++"/>
</provider>
Expand All @@ -16,7 +16,7 @@
<provider copy-of="extension" id="org.eclipse.cdt.ui.UserLanguageSettingsProvider"/>
<provider-reference id="org.eclipse.cdt.core.ReferencedProjectsLanguageSettingsProvider" ref="shared-provider"/>
<provider-reference id="org.eclipse.cdt.managedbuilder.core.MBSLanguageSettingsProvider" ref="shared-provider"/>
<provider class="org.eclipse.cdt.managedbuilder.language.settings.providers.GCCBuiltinSpecsDetector" console="false" env-hash="-303658432693294100" id="org.eclipse.cdt.managedbuilder.core.GCCBuiltinSpecsDetector" keep-relative-paths="false" name="CDT GCC Built-in Compiler Settings" parameter="${COMMAND} ${FLAGS} -E -P -v -dD &quot;${INPUTS}&quot;" prefer-non-shared="true">
<provider class="org.eclipse.cdt.managedbuilder.language.settings.providers.GCCBuiltinSpecsDetector" console="false" env-hash="789826077032083517" id="org.eclipse.cdt.managedbuilder.core.GCCBuiltinSpecsDetector" keep-relative-paths="false" name="CDT GCC Built-in Compiler Settings" parameter="${COMMAND} ${FLAGS} -E -P -v -dD &quot;${INPUTS}&quot;" prefer-non-shared="true">
<language-scope id="org.eclipse.cdt.core.gcc"/>
<language-scope id="org.eclipse.cdt.core.g++"/>
</provider>
Expand Down
31 changes: 13 additions & 18 deletions native/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
//plugins{
//id "de.undercouch.download" version "3.4.3"
//}

buildscript {
repositories {
maven { url 'https://plugins.gradle.org/m2' }
Expand Down Expand Up @@ -46,16 +42,16 @@ if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative

String distDir = "${rootProject.projectDir}/dist"
String thriftVersion = '1.0.0-1'
String boostVersion = '1.59.0'
String newThriftVersion = '1.0.0-2'
String boostVersion = '1.69.0'
String mpirVersion = '2.7.2'
String opensslVersion = '1.0.2h'
String unixodbcVersion = '2.3.4'
String googletestVersion = '1.7.0'
String oldReleasesDir = "https://github.com/SnappyDataInc/thrift/releases/download/${thriftVersion}"
String oldReleasesDir = "https://github.com/SnappyDataInc/thrift/releases/download/${newThriftVersion}"

String releasesDir = oldReleasesDir

String newThriftVersion = '1.0.0-2'
String newBoostVersion = '1.65.1'
String newUnixodbcVersion = '2.3.4'
String newGoogletestVersion = '1.8.0'
Expand Down Expand Up @@ -142,8 +138,7 @@ if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative
dependsOn downloadDependencies

inputs.files dependencies.collect { [ "${distDir}/${it}" ] }.flatten()
// outputs.files dependencies.collect { [ "${distDir}/${it.replaceAll('-[^-]*.tar.bz2', '')}" ] }.flatten()

//outputs.files dependencies.collect { [ "${distDir}/${it.replaceAll('-[^-]*.tar.bz2', '')}" ] }.flatten()
doLast {
outputs.files.each { d -> delete d }
dependencies.each { tarball ->
Expand Down Expand Up @@ -281,15 +276,15 @@ if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative
'-rdynamic', '-ldl', '-z', 'defs'
} else if (toolChain in VisualCpp) {
// explicitly include UCRT since its still not passed through by gradle
String VS_2015_INCLUDE_DIR = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.10240.0/ucrt'
String VS_2015_LIB_DIR = 'C:/Program Files (x86)/Windows Kits/10/Lib/10.0.10240.0/ucrt'
String VS_2015_INCLUDE_DIR_UM = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.10240.0/um'
String VS_2015_LIB_DIR_UM = 'C:/Program Files (x86)/Windows Kits/10/Lib/10.0.10240.0/um'
String VS_2015_INCLUDE_DIR_SHARED = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.10240.0/shared'
String VS_2017_INCLUDE_DIR = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.17763.0/ucrt'
String VS_2017_LIB_DIR = 'C:/Program Files (x86)/Windows Kits/10/Lib/10.0.17763.0/ucrt'
String VS_2017_INCLUDE_DIR_UM = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.17763.0/um'
String VS_2017_LIB_DIR_UM = 'C:/Program Files (x86)/Windows Kits/10/Lib/10.0.17763.0/um'
String VS_2017_INCLUDE_DIR_SHARED = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.17763.0/shared'
cppCompiler.args '/W3', '/FS', '/Zc:inline', '/WX', '/EHsc',
'/Fdsnappyclient.pdb', '/errorReport:prompt',
"/I${VS_2015_INCLUDE_DIR_UM}", "/I${VS_2015_INCLUDE_DIR_SHARED}",
"/I${VS_2015_INCLUDE_DIR}"
"/I${VS_2017_INCLUDE_DIR_UM}", "/I${VS_2017_INCLUDE_DIR_SHARED}",
+ "/I${VS_2017_INCLUDE_DIR}"
if (buildType == buildTypes.debug) {
cppCompiler.define '_DEBUG'
cppCompiler.args '/Od', '/Gm', '/ZI', '/RTC1', '/MDd'
Expand All @@ -298,9 +293,9 @@ if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative
cppCompiler.args '/O2', '/GL', '/Oi', '/Gm-', '/Zi', '/MD'
}
if (targetPlatform == platforms.x64) {
linker.args "/LIBPATH:${VS_2015_LIB_DIR}/x64:${VS_2015_LIB_DIR_UM}/x64"
linker.args "/LIBPATH:${VS_2017_LIB_DIR}/x64:${VS_2017_LIB_DIR_UM}/x64"
} else {
linker.args "/LIBPATH:${VS_2015_LIB_DIR}/x86:${VS_2015_LIB_DIR_UM}/x86"
linker.args "/LIBPATH:${VS_2017_LIB_DIR}/x86:${VS_2017_LIB_DIR_UM}/x86"
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions native/src/snappyclient/cpp/SQLState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ const SQLState SQLState::DATA_CONTAINER_CLOSED("40XD0",
ExceptionSeverity::TRANSACTION_SEVERITY);
const SQLState SQLState::THRIFT_PROTOCOL_ERROR("58015",
ExceptionSeverity::SESSION_SEVERITY);
const SQLState SQLState::NODE_BUCKET_MOVED("X0Z18",
ExceptionSeverity::TRANSACTION_SEVERITY);

void SQLState::staticInitialize() {
}
8 changes: 0 additions & 8 deletions native/src/snappyclient/cpp/impl/ClientService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,6 @@ void ClientService::handleTException(const char* op, const TException& te) {

handleStdException(op, te);
}
/*
void ClientService::handleException(const TException* te,
const std::set<thrift::HostAddress>& failedServers, bool tryFailover, bool ignoreFailOver,
bool createNewConnection, const std::string& op)
{

}*/

void ClientService::throwSQLExceptionForNodeFailure(const char* op,
const std::exception& se) {
std::ostringstream hostAddrStr;
Expand Down
87 changes: 57 additions & 30 deletions native/src/snappyclient/cpp/impl/ControlConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ ControlConnection::ControlConnection(ClientService *const &service) :m_serverGro
boost::assign::insert(m_snappyServerTypeSet)(service->getServerType(true,false,false));
std::copy(m_locators.begin(),m_locators.end(),std::inserter(m_controlHostSet,m_controlHostSet.end()));
m_controlLocator = nullptr;
//initliaze failoverSQLStateSet
short arrSize = sizeof(failoverSQLStateArray)/sizeof(failoverSQLStateArray[0]);
for(short i =0; i< arrSize;++i){
failoverSQLStateSet.insert(failoverSQLStateArray[i]);
}
}

const boost::optional<ControlConnection&> ControlConnection::getOrCreateControlConnection(
const std::vector<thrift::HostAddress>& hostAddrs, ClientService *const &service, std::exception* failure){

Expand Down Expand Up @@ -102,9 +106,9 @@ const boost::optional<ControlConnection&> ControlConnection::getOrCreateControlC
" as registered but having different type " + Utils::getServerTypeString(contrConnServerType) +
" than connection " + Utils::getServerTypeString( serviceServerType) ;
SnappyExceptionData snappyExData;
// snappyExData.__set_sqlState("08006.C");// TODO: discuss with sumedh about correct SQLState
snappyExData.__set_sqlState("08006");
snappyExData.__set_reason(msg);

//snappyExData.__set_errorCode(17002); //TODO:: Need to confirm with sumedh
ex->__set_exceptionData(snappyExData);
ex->__set_serverInfo(hostAddr.hostName + ":" + portStr );
throw ex;
Expand All @@ -131,9 +135,7 @@ const boost::optional<ControlConnection&> ControlConnection::getOrCreateControlC
}
void ControlConnection::getLocatorPreferredServer(thrift::HostAddress& prefHostAddr,std::set<thrift::HostAddress>& failedServers,
std::set<std::string>serverGroups){
// TODO: SanityManager
m_controlLocator->getPreferredServer(prefHostAddr,m_snappyServerTypeSet,serverGroups,failedServers);
//TODO:SanityManager
}
void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure,bool forFailover){
std::set<thrift::HostAddress> failedServers;
Expand All @@ -143,7 +145,6 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,
void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure,
std::set<thrift::HostAddress>& failedServers,
std::set<std::string>& serverGroups,bool forFailover){
//boost::lock_guard<boost::mutex> localGuard(m_lock);
if(m_controlLocator == nullptr)
{
failoverToAvailableHost(failedServers, false,failure);
Expand All @@ -155,22 +156,18 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,

try{
if(forFailover){
//TODO: SanityManager
//refresh the full host list
std::vector<HostAddress> prefServerAndAllHosts;
m_controlLocator->getAllServersWithPreferredServer(prefServerAndAllHosts,m_snappyServerTypeSet,serverGroups,failedServers);
//TODO :: refresh new server list--like java do.
if(! prefServerAndAllHosts.empty())
{
std::vector<HostAddress> allHosts(prefServerAndAllHosts.begin() +1,prefServerAndAllHosts.end());
refreshAllHosts(allHosts);
preferredServer = prefServerAndAllHosts.at(0);
}
//TODO :SanityManger
}else{
getLocatorPreferredServer(preferredServer,failedServers,serverGroups);
}
// TODO: SanityManager
if(preferredServer.port <=0){
/*For this case we don't have a locator or locator unable to
* determine a preferred server, so choose some server randomly
Expand All @@ -187,13 +184,16 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,
}
searchRandomServer(skipServers, failure,preferredServer);
}
//TODO: Sanitymanger
return;
}catch(thrift::SnappyException &snEx){
// TODO:
//Discuss with Sumedh
FailoverStatus status = getFailoverStatus(snEx.exceptionData.sqlState,snEx.exceptionData.errorCode,snEx);
if(status== FailoverStatus::NONE){
throw snEx;
}else if(status== FailoverStatus::RETRY){
forFailover = true;
continue;
}
}catch(TException &tex){
// TODO: SanityManager
//Search for a new host for locator query
// for the first call do not mark controlhost as failed but retry(e.g. for a reconnect case)
if(firstCall){
Expand All @@ -204,7 +204,7 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,
m_controlLocator->getOutputProtocol()->getTransport()->close();
failoverToAvailableHost(failedServers,true,failure);
if(failure ==nullptr){
failure = &(tex);// TODO: need to look again
failure = &(tex);
}
}catch(std::exception &ex){
throw unexpectedError(ex, m_controlHost);
Expand All @@ -215,8 +215,6 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,

void ControlConnection::searchRandomServer(const std::set<thrift::HostAddress>& skipServers,std::exception* failure,
thrift::HostAddress& hostAddress){

//TODO: Need to discuss implemetation of this method and also ClientService:: updateFailedServersForCurrent with sumedh
std::vector<thrift::HostAddress> searchServers;
// Note: Do not use unordered_set -- reason is http://www.cplusplus.com/forum/general/198319/
std::copy(m_controlHostSet.begin(),m_controlHostSet.end(),std::inserter(searchServers,searchServers.end()));
Expand All @@ -240,7 +238,6 @@ void ControlConnection::failoverToAvailableHost(std::set<thrift::HostAddress>& f
std::exception* failure){
boost::lock_guard<boost::mutex> localGuard(m_lock);
for(auto iterator = m_controlHostSet.begin();iterator!= m_controlHostSet.end(); ++iterator ){
// NEXT: for(thrift::HostAddress controlAddr : m_controlHostSet){
thrift::HostAddress controlAddr = *iterator;
if(checkFailedControlHosts && ! failedServers.empty() && (failedServers.find(controlAddr) != failedServers.end())){
continue;
Expand All @@ -262,14 +259,13 @@ void ControlConnection::failoverToAvailableHost(std::set<thrift::HostAddress>& f
m_snappyServerType == thrift::ServerType::THRIFT_LOCATOR_CP_SSL ||
m_snappyServerType == thrift::ServerType::THRIFT_SNAPPY_BP_SSL ||
m_snappyServerType==thrift::ServerType::THRIFT_SNAPPY_CP_SSL){
// TODO: Find out whether SnappyTSSLSocket is needed or not, or any other thing is required
TSSLSocketFactory sslSocketFactory;
tTransport = sslSocketFactory.createSocket(controlAddr.hostName,controlAddr.port);
}else if(m_snappyServerType == thrift::ServerType::THRIFT_LOCATOR_BP ||
m_snappyServerType== thrift::ServerType::THRIFT_LOCATOR_CP ||
m_snappyServerType== thrift::ServerType::THRIFT_SNAPPY_BP ||
m_snappyServerType== thrift::ServerType::THRIFT_SNAPPY_CP){
tTransport = boost::make_shared<TSocket>(controlAddr.hostName,controlAddr.port); // TODO: Find out whether SnappyTSocket is needed or not, or any other thing is required
tTransport = boost::make_shared<TSocket>(controlAddr.hostName,controlAddr.port);
}
tTransport->open();
TTransportFactory* transportFactory = nullptr;
Expand Down Expand Up @@ -301,13 +297,12 @@ void ControlConnection::failoverToAvailableHost(std::set<thrift::HostAddress>& f
break;
}
}catch(TException &tExp){
failure = &tExp; // TODO: need to look again
failure = &tExp;
failedServers.insert(controlAddr);
if(outTransport != nullptr){
outTransport->close();
}
continue;
//goto NEXT;
}catch(std::exception &ex){
throw unexpectedError(ex,controlAddr);
}
Expand All @@ -328,8 +323,9 @@ const thrift::SnappyException* ControlConnection:: unexpectedError(const std::ex
}
thrift::SnappyException *snappyEx = new thrift::SnappyException();
SnappyExceptionData snappyExData;
//snappyExData.__set_sqlState(std::string(SQLState::UNKNOWN_EXCEPTION));// TODO: discuss with sumedh about correct SQLState
snappyExData.__set_sqlState(std::string(SQLState::UNKNOWN_EXCEPTION.getSQLState()));
snappyExData.__set_reason(ex.what());

snappyEx->__set_exceptionData(snappyExData);

std::string portNum;
Expand Down Expand Up @@ -368,7 +364,6 @@ void ControlConnection::refreshAllHosts(const std::vector<thrift::HostAddress>&
m_controlHostSet.insert(allHosts.begin(),allHosts.end());
}


thrift::SnappyException* ControlConnection::failoverExhausted(const std::set<thrift::HostAddress>& failedServers,
std::exception* failure) {

Expand All @@ -380,14 +375,46 @@ thrift::SnappyException* ControlConnection::failoverExhausted(const std::set<thr
}
thrift::SnappyException *snappyEx = new thrift::SnappyException();
SnappyExceptionData snappyExData;
//snappyExData.__set_sqlState(std::string(SQLState::DATA_CONTAINER_CLOSED));// TODO: discuss with sumedh about correct SQLState
std::string reason ="{Failed afer trying all the servers:}" ;
snappyExData.__set_reason(reason);
snappyExData.__set_sqlState(std::string(SQLState::DATA_CONTAINER_CLOSED.getSQLState()));
std::string reason ="{Failed afer trying all available servers:}" ;
snappyExData.__set_reason(reason.append(failedServerString));
snappyEx->__set_exceptionData(snappyExData);
//snappyExData.__set_sqlState();//
// TODO: complete this funtion
snappyEx->__set_serverInfo(failedServerString);
return snappyEx;
}


FailoverStatus ControlConnection::getFailoverStatus(const std::string& sqlState,const int32_t& errorCode, const TException& snappyEx){
if(! std::strcmp(SQLState::SNAPPY_NODE_SHUTDOWN.getSQLState(),sqlState.c_str())
|| std::strcmp(SQLState::NODE_BUCKET_MOVED.getSQLState(),sqlState.c_str())){
return FailoverStatus::RETRY;
}
/* for 08001 we have to, unfortunately, resort to string search to
* determine if failover makes sense or it is due to some problem
* with authentication or invalid properties */
else if(!sqlState.compare("08001")){
std::string msg(snappyEx.what());
if(!msg.empty() &&
((msg.find("rror")!=std::string::npos) // cater to CONNECT_UNABLE_TO_CONNECT_TO_SERVER
|| (msg.find("xception")!=std::string::npos ) // cater to CONNECT_SOCKET_EXCEPTION
||(msg.find("ocket")!=std::string::npos))// cater to CONNECT_UNABLE_TO_OPEN_SOCKET_STREAM
){
return FailoverStatus::NEW_SERVER;
}
}
/* for 08004 we have to, unfortunately, resort to string search to
* determine if failover makes sense or it is due to some problem
* with authentication
*/
else if(!sqlState.compare("08004")){
std::string msg(snappyEx.what());
if(!msg.empty() &&
(msg.find("connection refused") !=std::string::npos)
){
return FailoverStatus::NEW_SERVER;
}
}
else if(failoverSQLStateSet.find(sqlState)!= failoverSQLStateSet.end()){
return FailoverStatus::NEW_SERVER;
}
return FailoverStatus::NONE;
}
12 changes: 12 additions & 0 deletions native/src/snappyclient/cpp/impl/ControlConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ namespace io {
namespace client {
namespace impl {

enum class FailoverStatus :unsigned char{
NONE, /** no failover to be done */
NEW_SERVER, /** failover to a new server */
RETRY /** retry to the same server */
};
/**
* Holds locator, server information to use for failover. Also provides
* convenience methods to actually search for an appropriate host for
Expand Down Expand Up @@ -97,6 +102,12 @@ namespace io {
/** Global lock for {@link allConnections} */
static boost::mutex s_allConnsLock;

/** array of SQLState strings that denote failover should be done */
std::string failoverSQLStateArray[23] = { "08001",
"08003", "08004", "08006", "X0J15", "X0Z32", "XN001", "XN014", "XN016",
"58009", "58014", "58015", "58016", "58017", "57017", "58010", "30021",
"XJ040", "XJ041", "XSDA3", "XSDA4", "XSDAJ", "XJ217" };
std::set<std::string> failoverSQLStateSet;
/*********Member functions**************/
ControlConnection():m_serverGroups(std::set<std::string>()){};
ControlConnection(ClientService *const &service);
Expand All @@ -115,6 +126,7 @@ namespace io {

void getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure,
bool forFailover = false);
FailoverStatus getFailoverStatus(const std::string& sqlState,const int32_t& errorCode, const TException& snappyEx);
public:

static const boost::optional<ControlConnection&> getOrCreateControlConnection(
Expand Down
Loading