-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: alluxio-py support alluxio and oss filesystem #76
base: main
Are you sure you want to change the base?
Changes from all commits
7c9d0cc
e09ac51
df26906
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
import logging | ||
|
||
import yaml | ||
import os | ||
|
||
from alluxio.posix.ufs.alluxio import validate_alluxio_config, update_alluxio_config | ||
from alluxio.posix.ufs.oss import validate_oss_config, update_oss_config | ||
from alluxio.posix.const import Constants | ||
from alluxio.posix.exception import ConfigMissingError, ConfigInvalidError | ||
|
||
|
||
class ConfigManager:
    """Load, validate and update per-filesystem settings read from a YAML file.

    The file path comes from the ``ALLUXIO_PY_CONFIG_FILE_PATH`` environment
    variable when it is set; otherwise ``config/ufs_config.yaml`` next to this
    module is used. A missing file is tolerated: the manager then starts with
    no configuration and ``config_data`` is ``None``.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        logging.basicConfig(level=logging.INFO)
        current_dir = os.path.dirname(os.path.abspath(__file__))
        default_path = os.path.join(current_dir, 'config', 'ufs_config.yaml')
        self.config_file_path = os.getenv('ALLUXIO_PY_CONFIG_FILE_PATH', default_path)
        self.config_data = self._load_config()
        # Maps a filesystem type to the function that validates its section.
        self.validation_functions = {
            Constants.OSS_FILESYSTEM_TYPE: validate_oss_config,
            Constants.ALLUXIO_FILESYSTEM_TYPE: validate_alluxio_config
        }

    def _load_config(self):
        """Parse the YAML file at ``self.config_file_path``.

        Returns:
            The parsed document, or ``None`` when the file does not exist.

        Raises:
            ValueError: If the file exists but is not valid YAML.
        """
        if not os.path.exists(self.config_file_path):
            self.logger.warning(
                "Config file not found: %s. Initializing without loading config.",
                self.config_file_path)
            return None

        with open(self.config_file_path, 'r', encoding='utf-8') as file:
            try:
                return yaml.safe_load(file)
            except yaml.YAMLError as e:
                # Chain the parser error so the original location is not lost.
                raise ValueError(f"Error parsing YAML file: {e}") from e

    def set_config_path(self, new_path):
        """Point the manager at ``new_path`` and reload the configuration."""
        self.config_file_path = new_path
        self.config_data = self._load_config()
        # Report through the logger rather than printing from library code.
        self.logger.info("Configuration path updated and config reloaded from %s.", new_path)

    def get_config(self, fs_name: str) -> dict:
        """Return the validated configuration section for ``fs_name``.

        Raises:
            ConfigMissingError: If no configuration is loaded, the section is
                absent, or validation raises ``KeyError``/``ValueError``.
            ConfigInvalidError: If no validation function is registered for
                ``fs_name``.
        """
        if self.config_data is None:
            # Without this guard, subscripting ``None`` raises a TypeError that
            # none of the handlers below would translate.
            raise ConfigMissingError(fs_name, "FileSystem Configuration is missing")
        try:
            fs_config = self.config_data[fs_name]
            validation_function = self.validation_functions.get(fs_name)
            if validation_function is None:
                raise ConfigInvalidError(fs_name, f"No validation function for file system: {fs_name}")
            validation_function(fs_config)
            return fs_config
        except KeyError as e:
            raise ConfigMissingError(fs_name, "FileSystem Configuration is missing") from e
        except ValueError as e:
            raise ConfigMissingError(fs_name, str(e)) from e

    def get_config_fs_list(self) -> list:
        """Return the configured filesystem names; empty when nothing is loaded."""
        if self.config_data is None:
            return []
        # Materialize the keys so the result really is a list, as annotated.
        return list(self.config_data)

    def update_config(self, fs_type, **kwargs):
        """Apply ``kwargs`` to the stored configuration section of ``fs_type``.

        Raises:
            KeyError: If ``fs_type`` has no loaded configuration section.
            NotImplementedError: For the S3 filesystem type.
            ValueError: For any other unsupported filesystem type.
        """
        if fs_type not in self.get_config_fs_list():
            raise KeyError(f"No configuration available for {fs_type}")
        config_data = self.get_config(fs_type)

        if fs_type == Constants.OSS_FILESYSTEM_TYPE:
            self.config_data[fs_type] = update_oss_config(config_data, kwargs)
        elif fs_type == Constants.ALLUXIO_FILESYSTEM_TYPE:
            self.config_data[fs_type] = update_alluxio_config(config_data, kwargs)
        elif fs_type == Constants.S3_FILESYSTEM_TYPE:
            raise NotImplementedError()
        else:
            raise ValueError(f"Unsupported file system type: {fs_type}")
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
class Constants:
    """String constants shared by the POSIX delegate modules."""

    # Key used in a filesystem configuration section for its bucket name.
    BUCKET_NAME = 'bucket_name'

    # URL prefixes of the supported remote schemes.
    OSS_URL_PREFIX = "oss://"
    ALLUXIO_URL_PREFIX = "alluxio://"

    # Identifiers of the filesystem types.
    LOCAL_FILESYSTEM_TYPE = 'local'
    OSS_FILESYSTEM_TYPE = 'oss'
    ALLUXIO_FILESYSTEM_TYPE = 'alluxio'
    S3_FILESYSTEM_TYPE = 's3'

    # Separator used when composing Alluxio-backed filesystem keys.
    ALLUXIO_SEP_SIGN = '_'
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import os | ||
from alluxio.posix import fileimpl | ||
from alluxio.posix.config import ConfigManager | ||
from alluxio.posix.delegateFs import DelegateFileSystem | ||
|
||
# Build the shared configuration manager and the delegate that maps a path to
# its backing filesystem. Constructing the delegate also publishes it on
# ``DelegateFileSystem.instance``, which the fileimpl functions read.
config_manager = ConfigManager()
delegate_fs = DelegateFileSystem(config_manager)

# Replace the os-level entry points with the delegating implementations.
# NOTE(review): fileimpl.open takes a mode string (default "r"), whereas
# os.open expects integer flags - confirm os.open is the intended target.
for _name in ("stat", "open", "listdir", "rename", "mkdir", "remove", "rmdir"):
    setattr(os, _name, getattr(fileimpl, _name))
del _name
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import logging | ||
import re | ||
import threading | ||
import fsspec | ||
|
||
from alluxio.posix.config import ConfigManager | ||
from alluxio.posix.const import Constants | ||
from alluxio.posix.exception import UnsupportedDelegateFileSystemError | ||
from alluxio.posix.ufs.alluxio import ALLUXIO_ETCD_HOST, ALLUXIO_BACKUP_FS, ALLUXIO_ETCD_ENABLE, ALLUXIO_ENABLE | ||
from alluxio.posix.ufs.oss import OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT | ||
|
||
|
||
class DelegateFileSystem:
    """Resolve a path's URL scheme to an fsspec filesystem built from config.

    Created filesystem objects are cached in an ``FSStorage`` under the key
    computed by ``_storage_key``. The most recently constructed delegate is
    published on the class attribute ``instance``.
    """

    instance = None

    # Captures the scheme and the bucket of ``scheme://bucket/...`` paths.
    _URL_PATTERN = re.compile(r'^(\w+)://([^/]+)/.*')

    def __init__(self, config_manager: "ConfigManager"):
        self.config_manager = config_manager
        self.filesystem_storage = FSStorage()
        self.filesystem_storage.data = {}
        self.enableFileSystems = [Constants.OSS_FILESYSTEM_TYPE,
                                  Constants.ALLUXIO_FILESYSTEM_TYPE,
                                  Constants.S3_FILESYSTEM_TYPE]
        self.__init__file__system()
        DelegateFileSystem.instance = self

    @staticmethod
    def _storage_key(fs_name, config):
        """Return the cache key for ``fs_name`` given its configuration section.

        When the section enables Alluxio the key is
        ``alluxio_<fs_name>_<bucket_name>``; otherwise it is ``fs_name``.
        """
        if config.get(ALLUXIO_ENABLE):
            return (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name +
                    Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME])
        return fs_name

    def __create__file__system(self, fs_name: str):
        """Create, cache and return the filesystem configured for ``fs_name``.

        Returns:
            The new filesystem, or ``None`` when nothing can be built for
            ``fs_name``.

        Raises:
            UnsupportedDelegateFileSystemError: If ``fs_name`` is not enabled.
            NotImplementedError: For the S3 filesystem type.
        """
        config = self.config_manager.get_config(fs_name)
        if fs_name not in self.enableFileSystems:
            # The exception formats its own message around the name.
            raise UnsupportedDelegateFileSystemError(fs_name)

        # Keep ``fs_name`` untouched: it still selects the fallback branch
        # below. Only the cache key carries the Alluxio prefix.
        storage_key = self._storage_key(fs_name, config)

        if config.get(ALLUXIO_ENABLE):
            if config.get(ALLUXIO_ETCD_ENABLE):
                fs = fsspec.filesystem(Constants.ALLUXIO_FILESYSTEM_TYPE,
                                       etcd_hosts=config[ALLUXIO_ETCD_HOST],
                                       etcd_port=2379,
                                       target_protocol=config[ALLUXIO_BACKUP_FS])
                self.filesystem_storage[storage_key] = fs
                return fs
            logging.error("Failed to create Alluxio filesystem, using the default %s filesystem.", fs_name)

        if fs_name == Constants.OSS_FILESYSTEM_TYPE:
            fs = fsspec.filesystem(Constants.OSS_FILESYSTEM_TYPE,
                                   key=config[OSS_ACCESS_KEY_ID],
                                   secret=config[OSS_ACCESS_KEY_SECRET],
                                   endpoint=config[OSS_ENDPOINT])
            # Stored under the same key get_file_system() looks up.
            self.filesystem_storage[storage_key] = fs
            return fs
        if fs_name == Constants.S3_FILESYSTEM_TYPE:
            # TODO: add the S3 filesystem
            raise NotImplementedError

        return None

    def get_file_system(self, path: str):
        """Return the filesystem responsible for ``path``.

        Returns:
            ``None`` for local paths, or when no filesystem can be created for
            the path's scheme; otherwise the cached or newly created
            filesystem object.
        """
        fs_name, _bucket = self.__parse__url(path)
        if fs_name == Constants.LOCAL_FILESYSTEM_TYPE:
            return None
        config = self.config_manager.get_config(fs_name)
        storage_key = self._storage_key(fs_name, config)
        # Look in the cache itself; hasattr() on the storage object never
        # matched a cached key.
        fs = self.filesystem_storage.get(storage_key)
        if fs is None:
            # Create from the plain scheme name, which is what the
            # configuration is keyed by.
            fs = self.__create__file__system(fs_name)
        return fs

    def __init__file__system(self):
        """Create a filesystem for every section listed by the config manager."""
        for fs_name in self.config_manager.get_config_fs_list():
            self.__create__file__system(fs_name)

    def __parse__url(self, path: str):
        """Split ``path`` into ``(filesystem type, bucket name)``.

        Non-string paths, paths starting with ``/`` and paths that do not look
        like ``scheme://bucket/...`` are reported as local with a ``None``
        bucket.

        Raises:
            UnsupportedDelegateFileSystemError: If the scheme is not enabled.
        """
        if not isinstance(path, str) or path.startswith('/'):
            return Constants.LOCAL_FILESYSTEM_TYPE, None
        match = self._URL_PATTERN.match(path)
        if not match:
            return Constants.LOCAL_FILESYSTEM_TYPE, None
        scheme, bucket_name = match.groups()
        # Return the normalized name so lookups keyed by filesystem type agree
        # with the case-insensitive check.
        fs_name = scheme.lower()
        if fs_name not in self.enableFileSystems:
            raise UnsupportedDelegateFileSystemError(scheme)
        return fs_name, bucket_name
|
||
|
||
class FSStorage(threading.local):
    """Thread-local mapping from a key to a filesystem object.

    Subclasses ``threading.local``; the mapping itself is kept in ``self.fs``
    and exposed through the usual item-access methods.
    """

    def __init__(self):
        self.fs = dict()

    def __getitem__(self, key):
        """Return the value stored under ``key``; raises ``KeyError`` if absent."""
        return self.fs[key]

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``."""
        self.fs[key] = value

    def __delitem__(self, key):
        """Remove ``key``; raises ``KeyError`` if absent."""
        del self.fs[key]

    def __contains__(self, key):
        """Return whether ``key`` is stored."""
        return key in self.fs

    def get(self, key, default=None):
        """Return the value stored under ``key``, or ``default`` when absent."""
        return self.fs.get(key, default)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
class ConfigMissingError(Exception):
    """Raised when a configuration entry is absent.

    The exception text is ``"<message>: <config_key>"``; both parts are also
    kept as attributes.
    """

    def __init__(self, config_key, message="Configuration key is missing"):
        super().__init__(f"{message}: {config_key}")
        self.message = message
        self.config_key = config_key
|
||
|
||
class ConfigInvalidError(Exception):
    """Raised when a configuration entry is present but not acceptable.

    The exception text is ``"<message>: <config_key>"``; both parts are also
    kept as attributes.
    """

    def __init__(self, config_key, message="Configuration key is invalid, config_key"):
        super().__init__(f"{message}: {config_key}")
        self.message = message
        self.config_key = config_key
|
||
|
||
class UnsupportedDelegateFileSystemError(Exception):
    """Raised when a filesystem name is not supported by the delegate.

    The exception text is ``"<message>: <fs_name>"``; both parts are also
    kept as attributes.
    """

    def __init__(self, fs_name, message="FileSystem is not supported, filesystem"):
        super().__init__(f"{message}: {fs_name}")
        self.message = message
        self.fs_name = fs_name
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import logging | ||
import os | ||
|
||
from alluxio.posix.delegateFs import DelegateFileSystem | ||
|
||
# References to the original implementations, captured when this module is
# imported so the delegating functions below can fall back to them.
# ``open`` here is still the builtin: this line runs before the module-level
# ``def open`` further down rebinds the name. (It was previously bound to the
# ``os.path`` module, which is not callable.)
local_open = open
local_stat = os.stat
local_listdir = os.listdir
local_rename = os.rename
local_close = os.close
local_mkdir = os.mkdir
local_remove = os.remove
local_rmdir = os.rmdir
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Wouldn't it be better to separate the interface from the implementation? |
||
|
||
|
||
def open(file: str, mode: str = "r", **kw):
    """Open ``file`` via the delegate filesystem, else via ``local_open``.

    The delegate is asked for the filesystem owning ``file``. When it returns
    none, or when opening through it raises, the call is forwarded to the
    module-level ``local_open`` with the same arguments.
    """
    logging.debug("DelegateFileSystem opening file: %s", file)
    delegate = DelegateFileSystem.instance
    remote_fs = delegate.get_file_system(file)
    if not remote_fs:
        return local_open(file, mode, **kw)
    try:
        # NOTE(review): ``kw`` is forwarded to the filesystem client as-is; a
        # reviewer asked whether these parameters should be validated first.
        return remote_fs.open(file, mode, **kw)
    except Exception as err:
        logging.error(
            f"Failed to open file by delegateFileSystem with exception:{err}."
            f"Used local filesystem instead.")
        return local_open(file, mode, **kw)
|
||
|
||
def stat(path: str, **kw):
    """Return status information for ``path``.

    Uses the delegate filesystem owning ``path`` when there is one; falls back
    to ``local_stat`` when there is none or when the delegate call raises.
    """
    instance = DelegateFileSystem.instance
    fs = instance.get_file_system(path)
    if fs:
        try:
            logging.debug("DelegateFileSystem getStatus filemeta: %s", path)
            return fs.stat(path, **kw)
        except Exception as e:
            logging.error(
                f"Failed to stat file by delegateFileSystem with exception:{e}."
                f"Used local filesystem instead.")
            return local_stat(path, **kw)
    # DEBUG, matching the delegate branch: this function is installed as
    # os.stat, so an INFO record per local call would flood the log.
    logging.debug("LocalFileSystem getStatus filemeta: %s", path)
    return local_stat(path, **kw)
|
||
|
||
def listdir(path: str = ".", **kw):
    """List the entries of ``path``.

    ``path`` defaults to the current directory, as ``os.listdir`` does, so
    callers that invoke the patched ``os.listdir()`` without arguments keep
    working. Uses the delegate filesystem owning ``path`` when there is one;
    falls back to ``local_listdir`` when there is none or when the delegate
    call raises.
    """
    instance = DelegateFileSystem.instance
    fs = instance.get_file_system(path)
    if fs:
        try:
            return fs.listdir(path, **kw)
        except Exception as e:
            logging.error(
                f"Failed to list directory by delegateFileSystem with exception: {e}."
                f"Used local filesystem instead.")
            return local_listdir(path, **kw)
    return local_listdir(path, **kw)
|
||
|
||
def mkdir(path: str, mode=0o777, **kw):
    """Create the directory ``path``.

    Uses the delegate filesystem owning ``path`` when there is one; falls back
    to ``local_mkdir`` when there is none or when the delegate call raises.
    ``mode`` is applied only by the local fallback.
    """
    instance = DelegateFileSystem.instance
    fs = instance.get_file_system(path)
    if fs:
        try:
            # Do not pass ``mode`` positionally: the second positional
            # parameter of fsspec's ``mkdir`` is ``create_parents``, not a
            # permission mask.
            return fs.mkdir(path, **kw)
        except Exception as e:
            logging.error(
                f"Failed to make directory by delegateFileSystem with exception: {e}."
                f"Used local filesystem instead.")
            return local_mkdir(path, mode, **kw)
    return local_mkdir(path, mode, **kw)
|
||
|
||
def rmdir(path: str, **kw):
    """Remove the directory ``path``.

    Uses the delegate filesystem owning ``path`` when there is one; falls back
    to ``local_rmdir`` when there is none or when the delegate call raises.
    """
    remote_fs = DelegateFileSystem.instance.get_file_system(path)
    if not remote_fs:
        return local_rmdir(path, **kw)
    try:
        return remote_fs.rmdir(path, **kw)
    except Exception as err:
        logging.error(
            f"Failed to remove directory by delegateFileSystem with exception: {err}."
            f"Used local filesystem instead."
        )
        return local_rmdir(path, **kw)
|
||
|
||
def remove(path: str, **kw):
    """Remove the file ``path``.

    Uses the ``rm`` method of the delegate filesystem owning ``path`` when
    there is one; falls back to ``local_remove`` when there is none or when
    the delegate call raises.
    """
    remote_fs = DelegateFileSystem.instance.get_file_system(path)
    if not remote_fs:
        return local_remove(path, **kw)
    try:
        return remote_fs.rm(path, **kw)
    except Exception as err:
        logging.error(
            f"Failed to remove file by delegateFileSystem with exception: {err}."
            f"Used local filesystem instead.")
        return local_remove(path, **kw)
|
||
|
||
def rename(src: str, dest: str, **kw):
    """Rename ``src`` to ``dest``.

    When both paths resolve to the same delegate filesystem the rename is
    performed there. When neither path has a delegate filesystem the call goes
    straight to ``local_rename``. In every other case (different filesystems,
    or the delegate call raising) an error is logged and ``local_rename`` is
    used.
    """
    instance = DelegateFileSystem.instance
    fs_src = instance.get_file_system(src)
    fs_dest = instance.get_file_system(dest)
    if not fs_src and not fs_dest:
        # Ordinary local rename: nothing went wrong, so do not log an error.
        return local_rename(src, dest, **kw)
    if fs_src and fs_dest and fs_src == fs_dest:
        try:
            return fs_src.rename(src, dest, **kw)
        except Exception as e:
            logging.error(
                f"Failed to rename file by delegateFileSystem with exception: {e}."
                f"Used local filesystem instead.")
            return local_rename(src, dest, **kw)
    logging.error("Source and destination are on different file systems or not supported.")
    return local_rename(src, dest, **kw)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from setuptools import setup, find_packages | ||
|
||
# Location of the configuration file shipped with the distribution.
CONFIG_DATA_FILES = [
    ('config', ['config/ufs_config.yaml']),
]

setup(
    name='alluxio_posix',
    version='0.1.0',
    description='Alluxio POSIX Python SDK',
    author='lzq',
    author_email='[email protected]',
    license='MIT',
    packages=find_packages(),
    data_files=CONFIG_DATA_FILES,
    include_package_data=True,
    zip_safe=False,
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A hot-update interface needs to be added, for example for users who have no configuration file and want to supply the configuration through the API.