-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cockpit: support access attributes in fsinfo #21603
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -378,6 +378,20 @@ def get_group(gid: int) -> 'str | int': | |
except KeyError: | ||
return gid | ||
|
||
def get_access(name: str, fd: int, mode: int, *, follow_symlinks: bool = False) -> 'bool | None': | ||
if not name: | ||
# HACK: Python's os.access() does not support passing "AT_EMPTY_PATH" | ||
# so we need to resolve the name of the watched directory. | ||
try: | ||
name = os.readlink(f'/proc/self/fd/{fd}') | ||
return os.access(name, mode, follow_symlinks=follow_symlinks) | ||
except OSError: | ||
return None | ||
try: | ||
return os.access(name, mode, dir_fd=fd, follow_symlinks=follow_symlinks) | ||
except OSError: | ||
return None | ||
|
||
stat_types = {stat.S_IFREG: 'reg', stat.S_IFDIR: 'dir', stat.S_IFLNK: 'lnk', stat.S_IFCHR: 'chr', | ||
stat.S_IFBLK: 'blk', stat.S_IFIFO: 'fifo', stat.S_IFSOCK: 'sock'} | ||
available_stat_getters = { | ||
|
@@ -393,6 +407,11 @@ def get_group(gid: int) -> 'str | int': | |
} | ||
stat_getters = tuple((key, available_stat_getters.get(key, lambda _: None)) for key in attrs) | ||
|
||
available_access_getters = { | ||
'r-ok': lambda name, fd, follow: get_access(name, fd, os.R_OK, follow_symlinks=follow), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about having a map from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also: |
||
'w-ok': lambda name, fd, follow: get_access(name, fd, os.W_OK, follow_symlinks=follow), | ||
} | ||
|
||
def get_attrs(fd: int, name: str, follow: Follow) -> 'JsonDict | None': | ||
try: | ||
buf = os.stat(name, follow_symlinks=follow.value, dir_fd=fd) if name else os.fstat(fd) | ||
|
@@ -407,6 +426,10 @@ def get_attrs(fd: int, name: str, follow: Follow) -> 'JsonDict | None': | |
with contextlib.suppress(OSError): | ||
result['target'] = os.readlink(name, dir_fd=fd) | ||
|
||
for attr, getter in available_access_getters.items(): | ||
if attr in result: | ||
result[attr] = getter(name, fd, follow.value) | ||
|
||
return result | ||
|
||
return get_attrs | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1298,14 +1298,16 @@ async def test_fsinfo_watch_identity_changes( | |
|
||
@pytest.mark.asyncio | ||
async def test_fsinfo_self_owner(transport: MockTransport, tmp_path: Path) -> None: | ||
client = await FsInfoClient.open(transport, tmp_path, ['user', 'uid', 'group', 'gid']) | ||
client = await FsInfoClient.open(transport, tmp_path, ['user', 'uid', 'group', 'gid'], fnmatch='') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand this change. attrs = set(get_strv(options, 'attrs'))
self.getattrs = self.make_getattrs(attrs - {'targets', 'entries'})
self.fnmatch = get_str(options, 'fnmatch', '*' if 'entries' in attrs else '') |
||
state = await client.wait() | ||
info = get_dict(state, 'info') | ||
|
||
assert get_int(info, 'uid') == os.getuid() | ||
assert get_int(info, 'gid') == os.getgid() | ||
assert info.get('user') == getpass.getuser() | ||
assert info.get('group') == grp.getgrgid(os.getgid()).gr_name # hopefully true... | ||
# user, uid, group, gid | ||
assert len(info.keys()) == 4 | ||
|
||
|
||
@pytest.mark.asyncio | ||
|
@@ -1414,3 +1416,26 @@ async def test_fsinfo_targets(transport: MockTransport, tmp_path: Path) -> None: | |
# double-check with the non-watch variant | ||
client = await FsInfoClient.open(transport, tmp_path, ['type', 'target', 'targets'], fnmatch='l*') | ||
assert await client.wait() == state | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_fsinfo_access_attrs(transport: MockTransport, fsinfo_test_cases: 'dict[Path, JsonObject]') -> None: | ||
for path, expected_state in fsinfo_test_cases.items(): | ||
read_ok = True | ||
write_ok = True | ||
|
||
# these are errors | ||
if path.name == 'dangling' or path.name == 'loopy': | ||
continue | ||
|
||
if path.name == 'no-r-dir': | ||
read_ok = False | ||
elif path.name == 'no-r-file': | ||
read_ok = False | ||
write_ok = False | ||
|
||
expected_state = {'info': {'r-ok': read_ok, 'w-ok': write_ok}} | ||
|
||
# fnmatch='' to not include entries | ||
client = await FsInfoClient.open(transport, path, attrs=['w-ok', 'r-ok'], fnmatch='') | ||
assert await client.wait() == expected_state, f'for path={path.name}' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason you can't just
os.access()
directly on the/proc/self/fd
link withfollow_symlinks=True
? That underlying file descriptor is exactly what you want to query. Trying to do it via filename is possibly racy and I imagine it could run into corner cases...