diff --git a/bin/cleanup.sh b/bin/cleanup.sh index 98b7f837..6239bfe8 100755 --- a/bin/cleanup.sh +++ b/bin/cleanup.sh @@ -19,8 +19,7 @@ display_usage() { cat </dev/stderr Performs full TD and K8S resource cleanup -USAGE: $0 [--nosecure] [arguments] - --nosecure: Skip cleanup for the resources specific for PSM Security +USAGE: $0 [arguments] arguments ...: additional arguments passed to ./run.sh ENVIRONMENT: @@ -31,7 +30,7 @@ ENVIRONMENT: Default: $XDS_K8S_DRIVER_DIR/venv EXAMPLES: $0 -$0 --nosecure +$0 --mode=app_net XDS_K8S_CONFIG=./path-to-flagfile.cfg $0 --resource_suffix=override-suffix EOF exit 1 @@ -47,13 +46,6 @@ readonly XDS_K8S_DRIVER_DIR="${SCRIPT_DIR}/.." cd "${XDS_K8S_DRIVER_DIR}" -if [[ "$1" == "--nosecure" ]]; then - shift - ./run.sh bin/run_td_setup.py --cmd=cleanup "$@" && \ - ./run.sh bin/run_test_client.py --cmd=cleanup --cleanup_namespace "$@" && \ - ./run.sh bin/run_test_server.py --cmd=cleanup --cleanup_namespace "$@" -else - ./run.sh bin/run_td_setup.py --cmd=cleanup --security=mtls "$@" && \ - ./run.sh bin/run_test_client.py --cmd=cleanup --cleanup_namespace --mode=secure "$@" && \ - ./run.sh bin/run_test_server.py --cmd=cleanup --cleanup_namespace --mode=secure "$@" -fi +./run.sh bin/run_td_setup.py --cmd=cleanup "$@" && \ +./run.sh bin/run_test_client.py --cmd=cleanup --cleanup_namespace "$@" && \ +./run.sh bin/run_test_server.py --cmd=cleanup --cleanup_namespace "$@" diff --git a/bin/cleanup_cluster.sh b/bin/cleanup_cluster.sh index 5755a467..839ffdfa 100755 --- a/bin/cleanup_cluster.sh +++ b/bin/cleanup_cluster.sh @@ -21,12 +21,12 @@ readonly XDS_K8S_DRIVER_DIR="${SCRIPT_DIR}/.." cd "${XDS_K8S_DRIVER_DIR}" -NO_SECURE="yes" +MODE="" DATE_TO=$(date -Iseconds) while [[ $# -gt 0 ]]; do case $1 in - --secure) NO_SECURE=""; shift ;; + --mode=*) MODE="${1#*=}"; shift ;; --date_to=*) DATE_TO="${1#*=}T00:00:00Z"; shift ;; *) echo "Unknown argument $1"; exit 1 ;; esac @@ -68,7 +68,7 @@ echo "Count: ${#namespaces[@]}" echo "Run plan:" for suffix in "${suffixes[@]}"; do - echo ./bin/cleanup.sh ${NO_SECURE:+"--nosecure"} "--resource_suffix=${suffix}" + echo ./bin/cleanup.sh ${MODE:+"--mode=$MODE"} "--resource_suffix=${suffix}" done read -r -n 1 -p "Continue? (y/N) " answer @@ -85,7 +85,7 @@ failed=0 for suffix in "${suffixes[@]}"; do echo "-------------------- Cleaning suffix ${suffix} --------------------" set -x - ./bin/cleanup.sh ${NO_SECURE:+"--nosecure"} "--resource_suffix=${suffix}" || (( ++failed )) + ./bin/cleanup.sh ${MODE:+"--mode=$MODE"} "--resource_suffix=${suffix}" || (( ++failed )) set +x echo "-------------------- Finished cleaning ${suffix} --------------------" done diff --git a/bin/lib/common.py b/bin/lib/common.py index ebe6681e..a3d59058 100644 --- a/bin/lib/common.py +++ b/bin/lib/common.py @@ -33,6 +33,30 @@ logger = logging.get_absl_logger() # TODO(sergiitk): move common flags/validations here: mode, security, etc +MODE = flags.DEFINE_enum( + "mode", + default="default", + enum_values=[ + "default", + "secure", + "app_net", + "rlqs", + "gamma", + ], + help="Select server mode", +) +SECURITY = flags.DEFINE_enum( + "security", + default=None, + enum_values=[ + "mtls", + "tls", + "plaintext", + "mtls_error", + "server_authz_error", + ], + help="Configure TD with security", +) SERVER_REPLICA_COUNT = flags.DEFINE_integer( "server_replica_count", default=1, @@ -40,6 +64,36 @@ upper_bound=999, help="The number server replicas to run.", ) +ROUTE_KIND_GAMMA = flags.DEFINE_enum_class( + "gamma_route_kind", + default=k8s.RouteKind.HTTP, + enum_class=k8s.RouteKind, + help="When --mode=gamma, select the kind of a gamma route the server uses", +) + +# Running outside of a test suite, so require explicit resource_suffix. +flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX) + +# Require --security when --mode=secure. +flags.register_multi_flags_validator( + (MODE, SECURITY), + lambda values: values[MODE.name] != "secure" or values[SECURITY.name], + "When --mode=secure; --security flag is required", +) + + +@flags.multi_flags_validator( + (xds_flags.SERVER_XDS_PORT.name, MODE.name), + message=( + "Run outside of a test suite, must provide" + " the exact port value (must be greater than 0)." + ), +) +def _check_server_xds_port_flag(flags_dict): + if flags_dict[MODE.name] == "gamma": + return True + return flags_dict[xds_flags.SERVER_XDS_PORT.name] > 0 + # Type aliases KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner @@ -151,7 +205,11 @@ def make_server_runner( f"{xds_flags.RESOURCE_PREFIX.value}-" f"{xds_flags.RESOURCE_SUFFIX.value}" ) + runner_kwargs["route_kind"] = ROUTE_KIND_GAMMA.value server_runner = GammaServerRunner + elif mode == "rlqs": + depl_args = k8s_xds_server_runner.ServerDeploymentArgs(enable_rlqs=True) + runner_kwargs["deployment_args"] = depl_args return server_runner(namespace, **runner_kwargs) diff --git a/bin/run_channelz.py b/bin/run_channelz.py index 3b1368ef..166de1bd 100644 --- a/bin/run_channelz.py +++ b/bin/run_channelz.py @@ -49,47 +49,9 @@ from framework.test_app import server_app # Flags -_MODE = flags.DEFINE_enum( - "mode", - default="default", - enum_values=[ - "default", - "secure", - "app_net", - "gamma", - ], - help="Select test mode", -) -_SECURITY = flags.DEFINE_enum( - "security", - default=None, - enum_values=[ - "mtls", - "tls", - "plaintext", - "mtls_error", - "server_authz_error", - ], - help="Show info for a security setup", -) flags.adopt_module_key_flags(common) flags.adopt_module_key_flags(xds_flags) flags.adopt_module_key_flags(xds_k8s_flags) -# Running outside of a test suite, so require explicit resource_suffix. -flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name) - - -@flags.multi_flags_validator( - (xds_flags.SERVER_XDS_PORT.name, _MODE.name), - message=( - "Run outside of a test suite, must provide" - " the exact port value (must be greater than 0)." - ), -) -def _check_server_xds_port_flag(flags_dict): - if flags_dict[_MODE.name] == "gamma": - return True - return flags_dict[xds_flags.SERVER_XDS_PORT.name] > 0 logger = logging.get_absl_logger() @@ -244,7 +206,11 @@ def main(argv): enable_workload_identity: bool = ( xds_k8s_flags.ENABLE_WORKLOAD_IDENTITY.value ) - is_secure: bool = bool(_SECURITY.value) + is_secure: bool = bool(common.SECURITY.value) + security_mode = common.SECURITY.value + if security_mode: + flags.set_default(common.MODE, "secure") + mode = common.MODE.value # Server. server_namespace = common.make_server_namespace() @@ -252,7 +218,7 @@ def main(argv): server_namespace, port_forwarding=should_port_forward, enable_workload_identity=enable_workload_identity, - mode=_MODE.value, + mode=mode, ) # Find server pod. server_pods = common.get_server_pods( @@ -270,7 +236,7 @@ def main(argv): client_namespace, port_forwarding=should_port_forward, enable_workload_identity=enable_workload_identity, - mode=_MODE.value, + mode=mode, ) # Find client pod. client_pod: k8s.V1Pod = common.get_client_pod( @@ -292,7 +258,7 @@ def main(argv): ) # Create client app for the client pod. - if _MODE.value == "gamma": + if mode == "gamma": server_target = ( f"xds:///{server_runner.frontend_service_name}" f".{server_runner.k8s_namespace.name}.svc.cluster.local" @@ -309,9 +275,9 @@ def main(argv): ) with test_client, test_server: - if _SECURITY.value in ("mtls", "tls", "plaintext"): + if security_mode in ("mtls", "tls", "plaintext"): debug_security_setup_positive(test_client, test_server) - elif _SECURITY.value in ("mtls_error", "server_authz_error"): + elif security_mode in ("mtls_error", "server_authz_error"): debug_security_setup_negative(test_client) else: debug_basic_setup(test_client, test_server) diff --git a/bin/run_ping_pong.py b/bin/run_ping_pong.py index 67cae67a..960dedc9 100644 --- a/bin/run_ping_pong.py +++ b/bin/run_ping_pong.py @@ -44,17 +44,6 @@ from framework.test_app import server_app # Flags -_MODE = flags.DEFINE_enum( - "mode", - default="default", - enum_values=[ - "default", - "secure", - "app_net", - "gamma", - ], - help="Select a deployment of the client/server", -) _NUM_RPCS = flags.DEFINE_integer( "num_rpcs", default=100, @@ -65,22 +54,6 @@ flags.adopt_module_key_flags(common) flags.adopt_module_key_flags(xds_flags) flags.adopt_module_key_flags(xds_k8s_flags) -# Running outside of a test suite, so require explicit resource_suffix. -flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name) - - -@flags.multi_flags_validator( - (xds_flags.SERVER_XDS_PORT.name, _MODE.name), - message=( - "Run outside of a test suite, must provide" - " the exact port value (must be greater than 0)." - ), -) -def _check_server_xds_port_flag(flags_dict): - if flags_dict[_MODE.name] == "gamma": - return True - return flags_dict[xds_flags.SERVER_XDS_PORT.name] > 0 - logger = logging.get_absl_logger() @@ -141,7 +114,7 @@ def main(argv): common.make_server_namespace(), port_forwarding=should_port_forward, enable_workload_identity=enable_workload_identity, - mode=_MODE.value, + mode=common.MODE.value, ) # Ensure server pods are running common.get_server_pods(server_runner, xds_flags.SERVER_NAME.value) @@ -151,7 +124,7 @@ def main(argv): common.make_client_namespace(), port_forwarding=should_port_forward, enable_workload_identity=enable_workload_identity, - mode=_MODE.value, + mode=common.MODE.value, ) # Find client pod. client_pod: k8s.V1Pod = common.get_client_pod( @@ -162,7 +135,7 @@ def main(argv): common.register_graceful_exit(server_runner, client_runner) # Create client app for the client pod. - if _MODE.value == "gamma": + if common.MODE.value == "gamma": server_target = ( f"xds:///{server_runner.frontend_service_name}" f".{server_runner.k8s_namespace.name}.svc.cluster.local" diff --git a/bin/run_td_setup.py b/bin/run_td_setup.py index 6f14be91..bd84cc4c 100644 --- a/bin/run_td_setup.py +++ b/bin/run_td_setup.py @@ -70,41 +70,10 @@ ], help="Command", ) -_MODE = flags.DEFINE_enum( - "mode", - default="default", - enum_values=[ - "default", - "secure", - "app_net", - "gamma", - ], - help="Select setup mode", -) -_SECURITY = flags.DEFINE_enum( - "security", - default=None, - enum_values=[ - "mtls", - "tls", - "plaintext", - "mtls_error", - "server_authz_error", - ], - help="Configure TD with security", -) flags.adopt_module_key_flags(xds_flags) flags.adopt_module_key_flags(xds_k8s_flags) # Flag validations. -# Running outside of a test suite, so require explicit resource_suffix. -flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name) -# Require --security when --mode=secure. -flags.register_multi_flags_validator( - (_MODE, _SECURITY), - lambda values: values[_MODE.name] != "secure" or values[_SECURITY.name], - "When --mode=secure; --security flag is required", -) @flags.multi_flags_validator( @@ -234,7 +203,7 @@ def _setup_td_secure( mtls=False, ) td.setup_client_security( - server_namespace=(f"incorrect-namespace-{rand.rand_string()}"), + server_namespace=f"incorrect-namespace-{rand.rand_string()}", server_name=server_name, tls=True, mtls=False, @@ -253,6 +222,26 @@ def _setup_td_appnet( td.create_grpc_route(server_xds_host, server_xds_port) +def _setup_td_rlqs( + *, + td: traffic_director.TrafficDirectorAppNetManager, + server_xds_host, + server_xds_port, + server_namespace, + server_name, + server_port, +): + td.create_health_check() + td.create_backend_service() + td.create_mesh() + td.create_grpc_route(server_xds_host, server_xds_port) + td.create_endpoint_policy( + server_namespace=server_namespace, + server_name=server_name, + server_port=server_port, + ) + + def _cmd_backends_add(td, server_name, server_namespace, server_port): logger.info("Adding backends") k8s_api_manager = k8s.KubernetesApiManager(xds_k8s_flags.KUBE_CONTEXT.value) @@ -292,11 +281,11 @@ def main( # Flags. command = _CMD.value - security_mode = _SECURITY.value + security_mode = common.SECURITY.value if security_mode: - flags.set_default(_MODE, "secure") + flags.set_default(common.MODE, "secure") - mode = _MODE.value + mode = common.MODE.value # Short circuit for gamma node. if mode == "gamma": @@ -320,6 +309,10 @@ def main( td_attrs = common.td_attrs() if mode == "app_net": td = traffic_director.TrafficDirectorAppNetManager(**td_attrs) + elif mode == "rlqs": + td_attrs["netsvc_class"] = traffic_director.NetworkServicesV1Alpha1 + td_attrs["compute_api_version"] = "v1alpha" + td = traffic_director.TrafficDirectorAppNetManager(**td_attrs) elif mode == "secure": td = traffic_director.TrafficDirectorSecureManager(**td_attrs) if server_maintenance_port is None: @@ -338,6 +331,15 @@ def main( server_xds_host=server_xds_host, server_xds_port=server_xds_port, ) + elif mode == "rlqs": + _setup_td_rlqs( + td=td, + server_xds_host=server_xds_host, + server_xds_port=server_xds_port, + server_namespace=server_namespace_name, + server_name=server_name, + server_port=server_port, + ) elif mode == "secure": _setup_td_secure( security_mode, diff --git a/bin/run_test_client.py b/bin/run_test_client.py index 44299804..9791c72b 100644 --- a/bin/run_test_client.py +++ b/bin/run_test_client.py @@ -45,17 +45,6 @@ _CMD = flags.DEFINE_enum( "cmd", default="run", enum_values=["run", "cleanup"], help="Command" ) -_MODE = flags.DEFINE_enum( - "mode", - default="default", - enum_values=[ - "default", - "secure", - "app_net", - "gamma", - ], - help="Select client mode", -) _QPS = flags.DEFINE_integer("qps", default=25, help="Queries per second") _PRINT_RESPONSE = flags.DEFINE_bool( "print_response", default=False, help="Client prints responses" @@ -78,21 +67,6 @@ ) flags.adopt_module_key_flags(xds_flags) flags.adopt_module_key_flags(xds_k8s_flags) -# Running outside of a test suite, so require explicit resource_suffix. -flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX.name) - - -@flags.multi_flags_validator( - (xds_flags.SERVER_XDS_PORT, _CMD, _MODE), - message=( - "Run outside of a test suite, must provide" - " the exact port value (must be greater than 0)." - ), -) -def _check_server_xds_port_flag(flags_dict): - if flags_dict[_MODE.name] == "gamma" or flags_dict[_CMD.name] == "cleanup": - return True - return flags_dict[xds_flags.SERVER_XDS_PORT.name] > 0 def _make_sigint_handler(client_runner: common.KubernetesClientRunner): @@ -116,7 +90,7 @@ def _get_run_kwargs(mode: str): if mode == "secure": run_kwargs["secure_mode"] = True - elif mode == "app_net": + elif mode in ("app_net", "rlqs"): # Minimal appnet td setup so it's possible to generate config mesh name td = traffic_director.TrafficDirectorAppNetManager(**common.td_attrs()) run_kwargs["config_mesh"] = td.make_resource_name(td.MESH_NAME) @@ -155,7 +129,7 @@ def main(argv): xds_flags.set_socket_default_timeout_from_flag() # Flags. - mode: str = _MODE.value + mode: str = common.MODE.value command: str = _CMD.value # Flags: log following and port forwarding. should_follow_logs = _FOLLOW.value and xds_flags.COLLECT_APP_LOGS.value diff --git a/bin/run_test_server.py b/bin/run_test_server.py index e19ab497..46d9ed61 100644 --- a/bin/run_test_server.py +++ b/bin/run_test_server.py @@ -45,29 +45,13 @@ from framework import xds_flags from framework import xds_k8s_flags from framework.infrastructure import k8s +from framework.infrastructure import traffic_director logger = logging.getLogger(__name__) # Flags _CMD = flags.DEFINE_enum( "cmd", default="run", enum_values=["run", "cleanup"], help="Command" ) -_MODE = flags.DEFINE_enum( - "mode", - default="default", - enum_values=[ - "default", - "secure", - "app_net", - "gamma", - ], - help="Select server mode", -) -_ROUTE_KIND_GAMMA = flags.DEFINE_enum_class( - "gamma_route_kind", - default=k8s.RouteKind.HTTP, - enum_class=k8s.RouteKind, - help="When --mode=gamma, select the kind of a gamma route to create", -) _REUSE_NAMESPACE = flags.DEFINE_bool( "reuse_namespace", default=True, help="Use existing namespace if exists" ) @@ -85,8 +69,6 @@ flags.adopt_module_key_flags(xds_flags) flags.adopt_module_key_flags(xds_k8s_flags) flags.adopt_module_key_flags(common) -# Running outside of a test suite, so require explicit resource_suffix. -flags.mark_flag_as_required(xds_flags.RESOURCE_SUFFIX) def _make_sigint_handler(server_runner: common.KubernetesServerRunner): @@ -109,10 +91,14 @@ def _get_run_kwargs(mode: str): run_kwargs["secure_mode"] = True elif mode == "gamma": run_kwargs["generate_mesh_id"] = True - if _ROUTE_KIND_GAMMA.value is k8s.RouteKind.HTTP: + if common.ROUTE_KIND_GAMMA.value is k8s.RouteKind.HTTP: run_kwargs["route_template"] = "gamma/route_http.yaml" - elif _ROUTE_KIND_GAMMA.value is k8s.RouteKind.GRPC: + elif common.ROUTE_KIND_GAMMA.value is k8s.RouteKind.GRPC: run_kwargs["route_template"] = "gamma/route_grpc.yaml" + elif mode == "rlqs": + # Minimal appnet td setup so it's possible to generate config mesh name + td = traffic_director.TrafficDirectorAppNetManager(**common.td_attrs()) + run_kwargs["config_mesh"] = td.make_resource_name(td.MESH_NAME) return run_kwargs @@ -126,7 +112,7 @@ def main(argv): # Flags. command: str = _CMD.value - mode: str = _MODE.value + mode: str = common.MODE.value # Flags: log following and port forwarding. should_follow_logs = _FOLLOW.value and xds_flags.COLLECT_APP_LOGS.value should_port_forward = ( diff --git a/framework/infrastructure/gcp/network_services.py b/framework/infrastructure/gcp/network_services.py index 86d4a97a..eb2328a9 100644 --- a/framework/infrastructure/gcp/network_services.py +++ b/framework/infrastructure/gcp/network_services.py @@ -356,36 +356,6 @@ def _operation_internal_error(exception): ) -class NetworkServicesV1Beta1(_NetworkServicesBase): - """NetworkServices API v1beta1.""" - - ENDPOINT_POLICIES = "endpointPolicies" - - @property - def api_version(self) -> str: - return "v1beta1" - - def create_endpoint_policy(self, name, body: dict) -> GcpResource: - return self._create_resource( - collection=self._api_locations.endpointPolicies(), - body=body, - endpointPolicyId=name, - ) - - def get_endpoint_policy(self, name: str) -> EndpointPolicy: - response = self._get_resource( - collection=self._api_locations.endpointPolicies(), - full_name=self.resource_full_name(name, self.ENDPOINT_POLICIES), - ) - return EndpointPolicy.from_response(name, response) - - def delete_endpoint_policy(self, name: str) -> bool: - return self._delete_resource( - collection=self._api_locations.endpointPolicies(), - full_name=self.resource_full_name(name, self.ENDPOINT_POLICIES), - ) - - class NetworkServicesV1(_NetworkServicesBase): """NetworkServices API v1.""" @@ -475,3 +445,11 @@ def delete_http_route(self, name: str) -> bool: collection=self._api_locations.httpRoutes(), full_name=self.resource_full_name(name, self.HTTP_ROUTES), ) + + +class NetworkServicesV1Alpha1(NetworkServicesV1): + """NetworkServices API v1alpha1.""" + + @property + def api_version(self) -> str: + return "v1alpha1" diff --git a/framework/infrastructure/traffic_director.py b/framework/infrastructure/traffic_director.py index 0004ad9d..7ebb45d7 100644 --- a/framework/infrastructure/traffic_director.py +++ b/framework/infrastructure/traffic_director.py @@ -43,7 +43,10 @@ AuthorizationPolicy = gcp.network_security.AuthorizationPolicy # Network Services -_NetworkServicesV1Beta1 = gcp.network_services.NetworkServicesV1Beta1 +NetworkServicesV1: TypeAlias = gcp.network_services.NetworkServicesV1 +NetworkServicesV1Alpha1: TypeAlias = ( + gcp.network_services.NetworkServicesV1Alpha1 +) EndpointPolicy = gcp.network_services.EndpointPolicy GrpcRoute = gcp.network_services.GrpcRoute HttpRoute = gcp.network_services.HttpRoute @@ -907,9 +910,16 @@ class TrafficDirectorAppNetManager(TrafficDirectorManager): GRPC_ROUTE_NAME = "grpc-route" HTTP_ROUTE_NAME = "http-route" MESH_NAME = "mesh" + ENDPOINT_POLICY = "endpoint-policy" netsvc: gcp.network_services.NetworkServicesV1 + # Managed resources + grpc_route: Optional[GrpcRoute] = None + http_route: Optional[HttpRoute] = None + mesh: Optional[Mesh] = None + endpoint_policy: Optional[EndpointPolicy] = None + def __init__( self, gcp_api_manager: gcp.api.GcpApiManager, @@ -920,6 +930,7 @@ def __init__( network: str = "default", compute_api_version: str = "v1", enable_dualstack: bool = False, + netsvc_class: type[NetworkServicesV1] = NetworkServicesV1, ): super().__init__( gcp_api_manager, @@ -932,20 +943,19 @@ def __init__( ) # API - self.netsvc = gcp.network_services.NetworkServicesV1( - gcp_api_manager, project - ) + self.netsvc = netsvc_class(gcp_api_manager, project) # Managed resources # TODO(gnossen) PTAL at the pylint error - self.grpc_route: Optional[GrpcRoute] = None - self.http_route: Optional[HttpRoute] = None - self.mesh: Optional[Mesh] = None + self.grpc_route = None + self.http_route = None + self.mesh = None + self.endpoint_policy = None def create_mesh(self) -> GcpResource: name = self.make_resource_name(self.MESH_NAME) logger.info("Creating Mesh %s", name) - body = {} + body = {"name": name} resource = self.netsvc.create_mesh(name, body) self.mesh = self.netsvc.get_mesh(name) logger.debug("Loaded Mesh: %s", self.mesh) @@ -1019,14 +1029,54 @@ def delete_http_route(self, force=False): self.netsvc.delete_http_route(name) self.http_route = None + def create_endpoint_policy( + self, *, server_namespace: str, server_name: str, server_port: int + ) -> None: + name = self.make_resource_name(self.ENDPOINT_POLICY) + logger.info("Creating Endpoint Policy %s", name) + endpoint_matcher_labels = [ + { + "labelName": "app", + "labelValue": f"{server_namespace}-{server_name}", + } + ] + port_selector = {"ports": [str(server_port)]} + label_matcher_all = { + "metadataLabelMatchCriteria": "MATCH_ALL", + "metadataLabels": endpoint_matcher_labels, + } + config = { + "type": "GRPC_SERVER", + "trafficPortSelector": port_selector, + "endpointMatcher": { + "metadataLabelMatcher": label_matcher_all, + }, + } + + self.netsvc.create_endpoint_policy(name, config) + self.endpoint_policy = self.netsvc.get_endpoint_policy(name) + logger.debug("Loaded Endpoint Policy: %r", self.endpoint_policy) + + def delete_endpoint_policy(self, force: bool = False) -> None: + if force: + name = self.make_resource_name(self.ENDPOINT_POLICY) + elif self.endpoint_policy: + name = self.endpoint_policy.name + else: + return + logger.info("Deleting Endpoint Policy %s", name) + self.netsvc.delete_endpoint_policy(name) + self.endpoint_policy = None + def cleanup(self, *, force=False): self.delete_http_route(force=force) self.delete_grpc_route(force=force) self.delete_mesh(force=force) + self.delete_endpoint_policy(force=force) super().cleanup(force=force) -class TrafficDirectorSecureManager(TrafficDirectorManager): +class TrafficDirectorSecureManager(TrafficDirectorAppNetManager): SERVER_TLS_POLICY_NAME = "server-tls-policy" CLIENT_TLS_POLICY_NAME = "client-tls-policy" AUTHZ_POLICY_NAME = "authz-policy" @@ -1034,7 +1084,7 @@ class TrafficDirectorSecureManager(TrafficDirectorManager): CERTIFICATE_PROVIDER_INSTANCE = "google_cloud_private_spiffe" netsec: _NetworkSecurityV1Beta1 - netsvc: _NetworkServicesV1Beta1 + netsvc: gcp.network_services.NetworkServicesV1 def __init__( self, @@ -1059,7 +1109,9 @@ def __init__( # API self.netsec = _NetworkSecurityV1Beta1(gcp_api_manager, project) - self.netsvc = _NetworkServicesV1Beta1(gcp_api_manager, project) + self.netsvc = gcp.network_services.NetworkServicesV1( + gcp_api_manager, project + ) # Managed resources self.server_tls_policy: Optional[ServerTlsPolicy] = None diff --git a/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/framework/test_app/runners/k8s/k8s_xds_server_runner.py index 9b492d6e..d2194433 100644 --- a/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -38,6 +38,7 @@ class ServerDeploymentArgs: csm_workload_name: str = "" csm_canonical_service_name: str = "" enable_dualstack: bool = False + enable_rlqs: bool = False def as_dict(self): return { @@ -49,6 +50,7 @@ def as_dict(self): "csm_workload_name": self.csm_workload_name, "csm_canonical_service_name": self.csm_canonical_service_name, "enable_dualstack": self.enable_dualstack, + "enable_rlqs": self.enable_rlqs, } @@ -176,6 +178,7 @@ def run( # pylint: disable=arguments-differ,too-many-branches replica_count: int = 1, log_to_stdout: bool = False, bootstrap_version: Optional[str] = None, + config_mesh: Optional[str] = None, ) -> List[XdsTestServer]: if not maintenance_port: maintenance_port = self._get_default_maintenance_port(secure_mode) @@ -266,6 +269,7 @@ def run( # pylint: disable=arguments-differ,too-many-branches secure_mode=secure_mode, address_type=address_type, bootstrap_version=bootstrap_version, + config_mesh=config_mesh, **self.deployment_args.as_dict(), ) diff --git a/framework/test_cases/base_testcase.py b/framework/test_cases/base_testcase.py index 389bb1f8..5f191c65 100644 --- a/framework/test_cases/base_testcase.py +++ b/framework/test_cases/base_testcase.py @@ -13,16 +13,61 @@ # limitations under the License. """Base test case used for xds test suites.""" import inspect +import signal +import time import traceback -from typing import Optional, Union +from types import FrameType +from typing import Any, Callable, Optional, TypeAlias, Union import unittest from absl import logging from absl.testing import absltest +from typing_extensions import override + +# pylint complains about signal.Signals for some reason. +_SignalNum: TypeAlias = Union[int, signal.Signals] # pylint: disable=no-member +_SignalHandler: TypeAlias = Callable[[_SignalNum, Optional[FrameType]], Any] class BaseTestCase(absltest.TestCase): - # @override + _prev_sigint_handler: Optional[_SignalHandler] = None + _handling_sigint: bool = False + + @override + def setUp(self): + super().setUp() + self._prev_sigint_handler = signal.signal( + signal.SIGINT, self._handle_sigint + ) + + def _handle_sigint( + self, signalnum: _SignalNum, frame: Optional[FrameType] + ) -> None: + if self._handling_sigint: + logging.info("Ctrl+C pressed twice, aborting the cleanup.") + else: + cleanup_delay_sec = 2 + logging.info( + "Caught Ctrl+C. Cleanup will start in %d seconds." + " Press Ctrl+C again to abort.", + cleanup_delay_sec, + ) + self._handling_sigint = True + # Sleep for a few seconds to allow second Ctrl-C before the cleanup. + time.sleep(cleanup_delay_sec) + # Force resource cleanup by their name. Addresses the case where + # ctrl-c is pressed while waiting for the resource creation. + self.force_cleanup = True + self.tearDown() + self.tearDownClass() + + # Remove the sigint handler. + self._handling_sigint = False + if self._prev_sigint_handler is not None: + signal.signal(signal.SIGINT, self._prev_sigint_handler) + raise KeyboardInterrupt + + @override def run(self, result: Optional[unittest.TestResult] = None) -> None: super().run(result) # TODO(sergiitk): should this method be returning result? See diff --git a/framework/xds_k8s_testcase.py b/framework/xds_k8s_testcase.py index 75d1619d..d03e80d7 100644 --- a/framework/xds_k8s_testcase.py +++ b/framework/xds_k8s_testcase.py @@ -19,10 +19,8 @@ import hashlib import logging import re -import signal import time -from types import FrameType -from typing import Any, Callable, Final, List, Optional, Tuple, Union +from typing import Callable, Final, Optional, Tuple from absl import flags from absl.testing import absltest @@ -83,9 +81,6 @@ ClientConfig = grpc_csds.ClientConfig RpcMetadata = grpc_testing.LoadBalancerStatsResponse.RpcMetadata MetadataByPeer: list[str, RpcMetadata] -# pylint complains about signal.Signals for some reason. -_SignalNum = Union[int, signal.Signals] # pylint: disable=no-member -_SignalHandler = Callable[[_SignalNum, Optional[FrameType]], Any] TD_CONFIG_MAX_WAIT: Final[dt.timedelta] = dt.timedelta(minutes=10) # TODO(sergiitk): get rid of the seconds constant, use timedelta @@ -147,8 +142,6 @@ class XdsKubernetesBaseTestCase(base_testcase.BaseTestCase): server_xds_port: Optional[int] td: TrafficDirectorManager td_bootstrap_image: str - _prev_sigint_handler: Optional[_SignalHandler] = None - _handling_sigint: bool = False yaml_highlighter: framework.helpers.highlighter.HighlighterYaml = None enable_dualstack: bool = False @@ -291,39 +284,6 @@ def tearDownClass(cls): cls.secondary_k8s_api_manager.close() cls.gcp_api_manager.close() - def setUp(self): - self._prev_sigint_handler = signal.signal( - signal.SIGINT, self.handle_sigint - ) - - def handle_sigint( - self, signalnum: _SignalNum, frame: Optional[FrameType] - ) -> None: - # TODO(sergiitk): move to base_testcase.BaseTestCase - if self._handling_sigint: - logger.info("Ctrl+C pressed twice, aborting the cleanup.") - else: - cleanup_delay_sec = 2 - logger.info( - "Caught Ctrl+C. Cleanup will start in %d seconds." - " Press Ctrl+C again to abort.", - cleanup_delay_sec, - ) - self._handling_sigint = True - # Sleep for a few seconds to allow second Ctrl-C before the cleanup. - time.sleep(cleanup_delay_sec) - # Force resource cleanup by their name. Addresses the case where - # ctrl-c is pressed while waiting for the resource creation. - self.force_cleanup = True - self.tearDown() - self.tearDownClass() - - # Remove the sigint handler. - self._handling_sigint = False - if self._prev_sigint_handler is not None: - signal.signal(signal.SIGINT, self._prev_sigint_handler) - raise KeyboardInterrupt - @contextlib.contextmanager def subTest(self, msg, **params): # noqa pylint: disable=signature-differs # TODO(sergiitk): move to base_testcase.BaseTestCase @@ -801,6 +761,10 @@ class IsolatedXdsKubernetesTestCase( each test, and destroyed after. """ + client_runner: Optional[KubernetesClientRunner] = None + server_runner: Optional[KubernetesServerRunner] = None + td: Optional[TrafficDirectorManager] = None + def setUp(self): """Hook method for setting up the test fixture before exercising it.""" super().setUp() @@ -872,12 +836,14 @@ def tearDown(self): client_restarts: int = 0 server_restarts: int = 0 try: - client_restarts = self.client_runner.get_pod_restarts( - self.client_runner.deployment - ) - server_restarts = self.server_runner.get_pod_restarts( - self.server_runner.deployment - ) + if self.client_runner: + client_restarts = self.client_runner.get_pod_restarts( + self.client_runner.deployment + ) + if self.server_runner: + server_restarts = self.server_runner.get_pod_restarts( + self.server_runner.deployment + ) except (retryers.RetryError, k8s.NotFound) as e: logger.exception(e) @@ -891,9 +857,10 @@ def tearDown(self): except retryers.RetryError: logger.exception("Got error during teardown") finally: - logger.info("----- Test client/server logs -----") - self.client_runner.logs_explorer_run_history_links() - self.server_runner.logs_explorer_run_history_links() + if self.client_runner and self.server_runner: + logger.info("----- Test client/server logs -----") + self.client_runner.logs_explorer_run_history_links() + self.server_runner.logs_explorer_run_history_links() # Fail if any of the pods restarted. self.assertEqual( @@ -916,13 +883,16 @@ def tearDown(self): ) def cleanup(self): - self.td.cleanup(force=self.force_cleanup) - self.client_runner.cleanup( - force=self.force_cleanup, force_namespace=self.force_cleanup - ) - self.server_runner.cleanup( - force=self.force_cleanup, force_namespace=self.force_cleanup - ) + if self.td: + self.td.cleanup(force=self.force_cleanup) + if self.client_runner: + self.client_runner.cleanup( + force=self.force_cleanup, force_namespace=self.force_cleanup + ) + if self.server_runner: + self.server_runner.cleanup( + force=self.force_cleanup, force_namespace=self.force_cleanup + ) def _start_test_client( self, @@ -1012,16 +982,28 @@ def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: **kwargs, ) + def startTestServer( + self, + server_runner: Optional[KubernetesServerRunner] = None, + **run_kwargs, + ) -> XdsTestServer: + return self.startTestServers(server_runner=server_runner, **run_kwargs)[ + 0 + ] + def startTestServers( - self, replica_count=1, server_runner=None, **kwargs - ) -> List[XdsTestServer]: + self, + replica_count: int = 1, + server_runner: Optional[KubernetesServerRunner] = None, + **run_kwargs, + ) -> list[XdsTestServer]: if server_runner is None: server_runner = self.server_runner test_servers = server_runner.run( replica_count=replica_count, test_port=self.server_port, maintenance_port=self.server_maintenance_port, - **kwargs, + **run_kwargs, ) for test_server in test_servers: test_server.set_xds_address( diff --git a/kubernetes-manifests/client-secure.deployment.yaml b/kubernetes-manifests/client-secure.deployment.yaml index 3842fd59..495ce1db 100644 --- a/kubernetes-manifests/client-secure.deployment.yaml +++ b/kubernetes-manifests/client-secure.deployment.yaml @@ -41,7 +41,17 @@ spec: - "--secure_mode=${secure_mode}" - "--qps=${qps}" - "--rpc=${rpc}" + - "--metadata=${metadata}" + % if request_payload_size > 0: + - "--request_payload_size=${request_payload_size}" + % endif + % if response_payload_size > 0: + - "--response_payload_size=${response_payload_size}" + % endif - "--print_response=${print_response}" + % if enable_csm_observability: + - "--enable_csm_observability=true" + % endif ports: - containerPort: ${stats_port} env: @@ -51,6 +61,14 @@ spec: value: "true" - name: GRPC_XDS_EXPERIMENTAL_V3_SUPPORT value: "true" + - name: GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH + value: "true" + - name: GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY + value: "true" + - name: GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION + value: "true" + - name: GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG + value: "true" - name: GRPC_EXPERIMENTAL_XDS_ENABLE_OVERRIDE_HOST value: "true" volumeMounts: diff --git a/kubernetes-manifests/server-secure.deployment.yaml b/kubernetes-manifests/server-secure.deployment.yaml index 861853de..a11aba42 100644 --- a/kubernetes-manifests/server-secure.deployment.yaml +++ b/kubernetes-manifests/server-secure.deployment.yaml @@ -77,6 +77,9 @@ spec: % if xds_server_uri: - "--xds-server-uri=${xds_server_uri}" % endif + % if config_mesh: + - "--config-mesh-experimental=${config_mesh}" + % endif - "--node-metadata=app=${namespace_name}-${deployment_name}" resources: limits: diff --git a/kubernetes-manifests/server.deployment.yaml b/kubernetes-manifests/server.deployment.yaml index def76dc1..a4562d48 100644 --- a/kubernetes-manifests/server.deployment.yaml +++ b/kubernetes-manifests/server.deployment.yaml @@ -48,6 +48,9 @@ spec: % if enable_csm_observability: - "--enable_csm_observability=true" % endif + % if enable_rlqs: + - "--xds_server_mode=true" + % endif % if address_type: - "--address_type=${address_type}" % endif @@ -60,6 +63,13 @@ spec: value: "true" - name: GRPC_EXPERIMENTAL_XDS_ENABLE_OVERRIDE_HOST value: "true" + % if enable_rlqs: + - name: GRPC_EXPERIMENTAL_XDS_ENABLE_RLQS + value: "true" + ## TODO(sergiitk): remove + - name: GRPC_EXPERIMENTAL_RLQS_DRY_RUN + value: "true" + % endif % if csm_workload_name: - name: CSM_WORKLOAD_NAME value: ${csm_workload_name} @@ -107,6 +117,9 @@ spec: % else: - "--node-metadata=app=${namespace_name}-${deployment_name}" % endif + % if config_mesh: + - "--config-mesh-experimental=${config_mesh}" + % endif % if generate_mesh_id: - "--generate-mesh-id-experimental" % endif diff --git a/run.sh b/run.sh index 41b75041..e5fb254f 100755 --- a/run.sh +++ b/run.sh @@ -98,10 +98,10 @@ main() { echo fi - # Automatically save last run logs to out/ + # Automatically save last run logs to out if [[ -n "${psm_log_file}" ]]; then mkdir -p "${PSM_LOG_DIR}" - exec &> >(tee "${PSM_LOG_DIR}/${psm_log_file}") + exec &> >(tee -i "${PSM_LOG_DIR}/${psm_log_file}") echo "Saving framework log to ${PSM_LOG_DIR}/${psm_log_file}" fi diff --git a/tests/rlqs_test.py b/tests/rlqs_test.py new file mode 100644 index 00000000..0d755ff8 --- /dev/null +++ b/tests/rlqs_test.py @@ -0,0 +1,104 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from typing import TypeAlias + +from absl import flags +from absl.testing import absltest +from typing_extensions import override + +from framework import xds_k8s_testcase +from framework.infrastructure import traffic_director as td +from framework.test_app.runners.k8s import k8s_xds_server_runner + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) +# Rate Limiting test requires Compute v1alpha API to create mesh-compatible +# Cloud Armor security policy. +flags.set_default(xds_k8s_testcase.xds_flags.COMPUTE_API_VERSION, "v1alpha") + +XdsTestServer: TypeAlias = xds_k8s_testcase.XdsTestServer +XdsTestClient: TypeAlias = xds_k8s_testcase.XdsTestClient +KubernetesServerRunner: TypeAlias = k8s_xds_server_runner.KubernetesServerRunner +ServerDeploymentArgs: TypeAlias = k8s_xds_server_runner.ServerDeploymentArgs + + +class RlqsTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase): + @override + def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: + return super().initKubernetesServerRunner( + deployment_args=ServerDeploymentArgs(enable_rlqs=True), **kwargs + ) + + @override + def initTrafficDirectorManager(self) -> td.TrafficDirectorAppNetManager: + return td.TrafficDirectorAppNetManager( + self.gcp_api_manager, + project=self.project, + resource_prefix=self.resource_prefix, + resource_suffix=self.resource_suffix, + network=self.network, + compute_api_version=self.compute_api_version, + netsvc_class=td.NetworkServicesV1Alpha1, + ) + + def test_rate_limit(self) -> None: + with self.subTest("0_create_health_check"): + self.td.create_health_check() + + with self.subTest("1_create_backend_service"): + self.td.create_backend_service() + + with self.subTest("2_create_mesh"): + self.td.create_mesh() + + with self.subTest("3_create_grpc_route"): + self.td.create_grpc_route( + self.server_xds_host, + self.server_xds_port, + ) + + with self.subTest("4_create_endpoint_policy"): + self.td.create_endpoint_policy( + server_namespace=self.server_namespace, + server_name=self.server_name, + server_port=self.server_port, + ) + + # TODO(sergiitk): create rl resources + + with self.subTest("4_start_test_server"): + test_server: XdsTestServer = self.startTestServer( + config_mesh=self.td.mesh.name + ) + + with self.subTest("5_setup_server_backends"): + self.setupServerBackends() + + with self.subTest("6_start_test_client"): + test_client: XdsTestClient = self.startTestClient( + test_server, config_mesh=self.td.mesh.name + ) + + with self.subTest("7_assert_xds_config_exists"): + self.assertXdsConfigExists(test_client) + + # TODO(sergiitk): verify rl + + with self.subTest("8_assert_successful_rpcs"): + self.assertSuccessfulRpcs(test_client) + + +if __name__ == "__main__": + absltest.main() diff --git a/tests/security_app_test.py b/tests/security_app_test.py new file mode 100644 index 00000000..9253bf67 --- /dev/null +++ b/tests/security_app_test.py @@ -0,0 +1,72 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from typing import TypeAlias + +from absl import flags +from absl.testing import absltest + +from framework import xds_k8s_testcase +from framework.test_app.runners.k8s import k8s_xds_server_runner + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) + +XdsTestServer: TypeAlias = xds_k8s_testcase.XdsTestServer +XdsTestClient: TypeAlias = xds_k8s_testcase.XdsTestClient +KubernetesServerRunner: TypeAlias = k8s_xds_server_runner.KubernetesServerRunner +ServerDeploymentArgs: TypeAlias = k8s_xds_server_runner.ServerDeploymentArgs +SecurityMode = xds_k8s_testcase.SecurityXdsKubernetesTestCase.SecurityMode + + +class SecurityAppNetTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase): + def test_mtls(self): + """mTLS test. + + Both client and server configured to use TLS and mTLS. + """ + with self.subTest("0_create_health_check"): + self.td.create_health_check(port=self.server_maintenance_port) + + with self.subTest("1_create_backend_service"): + self.td.create_backend_service() + + with self.subTest("2_create_mesh"): + self.td.create_mesh() + + with self.subTest("3_create_grpc_route"): + self.td.create_grpc_route( + self.server_xds_host, + self.server_xds_port, + ) + + self.setupSecurityPolicies( + server_tls=True, server_mtls=True, client_tls=True, client_mtls=True + ) + + test_server: XdsTestServer = self.startSecureTestServer( + config_mesh=self.td.mesh.name + ) + self.setupServerBackends() + test_client: XdsTestClient = self.startSecureTestClient( + test_server, config_mesh=self.td.mesh.name + ) + + self.assertTestAppSecurity(SecurityMode.MTLS, test_client, test_server) + self.assertSuccessfulRpcs(test_client) + logger.info("[SUCCESS] mTLS security mode confirmed.") + + +if __name__ == "__main__": + absltest.main()