Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add AlluxioClientConfig #72

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,26 @@ pip install dist/alluxio-<alluxio_client_version>-py3-none-any.whl
Import and initialize the `AlluxioClient` class:
```
# Minimum setup for Alluxio with ETCD membership service
alluxio_client = AlluxioClient(etcd_hosts="localhost")
alluxio_client = AlluxioClient(AlluxioClientConfig(etcd_hosts="localhost"))

# Minimum setup for Alluxio with user-defined worker list
alluxio_client = AlluxioClient(worker_hosts="worker_host1,worker_host2")
alluxio_client = AlluxioClient(AlluxioClientConfig(worker_hosts="worker_host1,worker_host2"))

# Minimum setup for Alluxio with self-defined page size
alluxio_client = AlluxioClient(
alluxio_client = AlluxioClient(AlluxioClientConfig(
etcd_hosts="localhost",
options={"alluxio.worker.page.store.page.size": "20MB"}
)
))
# Minimum setup for Alluxio with ETCD membership service with username/password
options = {
"alluxio.etcd.username": "my_user",
"alluxio.etcd.password": "my_password",
"alluxio.worker.page.store.page.size": "20MB" # Any other options should be included here
}
alluxio_client = AlluxioClient(
alluxio_client = AlluxioClient(AlluxioClientConfig(
etcd_hosts="localhost",
options=options
)
))
```

### Load Operations
Expand Down
2 changes: 2 additions & 0 deletions alluxio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from alluxio.config import AlluxioClientConfig
from alluxio.core import AlluxioAsyncFileSystem
from alluxio.core import AlluxioClient
from alluxio.core import AlluxioPathStatus

__all__ = [
"AlluxioClient",
"AlluxioClientConfig",
"AlluxioAsyncFileSystem",
"AlluxioPathStatus",
]
126 changes: 126 additions & 0 deletions alluxio/annotations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Optional


def PublicAPI(*args, **kwargs):
"""Annotation for documenting public APIs.

Public APIs are classes and methods exposed to end users of Alluxio Client.

If ``stability="alpha"``, the API can be used by advanced users who are
tolerant to and expect breaking changes.

If ``stability="beta"``, the API is still public and can be used by early
users, but are subject to change.

If ``stability="stable"``, the APIs will remain backwards compatible across
minor Alluxio releases (e.g., Alluxio 1.1 -> 1.0).

Args:
stability: One of {"stable", "beta", "alpha"}.

Examples:
>>> from alluxio.annotations import PublicAPI
>>> @PublicAPI
... def func(x):
... return x

>>> @PublicAPI(stability="beta")
... def func(y):
... return y
"""
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
return PublicAPI(stability="stable")(args[0])

if "stability" in kwargs:
stability = kwargs["stability"]
assert stability in ["stable", "beta", "alpha"], stability
elif kwargs:
raise ValueError("Unknown kwargs: {}".format(kwargs.keys()))
else:
stability = "stable"

def wrap(obj):
if stability in ["alpha", "beta"]:
message = (
f"**PublicAPI ({stability}):** This API is in {stability} "
"and may change before becoming stable."
)
_append_doc(obj, message=message)

_mark_annotated(obj)
return obj

return wrap


def _append_doc(obj, *, message: str, directive: Optional[str] = None) -> str:
if not obj.__doc__:
obj.__doc__ = ""

obj.__doc__ = obj.__doc__.rstrip()

indent = _get_indent(obj.__doc__)
obj.__doc__ += "\n\n"

if directive is not None:
obj.__doc__ += f"{' ' * indent}.. {directive}::\n\n"

message = message.replace("\n", "\n" + " " * (indent + 4))
obj.__doc__ += f"{' ' * (indent + 4)}{message}"
else:
message = message.replace("\n", "\n" + " " * (indent + 4))
obj.__doc__ += f"{' ' * indent}{message}"
obj.__doc__ += f"\n{' ' * indent}"


def _mark_annotated(obj) -> None:
# Set magic token for check_api_annotations linter.
if hasattr(obj, "__name__"):
obj._annotated = obj.__name__


def _get_indent(docstring: str) -> int:
"""

Example:
>>> def f():
... '''Docstring summary.'''
>>> f.__doc__
'Docstring summary.'
>>> _get_indent(f.__doc__)
0

>>> def g(foo):
... '''Docstring summary.
...
... Args:
... foo: Does bar.
... '''
>>> g.__doc__
'Docstring summary.\\n\\n Args:\\n foo: Does bar.\\n '
>>> _get_indent(g.__doc__)
4

>>> class A:
... def h():
... '''Docstring summary.
...
... Returns:
... None.
... '''
>>> A.h.__doc__
'Docstring summary.\\n\\n Returns:\\n None.\\n '
>>> _get_indent(A.h.__doc__)
8
"""
if not docstring:
return 0

non_empty_lines = list(filter(bool, docstring.splitlines()))
if len(non_empty_lines) == 1:
# Docstring contains summary only.
return 0

# The docstring summary isn't indented, so check the indentation of the second
# non-empty line.
return len(non_empty_lines[1]) - len(non_empty_lines[1].lstrip())
93 changes: 93 additions & 0 deletions alluxio/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from typing import Optional

import humanfriendly

from alluxio.annotations import PublicAPI
from alluxio.const import ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE
from alluxio.const import ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE
from alluxio.const import ALLUXIO_PAGE_SIZE_DEFAULT_VALUE
from alluxio.const import ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE


@PublicAPI(stability="beta")
class AlluxioClientConfig:
"""
Class responsible for creating the configuration for Alluxio Client.
"""

def __init__(
self,
etcd_hosts: Optional[str] = None,
worker_hosts: Optional[str] = None,
etcd_port=2379,
worker_http_port=ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE,
etcd_refresh_workers_interval=120,
page_size=ALLUXIO_PAGE_SIZE_DEFAULT_VALUE,
hash_node_per_worker=ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE,
cluster_name=ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE,
etcd_username: Optional[str] = None,
etcd_password: Optional[str] = None,
concurrency=64,
):
"""
Initializes Alluxio client configuration.

Args:
etcd_hosts (Optional[str], optional): The hostnames of ETCD to get worker addresses from
in 'host1,host2,host3' format. Either etcd_hosts or worker_hosts should be provided, not both.
worker_hosts (Optional[str], optional): The worker hostnames in 'host1,host2,host3' format.
Either etcd_hosts or worker_hosts should be provided, not both.
concurrency (int, optional): The maximum number of concurrent operations for HTTP requests, default to 64.
etcd_port (int, optional): The port of each etcd server.
worker_http_port (int, optional): The port of the HTTP server on each Alluxio worker node.
etcd_refresh_workers_interval (int, optional): The interval to refresh worker list from ETCD membership service periodically.
All negative values mean the service is disabled.
"""
if not (etcd_hosts or worker_hosts):
raise ValueError(
"Must supply either 'etcd_hosts' or 'worker_hosts'"
)
if etcd_hosts and worker_hosts:
raise ValueError(
"Supply either 'etcd_hosts' or 'worker_hosts', not both"
)
if not isinstance(etcd_port, int) or not (1 <= etcd_port <= 65535):
raise ValueError(
"'etcd_port' should be an integer in the range 1-65535"
)
if not isinstance(worker_http_port, int) or not (
1 <= worker_http_port <= 65535
):
raise ValueError(
"'worker_http_port' should be an integer in the range 1-65535"
)
if not isinstance(concurrency, int) or concurrency <= 0:
raise ValueError("'concurrency' should be a positive integer")
if not isinstance(etcd_refresh_workers_interval, int):
raise ValueError(
"'etcd_refresh_workers_interval' should be an integer"
)
self.etcd_hosts = etcd_hosts
self.worker_hosts = worker_hosts
self.etcd_port = etcd_port
self.worker_http_port = worker_http_port
self.etcd_refresh_workers_interval = etcd_refresh_workers_interval
if (
not isinstance(hash_node_per_worker, int)
or hash_node_per_worker <= 0
):
raise ValueError(
"'hash_node_per_worker' should be a positive integer"
)

self.hash_node_per_worker = hash_node_per_worker
self.page_size = humanfriendly.parse_size(page_size, binary=True)
self.cluster_name = cluster_name

if (etcd_username is None) != (etcd_password is None):
raise ValueError(
"Both ETCD username and password must be set or both should be unset."
)
self.etcd_username = etcd_username
self.etcd_password = etcd_password
self.concurrency = concurrency
6 changes: 0 additions & 6 deletions alluxio/const.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
ALLUXIO_CLUSTER_NAME_KEY = "alluxio.cluster.name"
ALLUXIO_CLUSTER_NAME_DEFAULT_VALUE = "DefaultAlluxioCluster"
ALLUXIO_ETCD_USERNAME_KEY = "alluxio.etcd.username"
ALLUXIO_ETCD_PASSWORD_KEY = "alluxio.etcd.password"
ALLUXIO_PAGE_SIZE_KEY = "alluxio.worker.page.store.page.size"
ALLUXIO_PAGE_SIZE_DEFAULT_VALUE = "1MB"
ALLUXIO_HASH_NODE_PER_WORKER_KEY = (
"alluxio.user.consistent.hash.virtual.node.count.per.worker"
)
ALLUXIO_WORKER_HTTP_SERVER_PORT_KEY = "alluxio.worker.http.server.port"
ALLUXIO_WORKER_HTTP_SERVER_PORT_DEFAULT_VALUE = 28080
ALLUXIO_HASH_NODE_PER_WORKER_DEFAULT_VALUE = 5
Expand Down
Loading
Loading