Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RRIO] [Throttle] [Cache] Implement Throttle and Cache using an external resource. #29401

Merged
merged 8 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Configures patch for ../base/configmap.yaml
# See https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/

- op: replace
path: /metadata/labels/quota-id
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
- op: replace
path: /data/QUOTA_ID
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
- op: replace
path: /data/QUOTA_SIZE
value: "10"
- op: replace
path: /data/QUOTA_REFRESH_INTERVAL
value: 1s
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Configures patch for ../base/deployment.yaml
# See https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/

- op: replace
path: /metadata/labels/quota-id
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
- op: replace
path: /spec/selector/matchLabels/quota-id
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
- op: replace
path: /spec/template/metadata/labels/quota-id
value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Configures the overlay for .test-infra/mock-apis/infrastructure/kubernetes/refresher/base
# Using the Quota Id:
# echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota

resources:
- ../../base

nameSuffix: -throttle-with-external-resource-test-10-per-1s

patches:
- path: configmap.yaml
target:
kind: ConfigMap
name: refresher

- path: deployment.yaml
target:
kind: Deployment
name: refresher
1 change: 1 addition & 0 deletions sdks/java/io/rrio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
testImplementation platform(library.java.google_cloud_platform_libraries_bom)
testImplementation library.java.google_http_client
testImplementation library.java.junit
testImplementation library.java.hamcrest
testImplementation library.java.testcontainers_base

testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponse;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/** Transforms for reading and writing request/response associations to a cache. */
final class Cache {

/**
* Instantiates a {@link Call} {@link PTransform} that reads {@link RequestT} {@link ResponseT}
* associations from a cache. The {@link KV} value is null when no association exists. This method
* does not enforce {@link Coder#verifyDeterministic} and defers to the user to determine whether
* to enforce this given the cache implementation.
*/
static <
RequestT,
@Nullable ResponseT,
CallerSetupTeardownT extends
Caller<RequestT, KV<RequestT, @Nullable ResponseT>> & SetupTeardown>
PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable ResponseT>>> read(
CallerSetupTeardownT implementsCallerSetupTeardown,
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder) {
return Call.ofCallerAndSetupTeardown(
implementsCallerSetupTeardown, KvCoder.of(requestTCoder, responseTCoder));
}

/**
* Instantiates a {@link Call} {@link PTransform}, calling {@link #read} with a {@link Caller}
* that employs a redis client.
*
* <p>This method requires both the {@link RequestT} and {@link ResponseT}s' {@link
* Coder#verifyDeterministic}. Otherwise, it throws a {@link NonDeterministicException}.
*
* <p><a href="https://redis.io">Redis</a> is designed for multiple workloads, simultaneously
* reading and writing to a shared instance. See <a
* href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more information on important
* considerations when using this method to achieve cache reads.
*/
static <RequestT, @Nullable ResponseT>
PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable ResponseT>>>
readUsingRedis(
RedisClient client,
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder)
throws NonDeterministicException {
return read(
new UsingRedis<>(requestTCoder, responseTCoder, client).read(),
requestTCoder,
responseTCoder);
}

/**
* Write a {@link RequestT} {@link ResponseT} association to a cache. This method does not enforce
* {@link Coder#verifyDeterministic} and defers to the user to determine whether to enforce this
* given the cache implementation.
*/
static <
RequestT,
ResponseT,
CallerSetupTeardownT extends
Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>> & SetupTeardown>
PTransform<PCollection<KV<RequestT, ResponseT>>, Call.Result<KV<RequestT, ResponseT>>> write(
CallerSetupTeardownT implementsCallerSetupTeardown,
KvCoder<RequestT, ResponseT> kvCoder) {
return Call.ofCallerAndSetupTeardown(implementsCallerSetupTeardown, kvCoder);
}

/**
* Instantiates a {@link Call} {@link PTransform}, calling {@link #write} with a {@link Caller}
* that employs a redis client.
*
* <p>This method requires both the {@link RequestT} and {@link ResponseT}s' {@link
* Coder#verifyDeterministic}. Otherwise, it throws a {@link NonDeterministicException}.
*
* <p><a href="https://redis.io">Redis</a> is designed for multiple workloads, simultaneously
* reading and writing to a shared instance. See <a
* href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more information on important
* considerations when using this method to achieve cache writes.
*/
static <RequestT, ResponseT>
PTransform<PCollection<KV<RequestT, ResponseT>>, Call.Result<KV<RequestT, ResponseT>>>
writeUsingRedis(
Duration expiry,
RedisClient client,
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder)
throws NonDeterministicException {
return write(
new UsingRedis<>(requestTCoder, responseTCoder, client).write(expiry),
KvCoder.of(requestTCoder, responseTCoder));
}

private static class UsingRedis<RequestT, ResponseT> {
private final Coder<RequestT> requestTCoder;
private final Coder<@Nullable ResponseT> responseTCoder;
private final RedisClient client;

private UsingRedis(
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder,
RedisClient client)
throws Coder.NonDeterministicException {
this.client = client;
requestTCoder.verifyDeterministic();
responseTCoder.verifyDeterministic();
this.requestTCoder = requestTCoder;
this.responseTCoder = responseTCoder;
}

private Read<RequestT, @Nullable ResponseT> read() {
return new Read<>(requestTCoder, responseTCoder, client);
}

private Write<RequestT, ResponseT> write(Duration expiry) {
return new Write<>(expiry, requestTCoder, responseTCoder, client);
}

/** Reads associated {@link RequestT} {@link ResponseT} using a {@link RedisClient}. */
private static class Read<RequestT, @Nullable ResponseT>
implements Caller<RequestT, KV<RequestT, @Nullable ResponseT>>, SetupTeardown {

private final Coder<RequestT> requestTCoder;
private final Coder<@Nullable ResponseT> responseTCoder;
private final RedisClient client;

private Read(
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder,
RedisClient client) {
this.requestTCoder = requestTCoder;
this.responseTCoder = responseTCoder;
this.client = client;
}

@Override
public KV<RequestT, @Nullable ResponseT> call(RequestT request)
throws UserCodeExecutionException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
requestTCoder.encode(request, baos);
byte[] encodedRequest = baos.toByteArray();
byte[] encodedResponse = client.getBytes(encodedRequest);
if (encodedResponse == null) {
return KV.of(request, null);
}
ResponseT response =
checkStateNotNull(
responseTCoder.decode(ByteSource.wrap(encodedResponse).openStream()));
return KV.of(request, response);
} catch (IllegalStateException | IOException e) {
throw new UserCodeExecutionException(e);
}
}

@Override
public void setup() throws UserCodeExecutionException {
client.setup();
}

@Override
public void teardown() throws UserCodeExecutionException {
client.teardown();
}
}
}

private static class Write<RequestT, ResponseT>
implements Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>>, SetupTeardown {
private final Duration expiry;
private final Coder<RequestT> requestTCoder;
private final Coder<@Nullable ResponseT> responseTCoder;
private final RedisClient client;

private Write(
Duration expiry,
Coder<RequestT> requestTCoder,
Coder<@Nullable ResponseT> responseTCoder,
RedisClient client) {
this.expiry = expiry;
this.requestTCoder = requestTCoder;
this.responseTCoder = responseTCoder;
this.client = client;
}

@Override
public KV<RequestT, ResponseT> call(KV<RequestT, ResponseT> request)
throws UserCodeExecutionException {
ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
try {
requestTCoder.encode(request.getKey(), keyStream);
responseTCoder.encode(request.getValue(), valueStream);
} catch (IOException e) {
throw new UserCodeExecutionException(e);
}
client.setex(keyStream.toByteArray(), valueStream.toByteArray(), expiry);
return request;
}

@Override
public void setup() throws UserCodeExecutionException {
client.setup();
}

@Override
public void teardown() throws UserCodeExecutionException {
client.teardown();
}
}
}
Loading
Loading