Skip to content

Commit

Permalink
Add manual switch to allow non-distributed execution.
Browse files Browse the repository at this point in the history
  • Loading branch information
dbw9580 committed Sep 5, 2020
1 parent 29fe525 commit 0c34b56
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -116,34 +118,21 @@ private void init() {

Multihash topHash = ipfsScanSpec.getTargetHash(ipfsHelper);
try {
Map<Multihash, String> leafAddrMap = getLeafAddrMappings(topHash);
logger.debug("Iterating on {} leaves...", leafAddrMap.size());
ClusterCoordinator coordinator = ipfsContext.getStoragePlugin().getContext().getClusterCoordinator();
for (Multihash leaf : leafAddrMap.keySet()) {
String peerHostname = leafAddrMap.get(leaf);
Map<Multihash, IPFSPeer> leafPeerMap = getLeafPeerMappings(topHash);
logger.debug("Iterating on {} leaves...", leafPeerMap.size());

Optional<DrillbitEndpoint> oep = coordinator.getAvailableEndpoints()
.stream()
.filter(a -> a.getAddress().equals(peerHostname))
.findAny();
ClusterCoordinator coordinator = ipfsContext.getStoragePlugin().getContext().getClusterCoordinator();
for (Multihash leaf : leafPeerMap.keySet()) {
DrillbitEndpoint ep;
if (oep.isPresent()) {
ep = oep.get();
logger.debug("Using existing endpoint {}", ep.getAddress());
if (config.isDistributedMode()) {
String peerHostname = leafPeerMap
.get(leaf)
.getDrillbitAddress()
.orElseThrow(() -> new RuntimeException("Chosen IPFS peer does not have drillbit address"));
ep = registerEndpoint(coordinator, peerHostname);
} else {
logger.debug("created new endpoint on the fly {}", peerHostname);
//DRILL-7754: read ports & version info from IPFS instead of hard-coded
ep = DrillbitEndpoint.newBuilder()
.setAddress(peerHostname)
.setUserPort(DEFAULT_USER_PORT)
.setControlPort(DEFAULT_CONTROL_PORT)
.setDataPort(DEFAULT_DATA_PORT)
.setHttpPort(DEFAULT_HTTP_PORT)
.setVersion(DrillVersionInfo.getVersion())
.setState(DrillbitEndpoint.State.ONLINE)
.build();
//DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed?
ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep);
// the foreman is used to execute the plan
ep = ipfsContext.getStoragePlugin().getContext().getEndpoint();
}

IPFSWork work = new IPFSWork(leaf);
Expand All @@ -161,15 +150,56 @@ private void init() {
}
}

Map<Multihash, String> getLeafAddrMappings(Multihash topHash) {
private DrillbitEndpoint registerEndpoint(ClusterCoordinator coordinator, String peerHostname) {
Optional<DrillbitEndpoint> oep = coordinator.getAvailableEndpoints()
.stream()
.filter(ep -> ep.getAddress().equals(peerHostname))
.findAny();
DrillbitEndpoint ep;
if (oep.isPresent()) {
ep = oep.get();
logger.debug("Using existing endpoint {}", ep.getAddress());
} else {
logger.debug("created new endpoint on the fly {}", peerHostname);
//DRILL-7754: read ports & version info from IPFS instead of hard-coded
ep = DrillbitEndpoint.newBuilder()
.setAddress(peerHostname)
.setUserPort(DEFAULT_USER_PORT)
.setControlPort(DEFAULT_CONTROL_PORT)
.setDataPort(DEFAULT_DATA_PORT)
.setHttpPort(DEFAULT_HTTP_PORT)
.setVersion(DrillVersionInfo.getVersion())
.setState(DrillbitEndpoint.State.ONLINE)
.build();
//DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed?
ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep);
}

return ep;
}

Map<Multihash, IPFSPeer> getLeafPeerMappings(Multihash topHash) {
logger.debug("start to recursively expand nested IPFS hashes, topHash={}", topHash);
Stopwatch watch = Stopwatch.createStarted();
ForkJoinPool forkJoinPool = new ForkJoinPool(config.getNumWorkerThreads());
IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, false, ipfsContext);
Map<Multihash, String> leafAddrMap = forkJoinPool.invoke(topTask);
IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, ipfsContext);
List<Multihash> leaves = forkJoinPool.invoke(topTask);
logger.debug("Took {} ms to expand hash leaves", watch.elapsed(TimeUnit.MILLISECONDS));

return leafAddrMap;
logger.debug("Start to resolve providers");
watch.reset().start();
Map<Multihash, IPFSPeer> leafPeerMap;
if (config.isDistributedMode()) {
leafPeerMap = forkJoinPool.invoke(new IPFSProviderResolver(leaves, ipfsContext));
} else {
leafPeerMap = new HashMap<>();
for (Multihash leaf : leaves) {
leafPeerMap.put(leaf, ipfsContext.getMyself());
}
}
logger.debug("Took {} ms to resolve providers", watch.elapsed(TimeUnit.MILLISECONDS));

return leafPeerMap;
}

private IPFSGroupScan(IPFSGroupScan that) {
Expand Down Expand Up @@ -330,50 +360,93 @@ public String toString() {
}
}

//DRILL-7756: detect and warn about loops/recursions in case of a malformed tree
static class IPFSTreeFlattener extends RecursiveTask<Map<Multihash, String>> {
private final Multihash hash;
private final boolean isProvider;
private final Map<Multihash, String> ret = new LinkedHashMap<>();
static class IPFSProviderResolver extends RecursiveTask<Map<Multihash, IPFSPeer>> {
private final List<Multihash> leaves;
private final Map<Multihash, IPFSPeer> ret = new LinkedHashMap<>();
private final IPFSPeer myself;
private final IPFSHelper helper;
private final LoadingCache<Multihash, IPFSPeer> peerCache;
private final LoadingCache<Multihash, List<Multihash>> providerCache;

public IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSContext context) {
public IPFSProviderResolver(List<Multihash> leaves, IPFSContext context) {
this(leaves, context.getMyself(), context.getIPFSHelper(), context.getIPFSPeerCache(), context.getProviderCache());
}

public IPFSProviderResolver(IPFSProviderResolver reference, List<Multihash> leaves) {
this(leaves, reference.myself, reference.helper, reference.peerCache, reference.providerCache);
}

IPFSProviderResolver(List<Multihash> leaves, IPFSPeer myself, IPFSHelper helper, LoadingCache<Multihash, IPFSPeer> peerCache, LoadingCache<Multihash, List<Multihash>> providerCache) {
this.leaves = leaves;
this.myself = myself;
this.helper = helper;
this.peerCache = peerCache;
this.providerCache = providerCache;
}

@Override
protected Map<Multihash, IPFSPeer> compute() {
int totalLeaves = leaves.size();
if (totalLeaves == 1) {
Multihash hash = leaves.get(0);
List<IPFSPeer> providers = providerCache.getUnchecked(hash).parallelStream()
.map(peerCache::getUnchecked)
.filter(IPFSPeer::isDrillReady)
.filter(IPFSPeer::hasDrillbitAddress)
.collect(Collectors.toList());
if (providers.size() < 1) {
logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash);
providers.add(myself);
}
logger.debug("Got {} providers for {} from IPFS", providers.size(), hash);

//DRILL-7753: better peer selection algorithm
Random random = new Random();
IPFSPeer chosenPeer = providers.get(random.nextInt(providers.size()));
ret.put(hash, chosenPeer);
logger.debug("Use peer {} for leaf {}", chosenPeer, hash);
return ret;
}

int firstHalf = totalLeaves / 2;
ImmutableList<IPFSProviderResolver> resolvers = ImmutableList.of(
new IPFSProviderResolver(this, leaves.subList(0, firstHalf)),
new IPFSProviderResolver(this, leaves.subList(firstHalf, totalLeaves))
);
resolvers.forEach(ForkJoinTask::fork);
resolvers.reverse().forEach(resolver -> ret.putAll(resolver.join()));
return ret;
}
}

//DRILL-7756: detect and warn about loops/recursions in case of a malformed tree
static class IPFSTreeFlattener extends RecursiveTask<List<Multihash>> {
private final Multihash hash;
private final List<Multihash> ret = new LinkedList<>();
private final IPFSPeer myself;
private final IPFSHelper helper;

public IPFSTreeFlattener(Multihash hash, IPFSContext context) {
this(
hash,
isProvider,
context.getMyself(),
context.getIPFSHelper(),
context.getIPFSPeerCache(),
context.getProviderCache()
context.getIPFSHelper()
);
}

IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSPeer myself, IPFSHelper ipfsHelper,
LoadingCache<Multihash, IPFSPeer> peerCache, LoadingCache<Multihash, List<Multihash>> providerCache) {
IPFSTreeFlattener(Multihash hash, IPFSPeer myself, IPFSHelper ipfsHelper) {
this.hash = hash;
this.isProvider = isProvider;
this.myself = myself;
this.helper = ipfsHelper;
this.peerCache = peerCache;
this.providerCache = providerCache;
}

public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash, boolean isProvider) {
this(hash, isProvider, reference.myself, reference.helper, reference.peerCache, reference.providerCache);
public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash) {
this(hash, reference.myself, reference.helper);
}

@Override
public Map<Multihash, String> compute() {
public List<Multihash> compute() {
try {
if (isProvider) {
IPFSPeer peer = peerCache.getUnchecked(hash);
ret.put(hash, peer.getDrillbitAddress().orElse(null));
return ret;
}

MerkleNode metaOrSimpleNode = helper.getObjectLinksTimeout(hash);
if (metaOrSimpleNode.links.size() > 0) {
logger.debug("{} is a meta node", hash);
Expand All @@ -382,68 +455,19 @@ public Map<Multihash, String> compute() {

ImmutableList.Builder<IPFSTreeFlattener> builder = ImmutableList.builder();
for (Multihash intermediate : intermediates.subList(1, intermediates.size())) {
builder.add(new IPFSTreeFlattener(this, intermediate, false));
builder.add(new IPFSTreeFlattener(this, intermediate));
}
ImmutableList<IPFSTreeFlattener> subtasks = builder.build();
subtasks.forEach(IPFSTreeFlattener::fork);

IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0), false);
ret.putAll(first.compute());
IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0));
ret.addAll(first.compute());
subtasks.reverse().forEach(
subtask -> ret.putAll(subtask.join())
subtask -> ret.addAll(subtask.join())
);
} else {
logger.debug("{} is a simple node", hash);
List<IPFSPeer> providers = providerCache.getUnchecked(hash).stream()
.map(peerCache::getUnchecked)
.collect(Collectors.toList());
providers = providers.stream()
.filter(IPFSPeer::isDrillReady)
.collect(Collectors.toList());
if (providers.size() < 1) {
logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash);
providers.add(myself);
}

logger.debug("Got {} providers for {} from IPFS", providers.size(), hash);
ImmutableList.Builder<IPFSTreeFlattener> builder = ImmutableList.builder();
for (IPFSPeer provider : providers.subList(1, providers.size())) {
builder.add(new IPFSTreeFlattener(this, provider.getId(), true));
}
ImmutableList<IPFSTreeFlattener> subtasks = builder.build();
subtasks.forEach(IPFSTreeFlattener::fork);

List<String> possibleAddrs = new ArrayList<>();
Multihash firstProvider = providers.get(0).getId();
IPFSTreeFlattener firstTask = new IPFSTreeFlattener(this, firstProvider, true);
String firstAddr = firstTask.compute().get(firstProvider);
if (firstAddr != null) {
possibleAddrs.add(firstAddr);
}

subtasks.reverse().forEach(
subtask -> {
String addr = subtask.join().get(subtask.hash);
if (addr != null) {
possibleAddrs.add(addr);
}
}
);

if (possibleAddrs.size() < 1) {
logger.error("All attempts to find an appropriate provider address for {} have failed", hash);
throw UserException
.planError()
.message("No address found for any provider for leaf " + hash)
.build(logger);
} else {
//DRILL-7753: better peer selection algorithm
Random random = new Random();
String chosenAddr = possibleAddrs.get(random.nextInt(possibleAddrs.size()));
ret.clear();
ret.put(hash, chosenAddr);
logger.debug("Got peer host {} for leaf {}", chosenAddr, hash);
}
ret.add(hash);
}
} catch (IOException e) {
throw UserException.planError(e).message("Exception during planning").build(logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class IPFSStoragePluginConfig extends StoragePluginConfigBase {
@JsonProperty("max-nodes-per-leaf")
private final int maxNodesPerLeaf;

@JsonProperty("distributed-mode")
private final boolean distributedMode;

@JsonProperty("ipfs-timeouts")
private final Map<IPFSTimeOut, Integer> ipfsTimeouts;

Expand Down Expand Up @@ -156,13 +159,15 @@ public IPFSStoragePluginConfig(
@JsonProperty("host") String host,
@JsonProperty("port") int port,
@JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
@JsonProperty("distributed-mode") boolean distributedMode,
@JsonProperty("ipfs-timeouts") Map<IPFSTimeOut, Integer> ipfsTimeouts,
@JsonProperty("ipfs-caches") Map<IPFSCacheType, IPFSCache> ipfsCaches,
@JsonProperty("groupscan-worker-threads") int numWorkerThreads,
@JsonProperty("formats") Map<String, FormatPluginConfig> formats) {
this.host = host;
this.port = port;
this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
this.distributedMode = distributedMode;
this.ipfsTimeouts = applyDefaultMap(ipfsTimeouts, ipfsTimeoutDefaults);
this.ipfsCaches = applyDefaultMap(ipfsCaches, ipfsCacheDefaults);
this.numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : 1;
Expand Down Expand Up @@ -196,6 +201,11 @@ public int getMaxNodesPerLeaf() {
return maxNodesPerLeaf;
}

@JsonProperty("distributed-mode")
public boolean isDistributedMode() {
return distributedMode;
}

@JsonIgnore
public int getIPFSTimeout(IPFSTimeOut which) {
return ipfsTimeouts.get(which);
Expand Down Expand Up @@ -228,7 +238,7 @@ public Map<String, FormatPluginConfig> getFormats() {

@Override
public int hashCode() {
return Objects.hashCode(host, port, maxNodesPerLeaf, ipfsTimeouts, ipfsCaches, formats);
return Objects.hashCode(host, port, maxNodesPerLeaf, distributedMode, ipfsTimeouts, ipfsCaches, formats);
}

@Override
Expand All @@ -246,6 +256,7 @@ public boolean equals(Object obj) {
&& Objects.equal(ipfsCaches, other.ipfsTimeouts)
&& port == other.port
&& maxNodesPerLeaf == other.maxNodesPerLeaf
&& distributedMode == other.distributedMode
&& numWorkerThreads == other.numWorkerThreads;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"host": "127.0.0.1",
"port": 5001,
"max-nodes-per-leaf": 3,
"distributed-mode": false,
"ipfs-timeouts": {
"find-provider": 4,
"find-peer-info": 4,
Expand Down
Loading

0 comments on commit 0c34b56

Please sign in to comment.