Skip to content

Commit d5b0454

Browse files
refactor: use parallelizable scheduler for explore + entity (#93)
1 parent 510a988 commit d5b0454

File tree

6 files changed

+40
-19
lines changed

6 files changed

+40
-19
lines changed

hypertrace-graphql-entity-schema/src/main/java/org/hypertrace/graphql/entity/dao/EntityDaoModule.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
import org.hypertrace.core.graphql.common.utils.BiConverter;
1616
import org.hypertrace.core.graphql.common.utils.Converter;
1717
import org.hypertrace.core.graphql.common.utils.TriConverter;
18-
import org.hypertrace.core.graphql.context.GraphQlRequestContextBuilder;
1918
import org.hypertrace.core.graphql.rx.BoundedIoScheduler;
2019
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
2120
import org.hypertrace.core.graphql.utils.grpc.GrpcChannelRegistry;
21+
import org.hypertrace.core.graphql.utils.grpc.GrpcContextBuilder;
2222
import org.hypertrace.gateway.service.v1.baseline.BaselineEntity;
2323
import org.hypertrace.gateway.service.v1.common.AggregatedMetricValue;
2424
import org.hypertrace.gateway.service.v1.common.Expression;
@@ -43,8 +43,8 @@ protected void configure() {
4343
bind(BaselineDao.class).to(GatewayBaselineDao.class);
4444
requireBinding(CallCredentials.class);
4545
requireBinding(GraphQlServiceConfig.class);
46-
requireBinding(GraphQlRequestContextBuilder.class);
4746
requireBinding(GrpcChannelRegistry.class);
47+
requireBinding(GrpcContextBuilder.class);
4848
requireBinding(Key.get(Scheduler.class, BoundedIoScheduler.class));
4949

5050
requireBinding(

hypertrace-graphql-entity-schema/src/main/java/org/hypertrace/graphql/entity/dao/GatewayBaselineDao.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static org.hypertrace.core.graphql.common.utils.CollectorUtils.flatten;
66

77
import io.grpc.CallCredentials;
8+
import io.reactivex.rxjava3.core.Scheduler;
89
import io.reactivex.rxjava3.core.Single;
910
import java.util.Collection;
1011
import java.util.Set;
@@ -13,9 +14,10 @@
1314
import javax.inject.Singleton;
1415
import org.hypertrace.core.graphql.common.utils.Converter;
1516
import org.hypertrace.core.graphql.context.GraphQlRequestContext;
17+
import org.hypertrace.core.graphql.rx.BoundedIoScheduler;
1618
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
17-
import org.hypertrace.core.graphql.utils.grpc.GraphQlGrpcContextBuilder;
1819
import org.hypertrace.core.graphql.utils.grpc.GrpcChannelRegistry;
20+
import org.hypertrace.core.graphql.utils.grpc.GrpcContextBuilder;
1921
import org.hypertrace.gateway.service.GatewayServiceGrpc;
2022
import org.hypertrace.gateway.service.v1.baseline.BaselineEntitiesRequest;
2123
import org.hypertrace.gateway.service.v1.baseline.BaselineEntitiesResponse;
@@ -34,9 +36,9 @@
3436

3537
@Singleton
3638
class GatewayBaselineDao implements BaselineDao {
37-
private static final int DEFAULT_DEADLINE_SEC = 10;
3839
private final GatewayServiceGrpc.GatewayServiceFutureStub gatewayServiceStub;
39-
private final GraphQlGrpcContextBuilder grpcContextBuilder;
40+
private final GrpcContextBuilder grpcContextBuilder;
41+
private final Scheduler boundedIoScheduler;
4042
private final Converter<Collection<MetricAggregationRequest>, Set<Expression>>
4143
aggregationConverter;
4244
private final Converter<Collection<MetricSeriesRequest>, Set<TimeAggregation>> seriesConverter;
@@ -47,10 +49,12 @@ class GatewayBaselineDao implements BaselineDao {
4749
GrpcChannelRegistry grpcChannelRegistry,
4850
GraphQlServiceConfig serviceConfig,
4951
CallCredentials credentials,
50-
GraphQlGrpcContextBuilder grpcContextBuilder,
52+
GrpcContextBuilder grpcContextBuilder,
53+
@BoundedIoScheduler Scheduler boundedIoScheduler,
5154
Converter<Collection<MetricAggregationRequest>, Set<Expression>> aggregationConverter,
5255
Converter<Collection<MetricSeriesRequest>, Set<TimeAggregation>> seriesConverter) {
5356
this.grpcContextBuilder = grpcContextBuilder;
57+
this.boundedIoScheduler = boundedIoScheduler;
5458
this.gatewayServiceStub =
5559
GatewayServiceGrpc.newFutureStub(
5660
grpcChannelRegistry.forAddress(
@@ -68,6 +72,7 @@ public Single<BaselineEntitiesResponse> getBaselines(
6872
EntitiesResponse entitiesResponse,
6973
EntityRequest request) {
7074
return this.buildRequest(entitiesRequest, entitiesResponse, request)
75+
.subscribeOn(this.boundedIoScheduler)
7176
.flatMap(baselineEntitiesRequest -> makeRequest(context, baselineEntitiesRequest));
7277
}
7378

@@ -126,7 +131,7 @@ private Single<BaselineEntitiesResponse> makeRequest(
126131
return Single.fromFuture(
127132
this.grpcContextBuilder
128133
.build(context)
129-
.callInContext(
134+
.call(
130135
() ->
131136
this.gatewayServiceStub
132137
.withDeadlineAfter(

hypertrace-graphql-entity-schema/src/main/java/org/hypertrace/graphql/entity/dao/GatewayServiceEntityDao.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import static java.util.concurrent.TimeUnit.MILLISECONDS;
44

55
import io.grpc.CallCredentials;
6+
import io.reactivex.rxjava3.core.Scheduler;
67
import io.reactivex.rxjava3.core.Single;
78
import javax.inject.Inject;
89
import javax.inject.Singleton;
910
import org.hypertrace.core.graphql.context.GraphQlRequestContext;
11+
import org.hypertrace.core.graphql.rx.BoundedIoScheduler;
1012
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
11-
import org.hypertrace.core.graphql.utils.grpc.GraphQlGrpcContextBuilder;
1213
import org.hypertrace.core.graphql.utils.grpc.GrpcChannelRegistry;
14+
import org.hypertrace.core.graphql.utils.grpc.GrpcContextBuilder;
1315
import org.hypertrace.gateway.service.GatewayServiceGrpc;
1416
import org.hypertrace.gateway.service.GatewayServiceGrpc.GatewayServiceFutureStub;
1517
import org.hypertrace.gateway.service.v1.entity.EntitiesRequest;
@@ -21,26 +23,29 @@
2123
@Singleton
2224
class GatewayServiceEntityDao implements EntityDao {
2325
private final GatewayServiceFutureStub gatewayServiceStub;
24-
private final GraphQlGrpcContextBuilder grpcContextBuilder;
26+
private final GrpcContextBuilder grpcContextBuilder;
2527
private final GatewayServiceEntityRequestBuilder requestBuilder;
2628
private final GatewayServiceEntityConverter entityConverter;
2729
private final BaselineDao baselineDao;
2830
private final GraphQlServiceConfig serviceConfig;
31+
private final Scheduler boundedIoScheduler;
2932

3033
@Inject
3134
GatewayServiceEntityDao(
3235
GraphQlServiceConfig serviceConfig,
3336
CallCredentials credentials,
34-
GraphQlGrpcContextBuilder grpcContextBuilder,
37+
GrpcContextBuilder grpcContextBuilder,
3538
GrpcChannelRegistry grpcChannelRegistry,
3639
GatewayServiceEntityRequestBuilder requestBuilder,
3740
GatewayServiceEntityConverter entityConverter,
38-
BaselineDao baselineDao) {
41+
BaselineDao baselineDao,
42+
@BoundedIoScheduler Scheduler boundedIoScheduler) {
3943
this.grpcContextBuilder = grpcContextBuilder;
4044
this.requestBuilder = requestBuilder;
4145
this.entityConverter = entityConverter;
4246
this.baselineDao = baselineDao;
4347
this.serviceConfig = serviceConfig;
48+
this.boundedIoScheduler = boundedIoScheduler;
4449

4550
this.gatewayServiceStub =
4651
GatewayServiceGrpc.newFutureStub(
@@ -53,6 +58,7 @@ class GatewayServiceEntityDao implements EntityDao {
5358
public Single<EntityResultSet> getEntities(EntityRequest request) {
5459
return this.requestBuilder
5560
.buildRequest(request)
61+
.subscribeOn(this.boundedIoScheduler)
5662
.flatMap(
5763
serverRequest ->
5864
this.makeRequest(request.resultSetRequest().context(), serverRequest)
@@ -75,7 +81,7 @@ private Single<EntitiesResponse> makeRequest(
7581
return Single.fromFuture(
7682
this.grpcContextBuilder
7783
.build(context)
78-
.callInContext(
84+
.call(
7985
() ->
8086
this.gatewayServiceStub
8187
.withDeadlineAfter(

hypertrace-graphql-explorer-schema/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies {
2525
implementation("org.hypertrace.core.graphql:hypertrace-core-graphql-attribute-store")
2626
implementation("org.hypertrace.core.graphql:hypertrace-core-graphql-deserialization")
2727
implementation("org.hypertrace.core.graphql:hypertrace-core-graphql-schema-utils")
28+
implementation("org.hypertrace.core.graphql:hypertrace-core-graphql-rx-utils")
2829

2930
testImplementation("org.junit.jupiter:junit-jupiter")
3031
testImplementation("org.mockito:mockito-core")

hypertrace-graphql-explorer-schema/src/main/java/org/hypertrace/graphql/explorer/dao/ExplorerDaoModule.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.google.inject.Key;
55
import com.google.inject.TypeLiteral;
66
import io.grpc.CallCredentials;
7+
import io.reactivex.rxjava3.core.Scheduler;
78
import java.util.Collection;
89
import java.util.List;
910
import java.util.Set;
@@ -15,9 +16,10 @@
1516
import org.hypertrace.core.graphql.common.schema.attributes.MetricAggregationType;
1617
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument;
1718
import org.hypertrace.core.graphql.common.utils.Converter;
18-
import org.hypertrace.core.graphql.context.GraphQlRequestContextBuilder;
19+
import org.hypertrace.core.graphql.rx.BoundedIoScheduler;
1920
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
2021
import org.hypertrace.core.graphql.utils.grpc.GrpcChannelRegistry;
22+
import org.hypertrace.core.graphql.utils.grpc.GrpcContextBuilder;
2123
import org.hypertrace.gateway.service.v1.common.Expression;
2224
import org.hypertrace.gateway.service.v1.common.Filter;
2325
import org.hypertrace.gateway.service.v1.common.OrderByExpression;
@@ -34,8 +36,9 @@ protected void configure() {
3436
bind(ExplorerDao.class).to(GatewayServiceExplorerDao.class);
3537
requireBinding(CallCredentials.class);
3638
requireBinding(GraphQlServiceConfig.class);
37-
requireBinding(GraphQlRequestContextBuilder.class);
3839
requireBinding(GrpcChannelRegistry.class);
40+
requireBinding(GrpcContextBuilder.class);
41+
requireBinding(Key.get(Scheduler.class, BoundedIoScheduler.class));
3942
requireBinding(Key.get(new TypeLiteral<Converter<Value, Object>>() {}));
4043
requireBinding(Key.get(new TypeLiteral<Converter<AttributeModelType, AttributeType>>() {}));
4144
requireBinding(

hypertrace-graphql-explorer-schema/src/main/java/org/hypertrace/graphql/explorer/dao/GatewayServiceExplorerDao.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import static java.util.concurrent.TimeUnit.MILLISECONDS;
44

55
import io.grpc.CallCredentials;
6+
import io.reactivex.rxjava3.core.Scheduler;
67
import io.reactivex.rxjava3.core.Single;
78
import javax.inject.Inject;
89
import javax.inject.Singleton;
910
import org.hypertrace.core.graphql.context.GraphQlRequestContext;
11+
import org.hypertrace.core.graphql.rx.BoundedIoScheduler;
1012
import org.hypertrace.core.graphql.spi.config.GraphQlServiceConfig;
11-
import org.hypertrace.core.graphql.utils.grpc.GraphQlGrpcContextBuilder;
1213
import org.hypertrace.core.graphql.utils.grpc.GrpcChannelRegistry;
14+
import org.hypertrace.core.graphql.utils.grpc.GrpcContextBuilder;
1315
import org.hypertrace.gateway.service.GatewayServiceGrpc;
1416
import org.hypertrace.gateway.service.GatewayServiceGrpc.GatewayServiceFutureStub;
1517
import org.hypertrace.gateway.service.v1.explore.ExploreResponse;
@@ -19,23 +21,26 @@
1921
@Singleton
2022
class GatewayServiceExplorerDao implements ExplorerDao {
2123
private final GatewayServiceFutureStub gatewayServiceStub;
22-
private final GraphQlGrpcContextBuilder grpcContextBuilder;
24+
private final GrpcContextBuilder grpcContextBuilder;
2325
private final GatewayServiceExploreRequestBuilder requestBuilder;
2426
private final GatewayServiceExploreResponseConverter responseConverter;
2527
private final GraphQlServiceConfig serviceConfig;
28+
private final Scheduler boundedIoScheduler;
2629

2730
@Inject
2831
GatewayServiceExplorerDao(
2932
GraphQlServiceConfig serviceConfig,
3033
CallCredentials credentials,
31-
GraphQlGrpcContextBuilder grpcContextBuilder,
34+
GrpcContextBuilder grpcContextBuilder,
3235
GrpcChannelRegistry grpcChannelRegistry,
3336
GatewayServiceExploreRequestBuilder requestBuilder,
34-
GatewayServiceExploreResponseConverter responseConverter) {
37+
GatewayServiceExploreResponseConverter responseConverter,
38+
@BoundedIoScheduler Scheduler boundedIoScheduler) {
3539
this.grpcContextBuilder = grpcContextBuilder;
3640
this.requestBuilder = requestBuilder;
3741
this.responseConverter = responseConverter;
3842
this.serviceConfig = serviceConfig;
43+
this.boundedIoScheduler = boundedIoScheduler;
3944

4045
this.gatewayServiceStub =
4146
GatewayServiceGrpc.newFutureStub(
@@ -48,6 +53,7 @@ class GatewayServiceExplorerDao implements ExplorerDao {
4853
public Single<ExploreResultSet> explore(ExploreRequest request) {
4954
return this.requestBuilder
5055
.buildRequest(request)
56+
.subscribeOn(this.boundedIoScheduler)
5157
.flatMap(serverRequest -> this.makeRequest(request.requestContext(), serverRequest))
5258
.flatMap(serverResponse -> this.responseConverter.convert(request, serverResponse));
5359
}
@@ -58,7 +64,7 @@ private Single<ExploreResponse> makeRequest(
5864
return Single.fromFuture(
5965
this.grpcContextBuilder
6066
.build(context)
61-
.callInContext(
67+
.call(
6268
() ->
6369
this.gatewayServiceStub
6470
.withDeadlineAfter(

0 commit comments

Comments
 (0)