Skip to content

Commit

Permalink
Implement ImportWorkflowExecution admin API (temporalio#4827)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
* Implement ImportWorkflowExecution admin API
  * Allow batch import workflow execution events creating new workflow
  * Allow batch import workflow execution events updating existing workflow
  * Allow import workflow execution events without breaking XDC version semantics

<!-- Tell your future self why have you made these changes -->
**Why?**
New functionality

<!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? -->
**How did you test it?**
Integration tests

<!-- Assuming the worst case, what can be broken when deploying this change to production? -->
**Potential risks**
N/A

<!-- Is this PR a hotfix candidate or require that a notification be sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
N/A
  • Loading branch information
wxing1292 authored Sep 7, 2023
1 parent c9d9a93 commit 0d343ce
Show file tree
Hide file tree
Showing 37 changed files with 3,986 additions and 981 deletions.
6 changes: 5 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ linters-settings:
disabled: true
- name: var-naming
disabled: true
- name: empty-block
disabled: true
- name: flag-parameter
disabled: true

# Rule tuning
- name: cognitive-complexity
Expand All @@ -88,7 +92,7 @@ linters-settings:
- 18
- name: function-result-limit
arguments:
- 4
- 5
- name: unhandled-error
arguments:
- "fmt.*"
Expand Down
1,460 changes: 1,084 additions & 376 deletions api/adminservice/v1/request_response.pb.go

Large diffs are not rendered by default.

157 changes: 99 additions & 58 deletions api/adminservice/v1/service.pb.go

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions api/adminservicemock/v1/service.pb.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1,455 changes: 1,082 additions & 373 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

223 changes: 133 additions & 90 deletions api/historyservice/v1/service.pb.go

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions api/historyservicemock/v1/service.pb.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions client/admin/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions client/admin/metric_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions client/admin/retryable_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions client/history/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions client/history/metric_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions client/history/retryable_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions common/persistence/versionhistory/version_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,16 @@ func GetVersionHistoryEventVersion(v *historyspb.VersionHistory, eventID int64)
func IsEmptyVersionHistory(v *historyspb.VersionHistory) bool {
return len(v.Items) == 0
}

// CompareVersionHistory compares 2 version history items
func CompareVersionHistory(v1 *historyspb.VersionHistory, v2 *historyspb.VersionHistory) (int, error) {
lastItem1, err := GetLastVersionHistoryItem(v1)
if err != nil {
return 0, err
}
lastItem2, err := GetLastVersionHistoryItem(v2)
if err != nil {
return 0, err
}
return CompareVersionHistoryItem(lastItem1, lastItem2), nil
}
32 changes: 32 additions & 0 deletions common/persistence/versionhistory/version_history_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,35 @@ func CopyVersionHistoryItem(item *historyspb.VersionHistoryItem) *historyspb.Ver
func IsEqualVersionHistoryItem(item1 *historyspb.VersionHistoryItem, item2 *historyspb.VersionHistoryItem) bool {
return item1.EventId == item2.EventId && item1.Version == item2.Version
}

// IsEqualVersionHistoryItems checks whether version history items are equal
func IsEqualVersionHistoryItems(items1 []*historyspb.VersionHistoryItem, items2 []*historyspb.VersionHistoryItem) bool {
if len(items1) != len(items2) {
return false
}
for i := 0; i < len(items1); i++ {
if !IsEqualVersionHistoryItem(items1[i], items2[i]) {
return false
}
}
return true
}

// CompareVersionHistoryItem compares 2 version history items
func CompareVersionHistoryItem(item1 *historyspb.VersionHistoryItem, item2 *historyspb.VersionHistoryItem) int {
if item1.Version < item2.Version {
return -1
}
if item1.Version > item2.Version {
return 1
}

// item1.Version == item2.Version
if item1.EventId < item2.EventId {
return -1
}
if item1.EventId > item2.EventId {
return 1
}
return 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ message RebuildMutableStateRequest {
message RebuildMutableStateResponse {
}

message ImportWorkflowExecutionRequest {
string namespace = 1;
temporal.api.common.v1.WorkflowExecution execution = 2;
repeated temporal.api.common.v1.DataBlob history_batches = 3;
temporal.server.api.history.v1.VersionHistory version_history = 4;
bytes token = 5;
}

message ImportWorkflowExecutionResponse {
bytes token = 1;
}

message DescribeMutableStateRequest {
string namespace = 1;
temporal.api.common.v1.WorkflowExecution execution = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ service AdminService {
rpc RebuildMutableState (RebuildMutableStateRequest) returns (RebuildMutableStateResponse) {
}

// ImportWorkflowExecution attempts to import workflow according to persisted history events.
// NOTE: this is experimental API
rpc ImportWorkflowExecution (ImportWorkflowExecutionRequest) returns (ImportWorkflowExecutionResponse) {
}

// DescribeWorkflowExecution returns information about the internal states of workflow execution.
rpc DescribeMutableState (DescribeMutableStateRequest) returns (DescribeMutableStateResponse) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,18 @@ message RebuildMutableStateRequest {
message RebuildMutableStateResponse {
}

message ImportWorkflowExecutionRequest {
string namespace_id = 1;
temporal.api.common.v1.WorkflowExecution execution = 2;
repeated temporal.api.common.v1.DataBlob history_batches = 3;
temporal.server.api.history.v1.VersionHistory version_history = 4;
bytes token = 5;
}

message ImportWorkflowExecutionResponse {
bytes token = 1;
}

message DeleteWorkflowVisibilityRecordRequest {
string namespace_id = 1;
temporal.api.common.v1.WorkflowExecution execution = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,15 @@ service HistoryService {
}

// RebuildMutableState attempts to rebuild mutable state according to persisted history events.
// NOTE: this is experimental API
rpc RebuildMutableState (RebuildMutableStateRequest) returns (RebuildMutableStateResponse) {
}

// ImportWorkflowExecution attempts to import workflow according to persisted history events.
// NOTE: this is experimental API
rpc ImportWorkflowExecution (ImportWorkflowExecutionRequest) returns (ImportWorkflowExecutionResponse) {
}

// DeleteWorkflowVisibilityRecord force delete a workflow's visibility record.
// This is used by admin delete workflow execution API to delete visibility record as frontend
// visibility manager doesn't support write operations
Expand Down
Loading

0 comments on commit 0d343ce

Please sign in to comment.