Skip to content

Commit 1853970

Browse files
[grpc] Configure gRPC retries directly
1. Replaced the current implementation of gRPC retiries which uses an interceptor with a direct config to the gRPC layer. 2. Shortened `maxBackoff` from 1.6 sec to 1 sec. When this time is too long, it seems that we start getting server-side resests as retry message are trying to use an older channel that was already closed.
1 parent 42ed389 commit 1853970

File tree

1 file changed

+36
-4
lines changed

1 file changed

+36
-4
lines changed

pinecone/core/grpc/index_grpc.py

+36-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from google.protobuf import json_format
1212
from grpc._channel import _InactiveRpcError, _MultiThreadedRendezvous
1313
from tqdm import tqdm
14+
import json
1415

1516
from pinecone import FetchResponse, QueryResponse, ScoredVector, SingleQueryResults, DescribeIndexStatsResponse
1617
from pinecone.config import Config
@@ -81,9 +82,38 @@ def __init__(self, index_name: str, channel=None, grpc_config: GRPCClientConfig
8182
"client-version": CLIENT_VERSION
8283
}
8384
self._endpoint_override = _endpoint_override
85+
86+
self.method_config = json.dumps(
87+
{
88+
"methodConfig": [
89+
{
90+
"name": [{"service": "VectorService.Upsert"}],
91+
"retryPolicy": {
92+
"maxAttempts": 5,
93+
"initialBackoff": "0.1s",
94+
"maxBackoff": "1s",
95+
"backoffMultiplier": 2,
96+
"retryableStatusCodes": ["UNAVAILABLE"],
97+
},
98+
},
99+
{
100+
"name": [{"service": "VectorService"}],
101+
"retryPolicy": {
102+
"maxAttempts": 5,
103+
"initialBackoff": "0.1s",
104+
"maxBackoff": "1s",
105+
"backoffMultiplier": 2,
106+
"retryableStatusCodes": ["UNAVAILABLE"],
107+
},
108+
}
109+
]
110+
}
111+
)
112+
84113
self._channel = channel or self._gen_channel()
85114
self.stub = self.stub_class(self._channel)
86115

116+
87117
@property
88118
@abstractmethod
89119
def stub_class(self):
@@ -97,7 +127,9 @@ def _gen_channel(self, options=None):
97127
target = self._endpoint()
98128
default_options = {
99129
"grpc.max_send_message_length": MAX_MSG_SIZE,
100-
"grpc.max_receive_message_length": MAX_MSG_SIZE
130+
"grpc.max_receive_message_length": MAX_MSG_SIZE,
131+
"grpc.service_config": self.method_config,
132+
"grpc.enable_retries": True
101133
}
102134
if self.grpc_client_config.secure:
103135
default_options['grpc.ssl_target_name_override'] = target.split(':')[0]
@@ -111,8 +143,8 @@ def _gen_channel(self, options=None):
111143
root_cas = open(certifi.where(), "rb").read()
112144
tls = grpc.ssl_channel_credentials(root_certificates=root_cas)
113145
channel = grpc.secure_channel(target, tls, options=_options)
114-
interceptor = RetryOnRpcErrorClientInterceptor(self.retry_config)
115-
return grpc.intercept_channel(channel, interceptor)
146+
147+
return channel
116148

117149
@property
118150
def channel(self):
@@ -243,7 +275,7 @@ def add_done_callback(self, fun):
243275

244276
def result(self, timeout=None):
245277
try:
246-
self._delegate.result(timeout=timeout)
278+
return self._delegate.result(timeout=timeout)
247279
except _MultiThreadedRendezvous as e:
248280
raise PineconeException(e._state.debug_error_string) from e
249281

0 commit comments

Comments
 (0)