Skip to content

Commit e587ac1

Browse files
authored
Merge pull request #3370 from drmingdrmer/master
[common/meta/types] refactor: a AddResult returned by meta op should include an ID to indicate what has been changed
2 parents 88690ad + de3812d commit e587ac1

File tree

7 files changed

+64
-25
lines changed

7 files changed

+64
-25
lines changed

common/management/src/cluster/cluster_mgr.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ use std::time::UNIX_EPOCH;
2121
use common_exception::ErrorCode;
2222
use common_exception::Result;
2323
use common_meta_api::KVApi;
24-
use common_meta_types::AddResult;
2524
use common_meta_types::KVMeta;
2625
use common_meta_types::MatchSeq;
2726
use common_meta_types::NodeInfo;
27+
use common_meta_types::OkOrExist;
2828
use common_meta_types::Operation;
2929
use common_meta_types::SeqV;
3030
use common_meta_types::UpsertKVAction;
@@ -148,9 +148,9 @@ impl ClusterApi for ClusterMgr {
148148

149149
let res = upsert_node.await?.into_add_result()?;
150150

151-
match res {
152-
AddResult::Ok(v) => Ok(v.seq),
153-
AddResult::Exists(v) => Err(ErrorCode::ClusterNodeAlreadyExists(format!(
151+
match res.res {
152+
OkOrExist::Ok(v) => Ok(v.seq),
153+
OkOrExist::Exists(v) => Err(ErrorCode::ClusterNodeAlreadyExists(format!(
154154
"Cluster ID already exists, seq [{}]",
155155
v.seq
156156
))),

common/management/src/stage/stage_mgr.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ use std::sync::Arc;
1818
use common_exception::ErrorCode;
1919
use common_exception::Result;
2020
use common_meta_api::KVApi;
21-
use common_meta_types::AddResult;
2221
use common_meta_types::IntoSeqV;
2322
use common_meta_types::MatchSeq;
2423
use common_meta_types::MatchSeqExt;
24+
use common_meta_types::OkOrExist;
2525
use common_meta_types::Operation;
2626
use common_meta_types::SeqV;
2727
use common_meta_types::UpsertKVAction;
@@ -58,9 +58,9 @@ impl StageMgrApi for StageMgr {
5858

5959
let res = upsert_info.await?.into_add_result()?;
6060

61-
match res {
62-
AddResult::Ok(v) => Ok(v.seq),
63-
AddResult::Exists(v) => Err(ErrorCode::StageAlreadyExists(format!(
61+
match res.res {
62+
OkOrExist::Ok(v) => Ok(v.seq),
63+
OkOrExist::Exists(v) => Err(ErrorCode::StageAlreadyExists(format!(
6464
"Stage already exists, seq [{}]",
6565
v.seq
6666
))),

common/management/src/user/user_mgr.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ use common_exception::ErrorCode;
1919
use common_exception::Result;
2020
use common_exception::ToErrorCode;
2121
use common_meta_api::KVApi;
22-
use common_meta_types::AddResult;
2322
use common_meta_types::AuthType;
2423
use common_meta_types::GrantObject;
2524
use common_meta_types::IntoSeqV;
2625
use common_meta_types::MatchSeq;
2726
use common_meta_types::MatchSeqExt;
27+
use common_meta_types::OkOrExist;
2828
use common_meta_types::Operation;
2929
use common_meta_types::SeqV;
3030
use common_meta_types::UpsertKVAction;
@@ -100,9 +100,9 @@ impl UserMgrApi for UserMgr {
100100
None,
101101
));
102102
let res = upsert_kv.await?.into_add_result()?;
103-
match res {
104-
AddResult::Ok(v) => Ok(v.seq),
105-
AddResult::Exists(v) => Err(ErrorCode::UserAlreadyExists(format!(
103+
match res.res {
104+
OkOrExist::Ok(v) => Ok(v.seq),
105+
OkOrExist::Exists(v) => Err(ErrorCode::UserAlreadyExists(format!(
106106
"User already exists, seq [{}]",
107107
v.seq
108108
))),

common/meta/raft-store/src/state_machine/applied_state.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fmt::Debug;
16+
1517
use async_raft::AppDataResponse;
18+
use common_exception::ErrorCode;
19+
use common_meta_types::AddResult;
1620
use common_meta_types::Change;
1721
use common_meta_types::DatabaseMeta;
1822
use common_meta_types::Node;
@@ -50,6 +54,24 @@ pub enum AppliedState {
5054

5155
impl AppDataResponse for AppliedState {}
5256

57+
impl<T, ID> TryInto<AddResult<T, ID>> for AppliedState
58+
where
59+
ID: Clone + PartialEq + Debug,
60+
T: Clone + PartialEq + Debug,
61+
Change<T, ID>: TryFrom<AppliedState>,
62+
<Change<T, ID> as TryFrom<AppliedState>>::Error: Debug,
63+
{
64+
type Error = ErrorCode;
65+
66+
fn try_into(self) -> Result<AddResult<T, ID>, Self::Error> {
67+
let typ = std::any::type_name::<T>();
68+
69+
let ch = TryInto::<Change<T, ID>>::try_into(self).expect(typ);
70+
let add_res = ch.into_add_result()?;
71+
Ok(add_res)
72+
}
73+
}
74+
5375
pub enum PrevOrResult<'a> {
5476
Prev(&'a AppliedState),
5577
Result(&'a AppliedState),

common/meta/types/src/change.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@ use serde::Serialize;
1818

1919
use crate::SeqV;
2020

21-
pub enum AddResult<T> {
21+
pub enum OkOrExist<T> {
2222
Ok(SeqV<T>),
2323
Exists(SeqV<T>),
2424
}
2525

26+
pub struct AddResult<T, ID> {
27+
pub id: Option<ID>,
28+
pub res: OkOrExist<T>,
29+
}
30+
2631
/// `Change` describes a state change, including the states before and after a change.
2732
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, derive_more::From)]
2833
pub struct Change<T, ID = u64>
@@ -94,11 +99,15 @@ where
9499
self.prev != self.result
95100
}
96101

97-
pub fn into_add_result(self) -> Result<AddResult<T>, ErrorCode> {
102+
pub fn into_add_result(mut self) -> Result<AddResult<T, ID>, ErrorCode> {
103+
let id = self.ident.take();
98104
let (prev, result) = self.unpack();
99105
if let Some(p) = prev {
100106
return if result.is_some() {
101-
Ok(AddResult::Exists(p))
107+
Ok(AddResult {
108+
id,
109+
res: OkOrExist::Exists(p),
110+
})
102111
} else {
103112
Err(ErrorCode::UnknownException(format!(
104113
"invalid result for add: prev: {:?} result: None",
@@ -108,7 +117,10 @@ where
108117
}
109118

110119
if let Some(res) = result {
111-
Ok(AddResult::Ok(res))
120+
Ok(AddResult {
121+
id,
122+
res: OkOrExist::Ok(res),
123+
})
112124
} else {
113125
Err(ErrorCode::UnknownException(
114126
"invalid result for add: prev: None result: None".to_string(),

common/meta/types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ mod user_stage;
3737

3838
pub use change::AddResult;
3939
pub use change::Change;
40+
pub use change::OkOrExist;
4041
pub use cluster::Node;
4142
pub use cluster::NodeInfo;
4243
pub use cluster::Slot;

metasrv/src/executor/meta_handlers.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use common_exception::ErrorCode;
2020
use common_exception::ToErrorCode;
2121
use common_meta_flight::FlightReq;
2222
use common_meta_flight::GetTableExtReq;
23+
use common_meta_types::AddResult;
2324
use common_meta_types::Change;
2425
use common_meta_types::Cmd::CreateDatabase;
2526
use common_meta_types::Cmd::CreateTable;
@@ -41,6 +42,7 @@ use common_meta_types::GetTableReq;
4142
use common_meta_types::ListDatabaseReq;
4243
use common_meta_types::ListTableReq;
4344
use common_meta_types::LogEntry;
45+
use common_meta_types::OkOrExist;
4446
use common_meta_types::TableIdent;
4547
use common_meta_types::TableInfo;
4648
use common_meta_types::TableMeta;
@@ -187,18 +189,20 @@ impl RequestHandler<FlightReq<CreateTableReq>> for ActionHandler {
187189
.await
188190
.map_err(|e| ErrorCode::MetaNodeInternalError(e.to_string()))?;
189191

190-
let mut ch: Change<TableMeta, u64> = rst.try_into().expect("TableId");
191-
let table_id = ch.ident.take().unwrap();
192-
let (prev, _) = ch.unpack_data();
192+
let add_res: AddResult<TableMeta, u64> = rst.try_into()?;
193193

194-
if prev.is_some() && !if_not_exists {
195-
return Err(ErrorCode::TableAlreadyExists(format!(
196-
"table exists: {}",
197-
table_name
198-
)));
194+
if let OkOrExist::Exists(_) = add_res.res {
195+
if !if_not_exists {
196+
return Err(ErrorCode::TableAlreadyExists(format!(
197+
"table exists: {}",
198+
table_name
199+
)));
200+
}
199201
}
200202

201-
Ok(CreateTableReply { table_id })
203+
Ok(CreateTableReply {
204+
table_id: add_res.id.unwrap(),
205+
})
202206
}
203207
}
204208

0 commit comments

Comments
 (0)