From 1c893265c76693c38826bcbd6530c9ed04276b51 Mon Sep 17 00:00:00 2001 From: dviejokfs Date: Mon, 15 Aug 2022 14:25:21 +0200 Subject: [PATCH] Fix anchor peers + update channel --- store/channel/channel.go | 201 ++++++++++++++++++++------------------- 1 file changed, 104 insertions(+), 97 deletions(-) diff --git a/store/channel/channel.go b/store/channel/channel.go index 8efb841..ecf1d2a 100644 --- a/store/channel/channel.go +++ b/store/channel/channel.go @@ -7,6 +7,9 @@ import ( "crypto/x509" "encoding/json" "fmt" + "strings" + "time" + "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-config/configtx" "github.com/hyperledger/fabric-config/configtx/membership" @@ -30,8 +33,6 @@ import ( "github.com/lithammer/shortuuid/v3" "github.com/pkg/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "strings" - "time" ) func getResmgmtClient(sdk *fabsdk.FabricSDK, adminOrg appconfig.AdminOrg, configBackends []core.ConfigBackend, isTls bool) (*resmgmt.Client, context2.ClientProvider, msp.SigningIdentity, error) { @@ -481,7 +482,12 @@ applicationUpdate: if err != nil { return nil, err } - cfgBlock, err := resource.ExtractConfigFromBlock(channelBlock) + block, err := resClient.QueryConfigBlockFromOrderer(channelID) + if err != nil { + log.Infof("channel %s does not exist, it will be created", channelID) + channelExists = false + } + cfgBlock, err := resource.ExtractConfigFromBlock(block) if err != nil { return nil, errors.Wrapf(err, "failed to extract config from channel block") } @@ -495,115 +501,116 @@ applicationUpdate: if !strings.Contains(err.Error(), "no differences detected between original and updated config") { return nil, errors.Wrapf(err, "error calculating config update") } - goto FINISH - } - channelConfigBytes, err := CreateConfigUpdateEnvelope(channelID, configUpdate) - if err != nil { - return nil, errors.Wrapf(err, "error creating config update envelope") + log.Infof("No differences detected between original and updated config") + } else { + channelConfigBytes, err := CreateConfigUpdateEnvelope(channelID, configUpdate) + if err != nil { + return nil, errors.Wrapf(err, "error creating config update envelope") + } + var configSignatures []*cb.ConfigSignature + for _, adminPeer := range channelConfig.PeerAdminOrgs { + configUpdateReader := bytes.NewReader(channelConfigBytes) + peerResClient, _, usr, err := getResmgmtClient(sdk, adminPeer, configBackends, false) + if err != nil { + return nil, err + } + signature, err := peerResClient.CreateConfigSignatureFromReader(usr, configUpdateReader) + if err != nil { + return nil, err + } + configSignatures = append(configSignatures, signature) + } + configUpdateReader := bytes.NewReader(channelConfigBytes) + saveChannelResponse, err := peerResClient.SaveChannel( + resmgmt.SaveChannelRequest{ + ChannelID: channelID, + ChannelConfig: configUpdateReader, + SigningIdentities: []msp.SigningIdentity{}, + }, + resmgmt.WithConfigSignatures(configSignatures...), + ) + if err != nil { + return nil, errors.Wrapf(err, "error saving application configuration") + } + log.Infof("Application configuration updated with transaction ID: %s", saveChannelResponse.TransactionID) + applicationTxId = string(saveChannelResponse.TransactionID) } - configUpdateReader := bytes.NewReader(channelConfigBytes) - var configSignatures []*cb.ConfigSignature - for _, adminPeer := range channelConfig.PeerAdminOrgs { - configUpdateReader = bytes.NewReader(channelConfigBytes) - peerResClient, _, usr, err := getResmgmtClient(sdk, adminPeer, configBackends, false) + } + if channelExists && savePeer { + log.Infof("Updating anchor peers %s", channelID) + // add anchor peers + for _, peerOrg := range channelConfig.PeerOrgs { + resClient, _, _, err := getResmgmtClient( + sdk, + appconfig.AdminOrg{ + MSPID: peerOrg.MSPID, + CA: peerOrg.SignCA, + TLSCA: "", + }, + configBackends, + false, + ) + if err != nil { + return nil, errors.Wrapf(err, "error getting resource management client for peer org %s", peerOrg.MSPID) + } + block, err := resClient.QueryConfigBlockFromOrderer(channelID) if err != nil { return nil, err } - signature, err := peerResClient.CreateConfigSignatureFromReader(usr, configUpdateReader) + cfgBlock, err := resource.ExtractConfigFromBlock(block) if err != nil { return nil, err } - configSignatures = append(configSignatures, signature) - } - configUpdateReader = bytes.NewReader(channelConfigBytes) - saveChannelResponse, err := peerResClient.SaveChannel( - resmgmt.SaveChannelRequest{ - ChannelID: channelID, - ChannelConfig: configUpdateReader, - SigningIdentities: []msp.SigningIdentity{}, - }, - resmgmt.WithConfigSignatures(configSignatures...), - ) - if err != nil { - return nil, errors.Wrapf(err, "error saving application configuration") - } - log.Infof("Application configuration updated with transaction ID: %s", saveChannelResponse.TransactionID) - applicationTxId = string(saveChannelResponse.TransactionID) - } -FINISH: - - // add anchor peers - for _, peerOrg := range channelConfig.PeerOrgs { - resClient, _, _, err := getResmgmtClient( - sdk, - appconfig.AdminOrg{ - MSPID: peerOrg.MSPID, - CA: peerOrg.SignCA, - TLSCA: "", - }, - configBackends, - false, - ) - if err != nil { - return nil, errors.Wrapf(err, "error getting resource management client for peer org %s", peerOrg.MSPID) - } - block, err := resClient.QueryConfigBlockFromOrderer(channelID) - if err != nil { - return nil, err - } - cfgBlock, err := resource.ExtractConfigFromBlock(block) - if err != nil { - return nil, err - } - cftxGen := configtx.New(cfgBlock) - app := cftxGen.Application().Organization(peerOrg.MSPID) - for _, peer := range peerOrg.AnchorPeers { - dc := dcs[peer.DC] - ord, err := dc.HLFClient.HlfV1alpha1().FabricPeers(peer.Namespace).Get(ctx, peer.Name, v1.GetOptions{}) + cftxGen := configtx.New(cfgBlock) + app := cftxGen.Application().Organization(peerOrg.MSPID) + for _, peer := range peerOrg.AnchorPeers { + dc := dcs[peer.DC] + ord, err := dc.HLFClient.HlfV1alpha1().FabricPeers(peer.Namespace).Get(ctx, peer.Name, v1.GetOptions{}) + if err != nil { + log.Errorf("failed to get ord %v", err) + return nil, err + } + peerPort := ord.Spec.Istio.Port + peerHostName := ord.Spec.Istio.Hosts[0] + err = app.AddAnchorPeer(configtx.Address{ + Host: peerHostName, + Port: peerPort, + }) + if err != nil { + return nil, err + } + } + anchorPeers, err := app.AnchorPeers() + if err != nil { + return nil, errors.Wrapf(err, "error getting the anchor peers for org %s", peerOrg.MSPID) + } + log.Infof("anchorPeers total: %v", anchorPeers) + configUpdateBytes, err := cftxGen.ComputeMarshaledUpdate(channelID) + if err != nil { + if !strings.Contains(err.Error(), "no differences detected between original and updated config") { + return nil, errors.Wrapf(err, "error calculating config update") + } + goto endSyncChannel + } + configUpdate := &cb.ConfigUpdate{} + err = proto.Unmarshal(configUpdateBytes, configUpdate) if err != nil { - log.Errorf("failed to get ord %v", err) return nil, err } - peerPort := ord.Spec.Istio.Port - peerHostName := ord.Spec.Istio.Hosts[0] - err = app.AddAnchorPeer(configtx.Address{ - Host: peerHostName, - Port: peerPort, - }) + channelConfigBytes, err := CreateConfigUpdateEnvelope(channelID, configUpdate) if err != nil { return nil, err } - } - anchorPeers, err := app.AnchorPeers() - if err != nil { - return nil, errors.Wrapf(err, "error getting the anchor peers for org %s", peerOrg.MSPID) - } - log.Infof("anchorPeers total: %v", anchorPeers) - configUpdateBytes, err := cftxGen.ComputeMarshaledUpdate(channelID) - if err != nil { - if !strings.Contains(err.Error(), "no differences detected between original and updated config") { - return nil, errors.Wrapf(err, "error calculating config update") + configUpdateReader := bytes.NewReader(channelConfigBytes) + chResponse, err := resClient.SaveChannel(resmgmt.SaveChannelRequest{ + ChannelID: channelID, + ChannelConfig: configUpdateReader, + }) + if err != nil { + return nil, errors.Wrapf(err, "error saving anchor peers for org %s", peerOrg.MSPID) } - goto endSyncChannel - } - configUpdate := &cb.ConfigUpdate{} - err = proto.Unmarshal(configUpdateBytes, configUpdate) - if err != nil { - return nil, err - } - channelConfigBytes, err := CreateConfigUpdateEnvelope(channelID, configUpdate) - if err != nil { - return nil, err - } - configUpdateReader := bytes.NewReader(channelConfigBytes) - chResponse, err := resClient.SaveChannel(resmgmt.SaveChannelRequest{ - ChannelID: channelID, - ChannelConfig: configUpdateReader, - }) - if err != nil { - return nil, err + log.Infof("anchor anchorPeers added: %s", chResponse.TransactionID) } - log.Infof("anchor anchorPeers added: %s", chResponse.TransactionID) } endSyncChannel: return &SyncChannelResponse{