Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Bootstrap optimisation lld flow #2943

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.github.ambry.clustermap;

/**
 * Unchecked exception raised by file store operations. Every instance carries a {@link FileStoreErrorCode}
 * that classifies the failure and can be read back through {@link #getErrorCode()}.
 */
public class FileStoreException extends RuntimeException {

  private static final long serialVersionUID = 1L;
  private final FileStoreErrorCode error;

  /**
   * Create an exception without an underlying cause.
   * @param s the detail message.
   * @param error the {@link FileStoreErrorCode} classifying the failure.
   */
  public FileStoreException(String s, FileStoreErrorCode error) {
    super(s);
    this.error = error;
  }

  /**
   * Create an exception that wraps an underlying cause.
   * @param s the detail message.
   * @param error the {@link FileStoreErrorCode} classifying the failure.
   * @param throwable the cause; preserved and available through {@link #getCause()}.
   */
  public FileStoreException(String s, FileStoreErrorCode error, Throwable throwable) {
    super(s, throwable);
    this.error = error;
  }

  /**
   * @return the {@link FileStoreErrorCode} supplied at construction time. Without this accessor the code
   *         would be stored but unreachable for callers.
   */
  public FileStoreErrorCode getErrorCode() {
    return error;
  }

  /**
   * Error codes attached to a {@link FileStoreException}.
   */
  public enum FileStoreErrorCode {
    /**
     * The operation could not be served because of the running state of the file store.
     */
    FileStoreRunningFailure
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface ReplicaSyncUpManager {
* @throws InterruptedException
*/
void waitBootstrapCompleted(String partitionName) throws InterruptedException;
/**
 * Initiate file copy for the given replica.
 * NOTE(review): the exact bookkeeping done here is defined by implementations that are not visible from this
 * interface - confirm and expand this description.
 * @param replicaId the replica for which file copy is initiated.
 */
void initiateFileCopy(ReplicaId replicaId);
/**
 * Wait for file copy of the given partition to complete.
 * NOTE(review): presumably blocks the calling thread until completion is signalled - confirm against the
 * implementation.
 * @param partitionName the name of the partition whose file copy is awaited.
 * @throws InterruptedException
 */
void waitForFileCopyCompleted(String partitionName) throws InterruptedException;

/**
* Update replica lag (in byte) between two replicas (local and peer replica) and check sync-up status.
Expand Down Expand Up @@ -64,6 +66,8 @@ boolean updateReplicaLagAndCheckSyncStatus(ReplicaId localReplica, ReplicaId pee
*/
void onBootstrapComplete(ReplicaId replicaId);

/**
 * File copy on given replica is complete.
 * NOTE(review): presumably releases callers waiting on file copy for this replica's partition - confirm.
 * @param replicaId the replica which completes file copy.
 */
void onFileCopyComplete(ReplicaId replicaId);

/**
* Deactivation on given replica is complete.
* @param replicaId the replica which completes deactivation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,13 @@ public enum StateModelListenerType {
* leadership hand-off occurs. For example, if any replica becomes LEADER from STANDBY, it is supposed to replicate
* data from VCR nodes. This is part of two-way replication between Ambry and cloud.
*/
CloudToStoreReplicationManagerListener
CloudToStoreReplicationManagerListener,

/**
* The partition state change listener owned by Helix participant. It takes actions when partition state transition
* occurs.
*/
FileCopyManagerListener


}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public enum TransitionErrorCode {
/**
* If the resource name is not a numeric number.
*/
InvalidResourceName
InvalidResourceName,

FileCopyFailure
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.github.ambry.config;

/**
 * Configuration values for file copy. Each value is read from {@link VerifiableProperties} in the constructor,
 * falling back to the default named in its {@link Default} annotation when the property is absent.
 * NOTE(review): the per-field descriptions below are derived from the property names - confirm with the
 * consumers of this config.
 */
public class FileCopyConfig {

  public static final String PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK =
      "parallel.partition.hydration.count.per.disk";
  /**
   * Parallel partition hydration count per disk.
   */
  @Config(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK)
  @Default("1")
  public final int parallelPartitionHydrationCountPerDisk;

  public static final String NUMBER_OF_FILE_COPY_THREADS = "number.of.file.copy.threads";
  /**
   * Number of file copy threads.
   */
  @Config(NUMBER_OF_FILE_COPY_THREADS)
  @Default("4")
  public final int numberOfFileCopyThreads;

  public static final String FILE_CHUNK_TIMEOUT_IN_MINUTES = "file.chunk.timeout.in.minutes";
  /**
   * File chunk timeout, in minutes.
   */
  @Config(FILE_CHUNK_TIMEOUT_IN_MINUTES)
  @Default("5")
  public final long fileChunkTimeoutInMins;

  public static final String STORE_DATA_FLUSH_INTERVAL_IN_MBS = "store.data.flush.interval.In.MBs";
  /**
   * The frequency at which the data gets flushed to disk.
   * NOTE(review): the name suggests the interval is an amount of data in MB rather than a time - confirm.
   */
  @Config(STORE_DATA_FLUSH_INTERVAL_IN_MBS)
  @Default("1000")
  public final long storeDataFlushIntervalInMbs;

  public static final String File_COPY_META_DATA_FILE_NAME = "file.copy.meta.data.file.name";
  /**
   * Name of the file copy metadata file.
   */
  @Config(File_COPY_META_DATA_FILE_NAME)
  @Default("sealed_logs_metadata_file")
  public final String fileCopyMetaDataFileName;

  /**
   * Read all file copy settings from the supplied properties.
   * @param verifiableProperties the properties to read the configuration from.
   */
  public FileCopyConfig(VerifiableProperties verifiableProperties) {
    fileCopyMetaDataFileName =
        verifiableProperties.getString(File_COPY_META_DATA_FILE_NAME, "sealed_logs_metadata_file");
    parallelPartitionHydrationCountPerDisk =
        verifiableProperties.getInt(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1);
    numberOfFileCopyThreads = verifiableProperties.getInt(NUMBER_OF_FILE_COPY_THREADS, 4);
    // The field is a long, so read it as a long instead of narrowing the accepted range to int.
    fileChunkTimeoutInMins = verifiableProperties.getLong(FILE_CHUNK_TIMEOUT_IN_MINUTES, 5);
    storeDataFlushIntervalInMbs = verifiableProperties.getLong(STORE_DATA_FLUSH_INTERVAL_IN_MBS, 1000);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.ambry.config;

/**
 * Replication modes a server can be configured with.
 * NOTE(review): the descriptions of the constants are inferred from their names - confirm.
 */
public enum ServerReplicationMode {
  /**
   * Blob based replication.
   */
  BLOB_BASED,

  /**
   * File based replication.
   */
  FILE_BASED
}
23 changes: 23 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -674,15 +674,38 @@ public class StoreConfig {
public final boolean storeBlockStaleBlobStoreToStart;
public final static String storeBlockStaleBlobStoreToStartName = "store.block.stale.blob.store.to.start";

/**
 * Config to decide the replication protocol used for hydration of newly added replicas. The constructor passes
 * {@link ServerReplicationMode#BLOB_BASED} as the default when reading this property.
 */
public static final String SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION = "server.replication.protocol.for.hydration";
@Config(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION)
public final ServerReplicationMode serverReplicationProtocolForHydration;


/**
* Whether to attempt reshuffling of reordered disks and subsequent process termination.
*/
@Config("store.reshuffle.disks.on.reorder")
@Default("false")
public final boolean storeReshuffleDisksOnReorder;

/**
 * Name of the "file copy in progress" file; defaults to {@code file_copy_in_progress}.
 * NOTE(review): presumably a marker file that exists while file copy is ongoing - confirm against usage.
 */
public static final String FILE_COPY_IN_PROGRESS_FILE_NAME = "file.copy.in.progress.file.name";
@Config(FILE_COPY_IN_PROGRESS_FILE_NAME)
@Default("file_copy_in_progress")
public final String fileCopyInProgressFileName;

/**
 * Name of the "bootstrap in progress" file; defaults to {@code bootstrap_in_progress}.
 * NOTE(review): presumably a marker file that exists while bootstrap is ongoing - confirm against usage.
 */
public static final String BOOTSTRAP_IN_PROGRESS_FILE = "bootstrap.in.progress.file.name";
@Config(BOOTSTRAP_IN_PROGRESS_FILE)
@Default("bootstrap_in_progress")
public final String bootstrapInProgressFile;

public final static String storeReshuffleDisksOnReorderName = "store.reshuffle.disks.on.reorder";

public StoreConfig(VerifiableProperties verifiableProperties) {
bootstrapInProgressFile = verifiableProperties.getString(BOOTSTRAP_IN_PROGRESS_FILE, "bootstrap_in_progress");
fileCopyInProgressFileName = verifiableProperties.getString(FILE_COPY_IN_PROGRESS_FILE_NAME, "file_copy_in_progress");
serverReplicationProtocolForHydration = verifiableProperties.getEnum(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION,
ServerReplicationMode.class, ServerReplicationMode.BLOB_BASED);
storeKeyFactory = verifiableProperties.getString("store.key.factory", "com.github.ambry.commons.BlobIdFactory");
storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.interval.seconds", 60);
storeIndexMaxMemorySizeBytes = verifiableProperties.getInt("store.index.max.memory.size.bytes", 20 * 1024 * 1024);
Expand Down
15 changes: 15 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ public interface RequestAPI {
*/
void handleReplicaMetadataRequest(NetworkRequest request) throws IOException, InterruptedException;

/**
*
* @param request
* @throws IOException
* @throws InterruptedException
*/

/**
* Replicate one specific Blob from a remote host to the local store.
* @param request The request that contains the remote host information and the blob id to be replicated.
Expand Down Expand Up @@ -116,4 +123,12 @@ default void handleAdminRequest(NetworkRequest request) throws InterruptedExcept
default void handleUndeleteRequest(NetworkRequest request) throws InterruptedException, IOException {
throw new UnsupportedOperationException("Undelete request not supported on this node");
}

/**
 * Handle a file metadata request. This default implementation does not serve the request and always
 * throws; implementations that support the request are expected to override it.
 * @param request the {@link NetworkRequest} to handle.
 * @throws InterruptedException declared for overriding implementations; not thrown by this default.
 * @throws IOException declared for overriding implementations; not thrown by this default.
 * @throws UnsupportedOperationException always, in this default implementation.
 */
default void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException {
  final String unsupportedMessage = "File Meta Data request not supported on this node";
  throw new UnsupportedOperationException(unsupportedMessage);
}

/**
 * Handle a file chunk request. This default implementation does not serve the request and always throws;
 * implementations that support the request are expected to override it.
 * @param request the {@link NetworkRequest} to handle.
 * @throws InterruptedException declared for overriding implementations; not thrown by this default.
 * @throws IOException declared for overriding implementations; not thrown by this default.
 * @throws UnsupportedOperationException always, in this default implementation.
 */
default void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException {
  final String unsupportedMessage = "File Chunk request not supported on this node";
  throw new UnsupportedOperationException(unsupportedMessage);
}
}
15 changes: 15 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/server/StoreManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.store.FileStore;
import com.github.ambry.store.Store;
import com.github.ambry.store.StoreException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;



/**
* High level interface for handling and managing blob stores.
*/
Expand All @@ -34,6 +36,17 @@ public interface StoreManager {
*/
boolean addBlobStore(ReplicaId replica);

/**
 * Add a file store for the given replica.
 * NOTE(review): the meaning of the return value is presumably "store was added successfully", mirroring
 * addBlobStore - confirm against implementations.
 * @param replicaId the {@link ReplicaId} for which the file store is added.
 * @return a boolean reported by the implementation (see note above).
 */
boolean addFileStore(ReplicaId replicaId);

/**
 * Set up the replica of the given partition.
 * NOTE(review): what "set up" covers is defined by implementations not visible from this interface - confirm.
 * @param partitionName the name of the partition whose replica is set up.
 */
void setUpReplica(String partitionName);


/**
 * Build state after file copy is completed.
 * @param partitionName the name of the partition for which state is to be built.
 */
void buildStateForFileCopy(String partitionName);

/**
* Remove store from storage manager.
* @param id the {@link PartitionId} associated with store
Expand Down Expand Up @@ -62,6 +75,8 @@ public interface StoreManager {
*/
Store getStore(PartitionId id);

/**
 * Get the {@link FileStore} associated with the given partition.
 * NOTE(review): at least one implementation returns {@code null}; callers should presumably handle a
 * {@code null} result - confirm the intended contract.
 * @param id the {@link PartitionId} to find the file store for.
 * @return the associated {@link FileStore}, possibly {@code null}.
 */
FileStore getFileStore(PartitionId id);

/**
* Get replicaId on current node by partition name. (There should be at most one replica belonging to specific
* partition on single node)
Expand Down
34 changes: 34 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/FileMetaData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.github.ambry.store;

import java.util.List;


/**
 * Mutable holder grouping the {@link SealedFileInfo} entries of a store: one entry for the sealed segments,
 * plus lists for index segments and bloom filters. All fields start out {@code null} until set.
 */
public class FileMetaData {
  SealedFileInfo sealedSegments;
  List<SealedFileInfo> indexSegments;
  List<SealedFileInfo> bloomFilters;

  // ---- sealed segments ----

  /**
   * @param sealedSegments the {@link SealedFileInfo} to store for the sealed segments.
   */
  public void setSealedSegments(SealedFileInfo sealedSegments) {
    this.sealedSegments = sealedSegments;
  }

  /**
   * @return the {@link SealedFileInfo} stored for the sealed segments, or {@code null} if none was set.
   */
  public SealedFileInfo getSealedSegments() {
    return this.sealedSegments;
  }

  // ---- index segments ----

  /**
   * @param indexSegments the list of {@link SealedFileInfo} to store for the index segments.
   */
  public void setIndexSegments(List<SealedFileInfo> indexSegments) {
    this.indexSegments = indexSegments;
  }

  /**
   * @return the stored index segment list (the same instance that was set), or {@code null} if none was set.
   */
  public List<SealedFileInfo> getIndexSegments() {
    return this.indexSegments;
  }

  // ---- bloom filters ----

  /**
   * @param bloomFilters the list of {@link SealedFileInfo} to store for the bloom filters.
   */
  public void setBloomFilters(List<SealedFileInfo> bloomFilters) {
    this.bloomFilters = bloomFilters;
  }

  /**
   * @return the stored bloom filter list (the same instance that was set), or {@code null} if none was set.
   */
  public List<SealedFileInfo> getBloomFilters() {
    return this.bloomFilters;
  }
}
65 changes: 65 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/FileStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.github.ambry.store;

import com.github.ambry.clustermap.FileStoreException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.ConcurrentHashMap;
import com.github.ambry.clustermap.FileStoreException.FileStoreErrorCode;


/**
 * Store-side entry point for file copy writes against a data directory.
 *
 * The write operations are still skeletons: they validate their preconditions (store running, non-null
 * buffer, a channel registered for the file name) but do not write any bytes yet.
 */
public class FileStore {
  // Running state is per instance. It used to be static, which made start()/stop() on one store flip the
  // state of every other store in the JVM. Marked volatile so a state change is visible across threads.
  private volatile boolean isRunning = false;

  private final String dataDir;

  /**
   * Channels registered per file name. Initialised eagerly so lookups never hit a null map; kept public and
   * non-final to stay compatible with existing users of the field.
   */
  public ConcurrentHashMap<String, FileChannel> fileNameToFileChannelMap = new ConcurrentHashMap<>();

  /**
   * @param dataDir the data directory this file store is created for.
   */
  public FileStore(String dataDir) {
    this.dataDir = dataDir;
  }

  /**
   * Mark this store as running.
   * @throws StoreException declared for callers; not thrown by the current implementation.
   */
  public void start() throws StoreException {
    isRunning = true;
  }

  /**
   * @return {@code true} if {@link #start()} has been called and {@link #stop()} has not been called since.
   */
  public boolean isRunning() {
    return isRunning;
  }

  /**
   * Mark this store as not running.
   */
  public void stop() {
    isRunning = false;
  }

  /**
   * Validate the preconditions for writing a chunk to a file. No data is written yet.
   * @param mountPath currently unused.
   * @param fileName the name under which the target channel is registered.
   * @param byteBuffer the data to write; must not be {@code null}.
   * @param offset currently unused.
   * @param size currently unused.
   * @throws FileStoreException if the store is not running.
   * @throws IllegalArgumentException if {@code byteBuffer} is {@code null} or no channel is registered for
   *         {@code fileName}.
   */
  public void putChunkToFile(String mountPath, String fileName, ByteBuffer byteBuffer, long offset, long size) {
    getRegisteredChannel(fileName, byteBuffer);
    // TODO: write byteBuffer to the channel at the requested offset; not implemented yet.
  }

  /**
   * Validate the preconditions for persisting metadata to a file. No data is written yet.
   * @param mountPath currently unused.
   * @param fileName the name under which the target channel is registered.
   * @param byteBuffer the data to persist; must not be {@code null}.
   * @param offset currently unused.
   * @param size currently unused.
   * @throws FileStoreException if the store is not running.
   * @throws IllegalArgumentException if {@code byteBuffer} is {@code null} or no channel is registered for
   *         {@code fileName}.
   */
  public void persistMetaDataToFile(String mountPath, String fileName, ByteBuffer byteBuffer, long offset,
      long size) {
    getRegisteredChannel(fileName, byteBuffer);
    // TODO: write byteBuffer to the channel at the requested offset; not implemented yet.
  }

  /**
   * Currently a no-op.
   * NOTE(review): this does not change the running state; decide whether it should delegate to
   * {@link #stop()}.
   */
  public void shutdown() {
  }

  /**
   * Run the checks shared by all write operations, in the order: running state, buffer, registered channel.
   * @param fileName the name under which the target channel is registered.
   * @param byteBuffer the buffer handed to the write operation.
   * @return the {@link FileChannel} registered for {@code fileName}.
   */
  private FileChannel getRegisteredChannel(String fileName, ByteBuffer byteBuffer) {
    if (!isRunning) {
      throw new FileStoreException("FileStore is not running", FileStoreErrorCode.FileStoreRunningFailure);
    }
    if (byteBuffer == null) {
      throw new IllegalArgumentException("ByteBuffer is null");
    }
    FileChannel registeredChannel = fileNameToFileChannelMap.get(fileName);
    if (registeredChannel == null) {
      throw new IllegalArgumentException("File not found");
    }
    return registeredChannel;
  }
}
19 changes: 19 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/store/SealedFileInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.github.ambry.store;

/**
 * Immutable pair of a file name and the size recorded for that file.
 */
public class SealedFileInfo {
  private final String fileName;
  private final long fileSize;

  /**
   * @param fileName the name of the file.
   * @param fileSize the size of the file; must not be {@code null}.
   * @throws NullPointerException if {@code fileSize} is {@code null}.
   */
  public SealedFileInfo(String fileName, Long fileSize) {
    // Fail with an explicit message instead of relying on the implicit unboxing below to blow up.
    if (fileSize == null) {
      throw new NullPointerException("fileSize must not be null");
    }
    this.fileName = fileName;
    this.fileSize = fileSize;
  }

  /**
   * @return the file name supplied at construction time.
   */
  public String getFileName() {
    return fileName;
  }

  /**
   * @return the file size supplied at construction time; never {@code null}.
   */
  public Long getFileSize() {
    return fileSize;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.server.StoreManager;
import com.github.ambry.store.FileStore;
import com.github.ambry.store.Store;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -57,6 +58,21 @@ public boolean addBlobStore(ReplicaId replica) {
return createAndStartBlobStoreIfAbsent(replica.getPartitionId()) != null;
}

/**
 * No file store is created by this store manager; this implementation always returns {@code false}.
 * @param replicaId ignored.
 * @return {@code false}, always.
 */
@Override
public boolean addFileStore(ReplicaId replicaId) {
return false;
}

/**
 * Does nothing in this store manager.
 * @param partitionName ignored.
 */
@Override
public void setUpReplica(String partitionName) {
// no-op
}

/**
 * Does nothing in this store manager.
 * @param partitionName ignored.
 */
@Override
public void buildStateForFileCopy(String partitionName){
// no-op
}

@Override
public boolean shutdownBlobStore(PartitionId id) {
try {
Expand Down Expand Up @@ -84,6 +100,11 @@ public Store getStore(PartitionId id) {
return createAndStartBlobStoreIfAbsent(id);
}

/**
 * This store manager holds no file stores; this implementation always returns {@code null}.
 * @param id ignored.
 * @return {@code null}, always.
 */
@Override
public FileStore getFileStore(PartitionId id) {
return null;
}

@Override
public boolean scheduleNextForCompaction(PartitionId id) {
throw new UnsupportedOperationException("Method not supported");
Expand Down
Loading