@@ -730,10 +730,8 @@ mod tests {
730
730
731
731
use super :: * ;
732
732
use crate :: error:: PostgresExecutionSnafu ;
733
- const CREATE_TABLE : & str =
734
- "CREATE TABLE IF NOT EXISTS greptime_metakv(k bytea PRIMARY KEY, v bytea);" ;
735
733
736
- async fn create_postgres_client ( ) -> Result < Client > {
734
+ async fn create_postgres_client ( table_name : Option < & str > ) -> Result < Client > {
737
735
let endpoint = env:: var ( "GT_POSTGRES_ENDPOINTS" ) . unwrap_or_default ( ) ;
738
736
if endpoint. is_empty ( ) {
739
737
return UnexpectedSnafu {
@@ -747,27 +745,40 @@ mod tests {
747
745
tokio:: spawn ( async move {
748
746
connection. await . context ( PostgresExecutionSnafu ) . unwrap ( ) ;
749
747
} ) ;
750
- client. execute ( CREATE_TABLE , & [ ] ) . await . unwrap ( ) ;
748
+ if let Some ( table_name) = table_name {
749
+ let create_table_sql = format ! (
750
+ "CREATE TABLE IF NOT EXISTS {}(k bytea PRIMARY KEY, v bytea);" ,
751
+ table_name
752
+ ) ;
753
+ client. execute ( & create_table_sql, & [ ] ) . await . unwrap ( ) ;
754
+ }
751
755
Ok ( client)
752
756
}
753
757
758
+ async fn drop_table ( client : & Client , table_name : & str ) {
759
+ let sql = format ! ( "DROP TABLE IF EXISTS {};" , table_name) ;
760
+ client. execute ( & sql, & [ ] ) . await . unwrap ( ) ;
761
+ }
762
+
754
763
#[ tokio:: test]
755
764
async fn test_postgres_crud ( ) {
756
- let client = create_postgres_client ( ) . await . unwrap ( ) ;
757
-
758
765
let key = "test_key" . to_string ( ) ;
759
766
let value = "test_value" . to_string ( ) ;
760
767
768
+ let uuid = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
769
+ let table_name = "test_postgres_crud_greptime_metakv" ;
770
+ let client = create_postgres_client ( Some ( table_name) ) . await . unwrap ( ) ;
771
+
761
772
let ( tx, _) = broadcast:: channel ( 100 ) ;
762
773
let pg_election = PgElection {
763
774
leader_value : "test_leader" . to_string ( ) ,
764
775
client,
765
776
is_leader : AtomicBool :: new ( false ) ,
766
777
leader_infancy : AtomicBool :: new ( true ) ,
767
778
leader_watcher : tx,
768
- store_key_prefix : uuid:: Uuid :: new_v4 ( ) . to_string ( ) ,
779
+ store_key_prefix : uuid,
769
780
candidate_lease_ttl_secs : 10 ,
770
- sql_set : ElectionSqlFactory :: new ( 28319 , "greptime_metakv" ) . build ( ) ,
781
+ sql_set : ElectionSqlFactory :: new ( 28319 , table_name ) . build ( ) ,
771
782
} ;
772
783
773
784
let res = pg_election
@@ -823,14 +834,17 @@ mod tests {
823
834
. unwrap ( ) ;
824
835
assert ! ( res. is_empty( ) ) ;
825
836
assert ! ( current == Timestamp :: default ( ) ) ;
837
+
838
+ drop_table ( & pg_election. client , table_name) . await ;
826
839
}
827
840
828
841
async fn candidate (
829
842
leader_value : String ,
830
843
candidate_lease_ttl_secs : u64 ,
831
844
store_key_prefix : String ,
845
+ table_name : String ,
832
846
) {
833
- let client = create_postgres_client ( ) . await . unwrap ( ) ;
847
+ let client = create_postgres_client ( None ) . await . unwrap ( ) ;
834
848
835
849
let ( tx, _) = broadcast:: channel ( 100 ) ;
836
850
let pg_election = PgElection {
@@ -841,7 +855,7 @@ mod tests {
841
855
leader_watcher : tx,
842
856
store_key_prefix,
843
857
candidate_lease_ttl_secs,
844
- sql_set : ElectionSqlFactory :: new ( 28319 , "greptime_metakv" ) . build ( ) ,
858
+ sql_set : ElectionSqlFactory :: new ( 28319 , & table_name ) . build ( ) ,
845
859
} ;
846
860
847
861
let node_info = MetasrvNodeInfo {
@@ -857,22 +871,24 @@ mod tests {
857
871
async fn test_candidate_registration ( ) {
858
872
let leader_value_prefix = "test_leader" . to_string ( ) ;
859
873
let candidate_lease_ttl_secs = 5 ;
860
- let store_key_prefix = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
874
+ let uuid = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
875
+ let table_name = "test_candidate_registration_greptime_metakv" ;
861
876
let mut handles = vec ! [ ] ;
877
+ let client = create_postgres_client ( Some ( table_name) ) . await . unwrap ( ) ;
878
+
862
879
for i in 0 ..10 {
863
880
let leader_value = format ! ( "{}{}" , leader_value_prefix, i) ;
864
881
let handle = tokio:: spawn ( candidate (
865
882
leader_value,
866
883
candidate_lease_ttl_secs,
867
- store_key_prefix. clone ( ) ,
884
+ uuid. clone ( ) ,
885
+ table_name. to_string ( ) ,
868
886
) ) ;
869
887
handles. push ( handle) ;
870
888
}
871
889
// Wait for candidates to registrate themselves and renew their leases at least once.
872
890
tokio:: time:: sleep ( Duration :: from_secs ( 3 ) ) . await ;
873
891
874
- let client = create_postgres_client ( ) . await . unwrap ( ) ;
875
-
876
892
let ( tx, _) = broadcast:: channel ( 100 ) ;
877
893
let leader_value = "test_leader" . to_string ( ) ;
878
894
let pg_election = PgElection {
@@ -881,9 +897,9 @@ mod tests {
881
897
is_leader : AtomicBool :: new ( false ) ,
882
898
leader_infancy : AtomicBool :: new ( true ) ,
883
899
leader_watcher : tx,
884
- store_key_prefix : store_key_prefix . clone ( ) ,
900
+ store_key_prefix : uuid . clone ( ) ,
885
901
candidate_lease_ttl_secs,
886
- sql_set : ElectionSqlFactory :: new ( 28319 , "greptime_metakv" ) . build ( ) ,
902
+ sql_set : ElectionSqlFactory :: new ( 28319 , table_name ) . build ( ) ,
887
903
} ;
888
904
889
905
let candidates = pg_election. all_candidates ( ) . await . unwrap ( ) ;
@@ -900,20 +916,21 @@ mod tests {
900
916
901
917
// Garbage collection
902
918
for i in 0 ..10 {
903
- let key = format ! (
904
- "{}{}{}{}" ,
905
- store_key_prefix, CANDIDATES_ROOT , leader_value_prefix, i
906
- ) ;
919
+ let key = format ! ( "{}{}{}{}" , uuid, CANDIDATES_ROOT , leader_value_prefix, i) ;
907
920
let res = pg_election. delete_value ( & key) . await . unwrap ( ) ;
908
921
assert ! ( res) ;
909
922
}
923
+
924
+ drop_table ( & pg_election. client , table_name) . await ;
910
925
}
911
926
912
927
#[ tokio:: test]
913
928
async fn test_elected_and_step_down ( ) {
914
929
let leader_value = "test_leader" . to_string ( ) ;
915
930
let candidate_lease_ttl_secs = 5 ;
916
- let client = create_postgres_client ( ) . await . unwrap ( ) ;
931
+ let uuid = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
932
+ let table_name = "test_elected_and_step_down_greptime_metakv" ;
933
+ let client = create_postgres_client ( Some ( table_name) ) . await . unwrap ( ) ;
917
934
918
935
let ( tx, mut rx) = broadcast:: channel ( 100 ) ;
919
936
let leader_pg_election = PgElection {
@@ -922,9 +939,9 @@ mod tests {
922
939
is_leader : AtomicBool :: new ( false ) ,
923
940
leader_infancy : AtomicBool :: new ( true ) ,
924
941
leader_watcher : tx,
925
- store_key_prefix : uuid:: Uuid :: new_v4 ( ) . to_string ( ) ,
942
+ store_key_prefix : uuid,
926
943
candidate_lease_ttl_secs,
927
- sql_set : ElectionSqlFactory :: new ( 28320 , "greptime_metakv" ) . build ( ) ,
944
+ sql_set : ElectionSqlFactory :: new ( 28320 , table_name ) . build ( ) ,
928
945
} ;
929
946
930
947
leader_pg_election. elected ( ) . await . unwrap ( ) ;
@@ -1015,14 +1032,17 @@ mod tests {
1015
1032
}
1016
1033
_ => panic ! ( "Expected LeaderChangeMessage::StepDown" ) ,
1017
1034
}
1035
+
1036
+ drop_table ( & leader_pg_election. client , table_name) . await ;
1018
1037
}
1019
1038
1020
1039
#[ tokio:: test]
1021
1040
async fn test_leader_action ( ) {
1022
1041
let leader_value = "test_leader" . to_string ( ) ;
1023
- let store_key_prefix = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
1042
+ let uuid = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
1043
+ let table_name = "test_leader_action_greptime_metakv" ;
1024
1044
let candidate_lease_ttl_secs = 5 ;
1025
- let client = create_postgres_client ( ) . await . unwrap ( ) ;
1045
+ let client = create_postgres_client ( Some ( table_name ) ) . await . unwrap ( ) ;
1026
1046
1027
1047
let ( tx, mut rx) = broadcast:: channel ( 100 ) ;
1028
1048
let leader_pg_election = PgElection {
@@ -1031,9 +1051,9 @@ mod tests {
1031
1051
is_leader : AtomicBool :: new ( false ) ,
1032
1052
leader_infancy : AtomicBool :: new ( true ) ,
1033
1053
leader_watcher : tx,
1034
- store_key_prefix,
1054
+ store_key_prefix : uuid ,
1035
1055
candidate_lease_ttl_secs,
1036
- sql_set : ElectionSqlFactory :: new ( 28321 , "greptime_metakv" ) . build ( ) ,
1056
+ sql_set : ElectionSqlFactory :: new ( 28321 , table_name ) . build ( ) ,
1037
1057
} ;
1038
1058
1039
1059
// Step 1: No leader exists, campaign and elected.
@@ -1246,38 +1266,41 @@ mod tests {
1246
1266
. query ( & leader_pg_election. sql_set . step_down , & [ ] )
1247
1267
. await
1248
1268
. unwrap ( ) ;
1269
+
1270
+ drop_table ( & leader_pg_election. client , table_name) . await ;
1249
1271
}
1250
1272
1251
1273
#[ tokio:: test]
1252
1274
async fn test_follower_action ( ) {
1253
1275
common_telemetry:: init_default_ut_logging ( ) ;
1254
1276
let candidate_lease_ttl_secs = 5 ;
1255
- let store_key_prefix = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
1277
+ let uuid = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
1278
+ let table_name = "test_follower_action_greptime_metakv" ;
1256
1279
1257
- let follower_client = create_postgres_client ( ) . await . unwrap ( ) ;
1280
+ let follower_client = create_postgres_client ( Some ( table_name ) ) . await . unwrap ( ) ;
1258
1281
let ( tx, mut rx) = broadcast:: channel ( 100 ) ;
1259
1282
let follower_pg_election = PgElection {
1260
1283
leader_value : "test_follower" . to_string ( ) ,
1261
1284
client : follower_client,
1262
1285
is_leader : AtomicBool :: new ( false ) ,
1263
1286
leader_infancy : AtomicBool :: new ( true ) ,
1264
1287
leader_watcher : tx,
1265
- store_key_prefix : store_key_prefix . clone ( ) ,
1288
+ store_key_prefix : uuid . clone ( ) ,
1266
1289
candidate_lease_ttl_secs,
1267
- sql_set : ElectionSqlFactory :: new ( 28322 , "greptime_metakv" ) . build ( ) ,
1290
+ sql_set : ElectionSqlFactory :: new ( 28322 , table_name ) . build ( ) ,
1268
1291
} ;
1269
1292
1270
- let leader_client = create_postgres_client ( ) . await . unwrap ( ) ;
1293
+ let leader_client = create_postgres_client ( Some ( table_name ) ) . await . unwrap ( ) ;
1271
1294
let ( tx, _) = broadcast:: channel ( 100 ) ;
1272
1295
let leader_pg_election = PgElection {
1273
1296
leader_value : "test_leader" . to_string ( ) ,
1274
1297
client : leader_client,
1275
1298
is_leader : AtomicBool :: new ( false ) ,
1276
1299
leader_infancy : AtomicBool :: new ( true ) ,
1277
1300
leader_watcher : tx,
1278
- store_key_prefix,
1301
+ store_key_prefix : uuid ,
1279
1302
candidate_lease_ttl_secs,
1280
- sql_set : ElectionSqlFactory :: new ( 28322 , "greptime_metakv" ) . build ( ) ,
1303
+ sql_set : ElectionSqlFactory :: new ( 28322 , table_name ) . build ( ) ,
1281
1304
} ;
1282
1305
1283
1306
leader_pg_election
@@ -1326,5 +1349,7 @@ mod tests {
1326
1349
. query ( & leader_pg_election. sql_set . step_down , & [ ] )
1327
1350
. await
1328
1351
. unwrap ( ) ;
1352
+
1353
+ drop_table ( & follower_pg_election. client , table_name) . await ;
1329
1354
}
1330
1355
}
0 commit comments