13
13
from google .protobuf import json_format
14
14
from grpc ._channel import _InactiveRpcError , _MultiThreadedRendezvous
15
15
from tqdm import tqdm
16
+ import json
16
17
17
18
from pinecone import FetchResponse , QueryResponse , ScoredVector , SingleQueryResults , DescribeIndexStatsResponse
18
19
from pinecone .config import Config
@@ -84,9 +85,38 @@ def __init__(self, index_name: str, channel=None, grpc_config: GRPCClientConfig
84
85
"client-version" : CLIENT_VERSION
85
86
}
86
87
self ._endpoint_override = _endpoint_override
88
+
89
+ self .method_config = json .dumps (
90
+ {
91
+ "methodConfig" : [
92
+ {
93
+ "name" : [{"service" : "VectorService.Upsert" }],
94
+ "retryPolicy" : {
95
+ "maxAttempts" : 5 ,
96
+ "initialBackoff" : "0.1s" ,
97
+ "maxBackoff" : "1s" ,
98
+ "backoffMultiplier" : 2 ,
99
+ "retryableStatusCodes" : ["UNAVAILABLE" ],
100
+ },
101
+ },
102
+ {
103
+ "name" : [{"service" : "VectorService" }],
104
+ "retryPolicy" : {
105
+ "maxAttempts" : 5 ,
106
+ "initialBackoff" : "0.1s" ,
107
+ "maxBackoff" : "1s" ,
108
+ "backoffMultiplier" : 2 ,
109
+ "retryableStatusCodes" : ["UNAVAILABLE" ],
110
+ },
111
+ }
112
+ ]
113
+ }
114
+ )
115
+
87
116
self ._channel = channel or self ._gen_channel ()
88
117
self .stub = self .stub_class (self ._channel )
89
118
119
+
90
120
@property
91
121
@abstractmethod
92
122
def stub_class (self ):
@@ -100,7 +130,9 @@ def _gen_channel(self, options=None):
100
130
target = self ._endpoint ()
101
131
default_options = {
102
132
"grpc.max_send_message_length" : MAX_MSG_SIZE ,
103
- "grpc.max_receive_message_length" : MAX_MSG_SIZE
133
+ "grpc.max_receive_message_length" : MAX_MSG_SIZE ,
134
+ "grpc.service_config" : self .method_config ,
135
+ "grpc.enable_retries" : True
104
136
}
105
137
if self .grpc_client_config .secure :
106
138
default_options ['grpc.ssl_target_name_override' ] = target .split (':' )[0 ]
@@ -114,8 +146,8 @@ def _gen_channel(self, options=None):
114
146
root_cas = open (certifi .where (), "rb" ).read ()
115
147
tls = grpc .ssl_channel_credentials (root_certificates = root_cas )
116
148
channel = grpc .secure_channel (target , tls , options = _options )
117
- interceptor = RetryOnRpcErrorClientInterceptor ( self . retry_config )
118
- return grpc . intercept_channel ( channel , interceptor )
149
+
150
+ return channel
119
151
120
152
@property
121
153
def channel (self ):
@@ -246,7 +278,7 @@ def add_done_callback(self, fun):
246
278
247
279
def result (self , timeout = None ):
248
280
try :
249
- self ._delegate .result (timeout = timeout )
281
+ return self ._delegate .result (timeout = timeout )
250
282
except _MultiThreadedRendezvous as e :
251
283
raise PineconeException (e ._state .debug_error_string ) from e
252
284
0 commit comments