Skip to content

Commit 322f90e

Browse files
Merge pull request pinecone-io#136 from pinecone-io/upsert_dataframe
Upsert dataframe
2 parents d92c904 + d4316df commit 322f90e

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

pinecone/core/grpc/index_grpc.py

+48-1
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
import numbers
66
from abc import ABC, abstractmethod
77
from functools import wraps
8+
from importlib.util import find_spec
89
from typing import NamedTuple, Optional, Dict, Iterable, Union, List, Tuple, Any
910
from collections.abc import Mapping
1011

1112
import certifi
1213
import grpc
1314
from google.protobuf import json_format
1415
from grpc._channel import _InactiveRpcError, _MultiThreadedRendezvous
15-
from tqdm import tqdm
16+
from tqdm.autonotebook import tqdm
1617
import json
1718

1819
from pinecone import FetchResponse, QueryResponse, ScoredVector, SingleQueryResults, DescribeIndexStatsResponse
@@ -449,6 +450,52 @@ def _upsert_batch(self,
449450
request = UpsertRequest(vectors=vectors, **args_dict)
450451
return self._wrap_grpc_call(self.stub.Upsert, request, timeout=timeout, **kwargs)
451452

453+
def upsert_from_dataframe(self,
                          df,
                          namespace: str = None,
                          batch_size: int = 500,
                          use_async_requests: bool = True,
                          show_progress: bool = True) -> "UpsertResponse":
    """Upsert the rows of a pandas dataframe into the index, batch by batch.

    Args:
        df: A pandas dataframe. Each row is turned into a dict by
            `_iter_dataframe` and handed to `upsert`, so the column names must
            match the keys `upsert` accepts for dict-style vectors (originally
            documented as: id, vector, and metadata -- TODO confirm the exact
            key names against `upsert`).
        namespace: The namespace to upsert into.
        batch_size: The number of rows to upsert in a single batch. Must be positive.
        use_async_requests: Whether to upsert multiple requests at the same time using
            the asynchronous request mechanism. Set to `False` to call `upsert` with
            `async_req=False` for every batch and read each response directly.
        show_progress: Whether to show progress bars.

    Returns:
        An `UpsertResponse` whose `upserted_count` is the sum of the counts
        reported for every batch.

    Raises:
        RuntimeError: If `pandas` is not installed.
        ValueError: If `df` is not a pandas dataframe or `batch_size` is not positive.
    """
    try:
        import pandas as pd
    except ImportError:
        raise RuntimeError("The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`")

    if not isinstance(df, pd.DataFrame):
        raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

    # A negative step would make the batching range empty and silently upsert
    # nothing; zero would fail later with an unrelated-looking range() error.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer. Found: {batch_size}")

    results = []
    # The context manager closes the bar even if an upsert call raises.
    with tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests") as pbar:
        for chunk in self._iter_dataframe(df, batch_size=batch_size):
            res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
            pbar.update(len(chunk))
            results.append(res)

    if use_async_requests:
        # With async requests `upsert` returned future-like objects; resolve them
        # to responses. This second bar honours `show_progress` as well.
        results = [
            async_result.result()
            for async_result in tqdm(results, disable=not show_progress, desc="collecting async responses")
        ]

    upserted_count = sum(res.upserted_count for res in results)
    return UpsertResponse(upserted_count=upserted_count)
492+
493+
@staticmethod
494+
def _iter_dataframe(df, batch_size):
495+
for i in range(0, len(df), batch_size):
496+
batch = df.iloc[i:i + batch_size].to_dict(orient="records")
497+
yield batch
498+
452499
def delete(self,
453500
ids: Optional[List[str]] = None,
454501
delete_all: Optional[bool] = None,

pinecone/index.py

+33-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
#
22
# Copyright (c) 2020-2021 Pinecone Systems Inc. All rights reserved.
33
#
4+
from tqdm.autonotebook import tqdm
5+
from importlib.util import find_spec
46
import numbers
57

6-
from tqdm import tqdm
78
from collections.abc import Iterable, Mapping
89
from typing import Union, List, Tuple, Optional, Dict, Any
910

@@ -228,6 +229,37 @@ def _vector_transform(item: Union[Vector, Tuple]):
228229
**{k: v for k, v in kwargs.items() if k in _OPENAPI_ENDPOINT_PARAMS}
229230
)
230231

232+
def upsert_from_dataframe(self,
                          df,
                          namespace: str = None,
                          batch_size: int = 500,
                          show_progress: bool = True) -> "UpsertResponse":
    """Upsert the rows of a pandas dataframe into the index, batch by batch.

    Args:
        df: A pandas dataframe. Each row is converted to a dict with
            `to_dict(orient="records")` and handed to `upsert`, so the column
            names must match the keys `upsert` accepts for dict-style vectors
            (originally documented as: id, vector, and metadata -- TODO confirm
            the exact key names against `upsert`).
        namespace: The namespace to upsert into.
        batch_size: The number of rows to upsert in a single batch. Must be positive.
        show_progress: Whether to show a progress bar.

    Returns:
        An `UpsertResponse` whose `upserted_count` is the sum of the counts
        reported for every batch.

    Raises:
        RuntimeError: If `pandas` is not installed.
        ValueError: If `df` is not a pandas dataframe or `batch_size` is not positive.
    """
    try:
        import pandas as pd
    except ImportError:
        raise RuntimeError("The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`")

    if not isinstance(df, pd.DataFrame):
        raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

    # A negative step would make the batching range empty and silently upsert
    # nothing; zero would fail later with an unrelated-looking range() error.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer. Found: {batch_size}")

    upserted_count = 0
    # The context manager closes the bar even if an upsert call raises.
    with tqdm(total=len(df), disable=not show_progress) as pbar:
        for start in range(0, len(df), batch_size):
            batch = df.iloc[start:start + batch_size].to_dict(orient="records")
            res = self.upsert(batch, namespace=namespace)
            upserted_count += res.upserted_count
            pbar.update(len(batch))

    return UpsertResponse(upserted_count=upserted_count)
262+
231263
@validate_and_convert_errors
232264
def delete(self,
233265
ids: Optional[List[str]] = None,

0 commit comments

Comments
 (0)