From 7c9d0ccf35fa5056e0fbadf24e7abbc2614b87ae Mon Sep 17 00:00:00 2001 From: liiuzq-xiaobai Date: Tue, 20 Aug 2024 17:10:32 +0800 Subject: [PATCH 1/3] feat: alluxio-py support alluxio and oss filesystem The implementation of the delegated filesystem for Alluxio and OSS has been completed. Specific notes: 1.Users need to specify in the configuration whether the delegated filesystem should be accelerated by Alluxio using the alluxio_enable flag. If set to true, the configuration file must still include the necessary initialization settings for the Alluxio filesystem. 2.The configuration file can include multiple OSS filesystems as delegated filesystems, but it is necessary to ensure that their bucket_name is unique. A unique delegated filesystem is determined by the combination of the delegated filesystem name and the bucket_name. --- alluxio/posix/__init__.py | 0 alluxio/posix/config.py | 77 ++++++++++++ alluxio/posix/const.py | 28 +++++ alluxio/posix/delegate.py | 14 +++ alluxio/posix/demo.py | 77 ++++++++++++ alluxio/posix/exception.py | 19 +++ alluxio/posix/fileimpl.py | 219 +++++++++++++++++++++++++++++++++ tests/posix/test_conf.py | 0 tests/posix/test_delegatefs.py | 0 9 files changed, 434 insertions(+) create mode 100644 alluxio/posix/__init__.py create mode 100644 alluxio/posix/config.py create mode 100644 alluxio/posix/const.py create mode 100644 alluxio/posix/delegate.py create mode 100644 alluxio/posix/demo.py create mode 100644 alluxio/posix/exception.py create mode 100644 alluxio/posix/fileimpl.py create mode 100644 tests/posix/test_conf.py create mode 100644 tests/posix/test_delegatefs.py diff --git a/alluxio/posix/__init__.py b/alluxio/posix/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alluxio/posix/config.py b/alluxio/posix/config.py new file mode 100644 index 0000000..a71f618 --- /dev/null +++ b/alluxio/posix/config.py @@ -0,0 +1,77 @@ +import yaml +import os + +from alluxio.posix.const import Constants +from alluxio.posix.exception import ConfigMissingError, ConfigInvalidError + + +class ConfigManager: + def __init__(self, config_file_path='config/ufs_config.yaml'): + self.config_file_path = config_file_path + self.config_data = self._load_config() + self.validation_functions = { + Constants.OSS_FILESYSTEM_TYPE: self._validate_oss_config, + Constants.ALLUXIO_FILESYSTEM_TYPE: self._validate_alluxio_config, + Constants.S3_FILESYSTEM_TYPE: self._validate_s3_config + } + + def _load_config(self): + if not os.path.exists(self.config_file_path): + raise FileNotFoundError(f"{self.config_file_path} does not exist.") + + with open(self.config_file_path, 'r', encoding='utf-8') as file: + try: + config = yaml.safe_load(file) + return config + except yaml.YAMLError as e: + raise ValueError(f"Error parsing YAML file: {e}") + + def get_config(self, fs_name: str) -> dict: + try: + fs_config = self.config_data[fs_name] + validation_function = self.validation_functions.get(fs_name) + if validation_function is not None: + validation_function(fs_config) + else: + raise ConfigInvalidError(fs_name, f"No validation function for file system: {fs_name}") + return fs_config + except KeyError: + raise ConfigMissingError(fs_name, "FileSystem Configuration is missing") + except ValueError as e: + raise ConfigMissingError(fs_name, str(e)) + + def get_config_fs_list(self) -> list: + return self.config_data.keys() + + @staticmethod + def _validate_oss_config(config): + required_keys = [ + Constants.OSS_ACCESS_KEY_ID, + Constants.OSS_ACCESS_KEY_SECRET, + Constants.OSS_ENDPOINT, + Constants.OSS_BUCKET_NAME + ] + + for key in required_keys: + if key not in config: + raise ConfigMissingError(f"Missing required OSS config key: {key}") + if not config[key]: + raise ValueError(f"OSS config key '{key}' cannot be empty") + + @staticmethod + def _validate_alluxio_config(config): + required_keys = [] + if config.get(Constants.ALLUXIO_ETCD_ENABLE, False): + # If ALLUXIO_ETCD_ENABLE is True, ALLUXIO_ETCD_HOST must be set + required_keys.append(Constants.ALLUXIO_ETCD_HOST) + else: + # If ALLUXIO_ETCD_ENABLE is False, ALLUXIO_WORKER_HOSTS must be set + required_keys.append(Constants.ALLUXIO_WORKER_HOSTS) + + if not all(config.get(key) for key in required_keys): + raise ConfigMissingError(f"The following keys must be set in the configuration: {required_keys}") + + + @staticmethod + def _validate_s3_config(config): + raise NotImplementedError diff --git a/alluxio/posix/const.py b/alluxio/posix/const.py new file mode 100644 index 0000000..b32e5ce --- /dev/null +++ b/alluxio/posix/const.py @@ -0,0 +1,28 @@ +class Constants: + + # general config + BUCKET_NAME = 'bucket_name' + # URL prefix + OSS_URL_PREFIX = "oss://" + ALLUXIO_URL_PREFIX = "alluxio://" + + # enable FileSystem types + LOCAL_FILESYSTEM_TYPE = 'local' + OSS_FILESYSTEM_TYPE = 'oss' + ALLUXIO_FILESYSTEM_TYPE = 'alluxio' + S3_FILESYSTEM_TYPE = 's3' + + # OSS config keys + OSS_ACCESS_KEY_ID = 'access_key_id' + OSS_ACCESS_KEY_SECRET = 'access_key_secret' + OSS_ENDPOINT = 'endpoint' + + # Alluxio config keys + ALLUXIO_ETCD_ENABLE = "alluxio_etcd_enable" + ALLUXIO_ETCD_HOST = 'alluxio_etcd_host' + ALLUXIO_WORKER_HOSTS = 'alluxio_worker_hosts' + ALLUXIO_BACKUP_FS = 'alluxio_backup_fs' + ALLUXIO_ENABLE = 'alluxio_enable' + + # assist constants + ALLUXIO_SEP_SIGN = '_' diff --git a/alluxio/posix/delegate.py b/alluxio/posix/delegate.py new file mode 100644 index 0000000..326b5ab --- /dev/null +++ b/alluxio/posix/delegate.py @@ -0,0 +1,14 @@ +import os +from alluxio.posix import fileimpl +config_manager = fileimpl.ConfigManager("../../config/ufs_config.yaml") +delegate_fs = fileimpl.DelegateFileSystem(config_manager) + +os.stat = fileimpl.stat +os.open = fileimpl.open +os.listdir = fileimpl.listdir +os.rename = fileimpl.rename +os.mkdir = fileimpl.mkdir +os.remove = fileimpl.remove +os.rmdir = fileimpl.rmdir + + diff --git a/alluxio/posix/demo.py b/alluxio/posix/demo.py new file mode 100644 index 0000000..cbfdd08 --- /dev/null +++ b/alluxio/posix/demo.py @@ -0,0 +1,77 @@ +import os +import delegate + + +def delegatefs_open_write(): + write_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt' + with os.open(write_file_path, 'w') as f: + f.write('Hello, OSSP! Hello alluxio-py!') + + +def delegatefs_open_read(): + read_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt' + with os.open(read_file_path, mode='r') as f: + content = f.read() + print("File content:") + print(content) + + +def delegatefs_listdir(): + dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/' + print(os.listdir(dir_path)) + + +def delegatefs_rename(): + origin_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-rename-1.txt' + renamed_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-rename-2.txt' + dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/' + with os.open(origin_file_path, mode='w') as f: + f.write('Test rename...') + os.rename(origin_file_path, renamed_file_path) + os.listdir(dir_path) + + +def delegatefs_stat(): + stat_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt' + print(os.stat(stat_file_path)) + + +def delegatefs_mkdir(): + dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/' + mkdir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-mkdir' + os.mkdir(mkdir_path) + print(os.listdir(mkdir_path)) + + +def delegatefs_remove(): + dir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/' + remove_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt' + print(os.listdir(dir_path)) + os.remove(remove_path) + + +def delegatefs_truncate(): + read_file_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/delegatefs-io-1.txt' + with os.open(read_file_path, mode='r') as f: + f.truncate(10) + content = f.read() + print("File content:") + print(content) + + +def delegatefs_rmdir(): + rmdir_path = f'oss://alhz-ossp-alluxio-test/alluxio-py/rmdirtest' + os.rmdir(rmdir_path) + + +if __name__ == "__main__": + delegatefs_open_write() + delegatefs_open_read() + delegatefs_listdir() + delegatefs_rename() + delegatefs_stat() + delegatefs_remove() + # 下面两个OssFileSystem中为空实现,但不会抛出异常 + delegatefs_mkdir() + delegatefs_rmdir() + delegatefs_truncate() \ No newline at end of file diff --git a/alluxio/posix/exception.py b/alluxio/posix/exception.py new file mode 100644 index 0000000..1b8e1c6 --- /dev/null +++ b/alluxio/posix/exception.py @@ -0,0 +1,19 @@ +class ConfigMissingError(Exception): + def __init__(self, config_key, message="Configuration key is missing"): + self.config_key = config_key + self.message = message + super().__init__(f"{message}: {config_key}") + + +class ConfigInvalidError(Exception): + def __init__(self, config_key, message="Configuration key is invalid, config_key"): + self.config_key = config_key + self.message = message + super().__init__(f"{message}: {config_key}") + + +class UnsupportedDelegateFileSystemError(Exception): + def __init__(self, fs_name, message="FileSystem is not supported, filesystem"): + self.fs_name = fs_name + self.message = message + super().__init__(f"{message}: {fs_name}") diff --git a/alluxio/posix/fileimpl.py b/alluxio/posix/fileimpl.py new file mode 100644 index 0000000..ea9bb76 --- /dev/null +++ b/alluxio/posix/fileimpl.py @@ -0,0 +1,219 @@ +import logging +import os +import re +import threading +import fsspec + +from alluxio.posix.config import ConfigManager +from alluxio.posix.const import Constants +from alluxio.posix.exception import UnsupportedDelegateFileSystemError + +local_open = os.path +local_stat = os.stat +local_listdir = os.listdir +local_rename = os.rename +local_close = os.close +local_mkdir = os.mkdir +local_remove = os.remove +local_rmdir = os.rmdir + + +def open(file: str, mode: str = "r", **kw): + logging.info("DelegateFileSystem opening file: %s", file) + instance = DelegateFileSystem.instance + fs = instance.get_file_system(file) + if fs: + try: + return fs.open(file, mode, **kw) + except Exception as e: + logging.error( + f"Failed to open file by delegateFileSystem with exception:{e}." + f"Used local filesystem instead.") + return local_open(file, mode, **kw) + return local_open(file, mode, **kw) + + +def stat(path: str, **kw): + instance = DelegateFileSystem.instance + fs = instance.get_file_system(path) + if fs: + try: + logging.info("DelegateFileSystem getStatus filemeta: %s", path) + return fs.stat(path, **kw) + except Exception as e: + logging.error( + f"Failed to stat file by delegateFileSystem with exception:{e}." + f"Used local filesystem instead.") + return local_stat(path, **kw) + logging.info("LocalFileSystem getStatus filemeta: %s", path) + return local_stat(path, **kw) + + +def listdir(path: str, **kw): + instance = DelegateFileSystem.instance + fs = instance.get_file_system(path) + if fs: + try: + return fs.listdir(path, **kw) + except Exception as e: + logging.error( + f"Failed to list directory by delegateFileSystem with exception: {e}." + f"Used local filesystem instead.") + return local_listdir(path, **kw) + return local_listdir(path, **kw) + + +def mkdir(path: str, mode=0o777, **kw): + instance = DelegateFileSystem.instance + fs = instance.get_file_system(path) + if fs: + try: + return fs.mkdir(path, mode, **kw) + except Exception as e: + logging.error( + f"Failed to make directory by delegateFileSystem with exception: {e}." + f"Used local filesystem instead.") + return local_mkdir(path, mode, **kw) + return local_mkdir(path, mode, **kw) + + +def rmdir(path: str, **kw): + instance = DelegateFileSystem.instance + fs = instance.get_file_system(path) + if fs: + try: + return fs.rmdir(path, **kw) + except Exception as e: + logging.error( + f"Failed to remove directory by delegateFileSystem with exception: {e}." + f"Used local filesystem instead." + ) + return local_rmdir(path, **kw) + return local_rmdir(path, **kw) + + +def remove(path: str, **kw): + instance = DelegateFileSystem.instance + fs = instance.get_file_system(path) + if fs: + try: + return fs.rm(path, **kw) + except Exception as e: + logging.error( + f"Failed to remove file by delegateFileSystem with exception: {e}." + f"Used local filesystem instead.") + return local_remove(path, **kw) + return local_remove(path, **kw) + + +def rename(src: str, dest: str, **kw): + instance = DelegateFileSystem.instance + fs_src = instance.get_file_system(src) + fs_dest = instance.get_file_system(dest) + if fs_src and fs_dest and fs_src == fs_dest: + try: + return fs_src.rename(src, dest, **kw) + except Exception as e: + logging.error( + f"Failed to rename file by delegateFileSystem with exception: {e}." + f"Used local filesystem instead.") + return local_rename(src, dest, **kw) + logging.error("Source and destination are on different file systems or not supported.") + return local_rename(src, dest, **kw) + + +class DelegateFileSystem: + instance = None + + def __init__(self, config_manager: ConfigManager): + self.config_manager = config_manager + self.filesystem_storage = FSStorage() + self.filesystem_storage.data = {} + self.enableFileSystems = [Constants.OSS_FILESYSTEM_TYPE, + Constants.ALLUXIO_FILESYSTEM_TYPE, + Constants.S3_FILESYSTEM_TYPE] + self.__init__file__system() + DelegateFileSystem.instance = self + + def __create__file__system(self, fs_name: str): + config = self.config_manager.get_config(fs_name) + if fs_name not in self.enableFileSystems: + raise UnsupportedDelegateFileSystemError(f"Unsupported file system: {fs_name}") + if config[Constants.ALLUXIO_ENABLE]: + fs_name = (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name + + Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME]) + if config.get(Constants.ALLUXIO_ETCD_ENABLE): + self.filesystem_storage.fs[fs_name] = fsspec.filesystem(Constants.ALLUXIO_FILESYSTEM_TYPE, + etcd_hosts=config[Constants.ALLUXIO_ETCD_HOST], + etcd_port=2379, + target_protocol=config[ + Constants.ALLUXIO_BACKUP_FS]) + return self.filesystem_storage.fs[fs_name] + else: + logging.error("Failed to create Alluxio filesystem, using the default %s filesystem.", fs_name) + if fs_name == Constants.OSS_FILESYSTEM_TYPE: + self.filesystem_storage.fs[fs_name] = fsspec.filesystem(Constants.OSS_FILESYSTEM_TYPE, + key=config[Constants.OSS_ACCESS_KEY_ID], + secret=config[Constants.OSS_ACCESS_KEY_SECRET], + endpoint=config[Constants.OSS_ENDPOINT]) + return self.filesystem_storage.fs[fs_name] + elif fs_name == Constants.S3_FILESYSTEM_TYPE: + # todo:新增s3FileSystem + raise NotImplementedError + + return None + + def get_file_system(self, path: str): + fs_name, bucket = self.__parse__url(path) + config = self.config_manager.get_config(fs_name) + if fs_name == Constants.LOCAL_FILESYSTEM_TYPE: + return None + if config[Constants.ALLUXIO_ENABLE]: + fs_name = (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name + + Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME]) + if hasattr(self.filesystem_storage, fs_name): + return self.filesystem_storage.fs[fs_name] + else: + self.__create__file__system(fs_name) + return self.filesystem_storage.fs[fs_name] + + def __init__file__system(self): + fs_list = self.config_manager.get_config_fs_list() + for fs_name in fs_list: + self.__create__file__system(fs_name) + + def __parse__url(self, path: str): + # parse the schema and bucket name in filepath + if (type(path) is not str) or (path.startswith('/')): + return Constants.LOCAL_FILESYSTEM_TYPE, None + pattern = re.compile(r'^(\w+)://([^/]+)/.*') + match = pattern.match(path) + if match: + fs_name, bucket_name = match.groups() + # Check whether the file system corresponding to the path is supported + if fs_name.lower() in self.enableFileSystems: + return fs_name, bucket_name + else: + raise UnsupportedDelegateFileSystemError(f"Unsupported file system: {fs_name}") + else: + return Constants.LOCAL_FILESYSTEM_TYPE, None + + +class FSStorage(threading.local): + def __init__(self): + self.data = {} + + def __getitem__(self, key): + return self.data[key] + + def __setitem__(self, key, value): + self.data[key] = value + + def __delitem__(self, key): + del self.data[key] + + def __contains__(self, key): + return key in self.data + + def get(self, key, default=None): + return self.data.get(key, default) diff --git a/tests/posix/test_conf.py b/tests/posix/test_conf.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/posix/test_delegatefs.py b/tests/posix/test_delegatefs.py new file mode 100644 index 0000000..e69de29 From e09ac515910d829a518240c45e6474f48a76d16f Mon Sep 17 00:00:00 2001 From: liiuzq-xiaobai Date: Fri, 20 Sep 2024 18:32:37 +0800 Subject: [PATCH 2/3] feat: alluxio-py support alluxio and oss filesystem.Complete some commits.Date:2024-09-20 The implementation of the delegated filesystem for Alluxio and OSS has been completed. Specific notes: 1.Users need to specify in the configuration whether the delegated filesystem should be accelerated by Alluxio using the alluxio_enable flag. If set to true, the configuration file must still include the necessary initialization settings for the Alluxio filesystem. 2.The configuration file can include multiple OSS filesystems as delegated filesystems, but it is necessary to ensure that their bucket_name is unique. A unique delegated filesystem is determined by the combination of the delegated filesystem name and the bucket_name. --- alluxio/posix/config.py | 62 ++++++++++++++------------------ alluxio/posix/const.py | 22 ++++++------ alluxio/posix/delegate.py | 2 +- alluxio/posix/fileimpl.py | 14 ++++---- alluxio/posix/setup.py | 16 +++++++++ alluxio/posix/ufs/__init__.py | 0 alluxio/posix/ufs/alluxio.py | 32 +++++++++++++++++ alluxio/posix/ufs/oss.py | 34 ++++++++++++++++++ {alluxio => tests}/posix/demo.py | 0 9 files changed, 127 insertions(+), 55 deletions(-) create mode 100644 alluxio/posix/setup.py create mode 100644 alluxio/posix/ufs/__init__.py create mode 100644 alluxio/posix/ufs/alluxio.py create mode 100644 alluxio/posix/ufs/oss.py rename {alluxio => tests}/posix/demo.py (100%) diff --git a/alluxio/posix/config.py b/alluxio/posix/config.py index a71f618..7e14f23 100644 --- a/alluxio/posix/config.py +++ b/alluxio/posix/config.py @@ -1,18 +1,23 @@ +import logging + import yaml import os +from alluxio.posix.ufs.alluxio import validate_alluxio_config, update_alluxio_config +from alluxio.posix.ufs.oss import validate_oss_config, update_oss_config from alluxio.posix.const import Constants from alluxio.posix.exception import ConfigMissingError, ConfigInvalidError class ConfigManager: - def __init__(self, config_file_path='config/ufs_config.yaml'): - self.config_file_path = config_file_path + def __init__(self): + self.logger = logging.getLogger(__name__) + logging.basicConfig(level=logging.INFO) + self.config_file_path = os.getenv('ALLUXIO_PY_CONFIG_FILE_PATH', 'config/ufs_config.yaml') self.config_data = self._load_config() self.validation_functions = { - Constants.OSS_FILESYSTEM_TYPE: self._validate_oss_config, - Constants.ALLUXIO_FILESYSTEM_TYPE: self._validate_alluxio_config, - Constants.S3_FILESYSTEM_TYPE: self._validate_s3_config + Constants.OSS_FILESYSTEM_TYPE: validate_oss_config, + Constants.ALLUXIO_FILESYSTEM_TYPE: validate_alluxio_config } def _load_config(self): @@ -26,6 +31,11 @@ def _load_config(self): except yaml.YAMLError as e: raise ValueError(f"Error parsing YAML file: {e}") + def set_config_path(self, new_path): + self.config_file_path = new_path + self.config_data = self._load_config() + print(f"Configuration path updated and config reloaded from {new_path}.") + def get_config(self, fs_name: str) -> dict: try: fs_config = self.config_data[fs_name] @@ -43,35 +53,15 @@ def get_config(self, fs_name: str) -> dict: def get_config_fs_list(self) -> list: return self.config_data.keys() - @staticmethod - def _validate_oss_config(config): - required_keys = [ - Constants.OSS_ACCESS_KEY_ID, - Constants.OSS_ACCESS_KEY_SECRET, - Constants.OSS_ENDPOINT, - Constants.OSS_BUCKET_NAME - ] - - for key in required_keys: - if key not in config: - raise ConfigMissingError(f"Missing required OSS config key: {key}") - if not config[key]: - raise ValueError(f"OSS config key '{key}' cannot be empty") - - @staticmethod - def _validate_alluxio_config(config): - required_keys = [] - if config.get(Constants.ALLUXIO_ETCD_ENABLE, False): - # If ALLUXIO_ETCD_ENABLE is True, ALLUXIO_ETCD_HOST must be set - required_keys.append(Constants.ALLUXIO_ETCD_HOST) + def update_config(self, fs_type, key, value): + if fs_type not in self.get_config_fs_list(): + raise KeyError(f"No configuration available for {fs_type}") + config_data = self.get_config(fs_type) + if fs_type == Constants.OSS_FILESYSTEM_TYPE: + self.config_data[fs_type] = update_oss_config(config_data, key, value) + elif fs_type == Constants.ALLUXIO_FILESYSTEM_TYPE: + self.config_data[fs_type] = update_alluxio_config(config_data, key, value) + elif fs_type == Constants.S3_FILESYSTEM_TYPE: + raise NotImplementedError() else: - # If ALLUXIO_ETCD_ENABLE is False, ALLUXIO_WORKER_HOSTS must be set - required_keys.append(Constants.ALLUXIO_WORKER_HOSTS) - - if not all(config.get(key) for key in required_keys): - raise ConfigMissingError(f"The following keys must be set in the configuration: {required_keys}") - - - @staticmethod - def _validate_s3_config(config): - raise NotImplementedError + raise ValueError(f"Unsupported file system type: {fs_type}") diff --git a/alluxio/posix/const.py b/alluxio/posix/const.py index b32e5ce..ae6e477 100644 --- a/alluxio/posix/const.py +++ b/alluxio/posix/const.py @@ -12,17 +12,17 @@ class Constants: ALLUXIO_FILESYSTEM_TYPE = 'alluxio' S3_FILESYSTEM_TYPE = 's3' - # OSS config keys - OSS_ACCESS_KEY_ID = 'access_key_id' - OSS_ACCESS_KEY_SECRET = 'access_key_secret' - OSS_ENDPOINT = 'endpoint' - - # Alluxio config keys - ALLUXIO_ETCD_ENABLE = "alluxio_etcd_enable" - ALLUXIO_ETCD_HOST = 'alluxio_etcd_host' - ALLUXIO_WORKER_HOSTS = 'alluxio_worker_hosts' - ALLUXIO_BACKUP_FS = 'alluxio_backup_fs' - ALLUXIO_ENABLE = 'alluxio_enable' + # # OSS config keys + # OSS_ACCESS_KEY_ID = 'access_key_id' + # OSS_ACCESS_KEY_SECRET = 'access_key_secret' + # OSS_ENDPOINT = 'endpoint' + # + # # Alluxio config keys + # ALLUXIO_ETCD_ENABLE = "alluxio_etcd_enable" + # ALLUXIO_ETCD_HOST = 'alluxio_etcd_host' + # ALLUXIO_WORKER_HOSTS = 'alluxio_worker_hosts' + # ALLUXIO_BACKUP_FS = 'alluxio_backup_fs' + # ALLUXIO_ENABLE = 'alluxio_enable' # assist constants ALLUXIO_SEP_SIGN = '_' diff --git a/alluxio/posix/delegate.py b/alluxio/posix/delegate.py index 326b5ab..d78ccbb 100644 --- a/alluxio/posix/delegate.py +++ b/alluxio/posix/delegate.py @@ -1,6 +1,6 @@ import os from alluxio.posix import fileimpl -config_manager = fileimpl.ConfigManager("../../config/ufs_config.yaml") +config_manager = fileimpl.ConfigManager() delegate_fs = fileimpl.DelegateFileSystem(config_manager) os.stat = fileimpl.stat diff --git a/alluxio/posix/fileimpl.py b/alluxio/posix/fileimpl.py index ea9bb76..8e3ab4a 100644 --- a/alluxio/posix/fileimpl.py +++ b/alluxio/posix/fileimpl.py @@ -165,9 +165,9 @@ def __create__file__system(self, fs_name: str): def get_file_system(self, path: str): fs_name, bucket = self.__parse__url(path) - config = self.config_manager.get_config(fs_name) if fs_name == Constants.LOCAL_FILESYSTEM_TYPE: return None + config = self.config_manager.get_config(fs_name) if config[Constants.ALLUXIO_ENABLE]: fs_name = (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name + Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME]) @@ -201,19 +201,19 @@ def __parse__url(self, path: str): class FSStorage(threading.local): def __init__(self): - self.data = {} + self.fs = {} def __getitem__(self, key): - return self.data[key] + return self.fs[key] def __setitem__(self, key, value): - self.data[key] = value + self.fs[key] = value def __delitem__(self, key): - del self.data[key] + del self.fs[key] def __contains__(self, key): - return key in self.data + return key in self.fs def get(self, key, default=None): - return self.data.get(key, default) + return self.fs.get(key, default) diff --git a/alluxio/posix/setup.py b/alluxio/posix/setup.py new file mode 100644 index 0000000..fac6db0 --- /dev/null +++ b/alluxio/posix/setup.py @@ -0,0 +1,16 @@ +from setuptools import setup, find_packages + +setup( + name='alluxio_posix', + version='0.1.0', + packages=find_packages(), + license='MIT', + description='Alluxio POSIX Python SDK', + author='lzq', + author_email='liuzq0909@163.com', + data_files=[ + ('config', ['config/ufs_config.yaml']) # 指定配置文件所在路径 + ], + include_package_data=True, + zip_safe=False +) \ No newline at end of file diff --git a/alluxio/posix/ufs/__init__.py b/alluxio/posix/ufs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alluxio/posix/ufs/alluxio.py b/alluxio/posix/ufs/alluxio.py new file mode 100644 index 0000000..2896ff1 --- /dev/null +++ b/alluxio/posix/ufs/alluxio.py @@ -0,0 +1,32 @@ +from alluxio.posix.const import Constants +from alluxio.posix.exception import ConfigMissingError + +ALLUXIO_ETCD_ENABLE = "alluxio_etcd_enable" +ALLUXIO_ETCD_HOST = 'alluxio_etcd_host' +ALLUXIO_WORKER_HOSTS = 'alluxio_worker_hosts' +ALLUXIO_BACKUP_FS = 'alluxio_backup_fs' +ALLUXIO_ENABLE = 'alluxio_enable' + + +def validate_alluxio_config(config): + required_keys = [] + if config.get(ALLUXIO_ETCD_ENABLE, False): + required_keys.append(ALLUXIO_ETCD_HOST) + else: + required_keys.append(ALLUXIO_WORKER_HOSTS) + + if not all(config.get(key) for key in required_keys): + raise ConfigMissingError(f"The following keys must be set in the configuration: {required_keys}") + + +def update_alluxio_config(config_data, key, value): + allowed_keys = [ + ALLUXIO_ETCD_ENABLE, + ALLUXIO_ETCD_HOST, + ALLUXIO_WORKER_HOSTS + ] + if key not in allowed_keys: + raise ValueError(f"Invalid configuration key for Alluxio: {key}") + config_data[key] = value + validate_alluxio_config(config_data) + return config_data diff --git a/alluxio/posix/ufs/oss.py b/alluxio/posix/ufs/oss.py new file mode 100644 index 0000000..5f9590e --- /dev/null +++ b/alluxio/posix/ufs/oss.py @@ -0,0 +1,34 @@ +from alluxio.posix.const import Constants +from alluxio.posix.exception import ConfigMissingError + +OSS_ACCESS_KEY_ID = 'access_key_id' +OSS_ACCESS_KEY_SECRET = 'access_key_secret' +OSS_ENDPOINT = 'endpoint' + + +def validate_oss_config(config): + required_keys = [ + OSS_ACCESS_KEY_ID, + OSS_ACCESS_KEY_SECRET, + OSS_ENDPOINT, + Constants.BUCKET_NAME + ] + + for key in required_keys: + if key not in config: + raise ConfigMissingError(f"Missing required OSS config key: {key}") + if not config[key]: + raise ValueError(f"OSS config key '{key}' cannot be empty") + + +def update_oss_config(config_data, key, value): + if key not in [ + OSS_ACCESS_KEY_ID, + OSS_ACCESS_KEY_SECRET, + OSS_ENDPOINT, + Constants.BUCKET_NAME + ]: + raise ValueError(f"Invalid configuration key: {key}") + config_data[key] = value + validate_oss_config(config_data) + return config_data diff --git a/alluxio/posix/demo.py b/tests/posix/demo.py similarity index 100% rename from alluxio/posix/demo.py rename to tests/posix/demo.py From df2690696e611ec6faf18bdaa93aad5bd481c746 Mon Sep 17 00:00:00 2001 From: liiuzq-xiaobai Date: Sun, 29 Sep 2024 18:51:46 +0800 Subject: [PATCH 3/3] feat: alluxio-py support alluxio and oss filesystem.Complete some commits.Date:2024-09-29 The implementation of the delegated filesystem for Alluxio and OSS has been completed. Specific notes: 1.Users need to specify in the configuration whether the delegated filesystem should be accelerated by Alluxio using the alluxio_enable flag. If set to true, the configuration file must still include the necessary initialization settings for the Alluxio filesystem. 2.The configuration file can include multiple OSS filesystems as delegated filesystems, but it is necessary to ensure that their bucket_name is unique. A unique delegated filesystem is determined by the combination of the delegated filesystem name and the bucket_name. --- alluxio/posix/config.py | 20 +++++-- alluxio/posix/const.py | 12 ---- alluxio/posix/delegate.py | 7 ++- alluxio/posix/delegateFs.py | 107 ++++++++++++++++++++++++++++++++++ alluxio/posix/fileimpl.py | 108 +---------------------------------- alluxio/posix/ufs/alluxio.py | 16 +++--- alluxio/posix/ufs/oss.py | 19 +++--- 7 files changed, 149 insertions(+), 140 deletions(-) create mode 100644 alluxio/posix/delegateFs.py diff --git a/alluxio/posix/config.py b/alluxio/posix/config.py index 7e14f23..fdd3bd7 100644 --- a/alluxio/posix/config.py +++ b/alluxio/posix/config.py @@ -13,7 +13,9 @@ class ConfigManager: def __init__(self): self.logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) - self.config_file_path = os.getenv('ALLUXIO_PY_CONFIG_FILE_PATH', 'config/ufs_config.yaml') + current_dir = os.path.dirname(os.path.abspath(__file__)) + self.config_file_path = os.getenv('ALLUXIO_PY_CONFIG_FILE_PATH', os.path.join(current_dir, 'config', + 'ufs_config.yaml')) self.config_data = self._load_config() self.validation_functions = { Constants.OSS_FILESYSTEM_TYPE: validate_oss_config, @@ -22,7 +24,8 @@ def __init__(self): def _load_config(self): if not os.path.exists(self.config_file_path): - raise FileNotFoundError(f"{self.config_file_path} does not exist.") + logging.warning(f"Config file not found: {self.config_file_path}. Initializing without loading config.") + return with open(self.config_file_path, 'r', encoding='utf-8') as file: try: @@ -51,17 +54,22 @@ def get_config(self, fs_name: str) -> dict: raise ConfigMissingError(fs_name, str(e)) def get_config_fs_list(self) -> list: - return self.config_data.keys() + if self.config_data is None: + return [] + else: + return self.config_data.keys() - def update_config(self, fs_type, key, value): + def update_config(self, fs_type, **kwargs): if fs_type not in self.get_config_fs_list(): raise KeyError(f"No configuration available for {fs_type}") config_data = self.get_config(fs_type) + if fs_type == Constants.OSS_FILESYSTEM_TYPE: - self.config_data[fs_type] = update_oss_config(config_data, key, value) + self.config_data[fs_type] = update_oss_config(config_data, kwargs) elif fs_type == Constants.ALLUXIO_FILESYSTEM_TYPE: - self.config_data[fs_type] = update_alluxio_config(config_data, key, value) + self.config_data[fs_type] = update_alluxio_config(config_data, kwargs) elif fs_type == Constants.S3_FILESYSTEM_TYPE: raise NotImplementedError() else: raise ValueError(f"Unsupported file system type: {fs_type}") + diff --git a/alluxio/posix/const.py b/alluxio/posix/const.py index ae6e477..d8b3a7b 100644 --- a/alluxio/posix/const.py +++ b/alluxio/posix/const.py @@ -12,17 +12,5 @@ class Constants: ALLUXIO_FILESYSTEM_TYPE = 'alluxio' S3_FILESYSTEM_TYPE = 's3' - # # OSS config keys - # OSS_ACCESS_KEY_ID = 'access_key_id' - # OSS_ACCESS_KEY_SECRET = 'access_key_secret' - # OSS_ENDPOINT = 'endpoint' - # - # # Alluxio config keys - # ALLUXIO_ETCD_ENABLE = "alluxio_etcd_enable" - # ALLUXIO_ETCD_HOST = 'alluxio_etcd_host' - # ALLUXIO_WORKER_HOSTS = 'alluxio_worker_hosts' - # ALLUXIO_BACKUP_FS = 'alluxio_backup_fs' - # ALLUXIO_ENABLE = 'alluxio_enable' - # assist constants ALLUXIO_SEP_SIGN = '_' diff --git a/alluxio/posix/delegate.py b/alluxio/posix/delegate.py index d78ccbb..514ee4b 100644 --- a/alluxio/posix/delegate.py +++ b/alluxio/posix/delegate.py @@ -1,7 +1,10 @@ import os from alluxio.posix import fileimpl -config_manager = fileimpl.ConfigManager() -delegate_fs = fileimpl.DelegateFileSystem(config_manager) +from alluxio.posix.config import ConfigManager +from alluxio.posix.delegateFs import DelegateFileSystem + +config_manager = ConfigManager() +delegate_fs = DelegateFileSystem(config_manager) os.stat = fileimpl.stat os.open = fileimpl.open diff --git a/alluxio/posix/delegateFs.py b/alluxio/posix/delegateFs.py new file mode 100644 index 0000000..f60e178 --- /dev/null +++ b/alluxio/posix/delegateFs.py @@ -0,0 +1,107 @@ +import logging +import re +import threading +import fsspec + +from alluxio.posix.config import ConfigManager +from alluxio.posix.const import Constants +from alluxio.posix.exception import UnsupportedDelegateFileSystemError +from alluxio.posix.ufs.alluxio import ALLUXIO_ETCD_HOST, ALLUXIO_BACKUP_FS, ALLUXIO_ETCD_ENABLE, ALLUXIO_ENABLE +from alluxio.posix.ufs.oss import OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT + + +class DelegateFileSystem: + instance = None + + def __init__(self, config_manager: ConfigManager): + self.config_manager = config_manager + self.filesystem_storage = FSStorage() + self.filesystem_storage.data = {} + self.enableFileSystems = [Constants.OSS_FILESYSTEM_TYPE, + Constants.ALLUXIO_FILESYSTEM_TYPE, + Constants.S3_FILESYSTEM_TYPE] + self.__init__file__system() + DelegateFileSystem.instance = self + + def __create__file__system(self, fs_name: str): + config = self.config_manager.get_config(fs_name) + if fs_name not in self.enableFileSystems: + raise UnsupportedDelegateFileSystemError(f"Unsupported file system: {fs_name}") + if config[ALLUXIO_ENABLE]: + fs_name = (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name + + Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME]) + if config.get(ALLUXIO_ETCD_ENABLE): + self.filesystem_storage.fs[fs_name] = fsspec.filesystem(Constants.ALLUXIO_FILESYSTEM_TYPE, + etcd_hosts=config[ALLUXIO_ETCD_HOST], + etcd_port=2379, + target_protocol=config[ + ALLUXIO_BACKUP_FS]) + return self.filesystem_storage.fs[fs_name] + else: + logging.error("Failed to create Alluxio filesystem, using the default %s filesystem.", fs_name) + if fs_name == Constants.OSS_FILESYSTEM_TYPE: + self.filesystem_storage.fs[fs_name] = fsspec.filesystem(Constants.OSS_FILESYSTEM_TYPE, + key=config[OSS_ACCESS_KEY_ID], + secret=config[OSS_ACCESS_KEY_SECRET], + endpoint=config[OSS_ENDPOINT]) + return self.filesystem_storage.fs[fs_name] + elif fs_name == Constants.S3_FILESYSTEM_TYPE: + # todo:新增s3FileSystem + raise NotImplementedError + + return None + + def get_file_system(self, path: str): + fs_name, bucket = self.__parse__url(path) + if fs_name == Constants.LOCAL_FILESYSTEM_TYPE: + return None + config = self.config_manager.get_config(fs_name) + if config[ALLUXIO_ENABLE]: + fs_name = (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name + + Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME]) + if hasattr(self.filesystem_storage, fs_name): + return self.filesystem_storage.fs[fs_name] + else: + self.__create__file__system(fs_name) + return self.filesystem_storage.fs[fs_name] + + def __init__file__system(self): + fs_list = self.config_manager.get_config_fs_list() + for fs_name in fs_list: + self.__create__file__system(fs_name) + + def __parse__url(self, path: str): + # parse the schema and bucket name in filepath + if (type(path) is not str) or (path.startswith('/')): + return Constants.LOCAL_FILESYSTEM_TYPE, None + pattern = re.compile(r'^(\w+)://([^/]+)/.*') + match = pattern.match(path) + if match: + fs_name, bucket_name = match.groups() + # Check whether the file system corresponding to the path is supported + if fs_name.lower() in self.enableFileSystems: + return fs_name, bucket_name + else: + raise UnsupportedDelegateFileSystemError(f"Unsupported file system: {fs_name}") + else: + return Constants.LOCAL_FILESYSTEM_TYPE, None + + +class FSStorage(threading.local): + def __init__(self): + self.fs = {} + + def __getitem__(self, key): + return self.fs[key] + + def __setitem__(self, key, value): + self.fs[key] = value + + def __delitem__(self, key): + del self.fs[key] + + def __contains__(self, key): + return key in self.fs + + def get(self, key, default=None): + return self.fs.get(key, default) \ No newline at end of file diff --git a/alluxio/posix/fileimpl.py b/alluxio/posix/fileimpl.py index 8e3ab4a..a029111 100644 --- a/alluxio/posix/fileimpl.py +++ b/alluxio/posix/fileimpl.py @@ -1,12 +1,7 @@ import logging import os -import re -import threading -import fsspec -from alluxio.posix.config import ConfigManager -from alluxio.posix.const import Constants -from alluxio.posix.exception import UnsupportedDelegateFileSystemError +from alluxio.posix.delegateFs import DelegateFileSystem local_open = os.path local_stat = os.stat @@ -19,7 +14,7 @@ def open(file: str, mode: str = "r", **kw): - logging.info("DelegateFileSystem opening file: %s", file) + logging.debug("DelegateFileSystem opening file: %s", file) instance = DelegateFileSystem.instance fs = instance.get_file_system(file) if fs: @@ -38,7 +33,7 @@ def stat(path: str, **kw): fs = instance.get_file_system(path) if fs: try: - logging.info("DelegateFileSystem getStatus filemeta: %s", path) + logging.debug("DelegateFileSystem getStatus filemeta: %s", path) return fs.stat(path, **kw) except Exception as e: logging.error( @@ -120,100 +115,3 @@ def rename(src: str, dest: str, **kw): return local_rename(src, dest, **kw) logging.error("Source and destination are on different file systems or not supported.") return local_rename(src, dest, **kw) - - -class DelegateFileSystem: - instance = None - - def __init__(self, config_manager: ConfigManager): - self.config_manager = config_manager - self.filesystem_storage = FSStorage() - self.filesystem_storage.data = {} - self.enableFileSystems = [Constants.OSS_FILESYSTEM_TYPE, - Constants.ALLUXIO_FILESYSTEM_TYPE, - Constants.S3_FILESYSTEM_TYPE] - self.__init__file__system() - DelegateFileSystem.instance = self - - def __create__file__system(self, fs_name: str): - config = self.config_manager.get_config(fs_name) - if fs_name not in self.enableFileSystems: - raise UnsupportedDelegateFileSystemError(f"Unsupported file system: {fs_name}") - if config[Constants.ALLUXIO_ENABLE]: - fs_name = (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name + - Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME]) - if config.get(Constants.ALLUXIO_ETCD_ENABLE): - self.filesystem_storage.fs[fs_name] = fsspec.filesystem(Constants.ALLUXIO_FILESYSTEM_TYPE, - etcd_hosts=config[Constants.ALLUXIO_ETCD_HOST], - etcd_port=2379, - target_protocol=config[ - Constants.ALLUXIO_BACKUP_FS]) - return self.filesystem_storage.fs[fs_name] - else: - logging.error("Failed to create Alluxio filesystem, using the default %s filesystem.", fs_name) - if fs_name == Constants.OSS_FILESYSTEM_TYPE: - self.filesystem_storage.fs[fs_name] = fsspec.filesystem(Constants.OSS_FILESYSTEM_TYPE, - key=config[Constants.OSS_ACCESS_KEY_ID], - secret=config[Constants.OSS_ACCESS_KEY_SECRET], - endpoint=config[Constants.OSS_ENDPOINT]) - return self.filesystem_storage.fs[fs_name] - elif fs_name == Constants.S3_FILESYSTEM_TYPE: - # todo:新增s3FileSystem - raise NotImplementedError - - return None - - def get_file_system(self, path: str): - fs_name, bucket = self.__parse__url(path) - if fs_name == Constants.LOCAL_FILESYSTEM_TYPE: - return None - config = self.config_manager.get_config(fs_name) - if config[Constants.ALLUXIO_ENABLE]: - fs_name = (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name + - Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME]) - if hasattr(self.filesystem_storage, fs_name): - return self.filesystem_storage.fs[fs_name] - else: - self.__create__file__system(fs_name) - return self.filesystem_storage.fs[fs_name] - - def __init__file__system(self): - fs_list = self.config_manager.get_config_fs_list() - for fs_name in fs_list: - self.__create__file__system(fs_name) - - def __parse__url(self, path: str): - # parse the schema and bucket name in filepath - if (type(path) is not str) or (path.startswith('/')): - return Constants.LOCAL_FILESYSTEM_TYPE, None - pattern = re.compile(r'^(\w+)://([^/]+)/.*') - match = pattern.match(path) - if match: - fs_name, bucket_name = match.groups() - # Check whether the file system corresponding to the path is supported - if fs_name.lower() in self.enableFileSystems: - return fs_name, bucket_name - else: - raise UnsupportedDelegateFileSystemError(f"Unsupported file system: {fs_name}") - else: - return Constants.LOCAL_FILESYSTEM_TYPE, None - - -class FSStorage(threading.local): - def __init__(self): - self.fs = {} - - def __getitem__(self, key): - return self.fs[key] - - def __setitem__(self, key, value): - self.fs[key] = value - - def __delitem__(self, key): - del self.fs[key] - - def __contains__(self, key): - return key in self.fs - - def get(self, key, default=None): - return self.fs.get(key, default) diff --git a/alluxio/posix/ufs/alluxio.py b/alluxio/posix/ufs/alluxio.py index 2896ff1..c2f9da0 100644 --- a/alluxio/posix/ufs/alluxio.py +++ b/alluxio/posix/ufs/alluxio.py @@ -19,14 +19,16 @@ def validate_alluxio_config(config): raise ConfigMissingError(f"The following keys must be set in the configuration: {required_keys}") -def update_alluxio_config(config_data, key, value): +def update_alluxio_config(config_data, updates): allowed_keys = [ - ALLUXIO_ETCD_ENABLE, - ALLUXIO_ETCD_HOST, - ALLUXIO_WORKER_HOSTS + 'ALLUXIO_ETCD_ENABLE', + 'ALLUXIO_ETCD_HOST', + 'ALLUXIO_WORKER_HOSTS' ] - if key not in allowed_keys: - raise ValueError(f"Invalid configuration key for Alluxio: {key}") - config_data[key] = value + for key, value in updates.items(): + if key not in allowed_keys: + raise ValueError(f"Invalid configuration key for Alluxio: {key}") + config_data[key] = value + validate_alluxio_config(config_data) return config_data diff --git a/alluxio/posix/ufs/oss.py b/alluxio/posix/ufs/oss.py index 5f9590e..0daedd2 100644 --- a/alluxio/posix/ufs/oss.py +++ b/alluxio/posix/ufs/oss.py @@ -21,14 +21,17 @@ def validate_oss_config(config): raise ValueError(f"OSS config key '{key}' cannot be empty") -def update_oss_config(config_data, key, value): - if key not in [ - OSS_ACCESS_KEY_ID, - OSS_ACCESS_KEY_SECRET, - OSS_ENDPOINT, +def update_oss_config(config_data, updates): + valid_keys = [ + 'OSS_ACCESS_KEY_ID', + 'OSS_ACCESS_KEY_SECRET', + 'OSS_ENDPOINT', Constants.BUCKET_NAME - ]: - raise ValueError(f"Invalid configuration key: {key}") - config_data[key] = value + ] + for key, value in updates.items(): + if key not in valid_keys: + raise ValueError(f"Invalid configuration key: {key}") + config_data[key] = value + validate_oss_config(config_data) return config_data