Skip to content

Commit

Permalink
Fix anchor peers + update channel
Browse files Browse the repository at this point in the history
  • Loading branch information
dviejokfs committed Aug 15, 2022
1 parent 32b14c5 commit 1c89326
Showing 1 changed file with 104 additions and 97 deletions.
201 changes: 104 additions & 97 deletions store/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-config/configtx"
"github.com/hyperledger/fabric-config/configtx/membership"
Expand All @@ -30,8 +33,6 @@ import (
"github.com/lithammer/shortuuid/v3"
"github.com/pkg/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
"time"
)

func getResmgmtClient(sdk *fabsdk.FabricSDK, adminOrg appconfig.AdminOrg, configBackends []core.ConfigBackend, isTls bool) (*resmgmt.Client, context2.ClientProvider, msp.SigningIdentity, error) {
Expand Down Expand Up @@ -481,7 +482,12 @@ applicationUpdate:
if err != nil {
return nil, err
}
cfgBlock, err := resource.ExtractConfigFromBlock(channelBlock)
block, err := resClient.QueryConfigBlockFromOrderer(channelID)
if err != nil {
log.Infof("channel %s does not exist, it will be created", channelID)
channelExists = false
}
cfgBlock, err := resource.ExtractConfigFromBlock(block)
if err != nil {
return nil, errors.Wrapf(err, "failed to extract config from channel block")
}
Expand All @@ -495,115 +501,116 @@ applicationUpdate:
if !strings.Contains(err.Error(), "no differences detected between original and updated config") {
return nil, errors.Wrapf(err, "error calculating config update")
}
goto FINISH
}
channelConfigBytes, err := CreateConfigUpdateEnvelope(channelID, configUpdate)
if err != nil {
return nil, errors.Wrapf(err, "error creating config update envelope")
log.Infof("No differences detected between original and updated config")
} else {
channelConfigBytes, err := CreateConfigUpdateEnvelope(channelID, configUpdate)
if err != nil {
return nil, errors.Wrapf(err, "error creating config update envelope")
}
var configSignatures []*cb.ConfigSignature
for _, adminPeer := range channelConfig.PeerAdminOrgs {
configUpdateReader := bytes.NewReader(channelConfigBytes)
peerResClient, _, usr, err := getResmgmtClient(sdk, adminPeer, configBackends, false)
if err != nil {
return nil, err
}
signature, err := peerResClient.CreateConfigSignatureFromReader(usr, configUpdateReader)
if err != nil {
return nil, err
}
configSignatures = append(configSignatures, signature)
}
configUpdateReader := bytes.NewReader(channelConfigBytes)
saveChannelResponse, err := peerResClient.SaveChannel(
resmgmt.SaveChannelRequest{
ChannelID: channelID,
ChannelConfig: configUpdateReader,
SigningIdentities: []msp.SigningIdentity{},
},
resmgmt.WithConfigSignatures(configSignatures...),
)
if err != nil {
return nil, errors.Wrapf(err, "error saving application configuration")
}
log.Infof("Application configuration updated with transaction ID: %s", saveChannelResponse.TransactionID)
applicationTxId = string(saveChannelResponse.TransactionID)
}
configUpdateReader := bytes.NewReader(channelConfigBytes)
var configSignatures []*cb.ConfigSignature
for _, adminPeer := range channelConfig.PeerAdminOrgs {
configUpdateReader = bytes.NewReader(channelConfigBytes)
peerResClient, _, usr, err := getResmgmtClient(sdk, adminPeer, configBackends, false)
}
if channelExists && savePeer {
log.Infof("Updating anchor peers %s", channelID)
// add anchor peers
for _, peerOrg := range channelConfig.PeerOrgs {
resClient, _, _, err := getResmgmtClient(
sdk,
appconfig.AdminOrg{
MSPID: peerOrg.MSPID,
CA: peerOrg.SignCA,
TLSCA: "",
},
configBackends,
false,
)
if err != nil {
return nil, errors.Wrapf(err, "error getting resource management client for peer org %s", peerOrg.MSPID)
}
block, err := resClient.QueryConfigBlockFromOrderer(channelID)
if err != nil {
return nil, err
}
signature, err := peerResClient.CreateConfigSignatureFromReader(usr, configUpdateReader)
cfgBlock, err := resource.ExtractConfigFromBlock(block)
if err != nil {
return nil, err
}
configSignatures = append(configSignatures, signature)
}
configUpdateReader = bytes.NewReader(channelConfigBytes)
saveChannelResponse, err := peerResClient.SaveChannel(
resmgmt.SaveChannelRequest{
ChannelID: channelID,
ChannelConfig: configUpdateReader,
SigningIdentities: []msp.SigningIdentity{},
},
resmgmt.WithConfigSignatures(configSignatures...),
)
if err != nil {
return nil, errors.Wrapf(err, "error saving application configuration")
}
log.Infof("Application configuration updated with transaction ID: %s", saveChannelResponse.TransactionID)
applicationTxId = string(saveChannelResponse.TransactionID)
}
FINISH:

// add anchor peers
for _, peerOrg := range channelConfig.PeerOrgs {
resClient, _, _, err := getResmgmtClient(
sdk,
appconfig.AdminOrg{
MSPID: peerOrg.MSPID,
CA: peerOrg.SignCA,
TLSCA: "",
},
configBackends,
false,
)
if err != nil {
return nil, errors.Wrapf(err, "error getting resource management client for peer org %s", peerOrg.MSPID)
}
block, err := resClient.QueryConfigBlockFromOrderer(channelID)
if err != nil {
return nil, err
}
cfgBlock, err := resource.ExtractConfigFromBlock(block)
if err != nil {
return nil, err
}
cftxGen := configtx.New(cfgBlock)
app := cftxGen.Application().Organization(peerOrg.MSPID)
for _, peer := range peerOrg.AnchorPeers {
dc := dcs[peer.DC]
ord, err := dc.HLFClient.HlfV1alpha1().FabricPeers(peer.Namespace).Get(ctx, peer.Name, v1.GetOptions{})
cftxGen := configtx.New(cfgBlock)
app := cftxGen.Application().Organization(peerOrg.MSPID)
for _, peer := range peerOrg.AnchorPeers {
dc := dcs[peer.DC]
ord, err := dc.HLFClient.HlfV1alpha1().FabricPeers(peer.Namespace).Get(ctx, peer.Name, v1.GetOptions{})
if err != nil {
log.Errorf("failed to get ord %v", err)
return nil, err
}
peerPort := ord.Spec.Istio.Port
peerHostName := ord.Spec.Istio.Hosts[0]
err = app.AddAnchorPeer(configtx.Address{
Host: peerHostName,
Port: peerPort,
})
if err != nil {
return nil, err
}
}
anchorPeers, err := app.AnchorPeers()
if err != nil {
return nil, errors.Wrapf(err, "error getting the anchor peers for org %s", peerOrg.MSPID)
}
log.Infof("anchorPeers total: %v", anchorPeers)
configUpdateBytes, err := cftxGen.ComputeMarshaledUpdate(channelID)
if err != nil {
if !strings.Contains(err.Error(), "no differences detected between original and updated config") {
return nil, errors.Wrapf(err, "error calculating config update")
}
goto endSyncChannel
}
configUpdate := &cb.ConfigUpdate{}
err = proto.Unmarshal(configUpdateBytes, configUpdate)
if err != nil {
log.Errorf("failed to get ord %v", err)
return nil, err
}
peerPort := ord.Spec.Istio.Port
peerHostName := ord.Spec.Istio.Hosts[0]
err = app.AddAnchorPeer(configtx.Address{
Host: peerHostName,
Port: peerPort,
})
channelConfigBytes, err := CreateConfigUpdateEnvelope(channelID, configUpdate)
if err != nil {
return nil, err
}
}
anchorPeers, err := app.AnchorPeers()
if err != nil {
return nil, errors.Wrapf(err, "error getting the anchor peers for org %s", peerOrg.MSPID)
}
log.Infof("anchorPeers total: %v", anchorPeers)
configUpdateBytes, err := cftxGen.ComputeMarshaledUpdate(channelID)
if err != nil {
if !strings.Contains(err.Error(), "no differences detected between original and updated config") {
return nil, errors.Wrapf(err, "error calculating config update")
configUpdateReader := bytes.NewReader(channelConfigBytes)
chResponse, err := resClient.SaveChannel(resmgmt.SaveChannelRequest{
ChannelID: channelID,
ChannelConfig: configUpdateReader,
})
if err != nil {
return nil, errors.Wrapf(err, "error saving anchor peers for org %s", peerOrg.MSPID)
}
goto endSyncChannel
}
configUpdate := &cb.ConfigUpdate{}
err = proto.Unmarshal(configUpdateBytes, configUpdate)
if err != nil {
return nil, err
}
channelConfigBytes, err := CreateConfigUpdateEnvelope(channelID, configUpdate)
if err != nil {
return nil, err
}
configUpdateReader := bytes.NewReader(channelConfigBytes)
chResponse, err := resClient.SaveChannel(resmgmt.SaveChannelRequest{
ChannelID: channelID,
ChannelConfig: configUpdateReader,
})
if err != nil {
return nil, err
log.Infof("anchor anchorPeers added: %s", chResponse.TransactionID)
}
log.Infof("anchor anchorPeers added: %s", chResponse.TransactionID)
}
endSyncChannel:
return &SyncChannelResponse{
Expand Down

0 comments on commit 1c89326

Please sign in to comment.