Skip to content

Commit aaead92

Browse files
author
Cameron Zemek
committed
Rebuild from targeted replica (CASSANDRA-9875)
1 parent cd006d2 commit aaead92

File tree

3 files changed

+119
-3
lines changed

3 files changed

+119
-3
lines changed

src/java/org/apache/cassandra/dht/RangeStreamer.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,24 @@ public boolean shouldInclude(InetAddress endpoint)
122122
}
123123
}
124124

125+
/**
126+
* Source filter which only includes endpoints contained within a provided set.
127+
*/
128+
public static class WhitelistedSourcesFilter implements ISourceFilter
129+
{
130+
private final Set<InetAddress> whitelistedSources;
131+
132+
public WhitelistedSourcesFilter(Set<InetAddress> whitelistedSources)
133+
{
134+
this.whitelistedSources = whitelistedSources;
135+
}
136+
137+
public boolean shouldInclude(InetAddress endpoint)
138+
{
139+
return whitelistedSources.contains(endpoint);
140+
}
141+
}
142+
125143
public RangeStreamer(TokenMetadata metadata,
126144
Collection<Token> tokens,
127145
InetAddress address,

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@
8080
import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
8181
import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport;
8282

83+
import java.util.Scanner;
84+
import java.util.regex.MatchResult;
85+
import java.util.regex.Pattern;
86+
8387
import static java.util.concurrent.TimeUnit.MINUTES;
8488

8589
/**
@@ -1111,12 +1115,26 @@ public boolean isJoined()
11111115
public void rebuild(String sourceDc)
11121116
{
11131117
// check on going rebuild
1118+
rebuild(sourceDc, null, null, null);
1119+
}
1120+
1121+
public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources)
1122+
{
1123+
// check ongoing rebuild
11141124
if (!isRebuilding.compareAndSet(false, true))
11151125
{
11161126
throw new IllegalStateException("Node is still rebuilding. Check nodetool netstats.");
11171127
}
11181128

1119-
logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
1129+
// check the arguments
1130+
if (keyspace == null && tokens != null)
1131+
{
1132+
throw new IllegalArgumentException("Cannot specify tokens without keyspace.");
1133+
}
1134+
1135+
logger.info("rebuild from dc: {}, {}, {}", sourceDc == null ? "(any dc)" : sourceDc,
1136+
keyspace == null ? "(All keyspaces)" : keyspace,
1137+
tokens == null ? "(All tokens)" : tokens);
11201138

11211139
try
11221140
{
@@ -1131,8 +1149,78 @@ public void rebuild(String sourceDc)
11311149
if (sourceDc != null)
11321150
streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
11331151

1134-
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
1135-
streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
1152+
if (keyspace == null)
1153+
{
1154+
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
1155+
streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
1156+
}
1157+
else if (tokens == null)
1158+
{
1159+
streamer.addRanges(keyspace, getLocalRanges(keyspace));
1160+
}
1161+
else
1162+
{
1163+
Token.TokenFactory factory = getPartitioner().getTokenFactory();
1164+
List<Range<Token>> ranges = new ArrayList<>();
1165+
Pattern rangePattern = Pattern.compile("\\(\\s*(-?\\w+)\\s*,\\s*(-?\\w+)\\s*\\]");
1166+
try (Scanner tokenScanner = new Scanner(tokens))
1167+
{
1168+
while (tokenScanner.findInLine(rangePattern) != null)
1169+
{
1170+
MatchResult range = tokenScanner.match();
1171+
Token startToken = factory.fromString(range.group(1));
1172+
Token endToken = factory.fromString(range.group(2));
1173+
logger.info("adding range: ({},{}]", startToken, endToken);
1174+
ranges.add(new Range<>(startToken, endToken));
1175+
}
1176+
if (tokenScanner.hasNext())
1177+
throw new IllegalArgumentException("Unexpected string: " + tokenScanner.next());
1178+
}
1179+
1180+
// Ensure all specified ranges are actually ranges owned by this host
1181+
Collection<Range<Token>> localRanges = getLocalRanges(keyspace);
1182+
for (Range<Token> specifiedRange : ranges)
1183+
{
1184+
boolean foundParentRange = false;
1185+
for (Range<Token> localRange : localRanges)
1186+
{
1187+
if (localRange.contains(specifiedRange))
1188+
{
1189+
foundParentRange = true;
1190+
break;
1191+
}
1192+
}
1193+
if (!foundParentRange)
1194+
{
1195+
throw new IllegalArgumentException(String.format("The specified range %s is not a range that is owned by this node. Please ensure that all token ranges specified to be rebuilt belong to this node.", specifiedRange.toString()));
1196+
}
1197+
}
1198+
1199+
if (specificSources != null)
1200+
{
1201+
String[] stringHosts = specificSources.split(",");
1202+
Set<InetAddress> sources = new HashSet<>(stringHosts.length);
1203+
for (String stringHost : stringHosts)
1204+
{
1205+
try
1206+
{
1207+
InetAddress endpoint = InetAddress.getByName(stringHost);
1208+
if (FBUtilities.getBroadcastAddress().equals(endpoint))
1209+
{
1210+
throw new IllegalArgumentException("This host was specified as a source for rebuilding. Sources for a rebuild can only be other nodes in the cluster.");
1211+
}
1212+
sources.add(endpoint);
1213+
}
1214+
catch (UnknownHostException ex)
1215+
{
1216+
throw new IllegalArgumentException("Unknown host specified " + stringHost, ex);
1217+
}
1218+
}
1219+
streamer.addSourceFilter(new RangeStreamer.WhitelistedSourcesFilter(sources));
1220+
}
1221+
1222+
streamer.addRanges(keyspace, ranges);
1223+
}
11361224

11371225
StreamResultFuture resultFuture = streamer.fetchAsync();
11381226
// wait for result

src/java/org/apache/cassandra/service/StorageServiceMBean.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,16 @@ public interface StorageServiceMBean extends NotificationEmitter
519519
*/
520520
public void rebuild(String sourceDc);
521521

522+
/**
523+
* Same as {@link #rebuild(String)}, but only for specified keyspace and ranges.
524+
*
525+
* @param sourceDc Name of DC from which to select sources for streaming or null to pick any node
526+
* @param keyspace Name of the keyspace which to rebuild or null to rebuild all keyspaces.
527+
* @param tokens Range of tokens to rebuild or null to rebuild all token ranges. In the format of:
528+
* "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]"
529+
*/
530+
public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources);
531+
522532
/** Starts a bulk load and blocks until it completes. */
523533
public void bulkLoad(String directory);
524534

0 commit comments

Comments
 (0)