diff --git a/src/cockpit/channels/filesystem.py b/src/cockpit/channels/filesystem.py index 572fb7ab29c9..b34e0c676192 100644 --- a/src/cockpit/channels/filesystem.py +++ b/src/cockpit/channels/filesystem.py @@ -378,6 +378,20 @@ def get_group(gid: int) -> 'str | int': except KeyError: return gid + def get_access(name: str, fd: int, mode: int, *, follow_symlinks: bool = False) -> 'bool | None': + if not name: + # HACK: Python's os.access() does not support passing "AT_EMPTY_PATH" + # so we need to resolve the name of the watched directory. + try: + name = os.readlink(f'/proc/self/fd/{fd}') + return os.access(name, mode, follow_symlinks=follow_symlinks) + except OSError: + return None + try: + return os.access(name, mode, dir_fd=fd, follow_symlinks=follow_symlinks) + except OSError: + return None + stat_types = {stat.S_IFREG: 'reg', stat.S_IFDIR: 'dir', stat.S_IFLNK: 'lnk', stat.S_IFCHR: 'chr', stat.S_IFBLK: 'blk', stat.S_IFIFO: 'fifo', stat.S_IFSOCK: 'sock'} available_stat_getters = { @@ -393,6 +407,11 @@ def get_group(gid: int) -> 'str | int': } stat_getters = tuple((key, available_stat_getters.get(key, lambda _: None)) for key in attrs) + available_access_getters = { + 'r-ok': lambda name, fd, follow: get_access(name, fd, os.R_OK, follow_symlinks=follow), + 'w-ok': lambda name, fd, follow: get_access(name, fd, os.W_OK, follow_symlinks=follow), + } + def get_attrs(fd: int, name: str, follow: Follow) -> 'JsonDict | None': try: buf = os.stat(name, follow_symlinks=follow.value, dir_fd=fd) if name else os.fstat(fd) @@ -407,6 +426,10 @@ def get_attrs(fd: int, name: str, follow: Follow) -> 'JsonDict | None': with contextlib.suppress(OSError): result['target'] = os.readlink(name, dir_fd=fd) + for attr, getter in available_access_getters.items(): + if attr in result: + result[attr] = getter(name, fd, follow.value) + return result return get_attrs diff --git a/test/pytest/test_bridge.py b/test/pytest/test_bridge.py index 91d8f2b66664..7ce14a1d28b8 100644 --- a/test/pytest/test_bridge.py +++ b/test/pytest/test_bridge.py @@ -1298,7 +1298,7 @@ async def test_fsinfo_watch_identity_changes( @pytest.mark.asyncio async def test_fsinfo_self_owner(transport: MockTransport, tmp_path: Path) -> None: - client = await FsInfoClient.open(transport, tmp_path, ['user', 'uid', 'group', 'gid']) + client = await FsInfoClient.open(transport, tmp_path, ['user', 'uid', 'group', 'gid'], fnmatch='') state = await client.wait() info = get_dict(state, 'info') @@ -1306,6 +1306,8 @@ async def test_fsinfo_self_owner(transport: MockTransport, tmp_path: Path) -> No assert get_int(info, 'gid') == os.getgid() assert info.get('user') == getpass.getuser() assert info.get('group') == grp.getgrgid(os.getgid()).gr_name # hopefully true... + # user, uid, group, gid + assert len(info.keys()) == 4 @pytest.mark.asyncio @@ -1414,3 +1416,26 @@ async def test_fsinfo_targets(transport: MockTransport, tmp_path: Path) -> None: # double-check with the non-watch variant client = await FsInfoClient.open(transport, tmp_path, ['type', 'target', 'targets'], fnmatch='l*') assert await client.wait() == state + + +@pytest.mark.asyncio +async def test_fsinfo_access_attrs(transport: MockTransport, fsinfo_test_cases: 'dict[Path, JsonObject]') -> None: + for path, expected_state in fsinfo_test_cases.items(): + read_ok = True + write_ok = True + + # these are errors + if path.name == 'dangling' or path.name == 'loopy': + continue + + if path.name == 'no-r-dir': + read_ok = False + elif path.name == 'no-r-file': + read_ok = False + write_ok = False + + expected_state = {'info': {'r-ok': read_ok, 'w-ok': write_ok}} + + # fnmatch='' to not include entries + client = await FsInfoClient.open(transport, path, attrs=['w-ok', 'r-ok'], fnmatch='') + assert await client.wait() == expected_state, f'for path={path.name}'