Skip to content

Commit f9ffc46

Browse files
Merge remote-tracking branch 'origin/dev' into upsert_dic_test
2 parents 9f981a6 + 322f90e commit f9ffc46

File tree

2 files changed

+117
-6
lines changed

2 files changed

+117
-6
lines changed

pinecone/core/grpc/index_grpc.py

+84-5
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@
55
import numbers
66
from abc import ABC, abstractmethod
77
from functools import wraps
8+
from importlib.util import find_spec
89
from typing import NamedTuple, Optional, Dict, Iterable, Union, List, Tuple, Any
910
from collections.abc import Mapping
1011

1112
import certifi
1213
import grpc
1314
from google.protobuf import json_format
1415
from grpc._channel import _InactiveRpcError, _MultiThreadedRendezvous
15-
from tqdm import tqdm
16+
from tqdm.autonotebook import tqdm
17+
import json
1618

1719
from pinecone import FetchResponse, QueryResponse, ScoredVector, SingleQueryResults, DescribeIndexStatsResponse
1820
from pinecone.config import Config
@@ -84,9 +86,38 @@ def __init__(self, index_name: str, channel=None, grpc_config: GRPCClientConfig
8486
"client-version": CLIENT_VERSION
8587
}
8688
self._endpoint_override = _endpoint_override
89+
90+
self.method_config = json.dumps(
91+
{
92+
"methodConfig": [
93+
{
94+
"name": [{"service": "VectorService.Upsert"}],
95+
"retryPolicy": {
96+
"maxAttempts": 5,
97+
"initialBackoff": "0.1s",
98+
"maxBackoff": "1s",
99+
"backoffMultiplier": 2,
100+
"retryableStatusCodes": ["UNAVAILABLE"],
101+
},
102+
},
103+
{
104+
"name": [{"service": "VectorService"}],
105+
"retryPolicy": {
106+
"maxAttempts": 5,
107+
"initialBackoff": "0.1s",
108+
"maxBackoff": "1s",
109+
"backoffMultiplier": 2,
110+
"retryableStatusCodes": ["UNAVAILABLE"],
111+
},
112+
}
113+
]
114+
}
115+
)
116+
87117
self._channel = channel or self._gen_channel()
88118
self.stub = self.stub_class(self._channel)
89119

120+
90121
@property
91122
@abstractmethod
92123
def stub_class(self):
@@ -100,7 +131,9 @@ def _gen_channel(self, options=None):
100131
target = self._endpoint()
101132
default_options = {
102133
"grpc.max_send_message_length": MAX_MSG_SIZE,
103-
"grpc.max_receive_message_length": MAX_MSG_SIZE
134+
"grpc.max_receive_message_length": MAX_MSG_SIZE,
135+
"grpc.service_config": self.method_config,
136+
"grpc.enable_retries": True
104137
}
105138
if self.grpc_client_config.secure:
106139
default_options['grpc.ssl_target_name_override'] = target.split(':')[0]
@@ -114,8 +147,8 @@ def _gen_channel(self, options=None):
114147
root_cas = open(certifi.where(), "rb").read()
115148
tls = grpc.ssl_channel_credentials(root_certificates=root_cas)
116149
channel = grpc.secure_channel(target, tls, options=_options)
117-
interceptor = RetryOnRpcErrorClientInterceptor(self.retry_config)
118-
return grpc.intercept_channel(channel, interceptor)
150+
151+
return channel
119152

120153
@property
121154
def channel(self):
@@ -246,7 +279,7 @@ def add_done_callback(self, fun):
246279

247280
def result(self, timeout=None):
248281
try:
249-
self._delegate.result(timeout=timeout)
282+
return self._delegate.result(timeout=timeout)
250283
except _MultiThreadedRendezvous as e:
251284
raise PineconeException(e._state.debug_error_string) from e
252285

@@ -417,6 +450,52 @@ def _upsert_batch(self,
417450
request = UpsertRequest(vectors=vectors, **args_dict)
418451
return self._wrap_grpc_call(self.stub.Upsert, request, timeout=timeout, **kwargs)
419452

453+
def upsert_from_dataframe(self,
454+
df,
455+
namespace: str = None,
456+
batch_size: int = 500,
457+
use_async_requests: bool = True,
458+
show_progress: bool = True) -> None:
459+
"""Upserts a dataframe into the index.
460+
461+
Args:
462+
df: A pandas dataframe with the following columns: id, vector, and metadata.
463+
namespace: The namespace to upsert into.
464+
batch_size: The number of rows to upsert in a single batch.
465+
use_async_requests: Whether to upsert multiple requests at the same time using asynchronous request mechanism.
466+
Set to `False`
467+
show_progress: Whether to show a progress bar.
468+
"""
469+
try:
470+
import pandas as pd
471+
except ImportError:
472+
raise RuntimeError("The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`")
473+
474+
if not isinstance(df, pd.DataFrame):
475+
raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")
476+
477+
pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
478+
results = []
479+
for chunk in self._iter_dataframe(df, batch_size=batch_size):
480+
res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
481+
pbar.update(len(chunk))
482+
results.append(res)
483+
484+
if use_async_requests:
485+
results = [async_result.result() for async_result in tqdm(results, desc="collecting async responses")]
486+
487+
upserted_count = 0
488+
for res in results:
489+
upserted_count += res.upserted_count
490+
491+
return UpsertResponse(upserted_count=upserted_count)
492+
493+
@staticmethod
494+
def _iter_dataframe(df, batch_size):
495+
for i in range(0, len(df), batch_size):
496+
batch = df.iloc[i:i + batch_size].to_dict(orient="records")
497+
yield batch
498+
420499
def delete(self,
421500
ids: Optional[List[str]] = None,
422501
delete_all: Optional[bool] = None,

pinecone/index.py

+33-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
#
22
# Copyright (c) 2020-2021 Pinecone Systems Inc. All right reserved.
33
#
4+
from tqdm.autonotebook import tqdm
5+
from importlib.util import find_spec
46
import numbers
57

6-
from tqdm import tqdm
78
from collections.abc import Iterable, Mapping
89
from typing import Union, List, Tuple, Optional, Dict, Any
910

@@ -228,6 +229,37 @@ def _vector_transform(item: Union[Vector, Tuple]):
228229
**{k: v for k, v in kwargs.items() if k in _OPENAPI_ENDPOINT_PARAMS}
229230
)
230231

232+
def upsert_from_dataframe(self,
233+
df,
234+
namespace: str = None,
235+
batch_size: int = 500,
236+
show_progress: bool = True) -> None:
237+
"""Upserts a dataframe into the index.
238+
239+
Args:
240+
df: A pandas dataframe with the following columns: id, vector, and metadata.
241+
namespace: The namespace to upsert into.
242+
batch_size: The number of rows to upsert in a single batch.
243+
show_progress: Whether to show a progress bar.
244+
"""
245+
try:
246+
import pandas as pd
247+
except ImportError:
248+
raise RuntimeError("The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`")
249+
250+
if not isinstance(df, pd.DataFrame):
251+
raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")
252+
253+
upserted_count = 0
254+
pbar = tqdm(total=len(df), disable=not show_progress)
255+
for i in range(0, len(df), batch_size):
256+
batch = df.iloc[i:i + batch_size].to_dict(orient="records")
257+
res = self.upsert(batch, namespace=namespace)
258+
upserted_count += res.upserted_count
259+
pbar.update(len(batch))
260+
261+
return UpsertResponse(upserted_count=upserted_count)
262+
231263
@validate_and_convert_errors
232264
def delete(self,
233265
ids: Optional[List[str]] = None,

0 commit comments

Comments
 (0)