Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: alluxio-py support alluxio and oss filesystem #76

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added alluxio/posix/__init__.py
Empty file.
75 changes: 75 additions & 0 deletions alluxio/posix/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import logging

import yaml
import os

from alluxio.posix.ufs.alluxio import validate_alluxio_config, update_alluxio_config
from alluxio.posix.ufs.oss import validate_oss_config, update_oss_config
from alluxio.posix.const import Constants
from alluxio.posix.exception import ConfigMissingError, ConfigInvalidError


class ConfigManager:
def __init__(self):
self.logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
current_dir = os.path.dirname(os.path.abspath(__file__))
self.config_file_path = os.getenv('ALLUXIO_PY_CONFIG_FILE_PATH', os.path.join(current_dir, 'config',
'ufs_config.yaml'))
self.config_data = self._load_config()
self.validation_functions = {
Constants.OSS_FILESYSTEM_TYPE: validate_oss_config,
Constants.ALLUXIO_FILESYSTEM_TYPE: validate_alluxio_config
}

def _load_config(self):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is necessary to add a hot update interface. For example, if the user does not have a configuration file and wants to configure config through the API

if not os.path.exists(self.config_file_path):
logging.warning(f"Config file not found: {self.config_file_path}. Initializing without loading config.")
return

with open(self.config_file_path, 'r', encoding='utf-8') as file:
try:
config = yaml.safe_load(file)
return config
except yaml.YAMLError as e:
raise ValueError(f"Error parsing YAML file: {e}")

def set_config_path(self, new_path):
self.config_file_path = new_path
self.config_data = self._load_config()
print(f"Configuration path updated and config reloaded from {new_path}.")

def get_config(self, fs_name: str) -> dict:
try:
fs_config = self.config_data[fs_name]
validation_function = self.validation_functions.get(fs_name)
if validation_function is not None:
validation_function(fs_config)
else:
raise ConfigInvalidError(fs_name, f"No validation function for file system: {fs_name}")
return fs_config
except KeyError:
raise ConfigMissingError(fs_name, "FileSystem Configuration is missing")
except ValueError as e:
raise ConfigMissingError(fs_name, str(e))

def get_config_fs_list(self) -> list:
if self.config_data is None:
return []
else:
return self.config_data.keys()

def update_config(self, fs_type, **kwargs):
if fs_type not in self.get_config_fs_list():
raise KeyError(f"No configuration available for {fs_type}")
config_data = self.get_config(fs_type)

if fs_type == Constants.OSS_FILESYSTEM_TYPE:
self.config_data[fs_type] = update_oss_config(config_data, kwargs)
elif fs_type == Constants.ALLUXIO_FILESYSTEM_TYPE:
self.config_data[fs_type] = update_alluxio_config(config_data, kwargs)
elif fs_type == Constants.S3_FILESYSTEM_TYPE:
raise NotImplementedError()
else:
raise ValueError(f"Unsupported file system type: {fs_type}")

16 changes: 16 additions & 0 deletions alluxio/posix/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class Constants:

# general config
BUCKET_NAME = 'bucket_name'
# URL prefix
OSS_URL_PREFIX = "oss://"
ALLUXIO_URL_PREFIX = "alluxio://"

# enable FileSystem types
LOCAL_FILESYSTEM_TYPE = 'local'
OSS_FILESYSTEM_TYPE = 'oss'
ALLUXIO_FILESYSTEM_TYPE = 'alluxio'
S3_FILESYSTEM_TYPE = 's3'

# assist constants
ALLUXIO_SEP_SIGN = '_'
17 changes: 17 additions & 0 deletions alluxio/posix/delegate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import os
from alluxio.posix import fileimpl
from alluxio.posix.config import ConfigManager
from alluxio.posix.delegateFs import DelegateFileSystem

config_manager = ConfigManager()
delegate_fs = DelegateFileSystem(config_manager)

os.stat = fileimpl.stat
os.open = fileimpl.open
os.listdir = fileimpl.listdir
os.rename = fileimpl.rename
os.mkdir = fileimpl.mkdir
os.remove = fileimpl.remove
os.rmdir = fileimpl.rmdir


107 changes: 107 additions & 0 deletions alluxio/posix/delegateFs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import logging
import re
import threading
import fsspec

from alluxio.posix.config import ConfigManager
from alluxio.posix.const import Constants
from alluxio.posix.exception import UnsupportedDelegateFileSystemError
from alluxio.posix.ufs.alluxio import ALLUXIO_ETCD_HOST, ALLUXIO_BACKUP_FS, ALLUXIO_ETCD_ENABLE, ALLUXIO_ENABLE
from alluxio.posix.ufs.oss import OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT


class DelegateFileSystem:
instance = None

def __init__(self, config_manager: ConfigManager):
self.config_manager = config_manager
self.filesystem_storage = FSStorage()
self.filesystem_storage.data = {}
self.enableFileSystems = [Constants.OSS_FILESYSTEM_TYPE,
Constants.ALLUXIO_FILESYSTEM_TYPE,
Constants.S3_FILESYSTEM_TYPE]
self.__init__file__system()
DelegateFileSystem.instance = self

def __create__file__system(self, fs_name: str):
config = self.config_manager.get_config(fs_name)
if fs_name not in self.enableFileSystems:
raise UnsupportedDelegateFileSystemError(f"Unsupported file system: {fs_name}")
if config[ALLUXIO_ENABLE]:
fs_name = (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name +
Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME])
if config.get(ALLUXIO_ETCD_ENABLE):
self.filesystem_storage.fs[fs_name] = fsspec.filesystem(Constants.ALLUXIO_FILESYSTEM_TYPE,
etcd_hosts=config[ALLUXIO_ETCD_HOST],
etcd_port=2379,
target_protocol=config[
ALLUXIO_BACKUP_FS])
return self.filesystem_storage.fs[fs_name]
else:
logging.error("Failed to create Alluxio filesystem, using the default %s filesystem.", fs_name)
if fs_name == Constants.OSS_FILESYSTEM_TYPE:
self.filesystem_storage.fs[fs_name] = fsspec.filesystem(Constants.OSS_FILESYSTEM_TYPE,
key=config[OSS_ACCESS_KEY_ID],
secret=config[OSS_ACCESS_KEY_SECRET],
endpoint=config[OSS_ENDPOINT])
return self.filesystem_storage.fs[fs_name]
elif fs_name == Constants.S3_FILESYSTEM_TYPE:
# todo:新增s3FileSystem
raise NotImplementedError

return None

def get_file_system(self, path: str):
fs_name, bucket = self.__parse__url(path)
if fs_name == Constants.LOCAL_FILESYSTEM_TYPE:
return None
config = self.config_manager.get_config(fs_name)
if config[ALLUXIO_ENABLE]:
fs_name = (Constants.ALLUXIO_FILESYSTEM_TYPE + Constants.ALLUXIO_SEP_SIGN + fs_name +
Constants.ALLUXIO_SEP_SIGN + config[Constants.BUCKET_NAME])
if hasattr(self.filesystem_storage, fs_name):
return self.filesystem_storage.fs[fs_name]
else:
self.__create__file__system(fs_name)
return self.filesystem_storage.fs[fs_name]

def __init__file__system(self):
fs_list = self.config_manager.get_config_fs_list()
for fs_name in fs_list:
self.__create__file__system(fs_name)

def __parse__url(self, path: str):
# parse the schema and bucket name in filepath
if (type(path) is not str) or (path.startswith('/')):
return Constants.LOCAL_FILESYSTEM_TYPE, None
pattern = re.compile(r'^(\w+)://([^/]+)/.*')
match = pattern.match(path)
if match:
fs_name, bucket_name = match.groups()
# Check whether the file system corresponding to the path is supported
if fs_name.lower() in self.enableFileSystems:
return fs_name, bucket_name
else:
raise UnsupportedDelegateFileSystemError(f"Unsupported file system: {fs_name}")
else:
return Constants.LOCAL_FILESYSTEM_TYPE, None


class FSStorage(threading.local):
def __init__(self):
self.fs = {}

def __getitem__(self, key):
return self.fs[key]

def __setitem__(self, key, value):
self.fs[key] = value

def __delitem__(self, key):
del self.fs[key]

def __contains__(self, key):
return key in self.fs

def get(self, key, default=None):
return self.fs.get(key, default)
19 changes: 19 additions & 0 deletions alluxio/posix/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class ConfigMissingError(Exception):
def __init__(self, config_key, message="Configuration key is missing"):
self.config_key = config_key
self.message = message
super().__init__(f"{message}: {config_key}")


class ConfigInvalidError(Exception):
def __init__(self, config_key, message="Configuration key is invalid, config_key"):
self.config_key = config_key
self.message = message
super().__init__(f"{message}: {config_key}")


class UnsupportedDelegateFileSystemError(Exception):
def __init__(self, fs_name, message="FileSystem is not supported, filesystem"):
self.fs_name = fs_name
self.message = message
super().__init__(f"{message}: {fs_name}")
117 changes: 117 additions & 0 deletions alluxio/posix/fileimpl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import logging
import os

from alluxio.posix.delegateFs import DelegateFileSystem

local_open = os.path
local_stat = os.stat
local_listdir = os.listdir
local_rename = os.rename
local_close = os.close
local_mkdir = os.mkdir
local_remove = os.remove
local_rmdir = os.rmdir

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it not better to separate interface and implementation?



def open(file: str, mode: str = "r", **kw):
logging.debug("DelegateFileSystem opening file: %s", file)
instance = DelegateFileSystem.instance
fs = instance.get_file_system(file)
if fs:
try:
return fs.open(file, mode, **kw)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Through fspec reflection, parameters can be passed directly into ufs client. Is there no need to verify the legality of config parameters?

except Exception as e:
logging.error(
f"Failed to open file by delegateFileSystem with exception:{e}."
f"Used local filesystem instead.")
return local_open(file, mode, **kw)
return local_open(file, mode, **kw)


def stat(path: str, **kw):
instance = DelegateFileSystem.instance
fs = instance.get_file_system(path)
if fs:
try:
logging.debug("DelegateFileSystem getStatus filemeta: %s", path)
return fs.stat(path, **kw)
except Exception as e:
logging.error(
f"Failed to stat file by delegateFileSystem with exception:{e}."
f"Used local filesystem instead.")
return local_stat(path, **kw)
logging.info("LocalFileSystem getStatus filemeta: %s", path)
return local_stat(path, **kw)


def listdir(path: str, **kw):
instance = DelegateFileSystem.instance
fs = instance.get_file_system(path)
if fs:
try:
return fs.listdir(path, **kw)
except Exception as e:
logging.error(
f"Failed to list directory by delegateFileSystem with exception: {e}."
f"Used local filesystem instead.")
return local_listdir(path, **kw)
return local_listdir(path, **kw)


def mkdir(path: str, mode=0o777, **kw):
instance = DelegateFileSystem.instance
fs = instance.get_file_system(path)
if fs:
try:
return fs.mkdir(path, mode, **kw)
except Exception as e:
logging.error(
f"Failed to make directory by delegateFileSystem with exception: {e}."
f"Used local filesystem instead.")
return local_mkdir(path, mode, **kw)
return local_mkdir(path, mode, **kw)


def rmdir(path: str, **kw):
instance = DelegateFileSystem.instance
fs = instance.get_file_system(path)
if fs:
try:
return fs.rmdir(path, **kw)
except Exception as e:
logging.error(
f"Failed to remove directory by delegateFileSystem with exception: {e}."
f"Used local filesystem instead."
)
return local_rmdir(path, **kw)
return local_rmdir(path, **kw)


def remove(path: str, **kw):
instance = DelegateFileSystem.instance
fs = instance.get_file_system(path)
if fs:
try:
return fs.rm(path, **kw)
except Exception as e:
logging.error(
f"Failed to remove file by delegateFileSystem with exception: {e}."
f"Used local filesystem instead.")
return local_remove(path, **kw)
return local_remove(path, **kw)


def rename(src: str, dest: str, **kw):
instance = DelegateFileSystem.instance
fs_src = instance.get_file_system(src)
fs_dest = instance.get_file_system(dest)
if fs_src and fs_dest and fs_src == fs_dest:
try:
return fs_src.rename(src, dest, **kw)
except Exception as e:
logging.error(
f"Failed to rename file by delegateFileSystem with exception: {e}."
f"Used local filesystem instead.")
return local_rename(src, dest, **kw)
logging.error("Source and destination are on different file systems or not supported.")
return local_rename(src, dest, **kw)
16 changes: 16 additions & 0 deletions alluxio/posix/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from setuptools import setup, find_packages

setup(
name='alluxio_posix',
version='0.1.0',
packages=find_packages(),
license='MIT',
description='Alluxio POSIX Python SDK',
author='lzq',
author_email='[email protected]',
data_files=[
('config', ['config/ufs_config.yaml']) # 指定配置文件所在路径
],
include_package_data=True,
zip_safe=False
)
Empty file added alluxio/posix/ufs/__init__.py
Empty file.
Loading