A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

1.网络初始化的过程中执行以下内容,在创建节点Engine过程中该节点作为客户端的身份连接到其他Peer

peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)

2.返回一个使用提供的handler工厂函数,在新的聊天服务调用上创建新的handler句柄的Peer

func NewPeerWithEngine(secHelperFunc func() crypto.Peer, engFactory EngineFactory) (peer *Impl, err error) {    peer = new(Impl)    peerNodes := peer.initDiscovery()    peer.handlerMap = &handlerMap{m: make(map[pb.PeerID]MessageHandler)}    peer.isValidator = ValidatorEnabled()    peer.secHelper = secHelperFunc()    // 为peer安装安全对象    if SecurityEnabled() {        if peer.secHelper == nil {            return nil, fmt.Errorf("Security helper not provided")        }    }    // 在引擎之前初始化账本,因为共识可能希望立即开始询问账本    ledgerPtr, err := ledger.GetLedger()    if err != nil {        return nil, fmt.Errorf("Error constructing NewPeerWithHandler: %s", err)    }    peer.ledgerWrapper = &ledgerWrapper{ledger: ledgerPtr}    peer.engine, err = engFactory(peer)    if err != nil {        return nil, err    }    peer.handlerFactory = peer.engine.GetHandlerFactory()    if peer.handlerFactory == nil {        return nil, errors.New("Cannot supply nil handler factory")    }    peer.chatWithSomePeers(peerNodes)    return peer, nil}

2.1.NewPeerWithHandler返回一个使用提供的handler工厂函数在新的聊天服务调用上创建新的handlers句柄的Peer,

func NewPeerWithHandler(secHelperFunc func() crypto.Peer, handlerFact HandlerFactory) (*Impl, error) {    peer := new(Impl)    peerNodes := peer.initDiscovery()    if handlerFact == nil {        return nil, errors.New("Cannot supply nil handler factory")    }    peer.handlerFactory = handlerFact    peer.handlerMap = &handlerMap{m: make(map[pb.PeerID]MessageHandler)}    peer.secHelper = secHelperFunc()    // 为peer安装安全对象    if SecurityEnabled() {        if peer.secHelper == nil {            return nil, fmt.Errorf("Security helper not provided")        }    }    ledgerPtr, err := ledger.GetLedger()    if err != nil {        return nil, fmt.Errorf("Error constructing NewPeerWithHandler: %s", err)    }    peer.ledgerWrapper = &ledgerWrapper{ledger: ledgerPtr}    peer.chatWithSomePeers(peerNodes)    return peer, nil}

2.2.Discovery是合并引导peer成员选择和对于NVP节点的VP节点选择接口

type Discovery interface {    AddNode(string) bool           // 添加一个地址到discovery列表    RemoveNode(string) bool        // 从discovery列表中移出一个地址    GetAllNodes() []string         // 返回这个节点维护的所有地址    GetRandomNodes(n int) []string // 返回这个peer连接的随机地址    FindNode(string) bool          // 在discovery列表中找一个节点}// DiscoveryImpl是一个Discovery的实现type DiscoveryImpl struct {    sync.RWMutex    nodes  map[string]bool    seq    []string    random *rand.Rand}// NewDiscoveryImpl 是一个Discovery实现的构造器func NewDiscoveryImpl() *DiscoveryImpl {    di := DiscoveryImpl{}    di.nodes = make(map[string]bool)    di.random = rand.New(rand.NewSource(time.Now().Unix()))    return &di}// AddNode添加一个地址到discovery列表func (di *DiscoveryImpl) AddNode(address string) bool {    di.Lock()    defer di.Unlock()    if _, ok := di.nodes[address]; !ok {        di.seq = append(di.seq, address)        di.nodes[address] = true    }    return di.nodes[address]}// RemoveNode从discovery列表中移出一个地址func (di *DiscoveryImpl) RemoveNode(address string) bool {    di.Lock()    defer di.Unlock()    if _, ok := di.nodes[address]; ok {        di.nodes[address] = false        return true    }    return false}// GetAllNodes返回这个节点维护的所有地址func (di *DiscoveryImpl) GetAllNodes() []string {    di.RLock()    defer di.RUnlock()    var addresses []string    for address, valid := range di.nodes {        if valid {            addresses = append(addresses, address) // TODO Expensive, don't quite like it        }    }    return addresses}// GetRandomNodes 返回n个随机节点func (di *DiscoveryImpl) GetRandomNodes(n int) []string {    var pick string    randomNodes := make([]string, n)    di.RLock()    defer di.RUnlock()    for i := 0; i < n; i++ {        for {            pick = di.seq[di.random.Intn(len(di.nodes))]            if di.nodes[pick] && !inArray(pick, randomNodes) {                break            }        }        randomNodes = pick    }    return randomNodes}// FindNode 如果地址存储在discovery列表中就返回truefunc (di *DiscoveryImpl) FindNode(address string) bool {    di.RLock()    defer di.RUnlock()    _, ok := di.nodes[address]    return ok}func inArray(element string, array []string) bool {    for _, val := range array {        if val == element {            return true        }    }    return false}

2.3.Peer是一个能够验证transactions的实体

type Peer interface {    Node    // GetID返回一个peer的验证标识符    GetID() []byte    // GetEnrollmentID返回这个peer的登记id    GetEnrollmentID() string    // Sign 如果没有错误出现的话用这个验证器签名密钥签名消息和到处签名    Sign(msg []byte) ([]byte, error)    // Verify 检查签名是否是vkID的验证密钥下的消息的有效签名    // 如果检查成功,Verify返回空意味着没有错误出现    // 如果vkID是空,然后针对该验证者的验证密钥验证签名.    Verify(vkID, signature, message []byte) error}

3.从discovery列表中加载地址之前保存到磁盘并将它们添加到当前discovery列表

func (p *Impl) initDiscovery() []string {    p.discHelper = discovery.NewDiscoveryImpl()    p.discPersist = viper.GetBool("peer.discovery.persist")    if !p.discPersist {        peerLogger.Warning("Discovery list will not be persisted to disk")    }    // 加载一些预先保存的地址    addresses, err := p.LoadDiscoveryList()    if err != nil {        peerLogger.Errorf("%s", err)    }    for _, address := range addresses { // add them to the current discovery list        _ = p.discHelper.AddNode(address)    }    peerLogger.Debugf("Retrieved discovery list from disk: %v", addresses)    //解析配置文件, ENV标志, 等等.    rootNodes := strings.Split(viper.GetString("peer.discovery.rootnode"), ",")    if !(len(rootNodes) == 1 && strings.Compare(rootNodes[0], "") == 0) {        addresses = append(rootNodes, p.discHelper.GetAllNodes()...)    }    return addresses}

4.NewDiscoveryImpl是一个Discovery构造器的实现

func NewDiscoveryImpl() *DiscoveryImpl {    di := DiscoveryImpl{}    di.nodes = make(map[string]bool)    di.random = rand.New(rand.NewSource(time.Now().Unix()))    return &di}

4.1.相关结构体。

type DiscoveryImpl struct {    sync.RWMutex    nodes  map[string]bool    seq    []string    random *rand.Rand}

5.加载一些预先保存的地址,LoadDiscoveryList使一个peer从数据库中加载到discovery列表

func (p *Impl) LoadDiscoveryList() ([]string, error) {    var err error    packed, err := p.Load("discovery")    if err != nil {        err = fmt.Errorf("Unable to load discovery list from DB: %s", err)        peerLogger.Error(err)        return nil, err    }    addresses := &pb.PeersAddresses{}    err = proto.Unmarshal(packed, addresses)    if err != nil {        err = fmt.Errorf("Could not unmarshal discovery list message: %s", err)        peerLogger.Error(err)    }    return addresses.Addresses, err}

5.1.PeersAddresses定义

type PeersAddresses struct {    Addresses []string `protobuf:"bytes,1,rep,name=addresses" json:"addresses,omitempty"`}

5.2 Unmarshaler是表示可以解组本身的对象的接口。 该方法应该在解码开始之前复位接收机。 参数指向可能被覆盖的数据,因此实现不应该保留对缓冲区的引用。

type Unmarshaler interface {    Unmarshal([]byte) error}

6.Load 使一个peer能够读取与给定数据库键相对应的值

func (p *Impl) Load(key string) ([]byte, error) {    db := db.GetDBHandle()    return db.Get(db.PersistCF, []byte(key))}

7.AddNode添加地址到discovery列表

func (di *DiscoveryImpl) AddNode(address string) bool {    di.Lock()    defer di.Unlock()    if _, ok := di.nodes[address]; !ok {        di.seq = append(di.seq, address)        di.nodes[address] = true    }    return di.nodes[address]}

8.ValidatorEnabled返回peer.validator.enabled是否可用

func ValidatorEnabled() bool {    if !configurationCached {        cacheConfiguration()    }    return validatorEnabled}

9.SecurityEnabled 从配置中返回安全可用性能

func SecurityEnabled() bool {    if !configurationCached {        cacheConfiguration()    }    return securityEnabled}

10.Engine接口负责管理Peer通信(Handlers)和交易的进程, GetHandlerFactory 返回一个handler句柄来接收聊天流

type Engine interface {    TransactionProccesor    GetHandlerFactory() HandlerFactory}

11.chatWithSomePeers根据是不是验证节点的情况启动与1个或所有peer聊天的节点

func (p *Impl) chatWithSomePeers(addresses []string) {    p.reconnectOnce.Do(func() {        go p.ensureConnected()    })    if len(addresses) == 0 {        peerLogger.Debug("Starting up the first peer of a new network")        return // 没什么事做    }    for _, address := range addresses {        if pe, err := GetPeerEndpoint(); err == nil {            if address == pe.Address {                peerLogger.Debugf("Skipping own address: %v", address)                continue            }        } else {            peerLogger.Errorf("Failed to obtain peer endpoint, %v", err)            return        }        go p.chatWithPeer(address)    }}

12.确保节点之间是已经连接好的

func (p *Impl) ensureConnected() {    touchPeriod := viper.GetDuration("peer.discovery.touchPeriod")    touchMaxNodes := viper.GetInt("peer.discovery.touchMaxNodes")    tickChan := time.NewTicker(touchPeriod).C    peerLogger.Debugf("Starting Peer reconnect service (touch service), with period = %s", touchPeriod)    for {        // 简单循环和检查是否需要从新连接        <-tickChan        peersMsg, err := p.GetPeers()        if err != nil {            peerLogger.Errorf("Error in touch service: %s", err.Error())        }        allNodes := p.discHelper.GetAllNodes() // these will always be returned in random order        if len(peersMsg.Peers) < len(allNodes) {            peerLogger.Warning("Touch service indicates dropped connections, attempting to reconnect...")            delta := util.FindMissingElements(allNodes, getPeerAddresses(peersMsg))            if len(delta) > touchMaxNodes {                delta = delta[:touchMaxNodes]            }            p.chatWithSomePeers(delta)        } else {            peerLogger.Debug("Touch service indicates no dropped connections")        }        peerLogger.Debugf("Connected to: %v", getPeerAddresses(peersMsg))        peerLogger.Debugf("Discovery knows about: %v", allNodes)    }}

13.GetPeerEndpoint 从缓存配置中返回peerEndpoint

func GetPeerEndpoint() (*pb.PeerEndpoint, error) {    if !configurationCached {        cacheConfiguration()    }    return peerEndpoint, peerEndpointError}

14.Peer节点之间通信处理

func (p *Impl) chatWithPeer(address string) error {    peerLogger.Debugf("Initiating Chat with peer address: %s", address)    conn, err := NewPeerClientConnectionWithAddress(address)    if err != nil {        peerLogger.Errorf("Error creating connection to peer address %s: %s", address, err)        return err    }    serverClient := pb.NewPeerClient(conn)    ctx := context.Background()    stream, err := serverClient.Chat(ctx)    if err != nil {        peerLogger.Errorf("Error establishing chat with peer address %s: %s", address, err)        return err    }    peerLogger.Debugf("Established Chat with peer address: %s", address)    err = p.handleChat(ctx, stream, true)    stream.CloseSend()    if err != nil {        peerLogger.Errorf("Ending Chat with peer address %s due to error: %s", address, err)        return err    }    return nil}

15.返回目前已经注册的PeerEndpoints

func (p *Impl) GetPeers() (*pb.PeersMessage, error) {    p.handlerMap.RLock()    defer p.handlerMap.RUnlock()    peers := []*pb.PeerEndpoint{}    for _, msgHandler := range p.handlerMap.m {        peerEndpoint, err := msgHandler.To()        if err != nil {            return nil, fmt.Errorf("Error getting peers: %s", err)        }        peers = append(peers, &peerEndpoint)    }    peersMessage := &pb.PeersMessage{Peers: peers}    return peersMessage, nil}

15.1.处理开链消息的标准接口

type MessageHandler interface {    RemoteLedger    HandleMessage(msg *pb.Message) error    SendMessage(msg *pb.Message) error    To() (pb.PeerEndpoint, error)    Stop() error}

16.GetAllNodes返回存储在discovery列表中所有地址的一个数组

func (di *DiscoveryImpl) GetAllNodes() []string {    di.RLock()    defer di.RUnlock()    var addresses []string    for address, valid := range di.nodes {        if valid {            addresses = append(addresses, address) // TODO Expensive, don't quite like it        }    }    return addresses}

17.FindMissingElements标识第一个切片中不存在于第二个切片中的元素,第二切片预期为第一切片的子集

func FindMissingElements(all []string, some []string) (delta []string) {all:    for _, v1 := range all {        for _, v2 := range some {            if strings.Compare(v1, v2) == 0 {                continue all            }        }        delta = append(delta, v1)    }    return}
  • NewPeerClientConnectionWithAddress返回一个新的grpc.ClientConn来配置本地PEERfunc NewPeerClientConnectionWithAddress(peerAddress string) (*grpc.ClientConn, error) {if comm.TLSEnabled() {    return comm.NewClientConnectionWithAddress(peerAddress, true, true, comm.InitTLSForPeer())}return comm.NewClientConnectionWithAddress(peerAddress, true, false, nil)}

    19.在chat会话期间接受一个流消息,同时接收其他消息(例如来自其他的 peer)

    type PeerClient interface {// 在chat会话期间接受一个流消息,同时接收其他消息(例如来自其他的peer)Chat(ctx context.Context, opts ...grpc.CallOption) (Peer_ChatClient, error)// 处理来自远程源的事务ProcessTransaction(ctx context.Context, in *Transaction, opts ...grpc.CallOption) (*Response, error)}
  • handleChat执行过程中,建立消息循环,而这里的handler.HandleMessage。这个handler是Engine的消息响应句柄,该消息响应处理来自于Consensus模块func (p *Impl) handleChat(ctx context.Context, stream ChatStream, initiatedStream bool) error {deadline, ok := ctx.Deadline()peerLogger.Debugf("Current context deadline = %s, ok = %v", deadline, ok)handler, err := p.handlerFactory(p, stream, initiatedStream)if err != nil {    return fmt.Errorf("Error creating handler during handleChat initiation: %s", err)}defer handler.Stop()for {    in, err := stream.Recv()    if err == io.EOF {        peerLogger.Debug("Received EOF, ending Chat")        return nil    }    if err != nil {        e := fmt.Errorf("Error during Chat, stopping handler: %s", err)        peerLogger.Error(e.Error())        return e    }    err = handler.HandleMessage(in)    if err != nil {        peerLogger.Errorf("Error handling message: %s", err)        //return err    }}}

    21.Impl是peer服务的实现

    type Impl struct {handlerFactory HandlerFactoryhandlerMap     *handlerMapledgerWrapper  *ledgerWrappersecHelper      crypto.Peerengine         EngineisValidator    boolreconnectOnce  sync.OncediscHelper     discovery.DiscoverydiscPersist    bool}

    22.MessageHandler处理开链消息的标准接口

    type MessageHandler interface {RemoteLedgerHandleMessage(msg *pb.Message) errorSendMessage(msg *pb.Message) errorTo() (pb.PeerEndpoint, error)Stop() error}

23.ChatStream两个Peer之间的流支持接口

type ChatStream interface {
Send(pb.Message) error
Recv() (
pb.Message, error)
}


转自 巴比特论坛

地址 http://8btc.com/thread-262561-1-1.html

文章仅作为分享



0 个回复

您需要登录后才可以回帖 登录 | 加入黑马