Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cockpit: support access attributes in fsinfo #21603

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/cockpit/channels/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,20 @@ def get_group(gid: int) -> 'str | int':
except KeyError:
return gid

def get_access(name: str, fd: int, mode: int, *, follow_symlinks: bool = False) -> 'bool | None':
if not name:
# HACK: Python's os.access() does not support passing "AT_EMPTY_PATH"
# so we need to resolve the name of the watched directory.
try:
name = os.readlink(f'/proc/self/fd/{fd}')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason you can't just os.access() directly on the /proc/self/fd link with follow_symlinks=True? That underlying file descriptor is exactly what you want to query. Trying to do it via filename is possibly racy and I imagine it could run into corner cases...

return os.access(name, mode, follow_symlinks=follow_symlinks)
except OSError:
return None
try:
return os.access(name, mode, dir_fd=fd, follow_symlinks=follow_symlinks)
except OSError:
return None

stat_types = {stat.S_IFREG: 'reg', stat.S_IFDIR: 'dir', stat.S_IFLNK: 'lnk', stat.S_IFCHR: 'chr',
stat.S_IFBLK: 'blk', stat.S_IFIFO: 'fifo', stat.S_IFSOCK: 'sock'}
available_stat_getters = {
Expand All @@ -393,6 +407,11 @@ def get_group(gid: int) -> 'str | int':
}
stat_getters = tuple((key, available_stat_getters.get(key, lambda _: None)) for key in attrs)

available_access_getters = {
'r-ok': lambda name, fd, follow: get_access(name, fd, os.R_OK, follow_symlinks=follow),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about having a map from r-ok to os.R_OK (and so on) instead, and removing one layer of indirection?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also: x-ok?

'w-ok': lambda name, fd, follow: get_access(name, fd, os.W_OK, follow_symlinks=follow),
}

def get_attrs(fd: int, name: str, follow: Follow) -> 'JsonDict | None':
try:
buf = os.stat(name, follow_symlinks=follow.value, dir_fd=fd) if name else os.fstat(fd)
Expand All @@ -407,6 +426,10 @@ def get_attrs(fd: int, name: str, follow: Follow) -> 'JsonDict | None':
with contextlib.suppress(OSError):
result['target'] = os.readlink(name, dir_fd=fd)

for attr, getter in available_access_getters.items():
if attr in result:
result[attr] = getter(name, fd, follow.value)

return result

return get_attrs
Expand Down
27 changes: 26 additions & 1 deletion test/pytest/test_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,14 +1298,16 @@ async def test_fsinfo_watch_identity_changes(

@pytest.mark.asyncio
async def test_fsinfo_self_owner(transport: MockTransport, tmp_path: Path) -> None:
client = await FsInfoClient.open(transport, tmp_path, ['user', 'uid', 'group', 'gid'])
client = await FsInfoClient.open(transport, tmp_path, ['user', 'uid', 'group', 'gid'], fnmatch='')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this change. '' should already be the default value if the attrs array doesn't contain entries.

        attrs = set(get_strv(options, 'attrs'))
        self.getattrs = self.make_getattrs(attrs - {'targets', 'entries'})
        self.fnmatch = get_str(options, 'fnmatch', '*' if 'entries' in attrs else '')

state = await client.wait()
info = get_dict(state, 'info')

assert get_int(info, 'uid') == os.getuid()
assert get_int(info, 'gid') == os.getgid()
assert info.get('user') == getpass.getuser()
assert info.get('group') == grp.getgrgid(os.getgid()).gr_name # hopefully true...
# user, uid, group, gid
assert len(info.keys()) == 4


@pytest.mark.asyncio
Expand Down Expand Up @@ -1414,3 +1416,26 @@ async def test_fsinfo_targets(transport: MockTransport, tmp_path: Path) -> None:
# double-check with the non-watch variant
client = await FsInfoClient.open(transport, tmp_path, ['type', 'target', 'targets'], fnmatch='l*')
assert await client.wait() == state


@pytest.mark.asyncio
async def test_fsinfo_access_attrs(transport: MockTransport, fsinfo_test_cases: 'dict[Path, JsonObject]') -> None:
for path, expected_state in fsinfo_test_cases.items():
read_ok = True
write_ok = True

# these are errors
if path.name == 'dangling' or path.name == 'loopy':
continue

if path.name == 'no-r-dir':
read_ok = False
elif path.name == 'no-r-file':
read_ok = False
write_ok = False

expected_state = {'info': {'r-ok': read_ok, 'w-ok': write_ok}}

# fnmatch='' to not include entries
client = await FsInfoClient.open(transport, path, attrs=['w-ok', 'r-ok'], fnmatch='')
assert await client.wait() == expected_state, f'for path={path.name}'
Loading