diff --git a/common/src/test/java/org/apache/drill/categories/IPFSStorageTest.java b/common/src/test/java/org/apache/drill/categories/IPFSStorageTest.java
new file mode 100644
index 00000000000..80298c9081e
--- /dev/null
+++ b/common/src/test/java/org/apache/drill/categories/IPFSStorageTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.categories;
+
+/**
+ * This is a category used to mark unit tests that test the IPFS storage plugin.
+ */
+public interface IPFSStorageTest {
+}
+
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index 6dbd6259ff2..ae97caca310 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -956,7 +956,7 @@ const char descriptor_table_protodef_UserBitShared_2eproto[] PROTOBUF_SECTION_VA
"ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000"
"\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014"
"\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022"
- "\032\n\026CANCELLATION_REQUESTED\020\006*\236\013\n\020CoreOper"
+ "\032\n\026CANCELLATION_REQUESTED\020\006*\261\013\n\020CoreOper"
"atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST"
"_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020"
"\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH"
@@ -992,11 +992,11 @@ const char descriptor_table_protodef_UserBitShared_2eproto[] PROTOBUF_SECTION_VA
"CAN\020\?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA"
"N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO"
"NTROLLER\020C\022\022\n\016DRUID_SUB_SCAN\020D\022\021\n\rSPSS_S"
- "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslStat"
- "us\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020"
- "SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013"
- "SASL_FAILED\020\004B.\n\033org.apache.drill.exec.p"
- "rotoB\rUserBitSharedH\001"
+ "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F\022\021\n\rIPFS_SUB"
+ "_SCAN\020G*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022"
+ "\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n"
+ "\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org."
+ "apache.drill.exec.protoB\rUserBitSharedH\001"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_UserBitShared_2eproto_deps[3] = {
&::descriptor_table_Coordination_2eproto,
@@ -1030,7 +1030,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_Use
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_UserBitShared_2eproto_once;
static bool descriptor_table_UserBitShared_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_UserBitShared_2eproto = {
- &descriptor_table_UserBitShared_2eproto_initialized, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto", 5821,
+ &descriptor_table_UserBitShared_2eproto_initialized, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto", 5840,
&descriptor_table_UserBitShared_2eproto_once, descriptor_table_UserBitShared_2eproto_sccs, descriptor_table_UserBitShared_2eproto_deps, 22, 3,
schemas, file_default_instances, TableStruct_UserBitShared_2eproto::offsets,
file_level_metadata_UserBitShared_2eproto, 22, file_level_enum_descriptors_UserBitShared_2eproto, file_level_service_descriptors_UserBitShared_2eproto,
@@ -1269,6 +1269,7 @@ bool CoreOperatorType_IsValid(int value) {
case 68:
case 69:
case 70:
+ case 71:
return true;
default:
return false;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index ae87641a518..f5ebf2adebc 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -392,11 +392,12 @@ enum CoreOperatorType : int {
METADATA_CONTROLLER = 67,
DRUID_SUB_SCAN = 68,
SPSS_SUB_SCAN = 69,
- HTTP_SUB_SCAN = 70
+ HTTP_SUB_SCAN = 70,
+ IPFS_SUB_SCAN = 71
};
bool CoreOperatorType_IsValid(int value);
constexpr CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-constexpr CoreOperatorType CoreOperatorType_MAX = HTTP_SUB_SCAN;
+constexpr CoreOperatorType CoreOperatorType_MAX = IPFS_SUB_SCAN;
constexpr int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* CoreOperatorType_descriptor();
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 3fb86471357..0bcfef54a20 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -56,6 +56,7 @@
storage-kudu
storage-opentsdb
storage-http
+ storage-ipfs
storage-druid
diff --git a/contrib/storage-ipfs/README.md b/contrib/storage-ipfs/README.md
new file mode 100644
index 00000000000..dd86a70a2ee
--- /dev/null
+++ b/contrib/storage-ipfs/README.md
@@ -0,0 +1,99 @@
+# Drill Storage Plugin for IPFS (Minerva)
+
+## Contents
+
+0. [Introduction](#introduction)
+1. [Configuration](#configuration)
+2. [Usage Notes](#usage-notes)
+
+## Introduction
+
+Minerva is a storage plugin for Drill that connects IPFS's decentralized storage with Drill's flexible query engine. Any data file stored on IPFS can be accessed from Drill's query interface just like a file stored on a local disk. Moreover, thanks to Drill's distributed execution capability, other Drill instances that also run Minerva can help accelerate a query: the data stays where it is, and the query fragments go to the nodes that store the data locally, where the operations can be performed most efficiently.
+
+## Configuration
+
+1. Set the Drill hostname to the IP address of the node that runs Drill:
+
+    Edit the file `conf/drill-env.sh` and set the environment variable `DRILL_HOST_NAME` to the IP address of the node. Use a private or a global address, depending on whether you plan to run it in a private cluster or on the open Internet.
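+
+    For example (the address below is only an illustration; use your node's actual IP):
+
+    ```
+    export DRILL_HOST_NAME=192.168.1.2
+    ```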
+
+2. Configure the IPFS storage plugin:
+
+ The default configuration of the IPFS storage plugin is located at `src/resources/bootstrap-storage-plugins.json`:
+
+ ```
+ "ipfs" : {
+ "type":"ipfs",
+ "host": "127.0.0.1",
+ "port": 5001,
+ "max-nodes-per-leaf": 3,
+ "ipfs-timeouts": {
+ "find-provider": 4,
+ "find-peer-info": 4,
+ "fetch-data": 5
+ },
+ "ipfs-caches": {
+ "peer": {"size": 100, "ttl": 600},
+ "provider": {"size": 1000, "ttl": 600}
+ },
+ "groupscan-worker-threads": 50,
+ "formats": null,
+ "enabled": true
+ }
+ ```
+
+ where
+
+    `host` and `port` are the host and API port where your IPFS daemon is listening. Change them so that they match the configuration of your IPFS instance.
+
+ `max-nodes-per-leaf` controls how many provider nodes will be considered when the query is being planned. A larger value increases the parallelization width but typically takes longer to find enough providers from DHT resolution. A smaller value does the opposite.
+
+    `ipfs-timeouts` sets the maximum amount of time, in seconds, allowed for various time-consuming operations:
+
+ * `find-provider` is the time allowed to do DHT queries to find providers.
+ * `find-peer-info` is the time allowed to resolve the network addresses of the providers.
+ * `fetch-data` is the time the actual transmission is allowed to take.
+
+    `ipfs-caches` controls the size and the TTL (in seconds) of entries in the caches that are used to accelerate query execution:
+
+    * the `peer` cache stores the addresses of peers.
+    * the `provider` cache stores which peers provide a particular IPFS object.
+
+    `groupscan-worker-threads` limits the number of worker threads used when the planner communicates with the IPFS daemon to resolve providers and peer info.
+
+    `formats` specifies the file formats. It is not implemented yet and currently has no effect.
+
+3. Configure IPFS
+
+ Start the IPFS daemon first.
+
+    Set a Drill-ready flag on the node:
+
+ ```
+ $ IPFS_NULL_OBJECT=$(ipfs object new)
+ $ ipfs object patch add-link $IPFS_NULL_OBJECT "drill-ready" $IPFS_NULL_OBJECT
+ QmeXLv7D5uV2uXHejg7St7mSXDtqwTw8LuywSBawSxy5iA
+ $ ipfs name publish /ipfs/QmeXLv7D5uV2uXHejg7St7mSXDtqwTw8LuywSBawSxy5iA
+ Published to : /ipfs/QmeXLv7D5uV2uXHejg7St7mSXDtqwTw8LuywSBawSxy5iA
+ ```
+
+    This flag indicates that an IPFS node is also capable of handling Drill queries, and the planner will consider it when scheduling a query for distributed execution. A node without this flag will be ignored.
+
+    Also, pin the flag object so that it persists on your node:
+
+ ```
+ $ ipfs pin add -r QmeXLv7D5uV2uXHejg7St7mSXDtqwTw8LuywSBawSxy5iA
+ ```
+
+## Usage Notes
+
+1. Compatible data formats
+
+ Currently only JSON files are supported by this storage plugin.
+
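+    As an illustration, a query against a JSON file stored on IPFS might look like the session below. The table-path syntax, the `#json` format suffix and the placeholder CID are assumptions for this sketch; consult the plugin's scan spec for the exact form:
+
+    ```
+    $ bin/drill-embedded
+    apache drill> SELECT * FROM ipfs.`/ipfs/<CID of a JSON file>#json` LIMIT 10;
+    ```
+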
+2. Add datasets to IPFS
+
+ IPFS provides the `ipfs add` command to conveniently add a file to IPFS. Unfortunately that command does not split data files into chunks on line boundaries. Use [this script](https://gist.github.com/dbw9580/250e52a54e39a34083f815dea34a89e0) to do proper chunking and add files to IPFS.
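+
+    After adding a file, you can check how it was chunked by listing the links of the resulting object, for example:
+
+    ```
+    $ ipfs ls <CID of the added file>
+    ```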
+
+3. Timeout exceptions
+
+    IPFS operations can be time-consuming, and sometimes an operation can take forever (e.g. querying the DHT for a non-existent object). Adjust the timeout values in the plugin configuration if you run into frequent timeout exceptions.
diff --git a/contrib/storage-ipfs/pom.xml b/contrib/storage-ipfs/pom.xml
new file mode 100644
index 00000000000..711f071817c
--- /dev/null
+++ b/contrib/storage-ipfs/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.18.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>drill-ipfs-storage</artifactId>
+  <name>contrib/ipfs-storage-plugin</name>
+
+  <properties>
+    <ipfs.TestSuite>**/IPFSTestSuit.class</ipfs.TestSuite>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.github.ipfs</groupId>
+      <artifactId>java-ipfs-http-client</artifactId>
+      <version>v1.3.3</version>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkCount>0</forkCount>
+          <includes>
+            <include>${ipfs.TestSuite}</include>
+          </includes>
+          <excludes>
+            <exclude>**/TestIPFSQueries.java</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
new file mode 100644
index 00000000000..aca5b109162
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.multihash.Multihash;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * Compatibility fixes for java-ipfs-http-client library
+ */
+public class IPFSCompat {
+ public final String host;
+ public final int port;
+ private final String version;
+ public final String protocol;
+ public final int readTimeout;
+ public static final int DEFAULT_READ_TIMEOUT = 0;
+
+ public final DHT dht = new DHT();
+ public final Name name = new Name();
+
+ public IPFSCompat(IPFS ipfs) {
+ this(ipfs.host, ipfs.port);
+ }
+
+ public IPFSCompat(String host, int port) {
+ this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT);
+ }
+
+ public IPFSCompat(String host, int port, String version, boolean ssl, int readTimeout) {
+ this.host = host;
+ this.port = port;
+
+ if (ssl) {
+ this.protocol = "https";
+ } else {
+ this.protocol = "http";
+ }
+
+ this.version = version;
+ this.readTimeout = readTimeout;
+ }
+
+ /**
+ * Resolve names to IPFS CIDs.
+   * See resolve in the IPFS API documentation.
+   *
+   * @param scheme the scheme of the name to resolve, usually IPFS or IPNS
+   * @param path the path to the object
+   * @param recursive whether to recursively resolve names until an IPFS CID is reached
+   * @return a Map of the JSON response, with the result as the value of the key "Path"
+ */
+ public Map resolve(String scheme, String path, boolean recursive) {
+    AtomicReference<Map> ret = new AtomicReference<>();
+ getObjectStream(
+ "resolve?arg=/" + scheme + "/" + path + "&r=" + recursive,
+ res -> {
+ ret.set((Map) res);
+ return true;
+ },
+ err -> {
+ throw new RuntimeException(err);
+ }
+ );
+ return ret.get();
+ }
+
+ /**
+ * As defined in https://github.com/libp2p/go-libp2p-core/blob/b77fd280f2bfcce22f10a000e8e1d9ec53c47049/routing/query.go#L16
+ */
+ public enum DHTQueryEventType {
+ // Sending a query to a peer.
+ SendingQuery,
+ // Got a response from a peer.
+ PeerResponse,
+ // Found a "closest" peer (not currently used).
+ FinalPeer,
+ // Got an error when querying.
+ QueryError,
+ // Found a provider.
+ Provider,
+ // Found a value.
+ Value,
+ // Adding a peer to the query.
+ AddingPeer,
+ // Dialing a peer.
+ DialingPeer
+ }
+
+ public class DHT {
+ /**
+ * Find internet addresses of a given peer.
+     * See dht/findpeer in the IPFS API documentation.
+ *
+ * @param id the id of the peer to query
+ * @param timeout timeout value in seconds
+ * @param executor executor
+ * @return List of Multiaddresses of the peer
+ */
+    public List<String> findpeerListTimeout(Multihash id, int timeout, ExecutorService executor) {
+      AtomicReference<List<String>> ret = new AtomicReference<>(new ArrayList<>());
+ timeLimitedExec(
+ "dht/findpeer?arg=" + id,
+ timeout,
+ res -> {
+ Map peer = (Map) res;
+ if (peer == null) {
+ return false;
+ }
+ if ((int) peer.get("Type") != DHTQueryEventType.FinalPeer.ordinal()) {
+ return false;
+ }
+          List<Map> responses = (List<Map>) peer.get("Responses");
+ if (responses == null || responses.size() == 0) {
+ return false;
+ }
+ // FinalPeer responses have exactly one response
+          Map<String, List<String>> response = responses.get(0);
+ if (response == null) {
+ return false;
+ }
+          List<String> addrs = response.get("Addrs");
+
+ ret.set(addrs);
+ return true;
+ },
+ err -> {
+ if (!(err instanceof TimeoutException)) {
+ throw new RuntimeException(err);
+ }
+ },
+ executor
+ );
+ if (ret.get().size() > 0) {
+ return ret.get();
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ /**
+ * Find providers of a given CID.
+     * See dht/findprovs in the IPFS API documentation.
+     *
+     * @param id the CID of the IPFS object
+     * @param maxPeers the maximum number of providers to collect
+     * @param timeout timeout value in seconds
+     * @param executor executor
+     * @return List of IDs of the providers of the object
+ */
+    public List<String> findprovsListTimeout(Multihash id, int maxPeers, int timeout, ExecutorService executor) {
+      AtomicReference<List<String>> ret = new AtomicReference<>(new ArrayList<>());
+ timeLimitedExec(
+ "dht/findprovs?arg=" + id + "&n=" + maxPeers,
+ timeout,
+ res -> {
+ Map peer = (Map) res;
+ if (peer == null) {
+ return false;
+ }
+ if ((int) peer.get("Type") != DHTQueryEventType.Provider.ordinal()) {
+ return false;
+ }
+          List<Map> responses = (List<Map>) peer.get("Responses");
+ if (responses == null || responses.size() == 0) {
+ return false;
+ }
+ // One Provider message contains only one provider
+          Map<String, String> response = responses.get(0);
+ if (response == null) {
+ return false;
+ }
+ String providerID = response.get("ID");
+
+ ret.get().add(providerID);
+ return ret.get().size() >= maxPeers;
+ },
+ err -> {
+ if (!(err instanceof TimeoutException)) {
+ throw new RuntimeException(err);
+ }
+ },
+ executor
+ );
+ if (ret.get().size() > 0) {
+ return ret.get();
+ } else {
+ return Collections.emptyList();
+ }
+ }
+ }
+
+ public class Name {
+ /**
+     * Resolve an IPNS name.
+     * See name/resolve in the IPFS API documentation.
+ *
+ * @param hash the IPNS name to resolve
+ * @param timeout timeout value in seconds
+ * @param executor executor
+     * @return the resolved path, e.g. /ipfs/Qm..., if resolution succeeded
+ */
+    public Optional<String> resolve(Multihash hash, int timeout, ExecutorService executor) {
+      AtomicReference<String> ret = new AtomicReference<>();
+ timeLimitedExec(
+ "name/resolve?arg=" + hash,
+ timeout,
+ res -> {
+ Map peer = (Map) res;
+ if (peer != null) {
+ ret.set((String) peer.get(("Path")));
+ return true;
+ }
+ return false;
+ },
+ err -> {
+ if (!(err instanceof TimeoutException)) {
+ throw new RuntimeException(err);
+ }
+ },
+ executor
+ );
+ return Optional.ofNullable(ret.get());
+ }
+ }
+
+  private void timeLimitedExec(String path, int timeout, Predicate<Object> processor, Consumer<Exception> error,
+ ExecutorService executor) {
+    CompletableFuture<Void> f = CompletableFuture.runAsync(
+ () -> getObjectStream(path, processor, error),
+ executor
+ );
+ try {
+ f.get(timeout, TimeUnit.SECONDS);
+ } catch (TimeoutException | ExecutionException | InterruptedException e) {
+ f.cancel(true);
+ error.accept(e);
+ }
+ }
+
+  private void getObjectStream(String path, Predicate<Object> processor, Consumer<Exception> error) {
+ byte LINE_FEED = (byte) 10;
+
+ try {
+ InputStream in = getStream(path);
+ ByteArrayOutputStream resp = new ByteArrayOutputStream();
+
+ byte[] buf = new byte[4096];
+ int r;
+ while ((r = in.read(buf)) >= 0) {
+ resp.write(buf, 0, r);
+ if (buf[r - 1] == LINE_FEED) {
+ try {
+ boolean done = processor.test(JSONParser.parse(resp.toString()));
+ if (done) {
+ break;
+ }
+ resp.reset();
+ } catch (IllegalStateException e) {
+ in.close();
+ resp.close();
+ error.accept(e);
+ }
+ }
+ }
+ in.close();
+ resp.close();
+ } catch (IOException e) {
+ error.accept(e);
+ }
+ }
+
+ private InputStream getStream(String path) throws IOException {
+ URL target = new URL(protocol, host, port, version + path);
+ HttpURLConnection conn = (HttpURLConnection) target.openConnection();
+ conn.setRequestMethod("POST");
+ conn.setRequestProperty("Content-Type", "application/json");
+ conn.setReadTimeout(readTimeout);
+ return conn.getInputStream();
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java
new file mode 100644
index 00000000000..b7cbf071a4a
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSCacheType;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class IPFSContext {
+ private final IPFS ipfsClient;
+ private final IPFSHelper ipfsHelper;
+ private final IPFSPeer myself;
+ private final IPFSStoragePluginConfig storagePluginConfig;
+ private final IPFSStoragePlugin storagePlugin;
+  private final LoadingCache<Multihash, IPFSPeer> ipfsPeerCache;
+  private final LoadingCache<Multihash, List<Multihash>> providerCache;
+
+ public IPFSContext(IPFSStoragePluginConfig config, IPFSStoragePlugin plugin) throws IOException {
+ this.ipfsClient = new IPFS(config.getHost(), config.getPort());
+ this.ipfsHelper = new IPFSHelper(ipfsClient, Executors.newCachedThreadPool());
+ ipfsHelper.setMaxPeersPerLeaf(config.getMaxNodesPerLeaf());
+ ipfsHelper.setTimeouts(config.getIPFSTimeouts());
+ this.storagePlugin = plugin;
+ this.storagePluginConfig = config;
+
+ this.myself = ipfsHelper.getMyself();
+ this.ipfsPeerCache = CacheBuilder.newBuilder()
+ .maximumSize(config.getIPFSCache(IPFSCacheType.PEER).size)
+ .refreshAfterWrite(config.getIPFSCache(IPFSCacheType.PEER).ttl, TimeUnit.SECONDS)
+        .build(new CacheLoader<Multihash, IPFSPeer>() {
+ @Override
+ public IPFSPeer load(Multihash key) {
+ return new IPFSPeer(getIPFSHelper(), key);
+ }
+ });
+ this.providerCache = CacheBuilder.newBuilder()
+ .maximumSize(config.getIPFSCache(IPFSCacheType.PROVIDER).size)
+ .refreshAfterWrite(config.getIPFSCache(IPFSCacheType.PROVIDER).ttl, TimeUnit.SECONDS)
+        .build(new CacheLoader<Multihash, List<Multihash>>() {
+          @Override
+          public List<Multihash> load(Multihash key) {
+ return ipfsHelper.findprovsTimeout(key);
+ }
+ });
+ }
+
+ public IPFS getIPFSClient() {
+ return ipfsClient;
+ }
+
+ public IPFSHelper getIPFSHelper() {
+ return ipfsHelper;
+ }
+
+ public IPFSPeer getMyself() {
+ return myself;
+ }
+
+ public IPFSStoragePlugin getStoragePlugin() {
+ return storagePlugin;
+ }
+
+ public IPFSStoragePluginConfig getStoragePluginConfig() {
+ return storagePluginConfig;
+ }
+
+  public LoadingCache<Multihash, IPFSPeer> getIPFSPeerCache() {
+ return ipfsPeerCache;
+ }
+
+  public LoadingCache<Multihash, List<Multihash>> getProviderCache() {
+ return providerCache;
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
new file mode 100644
index 00000000000..a9534e95749
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.cid.Cid;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSGroupScan.class);
+ private final IPFSContext ipfsContext;
+ private final IPFSScanSpec ipfsScanSpec;
+ private final IPFSStoragePluginConfig config;
+  private List<SchemaPath> columns;
+
+ private static final long DEFAULT_NODE_SIZE = 1000L;
+ public static final int DEFAULT_USER_PORT = 31010;
+ public static final int DEFAULT_CONTROL_PORT = 31011;
+ public static final int DEFAULT_DATA_PORT = 31012;
+ public static final int DEFAULT_HTTP_PORT = 8047;
+
+  private ListMultimap<Integer, IPFSWork> assignments;
+  private List<IPFSWork> ipfsWorkList = Lists.newArrayList();
+  private ListMultimap<String, IPFSWork> endpointWorksMap;
+  private List<EndpointAffinity> affinities;
+
+ @JsonCreator
+ public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+ @JsonProperty("IPFSStoragePluginConfig") IPFSStoragePluginConfig ipfsStoragePluginConfig,
+                       @JsonProperty("columns") List<SchemaPath> columns,
+ @JacksonInject StoragePluginRegistry pluginRegistry) {
+ this(
+ pluginRegistry.resolve(ipfsStoragePluginConfig, IPFSStoragePlugin.class).getIPFSContext(),
+ ipfsScanSpec,
+ columns
+ );
+ }
+
+ public IPFSGroupScan(IPFSContext ipfsContext,
+ IPFSScanSpec ipfsScanSpec,
+                       List<SchemaPath> columns) {
+ super((String) null);
+ this.ipfsContext = ipfsContext;
+ this.ipfsScanSpec = ipfsScanSpec;
+ this.config = ipfsContext.getStoragePluginConfig();
+ logger.debug("GroupScan constructor called with columns {}", columns);
+ this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS : columns;
+ init();
+ }
+
+ private void init() {
+ IPFSHelper ipfsHelper = ipfsContext.getIPFSHelper();
+ endpointWorksMap = ArrayListMultimap.create();
+
+ Multihash topHash = ipfsScanSpec.getTargetHash(ipfsHelper);
+ try {
+      Map<Multihash, IPFSPeer> leafPeerMap = getLeafPeerMappings(topHash);
+ logger.debug("Iterating on {} leaves...", leafPeerMap.size());
+
+ ClusterCoordinator coordinator = ipfsContext.getStoragePlugin().getContext().getClusterCoordinator();
+ for (Multihash leaf : leafPeerMap.keySet()) {
+ DrillbitEndpoint ep;
+ if (config.isDistributedMode()) {
+ String peerHostname = leafPeerMap
+ .get(leaf)
+ .getDrillbitAddress()
+ .orElseThrow(() -> new RuntimeException("Chosen IPFS peer does not have drillbit address"));
+ ep = registerEndpoint(coordinator, peerHostname);
+ } else {
+ // the foreman is used to execute the plan
+ ep = ipfsContext.getStoragePlugin().getContext().getEndpoint();
+ }
+
+ IPFSWork work = new IPFSWork(leaf);
+ logger.debug("added endpoint {} to work {}", ep.getAddress(), work);
+ work.getByteMap().add(ep, DEFAULT_NODE_SIZE);
+ work.setOnEndpoint(ep);
+ endpointWorksMap.put(ep.getAddress(), work);
+ ipfsWorkList.add(work);
+ }
+ } catch (Exception e) {
+ throw UserException
+ .planError(e)
+ .message("Exception during initialization of IPFS GroupScan")
+ .build(logger);
+ }
+ }
+
+ private DrillbitEndpoint registerEndpoint(ClusterCoordinator coordinator, String peerHostname) {
+    Optional<DrillbitEndpoint> oep = coordinator.getAvailableEndpoints()
+ .stream()
+ .filter(ep -> ep.getAddress().equals(peerHostname))
+ .findAny();
+ DrillbitEndpoint ep;
+ if (oep.isPresent()) {
+ ep = oep.get();
+ logger.debug("Using existing endpoint {}", ep.getAddress());
+ } else {
+ logger.debug("created new endpoint on the fly {}", peerHostname);
+ //DRILL-7754: read ports & version info from IPFS instead of hard-coded
+ ep = DrillbitEndpoint.newBuilder()
+ .setAddress(peerHostname)
+ .setUserPort(DEFAULT_USER_PORT)
+ .setControlPort(DEFAULT_CONTROL_PORT)
+ .setDataPort(DEFAULT_DATA_PORT)
+ .setHttpPort(DEFAULT_HTTP_PORT)
+ .setVersion(DrillVersionInfo.getVersion())
+ .setState(DrillbitEndpoint.State.ONLINE)
+ .build();
+ //DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed?
+ ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep);
+ }
+
+ return ep;
+ }
+
+  Map<Multihash, IPFSPeer> getLeafPeerMappings(Multihash topHash) {
+ logger.debug("start to recursively expand nested IPFS hashes, topHash={}", topHash);
+ Stopwatch watch = Stopwatch.createStarted();
+ ForkJoinPool forkJoinPool = new ForkJoinPool(config.getNumWorkerThreads());
+ IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, ipfsContext);
+    List<Multihash> leaves = forkJoinPool.invoke(topTask);
+ logger.debug("Took {} ms to expand hash leaves", watch.elapsed(TimeUnit.MILLISECONDS));
+
+ logger.debug("Start to resolve providers");
+ watch.reset().start();
+    Map<Multihash, IPFSPeer> leafPeerMap;
+ if (config.isDistributedMode()) {
+ leafPeerMap = forkJoinPool.invoke(new IPFSProviderResolver(leaves, ipfsContext));
+ } else {
+ leafPeerMap = new HashMap<>();
+ for (Multihash leaf : leaves) {
+ leafPeerMap.put(leaf, ipfsContext.getMyself());
+ }
+ }
+ logger.debug("Took {} ms to resolve providers", watch.elapsed(TimeUnit.MILLISECONDS));
+
+ return leafPeerMap;
+ }
+
+ private IPFSGroupScan(IPFSGroupScan that) {
+ super(that);
+ this.ipfsContext = that.ipfsContext;
+ this.ipfsScanSpec = that.ipfsScanSpec;
+ this.config = that.config;
+ this.assignments = that.assignments;
+ this.ipfsWorkList = that.ipfsWorkList;
+ this.endpointWorksMap = that.endpointWorksMap;
+ this.columns = that.columns;
+ }
+
+ @JsonProperty
+  public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ @JsonIgnore
+ public IPFSStoragePlugin getStoragePlugin() {
+ return ipfsContext.getStoragePlugin();
+ }
+
+ @JsonProperty
+ public IPFSScanSpec getIPFSScanSpec() {
+ return ipfsScanSpec;
+ }
+
+ @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+ if (affinities == null) {
+ affinities = AffinityCreator.getAffinityMap(ipfsWorkList);
+ }
+ return affinities;
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ DrillbitEndpoint myself = ipfsContext.getStoragePlugin().getContext().getEndpoint();
+ int width;
+ if (endpointWorksMap.containsKey(myself.getAddress())) {
+ // the foreman is also going to be a minor fragment worker under a UnionExchange operator
+ width = ipfsWorkList.size();
+ } else {
+ // the foreman does not hold data, so we have to force parallelization
+ // to make sure there is a UnionExchange operator
+ width = ipfsWorkList.size() + 1;
+ }
+ logger.debug("getMaxParallelizationWidth: {}", width);
+ return width;
+ }
+
+ @Override
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+ logger.debug("Applying assignments: endpointWorksMap = {}", endpointWorksMap);
+ assignments = AssignmentCreator.getMappings(incomingEndpoints, ipfsWorkList);
+ }
+
+ @Override
+ public IPFSSubScan getSpecificScan(int minorFragmentId) {
+ logger.debug(String.format("getSpecificScan: minorFragmentId = %d", minorFragmentId));
+    List<IPFSWork> workList = assignments.get(minorFragmentId);
+    List<Multihash> scanSpecList = Lists.newArrayList();
+ if (workList != null) {
+ logger.debug("workList.size(): {}", workList.size());
+
+ for (IPFSWork work : workList) {
+ scanSpecList.add(work.getPartialRootHash());
+ }
+ }
+
+ return new IPFSSubScan(ipfsContext, scanSpecList, ipfsScanSpec.getFormatExtension(), columns);
+ }
+
+ @Override
+ public ScanStats getScanStats() {
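+    // No exact row count is available before reading the data,
+    // so return a rough per-endpoint estimate for the planner.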
+ long recordCount = 100000 * endpointWorksMap.size();
+ return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, recordCount, 1, recordCount);
+ }
+
+ @Override
+  public IPFSGroupScan clone(List<SchemaPath> columns) {
+ logger.debug("IPFSGroupScan clone {}", columns);
+ IPFSGroupScan cloned = new IPFSGroupScan(this);
+ cloned.columns = columns;
+ return cloned;
+ }
+
+ @Override
+ @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+ return true;
+ }
+
+ @Override
+ @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ logger.debug("getNewWithChildren called");
+ return new IPFSGroupScan(this);
+ }
+
+ @Override
+ public String getDigest() {
+ return toString();
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("scan spec", ipfsScanSpec)
+ .field("columns", columns)
+ .toString();
+ }
+
+ private static class IPFSWork implements CompleteWork {
+ private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+ private final Multihash partialRoot;
+ private DrillbitEndpoint onEndpoint = null;
+
+
+ public IPFSWork(String root) {
+ this.partialRoot = Cid.decode(root);
+ }
+
+ public IPFSWork(Multihash root) {
+ this.partialRoot = root;
+ }
+
+ public Multihash getPartialRootHash() {
+ return partialRoot;
+ }
+
+ public void setOnEndpoint(DrillbitEndpoint endpointAddress) {
+ this.onEndpoint = endpointAddress;
+ }
+
+ @Override
+ public long getTotalBytes() {
+ return DEFAULT_NODE_SIZE;
+ }
+
+ @Override
+ public EndpointByteMap getByteMap() {
+ return byteMap;
+ }
+
+ @Override
+ public int compareTo(CompleteWork o) {
+ return 0;
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("partial root", partialRoot)
+ .toString();
+ }
+ }
+
+  static class IPFSProviderResolver extends RecursiveTask<Map<Multihash, IPFSPeer>> {
+    private final List<Multihash> leaves;
+    private final Map<Multihash, IPFSPeer> ret = new LinkedHashMap<>();
+    private final IPFSPeer myself;
+    private final IPFSHelper helper;
+    private final LoadingCache<Multihash, IPFSPeer> peerCache;
+    private final LoadingCache<Multihash, List<Multihash>> providerCache;
+
+    public IPFSProviderResolver(List<Multihash> leaves, IPFSContext context) {
+ this(leaves, context.getMyself(), context.getIPFSHelper(), context.getIPFSPeerCache(), context.getProviderCache());
+ }
+
+    public IPFSProviderResolver(IPFSProviderResolver reference, List<Multihash> leaves) {
+ this(leaves, reference.myself, reference.helper, reference.peerCache, reference.providerCache);
+ }
+
+    IPFSProviderResolver(List<Multihash> leaves, IPFSPeer myself, IPFSHelper helper, LoadingCache<Multihash, IPFSPeer> peerCache, LoadingCache<Multihash, List<Multihash>> providerCache) {
+ this.leaves = leaves;
+ this.myself = myself;
+ this.helper = helper;
+ this.peerCache = peerCache;
+ this.providerCache = providerCache;
+ }
+
+ @Override
+    protected Map<Multihash, IPFSPeer> compute() {
+ int totalLeaves = leaves.size();
+ if (totalLeaves == 1) {
+ Multihash hash = leaves.get(0);
+        List<IPFSPeer> providers = providerCache.getUnchecked(hash).parallelStream()
+ .map(peerCache::getUnchecked)
+ .filter(IPFSPeer::isDrillReady)
+ .filter(IPFSPeer::hasDrillbitAddress)
+ .collect(Collectors.toList());
+ if (providers.size() < 1) {
+ logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash);
+ providers.add(myself);
+ }
+ logger.debug("Got {} providers for {} from IPFS", providers.size(), hash);
+
+ //DRILL-7753: better peer selection algorithm
+ Random random = new Random();
+ IPFSPeer chosenPeer = providers.get(random.nextInt(providers.size()));
+ ret.put(hash, chosenPeer);
+ logger.debug("Use peer {} for leaf {}", chosenPeer, hash);
+ return ret;
+ }
+
+ int firstHalf = totalLeaves / 2;
+      ImmutableList<IPFSProviderResolver> resolvers = ImmutableList.of(
+ new IPFSProviderResolver(this, leaves.subList(0, firstHalf)),
+ new IPFSProviderResolver(this, leaves.subList(firstHalf, totalLeaves))
+ );
+ resolvers.forEach(ForkJoinTask::fork);
+ resolvers.reverse().forEach(resolver -> ret.putAll(resolver.join()));
+ return ret;
+ }
+ }
+
+ //DRILL-7756: detect and warn about loops/recursions in case of a malformed tree
+  static class IPFSTreeFlattener extends RecursiveTask<List<Multihash>> {
+    private final Multihash hash;
+    private final List<Multihash> ret = new LinkedList<>();
+ private final IPFSPeer myself;
+ private final IPFSHelper helper;
+
+ public IPFSTreeFlattener(Multihash hash, IPFSContext context) {
+ this(
+ hash,
+ context.getMyself(),
+ context.getIPFSHelper()
+ );
+ }
+
+ IPFSTreeFlattener(Multihash hash, IPFSPeer myself, IPFSHelper ipfsHelper) {
+ this.hash = hash;
+ this.myself = myself;
+ this.helper = ipfsHelper;
+ }
+
+ public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash) {
+ this(hash, reference.myself, reference.helper);
+ }
+
+ @Override
+    public List<Multihash> compute() {
+ try {
+ MerkleNode metaOrSimpleNode = helper.getObjectLinksTimeout(hash);
+ if (metaOrSimpleNode.links.size() > 0) {
+ logger.debug("{} is a meta node", hash);
+ //DRILL-7755: do something useful with leaf size, e.g. hint Drill about operation costs
+          List<Multihash> intermediates = metaOrSimpleNode.links.stream().map(x -> x.hash).collect(Collectors.toList());
+
+          ImmutableList.Builder<IPFSTreeFlattener> builder = ImmutableList.builder();
+ for (Multihash intermediate : intermediates.subList(1, intermediates.size())) {
+ builder.add(new IPFSTreeFlattener(this, intermediate));
+ }
+          ImmutableList<IPFSTreeFlattener> subtasks = builder.build();
+ subtasks.forEach(IPFSTreeFlattener::fork);
+
+ IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0));
+ ret.addAll(first.compute());
+ subtasks.reverse().forEach(
+ subtask -> ret.addAll(subtask.join())
+ );
+ } else {
+ logger.debug("{} is a simple node", hash);
+ ret.add(hash);
+ }
+ } catch (IOException e) {
+ throw UserException.planError(e).message("Exception during planning").build(logger);
+ }
+ return ret;
+ }
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
new file mode 100644
index 00000000000..6025752ff99
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.cid.Cid;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS storage
+ *
+ * DRILL-7778: refactor to support CIDv1
+ */
+public class IPFSHelper {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSHelper.class);
+
+ public static final String IPFS_NULL_OBJECT_HASH = "QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+ public static final Multihash IPFS_NULL_OBJECT = Cid.decode(IPFS_NULL_OBJECT_HASH);
+
+ private ExecutorService executorService;
+ private final IPFS client;
+ private final IPFSCompat clientCompat;
+ private IPFSPeer myself;
+ private int maxPeersPerLeaf;
+  private Map<IPFSTimeOut, Integer> timeouts;
+
+ public IPFSHelper(IPFS ipfs) {
+ this.client = ipfs;
+ this.clientCompat = new IPFSCompat(ipfs);
+ }
+
+ public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+ this(ipfs);
+ this.executorService = executorService;
+ }
+
+  public void setTimeouts(Map<IPFSTimeOut, Integer> timeouts) {
+ this.timeouts = timeouts;
+ }
+
+ public void setMyself(IPFSPeer myself) {
+ this.myself = myself;
+ }
+
+ /**
+   * Set the maximum number of providers per leaf node. The more providers are searched, the more time DHT queries take,
+   * but the more likely an optimal peer can be found.
+ *
+ * @param maxPeersPerLeaf max number of providers to search per leaf node
+ */
+ public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+ this.maxPeersPerLeaf = maxPeersPerLeaf;
+ }
+
+ public IPFS getClient() {
+ return client;
+ }
+
+ public IPFSCompat getClientCompat() {
+ return clientCompat;
+ }
+
+  public List<Multihash> findprovsTimeout(Multihash id) {
+    List<String> providers;
+ providers = clientCompat.dht.findprovsListTimeout(id, maxPeersPerLeaf, timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+ return providers.stream().map(Cid::decode).collect(Collectors.toList());
+ }
+
+  public List<MultiAddress> findpeerTimeout(Multihash peerId) {
+ // trying to resolve addresses of a node itself will always hang
+ // so we treat it specially
+ if (peerId.equals(myself.getId())) {
+ return myself.getMultiAddresses();
+ }
+
+    List<String> addrs;
+ addrs = clientCompat.dht.findpeerListTimeout(peerId, timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+ return addrs.stream()
+ .filter(addr -> !addr.equals(""))
+ .map(MultiAddress::new).collect(Collectors.toList());
+ }
+
+ public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+ return timedFailure(client.object::data, object, timeouts.get(IPFSTimeOut.FETCH_DATA));
+ }
+
+ public MerkleNode getObjectLinksTimeout(Multihash object) throws IOException {
+ return timedFailure(client.object::links, object, timeouts.get(IPFSTimeOut.FETCH_DATA));
+ }
+
+ public IPFSPeer getMyself() throws IOException {
+ if (this.myself != null) {
+ return this.myself;
+ }
+
+ Map res = timedFailure(client::id, timeouts.get(FIND_PEER_INFO));
+ Multihash myID = Cid.decode((String) res.get("ID"));
+ // Rule out any non-local addresses as they might be NAT-ed external
+ // addresses that are not always reachable from the inside.
+ // But is it safe to assume IPFS always listens on loopback and local addresses?
+    List<MultiAddress> myAddrs = ((List<String>) res.get("Addresses"))
+ .stream()
+ .map(MultiAddress::new)
+ .filter(addr -> {
+ try {
+ InetAddress inetAddress = InetAddress.getByName(addr.getHost());
+ if (inetAddress instanceof Inet6Address) {
+ return false;
+ }
+ return inetAddress.isSiteLocalAddress()
+ || inetAddress.isLinkLocalAddress()
+ || inetAddress.isLoopbackAddress();
+ } catch (UnknownHostException e) {
+ return false;
+ }
+ })
+ .collect(Collectors.toList());
+ this.myself = new IPFSPeer(this, myID, myAddrs);
+
+ return this.myself;
+ }
+
+ public Multihash resolve(String prefix, String path, boolean recursive) {
+    Map<String, String> result = timedFailure(
+ (args) -> clientCompat.resolve((String) args.get(0), (String) args.get(1), (boolean) args.get(2)),
+ ImmutableList.of(prefix, path, recursive),
+ timeouts.get(IPFSTimeOut.FIND_PEER_INFO)
+ );
+ if (!result.containsKey("Path")) {
+ return null;
+ }
+
+ // the path returned is of form /ipfs/Qma...
+ String hashString = result.get("Path").split("/")[2];
+ return Cid.decode(hashString);
+ }
+
+ @FunctionalInterface
+  public interface ThrowingFunction<T, R, E extends Exception> {
+ R apply(final T in) throws E;
+ }
+
+ @FunctionalInterface
+  public interface ThrowingSupplier<R, E extends Exception> {
+ R get() throws E;
+ }
+
+ /**
+ * Execute a time-critical operation op within time timeout. Causes the query to fail completely
+ * if the operation times out.
+ *
+ * @param op a Function that represents the operation to perform
+ * @param in the parameter for op
+ * @param timeout consider the execution has timed out after this amount of time in seconds
+   * @param <T> Input type
+   * @param <R> Return type
+   * @param <E> Type of checked exception op throws
+ * @return R the result of the operation
+ * @throws E when the function throws an E
+ */
+  public <T, R, E extends Exception> R timedFailure(ThrowingFunction<T, R, E> op, T in, int timeout) throws E {
+    Callable<R> task = () -> op.apply(in);
+ return timedFailure(task, timeout, TimeUnit.SECONDS);
+ }
+
+  public <R, E extends Exception> R timedFailure(ThrowingSupplier<R, E> op, int timeout) throws E {
+    Callable<R> task = op::get;
+ return timedFailure(task, timeout, TimeUnit.SECONDS);
+ }
+
+  private <R, E extends Exception> R timedFailure(Callable<R> task, int timeout, TimeUnit timeUnit) throws E {
+    Future<R> res = executorService.submit(task);
+ try {
+ return res.get(timeout, timeUnit);
+ } catch (ExecutionException e) {
+ throw (E) e.getCause();
+ } catch (TimeoutException e) {
+ throw UserException.executionError(e).message("IPFS operation timed out").build(logger);
+ } catch (CancellationException | InterruptedException e) {
+ throw UserException.executionError(e).message("IPFS operation was cancelled or interrupted").build(logger);
+ }
+ }
+
+ /*
+ * DRILL-7753: implement a more advanced algorithm that picks optimal addresses. Maybe check reachability, latency
+ * and bandwidth?
+ */
+
+ /**
+ * Choose a peer's network address from its advertised Multiaddresses.
+   * Prefer globally routable addresses over local addresses.
+ *
+ * @param peerAddrs Multiaddresses obtained from IPFS.DHT.findprovs
+ * @return network address
+ */
+  public static Optional<String> pickPeerHost(List<MultiAddress> peerAddrs) {
+ String localAddr = null;
+ for (MultiAddress addr : peerAddrs) {
+ String host = addr.getHost();
+ try {
+ InetAddress inetAddress = InetAddress.getByName(host);
+ if (inetAddress instanceof Inet6Address) {
+ // ignore IPv6 addresses
+ continue;
+ }
+ if (inetAddress.isSiteLocalAddress() || inetAddress.isLinkLocalAddress()) {
+ localAddr = host;
+ } else {
+ return Optional.of(host);
+ }
+ } catch (UnknownHostException ignored) {
+ }
+ }
+
+ return Optional.ofNullable(localAddr);
+ }
+
+  public Optional<String> getPeerDrillHostname(Multihash peerId) {
+ return getPeerData(peerId, "drill-hostname").map(Strings::fromByteArray);
+ }
+
+ /**
+ * Check if an IPFS peer is also running a Drillbit so that it can be used to execute a part of a query.
+ *
+ * @param peerId the id of the peer
+ * @return if the peer is Drill-ready
+ */
+ public boolean isDrillReady(Multihash peerId) {
+ try {
+ return getPeerData(peerId, "drill-ready").isPresent();
+ } catch (RuntimeException e) {
+ return false;
+ }
+ }
+
+  public Optional<Multihash> getIPNSDataHash(Multihash peerId) {
+    Optional<List<MerkleNode>> links = getPeerLinks(peerId);
+ if (!links.isPresent()) {
+ return Optional.empty();
+ }
+
+ return links.get().stream()
+ .filter(l -> l.name.equals(Optional.of("drill-data")))
+ .findFirst()
+ .map(l -> l.hash);
+ }
+
+ /**
+   * Get data stored under a peer's ID from IPFS, i.e. the data identified by /ipfs/{ID}/key.
+ *
+ * @param peerId the peer's ID
+ * @param key key
+ * @return data in bytes
+ */
+  private Optional<byte[]> getPeerData(Multihash peerId, String key) {
+    Optional<List<MerkleNode>> links = getPeerLinks(peerId);
+ if (!links.isPresent()) {
+ return Optional.empty();
+ }
+
+ for (MerkleNode link : links.get()) {
+ if (link.name.equals(Optional.of(key))) {
+ try {
+ byte[] result = timedFailure(client.object::data, link.hash, timeouts.get(FETCH_DATA));
+ return Optional.of(result);
+ } catch (IOException e) {
+ return Optional.empty();
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Get all the links under a peer's ID.
+ *
+ * @param peerId peer's ID
+ * @return List of links
+ */
+  private Optional<List<MerkleNode>> getPeerLinks(Multihash peerId) {
+ try {
+      Optional<String> optionalPath = clientCompat.name.resolve(peerId, timeouts.get(FIND_PEER_INFO), executorService);
+ if (!optionalPath.isPresent()) {
+ return Optional.empty();
+ }
+ String path = optionalPath.get().substring(6); // path starts with /ipfs/Qm...
+
+      List<MerkleNode> links = timedFailure(
+ client.object::get,
+ Cid.decode(path),
+ timeouts.get(FETCH_DATA)
+ ).links;
+ if (links.size() > 0) {
+ return Optional.of(links);
+ }
+ } catch (IOException ignored) {
+ }
+ return Optional.empty();
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONReader.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONReader.java
new file mode 100644
index 00000000000..ec9584c7c10
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class IPFSJSONReader implements ManagedReader<SchemaNegotiator> {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSJSONReader.class);
+ private final IPFSContext ipfsContext;
+ private final Multihash block;
+ private JsonLoader jsonLoader;
+
+ public IPFSJSONReader(IPFSContext ipfsContext, Multihash block) {
+ this.ipfsContext = ipfsContext;
+ this.block = block;
+ }
+
+ @Override
+ public boolean open(SchemaNegotiator negotiator) {
+ CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+ @Override
+ public void addContext(UserException.Builder builder) {
+ super.addContext(builder);
+ builder.addContext("hash", block.toString());
+ }
+ };
+ negotiator.setErrorContext(errorContext);
+
+ IPFSHelper helper = ipfsContext.getIPFSHelper();
+
+ byte[] rawDataBytes;
+ if (block.equals(IPFSHelper.IPFS_NULL_OBJECT)) {
+ // An empty ipfs object, but an empty string will make Jackson ObjectMapper fail
+ // so treat it specially
+ rawDataBytes = "[{}]".getBytes();
+ } else {
+ try {
+ rawDataBytes = helper.getObjectDataTimeout(block);
+ } catch (final IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to retrieve data from IPFS block")
+ .addContext("Error message", e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ }
+
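+    // Keep only the content between the outermost braces so the JSON loader
+    // receives a clean JSON document without any stray leading/trailing bytes.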
+ String rootJson = new String(rawDataBytes);
+ int start = rootJson.indexOf("{");
+ int end = rootJson.lastIndexOf("}");
+ rootJson = rootJson.substring(start, end + 1);
+ InputStream inStream = new ByteArrayInputStream(rootJson.getBytes());
+
+ try {
+ jsonLoader = new JsonLoaderImpl.JsonLoaderBuilder()
+ .resultSetLoader(negotiator.build())
+ .standardOptions(negotiator.queryOptions())
+ .errorContext(errorContext)
+ .fromStream(inStream)
+ .build();
+ } catch (Throwable t) {
+
+ // Paranoia: ensure stream is closed if anything goes wrong.
+ // After this, the JSON loader will close the stream.
+ AutoCloseables.closeSilently(inStream);
+ throw t;
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean next() {
+ return jsonLoader.readBatch();
+ }
+
+ @Override
+ public void close() {
+ if (jsonLoader != null) {
+ jsonLoader.close();
+ jsonLoader = null;
+ }
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java
new file mode 100644
index 00000000000..d59b0a057d2
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+
+import java.util.List;
+import java.util.Optional;
+
+public class IPFSPeer {
+ private final IPFSHelper helper;
+
+ private final Multihash id;
+  private List<MultiAddress> addrs;
+ private boolean isDrillReady;
+ private boolean isDrillReadyChecked = false;
+  private Optional<String> drillbitAddress = Optional.empty();
+ private boolean drillbitAddressChecked = false;
+
+
+ public IPFSPeer(IPFSHelper helper, Multihash id) {
+ this.helper = helper;
+ this.id = id;
+ }
+
+  IPFSPeer(IPFSHelper helper, Multihash id, List<MultiAddress> addrs) {
+ this.helper = helper;
+ this.id = id;
+ this.addrs = addrs;
+ this.isDrillReady = helper.isDrillReady(id);
+ this.isDrillReadyChecked = true;
+ this.drillbitAddress = IPFSHelper.pickPeerHost(addrs);
+ this.drillbitAddressChecked = true;
+ }
+
+ public boolean isDrillReady() {
+ if (!isDrillReadyChecked) {
+ isDrillReady = helper.isDrillReady(id);
+ isDrillReadyChecked = true;
+ }
+ return isDrillReady;
+ }
+
+ public boolean hasDrillbitAddress() {
+ return getDrillbitAddress().isPresent();
+ }
+
+ public Optional<String> getDrillbitAddress() {
+ findDrillbitAddress();
+ return drillbitAddress;
+ }
+
+ public List<MultiAddress> getMultiAddresses() {
+ findDrillbitAddress();
+ return addrs;
+ }
+
+ public Multihash getId() {
+ return id;
+ }
+
+
+ private void findDrillbitAddress() {
+ if (!drillbitAddressChecked) {
+ addrs = helper.findpeerTimeout(id);
+ drillbitAddress = IPFSHelper.pickPeerHost(addrs);
+ drillbitAddressChecked = true;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("IPFSPeer(%s)", id.toBase58());
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanBatchCreator.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanBatchCreator.java
new file mode 100644
index 00000000000..36a13c39492
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanBatchCreator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class IPFSScanBatchCreator implements BatchCreator<IPFSSubScan> {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSScanBatchCreator.class);
+
+ @Override
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, IPFSSubScan subScan, List<RecordBatch> children)
+ throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ logger.debug("subScanSpecList.size = {}", subScan.getIPFSSubScanSpecList().size());
+
+ try {
+ ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+ return builder.buildScanOperator(context, subScan);
+ } catch (Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ private ScanFrameworkBuilder createBuilder(OptionManager options, IPFSSubScan subScan) {
+ ManagedScanFramework.ScanFrameworkBuilder builder = new ManagedScanFramework.ScanFrameworkBuilder();
+ builder.projection(subScan.getColumns());
+ builder.setUserName(subScan.getUserName());
+
+ // Provide custom error context
+ builder.errorContext(
+ new ChildErrorContext(builder.errorContext()) {
+ @Override
+ public void addContext(UserException.Builder builder) {
+ builder.addContext("Plugin", subScan.getIPFSContext().getStoragePlugin().getName());
+ }
+ });
+
+ // Reader
+ ManagedScanFramework.ReaderFactory readerFactory = new IPFSJSONReaderFactory(subScan);
+ builder.setReaderFactory(readerFactory);
+ builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+ return builder;
+ }
+
+ private static class IPFSJSONReaderFactory implements ReaderFactory {
+
+ private final IPFSSubScan subScan;
+ private int count;
+
+ public IPFSJSONReaderFactory(IPFSSubScan subScan) {
+ this.subScan = subScan;
+ this.count = 0;
+ }
+
+ @Override
+ public void bind(ManagedScanFramework framework) {
+ }
+
+ @Override
+ public ManagedReader<SchemaNegotiator> next() {
+
+ List<Multihash> scanSpecList = subScan.getIPFSSubScanSpecList();
+ if (count < scanSpecList.size()) {
+ Multihash block = scanSpecList.get(count);
+ count++;
+ return new IPFSJSONReader(subScan.getIPFSContext(), block);
+ } else {
+ return null;
+ }
+ }
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
new file mode 100644
index 00000000000..2ca3a06d0c7
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.InvalidParameterException;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+@JsonTypeName("IPFSScanSpec")
+public class IPFSScanSpec {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSScanSpec.class);
+
+ public enum Prefix {
+ @JsonProperty("ipfs")
+ IPFS("ipfs"),
+ @JsonProperty("ipns")
+ IPNS("ipns");
+
+ @JsonProperty("prefix")
+ private final String name;
+
+ Prefix(String prefix) {
+ this.name = prefix;
+ }
+
+ @Override
+ public String toString() {
+ return this.name;
+ }
+
+ @JsonCreator
+ public static Prefix of(String what) {
+ switch (what) {
+ case "ipfs":
+ return IPFS;
+ case "ipns":
+ return IPNS;
+ default:
+ throw new InvalidParameterException("Unsupported prefix: " + what);
+ }
+ }
+ }
+
+ public enum Format {
+ @JsonProperty("json")
+ JSON("json"),
+ @JsonProperty("csv")
+ CSV("csv");
+
+ @JsonProperty("format")
+ private final String name;
+
+ Format(String prefix) {
+ this.name = prefix;
+ }
+
+ @Override
+ public String toString() {
+ return this.name;
+ }
+
+ @JsonCreator
+ public static Format of(String what) {
+ switch (what) {
+ case "json":
+ return JSON;
+ case "csv":
+ return CSV;
+ default:
+ throw new InvalidParameterException("Unsupported format: " + what);
+ }
+ }
+ }
+
+ public static Set<String> formats = ImmutableSet.of("json", "csv");
+ private Prefix prefix;
+ private String path;
+ private Format formatExtension;
+ private final IPFSContext ipfsContext;
+
+ @JsonCreator
+ public IPFSScanSpec(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("IPFSStoragePluginConfig") IPFSStoragePluginConfig ipfsStoragePluginConfig,
+ @JsonProperty("prefix") Prefix prefix,
+ @JsonProperty("format") Format format,
+ @JsonProperty("path") String path) {
+ this.ipfsContext = registry.resolve(ipfsStoragePluginConfig, IPFSStoragePlugin.class).getIPFSContext();
+ this.prefix = prefix;
+ this.formatExtension = format;
+ this.path = path;
+ }
+
+ public IPFSScanSpec(IPFSContext ipfsContext, String path) {
+ this.ipfsContext = ipfsContext;
+ parsePath(path);
+ }
+
+ private void parsePath(String path) {
+ // IPFS CIDs can be encoded in various bases, see https://github.com/multiformats/multibase/blob/master/multibase.csv
+ // Base64-encoded CIDs are not expected in a path, since Base64 may contain the '/' character.
+ // [a-zA-Z0-9] should be enough to cover the other bases.
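+ // For example, "/ipfs/QmcbeavnEofA6NjG7vkpe1yLJo6En6ML4JnDooDn1BbKmR/foo/bar#json" yields
+ // prefix "ipfs", hashpath "QmcbeavnEofA6NjG7vkpe1yLJo6En6ML4JnDooDn1BbKmR/foo/bar" and format "json".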
+ Pattern tableNamePattern = Pattern.compile("^/(ipfs|ipns)/([a-zA-Z0-9]+(/[^#]+)*)(?:#(\\w+))?$");
+ Matcher matcher = tableNamePattern.matcher(path);
+ if (!matcher.matches()) {
+ throw UserException
+ .validationError()
+ .message("Invalid IPFS path in query string. Use paths of pattern " +
+ "`/scheme/hashpath#format`, where scheme:= \"ipfs\"|\"ipns\", " +
+ "hashpath:= HASH [\"/\" path], HASH is IPFS Base58 encoded hash, " +
+ "path:= TEXT [\"/\" path], format:= \"json\"|\"csv\"")
+ .build(logger);
+ }
+
+ String prefix = matcher.group(1);
+ String hashPath = matcher.group(2);
+ String formatExtension = matcher.group(4);
+ if (formatExtension == null) {
+ formatExtension = "_FORMAT_OMITTED_";
+ }
+
+ logger.debug("prefix {}, hashPath {}, format {}", prefix, hashPath, formatExtension);
+
+ this.path = hashPath;
+ this.prefix = Prefix.of(prefix);
+ try {
+ this.formatExtension = Format.of(formatExtension);
+ } catch (InvalidParameterException e) {
+ // if the format is omitted or invalid, try to resolve it from the file extension in the path
+ Pattern fileExtensionPattern = Pattern.compile("^.*\\.(\\w+)$");
+ Matcher fileExtensionMatcher = fileExtensionPattern.matcher(hashPath);
+ if (fileExtensionMatcher.matches()) {
+ this.formatExtension = Format.of(fileExtensionMatcher.group(1));
+ logger.debug("extracted format from query: {}", this.formatExtension);
+ } else {
+ logger.debug("failed to extract format from path: {}", hashPath);
+ throw UserException
+ .validationError()
+ .message("File format is missing and cannot be extracted from query: %s. " +
+ "Please specify file format explicitly by appending `#csv` or `#json`, etc, to the IPFS path.", hashPath)
+ .build(logger);
+ }
+ }
+ }
+
+ /**
+ * Resolve target hash from IPFS/IPNS paths.
+ * e.g. /ipfs/hash/path/file will be resolved to /ipfs/file_hash
+ *
+ * @param helper IPFS helper
+ * @return the resolved target hash
+ */
+ @JsonProperty
+ public Multihash getTargetHash(IPFSHelper helper) {
+ try {
+ Multihash topHash = helper.resolve(prefix.toString(), path, true);
+ if (topHash == null) {
+ throw UserException.validationError().message("Non-existent IPFS path: %s", toString()).build(logger);
+ }
+ return topHash;
+ } catch (Exception e) {
+ throw UserException
+ .executionError(e)
+ .message("Unable to resolve IPFS path; is it a valid IPFS path?")
+ .build(logger);
+ }
+ }
+
+ @JsonProperty
+ public Prefix getPrefix() {
+ return prefix;
+ }
+
+ @JsonProperty
+ public String getPath() {
+ return path;
+ }
+
+ @JsonProperty
+ public Format getFormatExtension() {
+ return formatExtension;
+ }
+
+ @JsonIgnore
+ public IPFSContext getIPFSContext() {
+ return ipfsContext;
+ }
+
+ @JsonProperty("IPFSStoragePluginConfig")
+ public IPFSStoragePluginConfig getIPFSStoragePluginConfig() {
+ return ipfsContext.getStoragePluginConfig();
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("prefix", prefix)
+ .field("path", path)
+ .field("format", formatExtension)
+ .toString();
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java
new file mode 100644
index 00000000000..1261896123a
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+public class IPFSSchemaFactory implements SchemaFactory {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSSchemaFactory.class);
+
+ final String schemaName;
+ final IPFSContext context;
+
+ public IPFSSchemaFactory(IPFSContext context, String name) {
+ this.context = context;
+ this.schemaName = name;
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+ logger.debug("registerSchemas {}", schemaName);
+ IPFSTables schema = new IPFSTables(schemaName);
+ SchemaPlus hPlus = parent.add(schemaName, schema);
+ schema.setHolder(hPlus);
+ }
+
+ class IPFSTables extends AbstractSchema {
+ private final Set<String> tableNames = Sets.newHashSet();
+ private final ConcurrentMap<String, DynamicDrillTable> tables = new ConcurrentSkipListMap<>(String::compareToIgnoreCase);
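+ // Cache of resolved tables, keyed case-insensitively by table name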
+
+ public IPFSTables(String name) {
+ super(ImmutableList.of(), name);
+ tableNames.add(name);
+ }
+
+ public void setHolder(SchemaPlus plusOfThis) {
+ }
+
+ @Override
+ public String getTypeName() {
+ return IPFSStoragePluginConfig.NAME;
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Table getTable(String tableName) {
+ // DRILL-7766: handle the placeholder table name before the table has been created
+ logger.debug("getTable in IPFSTables {}", tableName);
+ if (tableName.equals("create")) {
+ return null;
+ }
+
+ IPFSScanSpec spec = new IPFSScanSpec(context, tableName);
+ return tables.computeIfAbsent(tableName,
+ n -> new DynamicDrillTable(context.getStoragePlugin(), schemaName, spec));
+ }
+
+ @Override
+ public AbstractSchema getSubSchema(String name) {
+ return null;
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public boolean isMutable() {
+ logger.debug("IPFS Schema isMutable called");
+ return true;
+ }
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePlugin.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePlugin.java
new file mode 100644
index 00000000000..96eec770d08
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePlugin.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class IPFSStoragePlugin extends AbstractStoragePlugin {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSStoragePlugin.class);
+
+ private final IPFSContext ipfsContext;
+ private final IPFSStoragePluginConfig pluginConfig;
+ private final IPFSSchemaFactory schemaFactory;
+
+ public IPFSStoragePlugin(IPFSStoragePluginConfig config, DrillbitContext context, String name) throws IOException {
+ super(context, name);
+ this.ipfsContext = new IPFSContext(config, this);
+ this.schemaFactory = new IPFSSchemaFactory(this.ipfsContext, name);
+ this.pluginConfig = config;
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return true;
+ }
+
+ @Override
+ public IPFSGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ return getPhysicalScan(userName, selection, (List<SchemaPath>) null);
+ }
+
+ @Override
+ public IPFSGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ logger.debug("IPFSStoragePlugin before getPhysicalScan");
+ IPFSScanSpec spec = selection.getListWith(new ObjectMapper(), new TypeReference<IPFSScanSpec>() {});
+ logger.debug("IPFSStoragePlugin getPhysicalScan with selection {}, columns {}", selection, columns);
+ return new IPFSGroupScan(ipfsContext, spec, columns);
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+ schemaFactory.registerSchemas(schemaConfig, parent);
+ }
+
+ @Override
+ public IPFSStoragePluginConfig getConfig() {
+ return pluginConfig;
+ }
+
+ public IPFSContext getIPFSContext() {
+ return ipfsContext;
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
new file mode 100644
index 00000000000..f0b358e5b3a
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.drill.shaded.guava.com.google.common.base.Objects;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase {
+ public static final String NAME = "ipfs";
+
+ @JsonProperty
+ private final String host;
+
+ @JsonProperty
+ private final int port;
+
+ @JsonProperty("max-nodes-per-leaf")
+ private final int maxNodesPerLeaf;
+
+ @JsonProperty("distributed-mode")
+ private final boolean distributedMode;
+
+ @JsonProperty("ipfs-timeouts")
+ private final Map<IPFSTimeOut, Integer> ipfsTimeouts;
+
+ @JsonProperty("ipfs-caches")
+ private final Map<IPFSCacheType, IPFSCache> ipfsCaches;
+
+ @JsonIgnore
+ public static final Map<IPFSTimeOut, Integer> ipfsTimeoutDefaults = ImmutableMap.of(
+ IPFSTimeOut.FIND_PROV, 4,
+ IPFSTimeOut.FIND_PEER_INFO, 4,
+ IPFSTimeOut.FETCH_DATA, 6
+ );
+
+ @JsonIgnore
+ public static final Map<IPFSCacheType, IPFSCache> ipfsCacheDefaults = ImmutableMap.of(
+ IPFSCacheType.PEER, new IPFSCache(1000, 600),
+ IPFSCacheType.PROVIDER, new IPFSCache(1000, 600)
+ );
+
+ public enum IPFSTimeOut {
+ @JsonProperty("find-provider")
+ FIND_PROV("find-provider"),
+ @JsonProperty("find-peer-info")
+ FIND_PEER_INFO("find-peer-info"),
+ @JsonProperty("fetch-data")
+ FETCH_DATA("fetch-data");
+
+ @JsonProperty("type")
+ private final String which;
+
+ IPFSTimeOut(String which) {
+ this.which = which;
+ }
+
+ @JsonCreator
+ public static IPFSTimeOut of(String which) {
+ switch (which) {
+ case "find-provider":
+ return FIND_PROV;
+ case "find-peer-info":
+ return FIND_PEER_INFO;
+ case "fetch-data":
+ return FETCH_DATA;
+ default:
+ throw new InvalidParameterException("Unknown key for IPFS timeout config entry: " + which);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return this.which;
+ }
+ }
+
+ public enum IPFSCacheType {
+ @JsonProperty("peer")
+ PEER("peer"),
+ @JsonProperty("provider")
+ PROVIDER("provider");
+
+ @JsonProperty("type")
+ private final String which;
+
+ IPFSCacheType(String which) {
+ this.which = which;
+ }
+
+ @JsonCreator
+ public static IPFSCacheType of(String which) {
+ switch (which) {
+ case "peer":
+ return PEER;
+ case "provider":
+ return PROVIDER;
+ default:
+ throw new InvalidParameterException("Unknown key for cache config entry: " + which);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return this.which;
+ }
+ }
+
+ public static class IPFSCache {
+ @JsonProperty
+ public final int size;
+ @JsonProperty
+ public final int ttl;
+
+ @JsonCreator
+ public IPFSCache(@JsonProperty("size") int size, @JsonProperty("ttl") int ttl) {
+ Preconditions.checkArgument(size >= 0 && ttl > 0);
+ this.size = size;
+ this.ttl = ttl;
+ }
+ }
+
+ @JsonProperty("groupscan-worker-threads")
+ private final int numWorkerThreads;
+
+ @JsonProperty
+ private final Map<String, FormatPluginConfig> formats;
+
+ @JsonCreator
+ public IPFSStoragePluginConfig(
+ @JsonProperty("host") String host,
+ @JsonProperty("port") int port,
+ @JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
+ @JsonProperty("distributed-mode") boolean distributedMode,
+ @JsonProperty("ipfs-timeouts") Map ipfsTimeouts,
+ @JsonProperty("ipfs-caches") Map ipfsCaches,
+ @JsonProperty("groupscan-worker-threads") int numWorkerThreads,
+ @JsonProperty("formats") Map formats) {
+ this.host = host;
+ this.port = port;
+ this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
+ this.distributedMode = distributedMode;
+ this.ipfsTimeouts = applyDefaultMap(ipfsTimeouts, ipfsTimeoutDefaults);
+ this.ipfsCaches = applyDefaultMap(ipfsCaches, ipfsCacheDefaults);
+ this.numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : 1;
+ this.formats = formats;
+ }
+
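+ // Merge user-supplied settings with the defaults: supplied entries win, defaults fill in missing keys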
+ private static <K, V> Map<K, V> applyDefaultMap(Map<K, V> supplied, Map<K, V> defaults) {
+ Map<K, V> ret;
+ if (supplied == null) {
+ ret = defaults;
+ } else {
+ ret = Maps.newHashMap();
+ supplied.forEach(ret::put);
+ defaults.forEach(ret::putIfAbsent);
+ }
+ return ret;
+ }
+
+ @JsonProperty
+ public String getHost() {
+ return host;
+ }
+
+ @JsonProperty
+ public int getPort() {
+ return port;
+ }
+
+ @JsonProperty("max-nodes-per-leaf")
+ public int getMaxNodesPerLeaf() {
+ return maxNodesPerLeaf;
+ }
+
+ @JsonProperty("distributed-mode")
+ public boolean isDistributedMode() {
+ return distributedMode;
+ }
+
+ @JsonIgnore
+ public int getIPFSTimeout(IPFSTimeOut which) {
+ return ipfsTimeouts.get(which);
+ }
+
+ @JsonIgnore
+ public IPFSCache getIPFSCache(IPFSCacheType which) {
+ return ipfsCaches.get(which);
+ }
+
+ @JsonProperty("ipfs-timeouts")
+ public Map<IPFSTimeOut, Integer> getIPFSTimeouts() {
+ return ipfsTimeouts;
+ }
+
+ @JsonProperty("ipfs-caches")
+ public Map<IPFSCacheType, IPFSCache> getIPFSCaches() {
+ return ipfsCaches;
+ }
+
+ @JsonProperty("groupscan-worker-threads")
+ public int getNumWorkerThreads() {
+ return numWorkerThreads;
+ }
+
+ @JsonProperty
+ public Map<String, FormatPluginConfig> getFormats() {
+ return formats;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(host, port, maxNodesPerLeaf, distributedMode, ipfsTimeouts, ipfsCaches, formats);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ IPFSStoragePluginConfig other = (IPFSStoragePluginConfig) obj;
+ return Objects.equal(formats, other.formats)
+ && Objects.equal(host, other.host)
+ && Objects.equal(ipfsTimeouts, other.ipfsTimeouts)
+ && Objects.equal(ipfsCaches, other.ipfsTimeouts)
+ && port == other.port
+ && maxNodesPerLeaf == other.maxNodesPerLeaf
+ && distributedMode == other.distributedMode
+ && numWorkerThreads == other.numWorkerThreads;
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java
new file mode 100644
index 00000000000..c34840f1f65
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.ipfs.cid.Cid;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+@JsonTypeName("ipfs-sub-scan")
+public class IPFSSubScan extends AbstractBase implements SubScan {
+ private final IPFSContext ipfsContext;
+ private final List<Multihash> ipfsSubScanSpecList;
+ private final IPFSScanSpec.Format format;
+ private final List<SchemaPath> columns;
+
+
+ @JsonCreator
+ public IPFSSubScan(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("IPFSStoragePluginConfig") IPFSStoragePluginConfig ipfsStoragePluginConfig,
+ @JsonProperty("IPFSSubScanSpec") @JsonDeserialize(using = MultihashDeserializer.class) List ipfsSubScanSpecList,
+ @JsonProperty("format") IPFSScanSpec.Format format,
+ @JsonProperty("columns") List columns
+ ) {
+ super((String) null);
+ IPFSStoragePlugin plugin = registry.resolve(ipfsStoragePluginConfig, IPFSStoragePlugin.class);
+ ipfsContext = plugin.getIPFSContext();
+ this.ipfsSubScanSpecList = ipfsSubScanSpecList;
+ this.format = format;
+ this.columns = columns;
+ }
+
+ public IPFSSubScan(IPFSContext ipfsContext, List<Multihash> ipfsSubScanSpecList, IPFSScanSpec.Format format, List<SchemaPath> columns) {
+ super((String) null);
+ this.ipfsContext = ipfsContext;
+ this.ipfsSubScanSpecList = ipfsSubScanSpecList;
+ this.format = format;
+ this.columns = columns;
+ }
+
+ @JsonIgnore
+ public IPFSContext getIPFSContext() {
+ return ipfsContext;
+ }
+
+ @JsonProperty("IPFSStoragePluginConfig")
+ public IPFSStoragePluginConfig getIPFSStoragePluginConfig() {
+ return ipfsContext.getStoragePluginConfig();
+ }
+
+ @JsonProperty("columns")
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ @JsonProperty("format")
+ public IPFSScanSpec.Format getFormat() {
+ return format;
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("scan spec", ipfsSubScanSpecList)
+ .field("format", format)
+ .field("columns", columns)
+ .toString();
+ }
+
+ @JsonSerialize(using = MultihashSerializer.class)
+ @JsonProperty("IPFSSubScanSpec")
+ public List<Multihash> getIPFSSubScanSpecList() {
+ return ipfsSubScanSpecList;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(
+ PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return ImmutableSet.<PhysicalOperator>of().iterator();
+ }
+
+ @Override
+ public int getOperatorType() {
+ return UserBitShared.CoreOperatorType.IPFS_SUB_SCAN_VALUE;
+ }
+
+ @Override
+ public boolean isExecutable() {
+ return false;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List children) {
+ return new IPFSSubScan(ipfsContext, ipfsSubScanSpecList, format, columns);
+ }
+
+ public static class IPFSSubScanSpec {
+ private final String targetHash;
+
+ @JsonCreator
+ public IPFSSubScanSpec(@JsonProperty("targetHash") String targetHash) {
+ this.targetHash = targetHash;
+ }
+
+ @JsonProperty
+ public String getTargetHash() {
+ return targetHash;
+ }
+ }
+
+ static class MultihashSerializer extends JsonSerializer<List<Multihash>> {
+
+ @Override
+ public void serialize(List<Multihash> value, JsonGenerator jgen,
+ SerializerProvider provider) throws IOException {
+ jgen.writeStartArray();
+ for (Multihash hash : value) {
+ jgen.writeString(hash.toString());
+ }
+ jgen.writeEndArray();
+ }
+ }
+
+ static class MultihashDeserializer extends JsonDeserializer<List<Multihash>> {
+ @Override
+ public List<Multihash> deserialize(JsonParser jp, DeserializationContext ctxt)
+ throws IOException {
+ assert jp.currentToken() == JsonToken.START_ARRAY;
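+ // Elements were written as hash strings by MultihashSerializer above; decode each back into a Multihash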
+
+ List multihashList = new ArrayList<>();
+ while (jp.nextToken() != JsonToken.END_ARRAY) {
+ String hash = jp.getValueAsString();
+ multihashList.add(Cid.decode(hash));
+ }
+ return multihashList;
+ }
+ }
+}
diff --git a/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 00000000000..74bca8e6f7f
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,30 @@
+{
+ "storage": {
+ "ipfs": {
+ "type": "ipfs",
+ "host": "127.0.0.1",
+ "port": 5001,
+ "max-nodes-per-leaf": 3,
+ "distributed-mode": false,
+ "ipfs-timeouts": {
+ "find-provider": 4,
+ "find-peer-info": 4,
+ "fetch-data": 5
+ },
+ "ipfs-caches": {
+ "peer": {
+ "size": 100,
+ "ttl": 600
+ },
+ "provider": {
+ "size": 1000,
+ "ttl": 600
+ }
+ },
+ "groupscan-worker-threads": 50,
+ "formats": null,
+ "enabled": false
+ }
+ }
+}
+
\ No newline at end of file
diff --git a/contrib/storage-ipfs/src/main/resources/drill-module.conf b/contrib/storage-ipfs/src/main/resources/drill-module.conf
new file mode 100644
index 00000000000..f664e6dd257
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/resources/drill-module.conf
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill: {
+ classpath.scanning: {
+ packages += "org.apache.drill.exec.store.ipfs"
+ }
+}
\ No newline at end of file
diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestBase.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestBase.java
new file mode 100644
index 00000000000..dc6b05bf674
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestBase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IPFSTestBase extends ClusterTest {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSTestBase.class);
+ private static StoragePluginRegistry pluginRegistry;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
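+ // Pin the Drillbit to the default ports that IPFSGroupScan expects (see DRILL-7754)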
+ builder.configProperty(ExecConstants.INITIAL_BIT_PORT, IPFSGroupScan.DEFAULT_CONTROL_PORT)
+ .configProperty(ExecConstants.INITIAL_DATA_PORT, IPFSGroupScan.DEFAULT_DATA_PORT)
+ .configProperty(ExecConstants.INITIAL_USER_PORT, IPFSGroupScan.DEFAULT_USER_PORT)
+ .configProperty(ExecConstants.DRILL_PORT_HUNT, false)
+ .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true)
+ .clusterSize(1)
+ .withLocalZk();
+ startCluster(builder);
+ pluginRegistry = cluster.drillbit().getContext().getStorage();
+
+ IPFSTestSuit.initIPFS();
+ initIPFSStoragePlugin();
+ }
+
+ private static void initIPFSStoragePlugin() throws Exception {
+ pluginRegistry
+ .put(
+ IPFSStoragePluginConfig.NAME,
+ IPFSTestSuit.getIpfsStoragePluginConfig());
+ }
+
+ @AfterClass
+ public static void tearDownIPFSTestBase() throws StoragePluginRegistry.PluginException {
+ if (pluginRegistry != null) {
+ pluginRegistry.remove(IPFSStoragePluginConfig.NAME);
+ } else {
+ logger.warn("Plugin Registry was null");
+ }
+ }
+}
\ No newline at end of file
diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestConstants.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestConstants.java
new file mode 100644
index 00000000000..bd1a1ca2d39
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestConstants.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.cid.Cid;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface IPFSTestConstants {
+ String MOCK_NODE_ID_STRING = "QmP14kRKf1mR6LAYgfuuMirscgZNYbzMCHQ1ebe4bBKdah";
+ Multihash MOCK_NODE_ID_MULTIHASH = Multihash.fromBase58(MOCK_NODE_ID_STRING);
+ String MOCK_NODE_ADDR = "127.0.0.1";
+ int MOCK_NODE_IPFS_SWARM_PORT = 4001;
+ int MOCK_NODE_IPFS_API_PORT = 5001;
+ List<String> MOCK_NODE_ADDRS = ImmutableList.of(
+ String.format("/ip4/%s/tcp/%d/ipfs/%s", MOCK_NODE_ADDR, MOCK_NODE_IPFS_SWARM_PORT, MOCK_NODE_ID_STRING)
+ );
+ List<MultiAddress> MOCK_NODE_MULTIADDRS = MOCK_NODE_ADDRS.stream().map(MultiAddress::new).collect(Collectors.toList());
+
+ String SIMPLE_DATASET_HASH_STRING = "QmcbeavnEofA6NjG7vkpe1yLJo6En6ML4JnDooDn1BbKmR";
+ Multihash SIMPLE_DATASET_MULTIHASH = Multihash.fromBase58(SIMPLE_DATASET_HASH_STRING);
+ Cid SIMPLE_DATASET_CID_V1 = Cid.build(1, Cid.Codec.DagProtobuf, SIMPLE_DATASET_MULTIHASH);
+ String SIMPLE_DATASET_CID_V1_STRING = SIMPLE_DATASET_CID_V1.toString();
+
+ /**
+ * Chunked dataset layout:
+ * top object: QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo
+ * +-- 1 QmSmDFd1GcLPyYtscdtkBCj7gbNKiJ8MkaBPEFMz9orPEi chunked-json-1.json (162 bytes)
+ * +-- 2 QmQVBWTZ7MZjwHv5q9qG3zLzczsh8PGAVRWhF2gKsrj1hP chunked-json-2.json (159 bytes)
+ * +-- 3 QmY8ghdB3mwdUAdBmft3bdgzPVcq8bCvtqTRd9wu3LjyTd chunked-json-3.json (89 bytes)
+ */
+ String CHUNKED_DATASET_HASH_STRING = "QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo";
+ Multihash CHUNKED_DATASET_MULTIHASH = Multihash.fromBase58(CHUNKED_DATASET_HASH_STRING);
+ Map<String, Multihash> CHUNKS_MULTIHASH = ImmutableMap.of(
+ "chunked-json-1.json", Multihash.fromBase58("QmSmDFd1GcLPyYtscdtkBCj7gbNKiJ8MkaBPEFMz9orPEi"),
+ "chunked-json-2.json", Multihash.fromBase58("QmQVBWTZ7MZjwHv5q9qG3zLzczsh8PGAVRWhF2gKsrj1hP"),
+ "chunked-json-3.json", Multihash.fromBase58("QmY8ghdB3mwdUAdBmft3bdgzPVcq8bCvtqTRd9wu3LjyTd")
+ );
+
+ static String getQueryPath(Multihash dataset) {
+ return IPFSTestConstants.getQueryPath(dataset.toString());
+ }
+
+ static String getQueryPath(String dataset) {
+ return String.format("/ipfs/%s#json", dataset);
+ }
+}
diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestDataGenerator.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestDataGenerator.java
new file mode 100644
index 00000000000..9304159bb53
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestDataGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
+
+public class IPFSTestDataGenerator {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSTestDataGenerator.class);
+
+ public static Multihash importSimple(IPFSStoragePluginConfig config) {
+ try {
+ final IPFS client = new IPFS(config.getHost(), config.getPort());
+ File testFile = new File(Resources.getResource("simple.json").toURI());
+ return addObject(client, Files.readAllBytes(testFile.toPath()));
+ } catch (Exception e) {
+ logger.error("Failed to import data: %s", e);
+ return null;
+ }
+ }
+
+ public static Multihash importChunked(IPFSStoragePluginConfig config) {
+ try {
+ final IPFS client = new IPFS(config.getHost(), config.getPort());
+
+ Multihash base = IPFSHelper.IPFS_NULL_OBJECT;
+ for (int i = 1; i <= 3; i++) {
+ File testFile = new File(Resources.getResource(String.format("chunked-json-%d.json", i)).toURI());
+ Multihash chunk = addObject(client, Files.readAllBytes(testFile.toPath()));
+ base = addLink(client, base, String.format("%d", i), chunk);
+ }
+
+ return base;
+ } catch (Exception e) {
+ logger.error("Failed to import data: %s", e);
+ return null;
+ }
+ }
+
+ private static Multihash addObject(IPFS client, byte[] data) throws IOException {
+ MerkleNode node = client.object.patch(IPFSHelper.IPFS_NULL_OBJECT, "set-data", Optional.of(data), Optional.empty(), Optional.empty());
+ return node.hash;
+ }
+
+ private static Multihash addLink(IPFS client, Multihash base, String name, Multihash referent) throws IOException {
+ MerkleNode node = client.object.patch(base, "add-link", Optional.empty(), Optional.of(name), Optional.of(referent));
+ return node.hash;
+ }
+}
diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestSuit.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestSuit.java
new file mode 100644
index 00000000000..a2be3184b3e
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestSuit.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.categories.IPFSStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({TestIPFSQueries.class, TestIPFSGroupScan.class, TestIPFSScanSpec.class})
+@Category({SlowTest.class, IPFSStorageTest.class})
+public class IPFSTestSuit {
+ private static final Logger logger = LoggerFactory.getLogger(IPFSTestSuit.class);
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private static IPFSStoragePluginConfig ipfsStoragePluginConfig = null;
+
+ @BeforeClass
+ public static void initIPFS() {
+ try {
+ JsonNode storagePluginJson = mapper.readTree(new File(Resources.getResource("bootstrap-storage-plugins.json").toURI()));
+ ipfsStoragePluginConfig = mapper.treeToValue(storagePluginJson.get("storage").get("ipfs"), IPFSStoragePluginConfig.class);
+ ipfsStoragePluginConfig.setEnabled(true);
+ } catch (Exception e) {
+ logger.error("Error initializing IPFS ", e);
+ }
+ }
+
+ public static IPFSStoragePluginConfig getIpfsStoragePluginConfig() {
+ return ipfsStoragePluginConfig;
+ }
+}
diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java
new file mode 100644
index 00000000000..489d0cf4c29
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.JSONParser;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.categories.IPFSStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@Category({SlowTest.class, IPFSStorageTest.class})
+public class TestIPFSGroupScan extends IPFSTestBase implements IPFSTestConstants {
+ @Mock
+ private IPFS ipfs;
+ @Mock
+ private IPFSCompat ipfsCompat;
+ @Mock
+ private IPFSHelper ipfsHelper;
+ @Mock
+ private IPFSStoragePlugin plugin;
+ @Mock
+ private IPFSPeer myself;
+
+ @Before
+ public void before() {
+
+ ipfs = Mockito.mock(IPFS.class);
+ ipfsCompat = Mockito.mock(IPFSCompat.class);
+ ipfsHelper = Mockito.mock(IPFSHelper.class);
+ plugin = Mockito.mock(IPFSStoragePlugin.class);
+
+ try {
+ IPFSStoragePluginConfig config = IPFSTestSuit.getIpfsStoragePluginConfig();
+
+ Mockito.when(ipfs.id()).thenReturn(ImmutableMap.of(
+ "ID", MOCK_NODE_ID_STRING,
+ "Addresses", MOCK_NODE_ADDRS
+ ));
+
+ IPFSContext context = Mockito.mock(IPFSContext.class);
+ myself = getMockedIPFSPeer(
+ MOCK_NODE_ID_MULTIHASH,
+ MOCK_NODE_MULTIADDRS,
+ true,
+ Optional.of(MOCK_NODE_ADDR)
+ );
+
+ Mockito.when(plugin.getConfig()).thenReturn(config);
+ Mockito.when(plugin.getIPFSContext()).thenReturn(context);
+ Mockito.when(plugin.getContext()).thenReturn(cluster.drillbit().getContext());
+ Mockito.when(context.getMyself()).thenReturn(myself);
+ Mockito.when(context.getIPFSHelper()).thenReturn(ipfsHelper);
+ Mockito.when(context.getStoragePlugin()).thenReturn(plugin);
+ Mockito.when(context.getStoragePluginConfig()).thenReturn(config);
+ Mockito.when(context.getIPFSClient()).thenReturn(ipfs);
+ Mockito.when(context.getIPFSPeerCache()).thenReturn(
+ CacheBuilder.newBuilder()
+ .maximumSize(1)
+ .build(CacheLoader.from(key -> {
+ if (myself.getId().equals(key)) {
+ return myself;
+ } else {
+ return null;
+ }
+ })
+ ));
+ Mockito.when(context.getProviderCache()).thenReturn(
+ CacheBuilder.newBuilder()
+ .maximumSize(1)
+ .build(CacheLoader.from(key -> ImmutableList.of(myself.getId())))
+ );
+ Mockito.when(ipfsHelper.getClient()).thenReturn(ipfs);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSimpleDatasetWithNoAnyOtherProviders() {
+ try {
+ Mockito.when(ipfsHelper.getObjectLinksTimeout(Mockito.any(Multihash.class))).thenReturn(new MerkleNode(SIMPLE_DATASET_HASH_STRING));
+ Mockito.when(ipfsHelper.findprovsTimeout(Mockito.any(Multihash.class))).thenReturn(ImmutableList.of(MOCK_NODE_ID_MULTIHASH));
+ //called in IPFSScanSpec.getTargetHash
+ Mockito.when(ipfsHelper.resolve(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean())).thenReturn(SIMPLE_DATASET_MULTIHASH);
+ Mockito.when(ipfsHelper.isDrillReady(Mockito.any(Multihash.class))).thenReturn(true);
+ Mockito.when(ipfsHelper.findpeerTimeout(Mockito.any(Multihash.class))).thenReturn(MOCK_NODE_MULTIADDRS);
+
+ File simpleDataset = new File(Resources.getResource("simple.json").toURI());
+ byte[] contents = Files.readAllBytes(simpleDataset.toPath());
+ Mockito.when(ipfsHelper.getObjectDataTimeout(Mockito.any(Multihash.class))).thenReturn(contents);
+
+ IPFSContext context = plugin.getIPFSContext();
+ IPFSGroupScan groupScan = new IPFSGroupScan(context, new IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH)), null);
+ Map<Multihash, IPFSPeer> map = groupScan.getLeafPeerMappings(SIMPLE_DATASET_MULTIHASH);
+ assertEquals(1, map.keySet().size());
+ assertEquals(myself, map.get(SIMPLE_DATASET_MULTIHASH));
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testChunkedDatasetWithNoAnyOtherProviders() {
+ try {
+ Mockito.when(ipfsHelper.getObjectLinksTimeout(CHUNKED_DATASET_MULTIHASH)).thenReturn(MerkleNode.fromJSON(JSONParser.parse("{\"Hash\":\"QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo\",\"Links\":[{\"Name\":\"1\",\"Hash\":\"QmSmDFd1GcLPyYtscdtkBCj7gbNKiJ8MkaBPEFMz9orPEi\",\"Size\":162},{\"Name\":\"2\",\"Hash\":\"QmQVBWTZ7MZjwHv5q9qG3zLzczsh8PGAVRWhF2gKsrj1hP\",\"Size\":159},{\"Name\":\"3\",\"Hash\":\"QmY8ghdB3mwdUAdBmft3bdgzPVcq8bCvtqTRd9wu3LjyTd\",\"Size\":89}]}\n")));
+ Mockito.when(ipfsHelper.findprovsTimeout(Mockito.any(Multihash.class))).thenReturn(ImmutableList.of(MOCK_NODE_ID_MULTIHASH));
+ Mockito.when(ipfsHelper.resolve(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean())).thenReturn(CHUNKED_DATASET_MULTIHASH);
+ Mockito.when(ipfsHelper.isDrillReady(Mockito.any(Multihash.class))).thenReturn(true);
+ Mockito.when(ipfsHelper.findpeerTimeout(Mockito.any(Multihash.class))).thenReturn(MOCK_NODE_MULTIADDRS);
+ for (Map.Entry<String, Multihash> entry : CHUNKS_MULTIHASH.entrySet()) {
+ File chunkFile = new File(Resources.getResource(entry.getKey()).toURI());
+ Mockito.when(ipfsHelper.getObjectDataTimeout(entry.getValue())).thenReturn(Files.readAllBytes(chunkFile.toPath()));
+ Mockito.when(ipfsHelper.getObjectLinksTimeout(entry.getValue())).thenReturn(new MerkleNode(entry.getValue().toBase58()));
+ }
+
+ IPFSContext context = plugin.getIPFSContext();
+ IPFSGroupScan groupScan = new IPFSGroupScan(context, new IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH)), null);
+ Map<Multihash, IPFSPeer> map = groupScan.getLeafPeerMappings(CHUNKED_DATASET_MULTIHASH);
+ assertEquals(3, map.keySet().size());
+ for (Map.Entry<String, Multihash> entry : CHUNKS_MULTIHASH.entrySet()) {
+ assertEquals(myself, map.get(entry.getValue()));
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ private IPFSPeer getMockedIPFSPeer(Multihash multihashId, List<MultiAddress> addrs, boolean isDrillReady,
+ Optional<String> drillbitAddress) {
+ IPFSPeer peer = Mockito.mock(IPFSPeer.class);
+ Mockito.when(peer.getId()).thenReturn(multihashId);
+ Mockito.when(peer.getMultiAddresses()).thenReturn(addrs);
+ Mockito.when(peer.getDrillbitAddress()).thenReturn(drillbitAddress);
+ Mockito.when(peer.hasDrillbitAddress()).thenReturn(drillbitAddress.isPresent());
+ Mockito.when(peer.toString()).thenReturn(multihashId.toBase58());
+
+ return peer;
+ }
+}
diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSQueries.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSQueries.java
new file mode 100644
index 00000000000..b1fdda7f50b
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSQueries.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.categories.IPFSStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.fail;
+
+@Ignore("Requires running local IPFS daemon")
+@Category({SlowTest.class, IPFSStorageTest.class})
+public class TestIPFSQueries extends IPFSTestBase implements IPFSTestConstants {
+
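+  /**
+   * The IPFS group scan only knows the default Drillbit control/user/data ports
+   * (DRILL-7754), so fail fast when the test cluster is bound to different ports.
+   */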
+ @Before
+ public void checkDrillbitPorts() {
+ CoordinationProtos.DrillbitEndpoint ep = cluster.drillbit().getRegistrationHandle().getEndPoint();
+ int controlPort = ep.getControlPort();
+ int userPort = ep.getUserPort();
+ int dataPort = ep.getDataPort();
+ if (controlPort != IPFSGroupScan.DEFAULT_CONTROL_PORT
+ || userPort != IPFSGroupScan.DEFAULT_USER_PORT
+ || dataPort != IPFSGroupScan.DEFAULT_DATA_PORT) {
+ //DRILL-7754 handle non-default ports
+      fail(String.format("Drill is bound to non-default ports: %d, %d, %d", controlPort, userPort, dataPort));
+ }
+ }
+
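+  /**
+   * Selecting from the IPFS null (empty) object should still yield exactly one record.
+   */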
+ @Test
+ public void testNullQuery() throws Exception {
+
+ testBuilder()
+ .sqlQuery(getSelectStar(IPFSHelper.IPFS_NULL_OBJECT))
+ .unOrdered()
+ .expectsNumRecords(1)
+ .go();
+ }
+
+ @Test
+ public void testSimple() throws Exception {
+ Multihash dataset = IPFSTestDataGenerator.importSimple(IPFSTestSuit.getIpfsStoragePluginConfig());
+ if (null == dataset) {
+ fail();
+ }
+
+ testBuilder()
+ .sqlQuery(getSelectStar(dataset))
+ .unOrdered()
+ .expectsNumRecords(1)
+ .go();
+ }
+
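+  /**
+   * Imports the simple dataset, then queries it through its CIDv1 representation.
+   */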
+ @Test
+ public void testSimpleCIDv1() throws Exception {
+ Multihash dataset = IPFSTestDataGenerator.importSimple(IPFSTestSuit.getIpfsStoragePluginConfig());
+ if (null == dataset) {
+ fail();
+ }
+
+ testBuilder()
+ .sqlQuery(getSelectStar(SIMPLE_DATASET_CID_V1))
+ .unOrdered()
+ .expectsNumRecords(1)
+ .go();
+ }
+
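+  /**
+   * The chunked dataset spans three linked chunk objects holding five records in total.
+   */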
+ @Test
+ public void testChunked() throws Exception {
+ Multihash dataset = IPFSTestDataGenerator.importChunked(IPFSTestSuit.getIpfsStoragePluginConfig());
+ if (null == dataset) {
+ fail();
+ }
+
+ testBuilder()
+ .sqlQuery(getSelectStar(dataset))
+ .unOrdered()
+ .expectsNumRecords(5)
+ .go();
+ }
+
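+  /**
+   * Builds a {@code SELECT *} query against the ipfs storage plugin for the given dataset.
+   */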
+ private static String getSelectStar(Multihash dataset) {
+ return String.format("SELECT * FROM ipfs.`%s`", IPFSTestConstants.getQueryPath(dataset));
+ }
+}
diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSScanSpec.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSScanSpec.java
new file mode 100644
index 00000000000..f163e0de69c
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSScanSpec.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import org.apache.drill.categories.IPFSStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.exceptions.UserException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({SlowTest.class, IPFSStorageTest.class})
+public class TestIPFSScanSpec extends IPFSTestBase implements IPFSTestConstants {
+ @Mock
+ private IPFSContext context;
+
+ @Before
+ public void before() {
+ context = Mockito.mock(IPFSContext.class);
+ }
+
+ @Test
+ public void testSimpleDatasetPath() {
+ IPFSScanSpec spec = new IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH));
+ assertEquals(spec.getPath(), SIMPLE_DATASET_HASH_STRING);
+ }
+
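+  /**
+   * A path that does not start with a recognized IPFS prefix must be rejected.
+   */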
+ @Test(expected = UserException.class)
+ public void testInvalidPathWithBadPrefix() {
+ IPFSScanSpec spec = new IPFSScanSpec(context, "/root/data/dataset.json");
+ }
+
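+  /**
+   * A dataset path without a format extension cannot be turned into a scan spec.
+   */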
+ @Test(expected = UserException.class)
+ public void testInvalidPathWithNoExtension() {
+ IPFSScanSpec spec = new IPFSScanSpec(context, String.format("/ipfs/%s", SIMPLE_DATASET_HASH_STRING));
+ }
+
+ @Test
+ public void testPathWithCIDv1() {
+ IPFSScanSpec spec = new IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_CID_V1_STRING));
+ assertEquals(spec.getPath(), SIMPLE_DATASET_CID_V1_STRING);
+ }
+
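+  /**
+   * A path into a sub-object of a chunked dataset keeps the chunk suffix but drops the format marker.
+   */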
+ @Test
+ public void testChunkedDatasetPath() {
+ IPFSScanSpec spec = new IPFSScanSpec(context, String.format("/ipfs/%s/1#json", CHUNKED_DATASET_HASH_STRING));
+ assertEquals(spec.getPath(), CHUNKED_DATASET_HASH_STRING + "/1");
+ }
+
+ @Test
+ public void testDataFileWithExplicitExtensionName() {
+ IPFSScanSpec spec = new IPFSScanSpec(context, "/ipfs/QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo/1.json");
+ assertEquals(spec.getPath(), "QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo/1.json");
+ }
+}
diff --git a/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json
new file mode 100644
index 00000000000..9d7635606d3
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,30 @@
+{
+ "storage": {
+ "ipfs": {
+ "type": "ipfs",
+ "host": "127.0.0.1",
+ "port": 5001,
+ "max-nodes-per-leaf": 1,
+ "distributed-mode": false,
+ "ipfs-timeouts": {
+ "find-provider": 1,
+ "find-peer-info": 1,
+ "fetch-data": 1
+ },
+ "ipfs-caches": {
+ "peer": {
+ "size": 2,
+ "ttl": 30
+ },
+ "provider": {
+ "size": 2,
+ "ttl": 30
+ }
+ },
+ "groupscan-worker-threads": 5,
+ "formats": null,
+ "enabled": true
+ }
+ }
+}
+
\ No newline at end of file
diff --git a/contrib/storage-ipfs/src/test/resources/chunked-json-1.json b/contrib/storage-ipfs/src/test/resources/chunked-json-1.json
new file mode 100644
index 00000000000..d9b444e3a7f
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/resources/chunked-json-1.json
@@ -0,0 +1,12 @@
+{
+ "name": "Alice",
+ "job": "artist",
+ "age": 30,
+ "sex": "female"
+}
+{
+ "name": "Bob",
+ "job": "butcher",
+ "age": 45,
+ "sex": "male"
+}
diff --git a/contrib/storage-ipfs/src/test/resources/chunked-json-2.json b/contrib/storage-ipfs/src/test/resources/chunked-json-2.json
new file mode 100644
index 00000000000..a92f47043f4
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/resources/chunked-json-2.json
@@ -0,0 +1,12 @@
+{
+ "name": "Cecil",
+ "job": "cook",
+ "age": 32,
+ "sex": "male"
+}
+{
+ "name": "David",
+ "job": "doctor",
+ "age": 28,
+ "sex": "male"
+}
diff --git a/contrib/storage-ipfs/src/test/resources/chunked-json-3.json b/contrib/storage-ipfs/src/test/resources/chunked-json-3.json
new file mode 100644
index 00000000000..00a15d34d45
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/resources/chunked-json-3.json
@@ -0,0 +1,6 @@
+{
+ "name": "Elizabeth",
+ "job": "engineer",
+ "age": 23,
+ "sex": "female"
+}
diff --git a/contrib/storage-ipfs/src/test/resources/simple.json b/contrib/storage-ipfs/src/test/resources/simple.json
new file mode 100644
index 00000000000..463b47c0831
--- /dev/null
+++ b/contrib/storage-ipfs/src/test/resources/simple.json
@@ -0,0 +1 @@
+{"name": "Alice", "job": "artist", "age": 30, "sex": "female"}
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 3f5eeb081bd..26131bd717f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -301,6 +301,11 @@
       <artifactId>drill-opentsdb-storage</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.drill.contrib</groupId>
+      <artifactId>drill-ipfs-storage</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.drill.contrib</groupId>
       <artifactId>drill-mongo-storage</artifactId>
index b9a2fce4912..f615453b6e3 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -53,6 +53,7 @@
        <include>org.apache.drill.contrib:drill-storage-kafka:jar</include>
        <include>org.apache.drill.contrib:drill-storage-http:jar</include>
        <include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include>
+       <include>org.apache.drill.contrib:drill-ipfs-storage:jar</include>
        <include>org.apache.drill.contrib:drill-udfs:jar</include>
        <include>org.apache.drill.contrib:drill-druid-storage:jar</include>
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 01a51f07e67..ec8ac31c869 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -697,6 +697,10 @@ public enum CoreOperatorType
     * <code>HTTP_SUB_SCAN = 70;</code>
*/
HTTP_SUB_SCAN(70),
+ /**
+   * <code>IPFS_SUB_SCAN = 71;</code>
+ */
+ IPFS_SUB_SCAN(71),
;
/**
@@ -983,6 +987,10 @@ public enum CoreOperatorType
     * <code>HTTP_SUB_SCAN = 70;</code>
*/
public static final int HTTP_SUB_SCAN_VALUE = 70;
+ /**
+   * <code>IPFS_SUB_SCAN = 71;</code>
+ */
+ public static final int IPFS_SUB_SCAN_VALUE = 71;
public final int getNumber() {
@@ -1076,6 +1084,7 @@ public static CoreOperatorType forNumber(int value) {
case 68: return DRUID_SUB_SCAN;
case 69: return SPSS_SUB_SCAN;
case 70: return HTTP_SUB_SCAN;
+ case 71: return IPFS_SUB_SCAN;
default: return null;
}
}
@@ -29055,7 +29064,7 @@ public org.apache.drill.exec.proto.UserBitShared.SaslMessage getDefaultInstanceF
"ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" +
"\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" +
"\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" +
- "\032\n\026CANCELLATION_REQUESTED\020\006*\236\013\n\020CoreOper" +
+ "\032\n\026CANCELLATION_REQUESTED\020\006*\261\013\n\020CoreOper" +
"atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" +
"_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" +
"\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" +
@@ -29091,11 +29100,11 @@ public org.apache.drill.exec.proto.UserBitShared.SaslMessage getDefaultInstanceF
"CAN\020?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA" +
"N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO" +
"NTROLLER\020C\022\022\n\016DRUID_SUB_SCAN\020D\022\021\n\rSPSS_S" +
- "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslStat" +
- "us\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020" +
- "SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013" +
- "SASL_FAILED\020\004B.\n\033org.apache.drill.exec.p" +
- "rotoB\rUserBitSharedH\001"
+ "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F\022\021\n\rIPFS_SUB" +
+ "_SCAN\020G*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022" +
+ "\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n" +
+ "\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org." +
+ "apache.drill.exec.protoB\rUserBitSharedH\001"
};
descriptor = com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index f7b7b02d1b6..ad96911613c 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -382,6 +382,7 @@ enum CoreOperatorType {
DRUID_SUB_SCAN = 68;
SPSS_SUB_SCAN = 69;
HTTP_SUB_SCAN = 70;
+ IPFS_SUB_SCAN = 71;
}
/* Registry that contains list of jars, each jar contains its name and list of function signatures.