黑马程序员技术交流社区

标题: 【上海校区】Fabric1.4源码解析:客户端创建通道过程 [打印本页]

作者: 梦缠绕的时候    时间: 2019-7-11 16:12
标题: 【上海校区】Fabric1.4源码解析:客户端创建通道过程
在使用Fabric创建通道的时候,通常我们执行一条命令完成,这篇文章就解析一下执行这条命令后Fabric源码中执行的流程。
peer channel create -o orderer.example.com:7050 -c mychannel -f ./channel-artifacts/channel.tx --tls true --cafile $ORDERER_CA
整个流程的切入点在fabric/peer/main.go文件中的main()方法 (本文中使用的是Fabric1.4版本,不同版本中内容可能不同)。这个方法中也定义了Peer节点可以执行的命令,有关于版本的:version.Cmd(),关于节点状态的:node.Cmd(),关于链码的:chaincode.Cmd(nil),关于客户端日志的:clilogging.Cmd(nil),最后一个就是关于通道的:channel.Cmd(nil)。所以我们就从这里入手,看一下创建通道的整体流程是什么样的。
点进行后,转到了peer/channel/channel.go文件中第49行,其中定义了Peer节点可以执行的对通道进行操作的相关命令:
func Cmd(cf *ChannelCmdFactory) *cobra.Command {    AddFlags(channelCmd)        #创建通道    channelCmd.AddCommand(createCmd(cf))      #从通道获取区块    channelCmd.AddCommand(fetchCmd(cf))    #加入通道    channelCmd.AddCommand(joinCmd(cf))    #列出当前节点所加入的通道    channelCmd.AddCommand(listCmd(cf))    #签名并更新通道配置信息    channelCmd.AddCommand(updateCmd(cf))    #只对通道配置信息进行签名    channelCmd.AddCommand(signconfigtxCmd(cf))    #获取通道信息    channelCmd.AddCommand(getinfoCmd(cf))    return channelCmd}
具体的Peer节点命令使用方法可以参考Fabric官方文档,这里不在一一解释。
我们看一下createCmd(cf)方法,该方法转到了peer/channel/create.go文件中第51行,看文件名字就知道和创建通道相关了。
func createCmd(cf *ChannelCmdFactory) *cobra.Command {    createCmd := &cobra.Command{        Use:   "create",   #使用create关键词创建通道        Short: "Create a channel",          Long:  "Create a channel and write the genesis block to a file.",   #创建通道并将创世区块写入文件        RunE: func(cmd *cobra.Command, args []string) error {            #这一行命令就是对通道进行创建了,点进行看一下            return create(cmd, args, cf)        },    }...}
create(cmd, args, cf)方法在本文件中第227行:
func create(cmd *cobra.Command, args []string, cf *ChannelCmdFactory) error {    // the global chainID filled by the "-c" command    #官方注释用-c来表明通道ID    if channelID == common.UndefinedParamValue {        #UndefinedParamValue="",如果通道ID等于空        return errors.New("must supply channel ID")    }    // Parsing of the command line is done so silence cmd usage    cmd.SilenceUsage = true    var err error    if cf == nil {        #如果ChannelCmdFactory为空,则初始化一个        cf, err = InitCmdFactory(EndorserNotRequired, PeerDeliverNotRequired, OrdererRequired)        if err != nil {            return err        }    }    #最后将ChannelCmdFactory传入该方法,进行通道的创建    return executeCreate(cf)}
首先看一下InitCmdFactory()做了哪些工作,在peer/channel/channel.go文件中第126行:
func InitCmdFactory(isEndorserRequired, isPeerDeliverRequired, isOrdererRequired bool) (*ChannelCmdFactory, error) {    #这里的意思就是只能有一个交付源,要么是Peer要么是Orderer    if isPeerDeliverRequired && isOrdererRequired {        return nil, errors.New("ERROR - only a single deliver source is currently supported")    }    var err error    #初始化ChannelCmdFactory,看一下该结构体的内容    cf := &ChannelCmdFactory{}
直接拿到这里来好了:
type ChannelCmdFactory struct {    #用于背书的客户端    EndorserClient   pb.EndorserClient    #签名者    Signer           msp.SigningIdentity    #用于广播的客户端    BroadcastClient  common.BroadcastClient    #用于交付的客户端    DeliverClient    deliverClientIntf    #创建用于广播的客户端的工厂    BroadcastFactory BroadcastClientFactory}
再往下看:
    #获取默认的签名者,通常是Peer节点    cf.Signer, err = common.GetDefaultSignerFnc()    if err != nil {        return nil, errors.WithMessage(err, "error getting default signer")    }    cf.BroadcastFactory = func() (common.BroadcastClient, error) {        #根据ChannelCmdFactory结构体中的BroadcastFactory获取BroadcastClient        return common.GetBroadcastClientFnc()    }    // for join and list, we need the endorser as well    #我们这里是完成对通道的创建,所以只使用了最后一个isOrdererRequired    if isEndorserRequired {        #创建一个用于背书的客户端        cf.EndorserClient, err = common.GetEndorserClientFnc(common.UndefinedParamValue, common.UndefinedParamValue)        if err != nil {            return nil, errors.WithMessage(err, "error getting endorser client for channel")        }    }    // for fetching blocks from a peer    if isPeerDeliverRequired {        #从Peer节点创建一个用于交付的客户端        cf.DeliverClient, err = common.NewDeliverClientForPeer(channelID, bestEffort)        if err != nil {            return nil, errors.WithMessage(err, "error getting deliver client for channel")        }    }    // for create and fetch, we need the orderer as well    if isOrdererRequired {        if len(strings.Split(common.OrderingEndpoint, ":")) != 2 {            return nil, errors.Errorf("ordering service endpoint %s is not valid or missing", common.OrderingEndpoint)        }        #从Order节点创建一个一个用于交付的客户端        cf.DeliverClient, err = common.NewDeliverClientForOrderer(channelID, bestEffort)        if err != nil {            return nil, err        }    }    logger.Infof("Endorser and orderer connections initialized")    return cf, nil}
返回create()方法:
#到了最后一行代码,传入之前创建的ChannelCmdFactory,开始进行通道的创建return executeCreate(cf)
该方法在peer/channel/create.go文件中的第174行:
#方法比较清晰,一共完成了以下几个步骤func executeCreate(cf *ChannelCmdFactory) error {    #发送创建通道的Transaction到Order节点    err := sendCreateChainTransaction(cf)    if err != nil {        return err    }    #获取该通道内的创世区块(该过程在Order节点共识完成之后)    block, err := getGenesisBlock(cf)    if err != nil {        return err    }    #序列化区块信息    b, err := proto.Marshal(block)    if err != nil {        return err    }    file := channelID + ".block"    if outputBlock != common.UndefinedParamValue {        file = outputBlock    }    #将区块信息写入本地文件中    err = ioutil.WriteFile(file, b, 0644)    if err != nil {        return err    }    return nil}1.Peer节点创建用于创建通道的Envelope文件
首先我们看一下sendCreateChainTransaction()这个方法,又回到了peer/channel/create.go文件中,在第144行:
func sendCreateChainTransaction(cf *ChannelCmdFactory) error {    var err error    #定义了一个Envelope结构体    var chCrtEnv *cb.Envelope
Envelope结构体:
type Envelope struct {    #主要就是保存被序列化的有效载荷    Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`    #由创建者进行的签名信息    Signature            []byte   `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`    XXX_NoUnkeyedLiteral struct{} `json:"-"`    XXX_unrecognized     []byte   `json:"-"`    XXX_sizecache        int32    `json:"-"`}
回到sendCreateChainTransaction()这个方法,继续往下:
    if channelTxFile != "" {        #如果指定了channelTxFile,则使用指定的文件创建通道,这个方法很简单,从文件中读取数据,反序列化后返回chCrtEnv.对于我们启动Fabric网络之前曾创建过一个channel.tx文件,指的就是这个        if chCrtEnv, err = createChannelFromConfigTx(channelTxFile); err != nil {            return err        }    } else {        #如果没有指定,则使用默认的配置创建通道,看一下这个方法,在71行        if chCrtEnv, err = createChannelFromDefaults(cf); err != nil {            return err        }    }-------------------------------createChannelFromDefaults()-------func createChannelFromDefaults(cf *ChannelCmdFactory) (*cb.Envelope, error) {    #主要就这一个方法,点进去    chCrtEnv, err := encoder.MakeChannelCreationTransaction(channelID, localsigner.NewSigner(), genesisconfig.Load(genesisconfig.SampleSingleMSPChannelProfile))    if err != nil {        return nil, err    }    return chCrtEnv, nil}
MakeChannelCreationTransaction()方法传入了通道的ID,并创建了一个签名者,以及默认的配置文件,方法在common/tools/configtxgen/encoder/encoder.go文件中第502行:
func MakeChannelCreationTransaction(channelID string, signer crypto.LocalSigner, conf *genesisconfig.Profile) (*cb.Envelope, error) {    #从名字可以看到是使用了默认的配置模板,对各种策略进行配置,里面就不再细看了    template, err := DefaultConfigTemplate(conf)    if err != nil {        return nil, errors.WithMessage(err, "could not generate default config template")    }    #看一下这个方法,从模板中创建一个用于创建通道的Transaction    return MakeChannelCreationTransactionFromTemplate(channelID, signer, conf, template)}
MakeChannelCreationTransactionFromTemplate()方法在第530行:
func MakeChannelCreationTransactionFromTemplate(channelID string, signer crypto.LocalSigner, conf *genesisconfig.Profile, template *cb.ConfigGroup) (*cb.Envelope, error) {    newChannelConfigUpdate, err := NewChannelCreateConfigUpdate(channelID, conf, template)    ...    #创建一个用于配置更新的结构体    newConfigUpdateEnv := &cb.ConfigUpdateEnvelope{        ConfigUpdate: utils.MarshalOrPanic(newChannelConfigUpdate),    }    if signer != nil {        #如果签名者不为空,创建签名Header        sigHeader, err := signer.NewSignatureHeader()        ...        newConfigUpdateEnv.Signatures = []*cb.ConfigSignature{{            SignatureHeader: utils.MarshalOrPanic(sigHeader),        }}        ...        #进行签名        newConfigUpdateEnv.Signatures[0].Signature, err = signer.Sign(util.ConcatenateBytes(newConfigUpdateEnv.Signatures[0].SignatureHeader, newConfigUpdateEnv.ConfigUpdate))        ...    }    #创建被签名的Envelope,然后一直返回到最外面的方法    return utils.CreateSignedEnvelope(cb.HeaderType_CONFIG_UPDATE, channelID, signer, newConfigUpdateEnv, msgVersion, epoch)}
到这里,用于创建通道的Envelope已经创建好了,sendCreateChainTransaction()继续往下看:
...    #该方法主要是对刚刚创建的Envelope进行验证    if chCrtEnv, err = sanityCheckAndSignConfigTx(chCrtEnv); err != nil {        return err    }    var broadcastClient common.BroadcastClient    #验证完成后,创建一个用于广播信息的客户端    broadcastClient, err = cf.BroadcastFactory()    if err != nil {        return errors.WithMessage(err, "error getting broadcast client")    }    defer broadcastClient.Close()    #将创建通道的Envelope信息广播出去    err = broadcastClient.Send(chCrtEnv)    return err}
到这里,sendCreateChainTransaction()方法结束了,总结一下该方法所做的工作:
2 Order节点对Envelope文件处理
至于获取创世区块以及将文件保存到本地不再说明,接下来我们看一下Peer节点将创建通道的Envelope广播出去后,Order节点做了什么。
方法在/order/common/server/server.go中第141行:
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {    ...    #主要在这一行代码,Handle方法对接收到的信息进行处理    return s.bh.Handle(&broadcastMsgTracer{        AtomicBroadcast_BroadcastServer: srv,        msgTracer: msgTracer{            debug:    s.debug,            function: "Broadcast",        },    })}
对于Handler()方法,在/order/common/broadcast/broadcast.go文件中第66行:
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {    #首先获取消息的源地址    addr := util.ExtractRemoteAddress(srv.Context())    ...    for {        #接收消息        msg, err := srv.Recv()        ...        #处理接收到的消息,我们看一下这个方法        resp := bh.ProcessMessage(msg, addr)        #最后将响应信息广播出去        err = srv.Send(resp)        ...    }}
ProcessMessage(msg, addr)方法的传入参数为接收到的消息以及消息的源地址,该方法比较重要,是Order节点对消息进行处理的主方法。在第136行:
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {    #这个结构体应该理解为记录器,记录消息的相关信息    tracker := &MetricsTracker{        ChannelID: "unknown",        TxType:    "unknown",        Metrics:   bh.Metrics,    }    defer func() {        // This looks a little unnecessary, but if done directly as        // a defer, resp gets the (always nil) current state of resp        // and not the return value        tracker.Record(resp)    }()    #记录处理消息的开始时间    tracker.BeginValidate()    #该方法获取接收到的消息的Header,并判断是否为配置信息    chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)    ...    #由于之前Peer节点发送的为创建通道的信息,所以消息类型为配置信息    if !isConfig {        ...        #而对于普通的交易信息的处理方法这里就不再看了,主要关注于创建通道的消息的处理    } else { // isConfig        logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)        #到了这里,对配置更新消息进行处理,主要方法,点进行看一下        config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
ProcessConfigUpdateMsg(msg)方法在orderer/common/msgprocessor/systemchannel.go文件中第84行:
#这个地方有些不懂,为什么会调用systemchannel.ProcessConfigUpdateMsg()而不是standardchannel.ProcessConfigUpdateMsg()方法?是因为这个结构体的原因?===========================SystemChannel=======================type SystemChannel struct {    *StandardChannel    templator ChannelConfigTemplator}===========================SystemChannel=======================func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {    #首先从消息体中获取通道ID    channelID, err := utils.ChannelID(envConfigUpdate)    ...    #判断获取到的通道ID是否为已经存在的用户通道ID,如果是的话转到StandardChannel中的ProcessConfigUpdateMsg()方法进行处理    if channelID == s.support.ChainID() {        return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)    }    ...    #由于之前由Peer节点发送的为创建通道的Tx,所以对于通道ID是不存在的,因此到了这个方法,点进行看一下    bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
NewChannelConfig()方法在第215行,比较重要的方法,完成通道的配置:
func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {    #首先反序列化有效载荷    configUpdatePayload, err := utils.UnmarshalPayload(envConfigUpdate.Payload)    ...    #反序列化配置更新信息Envelope    configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)s    ...    #获取通道头信息    channelHeader, err := utils.UnmarshalChannelHeader(configUpdatePayload.Header.ChannelHeader)    ...    #反序列化配置更新信息    configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)    ...    #以下具体的不再说了,就是根据之前定义的各项策略对通道进行配置,具体的策略可以看configtx.yaml文件    consortiumConfigValue, ok := configUpdate.WriteSet.Values[channelconfig.ConsortiumKey]    ...    consortium := &cb.Consortium{}    err = proto.Unmarshal(consortiumConfigValue.Value, consortium)    ...    applicationGroup := cb.NewConfigGroup()    consortiumsConfig, ok := dt.support.ConsortiumsConfig()    ...    consortiumConf, ok := consortiumsConfig.Consortiums()[consortium.Name]    ...    applicationGroup.Policies[channelconfig.ChannelCreationPolicyKey] = &cb.ConfigPolicy{        Policy: consortiumConf.ChannelCreationPolicy(),    }    applicationGroup.ModPolicy = channelconfig.ChannelCreationPolicyKey    #获取当前系统通道配置信息    systemChannelGroup := dt.support.ConfigtxValidator().ConfigProto().ChannelGroup    if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 &&        len(configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups) == 0 {        return nil, fmt.Errorf("Proposed configuration has no application group members, but consortium contains members")    }    if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 {        for orgName := range configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups {            consortiumGroup, ok := systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups[orgName]            if !ok {                return nil, fmt.Errorf("Attempted to include a member which is not in the consortium")            }            applicationGroup.Groups[orgName] = proto.Clone(consortiumGroup).(*cb.ConfigGroup)        }    }    channelGroup := cb.NewConfigGroup()    #将系统通道配置信息复制    for key, value := range systemChannelGroup.Values {        channelGroup.Values[key] = proto.Clone(value).(*cb.ConfigValue)        if key == channelconfig.ConsortiumKey {            // Do not set the consortium name, we do this later            continue        }    }    for key, policy := range systemChannelGroup.Policies {        channelGroup.Policies[key] = proto.Clone(policy).(*cb.ConfigPolicy)    }    #新的配置信息中Order组配置使用系统通道的配置,同时将定义的application组配置赋值到新的配置信息    channelGroup.Groups[channelconfig.OrdererGroupKey] = proto.Clone(systemChannelGroup.Groups[channelconfig.OrdererGroupKey]).(*cb.ConfigGroup)    channelGroup.Groups[channelconfig.ApplicationGroupKey] = applicationGroup    channelGroup.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{        Value:     utils.MarshalOrPanic(channelconfig.ConsortiumValue(consortium.Name).Value()),        ModPolicy: channelconfig.AdminsPolicyKey,    }    if oc, ok := dt.support.OrdererConfig(); ok && oc.Capabilities().PredictableChannelTemplate() {        channelGroup.ModPolicy = systemChannelGroup.ModPolicy        zeroVersions(channelGroup)    }    #将创建的新的配置打包为Bundle    bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{        ChannelGroup: channelGroup,    })    ...    return bundle, nil}
接下来我们回到ProcessConfigUpdateMsg()方法:
    ...    #创建一个配置验证器对该方法的传入参数进行验证操作    newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)    ...    #创建一个签名的Envelope,此次为Header类型为HeaderType_CONFIG进行签名    newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)    #创建一个签名的Transaction,此次为Header类型为HeaderType_ORDERER_TRANSACTION进行签名    wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)    ...    #过滤器进行过滤,主要检查是否创建的Transaction过大,以及签名检查,确保Order节点使用正确的证书进行签名    err = s.StandardChannel.filters.Apply(wrappedOrdererTransaction)    ...    #将Transaction返回     return wrappedOrdererTransaction, s.support.Sequence(), nil}
到这里,消息处理完毕,返回到ProcessMessage()方法:
    config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)    ...    #记录消息处理完毕时间    tracker.EndValidate()    #开始进行入队操作    tracker.BeginEnqueue()    #waitReady()是一个阻塞方法,等待入队完成或出现异常    if err = processor.WaitReady(); err != nil {    logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)    return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}    }    #共识方法,具体看定义的Fabric网络使用了哪种共识    err = processor.Configure(config, configSeq)    ...    #最后返回操作成功的响应    return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}}
到这里,由客户端发送的创建通道的Transaction就结束了。总共分为两个部分,一个是Peer节点对创建通道的Envelope进行创建的过程,一个是Order节点接收到该Envelope进行配置的过程,最后总结一下整体流程:
Peer节点方:
Order节点方:


作者: 梦缠绕的时候    时间: 2019-7-11 16:13
有任何问题欢迎在评论区留言
作者: 梦缠绕的时候    时间: 2019-7-11 16:13
或者添加学姐微信
DKA-2018
作者: 晨大喵    时间: 2019-7-17 11:45
感谢分享~~




欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2