-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest): Couchbase ingestion source #12345
base: master
Are you sure you want to change the base?
Changes from all commits
ed2142e
365f7a1
c39d41b
1690db9
7864e3a
32f019c
9af84b3
cdec644
88f4661
b61aad8
b5458fe
60eb467
8591db8
bb14433
8a07901
f980a86
d45700e
2cc5ba5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
source: | ||
type: couchbase | ||
config: | ||
connect_string: "couchbases://127.0.0.1" | ||
username: "Administrator" | ||
password: "password" | ||
cluster_name: "testdb" | ||
classification: | ||
enabled: true | ||
classifiers: | ||
- type: datahub | ||
profiling: | ||
enabled: true | ||
profile_nested_fields: true | ||
|
||
sink: | ||
# sink configs |
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reviewer comment: This is the first source that really uses async. In general, we should almost never be using "bare" asyncio; instead, prefer the project's shared async helpers. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import asyncio | ||
import logging | ||
from typing import AsyncGenerator, List | ||
|
||
from acouchbase.collection import AsyncCollection | ||
from acouchbase.scope import AsyncScope | ||
from couchbase.exceptions import DocumentNotFoundException | ||
from couchbase.result import GetResult | ||
|
||
from datahub.ingestion.source.couchbase.couchbase_connect import CouchbaseConnect | ||
from datahub.ingestion.source.couchbase.couchbase_sql import ( | ||
SELECT_COLLECTION_COUNT, | ||
SELECT_DOC_IDS, | ||
) | ||
from datahub.ingestion.source.couchbase.retry import retry | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class CouchbaseAggregate:
    """Asynchronously reads documents from a Couchbase keyspace in batches.

    Document IDs are discovered with a N1QL query against the scope, then the
    corresponding documents are fetched concurrently through the KV API and
    yielded in lists of at most ``batch_size``.
    """

    # Populated by init(); declared here for type checkers.
    scope: AsyncScope
    collection: AsyncCollection

    def __init__(
        self,
        connector: CouchbaseConnect,
        keyspace: str,
        batch_size: int = 100,
        max_sample_size: int = 0,
    ):
        """
        Args:
            connector: Connection helper used to reach the cluster.
            keyspace: Keyspace to read from.
            batch_size: Number of documents per yielded batch; must be > 0.
            max_sample_size: Cap on the number of keys fetched (0 = no cap).

        Raises:
            ValueError: If batch_size is not positive.
        """
        self.connector = connector
        self.keyspace = keyspace
        self.batch_size = batch_size
        self.max_sample_size = max_sample_size

        if batch_size <= 0:
            raise ValueError("batch_size must be greater than 0")

    async def init(self) -> None:
        """Connect to the cluster and resolve the scope and collection."""
        await self.connector.cluster_init_async()
        self.scope, self.collection = await self.connector.connect_keyspace_async(
            self.keyspace
        )

    @retry(factor=0.05)
    async def collection_get(self, key: str) -> dict:
        """Fetch one document by key; returns {} if the document is missing."""
        try:
            result: GetResult = await self.collection.get(key)
            return result.content_as[dict]
        except DocumentNotFoundException:
            logger.warning(f"Document ID {key} not found")
            return {}

    @retry(factor=0.05)
    async def run_query(
        self, query: str, offset: int = 0, limit: int = 0
    ) -> List[dict]:
        """Run a N1QL query against the scope and return all rows.

        OFFSET/LIMIT clauses are appended only when positive. The interpolated
        values are ints produced by this class, not user strings.
        """
        if offset > 0:
            query += f" OFFSET {offset}"
        if limit > 0:
            query += f" LIMIT {limit}"
        result = self.scope.query(query)
        documents = [row async for row in result]
        return documents

    async def collection_count(self) -> int:
        """Return the number of documents in the collection (0 when no rows)."""
        query = SELECT_COLLECTION_COUNT.format(self.connector.collection_name)

        # run_query already returns a materialized list; no extra copy needed.
        # Default the missing-key case to 0 so the declared int return holds.
        result = await self.run_query(query)
        return result[0].get("count", 0) if result else 0

    async def get_keys(self) -> AsyncGenerator[str, None]:
        """Yield document IDs, honoring max_sample_size when it is set."""
        query = SELECT_DOC_IDS.format(self.connector.collection_name)

        results = await self.run_query(query, limit=self.max_sample_size)
        for row in results:
            yield row.get("id")

    async def get_key_chunks(self) -> AsyncGenerator[List[str], None]:
        """Group document IDs into lists of at most batch_size keys."""
        keys: List[str] = []
        async for key in self.get_keys():
            keys.append(key)
            if len(keys) == self.batch_size:
                yield keys
                # Rebind rather than clear() so the list already handed to the
                # consumer is never mutated out from under it.
                keys = []
        if keys:
            yield keys

    async def get_documents(self) -> AsyncGenerator[List[dict], None]:
        """Yield batches of documents, fetched concurrently per key chunk.

        Empty documents (missing keys) are dropped from the batch.

        Raises:
            RuntimeError: If any per-key fetch in a chunk raised an exception.
        """
        await self.init()

        async for chunk in self.get_key_chunks():
            tasks = [asyncio.create_task(self.collection_get(key)) for key in chunk]

            errors = 0
            if tasks:
                batch: List[dict] = []
                # Yield control once so the freshly created tasks can start.
                await asyncio.sleep(0)
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for result in results:
                    if isinstance(result, Exception):
                        logger.error(result)
                        errors += 1
                    elif isinstance(result, dict):
                        if result:
                            batch.append(result)
                yield batch

            if errors > 0:
                raise RuntimeError(f"batch get: {errors} errors retrieving documents")
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
from dataclasses import dataclass, field | ||
from typing import Any, Dict, Iterable, List, Optional, Tuple | ||
|
||
from pydantic import PositiveInt | ||
from pydantic.fields import Field | ||
|
||
from datahub.configuration.common import AllowDenyPattern | ||
from datahub.configuration.source_common import ( | ||
EnvConfigMixin, | ||
PlatformInstanceConfigMixin, | ||
) | ||
from datahub.ingestion.glossary.classification_mixin import ( | ||
ClassificationReportMixin, | ||
ClassificationSourceConfigMixin, | ||
) | ||
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig | ||
from datahub.ingestion.source.state.stale_entity_removal_handler import ( | ||
StaleEntityRemovalSourceReport, | ||
StatefulIngestionConfigBase, | ||
) | ||
from datahub.ingestion.source.state.stateful_ingestion_base import ( | ||
StatefulProfilingConfigMixin, | ||
) | ||
from datahub.ingestion.source_config.operation_config import is_profiling_enabled | ||
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport | ||
from datahub.utilities.perf_timer import PerfTimer | ||
from datahub.utilities.stats_collections import TopKDict, int_top_k_dict | ||
|
||
|
||
def flatten(
    field_path: List[str], data: Any, truncate: bool = True
) -> Iterable[Tuple[str, Any]]:
    """Flatten a nested document into (dotted.path, scalar) pairs.

    Dict keys extend the path; list elements share their parent's path, so a
    list of scalars yields one pair per element under the same path.

    Unlike the earlier implementation, this version does not mutate the
    caller's ``field_path`` list, avoiding fragile paired append/pop
    bookkeeping. The ``truncate`` parameter only controlled that internal
    bookkeeping and is kept solely for signature compatibility.

    Args:
        field_path: Path components accumulated so far (pass [] at top level).
        data: The value to flatten (dict, list, or scalar).
        truncate: Unused; retained for backward compatibility.

    Yields:
        Tuples of (dot-joined field path, scalar value).
    """
    if isinstance(data, dict):
        for key, value in data.items():
            # Build a new path list per branch instead of sharing one.
            yield from flatten(field_path + [key], value)
    elif isinstance(data, list):
        for value in data:
            yield from flatten(field_path, value, False)
    else:
        yield ".".join(field_path), data
|
||
|
||
class CouchbaseDBConfig(
    PlatformInstanceConfigMixin,
    EnvConfigMixin,
    StatefulIngestionConfigBase,
    ClassificationSourceConfigMixin,
    StatefulProfilingConfigMixin,
):
    """Recipe configuration for the Couchbase ingestion source.

    Combines connection settings, keyspace filtering, classification, and
    profiling options via the standard DataHub config mixins.
    """

    # Connection settings.
    connect_string: str = Field(
        default="couchbases://127.0.0.1", description="Couchbase connect string."
    )
    username: str = Field(default="Administrator", description="Couchbase username.")
    password: str = Field(default="password", description="Couchbase password.")
    cluster_name: str = Field(default="cbdb", description="Couchbase cluster name.")
    # Timeouts in seconds — presumably; verify against the SDK's ClusterTimeoutOptions usage.
    kv_timeout: Optional[PositiveInt] = Field(default=5, description="KV timeout.")
    query_timeout: Optional[PositiveInt] = Field(
        default=75, description="Query timeout."
    )
    # Sampling / schema-inference limits.
    schema_sample_size: Optional[PositiveInt] = Field(
        default=10000, description="Number of documents to sample."
    )
    options: dict = Field(
        default={}, description="Additional options to pass to `ClusterOptions()`."
    )
    # NOTE(review): camelCase name is inconsistent with the rest of the config,
    # but renaming would break existing recipes.
    maxSchemaSize: Optional[PositiveInt] = Field(
        default=300, description="Maximum number of fields to include in the schema."
    )
    # Keyspace filtering and domain assignment.
    keyspace_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="regex patterns for keyspace to filter in ingestion.",
    )
    domain: Dict[str, AllowDenyPattern] = Field(
        default=dict(),
        description="regex patterns for keyspaces to filter to assign domain_key.",
    )

    # Profiling
    profile_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for tables to profile",
    )

    profiling: GEProfilingConfig = Field(
        default=GEProfilingConfig(),
        description="Configuration for profiling",
    )

    def is_profiling_enabled(self) -> bool:
        """Return True when profiling is enabled both in config and by the
        profiling operation_config (e.g. day-of-week / hour gating)."""
        return self.profiling.enabled and is_profiling_enabled(
            self.profiling.operation_config
        )
|
||
|
||
@dataclass
class CouchbaseDBSourceReport(
    StaleEntityRemovalSourceReport, ClassificationReportMixin, IngestionStageReport
):
    """Run report for the Couchbase source: counters for filtering,
    document processing, and profiling outcomes."""

    # Names of entities dropped by filter patterns (see report_dropped).
    filtered: List[str] = field(default_factory=list)
    # Total documents read across all keyspaces.
    documents_processed: int = 0
    # Number of keyspaces that were profiled (see report_entity_profiled).
    keyspaces_profiled: int = 0
    # Wall-clock timer around collection aggregation work.
    collection_aggregate_timer: PerfTimer = field(default_factory=PerfTimer)
    # Per-entity counts of profiling skips, bounded via top-k dicts.
    profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict)
    profiling_skipped_table_profile_pattern: TopKDict[str, int] = field(
        default_factory=int_top_k_dict
    )

    def report_dropped(self, name: str) -> None:
        """Record an entity that was filtered out of ingestion."""
        self.filtered.append(name)

    def report_entity_profiled(self) -> None:
        """Increment the count of profiled keyspaces."""
        self.keyspaces_profiled += 1
|
||
|
||
@dataclass
class CouchbaseEntity:
    """A discovered Couchbase dataset with its inferred schema and size."""

    # Fully qualified dataset name.
    dataset: str
    # Inferred document schema (field path -> type info).
    schema: dict
    # Document count for the collection.
    count: int
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in general, I prefer to have the UI forms get added in a separate PR.
Small PRs are easier to review - and in this case, I like to have the UI changes reviewed by someone who does more frontend dev than I do