diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java index f68749d85b9..a9534e95749 100644 --- a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java @@ -55,13 +55,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -116,34 +118,21 @@ private void init() { Multihash topHash = ipfsScanSpec.getTargetHash(ipfsHelper); try { - Map leafAddrMap = getLeafAddrMappings(topHash); - logger.debug("Iterating on {} leaves...", leafAddrMap.size()); - ClusterCoordinator coordinator = ipfsContext.getStoragePlugin().getContext().getClusterCoordinator(); - for (Multihash leaf : leafAddrMap.keySet()) { - String peerHostname = leafAddrMap.get(leaf); + Map leafPeerMap = getLeafPeerMappings(topHash); + logger.debug("Iterating on {} leaves...", leafPeerMap.size()); - Optional oep = coordinator.getAvailableEndpoints() - .stream() - .filter(a -> a.getAddress().equals(peerHostname)) - .findAny(); + ClusterCoordinator coordinator = ipfsContext.getStoragePlugin().getContext().getClusterCoordinator(); + for (Multihash leaf : leafPeerMap.keySet()) { DrillbitEndpoint ep; - if (oep.isPresent()) { - ep = oep.get(); - logger.debug("Using existing endpoint {}", ep.getAddress()); + if (config.isDistributedMode()) { + String peerHostname = leafPeerMap + .get(leaf) + .getDrillbitAddress() + .orElseThrow(() -> new 
RuntimeException("Chosen IPFS peer does not have drillbit address")); + ep = registerEndpoint(coordinator, peerHostname); } else { - logger.debug("created new endpoint on the fly {}", peerHostname); - //DRILL-7754: read ports & version info from IPFS instead of hard-coded - ep = DrillbitEndpoint.newBuilder() - .setAddress(peerHostname) - .setUserPort(DEFAULT_USER_PORT) - .setControlPort(DEFAULT_CONTROL_PORT) - .setDataPort(DEFAULT_DATA_PORT) - .setHttpPort(DEFAULT_HTTP_PORT) - .setVersion(DrillVersionInfo.getVersion()) - .setState(DrillbitEndpoint.State.ONLINE) - .build(); - //DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed? - ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep); + // the foreman is used to execute the plan + ep = ipfsContext.getStoragePlugin().getContext().getEndpoint(); } IPFSWork work = new IPFSWork(leaf); @@ -161,15 +150,56 @@ private void init() { } } - Map getLeafAddrMappings(Multihash topHash) { + private DrillbitEndpoint registerEndpoint(ClusterCoordinator coordinator, String peerHostname) { + Optional oep = coordinator.getAvailableEndpoints() + .stream() + .filter(ep -> ep.getAddress().equals(peerHostname)) + .findAny(); + DrillbitEndpoint ep; + if (oep.isPresent()) { + ep = oep.get(); + logger.debug("Using existing endpoint {}", ep.getAddress()); + } else { + logger.debug("created new endpoint on the fly {}", peerHostname); + //DRILL-7754: read ports & version info from IPFS instead of hard-coded + ep = DrillbitEndpoint.newBuilder() + .setAddress(peerHostname) + .setUserPort(DEFAULT_USER_PORT) + .setControlPort(DEFAULT_CONTROL_PORT) + .setDataPort(DEFAULT_DATA_PORT) + .setHttpPort(DEFAULT_HTTP_PORT) + .setVersion(DrillVersionInfo.getVersion()) + .setState(DrillbitEndpoint.State.ONLINE) + .build(); + //DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed? 
+ ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep); + } + + return ep; + } + + Map getLeafPeerMappings(Multihash topHash) { logger.debug("start to recursively expand nested IPFS hashes, topHash={}", topHash); Stopwatch watch = Stopwatch.createStarted(); ForkJoinPool forkJoinPool = new ForkJoinPool(config.getNumWorkerThreads()); - IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, false, ipfsContext); - Map leafAddrMap = forkJoinPool.invoke(topTask); + IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, ipfsContext); + List leaves = forkJoinPool.invoke(topTask); logger.debug("Took {} ms to expand hash leaves", watch.elapsed(TimeUnit.MILLISECONDS)); - return leafAddrMap; + logger.debug("Start to resolve providers"); + watch.reset().start(); + Map leafPeerMap; + if (config.isDistributedMode()) { + leafPeerMap = forkJoinPool.invoke(new IPFSProviderResolver(leaves, ipfsContext)); + } else { + leafPeerMap = new HashMap<>(); + for (Multihash leaf : leaves) { + leafPeerMap.put(leaf, ipfsContext.getMyself()); + } + } + logger.debug("Took {} ms to resolve providers", watch.elapsed(TimeUnit.MILLISECONDS)); + + return leafPeerMap; } private IPFSGroupScan(IPFSGroupScan that) { @@ -330,50 +360,93 @@ public String toString() { } } - //DRILL-7756: detect and warn about loops/recursions in case of a malformed tree - static class IPFSTreeFlattener extends RecursiveTask> { - private final Multihash hash; - private final boolean isProvider; - private final Map ret = new LinkedHashMap<>(); + static class IPFSProviderResolver extends RecursiveTask> { + private final List leaves; + private final Map ret = new LinkedHashMap<>(); private final IPFSPeer myself; private final IPFSHelper helper; private final LoadingCache peerCache; private final LoadingCache> providerCache; - public IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSContext context) { + public IPFSProviderResolver(List leaves, IPFSContext context) { + this(leaves, 
context.getMyself(), context.getIPFSHelper(), context.getIPFSPeerCache(), context.getProviderCache()); + } + + public IPFSProviderResolver(IPFSProviderResolver reference, List leaves) { + this(leaves, reference.myself, reference.helper, reference.peerCache, reference.providerCache); + } + + IPFSProviderResolver(List leaves, IPFSPeer myself, IPFSHelper helper, LoadingCache peerCache, LoadingCache> providerCache) { + this.leaves = leaves; + this.myself = myself; + this.helper = helper; + this.peerCache = peerCache; + this.providerCache = providerCache; + } + + @Override + protected Map compute() { + int totalLeaves = leaves.size(); + if (totalLeaves == 1) { + Multihash hash = leaves.get(0); + List providers = providerCache.getUnchecked(hash).parallelStream() + .map(peerCache::getUnchecked) + .filter(IPFSPeer::isDrillReady) + .filter(IPFSPeer::hasDrillbitAddress) + .collect(Collectors.toList()); + if (providers.size() < 1) { + logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash); + providers.add(myself); + } + logger.debug("Got {} providers for {} from IPFS", providers.size(), hash); + + //DRILL-7753: better peer selection algorithm + Random random = new Random(); + IPFSPeer chosenPeer = providers.get(random.nextInt(providers.size())); + ret.put(hash, chosenPeer); + logger.debug("Use peer {} for leaf {}", chosenPeer, hash); + return ret; + } + + int firstHalf = totalLeaves / 2; + ImmutableList resolvers = ImmutableList.of( + new IPFSProviderResolver(this, leaves.subList(0, firstHalf)), + new IPFSProviderResolver(this, leaves.subList(firstHalf, totalLeaves)) + ); + resolvers.forEach(ForkJoinTask::fork); + resolvers.reverse().forEach(resolver -> ret.putAll(resolver.join())); + return ret; + } + } + + //DRILL-7756: detect and warn about loops/recursions in case of a malformed tree + static class IPFSTreeFlattener extends RecursiveTask> { + private final Multihash hash; + private final List ret = new LinkedList<>(); + private 
final IPFSPeer myself; + private final IPFSHelper helper; + + public IPFSTreeFlattener(Multihash hash, IPFSContext context) { this( hash, - isProvider, context.getMyself(), - context.getIPFSHelper(), - context.getIPFSPeerCache(), - context.getProviderCache() + context.getIPFSHelper() ); } - IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSPeer myself, IPFSHelper ipfsHelper, - LoadingCache peerCache, LoadingCache> providerCache) { + IPFSTreeFlattener(Multihash hash, IPFSPeer myself, IPFSHelper ipfsHelper) { this.hash = hash; - this.isProvider = isProvider; this.myself = myself; this.helper = ipfsHelper; - this.peerCache = peerCache; - this.providerCache = providerCache; } - public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash, boolean isProvider) { - this(hash, isProvider, reference.myself, reference.helper, reference.peerCache, reference.providerCache); + public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash) { + this(hash, reference.myself, reference.helper); } @Override - public Map compute() { + public List compute() { try { - if (isProvider) { - IPFSPeer peer = peerCache.getUnchecked(hash); - ret.put(hash, peer.getDrillbitAddress().orElse(null)); - return ret; - } - MerkleNode metaOrSimpleNode = helper.getObjectLinksTimeout(hash); if (metaOrSimpleNode.links.size() > 0) { logger.debug("{} is a meta node", hash); @@ -382,68 +455,19 @@ public Map compute() { ImmutableList.Builder builder = ImmutableList.builder(); for (Multihash intermediate : intermediates.subList(1, intermediates.size())) { - builder.add(new IPFSTreeFlattener(this, intermediate, false)); + builder.add(new IPFSTreeFlattener(this, intermediate)); } ImmutableList subtasks = builder.build(); subtasks.forEach(IPFSTreeFlattener::fork); - IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0), false); - ret.putAll(first.compute()); + IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0)); + ret.addAll(first.compute()); 
subtasks.reverse().forEach( - subtask -> ret.putAll(subtask.join()) + subtask -> ret.addAll(subtask.join()) ); } else { logger.debug("{} is a simple node", hash); - List providers = providerCache.getUnchecked(hash).stream() - .map(peerCache::getUnchecked) - .collect(Collectors.toList()); - providers = providers.stream() - .filter(IPFSPeer::isDrillReady) - .collect(Collectors.toList()); - if (providers.size() < 1) { - logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash); - providers.add(myself); - } - - logger.debug("Got {} providers for {} from IPFS", providers.size(), hash); - ImmutableList.Builder builder = ImmutableList.builder(); - for (IPFSPeer provider : providers.subList(1, providers.size())) { - builder.add(new IPFSTreeFlattener(this, provider.getId(), true)); - } - ImmutableList subtasks = builder.build(); - subtasks.forEach(IPFSTreeFlattener::fork); - - List possibleAddrs = new ArrayList<>(); - Multihash firstProvider = providers.get(0).getId(); - IPFSTreeFlattener firstTask = new IPFSTreeFlattener(this, firstProvider, true); - String firstAddr = firstTask.compute().get(firstProvider); - if (firstAddr != null) { - possibleAddrs.add(firstAddr); - } - - subtasks.reverse().forEach( - subtask -> { - String addr = subtask.join().get(subtask.hash); - if (addr != null) { - possibleAddrs.add(addr); - } - } - ); - - if (possibleAddrs.size() < 1) { - logger.error("All attempts to find an appropriate provider address for {} have failed", hash); - throw UserException - .planError() - .message("No address found for any provider for leaf " + hash) - .build(logger); - } else { - //DRILL-7753: better peer selection algorithm - Random random = new Random(); - String chosenAddr = possibleAddrs.get(random.nextInt(possibleAddrs.size())); - ret.clear(); - ret.put(hash, chosenAddr); - logger.debug("Got peer host {} for leaf {}", chosenAddr, hash); - } + ret.add(hash); } } catch (IOException e) { throw 
UserException.planError(e).message("Exception during planning").build(logger); diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java index c34466f7198..f0b358e5b3a 100644 --- a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java @@ -46,6 +46,9 @@ public class IPFSStoragePluginConfig extends StoragePluginConfigBase { @JsonProperty("max-nodes-per-leaf") private final int maxNodesPerLeaf; + @JsonProperty("distributed-mode") + private final boolean distributedMode; + @JsonProperty("ipfs-timeouts") private final Map ipfsTimeouts; @@ -156,6 +159,7 @@ public IPFSStoragePluginConfig( @JsonProperty("host") String host, @JsonProperty("port") int port, @JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf, + @JsonProperty("distributed-mode") boolean distributedMode, @JsonProperty("ipfs-timeouts") Map ipfsTimeouts, @JsonProperty("ipfs-caches") Map ipfsCaches, @JsonProperty("groupscan-worker-threads") int numWorkerThreads, @@ -163,6 +167,7 @@ public IPFSStoragePluginConfig( this.host = host; this.port = port; this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1; + this.distributedMode = distributedMode; this.ipfsTimeouts = applyDefaultMap(ipfsTimeouts, ipfsTimeoutDefaults); this.ipfsCaches = applyDefaultMap(ipfsCaches, ipfsCacheDefaults); this.numWorkerThreads = numWorkerThreads > 0 ? 
numWorkerThreads : 1; @@ -196,6 +201,11 @@ public int getMaxNodesPerLeaf() { return maxNodesPerLeaf; } + @JsonProperty("distributed-mode") + public boolean isDistributedMode() { + return distributedMode; + } + @JsonIgnore public int getIPFSTimeout(IPFSTimeOut which) { return ipfsTimeouts.get(which); @@ -228,7 +238,7 @@ public Map getFormats() { @Override public int hashCode() { - return Objects.hashCode(host, port, maxNodesPerLeaf, ipfsTimeouts, ipfsCaches, formats); + return Objects.hashCode(host, port, maxNodesPerLeaf, distributedMode, ipfsTimeouts, ipfsCaches, formats); } @Override @@ -246,6 +256,7 @@ public boolean equals(Object obj) { - && Objects.equal(ipfsCaches, other.ipfsTimeouts) + && Objects.equal(ipfsCaches, other.ipfsCaches) && port == other.port && maxNodesPerLeaf == other.maxNodesPerLeaf + && distributedMode == other.distributedMode && numWorkerThreads == other.numWorkerThreads; } } diff --git a/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json index 54d8427aa6e..74bca8e6f7f 100644 --- a/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json @@ -5,6 +5,7 @@ "host": "127.0.0.1", "port": 5001, "max-nodes-per-leaf": 3, + "distributed-mode": false, "ipfs-timeouts": { "find-provider": 4, "find-peer-info": 4, diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java index f944aef03c4..489d0cf4c29 100644 --- a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java +++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java @@ -130,9 +130,9 @@ public void testSimpleDatasetWithNoAnyOtherProviders() { IPFSContext context = plugin.getIPFSContext(); IPFSGroupScan groupScan = new IPFSGroupScan(context, new
IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH)), null); - Map map = groupScan.getLeafAddrMappings(SIMPLE_DATASET_MULTIHASH); + Map map = groupScan.getLeafPeerMappings(SIMPLE_DATASET_MULTIHASH); assertEquals(map.keySet().size(), 1); - assertEquals(map.get(SIMPLE_DATASET_MULTIHASH), MOCK_NODE_ADDR); + assertEquals(map.get(SIMPLE_DATASET_MULTIHASH), myself); } catch (Exception e) { fail(e.getMessage()); } @@ -154,10 +154,10 @@ public void testChunkedDatasetWithNoAnyOtherProviders() { IPFSContext context = plugin.getIPFSContext(); IPFSGroupScan groupScan = new IPFSGroupScan(context, new IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH)), null); - Map map = groupScan.getLeafAddrMappings(CHUNKED_DATASET_MULTIHASH); + Map map = groupScan.getLeafPeerMappings(CHUNKED_DATASET_MULTIHASH); assertEquals(map.keySet().size(), 3); for (Map.Entry entry : CHUNKS_MULTIHASH.entrySet()) { - assertEquals(map.get(entry.getValue()), MOCK_NODE_ADDR); + assertEquals(map.get(entry.getValue()), myself); } } catch (Exception e) { fail(e.getMessage());
diff --git a/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json index 93435666872..9d7635606d3 100644 --- a/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json @@ -5,6 +5,7 @@ "host": "127.0.0.1", "port": 5001, "max-nodes-per-leaf": 1, + "distributed-mode": false, "ipfs-timeouts": { "find-provider": 1, "find-peer-info": 1,