Skip to content

Commit

Permalink
Merge branch 'main' into support_flat_object_docvalue
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei authored Sep 26, 2024
2 parents c308a98 + ae22e3f commit d7c880f
Show file tree
Hide file tree
Showing 30 changed files with 1,334 additions and 264 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383))

### Dependencies
Expand All @@ -27,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `dnsjava:dnsjava` from 3.6.1 to 3.6.2 ([#16041](https://github.com/opensearch-project/OpenSearch/pull/16041))

### Changed
- Add support for docker compose v2 in TestFixturesPlugin ([#16049](https://github.com/opensearch-project/OpenSearch/pull/16049))


### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public DockerAvailability getDockerAvailability() {
Version version = null;
boolean isVersionHighEnough = false;
boolean isComposeAvailable = false;
boolean isComposeV2Available = false;

// Check if the Docker binary exists
final Optional<String> dockerBinary = getDockerPath();
Expand All @@ -129,6 +130,8 @@ public DockerAvailability getDockerAvailability() {
if (lastResult.isSuccess() && composePath.isPresent()) {
isComposeAvailable = runCommand(composePath.get(), "version").isSuccess();
}

isComposeV2Available = runCommand(dockerPath, "compose", "version").isSuccess();
}
}
}
Expand All @@ -138,6 +141,7 @@ public DockerAvailability getDockerAvailability() {
this.dockerAvailability = new DockerAvailability(
isAvailable,
isComposeAvailable,
isComposeV2Available,
isVersionHighEnough,
dockerPath,
version,
Expand Down Expand Up @@ -356,6 +360,11 @@ public static class DockerAvailability {
*/
public final boolean isComposeAvailable;

/**
* True if docker compose is available.
*/
public final boolean isComposeV2Available;

/**
* True if the installed Docker version is &gt;= 17.05
*/
Expand All @@ -379,13 +388,15 @@ public static class DockerAvailability {
DockerAvailability(
boolean isAvailable,
boolean isComposeAvailable,
boolean isComposeV2Available,
boolean isVersionHighEnough,
String path,
Version version,
Result lastCommand
) {
this.isAvailable = isAvailable;
this.isComposeAvailable = isComposeAvailable;
this.isComposeV2Available = isComposeV2Available;
this.isVersionHighEnough = isVersionHighEnough;
this.path = path;
this.version = version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ public void execute(Task task) {
.findFirst();

composeExtension.getExecutable().set(dockerCompose.isPresent() ? dockerCompose.get() : "/usr/bin/docker");
composeExtension.getUseDockerComposeV2().set(false);
if (dockerSupport.get().getDockerAvailability().isComposeV2Available) {
composeExtension.getUseDockerComposeV2().set(true);
} else if (dockerSupport.get().getDockerAvailability().isComposeAvailable) {
composeExtension.getUseDockerComposeV2().set(false);
}

tasks.named("composeUp").configure(t -> {
// Avoid running docker-compose tasks in parallel in CI due to some issues on certain Linux distributions
Expand Down Expand Up @@ -228,7 +232,8 @@ private void maybeSkipTask(Provider<DockerSupportService> dockerSupport, TaskPro

private void maybeSkipTask(Provider<DockerSupportService> dockerSupport, Task task) {
task.onlyIf(spec -> {
boolean isComposeAvailable = dockerSupport.get().getDockerAvailability().isComposeAvailable;
boolean isComposeAvailable = dockerSupport.get().getDockerAvailability().isComposeV2Available
|| dockerSupport.get().getDockerAvailability().isComposeAvailable;
if (isComposeAvailable == false) {
LOGGER.info("Task {} requires docker-compose but it is unavailable. Task will be skipped.", task.getPath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ teardown:
---
"Invalid docs":
- skip:
version: "- 2.99.99"
version: "- 2.17.99"
reason: "parsing of these objects would infinite loop prior to 2.18"
# The following documents are invalid.
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ setup:

---
"Date histogram aggregation w/ shared field range test":
- do:
indices.create:
index: dhisto-agg-w-query
body:
settings:
number_of_shards: 1
number_of_replicas: 0
refresh_interval: -1
mappings:
properties:
date:
type: date

- do:
bulk:
refresh: true
Expand All @@ -127,6 +140,11 @@ setup:
- '{"index": {}}'
- '{"date": "2025-02-14"}'

- do:
indices.forcemerge:
index: dhisto-agg-w-query
max_num_segments: 1

- do:
search:
index: dhisto-agg-w-query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ teardown:
---
"Test search_as_you_type data type supports multi-fields":
- skip:
version: " - 2.99.99"
reason: "the bug was fixed since 3.0.0"
version: " - 2.17.99"
reason: "the bug was fixed since 2.18.0"

- do:
indices.get_mapping: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ public void testRemotePublicationDownloadStats() {
assertDataNodeDownloadStats(nodesStatsResponseDataNode);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15767")
public void testRemotePublicationDisabledByRollingRestart() throws Exception {
prepareCluster(3, 2, INDEX_NAME, 1, 2);
ensureStableCluster(5);
Expand Down Expand Up @@ -272,7 +271,6 @@ public void doAfterNodes(int n, Client client) {
assertTrue(
stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0
);
assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
} else {
DiscoveryStats stats = nodeStats.getDiscoveryStats();
assertEquals(0, stats.getPublishStats().getFullClusterStateReceivedCount());
Expand All @@ -297,7 +295,7 @@ public void doAfterNodes(int n, Client client) {
);
if (activeCMRestarted) {
assertNull(remoteState.getLastAcceptedState());
// assertNull(remoteState.getLastAcceptedManifest());
assertNull(remoteState.getLastAcceptedManifest());
} else {
ClusterState localState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL)
.getLastAcceptedState();
Expand Down Expand Up @@ -326,7 +324,6 @@ public void doAfterNodes(int n, Client client) {
response.getNodes().forEach(nodeStats -> {
PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats();
assertTrue(stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0);
assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
});
NodesInfoResponse nodesInfoResponse = client().admin()
.cluster()
Expand All @@ -341,7 +338,7 @@ public void doAfterNodes(int n, Client client) {
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node);
CoordinationState.PersistedState remoteState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
assertNull(remoteState.getLastAcceptedState());
// assertNull(remoteState.getLastAcceptedManifest());
assertNull(remoteState.getLastAcceptedManifest());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.io.IOError;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -339,10 +340,11 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
for (UploadedIndexMetadata md : manifest.getIndices()) {
Files.move(segmentRepoPath.resolve(md.getUploadedFilename()), segmentRepoPath.resolve("cluster-state/"));
}
internalCluster().stopAllNodes();
} catch (IOException e) {
throw new RuntimeException(e);
}
assertThrows(IllegalStateException.class, () -> addNewNodes(dataNodeCount, clusterManagerNodeCount));
assertThrows(IOError.class, () -> internalCluster().client());
// Test is complete

// Starting a node without remote state to ensure test cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.client.Requests;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -202,7 +203,7 @@ public void testRemoteTranslogCleanup() throws Exception {

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
String dataNode = internalCluster().startNode();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
Expand Down Expand Up @@ -1011,4 +1012,70 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
.get()
);
}

public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s"))
.get();
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
if (randomBoolean()) {
for (int i = 0; i < randomIntBetween(1, 5); i++) {
indexSingleDoc(INDEX_NAME);
}
flushAndRefresh(INDEX_NAME);
}
// Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
// Index another doc and in this case the flush would have happened before the sync.
indexSingleDoc(INDEX_NAME);
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
// gets indexed, then it goes into the happy case where the close index happens succefully.
Thread.sleep(1000);
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
// Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}
}
Loading

0 comments on commit d7c880f

Please sign in to comment.