feat(ingest): Couchbase ingestion source #12345

Open · wants to merge 18 commits into master
4 changes: 4 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/constants.ts
@@ -17,6 +17,7 @@ import athenaLogo from '../../../../images/awsathenalogo.png';
import mssqlLogo from '../../../../images/mssqllogo.png';
import clickhouseLogo from '../../../../images/clickhouselogo.png';
import cockroachdbLogo from '../../../../images/cockroachdblogo.png';
import couchbaseLogo from '../../../../images/couchbaselogo.png';
import trinoLogo from '../../../../images/trinologo.png';
import dbtLogo from '../../../../images/dbtlogo.png';
import dremioLogo from '../../../../images/dremiologo.png';
@@ -53,6 +54,8 @@ export const CLICKHOUSE_USAGE = 'clickhouse-usage';
export const CLICKHOUSE_URN = `urn:li:dataPlatform:${CLICKHOUSE}`;
export const COCKROACHDB = 'cockroachdb';
export const COCKROACHDB_URN = `urn:li:dataPlatform:${COCKROACHDB}`;
export const COUCHBASE = 'couchbase';
export const COUCHBASE_URN = `urn:li:dataPlatform:${COUCHBASE}`;
export const DBT = 'dbt';
export const DBT_URN = `urn:li:dataPlatform:${DBT}`;
export const DREMIO = 'dremio';
@@ -147,6 +150,7 @@ export const PLATFORM_URN_TO_LOGO = {
    [BIGQUERY_URN]: bigqueryLogo,
    [CLICKHOUSE_URN]: clickhouseLogo,
    [COCKROACHDB_URN]: cockroachdbLogo,
    [COUCHBASE_URN]: couchbaseLogo,
Collaborator:
In general, I prefer to have the UI form changes added in a separate PR.

Small PRs are easier to review, and in this case I'd like the UI changes reviewed by someone who does more frontend dev than I do.

    [DBT_URN]: dbtLogo,
    [DREMIO_URN]: dremioLogo,
    [DRUID_URN]: druidLogo,
8 changes: 8 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -333,5 +333,13 @@
"description": "Import Nodes and Relationships from Neo4j.",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/neo4j/",
"recipe": "source:\n type: 'neo4j'\n config:\n uri: 'neo4j+ssc://host:7687'\n username: 'neo4j'\n password: 'password'\n env: 'PROD'\n\nsink:\n type: \"datahub-rest\"\n config:\n server: 'http://localhost:8080'"
},
{
"urn": "urn:li:dataPlatform:couchbase",
"name": "couchbase",
"displayName": "Couchbase",
"description": "Import data from Couchbase Server and Capella.",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/couchbase",
"recipe": "source:\n type: couchbase\n config:\n connect_string: 'couchbases://127.0.0.1'\n username: 'Administrator'\n password: 'password'\n cluster_name: 'testdb'\n classification:\n enabled: true\n classifiers:\n - type: datahub\n profiling:\n enabled: true"
}
]
17 changes: 17 additions & 0 deletions metadata-ingestion/docs/sources/couchbase/couchbase_recipe.yml
@@ -0,0 +1,17 @@
source:
  type: couchbase
  config:
    connect_string: "couchbases://127.0.0.1"
    username: "Administrator"
    password: "password"
    cluster_name: "testdb"
    classification:
      enabled: true
      classifiers:
        - type: datahub
    profiling:
      enabled: true
      profile_nested_fields: true

sink:
  # sink configs
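Assuming the standard DataHub CLI workflow, this recipe would be run with `datahub ingest -c couchbase_recipe.yml` once the `couchbase` extra added in setup.py below is installed, e.g. `pip install 'acryl-datahub[couchbase]'`.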
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
@@ -392,6 +392,7 @@
"clickhouse": sql_common | clickhouse_common,
"clickhouse-usage": sql_common | usage_common | clickhouse_common,
"cockroachdb": sql_common | postgres_common | {"sqlalchemy-cockroachdb<2.0.0"},
"couchbase": {"couchbase>=4.3.4"},
"datahub-lineage-file": set(),
"datahub-business-glossary": set(),
"delta-lake": {*data_lake_profiling, *delta_lake},
@@ -622,6 +623,7 @@
"clickhouse",
"clickhouse-usage",
"cockroachdb",
"couchbase",
"delta-lake",
"dremio",
"druid",
@@ -720,6 +722,7 @@
"clickhouse = datahub.ingestion.source.sql.clickhouse:ClickHouseSource",
"clickhouse-usage = datahub.ingestion.source.usage.clickhouse_usage:ClickHouseUsageSource",
"cockroachdb = datahub.ingestion.source.sql.cockroachdb:CockroachDBSource",
"couchbase = datahub.ingestion.source.couchbase.couchbase_main:CouchbaseDBSource",
"delta-lake = datahub.ingestion.source.delta_lake:DeltaLakeSource",
"s3 = datahub.ingestion.source.s3:S3Source",
"dbt = datahub.ingestion.source.dbt.dbt_core:DBTCoreSource",
Collaborator:
This is the first source that really uses async.

In general, we should almost never use "bare" asyncio; the anyio library is preferred. We also should not have references to the event loop all over the place, which tends to be an anti-pattern. Methods that need an event loop should be async. For CPU-bound operations, we can use the asyncer library to bridge between asyncio tasks and worker threads.
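A minimal sketch of the structured-concurrency shape being suggested here, assuming anyio is available; the function, the `get_one` callable, and the concurrency bound of 16 are illustrative, not part of this PR:

```python
import anyio


async def fetch_documents(keys, get_one):
    # Hypothetical alternative to the gather-based batch fetch below:
    # a task group owns all workers and propagates the first failure.
    results = []
    limiter = anyio.CapacityLimiter(16)  # bound concurrent KV gets

    async def worker(key):
        async with limiter:
            doc = await get_one(key)
            if doc:
                results.append(doc)

    async with anyio.create_task_group() as tg:
        for key in keys:
            tg.start_soon(worker, key)
    return results
```

For CPU-bound steps, `asyncer.asyncify(fn)` could push the work onto a worker thread in the same style.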

Empty file.
metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_aggregate.py
@@ -0,0 +1,115 @@
import asyncio
import logging
from typing import AsyncGenerator, List

from acouchbase.collection import AsyncCollection
from acouchbase.scope import AsyncScope
from couchbase.exceptions import DocumentNotFoundException
from couchbase.result import GetResult

from datahub.ingestion.source.couchbase.couchbase_connect import CouchbaseConnect
from datahub.ingestion.source.couchbase.couchbase_sql import (
    SELECT_COLLECTION_COUNT,
    SELECT_DOC_IDS,
)
from datahub.ingestion.source.couchbase.retry import retry

logger = logging.getLogger(__name__)


class CouchbaseAggregate:
    scope: AsyncScope
    collection: AsyncCollection

    def __init__(
        self,
        connector: CouchbaseConnect,
        keyspace: str,
        batch_size: int = 100,
        max_sample_size: int = 0,
    ):
        self.connector = connector
        self.keyspace = keyspace
        self.batch_size = batch_size
        self.max_sample_size = max_sample_size

        if batch_size <= 0:
            raise ValueError("batch_size must be greater than 0")

    async def init(self):
        await self.connector.cluster_init_async()
        self.scope, self.collection = await self.connector.connect_keyspace_async(
            self.keyspace
        )

    @retry(factor=0.05)
    async def collection_get(self, key: str) -> dict:
        try:
            result: GetResult = await self.collection.get(key)
            return result.content_as[dict]
        except DocumentNotFoundException:
            logger.warning(f"Document ID {key} not found")
            return {}

    @retry(factor=0.05)
    async def run_query(
        self, query: str, offset: int = 0, limit: int = 0
    ) -> List[dict]:
        if offset > 0:
            query += f" OFFSET {offset}"
        if limit > 0:
            query += f" LIMIT {limit}"
        result = self.scope.query(query)
        documents = [row async for row in result]
        return documents

    async def collection_count(self) -> int:
        query = SELECT_COLLECTION_COUNT.format(self.connector.collection_name)

        result = await self.run_query(query)
        document = [row for row in result]
        return document[0].get("count") if document else 0

    async def get_keys(self):
        query = SELECT_DOC_IDS.format(self.connector.collection_name)

        results = await self.run_query(query, limit=self.max_sample_size)
        for row in results:
            yield row.get("id")

    async def get_key_chunks(self) -> AsyncGenerator[List[str], None]:
        keys = []
        async for key in self.get_keys():
            keys.append(key)
            if len(keys) == self.batch_size:
                yield keys
                # Start a fresh list: clearing in place would mutate the
                # chunk that was just yielded to the consumer.
                keys = []
        if len(keys) > 0:
            yield keys

    async def get_documents(self) -> AsyncGenerator[List[dict], None]:
        tasks = []
        await self.init()

        async for chunk in self.get_key_chunks():
            for key in chunk:
                tasks.append(asyncio.create_task(self.collection_get(key)))

            errors = 0
            if len(tasks) > 0:
                batch = []
                await asyncio.sleep(0)
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for result in results:
                    if isinstance(result, Exception):
                        logger.error(result)
                        errors += 1
                    elif isinstance(result, dict):
                        if result:
                            batch.append(result)
                yield batch

            if errors > 0:
                raise RuntimeError(f"batch get: {errors} errors retrieving documents")

            tasks.clear()
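For orientation, a hypothetical driver for this class; the `CouchbaseConnect` setup is elided because that module is not shown in this diff, and the keyspace string is only an example:

```python
import asyncio

from datahub.ingestion.source.couchbase.couchbase_aggregate import CouchbaseAggregate


async def dump_collection(connector) -> None:
    # get_documents() calls init() itself, so no separate setup step is needed.
    aggregator = CouchbaseAggregate(connector, "bucket.scope.collection", batch_size=100)
    async for batch in aggregator.get_documents():
        print(f"retrieved {len(batch)} documents")


# asyncio.run(dump_collection(connector)), given a configured CouchbaseConnect
```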
@@ -0,0 +1,124 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Tuple

from pydantic import PositiveInt
from pydantic.fields import Field

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import (
    EnvConfigMixin,
    PlatformInstanceConfigMixin,
)
from datahub.ingestion.glossary.classification_mixin import (
    ClassificationReportMixin,
    ClassificationSourceConfigMixin,
)
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalSourceReport,
    StatefulIngestionConfigBase,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulProfilingConfigMixin,
)
from datahub.ingestion.source_config.operation_config import is_profiling_enabled
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict, int_top_k_dict


def flatten(
    field_path: List[str], data: Any, truncate: bool = True
) -> Iterable[Tuple[str, Any]]:
    if isinstance(data, dict):
        for key, value in data.items():
            field_path.append(key)
            yield from flatten(field_path, value)
            if isinstance(value, dict) or isinstance(value, list):
                del field_path[-1]
    elif isinstance(data, list):
        for value in data:
            yield from flatten(field_path, value, False)
    else:
        yield ".".join(field_path), data
        if len(field_path) > 0 and truncate:
            del field_path[-1]
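To make the traversal concrete, here is what `flatten` yields for a small invented document; list elements are flattened with `truncate=False`, so each element repeats its parent path:

```python
doc = {
    "name": "a",
    "address": {"city": "b", "geo": {"lat": 1.5}},
    "tags": ["x", "y"],
}
print(list(flatten([], doc)))
# [('name', 'a'), ('address.city', 'b'), ('address.geo.lat', 1.5),
#  ('tags', 'x'), ('tags', 'y')]
```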


class CouchbaseDBConfig(
    PlatformInstanceConfigMixin,
    EnvConfigMixin,
    StatefulIngestionConfigBase,
    ClassificationSourceConfigMixin,
    StatefulProfilingConfigMixin,
):
    connect_string: str = Field(
        default="couchbases://127.0.0.1", description="Couchbase connect string."
    )
    username: str = Field(default="Administrator", description="Couchbase username.")
    password: str = Field(default="password", description="Couchbase password.")
    cluster_name: str = Field(default="cbdb", description="Couchbase cluster name.")
    kv_timeout: Optional[PositiveInt] = Field(
        default=5, description="KV operation timeout in seconds."
    )
    query_timeout: Optional[PositiveInt] = Field(
        default=75, description="Query timeout in seconds."
    )
    schema_sample_size: Optional[PositiveInt] = Field(
        default=10000, description="Number of documents to sample."
    )
    options: dict = Field(
        default={}, description="Additional options to pass to `ClusterOptions()`."
    )
    maxSchemaSize: Optional[PositiveInt] = Field(
        default=300, description="Maximum number of fields to include in the schema."
    )
    keyspace_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for keyspaces to filter in ingestion.",
    )
    domain: Dict[str, AllowDenyPattern] = Field(
        default=dict(),
        description="Regex patterns for keyspaces used to assign domain_key.",
    )

    # Profiling
    profile_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for tables to profile.",
    )

    profiling: GEProfilingConfig = Field(
        default=GEProfilingConfig(),
        description="Configuration for profiling.",
    )

    def is_profiling_enabled(self) -> bool:
        return self.profiling.enabled and is_profiling_enabled(
            self.profiling.operation_config
        )


@dataclass
class CouchbaseDBSourceReport(
    StaleEntityRemovalSourceReport, ClassificationReportMixin, IngestionStageReport
):
    filtered: List[str] = field(default_factory=list)
    documents_processed: int = 0
    keyspaces_profiled: int = 0
    collection_aggregate_timer: PerfTimer = field(default_factory=PerfTimer)
    profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict)
    profiling_skipped_table_profile_pattern: TopKDict[str, int] = field(
        default_factory=int_top_k_dict
    )

    def report_dropped(self, name: str) -> None:
        self.filtered.append(name)

    def report_entity_profiled(self) -> None:
        self.keyspaces_profiled += 1


@dataclass
class CouchbaseEntity:
    dataset: str
    schema: dict
    count: int
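A quick sketch of how this model would parse the `config:` block of the recipe above, assuming the pydantic v1 `parse_obj` entry point that DataHub config classes inherit:

```python
config = CouchbaseDBConfig.parse_obj(
    {
        "connect_string": "couchbases://127.0.0.1",
        "username": "Administrator",
        "password": "password",
        "cluster_name": "testdb",
        "profiling": {"enabled": True},
    }
)
print(config.is_profiling_enabled())  # True unless operation_config gates it off
```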