From 6064bdddab3e7d4346cb04ad29a642de06e564f5 Mon Sep 17 00:00:00 2001 From: Kristian Hartikainen Date: Thu, 26 Dec 2024 20:28:46 +0200 Subject: [PATCH] Attempt to fix GCP autoscaler threading issue (#49440) This is an attempt to fix #46451. I believe that the issue is caused by incorrect handling of [`google-api-python-client`](https://github.com/googleapis/google-api-python-client)-library, which is thread-unsafe as per the [documentation](https://googleapis.github.io/google-api-python-client/docs/thread_safety.html). The above-mentioned documentation also describes how the thread-unsafety can be worked around by instantiating separate `httplib2.Http()` object for each thread. Here, I've just slapped the [request build pattern](https://googleapis.github.io/google-api-python-client/docs/thread_safety.html) onto GCP service creation code. I have not yet fully verified whether this works or not as debugging this is slow and quite a pain in the ass. What I have verified though is that the SSL errors that I mentioned in https://github.com/ray-project/ray/issues/46451#issuecomment-2474066981 and which consistently occurred in `/tmp/ray/session_latest/logs/monitor.out` are now gone. I'm not sure when I have time to get back to this but I'll leave this here in case someone else wants to take a look. Signed-off-by: Kristian Hartikainen --- python/ray/autoscaler/_private/gcp/config.py | 29 ++++++++++++++++++-- python/ray/autoscaler/_private/gcp/node.py | 5 ++-- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/python/ray/autoscaler/_private/gcp/config.py b/python/ray/autoscaler/_private/gcp/config.py index fb6fdf2608dd..4a96c08ed52a 100644 --- a/python/ray/autoscaler/_private/gcp/config.py +++ b/python/ray/autoscaler/_private/gcp/config.py @@ -6,6 +6,9 @@ import time from functools import partial, reduce +import google_auth_httplib2 +import googleapiclient +import httplib2 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa @@ -329,21 +332,40 @@ def _is_head_node_a_tpu(config: dict) -> bool: return get_node_type(node_configs[config["head_node_type"]]) == GCPNodeType.TPU +def build_request(http, *args, **kwargs): + new_http = google_auth_httplib2.AuthorizedHttp( + http.credentials, http=httplib2.Http() + ) + return googleapiclient.http.HttpRequest(new_http, *args, **kwargs) + + def _create_crm(gcp_credentials=None): return discovery.build( - "cloudresourcemanager", "v1", credentials=gcp_credentials, cache_discovery=False + "cloudresourcemanager", + "v1", + credentials=gcp_credentials, + requestBuilder=build_request, + cache_discovery=False, ) def _create_iam(gcp_credentials=None): return discovery.build( - "iam", "v1", credentials=gcp_credentials, cache_discovery=False + "iam", + "v1", + credentials=gcp_credentials, + requestBuilder=build_request, + cache_discovery=False, ) def _create_compute(gcp_credentials=None): return discovery.build( - "compute", "v1", credentials=gcp_credentials, cache_discovery=False + "compute", + "v1", + credentials=gcp_credentials, + requestBuilder=build_request, + cache_discovery=False, ) @@ -352,6 +374,7 @@ def _create_tpu(gcp_credentials=None): "tpu", TPU_VERSION, credentials=gcp_credentials, + requestBuilder=build_request, cache_discovery=False, discoveryServiceUrl="https://tpu.googleapis.com/$discovery/rest", ) diff --git a/python/ray/autoscaler/_private/gcp/node.py b/python/ray/autoscaler/_private/gcp/node.py index 9fb2aa7833eb..56a756a587ce 100644 --- a/python/ray/autoscaler/_private/gcp/node.py +++ b/python/ray/autoscaler/_private/gcp/node.py @@ -34,6 +34,7 @@ class inheriting from ``GCPNode``. Those classes are essentially dicts from typing import Any, Dict, List, Optional, Tuple, Union from uuid import uuid4 +import httplib2 from google_auth_httplib2 import AuthorizedHttp from googleapiclient.discovery import Resource from googleapiclient.errors import HttpError @@ -335,7 +336,7 @@ class GCPCompute(GCPResource): def get_new_authorized_http(self, http: AuthorizedHttp) -> AuthorizedHttp: """Generate a new AuthorizedHttp object with the given credentials.""" - new_http = AuthorizedHttp(http.credentials) + new_http = AuthorizedHttp(http.credentials, http=httplib2.Http()) return new_http def wait_for_operation( @@ -637,7 +638,7 @@ def path(self): def get_new_authorized_http(self, http: AuthorizedHttp) -> AuthorizedHttp: """Generate a new AuthorizedHttp object with the given credentials.""" - new_http = AuthorizedHttp(http.credentials) + new_http = AuthorizedHttp(http.credentials, http=httplib2.Http()) return new_http def wait_for_operation(