Skip to content

Commit 450043d

Browse files
Merge pull request pinecone-io#137 from pinecone-io/upsert_df_sync
Upsert dataframe - support non-async option
2 parents b8382ca + dbe2ed6 commit 450043d

File tree

2 files changed

+42
-16
lines changed

2 files changed

+42
-16
lines changed

pinecone/core/grpc/index_grpc.py

+28-11
Original file line number | Diff line number | Diff line change
@@ -450,28 +450,45 @@ def _upsert_batch(self,
450450
request = UpsertRequest(vectors=vectors, **args_dict)
451451
return self._wrap_grpc_call(self.stub.Upsert, request, timeout=timeout, **kwargs)
452452

453-
def upsert_dataframe(self,
453+
def upsert_from_dataframe(self,
454454
df,
455-
namespase: str = None,
455+
namespace: str = None,
456456
batch_size: int = 500,
457+
use_async_requests: bool = True,
457458
show_progress: bool = True) -> None:
458459
"""Upserts a dataframe into the index.
459460
460461
Args:
461462
df: A pandas dataframe with the following columns: id, vector, and metadata.
462463
namespace: The namespace to upsert into.
463464
batch_size: The number of rows to upsert in a single batch.
465+
use_async_requests: Whether to upsert multiple requests at the same time using asynchronous request mechanism.
466+
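Set to `False` to send the upsert requests one at a time, waiting for each response before sending the next batch.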
464467
show_progress: Whether to show a progress bar.
465468
"""
466-
if find_spec("pandas") is None:
467-
raise ImportError("pandas not found. Please install pandas to use this method.")
468-
469-
async_results = [
470-
self.upsert(vectors=chunk, namespace=namespase, async_req=True)
471-
for chunk in tqdm(self._iter_dataframe(df, batch_size=batch_size),
472-
total=len(df) // batch_size, disable=not show_progress)
473-
]
474-
res = [async_result.result() for async_result in async_results]
469+
try:
470+
import pandas as pd
471+
except ImportError:
472+
raise RuntimeError("The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`")
473+
474+
if not isinstance(df, pd.DataFrame):
475+
raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")
476+
477+
pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
478+
results = []
479+
for chunk in self._iter_dataframe(df, batch_size=batch_size):
480+
res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
481+
pbar.update(len(chunk))
482+
results.append(res)
483+
484+
if use_async_requests:
485+
results = [async_result.result() for async_result in tqdm(results, desc="collecting async responses")]
486+
487+
upserted_count = 0
488+
for res in results:
489+
upserted_count += res.upserted_count
490+
491+
return UpsertResponse(upserted_count=upserted_count)
475492

476493
@staticmethod
477494
def _iter_dataframe(df, batch_size):

pinecone/index.py

+14-5
Original file line number | Diff line number | Diff line change
@@ -229,9 +229,9 @@ def _vector_transform(item: Union[Vector, Tuple]):
229229
**{k: v for k, v in kwargs.items() if k in _OPENAPI_ENDPOINT_PARAMS}
230230
)
231231

232-
def upsert_dataframe(self,
232+
def upsert_from_dataframe(self,
233233
df,
234-
namespase: str = None,
234+
namespace: str = None,
235235
batch_size: int = 500,
236236
show_progress: bool = True) -> None:
237237
"""Upserts a dataframe into the index.
@@ -242,15 +242,24 @@ def upsert_dataframe(self,
242242
batch_size: The number of rows to upsert in a single batch.
243243
show_progress: Whether to show a progress bar.
244244
"""
245-
if find_spec("pandas") is None:
246-
raise ImportError("pandas not found. Please install pandas to use this method.")
245+
try:
246+
import pandas as pd
247+
except ImportError:
248+
raise RuntimeError("The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`")
247249

250+
if not isinstance(df, pd.DataFrame):
251+
raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")
252+
253+
upserted_count = 0
248254
pbar = tqdm(total=len(df), disable=not show_progress)
249255
for i in range(0, len(df), batch_size):
250256
batch = df.iloc[i:i + batch_size].to_dict(orient="records")
251-
self.upsert(batch, namespace=namespase)
257+
res = self.upsert(batch, namespace=namespace)
258+
upserted_count += res.upserted_count
252259
pbar.update(len(batch))
253260

261+
return UpsertResponse(upserted_count=upserted_count)
262+
254263
@validate_and_convert_errors
255264
def delete(self,
256265
ids: Optional[List[str]] = None,

0 commit comments

Comments
 (0)