Skip to content

Commit

Permalink
[SnapshotV2] Support centralize snapshot creation (opensearch-project…
Browse files Browse the repository at this point in the history
…#15124)

* Initial Commit to support centralize snapshot creation and implicit locking mechanism

Signed-off-by: Anshu Agarwal <[email protected]>

* Fix deserilization error

Signed-off-by: Anshu Agarwal <[email protected]>

* Fix gradle spotless check

Signed-off-by: Anshu Agarwal <[email protected]>

* Fix listener

Signed-off-by: Anshu Agarwal <[email protected]>

* Fix test

Signed-off-by: Anshu Agarwal <[email protected]>

* Fix snapshot generation

Signed-off-by: Anshu Agarwal <[email protected]>

* Modify cluster setting name

Signed-off-by: Anshu Agarwal <[email protected]>

* Add more tests

Signed-off-by: Anshu Agarwal <[email protected]>

* Uncomment pin timestamp code

Signed-off-by: Anshu Agarwal <[email protected]>

* Modify log messages

Signed-off-by: Anshu Agarwal <[email protected]>

* Add spotless check failure fix

Signed-off-by: Anshu Agarwal <[email protected]>

* Fix completion listener for snapshot v2

Signed-off-by: Anshu Agarwal <[email protected]>

* Elevate cluster state update priority for repository metadata update task

Signed-off-by: Anshu Agarwal <[email protected]>

* Add more integ tests

Signed-off-by: Anshu Agarwal <[email protected]>

* Add priority as IMMEDIATE for cluster state repo update task only for v2 snapshots

Signed-off-by: Anshu Agarwal <[email protected]>

* Fix build error

Signed-off-by: Anshu Agarwal <[email protected]>

* Fix spotless error

Signed-off-by: Anshu Agarwal <[email protected]>

* Add repository setting for snapshot v2

Signed-off-by: Anshu Agarwal <[email protected]>

* Address review comments

Signed-off-by: Anshu Agarwal <[email protected]>

* Add integ test to verify snapshot creation if shallow copy repo setting is disabled

Signed-off-by: Anshu Agarwal <[email protected]>

* Fix spotless vilation error

Signed-off-by: Anshu Agarwal <[email protected]>

* Address review comment

Signed-off-by: Anshu Agarwal <[email protected]>

* Address review comments

Signed-off-by: Anshu Agarwal <[email protected]>

* Add min version check for backward compatibility

Signed-off-by: Anshu Agarwal <[email protected]>

* address review comments

Signed-off-by: Anshu Agarwal <[email protected]>

* add integ test for master failover scenario

Signed-off-by: Anshu Agarwal <[email protected]>

* Add more integ tests

Signed-off-by: Anshu Agarwal <[email protected]>

* refactor code

Signed-off-by: Anshu Agarwal <[email protected]>

* add changelog

Signed-off-by: Anshu Agarwal <[email protected]>

* Add pinned timestamp setting in integ tests

Signed-off-by: Anshu Agarwal <[email protected]>

---------

Signed-off-by: Anshu Agarwal <[email protected]>
Signed-off-by: Anshu Agarwal <[email protected]>
Co-authored-by: Anshu Agarwal <[email protected]>
  • Loading branch information
anshu1106 and Anshu Agarwal authored Aug 28, 2024
1 parent c771bdd commit 23cba28
Show file tree
Hide file tree
Showing 21 changed files with 1,025 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.BlobStoreException;
Expand Down Expand Up @@ -391,6 +392,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
Expand All @@ -400,6 +402,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -127,6 +128,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
Expand All @@ -136,6 +138,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;

/**
* Transport action for create snapshot operation
*
Expand All @@ -56,12 +60,15 @@
public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
private final SnapshotsService snapshotsService;

private final RepositoriesService repositoriesService;

@Inject
public TransportCreateSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
RepositoriesService repositoriesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand All @@ -75,6 +82,7 @@ public TransportCreateSnapshotAction(
indexNameExpressionResolver
);
this.snapshotsService = snapshotsService;
this.repositoriesService = repositoriesService;
}

@Override
Expand Down Expand Up @@ -103,7 +111,9 @@ protected void clusterManagerOperation(
ClusterState state,
final ActionListener<CreateSnapshotResponse> listener
) {
if (request.waitForCompletion()) {
Repository repository = repositoriesService.repository(request.repository());
boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings());
if (request.waitForCompletion() || isSnapshotV2) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Priority;
import org.opensearch.common.lifecycle.Lifecycle;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -104,6 +105,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
in.finalizeSnapshot(
Expand All @@ -113,6 +115,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -150,6 +151,7 @@ default Repository create(RepositoryMetadata metadata, Function<String, Reposito
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and
* is used to remove any state tracked for the in-progress snapshot from the cluster state
* @param repositoryUpdatePriority priority for the cluster state update task
* @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot
*/
void finalizeSnapshot(
Expand All @@ -159,6 +161,7 @@ void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.Numbers;
import org.opensearch.common.Priority;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -266,6 +267,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

public static final Setting<Boolean> REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting("remote_store_index_shallow_copy", false);

public static final Setting<Boolean> SHALLOW_SNAPSHOT_V2 = Setting.boolSetting("shallow_snapshot_v2", false);

/**
* Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
* For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation
Expand Down Expand Up @@ -1046,6 +1049,7 @@ private void doDeleteShardSnapshots(
repositoryStateId,
repoMetaVersion,
Function.identity(),
Priority.NORMAL,
ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure)
);
}, listener::onFailure);
Expand Down Expand Up @@ -1520,6 +1524,7 @@ public void cleanup(
repositoryStateId,
repositoryMetaVersion,
Function.identity(),
Priority.NORMAL,
ActionListener.wrap(
v -> cleanupStaleBlobs(
Collections.emptyList(),
Expand Down Expand Up @@ -1723,6 +1728,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
final ActionListener<RepositoryData> listener
) {
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received ["
Expand Down Expand Up @@ -1759,6 +1765,7 @@ public void finalizeSnapshot(
repositoryStateId,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
ActionListener.wrap(newRepoData -> {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
listener.onResponse(newRepoData);
Expand Down Expand Up @@ -2280,17 +2287,19 @@ public boolean isSystemRepository() {
* Lastly, the {@link RepositoryMetadata} entry for this repository is updated to the new generation {@code P + 1} and thus
* pending and safe generation are set to the same value marking the end of the update of the repository data.
*
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param version version of the repository metadata to write
* @param stateFilter filter for the last cluster state update executed by this method
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param version version of the repository metadata to write
* @param stateFilter filter for the last cluster state update executed by this method
* @param repositoryUpdatePriority priority for the cluster state update task
* @param listener completion listener
*/
protected void writeIndexGen(
RepositoryData repositoryData,
long expectedGen,
Version version,
Function<ClusterState, ClusterState> stateFilter,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
assert isReadOnly() == false; // can not write to a read only repository
Expand All @@ -2315,7 +2324,7 @@ protected void writeIndexGen(
final StepListener<Long> setPendingStep = new StepListener<>();
clusterService.submitStateUpdateTask(
"set pending repository generation [" + metadata.name() + "][" + expectedGen + "]",
new ClusterStateUpdateTask() {
new ClusterStateUpdateTask(repositoryUpdatePriority) {

private long newGen;

Expand Down Expand Up @@ -2453,7 +2462,7 @@ public void onFailure(Exception e) {
// Step 3: Update CS to reflect new repository generation.
clusterService.submitStateUpdateTask(
"set safe repository generation [" + metadata.name() + "][" + newGen + "]",
new ClusterStateUpdateTask() {
new ClusterStateUpdateTask(repositoryUpdatePriority) {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryMetadata meta = getRepoMetadata(currentState);
Expand Down
Loading

0 comments on commit 23cba28

Please sign in to comment.