From dfd4cc1e537523b04b01b6e209b5760bd2a007d5 Mon Sep 17 00:00:00 2001 From: Chen Sun Date: Mon, 9 Sep 2024 16:06:20 -0700 Subject: [PATCH] feat(sdk)!: Pin kfp-pipeline-spec==0.4.0, kfp-server-api>=2.1.0,<2.4.0 (#11192) Signed-off-by: Chen Sun --- sdk/python/kfp/client/client.py | 74 +++++++++++++++------------- sdk/python/kfp/client/client_test.py | 38 ++++++++------ sdk/python/requirements.in | 4 +- sdk/python/requirements.txt | 4 +- 4 files changed, 66 insertions(+), 54 deletions(-) diff --git a/sdk/python/kfp/client/client.py b/sdk/python/kfp/client/client.py index bdf9cbdf20f..f8897236343 100644 --- a/sdk/python/kfp/client/client.py +++ b/sdk/python/kfp/client/client.py @@ -421,7 +421,7 @@ def get_kfp_healthz( ) try: - return self._healthz_api.get_healthz() + return self._healthz_api.healthz_service_get_healthz() # ApiException, including network errors, is the only type that may # recover after retry. except kfp_server_api.ApiException: @@ -474,7 +474,8 @@ def create_experiment( description=description, namespace=namespace, ) - experiment = self._experiment_api.create_experiment(body=experiment) + experiment = self._experiment_api.experiment_service_create_experiment( + body=experiment) link = f'{self._get_url_prefix()}/#/experiments/details/{experiment.experiment_id}' if auth.is_ipython(): @@ -502,7 +503,8 @@ def get_pipeline_id(self, name: str) -> Optional[str]: 'stringValue': name, }] }) - result = self._pipelines_api.list_pipelines(filter=pipeline_filter) + result = self._pipelines_api.pipeline_service_list_pipelines( + filter=pipeline_filter) if result.pipelines is None: return None if len(result.pipelines) == 1: @@ -545,7 +547,7 @@ def list_experiments( ``V2beta1ListExperimentsResponse`` object. """ namespace = namespace or self.get_user_namespace() - return self._experiment_api.list_experiments( + return self._experiment_api.experiment_service_list_experiments( page_token=page_token, page_size=page_size, sort_by=sort_by, @@ -577,7 +579,7 @@ def get_experiment( raise ValueError( 'Either experiment_id or experiment_name is required.') if experiment_id is not None: - return self._experiment_api.get_experiment( + return self._experiment_api.experiment_service_get_experiment( experiment_id=experiment_id) experiment_filter = json.dumps({ 'predicates': [{ @@ -587,10 +589,10 @@ def get_experiment( }] }) if namespace is not None: - result = self._experiment_api.list_experiments( + result = self._experiment_api.experiment_service_list_experiments( filter=experiment_filter, namespace=namespace) else: - result = self._experiment_api.list_experiments( + result = self._experiment_api.experiment_service_list_experiments( filter=experiment_filter) if not result.experiments: raise ValueError( @@ -609,7 +611,7 @@ def archive_experiment(self, experiment_id: str) -> dict: Returns: Empty dictionary. """ - return self._experiment_api.archive_experiment( + return self._experiment_api.experiment_service_archive_experiment( experiment_id=experiment_id) def unarchive_experiment(self, experiment_id: str) -> dict: @@ -621,7 +623,7 @@ def unarchive_experiment(self, experiment_id: str) -> dict: Returns: Empty dictionary. """ - return self._experiment_api.unarchive_experiment( + return self._experiment_api.experiment_service_unarchive_experiment( experiment_id=experiment_id) def delete_experiment(self, experiment_id: str) -> dict: @@ -633,7 +635,7 @@ def delete_experiment(self, experiment_id: str) -> dict: Returns: Empty dictionary. """ - return self._experiment_api.delete_experiment( + return self._experiment_api.experiment_service_delete_experiment( experiment_id=experiment_id) def list_pipelines( @@ -666,7 +668,7 @@ def list_pipelines( Returns: ``V2beta1ListPipelinesResponse`` object. """ - return self._pipelines_api.list_pipelines( + return self._pipelines_api.pipeline_service_list_pipelines( namespace=namespace, page_token=page_token, page_size=page_size, @@ -730,7 +732,7 @@ def run_pipeline( runtime_config=job_config.runtime_config, service_account=service_account) - response = self._run_api.create_run(body=run_body) + response = self._run_api.run_service_create_run(body=run_body) link = f'{self._get_url_prefix()}/#/runs/details/{response.run_id}' if auth.is_ipython(): @@ -751,7 +753,7 @@ def archive_run(self, run_id: str) -> dict: Returns: Empty dictionary. """ - return self._run_api.archive_run(run_id=run_id) + return self._run_api.run_service_archive_run(run_id=run_id) def unarchive_run(self, run_id: str) -> dict: """Restores an archived run. @@ -762,7 +764,7 @@ def unarchive_run(self, run_id: str) -> dict: Returns: Empty dictionary. """ - return self._run_api.unarchive_run(run_id=run_id) + return self._run_api.run_service_unarchive_run(run_id=run_id) def delete_run(self, run_id: str) -> dict: """Deletes a run. @@ -773,7 +775,7 @@ def delete_run(self, run_id: str) -> dict: Returns: Empty dictionary. """ - return self._run_api.delete_run(run_id=run_id) + return self._run_api.run_service_delete_run(run_id=run_id) def terminate_run(self, run_id: str) -> dict: """Terminates a run. @@ -784,7 +786,7 @@ def terminate_run(self, run_id: str) -> dict: Returns: Empty dictionary. """ - return self._run_api.terminate_run(run_id=run_id) + return self._run_api.run_service_terminate_run(run_id=run_id) def create_recurring_run( self, @@ -896,7 +898,8 @@ def create_recurring_run( trigger=trigger, max_concurrency=max_concurrency, service_account=service_account) - return self._recurring_run_api.create_recurring_run(body=job_body) + return self._recurring_run_api.recurring_run_service_create_recurring_run( + body=job_body) def _create_job_config( self, @@ -1131,7 +1134,7 @@ def delete_recurring_run(self, recurring_run_id: str) -> dict: Returns: Empty dictionary. """ - return self._recurring_run_api.delete_recurring_run( + return self._recurring_run_api.recurring_run_service_delete_recurring_run( recurring_run_id=recurring_run_id) def disable_job(self, job_id: str) -> dict: @@ -1159,7 +1162,7 @@ def disable_recurring_run(self, recurring_run_id: str) -> dict: Returns: Empty dictionary. """ - return self._recurring_run_api.disable_recurring_run( + return self._recurring_run_api.recurring_run_service_disable_recurring_run( recurring_run_id=recurring_run_id) def enable_job(self, job_id: str) -> dict: @@ -1187,7 +1190,7 @@ def enable_recurring_run(self, recurring_run_id: str) -> dict: Returns: Empty dictionary. """ - return self._recurring_run_api.enable_recurring_run( + return self._recurring_run_api.recurring_run_service_enable_recurring_run( recurring_run_id=recurring_run_id) def list_runs( @@ -1225,7 +1228,7 @@ def list_runs( """ namespace = namespace or self.get_user_namespace() if experiment_id is not None: - return self._run_api.list_runs( + return self._run_api.run_service_list_runs( page_token=page_token, page_size=page_size, sort_by=sort_by, @@ -1233,7 +1236,7 @@ def list_runs( filter=filter) elif namespace is not None: - return self._run_api.list_runs( + return self._run_api.run_service_list_runs( page_token=page_token, page_size=page_size, sort_by=sort_by, @@ -1241,7 +1244,7 @@ def list_runs( filter=filter) else: - return self._run_api.list_runs( + return self._run_api.run_service_list_runs( page_token=page_token, page_size=page_size, sort_by=sort_by, @@ -1281,7 +1284,7 @@ def list_recurring_runs( ``V2beta1ListRecurringRunsResponse`` object. """ if experiment_id is not None: - return self._recurring_run_api.list_recurring_runs( + return self._recurring_run_api.recurring_run_service_list_recurring_runs( page_token=page_token, page_size=page_size, sort_by=sort_by, @@ -1289,7 +1292,7 @@ def list_recurring_runs( filter=filter) elif namespace is not None: - return self._recurring_run_api.list_recurring_runs( + return self._recurring_run_api.recurring_run_service_list_recurring_runs( page_token=page_token, page_size=page_size, sort_by=sort_by, @@ -1297,7 +1300,7 @@ def list_recurring_runs( filter=filter) else: - return self._recurring_run_api.list_recurring_runs( + return self._recurring_run_api.recurring_run_service_list_recurring_runs( page_token=page_token, page_size=page_size, sort_by=sort_by, @@ -1324,7 +1327,7 @@ def get_recurring_run( stacklevel=2) recurring_run_id = recurring_run_id or job_id - return self._recurring_run_api.get_recurring_run( + return self._recurring_run_api.recurring_run_service_get_recurring_run( recurring_run_id=recurring_run_id) def get_run(self, run_id: str) -> kfp_server_api.V2beta1Run: @@ -1336,7 +1339,7 @@ def get_run(self, run_id: str) -> kfp_server_api.V2beta1Run: Returns: ``V2beta1Run`` object. """ - return self._run_api.get_run(run_id=run_id) + return self._run_api.run_service_get_run(run_id=run_id) def wait_for_run_completion( self, @@ -1362,7 +1365,8 @@ def wait_for_run_completion( finish_states = ['succeeded', 'failed', 'skipped', 'error'] while True: try: - get_run_response = self._run_api.get_run(run_id=run_id) + get_run_response = self._run_api.run_service_get_run( + run_id=run_id) is_valid_token = True except kfp_server_api.ApiException as api_ex: # if the token is valid but receiving 401 Unauthorized error @@ -1480,7 +1484,8 @@ def get_pipeline(self, pipeline_id: str) -> kfp_server_api.V2beta1Pipeline: Returns: ``V2beta1Pipeline`` object. """ - return self._pipelines_api.get_pipeline(pipeline_id=pipeline_id) + return self._pipelines_api.pipeline_service_get_pipeline( + pipeline_id=pipeline_id) def delete_pipeline(self, pipeline_id: str) -> dict: """Deletes a pipeline. @@ -1491,7 +1496,8 @@ def delete_pipeline(self, pipeline_id: str) -> dict: Returns: Empty dictionary. """ - return self._pipelines_api.delete_pipeline(pipeline_id=pipeline_id) + return self._pipelines_api.pipeline_service_delete_pipeline( + pipeline_id=pipeline_id) def list_pipeline_versions( self, @@ -1525,7 +1531,7 @@ def list_pipeline_versions( ``V2beta1ListPipelineVersionsResponse`` object. """ - return self._pipelines_api.list_pipeline_versions( + return self._pipelines_api.pipeline_service_list_pipeline_versions( page_token=page_token, page_size=page_size, sort_by=sort_by, @@ -1546,7 +1552,7 @@ def get_pipeline_version( Returns: ``V2beta1PipelineVersion`` object. """ - return self._pipelines_api.get_pipeline_version( + return self._pipelines_api.pipeline_service_get_pipeline_version( pipeline_id=pipeline_id, pipeline_version_id=pipeline_version_id, ) @@ -1565,7 +1571,7 @@ def delete_pipeline_version( Returns: Empty dictionary. """ - return self._pipelines_api.delete_pipeline_version( + return self._pipelines_api.pipeline_service_delete_pipeline_version( pipeline_id=pipeline_id, pipeline_version_id=pipeline_version_id, ) diff --git a/sdk/python/kfp/client/client_test.py b/sdk/python/kfp/client/client_test.py index da6b0710b93..301ec6d119b 100644 --- a/sdk/python/kfp/client/client_test.py +++ b/sdk/python/kfp/client/client_test.py @@ -198,14 +198,15 @@ def test_wait_for_run_completion_invalid_token_should_raise_error(self): with self.assertRaises(kfp_server_api.ApiException): with patch.object( self.client._run_api, - 'get_run', + 'run_service_get_run', side_effect=kfp_server_api.ApiException) as mock_get_run: self.client.wait_for_run_completion( run_id='foo', timeout=1, sleep_duration=0) mock_get_run.assert_called_once() def test_wait_for_run_completion_expired_access_token(self): - with patch.object(self.client._run_api, 'get_run') as mock_get_run: + with patch.object(self.client._run_api, + 'run_service_get_run') as mock_get_run: # We need to iterate through multiple side effects in order to test this logic. mock_get_run.side_effect = [ Mock(state='unknown state'), @@ -221,7 +222,8 @@ def test_wait_for_run_completion_expired_access_token(self): mock_refresh_api_client_token.assert_called_once() def test_wait_for_run_completion_valid_token(self): - with patch.object(self.client._run_api, 'get_run') as mock_get_run: + with patch.object(self.client._run_api, + 'run_service_get_run') as mock_get_run: mock_get_run.return_value = Mock(state='succeeded') response = self.client.wait_for_run_completion( run_id='foo', timeout=1, sleep_duration=0) @@ -230,7 +232,8 @@ def test_wait_for_run_completion_valid_token(self): def test_wait_for_run_completion_run_timeout_should_raise_error(self): with self.assertRaises(TimeoutError): - with patch.object(self.client._run_api, 'get_run') as mock_get_run: + with patch.object(self.client._run_api, + 'run_service_get_run') as mock_get_run: mock_get_run.return_value = Mock(run=Mock(status='foo')) self.client.wait_for_run_completion( run_id='foo', timeout=1, sleep_duration=0) @@ -242,7 +245,7 @@ def test_create_experiment_no_experiment_should_raise_error( with self.assertRaises(ValueError): self.client.create_experiment(name='foo', namespace='ns1') mock_get_experiment.assert_called_once_with( - name='foo', namespace='ns1') + name='foo', onamespace='ns1') @patch('kfp.Client.get_experiment', return_value=Mock(id='foo')) @patch('kfp.Client._get_url_prefix', return_value='/pipeline') @@ -265,7 +268,7 @@ def test__create_experiment_name_not_found(self, mock_get_url_prefix, # is created. with patch.object( self.client._experiment_api, - 'create_experiment', + 'experiment_service_create_experiment', return_value=Mock( experiment_id='foo')) as mock_create_experiment: self.client.create_experiment(name='foo') @@ -285,7 +288,7 @@ def test_get_experiment_does_not_exist_should_raise_error( with self.assertRaises(ValueError): with patch.object( self.client._experiment_api, - 'list_experiments', + 'experiment_service_list_experiments', return_value=Mock( experiments=None)) as mock_list_experiments: self.client.get_experiment(experiment_name='foo') @@ -298,7 +301,7 @@ def test_get_experiment_multiple_experiments_with_name_should_raise_error( with self.assertRaises(ValueError): with patch.object( self.client._experiment_api, - 'list_experiments', + 'experiment_service_list_experiments', return_value=Mock( experiments=['foo', 'foo'])) as mock_list_experiments: self.client.get_experiment(experiment_name='foo') @@ -306,27 +309,30 @@ def test_get_experiment_multiple_experiments_with_name_should_raise_error( mock_get_user_namespace.assert_called_once() def test_get_experiment_with_experiment_id(self): - with patch.object(self.client._experiment_api, - 'get_experiment') as mock_get_experiment: + with patch.object( + self.client._experiment_api, + 'experiment_service_get_experiment') as mock_get_experiment: self.client.get_experiment(experiment_id='foo') mock_get_experiment.assert_called_once_with(experiment_id='foo') def test_get_experiment_with_experiment_name_and_namespace(self): - with patch.object(self.client._experiment_api, - 'list_experiments') as mock_list_experiments: + with patch.object( + self.client._experiment_api, + 'experiment_service_list_experiments') as mock_list_experiments: self.client.get_experiment(experiment_name='foo', namespace='ns1') mock_list_experiments.assert_called_once() @patch('kfp.Client.get_user_namespace', return_value=None) def test_get_experiment_with_experiment_name_and_no_namespace( self, mock_get_user_namespace): - with patch.object(self.client._experiment_api, - 'list_experiments') as mock_list_experiments: + with patch.object( + self.client._experiment_api, + 'experiment_service_list_experiments') as mock_list_experiments: self.client.get_experiment(experiment_name='foo') mock_list_experiments.assert_called_once() mock_get_user_namespace.assert_called_once() - @patch('kfp_server_api.HealthzServiceApi.get_healthz') + @patch('kfp_server_api.HealthzServiceApi.healthz_service_get_healthz') def test_get_kfp_healthz(self, mock_get_kfp_healthz): mock_get_kfp_healthz.return_value = json.dumps([{'foo': 'bar'}]) response = self.client.get_kfp_healthz() @@ -334,7 +340,7 @@ def test_get_kfp_healthz(self, mock_get_kfp_healthz): assert (response == mock_get_kfp_healthz.return_value) @patch( - 'kfp_server_api.HealthzServiceApi.get_healthz', + 'kfp_server_api.HealthzServiceApi.healthz_service_get_healthz', side_effect=kfp_server_api.ApiException) def test_get_kfp_healthz_should_raise_error(self, mock_get_kfp_healthz): with self.assertRaises(TimeoutError): diff --git a/sdk/python/requirements.in b/sdk/python/requirements.in index 955d33cd83b..504b8929834 100644 --- a/sdk/python/requirements.in +++ b/sdk/python/requirements.in @@ -11,12 +11,12 @@ google-auth>=1.6.1,<3 # https://github.com/googleapis/python-storage/blob/main/CHANGELOG.md#221-2022-03-15 google-cloud-storage>=2.2.1,<3 # pin kfp-pipeline-spec to an exact version, since this is the contract between a given KFP SDK version and the BE. we don't want old version of the SDK to write new fields and to have the BE reject the new unsupported field (even if the new field backward compatible from a proto perspective) -kfp-pipeline-spec==0.3.0 +kfp-pipeline-spec==0.4.0 # Update the upper version whenever a new major version of the # kfp-server-api package is released. # Update the lower version when kfp sdk depends on new apis/fields in # kfp-server-api. -kfp-server-api>=2.0.0,<2.1.0 +kfp-server-api>=2.1.0,<2.4.0 kubernetes>=8.0.0,<31 protobuf>=4.21.1,<5 PyYAML>=5.3,<7 diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt index ca52ccf1e2f..41f22813f1e 100644 --- a/sdk/python/requirements.txt +++ b/sdk/python/requirements.txt @@ -43,9 +43,9 @@ googleapis-common-protos==1.63.2 # via google-api-core idna==3.7 # via requests -kfp-pipeline-spec==0.3.0 +kfp-pipeline-spec==0.4.0 # via -r requirements.in -kfp-server-api==2.0.5 +kfp-server-api==2.3.0 # via -r requirements.in kubernetes==30.1.0 # via -r requirements.in