Skip to content

[Failure Store] Add more test coverage #126891

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.notNullValue;

public class FailureStoreSecurityRestIT extends ESRestTestCase {
Expand Down Expand Up @@ -1921,6 +1924,335 @@ public void testFailureStoreAccess() throws Exception {
}
}

public void testModifyingFailureStoreBackingIndices() throws Exception {
setupDataStream();
Tuple<String, String> backingIndices = getSingleDataAndFailureIndices("test1");
String dataIndexName = backingIndices.v1();
String failureIndexName = backingIndices.v2();

createUser(MANAGE_ACCESS, PASSWORD, MANAGE_ACCESS);
createOrUpdateRoleAndApiKey(MANAGE_ACCESS, MANAGE_ACCESS, """
{
"cluster": ["all"],
"indices": [{"names": ["test*"], "privileges": ["manage"]}]
}""");
assertOK(addFailureStoreBackingIndex(MANAGE_ACCESS, "test1", failureIndexName));
assertDataStreamHasDataAndFailureIndices("test1", dataIndexName, failureIndexName);

// remove the failure index
assertOK(removeFailureStoreBackingIndex(MANAGE_ACCESS, "test1", failureIndexName));
assertDataStreamHasNoFailureIndices("test1", dataIndexName);

// adding it will fail because the user has no direct access to the backing failure index (.fs*)
expectThrows(() -> addFailureStoreBackingIndex(MANAGE_ACCESS, "test1", failureIndexName), 403);

// let's change that
createOrUpdateRoleAndApiKey(MANAGE_ACCESS, MANAGE_ACCESS, """
{
"cluster": ["all"],
"indices": [{"names": ["test*", ".fs*"], "privileges": ["manage"]}]
}""");

// adding should succeed now
assertOK(addFailureStoreBackingIndex(MANAGE_ACCESS, "test1", failureIndexName));
assertDataStreamHasDataAndFailureIndices("test1", dataIndexName, failureIndexName);

createUser(MANAGE_FAILURE_STORE_ACCESS, PASSWORD, MANAGE_FAILURE_STORE_ACCESS);
createOrUpdateRoleAndApiKey(MANAGE_FAILURE_STORE_ACCESS, MANAGE_FAILURE_STORE_ACCESS, """
{
"cluster": ["all"],
"indices": [{"names": ["test*"], "privileges": ["manage_failure_store"]}]
}""");

// manage_failure_store can only remove the failure backing index, but not add it
assertOK(removeFailureStoreBackingIndex(MANAGE_FAILURE_STORE_ACCESS, "test1", failureIndexName));
assertDataStreamHasNoFailureIndices("test1", dataIndexName);
expectThrows(() -> addFailureStoreBackingIndex(MANAGE_FAILURE_STORE_ACCESS, "test1", failureIndexName), 403);

// not even with access to .fs*
createOrUpdateRoleAndApiKey(MANAGE_FAILURE_STORE_ACCESS, MANAGE_FAILURE_STORE_ACCESS, """
{
"cluster": ["all"],
"indices": [{"names": ["test*", ".fs*"], "privileges": ["manage_failure_store"]}]
}""");
expectThrows(() -> addFailureStoreBackingIndex(MANAGE_FAILURE_STORE_ACCESS, "test1", failureIndexName), 403);
}

private void assertDataStreamHasDataAndFailureIndices(String dataStreamName, String dataIndexName, String failureIndexName)
throws IOException {
Tuple<List<String>, List<String>> indices = getDataAndFailureIndices(dataStreamName);
assertThat(indices.v1(), containsInAnyOrder(dataIndexName));
assertThat(indices.v2(), containsInAnyOrder(failureIndexName));
}

private void assertDataStreamHasNoFailureIndices(String dataStreamName, String dataIndexName) throws IOException {
Tuple<List<String>, List<String>> indices = getDataAndFailureIndices(dataStreamName);
assertThat(indices.v1(), containsInAnyOrder(dataIndexName));
assertThat(indices.v2(), is(empty()));
}

private Response addFailureStoreBackingIndex(String user, String dataStreamName, String failureIndexName) throws IOException {
return modifyFailureStoreBackingIndex(user, "add_backing_index", dataStreamName, failureIndexName);
}

private Response removeFailureStoreBackingIndex(String user, String dataStreamName, String failureIndexName) throws IOException {
return modifyFailureStoreBackingIndex(user, "remove_backing_index", dataStreamName, failureIndexName);
}

private Response modifyFailureStoreBackingIndex(String user, String action, String dataStreamName, String failureIndexName)
throws IOException {
Request request = new Request("POST", "/_data_stream/_modify");
request.setJsonEntity(Strings.format("""
{
"actions": [
{
"%s": {
"data_stream": "%s",
"index": "%s",
"failure_store" : true
}
}
]
}
""", action, dataStreamName, failureIndexName));
return performRequest(user, request);
}

public void testDataStreamApis() throws Exception {
setupDataStream();
setupOtherDataStream();

final String username = "user";
final String roleName = "role";
createUser(username, PASSWORD, roleName);
{
// manage_failure_store does not grant access to _data_stream APIs
createOrUpdateRoleAndApiKey(username, roleName, """
{
"cluster": ["all"],
"indices": [
{
"names": ["test1", "other1"],
"privileges": ["manage_failure_store"]
}
]
}
""");

expectThrows(() -> performRequest(username, new Request("GET", "/_data_stream/test1")), 403);
expectThrows(() -> performRequest(username, new Request("GET", "/_data_stream/test1/_stats")), 403);
expectThrows(() -> performRequest(username, new Request("GET", "/_data_stream/test1/_options")), 403);
expectThrows(() -> performRequest(username, new Request("GET", "/_data_stream/test1/_lifecycle")), 403);
expectThrows(() -> putDataStreamLifecycle(username, "test1", """
{
"data_retention": "7d"
}"""), 403);
expectEmptyDataStreamStats(username, new Request("GET", "/_data_stream/_stats"));
expectEmptyDataStreamStats(username, new Request("GET", "/_data_stream/" + randomFrom("test*", "*") + "/_stats"));
}
{
createOrUpdateRoleAndApiKey(username, roleName, """
{
"cluster": ["all"],
"indices": [
{
"names": ["test1"],
"privileges": ["manage"]
}
]
}
""");

expectDataStreamStats(username, new Request("GET", "/_data_stream/_stats"), "test1", 2);
expectDataStreamStats(
username,
new Request("GET", "/_data_stream/" + randomFrom("test1", "test*", "*") + "/_stats"),
"test1",
2
);
expectDataStreams(username, new Request("GET", "/_data_stream/" + randomFrom("test1", "test*", "*")), "test1");
putDataStreamLifecycle(username, "test1", """
{
"data_retention": "7d"
}""");

var lifecycleResponse = assertOKAndCreateObjectPath(
performRequest(username, new Request("GET", "/_data_stream/" + randomFrom("test1", "test*", "*") + "/_lifecycle"))
);
assertThat(lifecycleResponse.evaluate("data_streams"), iterableWithSize(1));
assertThat(lifecycleResponse.evaluate("data_streams.0.name"), equalTo("test1"));

var optionsResponse = assertOKAndCreateObjectPath(
performRequest(username, new Request("GET", "/_data_stream/" + randomFrom("test1", "test*", "*") + "/_options"))
);
assertThat(optionsResponse.evaluate("data_streams"), iterableWithSize(1));
assertThat(optionsResponse.evaluate("data_streams.0.name"), equalTo("test1"));
}
}

private void putDataStreamLifecycle(String user, String dataStreamName, String lifecyclePolicy) throws IOException {
Request request = new Request("PUT", "/_data_stream/" + dataStreamName + "/_lifecycle");
request.setJsonEntity(lifecyclePolicy);
assertOK(performRequest(user, request));
}

private void expectDataStreams(String user, Request dataStreamRequest, String dataStreamName) throws IOException {
Response response = performRequest(user, dataStreamRequest);
ObjectPath path = assertOKAndCreateObjectPath(response);
List<?> dataStreams = path.evaluate("data_streams");
assertThat(dataStreams.size(), equalTo(1));
assertThat(path.evaluate("data_streams.0.name"), equalTo(dataStreamName));
}

private void expectDataStreamStats(String user, Request statsRequest, String dataStreamName, int backingIndices) throws IOException {
Response response = performRequest(user, statsRequest);
ObjectPath path = assertOKAndCreateObjectPath(response);
assertThat(path.evaluate("data_stream_count"), equalTo(1));
assertThat(path.evaluate("backing_indices"), equalTo(backingIndices));
assertThat(path.evaluate("data_streams.0.data_stream"), equalTo(dataStreamName));
}

private void expectEmptyDataStreamStats(String user, Request request) throws IOException {
Response response = performRequest(user, request);
ObjectPath path = assertOKAndCreateObjectPath(response);
assertThat(path.evaluate("data_stream_count"), equalTo(0));
assertThat(path.evaluate("backing_indices"), equalTo(0));
}

public void testWatcher() throws Exception {
setupDataStream();
setupOtherDataStream();

final Tuple<String, String> backingIndices = getSingleDataAndFailureIndices("test1");
final String dataIndexName = backingIndices.v1();
final String failureIndexName = backingIndices.v2();

final String watchId = "failures-watch";
final String username = "user";
final String roleName = "role";
createUser(username, PASSWORD, roleName);

{
// grant access to the failure store
createOrUpdateRoleAndApiKey(username, roleName, """
{
"cluster": ["manage_security", "manage_watcher"],
"indices": [
{
"names": ["test1"],
"privileges": ["read_failure_store"]
}
]
}
""");

// searching the failure store should return only test1 failure indices
createOrUpdateWatcher(username, watchId, Strings.format("""
{
"trigger": { "schedule": { "interval": "60m"}},
"input": {
"search": {
"request": {
"indices": [ "%s" ],
"body": {"query": {"match_all": {}}}
}
}
}
}""", randomFrom("test1::failures", "test*::failures", "*::failures", failureIndexName)));
executeWatchAndAssertResults(username, watchId, failureIndexName);

// searching the data should return empty results
createOrUpdateWatcher(username, watchId, Strings.format("""
{
"trigger": { "schedule": { "interval": "60m"}},
"input": {
"search": {
"request": {
"indices": [ "%s" ],
"body": {"query": {"match_all": {}}}
}
}
}
}""", randomFrom(dataIndexName, "*", "test*", "test1", "test1::data")));
executeWatchAndAssertEmptyResults(username, watchId);
}

{
// remove read_failure_store and add read
createOrUpdateRoleAndApiKey(username, roleName, """
{
"cluster": ["manage_security", "manage_watcher"],
"indices": [
{
"names": ["test1"],
"privileges": ["read"]
}
]
}
""");

// searching the failure store should return empty results
createOrUpdateWatcher(username, watchId, Strings.format("""
{
"trigger": { "schedule": { "interval": "60m"}},
"input": {
"search": {
"request": {
"indices": [ "%s" ],
"body": {"query": {"match_all": {}}}
}
}
}
}""", randomFrom("test1::failures", "test*::failures", "*::failures", failureIndexName)));
executeWatchAndAssertEmptyResults(username, watchId);

// searching the data should return single result
createOrUpdateWatcher(username, watchId, Strings.format("""
{
"trigger": { "schedule": { "interval": "60m"}},
"input": {
"search": {
"request": {
"indices": [ "%s" ],
"body": {"query": {"match_all": {}}}
}
}
}
}""", randomFrom("*", "test*", "test1", "test1::data", dataIndexName)));
executeWatchAndAssertResults(username, watchId, dataIndexName);
}
}

private void executeWatchAndAssertResults(String user, String watchId, final String... expectedIndices) throws IOException {
Request request = new Request("POST", "_watcher/watch/" + watchId + "/_execute");
Response response = performRequest(user, request);
ObjectPath path = assertOKAndCreateObjectPath(response);
assertThat(path.evaluate("watch_record.user"), equalTo(user));
assertThat(path.evaluate("watch_record.state"), equalTo("executed"));
assertThat(path.evaluate("watch_record.result.input.status"), equalTo("success"));
assertThat(path.evaluate("watch_record.result.input.payload.hits.total"), equalTo(expectedIndices.length));
List<Map<String, ?>> hits = path.evaluate("watch_record.result.input.payload.hits.hits");
hits.stream().map(hit -> hit.get("_index")).forEach(index -> { assertThat(index, is(in(expectedIndices))); });
}

private void executeWatchAndAssertEmptyResults(String user, String watchId) throws IOException {
Request request = new Request("POST", "_watcher/watch/" + watchId + "/_execute");
Response response = performRequest(user, request);
ObjectPath path = assertOKAndCreateObjectPath(response);
assertThat(path.evaluate("watch_record.user"), equalTo(user));
assertThat(path.evaluate("watch_record.state"), equalTo("executed"));
assertThat(path.evaluate("watch_record.result.input.status"), equalTo("success"));
assertThat(path.evaluate("watch_record.result.input.payload.hits.total"), equalTo(0));
List<Map<String, ?>> hits = path.evaluate("watch_record.result.input.payload.hits.hits");
assertThat(hits.size(), equalTo(0));
}

private void createOrUpdateWatcher(String user, String watchId, String watch) throws IOException {
Request request = new Request("PUT", "/_watcher/watch/" + watchId);
request.setJsonEntity(watch);
assertOK(performRequest(user, request));
}

public void testAliasBasedAccess() throws Exception {
List<String> docIds = setupDataStream();
assertThat(docIds.size(), equalTo(2));
Expand Down