diff --git a/docs/storage_engine/groot.md b/docs/storage_engine/groot.md
index d76aa7452dbc..589d891e3985 100644
--- a/docs/storage_engine/groot.md
+++ b/docs/storage_engine/groot.md
@@ -488,6 +488,10 @@ Else, please proceed to ingest and commit.
Groot graph have several methods for realtime write as follows:
+#### Python
+
+Refer to [test_store_service.py](https://github.com/alibaba/GraphScope/blob/main/python/graphscope/tests/kubernetes/test_store_service.py) for examples.
+
```python
# Inserts one vertex
def insert_vertex(self, vertex: VertexRecordKey, properties: dict) -> int: pass
@@ -549,6 +553,13 @@ class EdgeRecordKey:
self.eid: int = eid # Only required in update and delete operation
```
+
+#### Java
+
+We also have a java sdk for realtime write and schema management.
+
+Refer to [RealtimeWrite.java](https://github.com/alibaba/GraphScope/blob/main/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java) for examples.
+
## Uninstalling and Restarting
### Uninstall Groot
diff --git a/interactive_engine/groot-client/pom.xml b/interactive_engine/groot-client/pom.xml
index 1a3d9cadcfc0..01d2b7c3f0dd 100644
--- a/interactive_engine/groot-client/pom.xml
+++ b/interactive_engine/groot-client/pom.xml
@@ -168,6 +168,23 @@
org.apache.maven.plugins
maven-javadoc-plugin
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 3.1.0
+
+ java
+
+ -classpath
+
+ com.alibaba.graphscope.groot.sdk.example.RealtimeWrite
+
+ com.alibaba.graphscope.groot.sdk.example.RealtimeWrite
+ 1.11
+ -1
+
+
diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java
index 46453683fb82..b4600669cd82 100644
--- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java
+++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java
@@ -13,11 +13,14 @@
*/
package com.alibaba.graphscope.groot.sdk;
+import com.alibaba.graphscope.groot.sdk.schema.Edge;
import com.alibaba.graphscope.groot.sdk.schema.Schema;
+import com.alibaba.graphscope.groot.sdk.schema.Vertex;
import com.alibaba.graphscope.proto.groot.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -28,35 +31,31 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class GrootClient {
private final ClientGrpc.ClientBlockingStub clientStub;
private final ClientWriteGrpc.ClientWriteBlockingStub writeStub;
+ private final ClientWriteGrpc.ClientWriteStub asyncWriteStub;
private final ClientBackupGrpc.ClientBackupBlockingStub backupStub;
private final GrootDdlServiceGrpc.GrootDdlServiceBlockingStub ddlStub;
- private String clientId = "DEFAULT";
-
- private BatchWriteRequest.Builder batchWriteBuilder;
private GrootClient(
ClientGrpc.ClientBlockingStub clientBlockingStub,
ClientWriteGrpc.ClientWriteBlockingStub clientWriteBlockingStub,
+ ClientWriteGrpc.ClientWriteStub clientWriteStub,
ClientBackupGrpc.ClientBackupBlockingStub clientBackupBlockingStub,
GrootDdlServiceGrpc.GrootDdlServiceBlockingStub ddlServiceBlockingStub) {
this.clientStub = clientBlockingStub;
this.writeStub = clientWriteBlockingStub;
+ this.asyncWriteStub = clientWriteStub;
this.backupStub = clientBackupBlockingStub;
this.ddlStub = ddlServiceBlockingStub;
- this.reset();
}
public void close() {}
- private void reset() {
- this.batchWriteBuilder = BatchWriteRequest.newBuilder().setClientId(this.clientId);
- }
-
public com.alibaba.graphscope.proto.GraphDefPb submitSchema(Schema schema) {
BatchSubmitRequest request = schema.toProto();
BatchSubmitResponse response = ddlStub.batchSubmit(request);
@@ -67,25 +66,10 @@ public com.alibaba.graphscope.proto.GraphDefPb submitSchema(Schema.Builder schem
return submitSchema(schema.build());
}
- public void initWriteSession() {
- this.clientId =
- this.writeStub.getClientId(GetClientIdRequest.newBuilder().build()).getClientId();
- this.reset();
- }
-
- /**
- * Commit the realtime write transaction.
- * @return The snapshot_id. The data committed would be available after a while, or you could remoteFlush(snapshot_id)
- * and wait for its return.
- */
- public long commit() {
- long snapshotId = 0L;
- if (this.batchWriteBuilder.getWriteRequestsCount() > 0) {
- BatchWriteResponse response = this.writeStub.batchWrite(this.batchWriteBuilder.build());
- snapshotId = response.getSnapshotId();
- }
- this.reset();
- return snapshotId;
+ private BatchWriteRequest.Builder getNewWriteBuilder() {
+ String clientId =
+ writeStub.getClientId(GetClientIdRequest.newBuilder().build()).getClientId();
+ return BatchWriteRequest.newBuilder().setClientId(clientId);
}
/**
@@ -93,104 +77,213 @@ public long commit() {
* @param snapshotId the snapshot id to be flushed
*/
public void remoteFlush(long snapshotId) {
- this.writeStub.remoteFlush(
- RemoteFlushRequest.newBuilder().setSnapshotId(snapshotId).build());
+ if (snapshotId != 0) {
+ this.writeStub.remoteFlush(
+ RemoteFlushRequest.newBuilder().setSnapshotId(snapshotId).build());
+ }
+ }
+
+ private long modifyVertex(Vertex vertex, WriteTypePb writeType) {
+ WriteRequestPb request = vertex.toWriteRequest(writeType);
+ return submit(request);
+ }
+
+ private long modifyVertex(List vertices, WriteTypePb writeType) {
+ List requests = getVertexWriteRequestPbs(vertices, writeType);
+ return submit(requests);
+ }
+
+ private void modifyVertex(
+ Vertex vertex, StreamObserver callback, WriteTypePb writeType) {
+ WriteRequestPb request = vertex.toWriteRequest(writeType);
+ submit(request, callback);
+ }
+
+ private void modifyVertex(
+ List vertices,
+ StreamObserver callback,
+ WriteTypePb writeType) {
+ List requests = getVertexWriteRequestPbs(vertices, writeType);
+ submit(requests, callback);
+ }
+
+ private long modifyEdge(Edge edge, WriteTypePb writeType) {
+ WriteRequestPb request = edge.toWriteRequest(writeType);
+ return submit(request);
+ }
+
+ private long modifyEdge(List edges, WriteTypePb writeType) {
+ List requests = getEdgeWriteRequestPbs(edges, writeType);
+ return submit(requests);
+ }
+
+ private void modifyEdge(
+ Edge edge, StreamObserver callback, WriteTypePb writeType) {
+ WriteRequestPb request = edge.toWriteRequest(writeType);
+ submit(request, callback);
+ }
+
+ private void modifyEdge(
+ List edges, StreamObserver callback, WriteTypePb writeType) {
+ List requests = getEdgeWriteRequestPbs(edges, writeType);
+ submit(requests, callback);
}
/**
* Add vertex by realtime write
- * @param label vertex label
- * @param properties properties, including the primary key
+ * @param vertex vertex that contains label and pk properties and other properties
*/
- public void addVertex(String label, Map properties) {
- DataRecordPb record = getVertexDataRecord(label, properties);
- WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.INSERT);
- this.batchWriteBuilder.addWriteRequests(request);
+ public long addVertex(Vertex vertex) {
+ return modifyVertex(vertex, WriteTypePb.INSERT);
+ }
+
+ public long addVertices(List vertices) {
+ return modifyVertex(vertices, WriteTypePb.INSERT);
+ }
+
+ public void addVertex(Vertex vertex, StreamObserver callback) {
+ modifyVertex(vertex, callback, WriteTypePb.INSERT);
+ }
+
+ public void addVertices(List vertices, StreamObserver callback) {
+ modifyVertex(vertices, callback, WriteTypePb.INSERT);
}
/**
* Update existed vertex by realtime write
- * @param label vertex label
- * @param properties properties, including the primary key
+ * @param vertex vertex that contains label and pk properties and other properties
*/
- public void updateVertex(String label, Map properties) {
- DataRecordPb record = getVertexDataRecord(label, properties);
- WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.UPDATE);
- this.batchWriteBuilder.addWriteRequests(request);
+ public long updateVertex(Vertex vertex) {
+ return modifyVertex(vertex, WriteTypePb.UPDATE);
+ }
+
+ public long updateVertices(List vertices) {
+ return modifyVertex(vertices, WriteTypePb.UPDATE);
+ }
+
+ public void updateVertex(Vertex vertex, StreamObserver callback) {
+ modifyVertex(vertex, callback, WriteTypePb.UPDATE);
+ }
+
+ public void updateVertices(List vertices, StreamObserver callback) {
+ modifyVertex(vertices, callback, WriteTypePb.UPDATE);
}
/**
* Delete vertex by its primary key
- * @param label vertex label
- * @param properties properties, contains only the primary key
+ * @param vertex vertex that contains label and primary key properties
*/
- public void deleteVertex(String label, Map properties) {
- DataRecordPb record = getVertexDataRecord(label, properties);
- WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.DELETE);
- this.batchWriteBuilder.addWriteRequests(request);
+ public long deleteVertex(Vertex vertex) {
+ WriteRequestPb request = vertex.toWriteRequest(WriteTypePb.DELETE);
+ return submit(request);
+ }
+
+ public long deleteVertices(List vertices) {
+ List requests = getVertexWriteRequestPbs(vertices, WriteTypePb.DELETE);
+ return submit(requests);
+ }
+
+ public void deleteVertex(Vertex vertex, StreamObserver callback) {
+ modifyVertex(vertex, callback, WriteTypePb.DELETE);
+ }
+
+ public void deleteVertices(List vertices, StreamObserver callback) {
+ modifyVertex(vertices, callback, WriteTypePb.DELETE);
}
/**
* Add edge by realtime write
- * @param label edge label
- * @param srcLabel source vertex label
- * @param dstLabel destination vertex label
- * @param srcPk source primary keys
- * @param dstPk destination primary keys
- * @param properties edge properties
+ * @param edge edge that contains label, src vertex label and pk, dst label and pk, and properties
*/
- public void addEdge(
- String label,
- String srcLabel,
- String dstLabel,
- Map srcPk,
- Map dstPk,
- Map properties) {
- DataRecordPb record =
- getEdgeDataRecord(label, srcLabel, dstLabel, srcPk, dstPk, properties);
- WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.INSERT);
- this.batchWriteBuilder.addWriteRequests(request);
+ public long addEdge(Edge edge) {
+ return modifyEdge(edge, WriteTypePb.INSERT);
+ }
+
+ public long addEdges(List edges) {
+ return modifyEdge(edges, WriteTypePb.INSERT);
+ }
+
+ public void addEdge(Edge edge, StreamObserver callback) {
+ modifyEdge(edge, callback, WriteTypePb.INSERT);
+ }
+
+ public void addEdges(List edges, StreamObserver callback) {
+ modifyEdge(edges, callback, WriteTypePb.INSERT);
}
/**
* Update existed edge by realtime write
- * @param label edge label
- * @param srcLabel source vertex label
- * @param dstLabel destination vertex label
- * @param srcPk source primary keys
- * @param dstPk destination primary keys
- * @param properties edge properties
+ * @param edge edge that contains label, src vertex label and pk, dst label and pk, and properties
*/
- public void updateEdge(
- String label,
- String srcLabel,
- String dstLabel,
- Map srcPk,
- Map dstPk,
- Map properties) {
- DataRecordPb record =
- getEdgeDataRecord(label, srcLabel, dstLabel, srcPk, dstPk, properties);
- WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.INSERT);
- this.batchWriteBuilder.addWriteRequests(request);
+ public long updateEdge(Edge edge) {
+ return modifyEdge(edge, WriteTypePb.UPDATE);
+ }
+
+ public long updateEdges(List edges) {
+ return modifyEdge(edges, WriteTypePb.UPDATE);
+ }
+
+ public void updateEdge(Edge edge, StreamObserver callback) {
+ modifyEdge(edge, callback, WriteTypePb.UPDATE);
+ }
+
+ public void updateEdges(List edges, StreamObserver callback) {
+ modifyEdge(edges, callback, WriteTypePb.UPDATE);
}
/**
* Delete an edge by realtime write
- * @param label edge label
- * @param srcLabel source vertex label
- * @param dstLabel destination vertex label
- * @param srcPk source primary keys
- * @param dstPk destination primary keys
+ * @param edge edge that contains label, src vertex label and pk, dst label and pk, no properties required
+ */
+ public long deleteEdge(Edge edge) {
+ return modifyEdge(edge, WriteTypePb.DELETE);
+ }
+
+ public long deleteEdges(List edges) {
+ return modifyEdge(edges, WriteTypePb.DELETE);
+ }
+
+ public void deleteEdge(Edge edge, StreamObserver callback) {
+ modifyEdge(edge, callback, WriteTypePb.DELETE);
+ }
+
+ public void deleteEdges(List edges, StreamObserver callback) {
+ modifyEdge(edges, callback, WriteTypePb.DELETE);
+ }
+
+ /**
+ * Commit the realtime write transaction.
+ * @return The snapshot_id. The data committed would be available after a while, or you could remoteFlush(snapshot_id)
+ * and wait for its return.
*/
- public void deleteEdge(
- String label,
- String srcLabel,
- String dstLabel,
- Map srcPk,
- Map dstPk) {
- DataRecordPb record = getEdgeDataRecord(label, srcLabel, dstLabel, srcPk, dstPk, null);
- WriteRequestPb request = getWriteRequestPb(record, WriteTypePb.INSERT);
- this.batchWriteBuilder.addWriteRequests(request);
+ private long submit(WriteRequestPb request) {
+ BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder();
+ batchWriteBuilder.addWriteRequests(request);
+ return writeStub.batchWrite(batchWriteBuilder.build()).getSnapshotId();
+ }
+
+ private long submit(List requests) {
+ if (requests.isEmpty()) {
+ return 0;
+ }
+ BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder();
+ batchWriteBuilder.addAllWriteRequests(requests);
+ return writeStub.batchWrite(batchWriteBuilder.build()).getSnapshotId();
+ }
+
+ private void submit(WriteRequestPb request, StreamObserver callback) {
+ BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder();
+ batchWriteBuilder.addWriteRequests(request);
+ asyncWriteStub.batchWrite(batchWriteBuilder.build(), callback);
+ }
+
+ private void submit(
+ List requests, StreamObserver callback) {
+ if (!requests.isEmpty()) {
+ BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder();
+ batchWriteBuilder.addAllWriteRequests(requests);
+ asyncWriteStub.batchWrite(batchWriteBuilder.build(), callback);
+ }
}
public GraphDefPb getSchema() {
@@ -364,6 +457,7 @@ public GrootClient build() {
ClientGrpc.ClientBlockingStub clientBlockingStub = ClientGrpc.newBlockingStub(channel);
ClientWriteGrpc.ClientWriteBlockingStub clientWriteBlockingStub =
ClientWriteGrpc.newBlockingStub(channel);
+ ClientWriteGrpc.ClientWriteStub clientWriteStub = ClientWriteGrpc.newStub(channel);
ClientBackupGrpc.ClientBackupBlockingStub clientBackupBlockingStub =
ClientBackupGrpc.newBlockingStub(channel);
GrootDdlServiceGrpc.GrootDdlServiceBlockingStub ddlServiceBlockingStub =
@@ -372,69 +466,29 @@ public GrootClient build() {
BasicAuth basicAuth = new BasicAuth(username, password);
clientBlockingStub = clientBlockingStub.withCallCredentials(basicAuth);
clientWriteBlockingStub = clientWriteBlockingStub.withCallCredentials(basicAuth);
+ clientWriteStub = clientWriteStub.withCallCredentials(basicAuth);
clientBackupBlockingStub = clientBackupBlockingStub.withCallCredentials(basicAuth);
ddlServiceBlockingStub = ddlServiceBlockingStub.withCallCredentials(basicAuth);
}
return new GrootClient(
clientBlockingStub,
clientWriteBlockingStub,
+ clientWriteStub,
clientBackupBlockingStub,
ddlServiceBlockingStub);
}
}
- private VertexRecordKeyPb getVertexRecordKeyPb(String label, Map properties) {
- VertexRecordKeyPb.Builder builder = VertexRecordKeyPb.newBuilder().setLabel(label);
- if (properties != null) {
- builder.putAllPkProperties(properties);
- }
- return builder.build();
- }
-
- private EdgeRecordKeyPb getEdgeRecordKeyPb(
- String label, VertexRecordKeyPb src, VertexRecordKeyPb dst) {
- return EdgeRecordKeyPb.newBuilder()
- .setLabel(label)
- .setSrcVertexKey(src)
- .setDstVertexKey(dst)
- .build();
- }
-
- private DataRecordPb getDataRecordPb(VertexRecordKeyPb key, Map properties) {
- DataRecordPb.Builder builder = DataRecordPb.newBuilder().setVertexRecordKey(key);
- if (properties != null) {
- builder.putAllProperties(properties);
- }
- return builder.build();
- }
-
- private DataRecordPb getDataRecordPb(EdgeRecordKeyPb key, Map properties) {
- DataRecordPb.Builder builder = DataRecordPb.newBuilder().setEdgeRecordKey(key);
- if (properties != null) {
- builder.putAllProperties(properties);
- }
- return builder.build();
- }
-
- private DataRecordPb getVertexDataRecord(String label, Map properties) {
- VertexRecordKeyPb vertexRecordKey = getVertexRecordKeyPb(label, null);
- return getDataRecordPb(vertexRecordKey, properties);
- }
-
- private DataRecordPb getEdgeDataRecord(
- String label,
- String srcLabel,
- String dstLabel,
- Map srcPk,
- Map dstPk,
- Map properties) {
- VertexRecordKeyPb src = getVertexRecordKeyPb(srcLabel, srcPk);
- VertexRecordKeyPb dst = getVertexRecordKeyPb(dstLabel, dstPk);
- EdgeRecordKeyPb edgeRecordKeyPb = getEdgeRecordKeyPb(label, src, dst);
- return getDataRecordPb(edgeRecordKeyPb, properties);
+ private List getVertexWriteRequestPbs(
+ List vertices, WriteTypePb writeType) {
+ return vertices.stream()
+ .map(element -> element.toWriteRequest(writeType))
+ .collect(Collectors.toList());
}
- private WriteRequestPb getWriteRequestPb(DataRecordPb record, WriteTypePb writeType) {
- return WriteRequestPb.newBuilder().setWriteType(writeType).setDataRecord(record).build();
+ private List getEdgeWriteRequestPbs(List edges, WriteTypePb writeType) {
+ return edges.stream()
+ .map(element -> element.toWriteRequest(writeType))
+ .collect(Collectors.toList());
}
}
diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/IngestFile.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/IngestFile.java
index fc791d69dbfd..906b638981c9 100644
--- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/IngestFile.java
+++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/IngestFile.java
@@ -14,6 +14,7 @@
package com.alibaba.graphscope.groot.sdk.example;
import com.alibaba.graphscope.groot.sdk.GrootClient;
+import com.alibaba.graphscope.groot.sdk.schema.Vertex;
import java.io.BufferedReader;
import java.io.File;
@@ -56,24 +57,23 @@ public static void main(String[] args) throws IOException {
propertyNames.add(item);
}
} else {
+ List vertices = new ArrayList<>();
Map properties = new HashMap<>();
String[] items = line.split("\\|");
for (int i = 0; i < items.length; i++) {
properties.put(propertyNames.get(i), items[i]);
}
- client.addVertex(label, properties);
+ vertices.add(new Vertex(label, properties));
count++;
if (count == batchSize) {
- snapshotId = client.commit();
+ snapshotId = client.addVertices(vertices);
count = 0;
}
}
}
}
- long maybeSnapshotId = client.commit();
- long flushSnapshotId = maybeSnapshotId == 0 ? snapshotId : maybeSnapshotId;
- System.out.println("flush snapshotId [" + flushSnapshotId + "]");
- client.remoteFlush(flushSnapshotId);
+ System.out.println("flush snapshotId [" + snapshotId + "]");
+ client.remoteFlush(snapshotId);
System.out.println("done");
}
}
diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/LoadLdbc.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/LoadLdbc.java
index 9b372722e59b..4fa33294c200 100644
--- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/LoadLdbc.java
+++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/LoadLdbc.java
@@ -14,6 +14,8 @@
package com.alibaba.graphscope.groot.sdk.example;
import com.alibaba.graphscope.groot.sdk.GrootClient;
+import com.alibaba.graphscope.groot.sdk.schema.Edge;
+import com.alibaba.graphscope.groot.sdk.schema.Vertex;
import java.io.BufferedReader;
import java.io.FileReader;
@@ -91,7 +93,7 @@ public static String capitalize(String origin) {
}
private static void processVertex(GrootClient client, String label, Path path, int batchSize)
- throws IOException, ParseException {
+ throws IOException {
List propertyNames = new ArrayList<>();
int count = 0;
long snapshotId = 0;
@@ -103,6 +105,7 @@ private static void processVertex(GrootClient client, String label, Path path, i
propertyNames.add(item.split(":")[0]);
}
} else {
+ List vertices = new ArrayList<>();
Map properties = new HashMap<>();
String[] items = line.split("\\|");
for (int i = 0; i < items.length; i++) {
@@ -117,29 +120,27 @@ private static void processVertex(GrootClient client, String label, Path path, i
// }
properties.put(propertyName, propertyVal);
}
- try {
- client.addVertex(label, properties);
- } catch (Exception e) {
- System.err.println(
- "add vertex label ["
- + label
- + "], properties ["
- + properties
- + "] failed. Reason: "
- + e);
- }
+ vertices.add(new Vertex(label, properties));
count++;
if (count == batchSize) {
- snapshotId = client.commit();
+ try {
+ snapshotId = client.addVertices(vertices);
+ } catch (Exception e) {
+ System.err.println(
+ "add vertex label ["
+ + label
+ + "], properties ["
+ + properties
+ + "] failed. Reason: "
+ + e);
+ }
count = 0;
}
}
}
}
- long maybeSnapshotId = client.commit();
- long flushSnapshotId = maybeSnapshotId == 0 ? snapshotId : maybeSnapshotId;
- System.out.println("flush snapshotId [" + flushSnapshotId + "]");
- client.remoteFlush(flushSnapshotId);
+ System.out.println("flush snapshotId [" + snapshotId + "]");
+ client.remoteFlush(snapshotId);
System.out.println("done");
}
@@ -150,7 +151,7 @@ private static void processEdge(
String dstLabel,
Path path,
int batchSize)
- throws IOException, ParseException {
+ throws IOException {
List propertyNames = new ArrayList<>();
int count = 0;
long snapshotId = 0;
@@ -162,6 +163,7 @@ private static void processEdge(
propertyNames.add(item.split(":")[0]);
}
} else {
+ List edges = new ArrayList<>();
Map properties = new HashMap<>();
String[] items = line.split("\\|");
for (int i = 2; i < items.length; i++) {
@@ -180,25 +182,24 @@ private static void processEdge(
// }
properties.put(propertyName, propertyVal);
}
- client.addEdge(
- label,
- srcLabel,
- dstLabel,
- Collections.singletonMap("id", items[0]),
- Collections.singletonMap("id", items[1]),
- properties);
+ edges.add(
+ new Edge(
+ label,
+ srcLabel,
+ dstLabel,
+ Collections.singletonMap("id", items[0]),
+ Collections.singletonMap("id", items[1]),
+ properties));
count++;
if (count == batchSize) {
- snapshotId = client.commit();
+ snapshotId = client.addEdges(edges);
count = 0;
}
}
}
}
- long maybeSnapshotId = client.commit();
- long flushSnapshotId = maybeSnapshotId == 0 ? snapshotId : maybeSnapshotId;
- System.out.println("flush snapshotId [" + flushSnapshotId + "]");
- client.remoteFlush(flushSnapshotId);
+ System.out.println("flush snapshotId [" + snapshotId + "]");
+ client.remoteFlush(snapshotId);
System.out.println("done");
}
}
diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java
index 1d9fae1e7ae5..eb276c928855 100644
--- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java
+++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/RealtimeWrite.java
@@ -1,27 +1,80 @@
package com.alibaba.graphscope.groot.sdk.example;
import com.alibaba.graphscope.groot.sdk.GrootClient;
+import com.alibaba.graphscope.groot.sdk.schema.*;
+import com.alibaba.graphscope.proto.groot.BatchWriteResponse;
+import com.alibaba.graphscope.proto.groot.DataTypePb;
+import io.grpc.stub.StreamObserver;
+
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
public class RealtimeWrite {
+ private static int startId = 0;
+ private static int recordNum = 10000 + startId;
+
+ public void initSchema(GrootClient client) {
+ VertexLabel.Builder person = VertexLabel.newBuilder();
+ person.setLabel("person");
+ Property id =
+ Property.newBuilder()
+ .setName("id")
+ .setDataType(DataTypePb.LONG)
+ .setPrimaryKey()
+ .build();
+ Property.Builder name =
+ Property.newBuilder().setName("name").setDataType(DataTypePb.STRING);
+ Property.Builder age = Property.newBuilder().setName("age").setDataType(DataTypePb.INT);
+ person.addProperty(id);
+ person.addProperty(name);
+ person.addProperty(age);
+
+ VertexLabel.Builder software = VertexLabel.newBuilder();
+ Property.Builder lang =
+ Property.newBuilder().setName("lang").setDataType(DataTypePb.STRING);
+
+ software.setLabel("software");
+ software.addProperty(id);
+ software.addProperty(name);
+ software.addProperty(lang);
+
+ EdgeLabel.Builder created = EdgeLabel.newBuilder();
+ created.setLabel("created");
+ created.addRelation("person", "software");
+ Property.Builder weight =
+ Property.newBuilder().setName("weight").setDataType(DataTypePb.LONG);
+ created.addProperty(weight);
+
+ Schema.Builder schema = Schema.newBuilder();
+ schema.addVertexLabel(person);
+ schema.addVertexLabel(software);
+ schema.addEdgeLabel(created);
+
+ System.out.println(client.submitSchema(schema));
+ System.out.println("testAddLabel succeed");
+ }
+
private static void testAddVerticesEdges(GrootClient client) {
- client.initWriteSession();
for (int i = 0; i < 10; ++i) {
Map properties = new HashMap<>();
properties.put("id", String.valueOf(i));
properties.put("name", "person-" + i);
properties.put("age", String.valueOf(i + 20));
- client.addVertex("person", properties);
+ client.addVertex(new Vertex("person", properties));
properties.clear();
properties.put("id", String.valueOf(i));
properties.put("name", "software-" + i);
properties.put("lang", String.valueOf(i + 200));
- client.addVertex("software", properties);
+ client.addVertex(new Vertex("software", properties));
}
-
+ long snapshotId = 0;
for (int i = 0; i < 10; ++i) {
Map srcPk = new HashMap<>();
Map dstPk = new HashMap<>();
@@ -30,15 +83,15 @@ private static void testAddVerticesEdges(GrootClient client) {
srcPk.put("id", String.valueOf(i));
dstPk.put("id", String.valueOf(i));
properties.put("weight", String.valueOf(i * 100));
- client.addEdge("created", "person", "software", srcPk, dstPk, properties);
+ snapshotId =
+ client.addEdge(
+ new Edge("created", "person", "software", srcPk, dstPk, properties));
}
- long snapshotId = client.commit();
client.remoteFlush(snapshotId);
System.out.println("Finished adding vertices and edges");
}
private static void testUpdateDeleteEdge(GrootClient client) {
- client.initWriteSession();
Map srcPk = new HashMap<>();
Map dstPk = new HashMap<>();
Map properties = new HashMap<>();
@@ -46,42 +99,275 @@ private static void testUpdateDeleteEdge(GrootClient client) {
srcPk.put("id", String.valueOf(0));
dstPk.put("id", String.valueOf(0));
properties.put("weight", String.valueOf(10000));
- client.updateEdge("created", "person", "software", srcPk, dstPk, properties);
- long snapshotId = client.commit();
+ long snapshotId =
+ client.updateEdge(
+ new Edge("created", "person", "software", srcPk, dstPk, properties));
client.remoteFlush(snapshotId);
System.out.println("Finished update edge person-0 -> software-0");
- client.initWriteSession();
- client.deleteEdge("created", "person", "software", srcPk, dstPk);
- snapshotId = client.commit();
+ client.deleteEdge(new Edge("created", "person", "software", srcPk, dstPk));
client.remoteFlush(snapshotId);
System.out.println("Finished delete edge person-0 -> software-0");
}
private static void testUpdateDeleteVertex(GrootClient client) {
- client.initWriteSession();
Map properties = new HashMap<>();
properties.put("id", String.valueOf(0));
properties.put("name", "marko-0-updated");
- client.updateVertex("person", properties);
- long snapshotId = client.commit();
+ long snapshotId = client.updateVertex(new Vertex("person", properties));
client.remoteFlush(snapshotId);
System.out.println("Finished update vertex person-0");
- client.initWriteSession();
Map pk_properties = new HashMap<>();
- client.deleteVertex("person", pk_properties);
- snapshotId = client.commit();
- client.remoteFlush(snapshotId);
+ client.deleteVertex(new Vertex("person", pk_properties));
System.out.println("Finished delete vertex person-0");
}
- public static void main(String[] args) {
+ private static List getVerticesA() {
+ List vertices = new ArrayList<>();
+ for (int i = startId; i < recordNum; ++i) {
+ Map properties = new HashMap<>();
+ properties.put("id", String.valueOf(i));
+ properties.put("name", "person-" + i);
+ properties.put("age", String.valueOf(i + 20));
+ vertices.add(new Vertex("person", properties));
+ }
+ return vertices;
+ }
+
+ private static List getVerticesB() {
+ List vertices = new ArrayList<>();
+ for (int i = startId; i < recordNum; ++i) {
+ Map properties = new HashMap<>();
+ properties.put("id", String.valueOf(i));
+ properties.put("name", "software-" + i);
+ properties.put("lang", String.valueOf(i + 200));
+ vertices.add(new Vertex("software", properties));
+ }
+ return vertices;
+ }
+
+ private static List getEdges() {
+ List edges = new ArrayList<>();
+ for (int i = startId; i < recordNum; ++i) {
+ Map srcPk = new HashMap<>();
+ Map dstPk = new HashMap<>();
+ Map properties = new HashMap<>();
+
+ srcPk.put("id", String.valueOf(i));
+ dstPk.put("id", String.valueOf(i));
+ properties.put("weight", String.valueOf(i * 100));
+ edges.add(new Edge("created", "person", "software", srcPk, dstPk, properties));
+ }
+ return edges;
+ }
+
+ class ClientTask implements Runnable {
+
+ private GrootClient client;
+ private List vertices;
+ private List edges;
+
+ private int type;
+
+ ClientTask(GrootClient client, int type, List vertices, List edges) {
+ this.client = client;
+ this.type = type;
+ this.vertices = vertices;
+ this.edges = edges;
+ }
+
+ @Override
+ public void run() {
+ if (type == 0) {
+ for (int i = 0; i < vertices.size(); ++i) {
+ client.addVertex(vertices.get(i));
+ }
+ } else {
+ for (int i = 0; i < edges.size(); ++i) {
+ client.addEdge(edges.get(i));
+ }
+ }
+ }
+ }
+
+ public void sequential(
+ GrootClient client, List verticesA, List verticesB, List edges) {
+ long snapshotId = 0;
+ TimeWatch watch = TimeWatch.start();
+ {
+ watch.reset();
+ for (Vertex vertex : verticesA) {
+ snapshotId = client.addVertex(vertex);
+ }
+ watch.status("VerticesA");
+ }
+ {
+ watch.reset();
+ for (Vertex vertex : verticesB) {
+ snapshotId = client.addVertex(vertex);
+ }
+ watch.status("VerticesB");
+ }
+ {
+ watch.reset();
+ client.remoteFlush(snapshotId);
+ watch.status("Flush Vertices");
+ System.out.println("Finished add vertices");
+ }
+ {
+ watch.reset();
+ for (Edge edge : edges) {
+ snapshotId = client.addEdge(edge);
+ }
+ watch.status("Edges");
+ }
+ {
+ watch.reset();
+ client.remoteFlush(snapshotId);
+ watch.status("Flush Edges");
+ System.out.println("Finished add edges");
+ }
+ }
+
+ public void sequentialBatch(
+ GrootClient client, List verticesA, List verticesB, List edges) {
+ long snapshotId = 0;
+
+ TimeWatch watch = TimeWatch.start();
+
+ {
+ watch.reset();
+ // snapshotId = client.addVertices(vertices);
+ for (int i = 0; i < verticesA.size(); i += 1000) {
+ snapshotId = client.addVertices(verticesA.subList(i, i + 1000));
+ }
+ watch.status("VerticesA");
+ }
+ {
+ watch.reset();
+ // snapshotId = client.addVertices(vertices);
+ for (int i = 0; i < verticesB.size(); i += 1000) {
+ snapshotId = client.addVertices(verticesB.subList(i, i + 1000));
+ }
+ watch.status("VerticesB");
+ }
+ {
+ watch.reset();
+ client.remoteFlush(snapshotId);
+ watch.status("Flush Vertices");
+ System.out.println("Finished add vertices");
+ }
+ {
+ watch.reset();
+ // snapshotId = client.addEdges(edges);
+ for (int i = 0; i < edges.size(); i += 1000) {
+ snapshotId = client.addEdges(edges.subList(i, i + 1000));
+ }
+ watch.status("Edges");
+ }
+ {
+ watch.reset();
+ client.remoteFlush(snapshotId);
+ watch.status("Flush Edges");
+ System.out.println("Finished add edges");
+ }
+ }
+
+ public void parallel(
+ GrootClient client, List verticesA, List verticesB, List edges)
+ throws InterruptedException {
+ // Create thread pool with 10 threads
+ int taskNum = 30;
+ int offset = 10000 / taskNum;
+ TimeWatch watch = TimeWatch.start();
+ {
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ // Submit 10 tasks to call submit()
+
+ for (int i = 0; i < taskNum * offset; i += offset) {
+ int start = i;
+ int end = start + offset;
+ List subVerticesA = verticesA.subList(start, end);
+ List subVerticesB = verticesB.subList(start, end);
+ executor.submit(new ClientTask(client, 0, subVerticesA, null));
+ executor.submit(new ClientTask(client, 0, subVerticesB, null));
+ }
+ executor.shutdown();
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ watch.status("Vertices");
+ }
+ {
+ Thread.sleep(2000);
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ watch.reset();
+ for (int i = 0; i < taskNum * offset; i += offset) {
+ int start = i;
+ int end = start + offset;
+ List subEdges = edges.subList(start, end);
+ executor.submit(new ClientTask(client, 1, null, subEdges));
+ }
+ executor.shutdown();
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ watch.status("Edges");
+ }
+ }
+
+ public void sequentialAsync(
+ GrootClient client, List verticesA, List verticesB, List edges)
+ throws InterruptedException {
+ TimeWatch watch = TimeWatch.start();
+ class VertexCallBack implements StreamObserver {
+ @Override
+ public void onNext(BatchWriteResponse value) {
+ // System.out.println("on next");
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // System.out.println("on next");
+ }
+
+ @Override
+ public void onCompleted() {
+ // System.out.println("completed");
+ }
+ }
+ {
+ watch.reset();
+ for (Vertex vertex : verticesA) {
+ client.addVertex(vertex, new VertexCallBack());
+ }
+ watch.status("VerticesA");
+ }
+ {
+ watch.reset();
+ for (Vertex vertex : verticesB) {
+ client.addVertex(vertex, new VertexCallBack());
+ }
+ watch.status("VerticesB");
+ }
+ }
+
+ public static void main(String[] args) throws InterruptedException {
String hosts = "localhost";
int port = 55556;
GrootClient client = GrootClient.newBuilder().addHost(hosts, port).build();
- testAddVerticesEdges(client);
- testUpdateDeleteEdge(client);
- testUpdateDeleteVertex(client);
+
+ RealtimeWrite writer = new RealtimeWrite();
+
+ client.dropSchema();
+ writer.initSchema(client);
+
+ List verticesA = RealtimeWrite.getVerticesA();
+ List verticesB = RealtimeWrite.getVerticesB();
+ List edges = RealtimeWrite.getEdges();
+
+ TimeWatch watch = TimeWatch.start();
+ // writer.sequential(client, verticesA, verticesB, edges);
+ // writer.parallel(client, verticesA, verticesB, edges);
+ // writer.sequentialBatch(client, verticesA, verticesB, edges);
+ writer.sequentialAsync(client, verticesA, verticesB, edges);
+ watch.status("Total");
}
}
diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/TimeWatch.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/TimeWatch.java
new file mode 100644
index 000000000000..d92db897773b
--- /dev/null
+++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/example/TimeWatch.java
@@ -0,0 +1,37 @@
+package com.alibaba.graphscope.groot.sdk.example;
+
+import java.util.concurrent.TimeUnit;
+
+class TimeWatch {
+ long starts;
+
+ public static TimeWatch start() {
+ return new TimeWatch();
+ }
+
+ private TimeWatch() {
+ reset();
+ }
+
+ public TimeWatch reset() {
+ starts = System.currentTimeMillis();
+ return this;
+ }
+
+ public long time() {
+ long ends = System.currentTimeMillis();
+ return ends - starts;
+ }
+
+ public long time(TimeUnit unit) {
+ return unit.convert(time(), TimeUnit.MILLISECONDS);
+ }
+
+ public void status(String prefix) {
+ System.out.println(prefix + ": " + time() + " ms");
+ }
+
+ public void status() {
+ status("Duration");
+ }
+}
diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java
new file mode 100644
index 000000000000..933d292c09b0
--- /dev/null
+++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java
@@ -0,0 +1,115 @@
+package com.alibaba.graphscope.groot.sdk.schema;
+
+import com.alibaba.graphscope.proto.groot.*;
+
+import java.util.Map;
+
+public class Edge {
+ public String label;
+ public String srcLabel;
+ public String dstLabel;
+ public Map srcPk;
+ public Map dstPk;
+ public Map properties;
+
+ /**
+ * Construct an edge
+ * @param label edge label
+ * @param srcLabel source vertex label
+ * @param dstLabel destination vertex label
+ * @param srcPk source primary keys
+ * @param dstPk destination primary keys
+ * @param properties edge properties
+ */
+ public Edge(
+ String label,
+ String srcLabel,
+ String dstLabel,
+ Map srcPk,
+ Map dstPk,
+ Map properties) {
+ this.label = label;
+ this.srcLabel = srcLabel;
+ this.dstLabel = dstLabel;
+ this.srcPk = srcPk;
+ this.dstPk = dstPk;
+ this.properties = properties;
+ }
+
+ public Edge(
+ String label,
+ String srcLabel,
+ String dstLabel,
+ Map srcPk,
+ Map dstPk) {
+ this(label, srcLabel, dstLabel, srcPk, dstPk, null);
+ }
+
+ public Edge(String label, Vertex src, Vertex dst, Map properties) {
+ this(
+ label,
+ src.getLabel(),
+ dst.getLabel(),
+ src.getProperties(),
+ dst.getProperties(),
+ properties);
+ }
+
+ public Edge(String label, Vertex src, Vertex dst) {
+ this(label, src, dst, null);
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public String getSrcLabel() {
+ return srcLabel;
+ }
+
+ public String getDstLabel() {
+ return dstLabel;
+ }
+
+ public Map getSrcPk() {
+ return srcPk;
+ }
+
+ public Map getDstPk() {
+ return dstPk;
+ }
+
+ public Map getProperties() {
+ return properties;
+ }
+
+ public EdgeRecordKeyPb toEdgeRecordKey() {
+ return EdgeRecordKeyPb.newBuilder()
+ .setLabel(label)
+ .setSrcVertexKey(toVertexRecordKey(srcLabel, srcPk))
+ .setDstVertexKey(toVertexRecordKey(dstLabel, dstPk))
+ .build();
+ }
+
+ public DataRecordPb toDataRecord() {
+ DataRecordPb.Builder builder =
+ DataRecordPb.newBuilder().setEdgeRecordKey(toEdgeRecordKey());
+ if (properties != null) {
+ builder.putAllProperties(properties);
+ }
+ return builder.build();
+ }
+
+ public WriteRequestPb toWriteRequest(WriteTypePb writeType) {
+ return WriteRequestPb.newBuilder()
+ .setWriteType(writeType)
+ .setDataRecord(toDataRecord())
+ .build();
+ }
+
+ private VertexRecordKeyPb toVertexRecordKey(String label, Map properties) {
+ VertexRecordKeyPb.Builder builder = VertexRecordKeyPb.newBuilder().setLabel(label);
+ builder.putAllPkProperties(properties);
+ return builder.build();
+ }
+}
diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Vertex.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Vertex.java
new file mode 100644
index 000000000000..71b66a074ded
--- /dev/null
+++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Vertex.java
@@ -0,0 +1,52 @@
+package com.alibaba.graphscope.groot.sdk.schema;
+
+import com.alibaba.graphscope.proto.groot.DataRecordPb;
+import com.alibaba.graphscope.proto.groot.VertexRecordKeyPb;
+import com.alibaba.graphscope.proto.groot.WriteRequestPb;
+import com.alibaba.graphscope.proto.groot.WriteTypePb;
+
+import java.util.Map;
+
+public class Vertex {
+ public String label;
+ public Map properties;
+
+ public Vertex(String label) {
+ this(label, null);
+ }
+
+ public Vertex(String label, Map properties) {
+ this.label = label;
+ this.properties = properties;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public Map getProperties() {
+ return properties;
+ }
+
+ public VertexRecordKeyPb toVertexRecordKey(String label) {
+ VertexRecordKeyPb.Builder builder = VertexRecordKeyPb.newBuilder().setLabel(label);
+ // builder.putAllPkProperties(properties);
+ return builder.build();
+ }
+
+ public DataRecordPb toDataRecord() {
+ DataRecordPb.Builder builder =
+ DataRecordPb.newBuilder().setVertexRecordKey(toVertexRecordKey(label));
+ if (properties != null) {
+ builder.putAllProperties(properties);
+ }
+ return builder.build();
+ }
+
+ public WriteRequestPb toWriteRequest(WriteTypePb writeType) {
+ return WriteRequestPb.newBuilder()
+ .setWriteType(writeType)
+ .setDataRecord(toDataRecord())
+ .build();
+ }
+}
diff --git a/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientBackupTest.java b/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientBackupTest.java
index ddd9181179a7..3091722b2de7 100644
--- a/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientBackupTest.java
+++ b/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientBackupTest.java
@@ -13,6 +13,7 @@
*/
package com.alibaba.graphscope.groot.sdk;
+import com.alibaba.graphscope.groot.sdk.schema.Vertex;
import com.alibaba.graphscope.proto.groot.BackupInfoPb;
import org.junit.jupiter.api.Assertions;
@@ -50,9 +51,8 @@ public void testBackup() throws InterruptedException, IOException, URISyntaxExce
properties.put("id", "" + i);
properties.put("name", "young_" + i);
properties.put("age", "18");
- client.addVertex("person", properties);
+ client.addVertex(new Vertex("person", properties));
}
- client.commit();
Thread.sleep(3000L);
List backupInfoList;
@@ -68,9 +68,8 @@ public void testBackup() throws InterruptedException, IOException, URISyntaxExce
properties.put("id", "" + i);
properties.put("name", "lop_" + i);
properties.put("lang", "java");
- client.addVertex("software", properties);
+ client.addVertex(new Vertex("software", properties));
}
- client.commit();
Thread.sleep(3000L);
int backupId2 = client.createNewGraphBackup();
diff --git a/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientTest.java b/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientTest.java
index b8baa3122c6f..7a57ca22a558 100644
--- a/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientTest.java
+++ b/interactive_engine/groot-client/src/test/java/com/alibaba/graphscope/groot/sdk/ClientTest.java
@@ -13,6 +13,8 @@
*/
package com.alibaba.graphscope.groot.sdk;
+import com.alibaba.graphscope.groot.sdk.schema.Edge;
+import com.alibaba.graphscope.groot.sdk.schema.Vertex;
import com.alibaba.graphscope.proto.groot.GraphDefPb;
import org.junit.jupiter.api.Test;
@@ -70,30 +72,29 @@ void testGetSchema() {
@Test
void testAddData() {
- client.initWriteSession();
Map properties = new HashMap<>();
properties.put("name", "alice");
properties.put("id", "12345");
- client.addVertex("person", properties);
+ client.addVertex(new Vertex("person", properties));
properties = new HashMap<>();
properties.put("name", "bob");
properties.put("id", "88888");
- client.addVertex("person", properties);
+ client.addVertex(new Vertex("person", properties));
for (int i = 0; i < 100; i++) {
properties = new HashMap<>();
properties.put("name", "test" + i);
properties.put("id", "" + i);
- client.addVertex("person", properties);
+ client.addVertex(new Vertex("person", properties));
}
client.addEdge(
- "knows",
- "person",
- "person",
- Collections.singletonMap("id", "12345"),
- Collections.singletonMap("id", "88888"),
- Collections.singletonMap("weight", "20201111"));
- client.commit();
+ new Edge(
+ "knows",
+ "person",
+ "person",
+ Collections.singletonMap("id", "12345"),
+ Collections.singletonMap("id", "88888"),
+ Collections.singletonMap("weight", "20201111")));
}
}
diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java
index 91ab1840bb16..e8f9eda06231 100644
--- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java
+++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/GarbageCollectManager.java
@@ -44,12 +44,9 @@ public void start() {
for (int i = 0; i < CommonConfig.STORE_NODE_COUNT.get(configs); i++) {
CoordinatorSnapshotClient client = clients.getClient(i);
client.synchronizeSnapshot(offlineVersion);
- logger.info(
- "Offline version of store ["
- + i
- + "] updated to ["
- + offlineVersion
- + "]");
+ if (i == 0 && offlineVersion % 100 == 0) {
+ logger.info("Offline version updated to {}", offlineVersion);
+ }
}
}
} catch (Exception e) {
diff --git a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java
index e03b812a490a..b03317ce38d8 100644
--- a/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java
+++ b/interactive_engine/groot-server/src/test/java/com/alibaba/graphscope/groot/tests/gremlin/GrootGraph.java
@@ -20,6 +20,8 @@
import com.alibaba.graphscope.groot.common.config.GremlinConfig;
import com.alibaba.graphscope.groot.common.exception.GrootException;
import com.alibaba.graphscope.groot.sdk.GrootClient;
+import com.alibaba.graphscope.groot.sdk.schema.Edge;
+import com.alibaba.graphscope.groot.sdk.schema.Vertex;
import com.alibaba.graphscope.groot.servers.MaxNode;
import com.alibaba.graphscope.groot.servers.NodeBase;
import com.alibaba.graphscope.groot.tests.common.GrootIORegistry;
@@ -36,9 +38,7 @@
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.ProfileStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.FilterRankingStrategy;
-import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Transaction;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +51,6 @@
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
public class GrootGraph extends RemoteTestGraph {
@@ -134,96 +133,100 @@ public void loadData(LoadGraphWith.GraphData graphData) throws InterruptedExcept
Thread.sleep(5000);
String graphName = graphData.name().toLowerCase();
if (graphName.equals("modern")) {
- ddlClient.initWriteSession();
-
+ long snapshotId;
Map v1 = new HashMap<>();
v1.put("id", "1");
v1.put("name", "marko");
v1.put("age", "29");
- ddlClient.addVertex("person", v1);
+ ddlClient.addVertex(new Vertex("person", v1));
Map v2 = new HashMap<>();
v2.put("id", "2");
v2.put("name", "vadas");
v2.put("age", "27");
- ddlClient.addVertex("person", v2);
+ ddlClient.addVertex(new Vertex("person", v2));
Map v4 = new HashMap<>();
v4.put("id", "4");
v4.put("name", "josh");
v4.put("age", "32");
- ddlClient.addVertex("person", v4);
+ ddlClient.addVertex(new Vertex("person", v4));
Map v6 = new HashMap<>();
v6.put("id", "6");
v6.put("name", "peter");
v6.put("age", "35");
- ddlClient.addVertex("person", v6);
+ ddlClient.addVertex(new Vertex("person", v6));
Map v3 = new HashMap<>();
v3.put("id", "3");
v3.put("name", "lop");
v3.put("lang", "java");
- ddlClient.addVertex("software", v3);
+ ddlClient.addVertex(new Vertex("software", v3));
Map v5 = new HashMap<>();
v5.put("id", "5");
v5.put("name", "ripple");
v5.put("lang", "java");
- ddlClient.addVertex("software", v5);
-
- ddlClient.commit();
- Thread.sleep(5000);
+ snapshotId = ddlClient.addVertex(new Vertex("software", v5));
- ddlClient.addEdge(
- "knows",
- "person",
- "person",
- Collections.singletonMap("id", "1"),
- Collections.singletonMap("id", "2"),
- Collections.singletonMap("weight", "0.5"));
+ ddlClient.remoteFlush(snapshotId);
ddlClient.addEdge(
- "created",
- "person",
- "software",
- Collections.singletonMap("id", "1"),
- Collections.singletonMap("id", "3"),
- Collections.singletonMap("weight", "0.4"));
+ new Edge(
+ "knows",
+ "person",
+ "person",
+ Collections.singletonMap("id", "1"),
+ Collections.singletonMap("id", "2"),
+ Collections.singletonMap("weight", "0.5")));
ddlClient.addEdge(
- "knows",
- "person",
- "person",
- Collections.singletonMap("id", "1"),
- Collections.singletonMap("id", "4"),
- Collections.singletonMap("weight", "1.0"));
+ new Edge(
+ "created",
+ "person",
+ "software",
+ Collections.singletonMap("id", "1"),
+ Collections.singletonMap("id", "3"),
+ Collections.singletonMap("weight", "0.4")));
ddlClient.addEdge(
- "created",
- "person",
- "software",
- Collections.singletonMap("id", "4"),
- Collections.singletonMap("id", "3"),
- Collections.singletonMap("weight", "0.4"));
+ new Edge(
+ "knows",
+ "person",
+ "person",
+ Collections.singletonMap("id", "1"),
+ Collections.singletonMap("id", "4"),
+ Collections.singletonMap("weight", "1.0")));
ddlClient.addEdge(
- "created",
- "person",
- "software",
- Collections.singletonMap("id", "4"),
- Collections.singletonMap("id", "5"),
- Collections.singletonMap("weight", "1.0"));
+ new Edge(
+ "created",
+ "person",
+ "software",
+ Collections.singletonMap("id", "4"),
+ Collections.singletonMap("id", "3"),
+ Collections.singletonMap("weight", "0.4")));
ddlClient.addEdge(
- "created",
- "person",
- "software",
- Collections.singletonMap("id", "6"),
- Collections.singletonMap("id", "3"),
- Collections.singletonMap("weight", "0.2"));
-
- ddlClient.commit();
+ new Edge(
+ "created",
+ "person",
+ "software",
+ Collections.singletonMap("id", "4"),
+ Collections.singletonMap("id", "5"),
+ Collections.singletonMap("weight", "1.0")));
+
+ snapshotId =
+ ddlClient.addEdge(
+ new Edge(
+ "created",
+ "person",
+ "software",
+ Collections.singletonMap("id", "6"),
+ Collections.singletonMap("id", "3"),
+ Collections.singletonMap("weight", "0.2")));
+ ddlClient.remoteFlush(snapshotId);
Thread.sleep(5000);
} else {
throw new UnsupportedOperationException("graph " + graphName + " is unsupported yet");
@@ -239,11 +242,6 @@ public GraphTraversalSource traversal() {
return source;
}
- @Override
- public Vertex addVertex(Object... keyValues) {
- throw new UnsupportedOperationException();
- }
-
@Override
public C compute(Class graphComputerClass)
throws IllegalArgumentException {
@@ -255,16 +253,6 @@ public GraphComputer compute() throws IllegalArgumentException {
throw new UnsupportedOperationException();
}
- @Override
- public Iterator vertices(Object... vertexIds) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator edges(Object... edgeIds) {
- throw new UnsupportedOperationException();
- }
-
@Override
public Transaction tx() {
throw new UnsupportedOperationException();