@@ -21,10 +21,10 @@ limitations under the License.
21
21
#include < cmath>
22
22
#include < string>
23
23
24
+ #include " ../query/algorithms/triangles/StreamingTriangles.h"
24
25
#include " ../server/JasmineGraphServer.h"
25
26
#include " ../util/logger/Logger.h"
26
27
#include " JasmineGraphInstance.h"
27
- #include " ../query/algorithms/triangles/StreamingTriangles.h"
28
28
29
29
using namespace std ;
30
30
@@ -46,6 +46,8 @@ std::vector<std::string> loadAverageVector;
46
46
bool collectValid = false ;
47
47
std::thread JasmineGraphInstanceService::workerThread;
48
48
49
+ std::string masterIP;
50
+
49
51
static void handshake_command (int connFd, bool *loop_exit_p);
50
52
static inline void close_command (int connFd, bool *loop_exit_p);
51
53
__attribute__ ((noreturn)) static inline void shutdown_command(int connFd);
@@ -87,15 +89,15 @@ static void triangles_command(
87
89
std::map<std::string, JasmineGraphHashMapCentralStore> graphDBMapCentralStores,
88
90
std::map<std::string, JasmineGraphHashMapDuplicateCentralStore> graphDBMapDuplicateCentralStores,
89
91
bool *loop_exit_p);
90
- static void streaming_triangles_command (int connFd, int serverPort, std::map<std::string,
91
- JasmineGraphIncrementalLocalStore *> &incrementalLocalStoreMap,
92
- bool *loop_exit_p);
92
+ static void streaming_triangles_command (
93
+ int connFd, int serverPort, std::map<std::string, JasmineGraphIncrementalLocalStore *> &incrementalLocalStoreMap,
94
+ bool *loop_exit_p);
93
95
static void send_centralstore_to_aggregator_command (int connFd, bool *loop_exit_p);
94
96
static void send_composite_centralstore_to_aggregator_command (int connFd, bool *loop_exit_p);
95
97
static void aggregate_centralstore_triangles_command (int connFd, bool *loop_exit_p);
96
98
static void aggregate_streaming_centralstore_triangles_command (
97
- int connFd, std::map<std::string, JasmineGraphIncrementalLocalStore *> &incrementalLocalStoreMap,
98
- bool *loop_exit_p);
99
+ int connFd, std::map<std::string, JasmineGraphIncrementalLocalStore *> &incrementalLocalStoreMap,
100
+ bool *loop_exit_p);
99
101
static void aggregate_composite_centralstore_triangles_command (int connFd, bool *loop_exit_p);
100
102
static void performance_statistics_command (int connFd, bool *loop_exit_p);
101
103
static void initiate_files_command (int connFd, bool *loop_exit_p);
@@ -576,7 +578,7 @@ string JasmineGraphInstanceService::aggregateCentralStoreTriangles(std::string g
576
578
map<long , long > distributionHashMap =
577
579
JasmineGraphInstanceService::getOutDegreeDistributionHashMap (aggregatedCentralStore);
578
580
579
- TriangleResult triangleResult = Triangles::countTriangles (aggregatedCentralStore, distributionHashMap, true );
581
+ TriangleResult triangleResult = Triangles::countTriangles (aggregatedCentralStore, distributionHashMap, true );
580
582
std::string triangles = triangleResult.triangles ;
581
583
582
584
return triangles;
@@ -652,8 +654,8 @@ string JasmineGraphInstanceService::aggregateCompositeCentralStoreTriangles(std:
652
654
map<long , long > distributionHashMap =
653
655
JasmineGraphInstanceService::getOutDegreeDistributionHashMap (aggregatedCompositeCentralStore);
654
656
655
- TriangleResult triangleResult = Triangles::countTriangles (aggregatedCompositeCentralStore,
656
- distributionHashMap, true );
657
+ TriangleResult triangleResult =
658
+ Triangles::countTriangles (aggregatedCompositeCentralStore, distributionHashMap, true );
657
659
std::string triangles = triangleResult.triangles ;
658
660
659
661
return triangles;
@@ -2045,15 +2047,15 @@ static void handshake_command(int connFd, bool *loop_exit_p) {
2045
2047
instance_logger.info (" Sent : " + JasmineGraphInstanceProtocol::HANDSHAKE_OK);
2046
2048
2047
2049
char data[DATA_BUFFER_SIZE];
2048
- string server_hostname = Utils::read_str_trim_wrapper (connFd, data, INSTANCE_DATA_LENGTH);
2049
- instance_logger.info (" Received hostname : " + server_hostname );
2050
+ masterIP = Utils::read_str_trim_wrapper (connFd, data, INSTANCE_DATA_LENGTH);
2051
+ instance_logger.info (" Received hostname : " + masterIP );
2050
2052
2051
2053
instance_logger.info (" Sending : " + JasmineGraphInstanceProtocol::HOST_OK);
2052
2054
if (!Utils::send_str_wrapper (connFd, JasmineGraphInstanceProtocol::HOST_OK)) {
2053
2055
*loop_exit_p = true ;
2054
2056
return ;
2055
2057
}
2056
- instance_logger.info (" ServerName : " + server_hostname );
2058
+ instance_logger.info (" ServerName : " + masterIP );
2057
2059
}
2058
2060
2059
2061
static inline void close_command (int connFd, bool *loop_exit_p) {
@@ -2302,7 +2304,7 @@ static void duplicate_centralstore_command(int connFd, int serverPort, bool *loo
2302
2304
}
2303
2305
2304
2306
JasmineGraphInstanceService::duplicateCentralStore (serverPort, stoi (graphID), stoi (partitionID), workerSockets,
2305
- " localhost " );
2307
+ masterIP );
2306
2308
}
2307
2309
2308
2310
static void worker_in_degree_distribution_command (
@@ -2967,9 +2969,9 @@ static void triangles_command(
2967
2969
}
2968
2970
}
2969
2971
2970
- static void streaming_triangles_command (int connFd, int serverPort,
2971
- std::map<std::string, JasmineGraphIncrementalLocalStore *> &incrementalLocalStoreMap,
2972
- bool *loop_exit_p) {
2972
+ static void streaming_triangles_command (
2973
+ int connFd, int serverPort, std::map<std::string, JasmineGraphIncrementalLocalStore *> &incrementalLocalStoreMap,
2974
+ bool *loop_exit_p) {
2973
2975
if (!Utils::send_str_wrapper (connFd, JasmineGraphInstanceProtocol::OK)) {
2974
2976
*loop_exit_p = true ;
2975
2977
return ;
@@ -3017,8 +3019,7 @@ static void streaming_triangles_command(int connFd, int serverPort,
3017
3019
3018
3020
if (incrementalLocalStoreMap.find (graphIdentifier) == incrementalLocalStoreMap.end ()) {
3019
3021
incrementalLocalStoreInstance =
3020
- JasmineGraphInstanceService::loadStreamingStore (graphID, partitionId,
3021
- incrementalLocalStoreMap, " app" );
3022
+ JasmineGraphInstanceService::loadStreamingStore (graphID, partitionId, incrementalLocalStoreMap, " app" );
3022
3023
} else {
3023
3024
incrementalLocalStoreInstance = incrementalLocalStoreMap[graphIdentifier];
3024
3025
}
@@ -3028,8 +3029,7 @@ static void streaming_triangles_command(int connFd, int serverPort,
3028
3029
localCount = StreamingTriangles::countLocalStreamingTriangles (incrementalLocalStoreInstance);
3029
3030
} else {
3030
3031
localCount = StreamingTriangles::countDynamicLocalTriangles (
3031
- incrementalLocalStoreInstance, std::stol (oldLocalRelationCount),
3032
- std::stol (oldCentralRelationCount));
3032
+ incrementalLocalStoreInstance, std::stol (oldLocalRelationCount), std::stol (oldCentralRelationCount));
3033
3033
}
3034
3034
3035
3035
long newLocalRelationCount, newCentralRelationCount, result;
@@ -3045,8 +3045,7 @@ static void streaming_triangles_command(int connFd, int serverPort,
3045
3045
3046
3046
string response = Utils::read_str_trim_wrapper (connFd, data, INSTANCE_DATA_LENGTH);
3047
3047
if (response.compare (JasmineGraphInstanceProtocol::OK) != 0 ) {
3048
- instance_logger.error (" Received : " + response + " instead of : " +
3049
- JasmineGraphInstanceProtocol::HOST_OK);
3048
+ instance_logger.error (" Received : " + response + " instead of : " + JasmineGraphInstanceProtocol::HOST_OK);
3050
3049
*loop_exit_p = true ;
3051
3050
return ;
3052
3051
}
@@ -3060,8 +3059,7 @@ static void streaming_triangles_command(int connFd, int serverPort,
3060
3059
3061
3060
response = Utils::read_str_trim_wrapper (connFd, data, INSTANCE_DATA_LENGTH);
3062
3061
if (response.compare (JasmineGraphInstanceProtocol::OK) != 0 ) {
3063
- instance_logger.error (" Received : " + response + " instead of : " +
3064
- JasmineGraphInstanceProtocol::HOST_OK);
3062
+ instance_logger.error (" Received : " + response + " instead of : " + JasmineGraphInstanceProtocol::HOST_OK);
3065
3063
*loop_exit_p = true ;
3066
3064
return ;
3067
3065
}
@@ -3365,8 +3363,8 @@ static void aggregate_centralstore_triangles_command(int connFd, bool *loop_exit
3365
3363
}
3366
3364
3367
3365
static void aggregate_streaming_centralstore_triangles_command (
3368
- int connFd, std::map<std::string, JasmineGraphIncrementalLocalStore *> &incrementalLocalStoreMap,
3369
- bool *loop_exit_p) {
3366
+ int connFd, std::map<std::string, JasmineGraphIncrementalLocalStore *> &incrementalLocalStoreMap,
3367
+ bool *loop_exit_p) {
3370
3368
if (!Utils::send_str_wrapper (connFd, JasmineGraphInstanceProtocol::OK)) {
3371
3369
*loop_exit_p = true ;
3372
3370
return ;
@@ -3427,8 +3425,7 @@ static void aggregate_streaming_centralstore_triangles_command(
3427
3425
}
3428
3426
3429
3427
std::string aggregatedTriangles = JasmineGraphInstanceService::aggregateStreamingCentralStoreTriangles (
3430
- graphId, partitionId, partitionIdList, centralCountList,
3431
- threadPriority, incrementalLocalStoreMap, mode);
3428
+ graphId, partitionId, partitionIdList, centralCountList, threadPriority, incrementalLocalStoreMap, mode);
3432
3429
3433
3430
if (threadPriority > Conts::DEFAULT_THREAD_PRIORITY) {
3434
3431
threadPriorityMutex.lock ();
@@ -4252,11 +4249,11 @@ static void send_priority_command(int connFd, bool *loop_exit_p) {
4252
4249
}
4253
4250
4254
4251
string JasmineGraphInstanceService::aggregateStreamingCentralStoreTriangles (
4255
- std::string graphId, std::string partitionId, std::string partitionIdString,
4256
- std::string centralCountString, int threadPriority ,
4257
- std::map<std::string, JasmineGraphIncrementalLocalStore *> incrementalLocalStores, std::string mode) {
4252
+ std::string graphId, std::string partitionId, std::string partitionIdString, std::string centralCountString ,
4253
+ int threadPriority, std::map<std:: string, JasmineGraphIncrementalLocalStore *> incrementalLocalStores ,
4254
+ std::string mode) {
4258
4255
instance_logger.info (" ###INSTANCE### Started Aggregating Central Store Triangles" );
4259
- std::vector<JasmineGraphIncrementalLocalStore*> incrementalLocalStoreInstances;
4256
+ std::vector<JasmineGraphIncrementalLocalStore *> incrementalLocalStoreInstances;
4260
4257
std::vector<std::string> centralCountList = Utils::split (centralCountString, ' ,' );
4261
4258
std::vector<std::string> partitionIdList = Utils::split (partitionIdString, ' ,' );
4262
4259
partitionIdList.push_back (partitionId);
@@ -4270,9 +4267,8 @@ string JasmineGraphInstanceService::aggregateStreamingCentralStoreTriangles(
4270
4267
JasmineGraphIncrementalLocalStore *incrementalLocalStoreInstance;
4271
4268
4272
4269
if (incrementalLocalStores.find (graphIdentifier) == incrementalLocalStores.end ()) {
4273
- incrementalLocalStoreInstance =
4274
- JasmineGraphInstanceService::loadStreamingStore (graphId, aggregatePartitionId,
4275
- incrementalLocalStores, " app" );
4270
+ incrementalLocalStoreInstance = JasmineGraphInstanceService::loadStreamingStore (
4271
+ graphId, aggregatePartitionId, incrementalLocalStores, " app" );
4276
4272
} else {
4277
4273
incrementalLocalStoreInstance = incrementalLocalStores[graphIdentifier];
4278
4274
}
@@ -4283,8 +4279,7 @@ string JasmineGraphInstanceService::aggregateStreamingCentralStoreTriangles(
4283
4279
if (mode == " 0" ) {
4284
4280
triangles = StreamingTriangles::countCentralStoreStreamingTriangles (incrementalLocalStoreInstances);
4285
4281
} else {
4286
- triangles = StreamingTriangles::countDynamicCentralTriangles (
4287
- incrementalLocalStoreInstances, centralCountList);
4282
+ triangles = StreamingTriangles::countDynamicCentralTriangles (incrementalLocalStoreInstances, centralCountList);
4288
4283
}
4289
4284
4290
4285
instance_logger.info (" ###INSTANCE### Central Store Aggregation : Completed" );
0 commit comments