Skip to content

Commit

Permalink
[apache#2251] feat(PyClient): Add Metalake APIs in Gravitino Python c…
Browse files Browse the repository at this point in the history
…lient (apache#2842)

### What changes were proposed in this pull request?

Add GravitinoAdminClient, support below functions:
1. create_metalake()
2. alter_metalake()
3. drop_metalake()
4. list_metalakes()

### Why are the changes needed?

Fix: apache#2251

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

CI Passed
  • Loading branch information
xunliu authored Apr 9, 2024
1 parent 2fb6189 commit 1a74d6b
Show file tree
Hide file tree
Showing 35 changed files with 1,509 additions and 65 deletions.
2 changes: 1 addition & 1 deletion clients/client-python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

1. Install dependency
```bash
pip install -e .[dev]
pip install -e '.[dev]'
```

2. Run tests
Expand Down
23 changes: 21 additions & 2 deletions clients/client-python/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ fun deleteCacheDir(targetDir: String) {
}
}

fun gravitinoServer(operation: String) {
val process = ProcessBuilder("${project.rootDir.path}/distribution/package/bin/gravitino.sh", operation).start()
val exitCode = process.waitFor()
if (exitCode == 0) {
val currentContext = process.inputStream.bufferedReader().readText()
println("Current docker context is: $currentContext")
} else {
println("checkOrbStackStatus Command execution failed with exit code $exitCode")
}
}

tasks {
val pipInstall by registering(VenvTask::class) {
venvExec = "pip"
Expand All @@ -35,12 +46,20 @@ tasks {
}

val integrationTest by registering(VenvTask::class) {
doFirst() {
gravitinoServer("start")
}

dependsOn(pipInstall)
venvExec = "python"
args = listOf("-m", "unittest", "tests/test_integration_gravitino_client.py")
args = listOf("-m", "unittest")
workingDir = projectDir.resolve(".")
environment = mapOf("PROJECT_VERSION" to project.version,
"GRAVITINO_HOME" to project.rootDir.path + "/distribution/package")
"GRADLE_START_GRAVITINO" to "True")

doLast {
gravitinoServer("stop")
}
}

val build by registering(VenvTask::class) {
Expand Down
4 changes: 4 additions & 0 deletions clients/client-python/gravitino/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
107 changes: 107 additions & 0 deletions clients/client-python/gravitino/client/gravitino_admin_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
import logging
from typing import List, Dict

from gravitino.client.gravitino_client_base import GravitinoClientBase
from gravitino.client.gravitino_metalake import GravitinoMetalake
from gravitino.dto.dto_converters import DTOConverters
from gravitino.dto.requests.metalake_create_request import MetalakeCreateRequest
from gravitino.dto.requests.metalake_updates_request import MetalakeUpdatesRequest
from gravitino.dto.responses.drop_response import DropResponse
from gravitino.dto.responses.metalake_list_response import MetalakeListResponse
from gravitino.dto.responses.metalake_response import MetalakeResponse
from gravitino.meta_change import MetalakeChange
from gravitino.name_identifier import NameIdentifier

logger = logging.getLogger(__name__)


class GravitinoAdminClient(GravitinoClientBase):
"""
Gravitino Client for the administrator to interact with the Gravitino API, allowing the client to list, load, create, and alter Metalakes.
Normal users should use {@link GravitinoClient} to connect with the Gravitino server.
"""

def __init__(self, uri): # TODO: AuthDataProvider authDataProvider
super().__init__(uri)

def list_metalakes(self) -> List[GravitinoMetalake]:
"""
Retrieves a list of Metalakes from the Gravitino API.
Return:
An array of GravitinoMetalake objects representing the Metalakes.
"""
resp = self.rest_client.get(self.API_METALAKES_LIST_PATH)
metalake_list_resp = MetalakeListResponse.from_json(resp.body, infer_missing=True)
metalake_list_resp.validate()

return [GravitinoMetalake.build(o, self.rest_client) for o in metalake_list_resp.metalakes]

def create_metalake(self, ident: NameIdentifier, comment: str, properties: Dict[str, str]) -> GravitinoMetalake:
"""
Creates a new Metalake using the Gravitino API.
Args:
ident: The identifier of the new Metalake.
comment: The comment for the new Metalake.
properties: The properties of the new Metalake.
Return:
A GravitinoMetalake instance representing the newly created Metalake.
TODO: @throws MetalakeAlreadyExistsException If a Metalake with the specified identifier already exists.
"""
NameIdentifier.check_metalake(ident)

req = MetalakeCreateRequest(ident.name, comment, properties)
req.validate()

resp = self.rest_client.post(self.API_METALAKES_LIST_PATH, req)
metalake_response = MetalakeResponse.from_json(resp.body, infer_missing=True)
metalake_response.validate()

return GravitinoMetalake.build(metalake_response.metalake, self.rest_client)

def alter_metalake(self, ident: NameIdentifier, *changes: MetalakeChange) -> GravitinoMetalake:
"""
Alters a specific Metalake using the Gravitino API.
Args:
ident: The identifier of the Metalake to be altered.
changes: The changes to be applied to the Metalake.
Return:
A GravitinoMetalake instance representing the updated Metalake.
TODO: @throws NoSuchMetalakeException If the specified Metalake does not exist.
TODO: @throws IllegalArgumentException If the provided changes are invalid or not applicable.
"""
NameIdentifier.check_metalake(ident)

reqs = [DTOConverters.to_metalake_update_request(change) for change in changes]
updates_request = MetalakeUpdatesRequest(reqs)
updates_request.validate()

resp = self.rest_client.put(self.API_METALAKES_IDENTIFIER_PATH + ident.name,
updates_request) # , MetalakeResponse, {}, ErrorHandlers.metalake_error_handler())
metalake_response = MetalakeResponse.from_json(resp.body)
metalake_response.validate()

return GravitinoMetalake.build(metalake_response.metalake, self.rest_client)

def drop_metalake(self, ident: NameIdentifier) -> bool:
"""
Drops a specific Metalake using the Gravitino API.
Args:
ident: The identifier of the Metalake to be dropped.
Return:
True if the Metalake was successfully dropped, false otherwise.
"""
NameIdentifier.check_metalake(ident)

try:
resp = self.rest_client.delete(self.API_METALAKES_IDENTIFIER_PATH + ident.name)
dropResponse = DropResponse.from_json(resp.body)

return dropResponse.dropped()

except Exception as e:
logger.warning(f"Failed to drop metadata ", e)
return False
64 changes: 64 additions & 0 deletions clients/client-python/gravitino/client/gravitino_client_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
import logging

from gravitino.client.gravitino_metalake import GravitinoMetalake
from gravitino.client.gravitino_version import GravitinoVersion
from gravitino.dto.responses.metalake_response import MetalakeResponse
from gravitino.name_identifier import NameIdentifier
from gravitino.utils import HTTPClient

logger = logging.getLogger(__name__)

class GravitinoClientBase:
"""
Base class for Gravitino Java client;
It uses an underlying {@link RESTClient} to send HTTP requests and receive responses from the API.
"""
rest_client: HTTPClient # The REST client to communicate with the REST server
API_METALAKES_LIST_PATH = "api/metalakes" # The REST API path for listing metalakes
API_METALAKES_IDENTIFIER_PATH = f"{API_METALAKES_LIST_PATH}/" # The REST API path prefix for load a specific metalake

def __init__(self, uri: str):
self.rest_client = HTTPClient(uri)

def load_metalake(self, ident: NameIdentifier) -> GravitinoMetalake:
"""Loads a specific Metalake from the Gravitino API.
Args:
ident The identifier of the Metalake to be loaded.
Return:
A GravitinoMetalake instance representing the loaded Metalake.
Raises:
NoSuchMetalakeException If the specified Metalake does not exist.
"""
NameIdentifier.check_metalake(ident)

resp = self.rest_client.get(GravitinoClientBase.API_METALAKES_IDENTIFIER_PATH + ident.name)
metalake_response = MetalakeResponse.from_json(resp.body)
metalake_response.validate()

return GravitinoMetalake.build(metalake_response.metalake, self.rest_client)

def get_version(self) -> GravitinoVersion:
"""Retrieves the version of the Gravitino API.
Return:
A GravitinoVersion instance representing the version of the Gravitino API.
"""
resp = self.rest_client.get("api/version")
resp.validate()

return GravitinoVersion(resp.get_version())

def close(self):
"""Closes the GravitinoClient and releases any underlying resources."""
if self.rest_client is not None:
try:
self.rest_client.close()
except Exception as e:
logger.warning("Failed to close the HTTP REST client", e)
28 changes: 28 additions & 0 deletions clients/client-python/gravitino/client/gravitino_metalake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
from typing import Dict

from gravitino.dto.audit_dto import AuditDTO
from gravitino.dto.metalake_dto import MetalakeDTO
from gravitino.utils import HTTPClient


class GravitinoMetalake(MetalakeDTO):
"""
Gravitino Metalake is the top-level metadata repository for users. It contains a list of catalogs
as sub-level metadata collections. With {@link GravitinoMetalake}, users can list, create, load,
alter and drop a catalog with specified identifier.
"""
restClient: HTTPClient

def __init__(self, name: str = None, comment: str = None, properties: Dict[str, str] = None, audit: AuditDTO = None,
rest_client: HTTPClient = None):
super().__init__(name=name, comment=comment, properties=properties, audit=audit)
self.restClient = rest_client

@classmethod
def build(cls, metalake: MetalakeDTO = None, client: HTTPClient = None):
return cls(name=metalake.name, comment=metalake.comment, properties=metalake.properties,
audit=metalake.audit, rest_client=client)
14 changes: 14 additions & 0 deletions clients/client-python/gravitino/client/gravitino_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
from dataclasses import dataclass

from gravitino.dto.version_dto import VersionDTO


@dataclass
class GravitinoVersion(VersionDTO):
"""Gravitino version information."""
def __init__(self, versionDTO):
super().__init__(versionDTO.version, versionDTO.compile_date, versionDTO.git_commit)
4 changes: 4 additions & 0 deletions clients/client-python/gravitino/dto/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
33 changes: 33 additions & 0 deletions clients/client-python/gravitino/dto/audit_dto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
from dataclasses import dataclass, field
from typing import Optional

from dataclasses_json import DataClassJsonMixin, config


@dataclass
class AuditDTO(DataClassJsonMixin):
"""Data transfer object representing audit information."""

creator: Optional[str]
"""The creator of the audit."""

create_time: Optional[str] = field(metadata=config(field_name='createTime')) # TODO: Can't deserialized datetime from JSON
"""The create time of the audit."""

last_modifier: Optional[str] = field(metadata=config(field_name='lastModifier'))
"""The last modifier of the audit."""

last_modified_time: Optional[str] = field(
metadata=config(field_name='lastModifiedTime')) # TODO: Can't deserialized datetime from JSON
"""The last modified time of the audit."""

def __init__(self, creator: str = None, create_time: str = None, last_modifier: str = None,
last_modified_time: str = None):
self.creator: str = creator
self.create_time: str = create_time
self.last_modifier: str = last_modifier
self.last_modified_time: str = last_modified_time
24 changes: 24 additions & 0 deletions clients/client-python/gravitino/dto/dto_converters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
from gravitino.dto.requests.metalake_update_request import MetalakeUpdateRequest
from gravitino.meta_change import MetalakeChange


class DTOConverters:
"""Utility class for converting between DTOs and domain objects."""

@staticmethod
def to_metalake_update_request(change: MetalakeChange) -> object:
# Assuming MetalakeUpdateRequest has similar nested class structure for requests
if isinstance(change, MetalakeChange.RenameMetalake):
return MetalakeUpdateRequest.RenameMetalakeRequest(change.newName)
elif isinstance(change, MetalakeChange.UpdateMetalakeComment):
return MetalakeUpdateRequest.UpdateMetalakeCommentRequest(change.newComment)
elif isinstance(change, MetalakeChange.SetProperty):
return MetalakeUpdateRequest.SetMetalakePropertyRequest(change.property, change.value)
elif isinstance(change, MetalakeChange.RemoveProperty):
return MetalakeUpdateRequest.RemoveMetalakePropertyRequest(change.property)
else:
raise ValueError(f"Unknown change type: {type(change).__name__}")
51 changes: 51 additions & 0 deletions clients/client-python/gravitino/dto/metalake_dto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
from dataclasses import dataclass
from typing import Optional, Dict

from dataclasses_json import DataClassJsonMixin

from gravitino.dto.audit_dto import AuditDTO


@dataclass
class MetalakeDTO(DataClassJsonMixin):
"""Represents a Metalake Data Transfer Object (DTO) that implements the Metalake interface."""

name: str
"""The name of the Metalake DTO."""

comment: Optional[str]
"""The comment of the Metalake DTO."""

properties: Optional[Dict[str, str]] = None
"""The properties of the Metalake DTO."""

audit: AuditDTO = None
"""The audit information of the Metalake DTO."""

def __init__(self, name: str = None, comment: str = None, properties: Dict[str, str] = None,
audit: AuditDTO = None):
self.name = name
self.comment = comment
self.properties = properties
self.audit = audit

def equals(self, other):
if self == other:
return True
if not isinstance(other, MetalakeDTO):
return False
return self.name == other.name and self.comment == other.comment and \
self.property_equal(self.properties, other.properties) and self.audit == other.audit

def property_equal(self, p1, p2):
if p1 is None and p2 is None:
return True
if p1 is not None and not p1 and p2 is None:
return True
if p2 is not None and not p2 and p1 is None:
return True
return p1 == p2
4 changes: 4 additions & 0 deletions clients/client-python/gravitino/dto/requests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""
Copyright 2024 Datastrato Pvt Ltd.
This software is licensed under the Apache License version 2.
"""
Loading

0 comments on commit 1a74d6b

Please sign in to comment.