diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md
index 1cb1097a9b..67c3fa36ee 100644
--- a/docs/configs/janusgraph-cfg.md
+++ b/docs/configs/janusgraph-cfg.md
@@ -582,7 +582,7 @@ HBase storage options
| storage.hbase.short-cf-names | Whether to shorten the names of JanusGraph's column families to one-character mnemonics to conserve storage space | Boolean | true | FIXED |
| storage.hbase.skip-schema-check | Assume that JanusGraph's HBase table and column families already exist. When this is true, JanusGraph will not check for the existence of its table/CFs, nor will it attempt to create them under any circumstances. This is useful when running JanusGraph without HBase admin privileges. | Boolean | false | MASKABLE |
| storage.hbase.snapshot-name | The name of an existing HBase snapshot to be used by HBaseSnapshotInputFormat | String | janusgraph-snapshot | LOCAL |
-| storage.hbase.snapshot-restore-dir | The temporary directory to be used by HBaseSnapshotInputFormat to restore a snapshot. This directory should be on the same File System as the HBase root dir. | String | /tmp | LOCAL |
+| storage.hbase.snapshot-restore-dir | The temporary directory to be used by HBaseSnapshotInputFormat to restore a snapshot. This directory should be on the same File System as the HBase root dir. | String | /var/folders/9r/nv_glvys3kgcw706nvfkvgf40000gr/T/ | LOCAL |
| storage.hbase.table | The name of the table JanusGraph will use. When storage.hbase.skip-schema-check is false, JanusGraph will automatically create this table if it does not already exist. If this configuration option is not provided but graph.graphname is, the table will be set to that value. | String | janusgraph | LOCAL |
### storage.lock
diff --git a/janusgraph-couchbase/pom.xml b/janusgraph-couchbase/pom.xml
new file mode 100644
index 0000000000..79d4269f86
--- /dev/null
+++ b/janusgraph-couchbase/pom.xml
@@ -0,0 +1,257 @@
+
+
+ 4.0.0
+
+ org.janusgraph
+ janusgraph
+ 1.1.0-SNAPSHOT
+ ../pom.xml
+
+
+ janusgraph-couchbase
+ JanusGraph-Couchbase: Couchbase Backend for JanusGraph
+
+
+ 3.8.1
+ 1.8
+ 1.8
+ 1.8
+ 2.3.4
+ 3.3.4
+ 8.11.2
+ 2.10.3
+ 3.2.0
+ false
+ true
+ 1.15.3
+
+
+
+
+ Jagadesh Munta
+ jagadesh.munta@couchbase.com
+
+
+ Denis Souza Rosa
+ denis.rosa@couchbase.com
+
+
+ Dmitrii Chechetkin
+ dmitrii.chechetkin@couchbase.com
+
+
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+
+ org.janusgraph
+ janusgraph-core
+ ${project.version}
+
+
+ org.janusgraph
+ janusgraph-server
+ ${project.version}
+ provided
+
+
+ ch.qos.logback
+ logback-classic
+ 0.9.24
+ provided
+
+
+ org.janusgraph
+ janusgraph-backend-testutils
+ ${project.version}
+ test
+
+
+ org.janusgraph
+ janusgraph-test
+ ${project.version}
+ test
+
+
+ com.couchbase.client
+ core-io
+ ${couchbase.core-io.version}
+
+
+ com.couchbase.client
+ java-client
+ ${couchbase.java-client.version}
+
+
+ org.apache.lucene
+ lucene-queryparser
+ ${lucene-parser.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson-databind.version}
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.6
+
+
+ io.reactivex
+ rxjava
+ 1.3.8
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ ${testcontainers.version}
+ test
+
+
+ org.mockito
+ mockito-inline
+ 3.12.4
+ test
+
+
+ org.powermock
+ powermock-core
+ 2.0.9
+ test
+
+
+ org.powermock
+ powermock-api-mockito2
+ 2.0.9
+ test
+
+
+ org.powermock
+ powermock-module-junit4
+ 2.0.9
+ test
+
+
+ org.powermock
+ powermock-reflect
+ 2.0.9
+ test
+
+
+
+
+ ${basedir}/target
+
+
+ ${basedir}/src/test/resources
+
+
+
+
+ maven-compiler-plugin
+ ${maven.compiler.plugin.version}
+
+
+ ${jdk.version}
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ ${dependency.plugin.version}
+
+
+ copy-dependencies
+ prepare-package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/lib
+ compile
+
+
+
+
+
+ maven-surefire-plugin
+ 2.22.1
+
+ none
+ alphabetical
+ false
+
+ **/*PerformanceTest.java
+ **/*ConcurrentTest.java
+ **/*Groovy*Test.java
+ **/*ComputerTest.java
+ **/*ProcessTest.java
+ **/*ProcessPerformanceTest.java
+ **/*StructureTest.java
+
+ ${test.skip.default}
+
+
+
+ log4j.configuration
+ file:${project.build.directory}/test-classes/log4j.properties
+
+
+
+
+
+ tinkerpop-test
+
+ test
+
+ test
+
+ false
+ 1
+ none
+ 1
+ false
+
+ **/*Groovy*Test.java
+ **/*ComputerTest.java
+ **/*ProcessTest.java
+ **/*ProcessPerformanceTest.java
+ **/*StructureTest.java
+
+ alphabetical
+ ${test.skip.tp}
+
+ ${project.build.directory}
+ file:${project.build.directory}/test-classes/log4j.properties
+ true
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ ${jdk.version}
+
+
+
+
+
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/AbstractDocument.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/AbstractDocument.java
new file mode 100644
index 0000000000..09c3d91c4b
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/AbstractDocument.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import com.couchbase.client.core.msg.kv.MutationToken;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * Common parent implementation of a {@link Document}.
+ *
+ * It is recommended that all {@link Document} implementations extend from this class so that parameter checks
+ * are consistently applied. It also ensures that equals and hashcode are applied on the contents and therefore
+ * comparisons work as expected.
+ *
+ * @author Michael Nitschinger
+ * @since 2.0.0
+ */
+public abstract class AbstractDocument implements Document {
+
+ public static final int MAX_ID_LENGTH = 240;
+ private String id;
+ private long cas;
+ private int expiry;
+ private T content;
+ private MutationToken mutationToken;
+
+ /**
+ * Constructor needed for possible subclass serialization.
+ */
+ protected AbstractDocument() {
+ }
+
+ protected AbstractDocument(String id, int expiry, T content, long cas) {
+ this(id, expiry, content, cas, null);
+ }
+
+ protected AbstractDocument(String id, int expiry, T content, long cas, MutationToken mutationToken) {
+ if (id == null || id.isEmpty()) {
+ throw new IllegalArgumentException("The Document ID must not be null or empty.");
+ }
+ // Quick sanity check, but not 100% accurate. UTF-8 encoding avoided because of double
+ // allocations, it is done in core with proper exact error handling anyways.
+ if (id.length() > MAX_ID_LENGTH) {
+ throw new IllegalArgumentException("The Document ID must not be larger than 250 bytes");
+ }
+ if (expiry < 0) {
+ throw new IllegalArgumentException("The Document expiry must not be negative.");
+ }
+
+ this.id = id;
+ this.cas = cas;
+ this.expiry = expiry;
+ this.content = content;
+ this.mutationToken = mutationToken;
+ }
+
+ @Override
+ public String id() {
+ return id;
+ }
+
+ @Override
+ public long cas() {
+ return cas;
+ }
+
+ @Override
+ public int expiry() {
+ return expiry;
+ }
+
+ @Override
+ public T content() {
+ return content;
+ }
+
+ @Override
+ public MutationToken mutationToken() {
+ return mutationToken;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(this.getClass().getSimpleName() + "{");
+ sb.append("id='").append(id).append('\'');
+ sb.append(", cas=").append(cas);
+ sb.append(", expiry=").append(expiry);
+ sb.append(", content=").append(content);
+ sb.append(", mutationToken=").append(mutationToken);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AbstractDocument> that = (AbstractDocument>) o;
+
+ if (cas != that.cas) return false;
+ if (expiry != that.expiry) return false;
+ if (id != null ? !id.equals(that.id) : that.id != null) return false;
+ if (content != null ? !content.equals(that.content) : that.content != null) return false;
+ return !(mutationToken != null ? !mutationToken.equals(that.mutationToken) : that.mutationToken != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = id != null ? id.hashCode() : 0;
+ result = 31 * result + (int) (cas ^ (cas >>> 32));
+ result = 31 * result + expiry;
+ result = 31 * result + (content != null ? content.hashCode() : 0);
+ result = 31 * result + (mutationToken != null ? mutationToken.hashCode() : 0);
+ return result;
+ }
+
+ /**
+ * Helper method to write the current document state to the output stream for serialization purposes.
+ *
+ * @param stream the stream to write to.
+ * @throws IOException when a stream problem occurs
+ */
+ protected void writeToSerializedStream(ObjectOutputStream stream) throws IOException {
+ stream.writeLong(cas);
+ stream.writeInt(expiry);
+ stream.writeUTF(id);
+ stream.writeObject(content);
+ stream.writeObject(mutationToken);
+ }
+
+ /**
+ * Helper method to create the document from an object input stream, used for serialization purposes.
+ *
+ * @param stream the stream to read from.
+ * @throws IOException when a stream problem occurs
+ * @throws ClassNotFoundException when requested class is not present
+ */
+ @SuppressWarnings("unchecked")
+ protected void readFromSerializedStream(final ObjectInputStream stream) throws IOException, ClassNotFoundException {
+ cas = stream.readLong();
+ expiry = stream.readInt();
+ id = stream.readUTF();
+ content = (T) stream.readObject();
+ mutationToken = (MutationToken) stream.readObject();
+ }
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumn.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumn.java
new file mode 100644
index 0000000000..96c2a748a4
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumn.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+public class CouchbaseColumn implements Comparable {
+ // attributes keys of json document
+ public static final String ID = "id";
+ public static final String TABLE = "table";
+ public static final String COLUMNS = "columns";
+ public static final String KEY = "key";
+ public static final String VALUE = "value";
+ public static final String EXPIRE = "expire";
+ public static final String TTL = "ttl";
+ // instance members
+ private String key;
+ private String value;
+ private long expire;
+ private int ttl;
+
+ public CouchbaseColumn(String key, String value, long expire, int ttl) {
+ this.key = key;
+ this.value = value;
+ this.expire = expire;
+ this.ttl = ttl;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public long getExpire() {
+ return expire;
+ }
+
+ public int getTtl() {
+ return ttl;
+ }
+
+ public int compareTo(CouchbaseColumn o) {
+ return key.compareTo(o.key);
+ }
+
+ public boolean equals(Object anObject) {
+ if (this == anObject) {
+ return true;
+ }
+ if (anObject instanceof CouchbaseColumn) {
+ CouchbaseColumn anotherColumn = (CouchbaseColumn)anObject;
+ return key.equals(anotherColumn.key);
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ return key.hashCode();
+ }
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumnConverter.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumnConverter.java
new file mode 100644
index 0000000000..91291e548b
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumnConverter.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import org.janusgraph.diskstorage.StaticBuffer;
+import org.janusgraph.diskstorage.util.StaticArrayBuffer;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+public class CouchbaseColumnConverter {//implements StaticBuffer.Factory {
+ public static final CouchbaseColumnConverter INSTANCE = new CouchbaseColumnConverter();
+ private static final char[] hexArray = "0123456789ABCDEF".toCharArray();
+
+
+// @Override
+// public String get(byte[] array, int offset, int limit) {
+// byte[] source = getSource(array, offset, limit);
+// return toString(source);
+// }
+//
+// public String toString(byte[] array) {
+// stringSerializer.
+//
+//
+//// StaticBuffer sb = StaticArrayBuffer.of(array);
+//// return KeyValueStoreUtil.getString(sb);
+// //return Base64.getEncoder().encodeToString(array);
+// }
+
+
+ public static String toString(byte[] bytes) {
+ char[] hexChars = new char[bytes.length * 2];
+ for (int j = 0; j < bytes.length; j++) {
+ int v = bytes[j] & 0xFF;
+ hexChars[j * 2] = hexArray[v >>> 4];
+ hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+ }
+ String s = new String(hexChars);
+
+// byte[] b = toByteArray(s);
+// if (!Arrays.equals(bytes, b)) {
+// System.out.println("fail");
+// }
+
+ return s;
+ }
+
+ public byte[] toByteArray(String value) {
+// final StaticBuffer buffer = toStaticBuffer(value);
+// return buffer.getBytes(0, buffer.length());
+
+// StaticBuffer sb = KeyValueStoreUtil.getBuffer(value);
+// String s = toString(sb);
+// System.out.println(s);
+// assert value.equals(s);
+// return sb.getBytes(0, sb.length());
+ int len = value == null ? 0 : value.length();
+ byte[] data = new byte[len / 2];
+ for (int i = 0; i < len; i += 2) {
+ data[i / 2] = (byte) ((Character.digit(value.charAt(i), 16) << 4)
+ + Character.digit(value.charAt(i + 1), 16));
+ }
+ return data;
+ //return Base64.getDecoder().decode(value);
+ }
+
+ public static String toString(StaticBuffer buffer) {
+ return toString(buffer.as(StaticBuffer.ARRAY_FACTORY));
+ //return stringSerializer.read(buffer.asReadBuffer());
+ // return KeyValueStoreUtil.getString(buffer);
+ //return buffer.as(this);
+ }
+
+ public static String toId(String string) {
+ try {
+ byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
+ if (bytes.length > AbstractDocument.MAX_ID_LENGTH) {
+ MessageDigest digest = MessageDigest.getInstance("SHA-512");
+ digest.update(bytes);
+ return new StringBuilder(String.valueOf(bytes.length)).append(new String(digest.digest())).toString();
+ }
+ return string;
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public StaticBuffer toStaticBuffer(String value) {
+ return StaticArrayBuffer.of(toByteArray(value));
+// WriteByteBuffer writeBuffer = new WriteByteBuffer();
+// stringSerializer.write(writeBuffer, value);
+// return writeBuffer.getStaticBuffer();
+ //return KeyValueStoreUtil.getBuffer(value);
+// return new StaticArrayBuffer(toByteArray(value));
+ }
+
+ public String toId(StaticBuffer staticBuffer) {
+ return toId(toString(staticBuffer));
+ }
+
+// private byte[] getSource(byte[] array, int offset, int limit) {
+// if (offset == 0 && limit == array.length)
+// return array;
+// else
+// return Arrays.copyOfRange(array, offset, limit);
+// }
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseConfigOptions.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseConfigOptions.java
new file mode 100644
index 0000000000..087dcf282e
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseConfigOptions.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import org.janusgraph.diskstorage.configuration.ConfigNamespace;
+import org.janusgraph.diskstorage.configuration.ConfigOption;
+import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
+import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions;
+
+/**
+ * Configuration options for the Couchbase storage backend.
+ * These are managed under the 'cb' namespace in the configuration.
+ *
+ * @author Jagadesh Munta (jagadesh.munta@couchbase.com)
+ */
+@PreInitializeConfigOptions
+public interface CouchbaseConfigOptions {
+
+ ConfigNamespace CB_NS = new ConfigNamespace(
+ GraphDatabaseConfiguration.STORAGE_NS,
+ "cb",
+ "Couchbase storage backend options");
+
+ ConfigOption VERSION = new ConfigOption<>(
+ CB_NS,
+ "version",
+ "The version of the Couchbase cluster.",
+ ConfigOption.Type.LOCAL,
+ 703);
+
+ ConfigOption CLUSTER_CONNECT_STRING = new ConfigOption<>(
+ CB_NS,
+ "cluster-connect-string",
+ "Connect string to the Couchbase cluster",
+ ConfigOption.Type.LOCAL,
+ "couchbase://localhost");
+
+ ConfigOption CLUSTER_CONNECT_USERNAME = new ConfigOption<>(
+ CB_NS,
+ "cluster-connect-username",
+ "Username to the Couchbase cluster",
+ ConfigOption.Type.LOCAL,
+ "Administrator");
+
+ ConfigOption CLUSTER_CONNECT_PASSWORD = new ConfigOption<>(
+ CB_NS,
+ "cluster-connect-password",
+ "Password to the Couchbase cluster",
+ ConfigOption.Type.LOCAL,
+ "password");
+
+ ConfigOption CLUSTER_CONNECT_BUCKET = new ConfigOption<>(
+ CB_NS,
+ "cluster-connect-bucket",
+ "Bucket in the Couchbase cluster",
+ ConfigOption.Type.LOCAL,
+ "default");
+
+ ConfigOption CLUSTER_DEFAULT_SCOPE = new ConfigOption<>(
+ CB_NS,
+ "cluster-default-scope",
+ "Default Scope ",
+ ConfigOption.Type.LOCAL,
+ "_default");
+
+ ConfigOption CLUSTER_DEFAULT_COLLECTION = new ConfigOption<>(
+ CB_NS,
+ "cluster-default-collection",
+ "Default Collection",
+ ConfigOption.Type.LOCAL,
+ "_default");
+
+ ConfigOption ISOLATION_LEVEL = new ConfigOption<>(
+ CB_NS,
+ "isolation-level",
+ "Options are serializable, read_committed_no_write, read_committed_with_write",
+ ConfigOption.Type.LOCAL,
+ "serializable");
+
+ ConfigOption GET_RANGE_MODE = new ConfigOption<>(
+ CB_NS,
+ "get-range-mode",
+ "The mod of executing CB getRange, either `iterator` or `list`",
+ ConfigOption.Type.LOCAL,
+ "list"
+ );
+
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseDocumentMutation.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseDocumentMutation.java
new file mode 100644
index 0000000000..49371be20d
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseDocumentMutation.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
+
+public class CouchbaseDocumentMutation {
+ private String table;
+ private String documentId;
+ private KCVMutation mutation;
+
+ public CouchbaseDocumentMutation(String table, String documentId, KCVMutation mutation) {
+ this.table = table;
+ this.documentId = documentId;
+ this.mutation = mutation;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getDocumentId() {
+ return documentId;
+ }
+
+ public String getHashId() {
+ return CouchbaseColumnConverter.toId(documentId);
+ }
+
+ public KCVMutation getMutation() {
+ return mutation;
+ }
+
+ public String getDocumentKey() {
+ return documentId;
+ }
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseIndex.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseIndex.java
new file mode 100644
index 0000000000..6572cd2f96
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseIndex.java
@@ -0,0 +1,504 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import com.couchbase.client.core.deps.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import com.couchbase.client.core.env.ConnectionStringPropertyLoader;
+import com.couchbase.client.core.env.IoConfig;
+import com.couchbase.client.core.env.SecurityConfig;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.Cluster;
+import com.couchbase.client.java.ClusterOptions;
+import com.couchbase.client.java.Collection;
+import com.couchbase.client.java.Scope;
+import com.couchbase.client.java.env.ClusterEnvironment;
+import com.couchbase.client.java.json.JsonArray;
+import com.couchbase.client.java.manager.collection.CollectionSpec;
+import com.couchbase.client.java.query.QueryOptions;
+import com.couchbase.client.java.query.QueryResult;
+import com.couchbase.client.java.search.SearchOptions;
+import com.couchbase.client.java.search.SearchQuery;
+import com.couchbase.client.java.search.result.SearchResult;
+import org.apache.commons.lang3.StringUtils;
+import org.janusgraph.core.Cardinality;
+import org.janusgraph.core.attribute.Cmp;
+import org.janusgraph.core.attribute.Geo;
+import org.janusgraph.core.attribute.Geoshape;
+import org.janusgraph.core.attribute.Text;
+import org.janusgraph.core.schema.Mapping;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.BaseTransaction;
+import org.janusgraph.diskstorage.BaseTransactionConfig;
+import org.janusgraph.diskstorage.BaseTransactionConfigurable;
+import org.janusgraph.diskstorage.configuration.Configuration;
+import org.janusgraph.diskstorage.couchbase.lucene.Lucene2CouchbaseQLTranslator;
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.janusgraph.diskstorage.indexing.IndexFeatures;
+import org.janusgraph.diskstorage.indexing.IndexMutation;
+import org.janusgraph.diskstorage.indexing.IndexProvider;
+import org.janusgraph.diskstorage.indexing.IndexQuery;
+import org.janusgraph.diskstorage.indexing.KeyInformation;
+import org.janusgraph.diskstorage.indexing.RawQuery;
+import org.janusgraph.graphdb.database.serialize.AttributeUtils;
+import org.janusgraph.graphdb.query.JanusGraphPredicate;
+import org.janusgraph.graphdb.query.condition.And;
+import org.janusgraph.graphdb.query.condition.Condition;
+import org.janusgraph.graphdb.query.condition.FixedCondition;
+import org.janusgraph.graphdb.query.condition.Not;
+import org.janusgraph.graphdb.query.condition.Or;
+import org.janusgraph.graphdb.query.condition.PredicateCondition;
+import org.janusgraph.graphdb.tinkerpop.optimize.step.Aggregation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.janusgraph.diskstorage.couchbase.CouchbaseConfigOptions.CLUSTER_CONNECT_STRING;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_CONNECT_BUCKET;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_CONNECT_PASSWORD;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_CONNECT_USERNAME;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_DEFAULT_FUZINESS;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_DEFAULT_SCOPE;
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME;
+
+/**
+ * @author : Dmitrii Chechetkin (dmitrii.chechetkin@couchbase.com)
+ */
+public class CouchbaseIndex implements IndexProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseIndex.class);
+
+ private static final String STRING_MAPPING_SUFFIX = "__STRING";
+ static final String FTS_INDEX_NAME = "fulltext_index";
+ private final String name;
+ private final Cluster cluster;
+
+ private final Bucket bucket;
+
+ private final Scope scope;
+
+ private final int fuzziness;
+
+ private final String indexNamePrefix;
+
+ private final String indexNamespace;
+
+ public CouchbaseIndex(Configuration config) {
+ boolean isTLS = false;
+ final String connectString = config.get(CLUSTER_CONNECT_STRING);
+ if (connectString.startsWith("couchbases://")) {
+ isTLS = true;
+ }
+
+ ClusterEnvironment.Builder envBuilder = ClusterEnvironment.builder()
+ .ioConfig(IoConfig.enableDnsSrv(isTLS))
+ .securityConfig(SecurityConfig.enableTls(isTLS)
+ .trustManagerFactory(InsecureTrustManagerFactory.INSTANCE));
+
+ new ConnectionStringPropertyLoader(connectString).load(envBuilder);
+
+ ClusterEnvironment env = envBuilder.build();
+ name = config.get(INDEX_NAME);
+ cluster = Cluster.connect(connectString,
+ ClusterOptions.clusterOptions(config.get(CLUSTER_CONNECT_USERNAME),
+ config.get(CLUSTER_CONNECT_PASSWORD)).environment(env));
+
+ fuzziness = config.get(CLUSTER_DEFAULT_FUZINESS);
+
+ String bucketName = config.get(CLUSTER_CONNECT_BUCKET);
+ String scopeName = config.get(CLUSTER_DEFAULT_SCOPE);
+
+ bucket = cluster.bucket(bucketName);
+ scope = bucket.scope(scopeName);
+ indexNamePrefix = String.format("%s_%s", bucketName, scopeName);
+ indexNamespace = String.format("%s.%s", bucketName, scopeName);
+ }
+
+ @Override
+ public void register(String storeName, String key, KeyInformation information, BaseTransaction tx) throws BackendException {
+ ensureStorageExists(storeName);
+ CouchbaseIndexTransaction cbitx = (CouchbaseIndexTransaction) tx;
+ cbitx.register(storeName, key, information);
+ }
+
+ protected Collection getStorage(String name) {
+ Collection result = scope.collection(name);
+ if (result == null) {
+ bucket.collections().createCollection(CollectionSpec.create(name, scope.name()));
+ result = scope.collection(name);
+ }
+ return result;
+ }
+
+ protected String getIndexFullName(String name) {
+ return indexNamePrefix + "_" + name;
+ }
+
+ @Override
+ public void mutate(Map> mutations, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException {
+ mutations.keySet().forEach(this::ensureStorageExists);
+ ((CouchbaseIndexTransaction)tx).mutate(mutations, information);
+ }
+
+ @Override
+ public void restore(Map>> documents, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException {
+ documents.keySet().forEach(this::ensureStorageExists);
+ ((CouchbaseIndexTransaction)tx).restore(documents, information);
+ }
+
+ @Override
+ public Number queryAggregation(IndexQuery query, KeyInformation.IndexRetriever information, BaseTransaction tx, Aggregation aggregation) throws BackendException {
+ final String aggType = aggregation.getType().name().toLowerCase();
+ final String fieldName = aggregation.getFieldName() == null ? "*" : aggregation.getFieldName();
+ return doQuery(String.format("%s(%s) as __agg_result", aggType, fieldName), query, information, tx)
+ .rowsAsObject().stream()
+ .findFirst().map(row -> row.getLong("__agg_result"))
+ .orElse(0L);
+ }
+
+ protected CollectionSpec ensureStorageExists(String name) {
+ return getCollection(name).orElseGet(() -> createCollection(name));
+ }
+
+ protected Optional getCollection(String name) {
+ return bucket.collections().getAllScopes()
+ .parallelStream()
+ .filter(scopeSpec -> scopeSpec.name().equals(scope.name()))
+ .flatMap(scopeSpec -> scopeSpec.collections().parallelStream())
+ .filter(collectionSpec -> collectionSpec.name().equals(name))
+ .findFirst();
+ }
+
+ protected CollectionSpec createCollection(String name) {
+ CollectionSpec collectionSpec = CollectionSpec.create(name, scope.name(), Duration.ZERO);
+ bucket.collections().createCollection(collectionSpec);
+
+ try {
+ Thread.sleep(2000);
+ scope.query("CREATE PRIMARY INDEX ON `" + name + "`");
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return collectionSpec;
+ }
+
+ protected List transformFilter(String storageName, Condition> condition) {
+ final List result = new LinkedList<>();
+ if (condition instanceof PredicateCondition) {
+ final PredicateCondition atom = (PredicateCondition) condition;
+ Object value = atom.getValue();
+ final String key = atom.getKey();
+ final JanusGraphPredicate predicate = atom.getPredicate();
+ final String fullIndexName = getIndexFullName(storageName);
+ if (value == null && predicate == Cmp.NOT_EQUAL) {
+ result.add(new QueryFilter(String.format("EXISTS %s", key)));
+ } else if (predicate == Cmp.EQUAL
+ || predicate == Cmp.NOT_EQUAL
+ || predicate == Cmp.GREATER_THAN
+ || predicate == Cmp.GREATER_THAN_EQUAL
+ || predicate == Cmp.LESS_THAN
+ || predicate == Cmp.LESS_THAN_EQUAL
+ ) {
+ result.add(new QueryFilter(String.format("%s %s ?", key, predicate), value));
+ } else if (predicate == Text.PREFIX || predicate == Text.NOT_PREFIX) {
+ StringBuilder statement = new StringBuilder();
+ if (predicate == Text.NOT_PREFIX) {
+ statement.append("NOT ");
+ }
+ statement.append("POSITION(LOWER(")
+ .append(key)
+ .append("), LOWER(?)) = 0");
+
+ result.add(new QueryFilter(statement.toString(), value));
+ } else if (predicate == Text.CONTAINS || predicate == Text.NOT_CONTAINS) {
+ StringBuilder statement = new StringBuilder();
+ if (predicate == Text.NOT_CONTAINS) {
+ statement.append("NOT ");
+ }
+ statement.append("CONTAINS(LOWER(")
+ .append(key)
+ .append("), LOWER(?))");
+
+ result.add(new QueryFilter(statement.toString(), value));
+ } else if ((predicate == Text.REGEX || predicate == Text.NOT_REGEX)) {
+ StringBuilder statement = new StringBuilder();
+ if (predicate == Text.NOT_REGEX) {
+ statement.append("NOT ");
+ }
+ statement.append("REGEXP_MATCHES(")
+ .append(key)
+ .append(", ?)");
+ result.add(new QueryFilter(statement.toString(), value));
+ } else if ((predicate == Text.CONTAINS_REGEX || predicate == Text.NOT_CONTAINS_REGEX)) {
+ StringBuilder statement = new StringBuilder();
+ if (predicate == Text.NOT_CONTAINS_REGEX) {
+ statement.append("NOT ");
+ }
+ statement.append("REGEXP_CONTAINS(")
+ .append(key)
+ .append(", ?)");
+ result.add(new QueryFilter(statement.toString(), value));
+ } else if (predicate instanceof Text) {
+ Text textPredicate = (Text) predicate;
+ String not = "";
+ if (textPredicate.name().toLowerCase(Locale.ROOT).startsWith("not_")) {
+ not = "NOT ";
+ }
+ result.add(new QueryFilter(
+ not + "SEARCH(?, ?)",
+ fullIndexName,
+ buildSearchQuery(key, predicate, value)
+ ));
+ } else if (predicate instanceof Geo) {
+ result.add(new QueryFilter(
+ "SEARCH(?, ?)",
+ fullIndexName,
+ buildGeoQuery(key, predicate, value)
+ ));
+ }else {
+ throw new IllegalArgumentException("Unsupported predicate: " + predicate.getClass().getCanonicalName());
+ }
+ } else if (condition instanceof Not) {
+ transformFilter(storageName, ((Not>) condition).getChild()).stream()
+ .map(qf -> new QueryFilter("NOT (" + qf.query() + ")", qf.arguments()))
+ .forEach(result::add);
+ } else if (condition instanceof And || condition instanceof Or) {
+ LinkedList statements = new LinkedList<>();
+ LinkedList