5
5
import dev .keva .server .command .setup .CommandServiceImpl ;
6
6
import dev .keva .server .config .ConfigHolder ;
7
7
import dev .keva .server .replication .master .ReplicationService ;
8
- import dev .keva .server .replication .master .ReplicationServiceImpl ;
9
8
import dev .keva .server .replication .slave .SlaveService ;
10
- import dev .keva .server .replication .slave .SlaveServiceImpl ;
11
9
import dev .keva .server .storage .NoHeapStorageServiceImpl ;
12
10
import dev .keva .server .storage .StorageService ;
13
11
import dev .keva .store .NoHeapConfig ;
14
12
import dev .keva .store .NoHeapFactory ;
15
13
import dev .keva .store .NoHeapStore ;
16
14
import io .netty .bootstrap .ServerBootstrap ;
15
+ import io .netty .buffer .PooledByteBufAllocator ;
17
16
import io .netty .channel .Channel ;
18
17
import io .netty .channel .ChannelFuture ;
18
+ import io .netty .channel .ChannelOption ;
19
19
import io .netty .channel .EventLoopGroup ;
20
20
import io .netty .channel .nio .NioEventLoopGroup ;
21
21
import io .netty .channel .socket .nio .NioServerSocketChannel ;
22
- import io .netty .handler .logging .LogLevel ;
23
- import io .netty .handler .logging .LoggingHandler ;
24
22
import lombok .Getter ;
25
23
import lombok .extern .slf4j .Slf4j ;
26
24
import lombok .val ;
31
29
32
30
@ Slf4j
33
31
public class NettyServer implements Server {
32
+ private static final int BUFFER_SIZE = 1024 * 1024 ;
33
+
34
34
private final ConfigHolder config ;
35
35
36
36
// Executors
@@ -46,34 +46,40 @@ public class NettyServer implements Server {
46
46
private CommandService commandService ;
47
47
@ Getter // use for testing should change to dedicated command or extract from INFO
48
48
private ReplicationService replicationService ;
49
- private WriteLog writeLog ;
49
+ // private WriteLog writeLog;
50
50
private Channel channel ;
51
- private ServerBootstrap server ;
52
51
private NoHeapStore noHeapStore ;
53
52
54
53
public NettyServer (ConfigHolder config ) {
55
54
this .config = config ;
56
55
}
57
56
58
57
private void initServices (boolean isFreshStart ) {
59
- if (isFreshStart ) {
60
- writeLog = new WriteLog (config .getWriteLogSize ());
61
- }
58
+ // TODO: re-enable rep mode
59
+ // if (isFreshStart) {
60
+ // writeLog = new WriteLog(config.getWriteLogSize());
61
+ // }
62
62
initStorageService (isFreshStart );
63
- replicationService = new ReplicationServiceImpl (healthCheckerPool , repWorkerPool , storageService , writeLog );
63
+ // replicationService = new ReplicationServiceImpl(healthCheckerPool, repWorkerPool, storageService, writeLog);
64
64
65
65
connectionService = new ConnectionServiceImpl ();
66
66
final CommandRegistrar commandRegistrar = new CommandRegistrar (storageService , replicationService , connectionService );
67
67
commandService = new CommandServiceImpl (commandRegistrar .getHandlerMap (), replicationService );
68
- slaveService = new SlaveServiceImpl (healthCheckerPool , writeLog , commandService );
68
+ // slaveService = new SlaveServiceImpl(healthCheckerPool, writeLog, commandService);
69
69
}
70
70
71
71
public ServerBootstrap bootstrapServer () {
72
72
final ServerBootstrap b = new ServerBootstrap ();
73
73
b .group (bossGroup , workerGroup )
74
74
.channel (NioServerSocketChannel .class )
75
- .handler (new LoggingHandler (LogLevel .TRACE ))
76
- .childHandler (new RedisCodecInitializer (new ServerHandler (connectionService , commandService )));
75
+ .childHandler (new RedisCodecInitializer (new ServerHandler (connectionService , commandService )))
76
+ .option (ChannelOption .SO_BACKLOG , 100 )
77
+ .option (ChannelOption .ALLOCATOR , PooledByteBufAllocator .DEFAULT )
78
+ .childOption (ChannelOption .SO_RCVBUF , BUFFER_SIZE )
79
+ .childOption (ChannelOption .SO_SNDBUF , BUFFER_SIZE )
80
+ .childOption (ChannelOption .SO_KEEPALIVE , true )
81
+ .childOption (ChannelOption .ALLOCATOR , PooledByteBufAllocator .DEFAULT )
82
+ .childOption (ChannelOption .TCP_NODELAY , true );
77
83
return b ;
78
84
}
79
85
@@ -110,8 +116,11 @@ public void shutdown() {
110
116
healthCheckerPool .shutdown ();
111
117
bossGroup .shutdownGracefully ();
112
118
workerGroup .shutdownGracefully ();
113
- channel .close ();
114
- log .info ("Database server at {} stopped" , config .getPort ());
119
+
120
+ if (channel != null ) {
121
+ channel .close ();
122
+ }
123
+ log .info ("Keva server at {} stopped" , config .getPort ());
115
124
}
116
125
117
126
@ Override
@@ -120,12 +129,15 @@ public void run(boolean isFreshStart) {
120
129
initExecutors ();
121
130
initServices (isFreshStart );
122
131
startSlaveService ();
123
- server = bootstrapServer ();
132
+ ServerBootstrap server = bootstrapServer ();
124
133
final ChannelFuture sync = server .bind (config .getPort ()).sync ();
125
- log .info ("Database server started at {}" , config .getPort ());
134
+ sync .syncUninterruptibly ();
135
+ log .info ("Keva server started at {}" , config .getPort ());
126
136
127
- channel = sync .channel ();
128
- channel .closeFuture ().sync ();
137
+ if (channel != null ) {
138
+ channel = sync .channel ();
139
+ channel .closeFuture ().sync ();
140
+ }
129
141
} catch (InterruptedException e ) {
130
142
log .error ("Failed to start server: " , e );
131
143
Thread .currentThread ().interrupt ();
0 commit comments