Skip to content

Commit

Permalink
Merge pull request #13 from ImperialCollegeLondon/BloccProtocol
Browse files Browse the repository at this point in the history
Merge with Alfredo's code for forking
  • Loading branch information
TonyWu3027 authored Aug 16, 2023
2 parents 44adc08 + 4ef029f commit fbedc1e
Show file tree
Hide file tree
Showing 25 changed files with 447 additions and 371 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ TESTS*.xml
.vagrant/
.vscode
integration/*
new_config.txt
new_config_2.0.txt
old_config.txt
17 changes: 4 additions & 13 deletions common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,21 +232,9 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
}

forkedChan := chain.Forked()
logger.Debug("Fork address:", forkedChan)
select {
case _, ok := <-forkedChan:
if !ok {
logger.Debug("Fork channel is closed")
} else {
logger.Debug("Fork channel is open and contains data")
}
default:
logger.Debug("Fork channel is open but does not contain data")
}
logger.Debugf("[channel: %s] Checking if channel is forked", chdr.ChannelId)
select {
case <-forkedChan:
logger.Warningf("[channel: %s] Rejecting deliver request for %s because of chain fork", chdr.ChannelId, addr)
logger.Warningf("[channel: %s] Rejecting deliver request for %s because of fork", chdr.ChannelId, addr)
return cb.Status_FORKED, nil
default:
}
Expand Down Expand Up @@ -327,6 +315,9 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
case <-ctx.Done():
logger.Debugf("Context canceled, aborting wait for next block")
return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
case <-forkedChan:
logger.Warningf("[channel: %s] Aborting deliver request for %s because of fork", chdr.ChannelId, addr)
return cb.Status_FORKED, nil
case <-erroredChan:
// TODO, today, the only user of the errorChan is the orderer consensus implementations. If the peer ever reports
// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
Expand Down
11 changes: 11 additions & 0 deletions common/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,14 @@ func (e VSCCExecutionFailureError) Error() string {
func (e *VSCCExecutionFailureError) IsValid() bool {
return e.Err == nil
}

// ForkedTxError indicates that a transaction
// has been forged and is attempting to be submitted
// to the network
type ForkedTxError struct {
msg string
}

func (e *ForkedTxError) Error() string {
return e.msg
}
26 changes: 0 additions & 26 deletions common/forknotifier/forknotifier.go

This file was deleted.

2 changes: 0 additions & 2 deletions common/ledger/blockledger/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ func (nfei *NotFoundErrorIterator) Close() {}
//
// to accommodate non-deterministic marshaling
func CreateNextBlock(rl Reader, messages []*cb.Envelope) *cb.Block {
logger.Debugf("Creating next block for chain with height %d", rl.Height())

var nextBlockNumber uint64
var previousBlockHash []byte
var err error
Expand Down
23 changes: 23 additions & 0 deletions core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"context"
"errors"
"fmt"
"github.com/hyperledger/fabric/gossip/common"
"sync"
"time"

"github.com/hyperledger/fabric-protos-go/orderer"
errors2 "github.com/hyperledger/fabric/common/errors"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/internal/pkg/comm"
Expand Down Expand Up @@ -139,6 +141,7 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
BlockGossipDisabled: !d.conf.DeliverServiceConfig.BlockGossipEnabled,
InitialRetryDelay: 100 * time.Millisecond,
YieldLeadership: !d.conf.IsStaticLeader,
ErrorC: make(chan error, 1),
}

if dc.BlockGossipDisabled {
Expand All @@ -158,6 +161,26 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
d.blockProviders[chainID] = dc
go func() {
dc.DeliverBlocks()
select {
case err := <-dc.ErrorC:
if _, ok := err.(*errors2.ForkedTxError); ok {
// If a fork is detected, stop the delivery for the channel
logger.Errorf("Fork occurred for channel %s. "+
"All subsequent blocks may be compromised.", chainID, err)
err = d.StopDeliverForChannel(chainID)
if err != nil {
logger.Errorf("Fork occurred but Fabric failed to stop delivery for channel %s: %s. "+
"All subsequent blocks may be compromised.", chainID, err)
}
dc.Gossip.StopChain(chainID)
dc.Gossip.LeaveChan(common.ChannelID(chainID))
} else {
// Log or handle other types of errors
logger.Errorf("Received unexpected error from DeliverBlocks: %v", err)
}
default:
// No error, continue as normal
}
finalizer()
}()
return nil
Expand Down
2 changes: 0 additions & 2 deletions core/ledger/kvledger/tests/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ func newBlockGenerator(lgr ledger.PeerLedger, t *testing.T) *blkGenerator {
}

// nextBlockAndPvtdata cuts the next block
// BLOCC: Only used in tests.
func (g *blkGenerator) nextBlockAndPvtdata(trans []*txAndPvtdata, missingPvtData ledger.TxMissingPvtData) *ledger.BlockAndPvtData {
logger.Debugf("Cutting the next block")
block := protoutil.NewBlock(g.lastNum+1, g.lastHash)
blockPvtdata := make(map[uint64]*ledger.TxPvtData)
for i, tran := range trans {
Expand Down
31 changes: 5 additions & 26 deletions core/scc/bscc/bscc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,15 @@ import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/peer"
blocc "github.com/hyperledger/fabric/internal/peer/blocc/chaincode"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"io/ioutil"
"os"
)

func New(peerInstance *peer.Peer, server *comm.GRPCServer, command *cobra.Command) *BSCC {
func New(peerInstance *peer.Peer) *BSCC {
return &BSCC{
peerInstance: peerInstance,
peerServer: server,
unjoin: command,
}
}

Expand All @@ -36,8 +32,6 @@ func (bscc *BSCC) Chaincode() shim.Chaincode {
type BSCC struct {
peerInstance *peer.Peer
config Config
peerServer *comm.GRPCServer
unjoin *cobra.Command
}

type Config struct {
Expand All @@ -50,6 +44,7 @@ var bloccProtoLogger = flogging.MustGetLogger("bscc")

const (
approveSensoryReading string = "ApproveSensoryReading"
simulateForkAttempt string = "SimulateForkAttempt"
)

// ------------------- Error handling ------------------- //
Expand All @@ -62,8 +57,6 @@ func (f InvalidFunctionError) Error() string {

// -------------------- Stub Interface ------------------- //

var index uint64

func (bscc *BSCC) Init(stub shim.ChaincodeStubInterface) pb.Response {
bloccProtoLogger.Info("Init BSCC")
go func() {
Expand All @@ -89,7 +82,6 @@ func (bscc *BSCC) Init(stub shim.ChaincodeStubInterface) pb.Response {
TLSCertFile: tlsCertFile,
CryptoProvider: bscc.peerInstance.CryptoProvider,
}
index = 1
return shim.Success(nil)
}

Expand Down Expand Up @@ -125,6 +117,9 @@ func (bscc *BSCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
txID := args[1]
bloccProtoLogger.Infof("ApproveSensoryReading for: %s", txID)
return shim.Success(txID)
case simulateForkAttempt:
bloccProtoLogger.Warningf("Adding a fork block!")
return shim.Success(nil)
}

return shim.Error(fmt.Sprintf("Requested function %s not found.", fname))
Expand All @@ -135,22 +130,6 @@ func (bscc *BSCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
func (bscc *BSCC) processEvent(event event.Event) {
var err error
bloccProtoLogger.Info("BLOCC - Received approval event:", event)
bloccProtoLogger.Debug("index:", index)
//if index == 2 {
// bscc.peerServer.Stop()
// bscc.unjoin.SetArgs([]string{
// "--channelID=" + event.ChannelID,
// })
// err = bscc.unjoin.Execute()
// if err != nil {
// bloccProtoLogger.Errorf("Failed to unjoin channel: %s", err)
// }
// err = bscc.peerServer.Start()
// if err != nil {
// bloccProtoLogger.Errorf("Failed to start peer server: %s", err)
// }
//}
index++
address, rootCertFile, err := bscc.gatherOrdererInfo(event.ChannelID)
if err != nil {
bloccProtoLogger.Errorf("Failed to gather orderer info: %s", err)
Expand Down
7 changes: 7 additions & 0 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ type GossipChannel interface {
// LeaveChannel makes the peer leave the channel
LeaveChannel()

// HasLeftChannel returns whether the peer has left the channel
HasLeftChannel() bool

// Stop stops the channel's activity
Stop()
}
Expand Down Expand Up @@ -362,6 +365,10 @@ func (gc *gossipChannel) hasLeftChannel() bool {
return atomic.LoadInt32(&gc.leftChannel) == 1
}

func (gc *gossipChannel) HasLeftChannel() bool {
return atomic.LoadInt32(&gc.leftChannel) == 1
}

// GetPeers returns a list of peers with metadata as published by them
func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
var members []discovery.NetworkMember
Expand Down
4 changes: 4 additions & 0 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,16 @@ func (g *Node) JoinChan(joinMsg api.JoinChannelMessage, channelID common.Channel

// LeaveChan makes gossip stop participating in the given channel
func (g *Node) LeaveChan(channelID common.ChannelID) {
g.logger.Info("Leaving gossip network of channel", channelID)
gc := g.chanState.getGossipChannelByChainID(channelID)
if gc == nil {
g.logger.Debug("No such channel", channelID)
return
}
gc.LeaveChannel()

left := gc.HasLeftChannel()
g.logger.Debug("Has left channel", channelID, "?", left)
}

// SuspectPeers makes the gossip instance validate identities of suspected peers, and close
Expand Down
27 changes: 27 additions & 0 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ type GossipServiceAdapter interface {

// Gossip the message across the peers
Gossip(msg *gproto.GossipMessage)

// StopChain stops gossip for a specified chain
StopChain(chainID string)

// LeaveChan makes the Gossip instance leave a channel.
// It still disseminates stateInfo message, but doesn't participate
// in block pulling anymore, and can't return anymore a list of peers
// in the channel.
LeaveChan(channelID common.ChannelID)
}

// DeliveryServiceFactory factory to create and initialize delivery service instance
Expand Down Expand Up @@ -526,6 +535,24 @@ func (g *GossipService) Stop() {
g.gossipSvc.Stop()
}

// StopChain stops the gossip component for a given chain
func (g *GossipService) StopChain(chainID string) {
g.lock.Lock()
defer g.lock.Unlock()

logger.Info("Stopping chain", chainID)
if le, exists := g.leaderElection[chainID]; exists {
logger.Infof("Stopping leader election for %s", chainID)
le.Stop()
}
g.chains[chainID].Stop()
g.privateHandlers[chainID].close()

if g.deliveryService[chainID] != nil {
g.deliveryService[chainID].Stop()
}
}

func (g *GossipService) newLeaderElectionComponent(channelID string, callback func(bool),
electionMetrics *gossipmetrics.ElectionMetrics) election.LeaderElectionService {
PKIid := g.mcs.GetPKIidOfCert(g.peerIdentity)
Expand Down
6 changes: 4 additions & 2 deletions internal/peer/blocc/chaincode/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ import (
)

const (
bloccName = "bscc"
approveFuncName = "ApproveSensoryReading"
bloccName = "bscc"
approveFuncName = "ApproveSensoryReading"
simulateFuncName = "SimulateForkAttempt"
)

var logger = flogging.MustGetLogger("cli.blocc.chaincode")

// Cmd returns the cobra command for Chaincode
func Cmd(cryptoProvider bccsp.BCCSP) *cobra.Command {
chaincodeCmd.AddCommand(ApproveForThisPeerCmd(nil, cryptoProvider))
chaincodeCmd.AddCommand(SimulateForkAttemptCmd(nil, cryptoProvider))

logger.Debugf("bloccCmd: %v", chaincodeCmd)

Expand Down
Loading

0 comments on commit fbedc1e

Please sign in to comment.