Skip to content

Commit 0e9dd87

Browse files
Upsert df - support non-async, add response
1. Return the proper result type (UpsertResponse) with the total number of upserted vectors. 2. For gRPC, support both async and non-async modes.
1 parent f4f4376 commit 0e9dd87

File tree

2 files changed

+20
-7
lines changed

2 files changed

+20
-7
lines changed

pinecone/core/grpc/index_grpc.py

+15-6
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,21 @@ def upsert_dataframe(self,
439439
if not isinstance(df, pd.DataFrame):
440440
raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")
441441

442-
async_results = [
443-
self.upsert(vectors=chunk, namespace=namespace, async_req=True)
444-
for chunk in tqdm(self._iter_dataframe(df, batch_size=batch_size),
445-
total=len(df) // batch_size, disable=not show_progress)
446-
]
447-
res = [async_result.result() for async_result in async_results]
442+
pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
443+
results = []
444+
for chunk in self._iter_dataframe(df, batch_size=batch_size):
445+
res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
446+
pbar.update(len(chunk))
447+
results.append(res)
448+
449+
if use_async_requests:
450+
results = [async_result.result() for async_result in tqdm(results, desc="collecting async responses")]
451+
452+
upserted_count = 0
453+
for res in results:
454+
upserted_count += res.upserted_count
455+
456+
return UpsertResponse(upserted_count=upserted_count)
448457

449458
@staticmethod
450459
def _iter_dataframe(df, batch_size):

pinecone/index.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,16 @@ def upsert_dataframe(self,
214214
if not isinstance(df, pd.DataFrame):
215215
raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")
216216

217+
upserted_count = 0
217218
pbar = tqdm(total=len(df), disable=not show_progress)
218219
for i in range(0, len(df), batch_size):
219220
batch = df.iloc[i:i + batch_size].to_dict(orient="records")
220-
self.upsert(batch, namespace=namespace)
221+
res = self.upsert(batch, namespace=namespace)
222+
upserted_count += res.upserted_count
221223
pbar.update(len(batch))
222224

225+
return UpsertResponse(upserted_count=upserted_count)
226+
223227
@validate_and_convert_errors
224228
def delete(self,
225229
ids: Optional[List[str]] = None,

0 commit comments

Comments
 (0)