Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request level latency tracking #1

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
if (timeProvider.isPhaseTookEnabled()) {
searchListenersList.add(timeProvider);
}
if (!CollectionUtils.isEmpty(searchListenersList)) {
this.searchListenersList = searchListenersList;
this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger);
Expand Down Expand Up @@ -333,6 +336,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
timeProvider.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -791,6 +795,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
timeProvider.getPhaseTook(),
failures,
clusters,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private String pipeline;

private Boolean phaseTookQueryParamEnabled = null;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
Expand Down Expand Up @@ -209,6 +211,7 @@ private SearchRequest(
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
this.phaseTookQueryParamEnabled = searchRequest.phaseTookQueryParamEnabled;
}

/**
Expand Down Expand Up @@ -253,6 +256,7 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
pipeline = in.readOptionalString();
}
phaseTookQueryParamEnabled = in.readOptionalBoolean();
}

@Override
Expand Down Expand Up @@ -284,6 +288,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalString(pipeline);
}
out.writeOptionalBoolean(phaseTookQueryParamEnabled);
}

@Override
Expand Down Expand Up @@ -615,6 +620,33 @@ public void setPreFilterShardSize(int preFilterShardSize) {
this.preFilterShardSize = preFilterShardSize;
}

enum ParamValue {
TRUE,
FALSE,
UNSET
}

/**
* Returns value of user-provided phase_took query parameter for this search request.
* Defaults to <code>false</code>.
*/
public ParamValue isPhaseTookQueryParamEnabled() {
if (phaseTookQueryParamEnabled == null) {
return ParamValue.UNSET;
} else if (phaseTookQueryParamEnabled == true) {
return ParamValue.TRUE;
} else {
return ParamValue.FALSE;
}
}

/**
* Sets value of phase_took query param if provided by user. Defaults to <code>null</code>.
*/
public void setPhaseTookQueryParamEnabled(boolean phaseTookQueryParamEnabled) {
this.phaseTookQueryParamEnabled = phaseTookQueryParamEnabled;
}

/**
* Returns a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold, or <code>null</code> if the threshold is unspecified.
Expand Down Expand Up @@ -719,7 +751,8 @@ public boolean equals(Object o) {
&& absoluteStartMillis == that.absoluteStartMillis
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
&& Objects.equals(pipeline, that.pipeline);
&& Objects.equals(pipeline, that.pipeline)
&& Objects.equals(phaseTookQueryParamEnabled, that.phaseTookQueryParamEnabled);
}

@Override
Expand All @@ -740,7 +773,8 @@ public int hashCode() {
localClusterAlias,
absoluteStartMillis,
ccsMinimizeRoundtrips,
cancelAfterTimeInterval
cancelAfterTimeInterval,
phaseTookQueryParamEnabled
);
}

Expand Down Expand Up @@ -783,6 +817,8 @@ public String toString() {
+ cancelAfterTimeInterval
+ ", pipeline="
+ pipeline
+ ", phaseTookQueryParamEnabled="
+ phaseTookQueryParamEnabled
+ "}";
}
}
172 changes: 171 additions & 1 deletion server/src/main/java/org/opensearch/action/search/SearchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
private final ShardSearchFailure[] shardFailures;
private final Clusters clusters;
private final long tookInMillis;
private final PhaseTook phaseTook;

public SearchResponse(StreamInput in) throws IOException {
super(in);
Expand All @@ -112,6 +113,7 @@ public SearchResponse(StreamInput in) throws IOException {
clusters = new Clusters(in);
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
phaseTook = new PhaseTook(in);
skippedShards = in.readVInt();
pointInTimeId = in.readOptionalString();
}
Expand All @@ -126,7 +128,18 @@ public SearchResponse(
ShardSearchFailure[] shardFailures,
Clusters clusters
) {
this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters, null);
this(
internalResponse,
scrollId,
totalShards,
successfulShards,
skippedShards,
tookInMillis,
SearchResponse.PhaseTook.NULL,
shardFailures,
clusters,
null
);
}

public SearchResponse(
Expand All @@ -136,6 +149,7 @@ public SearchResponse(
int successfulShards,
int skippedShards,
long tookInMillis,
PhaseTook phaseTook,
ShardSearchFailure[] shardFailures,
Clusters clusters,
String pointInTimeId
Expand All @@ -148,6 +162,7 @@ public SearchResponse(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.tookInMillis = tookInMillis;
this.phaseTook = phaseTook;
this.shardFailures = shardFailures;
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
assert scrollId == null || pointInTimeId == null : "SearchResponse can't have both scrollId ["
Expand Down Expand Up @@ -210,6 +225,13 @@ public TimeValue getTook() {
return new TimeValue(tookInMillis);
}

/**
* How long the request took in each search phase.
*/
public PhaseTook getPhaseTook() {
return phaseTook;
}

/**
* The total number of shards the search was executed on.
*/
Expand Down Expand Up @@ -298,6 +320,9 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field(POINT_IN_TIME_ID.getPreferredName(), pointInTimeId);
}
builder.field(TOOK.getPreferredName(), tookInMillis);
if (phaseTook.equals(PhaseTook.NULL) == false) {
phaseTook.toXContent(builder, params);
}
builder.field(TIMED_OUT.getPreferredName(), isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(TERMINATED_EARLY.getPreferredName(), isTerminatedEarly());
Expand Down Expand Up @@ -337,6 +362,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
Boolean terminatedEarly = null;
int numReducePhases = 1;
long tookInMillis = -1;
PhaseTook phaseTook = PhaseTook.NULL;
int successfulShards = -1;
int totalShards = -1;
int skippedShards = 0; // 0 for BWC
Expand Down Expand Up @@ -401,6 +427,35 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
parser.skipChildren();
}
}
} else if (PhaseTook.PHASE_TOOK.match(currentFieldName, parser.getDeprecationHandler())) {
long dfsPreQueryTotal = -1;
long canMatchTotal = -1;
long queryTotal = -1;
long fetchTotal = -1;
long expandSearchTotal = -1;

while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (PhaseTook.DFS_PREQUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
dfsPreQueryTotal = parser.longValue(); // we don't need it but need to consume it
} else if (PhaseTook.CAN_MATCH_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
canMatchTotal = parser.longValue();
} else if (PhaseTook.QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
queryTotal = parser.longValue();
} else if (PhaseTook.FETCH_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
fetchTotal = parser.longValue();
} else if (PhaseTook.EXPAND_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
expandSearchTotal = parser.longValue();
} else {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
phaseTook = new PhaseTook(dfsPreQueryTotal, canMatchTotal, queryTotal, fetchTotal, expandSearchTotal);
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
int successful = -1;
int total = -1;
Expand Down Expand Up @@ -472,6 +527,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
successfulShards,
skippedShards,
tookInMillis,
phaseTook,
failures.toArray(ShardSearchFailure.EMPTY_ARRAY),
clusters,
searchContextId
Expand All @@ -491,6 +547,7 @@ public void writeTo(StreamOutput out) throws IOException {
clusters.writeTo(out);
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
phaseTook.writeTo(out);
out.writeVInt(skippedShards);
out.writeOptionalString(pointInTimeId);
}
Expand Down Expand Up @@ -604,6 +661,118 @@ public String toString() {
}
}

/**
* Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
* and how many of them were skipped.
*
* @opensearch.internal
*/
public static class PhaseTook implements ToXContentFragment, Writeable {
public static final PhaseTook NULL = new PhaseTook(-1, -1, -1, -1, -1);

static final ParseField PHASE_TOOK = new ParseField("phase_took");
static final ParseField DFS_PREQUERY_FIELD = new ParseField("dfs_prequery");
static final ParseField CAN_MATCH_FIELD = new ParseField("can_match");
static final ParseField QUERY_FIELD = new ParseField("query");
static final ParseField FETCH_FIELD = new ParseField("fetch");
static final ParseField EXPAND_FIELD = new ParseField("expand_search");

private final long dfsPreQueryTotal;
private final long canMatchTotal;
private final long queryTotal;
private final long fetchTotal;
private final long expandSearchTotal;

public PhaseTook(long dfsPreQueryTotal, long canMatchTotal, long queryTotal, long fetchTotal, long expandSearchTotal) {
this.dfsPreQueryTotal = dfsPreQueryTotal;
this.canMatchTotal = canMatchTotal;
this.queryTotal = queryTotal;
this.fetchTotal = fetchTotal;
this.expandSearchTotal = expandSearchTotal;
}

private PhaseTook(StreamInput in) throws IOException {
this(in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to have version guards check around this change as it is a new feature. For now, use 3.0 version. We will eventually change it to 2.11 later on.

}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(dfsPreQueryTotal);
out.writeLong(canMatchTotal);
out.writeLong(queryTotal);
out.writeLong(fetchTotal);
out.writeLong(expandSearchTotal);
Comment on lines +700 to +704
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to have version guards check around this. For now, use 3.0 version.

}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(PHASE_TOOK.getPreferredName());
builder.field(DFS_PREQUERY_FIELD.getPreferredName(), dfsPreQueryTotal);
builder.field(CAN_MATCH_FIELD.getPreferredName(), canMatchTotal);
builder.field(QUERY_FIELD.getPreferredName(), queryTotal);
builder.field(FETCH_FIELD.getPreferredName(), fetchTotal);
builder.field(EXPAND_FIELD.getPreferredName(), expandSearchTotal);
builder.endObject();
return builder;
}

/**
* Returns time spent in DFS Prequery phase during the execution of the search request
*/
public long getDfsPreQueryTotal() {
return dfsPreQueryTotal;
}

/**
* Returns time spent in canMatch phase during the execution of the search request
*/
public long getCanMatchTotal() {
return canMatchTotal;
}

/**
* Returns time spent in query phase during the execution of the search request
*/
public long getQueryTotal() {
return queryTotal;
}

/**
* Returns time spent in fetch phase during the execution of the search request
*/
public long getFetchTotal() {
return fetchTotal;
}

/**
* Returns time spent in expand phase during the execution of the search request
*/
public long getExpandSearchTotal() {
return expandSearchTotal;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PhaseTook phaseTook = (PhaseTook) o;
return dfsPreQueryTotal == phaseTook.dfsPreQueryTotal
&& queryTotal == phaseTook.queryTotal
&& canMatchTotal == phaseTook.canMatchTotal
&& fetchTotal == phaseTook.fetchTotal
&& expandSearchTotal == phaseTook.expandSearchTotal;
}

@Override
public int hashCode() {
return Objects.hash(dfsPreQueryTotal, queryTotal, canMatchTotal, fetchTotal, expandSearchTotal);
}
}

static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
Expand All @@ -622,6 +791,7 @@ static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters cluste
0,
0,
tookInMillisSupplier.get(),
PhaseTook.NULL,
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
searchTimeProvider.getPhaseTook(),
shardFailures,
clusters,
null
Expand Down
Loading