Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update schedule search attributes #753

Merged
merged 6 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4047,6 +4047,7 @@ async def _to_proto(
temporalio.converter.encode_search_attributes(
untyped_not_in_typed, action.start_workflow.search_attributes
)
# TODO (dan): confirm whether this be `is not None`
if self.typed_search_attributes:
temporalio.converter.encode_search_attributes(
self.typed_search_attributes, action.start_workflow.search_attributes
Expand Down Expand Up @@ -4499,6 +4500,9 @@ class ScheduleUpdate:
schedule: Schedule
"""Schedule to update."""

search_attributes: Optional[temporalio.common.TypedSearchAttributes] = None
"""Search attributes to update."""


@dataclass
class ScheduleListDescription:
Expand Down Expand Up @@ -6520,14 +6524,20 @@ async def update_schedule(self, input: UpdateScheduleInput) -> None:
if not update:
return
assert isinstance(update, ScheduleUpdate)
request = temporalio.api.workflowservice.v1.UpdateScheduleRequest(
namespace=self._client.namespace,
schedule_id=input.id,
schedule=await update.schedule._to_proto(self._client),
identity=self._client.identity,
request_id=str(uuid.uuid4()),
)
if update.search_attributes is not None:
request.search_attributes.indexed_fields.clear() # Ensure that we at least create an empty map
Copy link
Contributor Author

@dandavison dandavison Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, if the user sets an empty but non-None value, then the request proto does not contain a search_attributes field at all. That's incorrect; what we want in that case is an empty map, which clears all SAs server-side. See docs in API: temporalio/api:/temporal/api/workflowservice/v1/request_response.proto

I investigated making this change (empty map in proto for empty but non-None SAs) in encode_search_attributes so that it is applied consistently to all code paths, but it causes the existing test test_schedule_workflow_search_attribute_update to break.

Copy link
Member

@cretz cretz Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach makes sense since, uniquely in Python, we pass the collection to write to for the helpers instead of just assign the variable due to how Python protos work. Though it is strange you can't just start writing to the map and it is created lazily.

temporalio.converter.encode_search_attributes(
update.search_attributes, request.search_attributes
)
await self._client.workflow_service.update_schedule(
temporalio.api.workflowservice.v1.UpdateScheduleRequest(
namespace=self._client.namespace,
schedule_id=input.id,
schedule=await update.schedule._to_proto(self._client),
identity=self._client.identity,
request_id=str(uuid.uuid4()),
),
request,
retry=True,
metadata=input.rpc_metadata,
timeout=input.rpc_timeout,
Expand Down
151 changes: 146 additions & 5 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,18 +1188,16 @@ async def test_schedule_create_limited_actions_validation(
assert "are remaining actions set" in str(err.value)


async def test_schedule_search_attribute_update(
async def test_schedule_workflow_search_attribute_update(
client: Client, env: WorkflowEnvironment
):
if env.supports_time_skipping:
pytest.skip("Java test server doesn't support schedules")
await assert_no_schedules(client)

# Put search attribute on server
text_attr_key = SearchAttributeKey.for_text(f"python-test-schedule-text")
untyped_keyword_key = SearchAttributeKey.for_keyword(
f"python-test-schedule-keyword"
)
text_attr_key = SearchAttributeKey.for_text("python-test-schedule-text")
untyped_keyword_key = SearchAttributeKey.for_keyword("python-test-schedule-keyword")
await ensure_search_attributes_present(client, text_attr_key, untyped_keyword_key)

# Create a schedule with search attributes on the schedule and on the
Expand Down Expand Up @@ -1273,6 +1271,7 @@ def update_schedule_typed_attrs(
# Check that it changed
desc = await handle.describe()
assert isinstance(desc.schedule.action, ScheduleActionStartWorkflow)
# Check that the workflow search attributes were changed
# This assertion has changed since server 1.24. Now, even untyped search
# attributes are given a type server side
assert (
Expand All @@ -1283,6 +1282,148 @@ def update_schedule_typed_attrs(
and desc.schedule.action.typed_search_attributes[untyped_keyword_key]
== "some-untyped-attr1"
)
# Check that the schedule search attributes were not changed
assert desc.search_attributes[text_attr_key.name] == ["some-schedule-attr1"]
assert desc.typed_search_attributes[text_attr_key] == "some-schedule-attr1"

await handle.delete()
await assert_no_schedules(client)


@pytest.mark.parametrize(
"test_case",
[
"none-is-noop",
"empty-but-non-none-clears",
"all-new-values-overwrites",
"partial-new-values-overwrites-and-drops",
],
)
async def test_schedule_search_attribute_update(
client: Client, env: WorkflowEnvironment, test_case: str
):
if env.supports_time_skipping:
pytest.skip("Java test server doesn't support schedules")
await assert_no_schedules(client)

# Put search attributes on server
key_1 = SearchAttributeKey.for_text("python-test-schedule-sa-update-key-1")
key_2 = SearchAttributeKey.for_keyword("python-test-schedule-sa-update-key-2")
await ensure_search_attributes_present(client, key_1, key_2)
val_1 = "val-1"
val_2 = "val-2"

# Create a schedule with search attributes
create_action = ScheduleActionStartWorkflow(
"some workflow",
[],
id=f"workflow-{uuid.uuid4()}",
task_queue=f"tq-{uuid.uuid4()}",
)
handle = await client.create_schedule(
f"schedule-{uuid.uuid4()}",
Schedule(action=create_action, spec=ScheduleSpec()),
search_attributes=TypedSearchAttributes(
[
SearchAttributePair(key_1, val_1),
SearchAttributePair(key_2, val_2),
]
),
)

def update_search_attributes(
input: ScheduleUpdateInput,
) -> Optional[ScheduleUpdate]:
# Make sure the initial search attributes are present
assert input.description.search_attributes[key_1.name] == [val_1]
assert input.description.search_attributes[key_2.name] == [val_2]
assert input.description.typed_search_attributes[key_1] == val_1
assert input.description.typed_search_attributes[key_2] == val_2

if test_case == "none-is-noop":
# Passing None makes no changes
return ScheduleUpdate(input.description.schedule, search_attributes=None)
elif test_case == "empty-but-non-none-clears":
# Pass empty but non-None to clear all attributes
return ScheduleUpdate(
input.description.schedule,
search_attributes=TypedSearchAttributes.empty,
)
elif test_case == "all-new-values-overwrites":
# Pass all new values to overwrite existing
return ScheduleUpdate(
input.description.schedule,
search_attributes=input.description.typed_search_attributes.updated(
SearchAttributePair(key_1, val_1 + "-new"),
SearchAttributePair(key_2, val_2 + "-new"),
),
)
elif test_case == "partial-new-values-overwrites-and-drops":
# Only update key_1, which should drop key_2
return ScheduleUpdate(
input.description.schedule,
search_attributes=TypedSearchAttributes(
[
SearchAttributePair(key_1, val_1 + "-new"),
]
),
)
else:
raise ValueError(f"Invalid test case: {test_case}")

await handle.update(update_search_attributes)

if test_case == "none-is-noop":

async def expectation() -> bool:
desc = await handle.describe()
return (
desc.search_attributes[key_1.name] == [val_1]
and desc.search_attributes[key_2.name] == [val_2]
and desc.typed_search_attributes[key_1] == val_1
and desc.typed_search_attributes[key_2] == val_2
)

await assert_eq_eventually(True, expectation)
elif test_case == "empty-but-non-none-clears":

async def expectation() -> bool:
desc = await handle.describe()
return (
len(desc.typed_search_attributes) == 0
and len(desc.search_attributes) == 0
)

await assert_eq_eventually(True, expectation)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the test case that fails without the change above relating to empty map in the request proto.

elif test_case == "all-new-values-overwrites":

async def expectation() -> bool:
desc = await handle.describe()
return (
desc.search_attributes[key_1.name] == [val_1 + "-new"]
and desc.search_attributes[key_2.name] == [val_2 + "-new"]
and desc.typed_search_attributes[key_1] == val_1 + "-new"
and desc.typed_search_attributes[key_2] == val_2 + "-new"
)

await assert_eq_eventually(True, expectation)
elif test_case == "partial-new-values-overwrites-and-drops":

async def expectation() -> bool:
desc = await handle.describe()
return (
desc.search_attributes[key_1.name] == [val_1 + "-new"]
and desc.typed_search_attributes[key_1] == val_1 + "-new"
and key_2.name not in desc.search_attributes
and key_2 not in desc.typed_search_attributes
)

await assert_eq_eventually(True, expectation)
else:
raise ValueError(f"Invalid test case: {test_case}")

await handle.delete()
await assert_no_schedules(client)


async def assert_no_schedules(client: Client) -> None:
Expand Down
Loading