Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce query policy #475

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions platform/fabric/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
)

// QueryPolicy defines the policy to use to decide if a query is successful
type QueryPolicy int

const (
// QueryAll requires an answer from all selected peers
QueryAll QueryPolicy = iota
// QueryMajority requires an answer from the majority of the selected peers
QueryMajority
// QueryOne requires an answer from at least one of the selected peers
QueryOne
)

type Envelope struct {
e driver.Envelope
}
Expand Down Expand Up @@ -249,6 +261,12 @@ func (i *ChaincodeQuery) WithRetrySleep(duration time.Duration) *ChaincodeQuery
return i
}

// WithQueryPolicy sets the query policy to use
func (i *ChaincodeQuery) WithQueryPolicy(policy QueryPolicy) *ChaincodeQuery {
i.ChaincodeInvocation.WithQueryPolicy(driver.QueryPolicy(policy))
return i
}

type ChaincodeEndorse struct {
ChaincodeInvocation driver.ChaincodeInvocation
}
Expand Down
4 changes: 3 additions & 1 deletion platform/fabric/core/generic/chaincode/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func NewChaincode(name string, sp view.ServiceProvider, network Network, channel
}

func (c *Chaincode) NewInvocation(function string, args ...interface{}) driver.ChaincodeInvocation {
return NewInvoke(c, function, args...)
return NewInvoke(c, func(chaincode *Chaincode) driver.ChaincodeDiscover {
return NewDiscovery(chaincode)
}, function, args...)
}

func (c *Chaincode) NewDiscover() driver.ChaincodeDiscover {
Expand Down
85 changes: 65 additions & 20 deletions platform/fabric/core/generic/chaincode/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"context"
"encoding/base64"
"fmt"
"strconv"
"strings"
"sync"
Expand All @@ -28,11 +29,14 @@ import (
"go.uber.org/zap/zapcore"
)

type NewChaincodeDiscoverFunc = func(chaincode *Chaincode) driver.ChaincodeDiscover

type Invoke struct {
Chaincode *Chaincode
ServiceProvider view2.ServiceProvider
Network Network
Channel Channel
NewChaincodeDiscover NewChaincodeDiscoverFunc
TxID driver.TxID
SignerIdentity view.Identity
ChaincodePath string
Expand All @@ -50,19 +54,21 @@ type Invoke struct {
NumRetries int
RetrySleep time.Duration
Context context.Context
QueryPolicy driver.QueryPolicy
}

func NewInvoke(chaincode *Chaincode, function string, args ...interface{}) *Invoke {
func NewInvoke(chaincode *Chaincode, newChaincodeDiscover NewChaincodeDiscoverFunc, function string, args ...interface{}) *Invoke {
return &Invoke{
Chaincode: chaincode,
ServiceProvider: chaincode.sp,
Network: chaincode.network,
Channel: chaincode.channel,
ChaincodeName: chaincode.name,
Function: function,
Args: args,
NumRetries: int(chaincode.NumRetries),
RetrySleep: chaincode.RetrySleep,
Chaincode: chaincode,
ServiceProvider: chaincode.sp,
Network: chaincode.network,
Channel: chaincode.channel,
ChaincodeName: chaincode.name,
Function: function,
Args: args,
NumRetries: int(chaincode.NumRetries),
RetrySleep: chaincode.RetrySleep,
NewChaincodeDiscover: newChaincodeDiscover,
}
}

Expand Down Expand Up @@ -264,6 +270,11 @@ func (i *Invoke) WithRetrySleep(duration time.Duration) driver.ChaincodeInvocati
return i
}

func (i *Invoke) WithQueryPolicy(policy driver.QueryPolicy) driver.ChaincodeInvocation {
i.QueryPolicy = policy
return i
}

func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalResponse, driver.SigningIdentity, error) {
// TODO: improve by providing grpc connection pool
var peerClients []peer2.Client
Expand Down Expand Up @@ -305,7 +316,7 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon

// discover
var err error
discovery := NewDiscovery(
discovery := i.NewChaincodeDiscover(
i.Chaincode,
)
discovery.WithFilterByMSPIDs(
Expand Down Expand Up @@ -335,27 +346,38 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
}
}

// get a peer client for all discovered peers
n := len(discoveredPeers)
// get a peer client for all discovered peers and collect the errors
var errs []error
for _, peer := range discoveredPeers {
peerClient, err := i.Channel.NewPeerClientForAddress(grpc.ConnectionConfig{
Address: peer.Endpoint,
TLSEnabled: i.Network.Config().TLSEnabled(),
TLSRootCertBytes: peer.TLSRootCerts,
})
if err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "error getting endorser client for %s", peer.Endpoint)
errs = append(errs, errors.WithMessagef(err, "error getting endorser client for %s", peer.Endpoint))
continue
}
peerClients = append(peerClients, peerClient)
}
if err := i.checkQueryPolicy(errs, len(peerClients), n); err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given discovered peers")
}

// get endorser clients
errs = nil
for _, client := range peerClients {
endorserClient, err := client.Endorser()
if err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "error getting endorser client for %s", client.Address())
errs = append(errs, errors.WithMessagef(err, "error getting endorser client for %s", client.Address()))
continue
}
endorserClients = append(endorserClients, endorserClient)
}
if err := i.checkQueryPolicy(errs, len(endorserClients), n); err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given peer clients")
}
if len(endorserClients) == 0 {
return "", nil, nil, nil, errors.New("no endorser clients retrieved with the current filters")
}
Expand All @@ -373,15 +395,17 @@ func (i *Invoke) prepare(query bool) (string, *pb.Proposal, []*pb.ProposalRespon
}

// collect responses
responses, err := i.collectResponses(endorserClients, signedProp)
responses, errs := i.collectResponses(endorserClients, signedProp)
if err != nil {
return "", nil, nil, nil, errors.Wrapf(err, "failed collecting proposal responses")
}

if len(responses) == 0 {
// this should only happen if some new code has introduced a bug
return "", nil, nil, nil, errors.New("no proposal responses received - this might indicate a bug")
}
if err := i.checkQueryPolicy(errs, len(responses), n); err != nil {
return "", nil, nil, nil, errors.WithMessagef(err, "cannot match query policy with the given peer clients")
}

return txID, prop, responses, signer, nil
}
Expand Down Expand Up @@ -447,12 +471,12 @@ func (i *Invoke) createChaincodeProposalWithTxIDAndTransient(typ common.HeaderTy
}

// collectResponses sends a signed proposal to a set of peers, and gathers all the responses.
func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedProposal *pb.SignedProposal) ([]*pb.ProposalResponse, error) {
func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedProposal *pb.SignedProposal) ([]*pb.ProposalResponse, []error) {
responsesCh := make(chan *pb.ProposalResponse, len(endorserClients))
errorCh := make(chan error, len(endorserClients))
wg := sync.WaitGroup{}
wg.Add(len(endorserClients))
for _, endorser := range endorserClients {
wg.Add(1)
go func(endorser pb.EndorserClient) {
defer wg.Done()
proposalResp, err := endorser.ProcessProposal(context.Background(), signedProposal)
Expand All @@ -466,14 +490,15 @@ func (i *Invoke) collectResponses(endorserClients []pb.EndorserClient, signedPro
wg.Wait()
close(responsesCh)
close(errorCh)
var errs []error
for err := range errorCh {
return nil, err
errs = append(errs, err)
}
var responses []*pb.ProposalResponse
for response := range responsesCh {
responses = append(responses, response)
}
return responses, nil
return responses, errs
}

// getChaincodeSpec get chaincode spec from the fsccli cmd parameters
Expand Down Expand Up @@ -536,3 +561,23 @@ func (i *Invoke) broadcast(txID string, env *common.Envelope) error {
}
return i.Channel.IsFinal(context.Background(), txID)
}

func (i *Invoke) checkQueryPolicy(errs []error, successes int, n int) error {
switch i.QueryPolicy {
case driver.QueryAll:
if len(errs) != 0 {
return errors.Errorf("query all policy, no errors expected [%v]", errs)
}
case driver.QueryOne:
if successes == 0 {
return errors.Errorf("query one policy, errors occurred [%v]", errs)
}
case driver.QueryMajority:
if successes <= n/2 {
return errors.Errorf("query majority policy, no majority reached [%v]", errs)
}
default:
panic(fmt.Sprintf("programming error, policy [%d] is not valid", i.QueryPolicy))
}
return nil
}
150 changes: 150 additions & 0 deletions platform/fabric/core/generic/chaincode/invoke_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package chaincode_test

import (
"testing"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/chaincode"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/chaincode/mocks"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/config"
config2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/config"
mock2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/msp/mock"
"github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

func TestInvokeQueryFailMatchPolicyDiscoveredPeers(t *testing.T) {
network := &mocks.Network{}
cp := &mock2.ConfigProvider{}
cp.IsSetReturns(false)
networkConfig, err := config2.New(cp, "default", true)
assert.NoError(t, err)
network.ConfigReturns(networkConfig)
sc := &mocks.SignerService{}
si := &mocks.SigningIdentity{}
si.SerializeReturns([]byte("alice"), nil)
si.SignReturns([]byte("alice's signature"), nil)
sc.GetSigningIdentityReturns(si, nil)
network.SignerServiceReturns(sc)
channel := &mocks.Channel{}
mspManager := &mocks.MSPManager{}
mspIdentity := &mocks.MSPIdentity{}
mspIdentity.GetMSPIdentifierReturns("mspid")
mspManager.DeserializeIdentityReturns(mspIdentity, nil)
channel.MSPManagerReturns(mspManager)
chConfig := &config.Channel{
Name: "blueberry",
Default: true,
Quiet: false,
NumRetries: 1,
RetrySleep: 1 * time.Second,
Chaincodes: nil,
}
channel.ConfigReturns(chConfig)
pc := &mocks.PeerClient{}
//ec := &mocks.EndorserClient{}
pc.EndorserReturns(nil, errors.Errorf("endorser not found"))
channel.NewPeerClientForAddressReturnsOnCall(0, pc, nil)
channel.NewPeerClientForAddressReturnsOnCall(1, pc, nil)
channel.NewPeerClientForAddressReturnsOnCall(2, pc, nil)
channel.NewPeerClientForAddressReturnsOnCall(3, nil, errors.Errorf("peer not found"))
channel.NewPeerClientForAddressReturnsOnCall(4, pc, nil)
channel.NewPeerClientForAddressReturnsOnCall(5, pc, nil)
channel.NewPeerClientForAddressReturnsOnCall(6, pc, nil)
channel.NewPeerClientForAddressReturnsOnCall(7, nil, errors.Errorf("peer not found"))
channel.NewPeerClientForAddressReturnsOnCall(8, pc, nil)
channel.NewPeerClientForAddressReturnsOnCall(9, pc, nil)
channel.NewPeerClientForAddressReturnsOnCall(10, pc, nil)
channel.NewPeerClientForAddressReturnsOnCall(11, nil, errors.Errorf("peer not found"))
discover := &mocks.Discover{}
discover.WithFilterByMSPIDsReturns(discover)
discover.WithImplicitCollectionsReturns(discover)
discoveredPeers := []driver.DiscoveredPeer{
{
Identity: []byte("peer1"),
MSPID: "mspid",
Endpoint: "1.1.1.1",
TLSRootCerts: nil,
},
{
Identity: []byte("peer2"),
MSPID: "mspid",
Endpoint: "2.1.1.1",
TLSRootCerts: nil,
},
{
Identity: []byte("peer3"),
MSPID: "mspid",
Endpoint: "3.1.1.1",
TLSRootCerts: nil,
},
{
Identity: []byte("peer4"),
MSPID: "mspid",
Endpoint: "4.1.1.1",
TLSRootCerts: nil,
},
}
discover.CallReturns(discoveredPeers, nil)
ch := chaincode.NewChaincode("pineapple", nil, network, channel)

invoke := chaincode.NewInvoke(ch, func(chaincode *chaincode.Chaincode) driver.ChaincodeDiscover {
return discover
}, "apple")
invoke.WithEndorsersFromMyOrg()
invoke.WithQueryPolicy(driver.QueryAll)
invoke.WithSignerIdentity([]byte("alice"))

_, err = invoke.Query()
assert.Error(t, err)
assert.Contains(t, err.Error(), "cannot match query policy with the given discovered peers")
assert.Contains(t, err.Error(), "query all policy, no errors expected [")

invoke = chaincode.NewInvoke(ch, func(chaincode *chaincode.Chaincode) driver.ChaincodeDiscover {
return discover
}, "apple")
invoke.WithEndorsersFromMyOrg()
invoke.WithQueryPolicy(driver.QueryMajority)
invoke.WithSignerIdentity([]byte("alice"))
_, err = invoke.Query()
assert.Error(t, err)
assert.Contains(t, err.Error(), "cannot match query policy with the given peer clients")
assert.Contains(t, err.Error(), "query majority policy, no majority reached")

ec := &mocks.EndorserClient{}
pr := &peer.ProposalResponse{
Version: 0,
Timestamp: nil,
Response: &peer.Response{
Status: 0,
Message: "",
Payload: nil,
},
Payload: nil,
Endorsement: &peer.Endorsement{
Endorser: []byte("endorser"),
Signature: []byte("endorser's signature"),
},
Interest: nil,
}
ec.ProcessProposalReturnsOnCall(0, pr, nil)
ec.ProcessProposalReturnsOnCall(1, nil, errors.New("cannot return proposal response"))
ec.ProcessProposalReturnsOnCall(2, nil, errors.New("cannot return proposal response"))
pc.EndorserReturns(ec, nil)
invoke = chaincode.NewInvoke(ch, func(chaincode *chaincode.Chaincode) driver.ChaincodeDiscover {
return discover
}, "apple")
invoke.WithEndorsersFromMyOrg()
invoke.WithQueryPolicy(driver.QueryOne)
invoke.WithSignerIdentity([]byte("alice"))
_, err = invoke.Query()
assert.NoError(t, err)
}
Loading