Skip to content

Commit

Permalink
Different response interfaces for different requests
Browse files Browse the repository at this point in the history
  • Loading branch information
DerekBum committed Dec 14, 2023
1 parent 66c136a commit c9fa63b
Show file tree
Hide file tree
Showing 18 changed files with 566 additions and 197 deletions.
18 changes: 12 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
conn.Addr(), err)
case LogUnexpectedResultId:
respHeader := v[0].(header)
respHeader := v[0].(Header)
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
conn.Addr(), respHeader.requestId)
case LogWatchEventReadFailed:
Expand Down Expand Up @@ -818,10 +818,10 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
return
}

resp := &ConnResponse{header: respHeader, buf: buf}
baseResp := &BaseResponse{header: respHeader, buf: buf}
var fut *Future = nil
if iproto.Type(respHeader.code) == iproto.IPROTO_EVENT {
if event, err := readWatchEvent(&resp.buf); err == nil {
if event, err := readWatchEvent(&baseResp.buf); err == nil {
events <- event
} else {
err = ClientError{
Expand All @@ -833,10 +833,14 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
continue
} else if respHeader.code == PushCode {
if fut = conn.peekFuture(respHeader.requestId); fut != nil {
resp := fut.req.CreateResponse(respHeader)
resp.SetBuf(buf)
fut.AppendPush(resp)
}
} else {
if fut = conn.fetchFuture(respHeader.requestId); fut != nil {
resp := fut.req.CreateResponse(respHeader)
resp.SetBuf(buf)
fut.SetResponse(resp)
conn.markDone(fut)
}
Expand Down Expand Up @@ -873,8 +877,10 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) {
}
}

func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
func (conn *Connection) newFuture(req Request) (fut *Future) {
ctx := req.Ctx()
fut = NewFuture()
fut.SetRequest(req)
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
select {
case conn.rlimit <- struct{}{}:
Expand Down Expand Up @@ -984,7 +990,7 @@ func (conn *Connection) decrementRequestCnt() {
func (conn *Connection) send(req Request, streamId uint64) *Future {
conn.incrementRequestCnt()

fut := conn.newFuture(req.Ctx())
fut := conn.newFuture(req)
if fut.ready == nil {
conn.decrementRequestCnt()
return fut
Expand Down Expand Up @@ -1053,7 +1059,7 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {

if req.Async() {
if fut = conn.fetchFuture(reqid); fut != nil {
resp := &ConnResponse{}
resp := req.CreateResponse(Header{})
fut.SetResponse(resp)
conn.markDone(fut)
}
Expand Down
7 changes: 7 additions & 0 deletions crud/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func (req baseRequest) Async() bool {
return req.impl.Async()
}

// CreateResponse creates a response for the baseRequest.
func (req baseRequest) CreateResponse(header tarantool.Header) tarantool.Response {
resp := tarantool.BaseResponse{}
resp.SetHeader(header)
return &resp
}

type spaceRequest struct {
baseRequest
space string
Expand Down
7 changes: 7 additions & 0 deletions crud/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,10 @@ func (req SelectRequest) Context(ctx context.Context) SelectRequest {

return req
}

// CreateResponse creates a response for the SelectRequest.
func (req SelectRequest) CreateResponse(header tarantool.Header) tarantool.Response {
resp := tarantool.SelectResponse{}
resp.SetHeader(header)
return &resp
}
4 changes: 2 additions & 2 deletions dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,12 +511,12 @@ func readResponse(r io.Reader) (Response, error) {

respBytes, err := read(r, lenbuf[:])
if err != nil {
return &ConnResponse{}, fmt.Errorf("read error: %w", err)
return &BaseResponse{}, fmt.Errorf("read error: %w", err)
}

buf := smallBuf{b: respBytes}
respHeader, err := decodeHeader(msgpack.NewDecoder(&smallBuf{}), &buf)
resp := &ConnResponse{header: respHeader, buf: buf}
resp := &BaseResponse{header: respHeader, buf: buf}
if err != nil {
return resp, fmt.Errorf("decode response header error: %w", err)
}
Expand Down
63 changes: 49 additions & 14 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,13 @@ func ExampleExecuteRequest() {
data, err := resp.Decode()
fmt.Println("Error", err)
fmt.Println("Data", data)
fmt.Println("MetaData", resp.MetaData())
fmt.Println("SQL Info", resp.SQLInfo())
exResp, ok := resp.(*tarantool.PrepareExecuteResponse)
if !ok {
fmt.Printf("wrong response type")
return
}
fmt.Println("MetaData", exResp.MetaData())
fmt.Println("SQL Info", exResp.SQLInfo())

// There are 4 options to pass named parameters to an SQL query:
// 1) The simple map;
Expand Down Expand Up @@ -608,8 +613,13 @@ func ExampleExecuteRequest() {
data, err = resp.Decode()
fmt.Println("Error", err)
fmt.Println("Data", data)
fmt.Println("MetaData", resp.MetaData())
fmt.Println("SQL Info", resp.SQLInfo())
exResp, ok = resp.(*tarantool.PrepareExecuteResponse)
if !ok {
fmt.Printf("wrong response type")
return
}
fmt.Println("MetaData", exResp.MetaData())
fmt.Println("SQL Info", exResp.SQLInfo())

// 2)
req = req.Args(sqlBind2)
Expand All @@ -619,8 +629,13 @@ func ExampleExecuteRequest() {
data, err = resp.Decode()
fmt.Println("Error", err)
fmt.Println("Data", data)
fmt.Println("MetaData", resp.MetaData())
fmt.Println("SQL Info", resp.SQLInfo())
exResp, ok = resp.(*tarantool.PrepareExecuteResponse)
if !ok {
fmt.Printf("wrong response type")
return
}
fmt.Println("MetaData", exResp.MetaData())
fmt.Println("SQL Info", exResp.SQLInfo())

// 3)
req = req.Args(sqlBind3)
Expand All @@ -630,8 +645,13 @@ func ExampleExecuteRequest() {
data, err = resp.Decode()
fmt.Println("Error", err)
fmt.Println("Data", data)
fmt.Println("MetaData", resp.MetaData())
fmt.Println("SQL Info", resp.SQLInfo())
exResp, ok = resp.(*tarantool.PrepareExecuteResponse)
if !ok {
fmt.Printf("wrong response type")
return
}
fmt.Println("MetaData", exResp.MetaData())
fmt.Println("SQL Info", exResp.SQLInfo())

// 4)
req = req.Args(sqlBind4)
Expand All @@ -641,8 +661,13 @@ func ExampleExecuteRequest() {
data, err = resp.Decode()
fmt.Println("Error", err)
fmt.Println("Data", data)
fmt.Println("MetaData", resp.MetaData())
fmt.Println("SQL Info", resp.SQLInfo())
exResp, ok = resp.(*tarantool.PrepareExecuteResponse)
if !ok {
fmt.Printf("wrong response type")
return
}
fmt.Println("MetaData", exResp.MetaData())
fmt.Println("SQL Info", exResp.SQLInfo())

// The way to pass positional arguments to an SQL query.
req = tarantool.NewExecuteRequest(
Expand All @@ -654,8 +679,13 @@ func ExampleExecuteRequest() {
data, err = resp.Decode()
fmt.Println("Error", err)
fmt.Println("Data", data)
fmt.Println("MetaData", resp.MetaData())
fmt.Println("SQL Info", resp.SQLInfo())
exResp, ok = resp.(*tarantool.PrepareExecuteResponse)
if !ok {
fmt.Printf("wrong response type")
return
}
fmt.Println("MetaData", exResp.MetaData())
fmt.Println("SQL Info", exResp.SQLInfo())

// The way to pass SQL expression with using custom packing/unpacking for
// a type.
Expand All @@ -680,8 +710,13 @@ func ExampleExecuteRequest() {
data, err = resp.Decode()
fmt.Println("Error", err)
fmt.Println("Data", data)
fmt.Println("MetaData", resp.MetaData())
fmt.Println("SQL Info", resp.SQLInfo())
exResp, ok = resp.(*tarantool.PrepareExecuteResponse)
if !ok {
fmt.Printf("wrong response type")
return
}
fmt.Println("MetaData", exResp.MetaData())
fmt.Println("SQL Info", exResp.SQLInfo())
}

func getTestTxnDialer() tarantool.Dialer {
Expand Down
6 changes: 6 additions & 0 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
// Future is a handle for asynchronous request.
type Future struct {
requestId uint32
req Request
next *Future
timeout time.Duration
mutex sync.Mutex
Expand Down Expand Up @@ -150,6 +151,11 @@ func (fut *Future) AppendPush(resp Response) {
fut.ready <- struct{}{}
}

// SetRequest sets a request, for which the future was created.
func (fut *Future) SetRequest(req Request) {
fut.req = req
}

// SetResponse sets a response for the future and finishes the future.
func (fut *Future) SetResponse(resp Response) {
fut.mutex.Lock()
Expand Down
20 changes: 10 additions & 10 deletions future_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestFutureGetIteratorNoItems(t *testing.T) {
}

func TestFutureGetIteratorNoResponse(t *testing.T) {
push := &ConnResponse{}
push := &BaseResponse{}
fut := NewFuture()
fut.AppendPush(push)

Expand All @@ -64,7 +64,7 @@ func TestFutureGetIteratorNoResponse(t *testing.T) {
}

func TestFutureGetIteratorNoResponseTimeout(t *testing.T) {
push := &ConnResponse{}
push := &BaseResponse{}
fut := NewFuture()
fut.AppendPush(push)

Expand All @@ -80,8 +80,8 @@ func TestFutureGetIteratorNoResponseTimeout(t *testing.T) {
}

func TestFutureGetIteratorResponseOnTimeout(t *testing.T) {
push := &ConnResponse{}
resp := &ConnResponse{}
push := &BaseResponse{}
resp := &BaseResponse{}
fut := NewFuture()
fut.AppendPush(push)

Expand Down Expand Up @@ -119,8 +119,8 @@ func TestFutureGetIteratorResponseOnTimeout(t *testing.T) {
}

func TestFutureGetIteratorFirstResponse(t *testing.T) {
resp1 := &ConnResponse{}
resp2 := &ConnResponse{}
resp1 := &BaseResponse{}
resp2 := &BaseResponse{}
fut := NewFuture()
fut.SetResponse(resp1)
fut.SetResponse(resp2)
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestFutureGetIteratorFirstError(t *testing.T) {
}

func TestFutureGetIteratorResponse(t *testing.T) {
responses := []*ConnResponse{
responses := []*BaseResponse{
{},
{},
{},
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestFutureGetIteratorResponse(t *testing.T) {

func TestFutureGetIteratorError(t *testing.T) {
const errMsg = "error message"
responses := []*ConnResponse{
responses := []*BaseResponse{
{},
{},
}
Expand Down Expand Up @@ -226,14 +226,14 @@ func TestFutureGetIteratorError(t *testing.T) {

func TestFutureSetStateRaceCondition(t *testing.T) {
err := errors.New("any error")
resp := &ConnResponse{}
resp := &BaseResponse{}

for i := 0; i < 1000; i++ {
fut := NewFuture()
for j := 0; j < 9; j++ {
go func(opt int) {
if opt%3 == 0 {
respAppend := &ConnResponse{}
respAppend := &BaseResponse{}
fut.AppendPush(respAppend)
} else if opt%3 == 1 {
fut.SetError(err)
Expand Down
6 changes: 5 additions & 1 deletion pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2548,7 +2548,11 @@ func TestNewPrepared(t *testing.T) {
if reflect.DeepEqual(data[0], []interface{}{1, "test"}) {
t.Error("Select with named arguments failed")
}
metaData := resp.MetaData()
prepResp, ok := resp.(*tarantool.PrepareExecuteResponse)
if !ok {
t.Fatalf("Not a Prepare response")
}
metaData := prepResp.MetaData()
if metaData[0].FieldType != "unsigned" ||
metaData[0].FieldName != "NAME0" ||
metaData[1].FieldType != "string" ||
Expand Down
2 changes: 1 addition & 1 deletion pool/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type baseRequestMock struct {
mode Mode
}

var reqResp tarantool.Response = &tarantool.ConnResponse{}
var reqResp tarantool.Response = &tarantool.BaseResponse{}
var errReq error = errors.New("response error")
var reqFuture *tarantool.Future = &tarantool.Future{}

Expand Down
14 changes: 14 additions & 0 deletions prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ func (req *PrepareRequest) Context(ctx context.Context) *PrepareRequest {
return req
}

// CreateResponse creates a response for the PrepareRequest.
func (req *PrepareRequest) CreateResponse(header Header) Response {
resp := PrepareExecuteResponse{}
resp.SetHeader(header)
return &resp
}

// UnprepareRequest helps you to create an unprepare request object for
// execution by a Connection.
type UnprepareRequest struct {
Expand Down Expand Up @@ -175,3 +182,10 @@ func (req *ExecutePreparedRequest) Context(ctx context.Context) *ExecutePrepared
req.ctx = ctx
return req
}

// CreateResponse creates a response for the ExecutePreparedRequest.
func (req *ExecutePreparedRequest) CreateResponse(header Header) Response {
resp := PrepareExecuteResponse{}
resp.SetHeader(header)
return &resp
}
Loading

0 comments on commit c9fa63b

Please sign in to comment.