diff --git a/src/cockpit/channels/pcp.py b/src/cockpit/channels/pcp.py index 4ec4028a5cfd..1521dea8f2e2 100644 --- a/src/cockpit/channels/pcp.py +++ b/src/cockpit/channels/pcp.py @@ -240,7 +240,7 @@ def parse_options(self, options: JsonObject): # if self.omit_instances and self.instances: # raise ChannelError('protocol-error', message='') - def sample(self, archive): + def sample(self, archive, total_fetched): context = archive.context # HACK: this is some utter sillyness, maybe we can construct our own pcp.pmapi.c_uint_Array_1 @@ -253,10 +253,16 @@ def sample(self, archive): fetched = [] try: for _ in range(self.archive_batch): + if total_fetched == self.limit: + self.send_updates(archive, fetched) + logger.debug('Reached limit, stopping') + return total_fetched + # Consider using the fetchGroup API https://pcp.readthedocs.io/en/latest/PG/PMAPI.html#fetchgroup-operation # HACK: This is some pcp weirdness where it only accepts a PCP type list and not a Python list # PMIDS results = context.pmFetch(pmids) fetched.append(self.parse_fetched_results(context, results, descs)) + total_fetched += 1 self.send_updates(archive, fetched) fetched.clear() @@ -270,6 +276,8 @@ def sample(self, archive): break + return total_fetched + def parse_fetched_results(self, context: 'pmapi.pmContext', results: Any, descs: Any) -> Sample: metrics = list(self.metrics) samples: dict[str, float | list[float]] = {} @@ -414,6 +422,7 @@ def send_updates(self, archive, samples: Sequence[Sample]) -> None: self.send_data(json.dumps(data).encode()) def sample_archives(self, archives): + total_fetched = 0 for i, archive in enumerate(archives): timestamp = self.start_timestamp @@ -435,7 +444,9 @@ def sample_archives(self, archives): except pmapi.pmErr as exc: raise ChannelError('internal-error', message=str(exc)) from None - self.sample(archive) + total_fetched = self.sample(archive, total_fetched) + if total_fetched == self.limit: + return True else: return True @@ -475,6 +486,8 @@ async def 
run(self, options: JsonObject) -> None: self.sample_archives(archives) + self.done() + # while True: # # if all_read: diff --git a/test/pytest/test_pcp.py b/test/pytest/test_pcp.py index a76910168219..fa80b32cf0b4 100644 --- a/test/pytest/test_pcp.py +++ b/test/pytest/test_pcp.py @@ -51,6 +51,21 @@ def test_broken_archive(tmpdir_factory): return pcp_dir +@pytest.fixture +def big_archive(tmpdir_factory): + pcp_dir = tmpdir_factory.mktemp('big-archive') + archive_1 = pmi.pmiLogImport(f"{pcp_dir}/0") + archive_1.pmiAddMetric("mock.value", PM_ID_NULL, PM_TYPE_U32, PM_INDOM_NULL, + PM_SEM_INSTANT, archive_1.pmiUnits(0, 0, 0, 0, 0, 0)) + for i in range(1000): + archive_1.pmiPutValue("mock.value", None, str(i)) + archive_1.pmiWrite(i, 0) + + archive_1.pmiEnd() + + return pcp_dir + + @pytest.fixture def test_archive(tmpdir_factory): pcp_dir = tmpdir_factory.mktemp('mock-archives') @@ -102,8 +117,8 @@ async def test_pcp_open_error(transport, test_archive): @pytest.mark.asyncio async def test_pcp_open(transport, test_archive): - _ = await transport.check_open('metrics1', source=str(test_archive), - metrics=[{"name": "mock.value"}]) + ch = await transport.check_open('metrics1', source=str(test_archive), + metrics=[{"name": "mock.value"}]) _, data = await transport.next_frame() # first message is always the meta message @@ -136,6 +151,55 @@ async def test_pcp_open(transport, test_archive): data = json.loads(data) assert data == [[13], [14], [15]] + await transport.check_close(channel=ch) + + +@pytest.mark.asyncio +async def test_pcp_big_archive(transport, big_archive): + _ = await transport.check_open('metrics1', source=str(big_archive), + metrics=[{"name": "mock.value"}]) + + _, data = await transport.next_frame() + # first message is always the meta message + meta = json.loads(data) + + # TODO: assert helper function?
+ assert meta['timestamp'] == 0 + assert meta['interval'] == 1000 # default interval + assert meta['source'] == str(big_archive) + + metrics = meta['metrics'] + assert len(metrics) == 1 + + metric = metrics[0] + assert metric['name'] == 'mock.value' + assert 'derive' not in metric + assert metric['semantic'] == 'instant' + + _, data = await transport.next_frame() + data = json.loads(data) + # archives batch size is hardcoded to 60 + # TODO import? + assert data == [[i] for i in range(60)] + + +@pytest.mark.asyncio +async def test_pcp_limit_archive(transport, big_archive): + + ch = await transport.check_open('metrics1', source=str(big_archive), + limit=30, + metrics=[{"name": "mock.value"}]) + + # first message is always the meta message + _, data = await transport.next_frame() + # TODO: verify + + _, data = await transport.next_frame() + data = json.loads(data) + assert data == [[i] for i in range(30)] + + await transport.check_close(channel=ch) + @pytest.mark.asyncio async def test_pcp_broken_archive(transport, test_broken_archive):