diff --git a/v3/client/connection.go b/v3/client/connection.go index 37884ead..2cec8e00 100644 --- a/v3/client/connection.go +++ b/v3/client/connection.go @@ -23,8 +23,6 @@ import ( "sync/atomic" "time" - cache "github.com/patrickmn/go-cache" - "github.com/FISCO-BCOS/bcos-c-sdk/bindings/go/csdk" "github.com/FISCO-BCOS/go-sdk/v3/types" "github.com/ethereum/go-ethereum/common" @@ -59,19 +57,19 @@ type Error interface { // Connection represents a connection to an RPC server. type Connection struct { - csdk *csdk.CSDK - idCounter int64 - blockNumberNotify func(int64) - notifyLock sync.Mutex - transactionHandlers *cache.Cache - closed bool + csdk *csdk.CSDK + idCounter int64 + blockNumberNotify func(int64) + notifyLock sync.Mutex + closed bool + lock sync.Mutex } type requestOp struct { ids []json.RawMessage err error respChanData *csdk.CallbackChan - handler func(*types.Receipt, error) + //handler func(*types.Receipt, error) } type EventLogRespResult struct { @@ -143,8 +141,9 @@ func NewConnectionByFile(configFile, groupID string, privateKey []byte) (*Connec if err != nil { return nil, err } - c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)} - go c.processTransactionResponses() + c := &Connection{ + csdk: sdk, + } return c, nil } @@ -173,51 +172,42 @@ func NewConnection(config *Config) (*Connection, error) { if err != nil { return nil, err } - c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)} - go c.processTransactionResponses() + c := &Connection{ + csdk: sdk, + } return c, nil } func (c *Connection) GetCSDK() *csdk.CSDK { return c.csdk } -func (c *Connection) processTransactionResponses() { - for { - if !c.closed { - items := c.transactionHandlers.Items() - for key, item := range items { - op := item.Object.(*requestOp) - if len(op.respChanData.Data) > 0 { - go func() { - resp, _, err := op.waitRpcMessage() - if err != nil { - op.handler(nil, err) - return - } - if resp.Error != nil { - op.handler(nil, resp.Error) - return - } - if len(resp.Result) == 0 { - op.handler(nil, errors.New("result is null")) - return - } - var receipt types.Receipt - err = json.Unmarshal(resp.Result, &receipt) - if err != nil { - op.handler(nil, fmt.Errorf("unmarshal receipt error: %v", err)) - return - } - op.handler(&receipt, nil) - }() - c.transactionHandlers.Delete(key) - } - } - } else { +func wrapTransactionResponsesHandler(f func(*types.Receipt, error)) func([]byte, error) { + return func(bytes []byte, err error) { + var jrm jsonrpcMessage + if err != nil { + f(nil, err) + return + } + if err = json.Unmarshal(bytes, &jrm); err != nil { + f(nil, err) + return + } + if jrm.Error != nil { + f(nil, jrm.Error) + return + } + if len(jrm.Result) == 0 { + f(nil, errors.New("result is null")) + return + } + var receipt types.Receipt + err = json.Unmarshal(jrm.Result, &receipt) + if err != nil { + f(nil, fmt.Errorf("unmarshal receipt error: %v", err)) return } + f(&receipt, nil) } - } func (c *Connection) nextID() int64 { @@ -238,6 +228,14 @@ func (c *Connection) NewMessage(method string, paramsIn ...interface{}) (*jsonrp // Close closes the client, aborting any in-flight requests. func (c *Connection) Close() { + if c.closed { + return + } + c.lock.Lock() + defer c.lock.Unlock() + if c.closed { + return + } c.closed = true c.csdk.Close() } @@ -340,7 +338,14 @@ func (c *Connection) Call(result interface{}, method string, args ...interface{} // can also pass nil, in which case the result is ignored. func (c *Connection) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { //logrus.Infof("CallContext method:%s\n", method) - op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 1)}} + op := &requestOp{respChanData: &csdk.CallbackChan{Data: nil}} + + switch method { + case "asyncSendTransaction", "SendEncodedTransaction": + default: + op.respChanData.Data = make(chan csdk.Response, 1) + } + switch method { case "call": arg := args[0].(map[string]interface{}) @@ -404,67 +409,59 @@ func (c *Connection) CallContext(ctx context.Context, result interface{}, method case "getPendingTxSize": c.csdk.GetPendingTxSize(op.respChanData) case "asyncSendTransaction": - fallthrough + data := hexutil.Encode(args[0].([]byte)) + contractAddress := args[1].(string) + op.respChanData.Handler = wrapTransactionResponsesHandler(args[2].(func(*types.Receipt, error))) + var abiStr string + if len(args) >= 4 && len(contractAddress) == 0 { + abiStr = args[3].(string) + } + _, err := c.csdk.CreateAndSendTransaction(op.respChanData, contractAddress, data, abiStr, "", false) + if err != nil { + return err + } case "sendTransaction": - fallthrough + data := hexutil.Encode(args[0].([]byte)) + contractAddress := args[1].(string) + var abiStr string + if len(args) >= 3 && len(contractAddress) == 0 { + abiStr = args[2].(string) + } + _, err := c.csdk.CreateAndSendTransaction(op.respChanData, contractAddress, data, abiStr, "", false) + if err != nil { + return err + } case "SendEncodedTransaction": - var handler func(*types.Receipt, error) - if method == "sendTransaction" { - data := hexutil.Encode(args[0].([]byte)) - contractAddress := args[1].(string) - var abiStr string - if len(args) >= 3 && len(contractAddress) == 0 { - abiStr = args[2].(string) - } - _, err := c.csdk.CreateAndSendTransaction(op.respChanData, contractAddress, data, abiStr, "", false) - if err != nil { - return err - } - } else if method == "asyncSendTransaction" { - data := hexutil.Encode(args[0].([]byte)) - contractAddress := args[1].(string) - handler = args[2].(func(*types.Receipt, error)) - var abiStr string - if len(args) >= 4 && len(contractAddress) == 0 { - abiStr = args[3].(string) - } - _, err := c.csdk.CreateAndSendTransaction(op.respChanData, contractAddress, data, abiStr, "", false) - if err != nil { - return err - } - } else { // SendEncodedTransaction - encodedTransaction := args[0].([]byte) - withProof := args[1].(bool) - if len(args) >= 3 { - handler = args[2].(func(*types.Receipt, error)) - } - err := c.csdk.SendEncodedTransaction(op.respChanData, encodedTransaction, withProof) - if err != nil { - return err - } + encodedTransaction := args[0].([]byte) + withProof := args[1].(bool) + if len(args) >= 3 { + op.respChanData.Handler = wrapTransactionResponsesHandler(args[2].(func(*types.Receipt, error))) + } else { + op.respChanData.Data = make(chan csdk.Response, 1) } - // async send transaction - if handler != nil { - op.handler = handler - pointer := fmt.Sprintf("%p", op.respChanData) - c.transactionHandlers.Set(pointer, op, defaultTransactionTimeout) - return nil + err := c.csdk.SendEncodedTransaction(op.respChanData, encodedTransaction, withProof) + if err != nil { + return err } default: return ErrNoRpcMethod } - - // dispatch has accepted the request and will close the channel when it quits. - switch resp, _, err := op.waitRpcMessage(); { - case err != nil: - return err - case resp.Error != nil: - return resp.Error - case len(resp.Result) == 0: - logrus.Errorf("result is null, %+v, err:%+v \n", resp, err) - return ErrNoResult - default: - return json.Unmarshal(resp.Result, &result) + // async send transaction + if op.respChanData.Handler != nil { + return nil + } else { + // dispatch has accepted the request and will close the channel when it quits. + switch resp, _, err := op.waitRpcMessage(); { + case err != nil: + return err + case resp.Error != nil: + return resp.Error + case len(resp.Result) == 0: + logrus.Errorf("result is null, %+v, err:%+v \n", resp, err) + return ErrNoResult + default: + return json.Unmarshal(resp.Result, &result) + } } } diff --git a/v3/go.mod b/v3/go.mod index 82988dfc..2199afeb 100644 --- a/v3/go.mod +++ b/v3/go.mod @@ -5,12 +5,11 @@ go 1.21.5 // replace github.com/FISCO-BCOS/bcos-c-sdk => ../../bcos-c-sdk require ( - github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240219081048-53240138c396 + github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240305024607-895d57002774 github.com/FISCO-BCOS/crypto v0.0.0-20200202032121-bd8ab0b5d4f1 github.com/TarsCloud/TarsGo v1.4.5 github.com/deckarep/golang-set/v2 v2.6.0 github.com/ethereum/go-ethereum v1.13.10 - github.com/patrickmn/go-cache v2.1.0+incompatible github.com/schollz/progressbar/v3 v3.14.1 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.5.0 @@ -81,6 +80,7 @@ require ( github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/opentracing/opentracing-go v1.1.0 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.12.0 // indirect diff --git a/v3/go.sum b/v3/go.sum index 79540be2..9353b7e4 100644 --- a/v3/go.sum +++ b/v3/go.sum @@ -40,8 +40,8 @@ github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EF github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w= github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240219081048-53240138c396 h1:VYwt+oviUo74Hb43ml9jEXC5DLJidNFrOp7lG6t8UTU= -github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240219081048-53240138c396/go.mod h1:n2KxbYa73MW3xdLVu2vpPpoblZMms+CwPmvFkubO9xM= +github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240305024607-895d57002774 h1:CgNdyFyNAHeg+3jxCQU+y5CtHawLvj5p1hCLmOH4ffQ= +github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240305024607-895d57002774/go.mod h1:n2KxbYa73MW3xdLVu2vpPpoblZMms+CwPmvFkubO9xM= github.com/FISCO-BCOS/crypto v0.0.0-20200202032121-bd8ab0b5d4f1 h1:ThPht4qK10+cMZC5COIjHPq0INm5HAMVYqrez5zEgFI= github.com/FISCO-BCOS/crypto v0.0.0-20200202032121-bd8ab0b5d4f1/go.mod h1:UrLdwsFrjiaCsvdcPLcH6B7s/FUmym3qfM93u2ziR+4= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=