Skip to content

Commit

Permalink
Add ConnectorNodePartitioningProvider#getBucketCount
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Jun 15, 2020
1 parent cbe1cb1 commit 21f0b04
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,11 @@ public BucketFunction getBucketFunction(
return (int) (hash % bucketCount);
};
}

@Override
public int getBucketCount(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
{
// create one bucket per node
return nodeManager.getRequiredWorkerNodes().size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ public ToIntFunction<ConnectorSplit> getSplitBucketFunction(
.orElseThrow(() -> new IllegalArgumentException("Bucket number not set in split"));
}

@Override
public int getBucketCount(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
{
HivePartitioningHandle handle = (HivePartitioningHandle) partitioningHandle;
return handle.getBucketCount();
}

@Override
public List<ConnectorPartitionHandle> listPartitionHandles(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,10 @@ public BucketFunction getBucketFunction(
{
return null;
}

@Override
public int getBucketCount(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
{
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,11 @@ public BucketFunction getBucketFunction(ConnectorTransactionHandle transaction,
{
return new RaptorBucketFunction(bucketCount, partitionChannelTypes);
}

@Override
public int getBucketCount(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
{
RaptorPartitioningHandle handle = (RaptorPartitioningHandle) partitioningHandle;
return handle.getBucketToNode().size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ BucketFunction getBucketFunction(
ConnectorPartitioningHandle partitioningHandle,
List<Type> partitionChannelTypes,
int bucketCount);

int getBucketCount(
ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
ConnectorPartitioningHandle partitioningHandle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,12 @@ public ToIntFunction<ConnectorSplit> getSplitBucketFunction(ConnectorTransaction
return delegate.getSplitBucketFunction(transactionHandle, session, partitioningHandle);
}
}

@Override
public int getBucketCount(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getBucketCount(transactionHandle, session, partitioningHandle);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,12 @@ public BucketFunction getBucketFunction(ConnectorTransactionHandle transactionHa
{
throw new UnsupportedOperationException();
}

@Override
public int getBucketCount(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
{
Set<Node> nodes = nodeManager.getRequiredWorkerNodes();
checkState(!nodes.isEmpty(), "No TPCDS nodes available");
return nodes.size() * splitsPerNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,10 @@ public BucketFunction getBucketFunction(ConnectorTransactionHandle transactionHa
checkArgument(partitionChannelTypes.equals(ImmutableList.of(BIGINT)), "Expected one BIGINT parameter");
return new TpchBucketFunction(bucketCount, rowsPerBucket);
}

@Override
public int getBucketCount(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
{
return nodeManager.getRequiredWorkerNodes().size() * splitsPerNode;
}
}

0 comments on commit 21f0b04

Please sign in to comment.