Skip to content

Commit a3da767

Browse files
committed
generic client
1 parent ea91875 commit a3da767

File tree

9 files changed

+328
-851
lines changed

9 files changed

+328
-851
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ This is free and unencumbered software released into the
2424

2525
```go
2626
// Redis is a thread-safe connection establishment.
27-
var Redis = redis.NewClient("rds1.example.com", time.Second/2, 0)
27+
var Redis = redis.NewClient[string,string]("rds1.example.com", time.Second/2, 0)
2828

2929
// Grow adds a string to a list.
3030
func Grow() {
31-
newLen, err := Redis.RPUSHString("demo_list", "hello")
31+
newLen, err := Redis.RPUSH("demo_list", "hello")
3232
if err != nil {
3333
log.Print("demo_list update error: ", err)
3434
return
@@ -38,7 +38,7 @@ func Grow() {
3838

3939
// Ping pushes a message to a publish–subscribe channel.
4040
func Ping() {
41-
clientCount, err := Redis.PUBLISHString("demo_channel", "ping")
41+
clientCount, err := Redis.PUBLISH("demo_channel", "ping")
4242
if err != nil {
4343
log.Print("demo_channel publish error: ", err)
4444
return

client.go

Lines changed: 26 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const (
2525
var errConnLost = errors.New("redis: connection lost while awaiting response")
2626

2727
// ClientConfig defines a Client setup.
28-
type ClientConfig struct {
28+
type ClientConfig[Key, Value String] struct {
2929
// The host defaults to localhost, and the port defaults to 6379.
3030
// Thus, the empty string defaults to "localhost:6379". Use an
3131
// absolute file path (e.g. "/var/run/redis.sock") for Unix
@@ -53,7 +53,7 @@ type ClientConfig struct {
5353
}
5454

5555
// NewClient launches a managed connection to a node (address).
56-
func (c *ClientConfig) NewClient() *Client {
56+
func (c *ClientConfig[Key, Value]) NewClient() *Client[Key, Value] {
5757
return newClient(*c)
5858
}
5959

@@ -62,11 +62,11 @@ func (c *ClientConfig) NewClient() *Client {
6262
//
6363
// Multiple goroutines may invoke methods on a Client simultaneously. Command
6464
// invocation applies <https://redis.io/topics/pipelining> on concurrency.
65-
type Client struct {
65+
type Client[Key, Value String] struct {
6666
// Normalized node address in use. This field is read-only.
6767
Addr string
6868

69-
config ClientConfig
69+
config ClientConfig[Key, Value]
7070

7171
noCopy noCopy
7272

@@ -100,15 +100,15 @@ type Client struct {
100100
// submission blocks on the first attempt. When connection establishment fails,
101101
// then command submission receives the error of the last attempt, until the
102102
// connection restores.
103-
func NewClient(addr string, commandTimeout, dialTimeout time.Duration) *Client {
104-
return newClient(ClientConfig{
103+
func NewClient[Key, Value String](addr string, commandTimeout, dialTimeout time.Duration) *Client[Key, Value] {
104+
return newClient(ClientConfig[Key, Value]{
105105
Addr: addr,
106106
CommandTimeout: commandTimeout,
107107
DialTimeout: dialTimeout,
108108
})
109109
}
110110

111-
func newClient(config ClientConfig) *Client {
111+
func newClient[Key, Value String](config ClientConfig[Key, Value]) *Client[Key, Value] {
112112
config.Addr = normalizeAddr(config.Addr)
113113
if config.DialTimeout == 0 {
114114
config.DialTimeout = time.Second
@@ -119,7 +119,7 @@ func newClient(config ClientConfig) *Client {
119119
queueSize = queueSizeUnix
120120
}
121121

122-
c := &Client{
122+
c := &Client[Key, Value]{
123123
Addr: config.Addr, // decouple
124124
config: config,
125125

@@ -145,7 +145,7 @@ type redisConn struct {
145145
// Command submission is stopped with ErrClosed.
146146
// All pending commands are dealt with on return.
147147
// Calling Close more than once has no effect.
148-
func (c *Client) Close() error {
148+
func (c *Client[Key, Value]) Close() error {
149149
conn := <-c.connSem // lock write
150150
if conn.offline == ErrClosed {
151151
// redundant invocation
@@ -170,7 +170,7 @@ func (c *Client) Close() error {
170170
}
171171

172172
// connectOrClosed populates the connection semaphore.
173-
func (c *Client) connectOrClosed() {
173+
func (c *Client[Key, Value]) connectOrClosed() {
174174
var retryDelay time.Duration
175175
for {
176176
conn, reader, err := c.config.connect(conservativeMSS)
@@ -213,7 +213,7 @@ func (c *Client) connectOrClosed() {
213213
}
214214
}
215215

216-
func (c *Client) cancelQueue() {
216+
func (c *Client[Key, Value]) cancelQueue() {
217217
for {
218218
select {
219219
case ch := <-c.readQueue:
@@ -227,7 +227,7 @@ func (c *Client) cancelQueue() {
227227

228228
// Exchange sends a request, and then it awaits its turn (in the pipeline) for
229229
// response receiption.
230-
func (c *Client) exchange(req *request) (*bufio.Reader, error) {
230+
func (c *Client[Key, Value]) exchange(req *request) (*bufio.Reader, error) {
231231
conn := <-c.connSem // lock write
232232

233233
// validate connection state
@@ -290,7 +290,7 @@ func (c *Client) exchange(req *request) (*bufio.Reader, error) {
290290
return reader, nil
291291
}
292292

293-
func (c *Client) commandOK(req *request) error {
293+
func (c *Client[Key, Value]) commandOK(req *request) error {
294294
r, err := c.exchange(req)
295295
if err != nil {
296296
return err
@@ -300,7 +300,7 @@ func (c *Client) commandOK(req *request) error {
300300
return err
301301
}
302302

303-
func (c *Client) commandOKOrReconnect(req *request) error {
303+
func (c *Client[Key, Value]) commandOKOrReconnect(req *request) error {
304304
r, err := c.exchange(req)
305305
if err != nil {
306306
return err
@@ -314,7 +314,7 @@ func (c *Client) commandOKOrReconnect(req *request) error {
314314
return err
315315
}
316316

317-
func (c *Client) commandInteger(req *request) (int64, error) {
317+
func (c *Client[Key, Value]) commandInteger(req *request) (int64, error) {
318318
r, err := c.exchange(req)
319319
if err != nil {
320320
return 0, err
@@ -324,62 +324,36 @@ func (c *Client) commandInteger(req *request) (int64, error) {
324324
return integer, err
325325
}
326326

327-
func (c *Client) commandBulkBytes(req *request) ([]byte, error) {
327+
func (c *Client[Key, Value]) commandBulk(req *request) (bulk Value, _ error) {
328328
r, err := c.exchange(req)
329329
if err != nil {
330-
return nil, err
331-
}
332-
bytes, err := readBulkBytes(r)
333-
c.passRead(r, err)
334-
if err == errNull {
335-
return nil, nil
336-
}
337-
return bytes, err
338-
}
339-
340-
func (c *Client) commandBulkString(req *request) (string, bool, error) {
341-
r, err := c.exchange(req)
342-
if err != nil {
343-
return "", false, err
330+
return bulk, err
344331
}
345-
s, err := readBulkString(r)
332+
bulk, err = readBulk[Value](r)
346333
c.passRead(r, err)
347334
if err == errNull {
348-
return "", false, nil
335+
err = nil
349336
}
350-
return s, true, err
351-
}
352-
353-
func (c *Client) commandBytesArray(req *request) ([][]byte, error) {
354-
r, err := c.exchange(req)
355-
if err != nil {
356-
return nil, err
357-
}
358-
array, err := readBytesArray(r)
359-
c.passRead(r, err)
360-
if err == errNull {
361-
return nil, nil
362-
}
363-
return array, err
337+
return bulk, err
364338
}
365339

366-
func (c *Client) commandStringArray(req *request) ([]string, error) {
340+
func (c *Client[Key, Value]) commandArray(req *request) ([]Value, error) {
367341
r, err := c.exchange(req)
368342
if err != nil {
369343
return nil, err
370344
}
371-
array, err := readStringArray(r)
345+
array, err := readArray[Value](r)
372346
c.passRead(r, err)
373347
if err == errNull {
374-
return nil, nil
348+
err = nil
375349
}
376350
return array, err
377351
}
378352

379353
// PassRead hands over the buffered reader to the following command in line. It
380354
// goes in idle mode (on the redisConn from connSem) when all requests are done
381355
// for.
382-
func (c *Client) passRead(r *bufio.Reader, err error) {
356+
func (c *Client[Key, Value]) passRead(r *bufio.Reader, err error) {
383357
switch err {
384358
case nil, errNull:
385359
break
@@ -426,7 +400,7 @@ func (c *Client) passRead(r *bufio.Reader, err error) {
426400
}
427401

428402
// DropConnFromRead disconnects with Redis.
429-
func (c *Client) dropConnFromRead() {
403+
func (c *Client[Key, Value]) dropConnFromRead() {
430404
for {
431405
select {
432406
case <-c.readTerm:
@@ -457,7 +431,7 @@ func (c *Client) dropConnFromRead() {
457431
}
458432
}
459433

460-
func (c *ClientConfig) connect(readBufferSize int) (net.Conn, *bufio.Reader, error) {
434+
func (c *ClientConfig[Key, Value]) connect(readBufferSize int) (net.Conn, *bufio.Reader, error) {
461435
network := "tcp"
462436
if isUnixAddr(c.Addr) {
463437
network = "unix"

0 commit comments

Comments
 (0)