1
1
//! The whole redis server implementation is here.
2
2
use std:: net:: SocketAddr ;
3
-
4
-
5
3
use std:: sync:: atomic:: AtomicU16 ;
6
4
use std:: sync:: Arc ;
7
5
use std:: thread:: JoinHandle ;
8
6
9
- use crc:: { Crc , CRC_16_XMODEM } ;
10
7
use derive_builder:: Builder ;
11
8
12
-
13
-
14
9
mod connection;
15
10
mod context;
16
11
pub mod frame;
17
12
pub ( crate ) mod handle;
18
13
19
-
20
-
21
-
22
14
mod cmd;
23
15
mod server_thread;
24
16
25
17
use self :: server_thread:: ServerMonoThreadedHandle ;
26
-
27
-
28
18
use crate :: application:: server:: handle:: ConnectionMsg ;
29
-
30
19
use crate :: domain:: dialer:: { RootDialer , Slot } ;
31
20
use crate :: domain:: storage:: Storage ;
21
+ use crate :: infrastructure:: hash:: HASH_SLOT_MAX ;
32
22
33
23
#[ derive( Debug , Builder , Clone ) ]
34
24
#[ builder( pattern = "owned" , setter( into, strip_option) ) ]
@@ -38,21 +28,6 @@ pub struct ServerConfig {
38
28
connections_limit : Arc < AtomicU16 > ,
39
29
}
40
30
41
- cfg_if:: cfg_if! {
42
- if #[ cfg( target_os = "linux" ) ] {
43
- type Driver = monoio:: IoUringDriver ;
44
- } else {
45
- type Driver = monoio:: LegacyDriver ;
46
- }
47
- }
48
-
49
- const CRC : Crc < u16 > = Crc :: < u16 > :: new ( & CRC_16_XMODEM ) ;
50
-
51
- pub const HASH_SLOT_MAX : u16 = 16384 ;
52
- pub const fn crc_hash ( bytes : & [ u8 ] ) -> u16 {
53
- CRC . checksum ( bytes) % HASH_SLOT_MAX
54
- }
55
-
56
31
impl ServerConfig {
57
32
/// Initialize a Server by starting the thread for each core and assigning
58
33
/// the storage segment & hash slots based on the configuration.
@@ -83,153 +58,6 @@ impl ServerConfig {
83
58
threads. push ( handle. initialize ( ) ) ;
84
59
}
85
60
86
- /*
87
- let mut slots = Vec::new();
88
- for cpu in 0..cpus {
89
- let part_size: u16 = HASH_SLOT_MAX / cpus as u16;
90
- let remainder: u16 = HASH_SLOT_MAX % cpus as u16;
91
-
92
- let start = cpu as u16 * part_size as u16;
93
- let end = if cpu == cpus - 1 {
94
- (cpu as u16 + 1) * part_size + remainder as u16
95
- } else {
96
- (cpu as u16 + 1) * part_size as u16
97
- };
98
-
99
- let slot = start..end;
100
- slots.push(slot);
101
- }
102
-
103
- for cpu in 0..cpus {
104
- let config = self.clone();
105
- let shard = mesh.join_with(cpu).unwrap();
106
-
107
- let slot = slots.get(cpu).unwrap().clone();
108
- let slots = slots.clone();
109
-
110
- let handle = std::thread::spawn(move || {
111
- monoio::utils::bind_to_cpu_set(Some(cpu)).unwrap();
112
-
113
- let mut rt = monoio::RuntimeBuilder::<Driver>::new()
114
- .with_entries(1024)
115
- .enable_timer()
116
- .build()
117
- .expect("Cannot build runtime");
118
-
119
- rt.block_on(async move {
120
- // Initialize domain
121
- let storage =
122
- domain::storage::StorageSegment::new(slots, slot);
123
- let shard = Rc::new(shard);
124
-
125
- let listener = TcpListener::bind_with_config(
126
- config.bind_addr,
127
- &ListenerConfig::new().backlog(16192),
128
- )
129
- .expect("Couldn't listen to addr");
130
-
131
- // Start the async task which is able to receive connection
132
- // from other thread
133
- let storage_inter_thread = storage.clone();
134
- let shard_inter_thread = shard.clone();
135
- monoio::spawn(async move {
136
- let storage = storage_inter_thread;
137
- let shard = shard_inter_thread;
138
-
139
- let mut receiver = shard.receiver().unwrap();
140
-
141
- loop {
142
- let shard = shard.clone();
143
- let ctx = Context::new(storage.clone());
144
-
145
- // Pre-allocate next buffer;
146
-
147
- if let Some(ConnectionMsg {
148
- fd,
149
- current_command,
150
- rest_frame,
151
- }) = receiver.next().await
152
- {
153
- let _spawned = monoio::spawn(async move {
154
- // TODO: We miss things in the buffer right
155
- // now &
156
- // pipelining
157
- // Already accepted tcp stream, we don't
158
- // need to
159
- // accept it again.
160
- let tcp = unsafe {
161
- std::net::TcpStream::from_raw_fd(fd)
162
- };
163
- let conn =
164
- TcpStream::from_std(tcp).unwrap();
165
- conn.set_nodelay(true).unwrap();
166
-
167
- let (connection, r) =
168
- WriteConnection::new(conn, 4 * 1024);
169
-
170
- let handler = Handler {
171
- connection,
172
- connection_r: r,
173
- shard,
174
- };
175
-
176
- if let Err(err) = handler
177
- .continue_run(ctx, current_command)
178
- .await
179
- {
180
- dbg!(err.backtrace());
181
- dbg!(&err);
182
- // error!(?err);
183
- panic!("blbl");
184
- }
185
- // handler.connection.stop().await.unwrap();
186
- });
187
- } else {
188
- break;
189
- }
190
- }
191
- });
192
-
193
- loop {
194
- let storage = storage.clone();
195
- let shard = shard.clone();
196
-
197
- // We accept the TCP Connection
198
- let (conn, _addr) = listener
199
- .accept()
200
- .await
201
- .expect("Unable to accept connections");
202
-
203
- conn.set_nodelay(true).unwrap();
204
- let ctx = Context::new(storage);
205
-
206
- // We map it to an `Handler` which is able to understand
207
- // the Redis protocol
208
- let _spawned = monoio::spawn(async move {
209
- let (connection, r) =
210
- WriteConnection::new(conn, 4 * 1024);
211
-
212
- let handler = Handler {
213
- connection,
214
- connection_r: r,
215
- shard: shard.clone(),
216
- };
217
-
218
- if let Err(err) = handler.run(ctx).await {
219
- dbg!(err.backtrace());
220
- dbg!(&err);
221
- // error!(?err);
222
- panic!("blbl");
223
- }
224
- // handler.connection.stop().await.unwrap();
225
- });
226
- }
227
- });
228
- });
229
- threads.push(handle);
230
- }
231
- */
232
-
233
61
ServerHandle { threads }
234
62
}
235
63
}
0 commit comments