1.网络初始化的过程中执行以下内容,在创建节点Engine过程中该节点作为客户端的身份连接到其他Peer peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)2.返回一个使用提供的handler工厂函数,在新的聊天服务调用上创建新的handler句柄的Peer func NewPeerWithEngine(secHelperFunc func() crypto.Peer, engFactory EngineFactory) (peer *Impl, err error) { peer = new(Impl) peerNodes := peer.initDiscovery() peer.handlerMap = &handlerMap{m: make(map[pb.PeerID]MessageHandler)} peer.isValidator = ValidatorEnabled() peer.secHelper = secHelperFunc() // 为peer安装安全对象 if SecurityEnabled() { if peer.secHelper == nil { return nil, fmt.Errorf("Security helper not provided") } } // 在引擎之前初始化账本,因为共识可能希望立即开始询问账本 ledgerPtr, err := ledger.GetLedger() if err != nil { return nil, fmt.Errorf("Error constructing NewPeerWithHandler: %s", err) } peer.ledgerWrapper = &ledgerWrapper{ledger: ledgerPtr} peer.engine, err = engFactory(peer) if err != nil { return nil, err } peer.handlerFactory = peer.engine.GetHandlerFactory() if peer.handlerFactory == nil { return nil, errors.New("Cannot supply nil handler factory") } peer.chatWithSomePeers(peerNodes) return peer, nil}2.1.NewPeerWithHandler返回一个使用提供的handler工厂函数在新的聊天服务调用上创建新的handlers句柄的Peer, func NewPeerWithHandler(secHelperFunc func() crypto.Peer, handlerFact HandlerFactory) (*Impl, error) { peer := new(Impl) peerNodes := peer.initDiscovery() if handlerFact == nil { return nil, errors.New("Cannot supply nil handler factory") } peer.handlerFactory = handlerFact peer.handlerMap = &handlerMap{m: make(map[pb.PeerID]MessageHandler)} peer.secHelper = secHelperFunc() // 为peer安装安全对象 if SecurityEnabled() { if peer.secHelper == nil { return nil, fmt.Errorf("Security helper not provided") } } ledgerPtr, err := ledger.GetLedger() if err != nil { return nil, fmt.Errorf("Error constructing NewPeerWithHandler: %s", err) } peer.ledgerWrapper = &ledgerWrapper{ledger: ledgerPtr} peer.chatWithSomePeers(peerNodes) return peer, nil}2.2.Discovery是合并引导peer成员选择和对于NVP节点的VP节点选择接口 type Discovery interface { AddNode(string) bool // 
添加一个地址到discovery列表 RemoveNode(string) bool // 从discovery列表中移出一个地址 GetAllNodes() []string // 返回这个节点维护的所有地址 GetRandomNodes(n int) []string // 返回这个peer连接的随机地址 FindNode(string) bool // 在discovery列表中找一个节点}// DiscoveryImpl是一个Discovery的实现type DiscoveryImpl struct { sync.RWMutex nodes map[string]bool seq []string random *rand.Rand}// NewDiscoveryImpl 是一个Discovery实现的构造器func NewDiscoveryImpl() *DiscoveryImpl { di := DiscoveryImpl{} di.nodes = make(map[string]bool) di.random = rand.New(rand.NewSource(time.Now().Unix())) return &di}// AddNode添加一个地址到discovery列表func (di *DiscoveryImpl) AddNode(address string) bool { di.Lock() defer di.Unlock() if _, ok := di.nodes[address]; !ok { di.seq = append(di.seq, address) di.nodes[address] = true } return di.nodes[address]}// RemoveNode从discovery列表中移出一个地址func (di *DiscoveryImpl) RemoveNode(address string) bool { di.Lock() defer di.Unlock() if _, ok := di.nodes[address]; ok { di.nodes[address] = false return true } return false}// GetAllNodes返回这个节点维护的所有地址func (di *DiscoveryImpl) GetAllNodes() []string { di.RLock() defer di.RUnlock() var addresses []string for address, valid := range di.nodes { if valid { addresses = append(addresses, address) // TODO Expensive, don't quite like it } } return addresses}// GetRandomNodes 返回n个随机节点func (di *DiscoveryImpl) GetRandomNodes(n int) []string { var pick string randomNodes := make([]string, n) di.RLock() defer di.RUnlock() for i := 0; i < n; i++ { for { pick = di.seq[di.random.Intn(len(di.nodes))] if di.nodes[pick] && !inArray(pick, randomNodes) { break } } randomNodes[i] = pick } return randomNodes}// FindNode 如果地址存储在discovery列表中就返回truefunc (di *DiscoveryImpl) FindNode(address string) bool { di.RLock() defer di.RUnlock() _, ok := di.nodes[address] return ok}func inArray(element string, array []string) bool { for _, val := range array { if val == element { return true } } return false}2.3.Peer是一个能够验证transactions的实体 type Peer interface { Node // GetID返回一个peer的验证标识符 GetID() []byte // 
GetEnrollmentID返回这个peer的登记id GetEnrollmentID() string // Sign 用这个验证者的签名密钥对消息签名,如果没有错误出现则输出签名 Sign(msg []byte) ([]byte, error) // Verify 检查签名是否是vkID的验证密钥下的消息的有效签名 // 如果检查成功,Verify返回空意味着没有错误出现 // 如果vkID是空,则针对该验证者的验证密钥验证签名. Verify(vkID, signature, message []byte) error}3.加载之前保存到磁盘的discovery列表中的地址,并将它们添加到当前discovery列表 func (p *Impl) initDiscovery() []string { p.discHelper = discovery.NewDiscoveryImpl() p.discPersist = viper.GetBool("peer.discovery.persist") if !p.discPersist { peerLogger.Warning("Discovery list will not be persisted to disk") } // 加载一些预先保存的地址 addresses, err := p.LoadDiscoveryList() if err != nil { peerLogger.Errorf("%s", err) } for _, address := range addresses { // add them to the current discovery list _ = p.discHelper.AddNode(address) } peerLogger.Debugf("Retrieved discovery list from disk: %v", addresses) //解析配置文件, ENV标志, 等等. rootNodes := strings.Split(viper.GetString("peer.discovery.rootnode"), ",") if !(len(rootNodes) == 1 && strings.Compare(rootNodes[0], "") == 0) { addresses = append(rootNodes, p.discHelper.GetAllNodes()...) 
} return addresses}4.NewDiscoveryImpl是一个Discovery实现的构造器 func NewDiscoveryImpl() *DiscoveryImpl { di := DiscoveryImpl{} di.nodes = make(map[string]bool) di.random = rand.New(rand.NewSource(time.Now().Unix())) return &di}4.1.相关结构体。 type DiscoveryImpl struct { sync.RWMutex nodes map[string]bool seq []string random *rand.Rand}5.加载一些预先保存的地址,LoadDiscoveryList使一个peer能够从数据库中加载discovery列表 func (p *Impl) LoadDiscoveryList() ([]string, error) { var err error packed, err := p.Load("discovery") if err != nil { err = fmt.Errorf("Unable to load discovery list from DB: %s", err) peerLogger.Error(err) return nil, err } addresses := &pb.PeersAddresses{} err = proto.Unmarshal(packed, addresses) if err != nil { err = fmt.Errorf("Could not unmarshal discovery list message: %s", err) peerLogger.Error(err) } return addresses.Addresses, err}5.1.PeersAddresses定义 type PeersAddresses struct { Addresses []string `protobuf:"bytes,1,rep,name=addresses" json:"addresses,omitempty"`}5.2 Unmarshaler是表示可以解组本身的对象的接口。 该方法应该在解码开始之前重置接收者。 参数指向可能被覆盖的数据,因此实现不应该保留对缓冲区的引用。 type Unmarshaler interface { Unmarshal([]byte) error}6.Load 使一个peer能够读取与给定数据库键相对应的值 func (p *Impl) Load(key string) ([]byte, error) { db := db.GetDBHandle() return db.Get(db.PersistCF, []byte(key))}7.AddNode添加地址到discovery列表 func (di *DiscoveryImpl) AddNode(address string) bool { di.Lock() defer di.Unlock() if _, ok := di.nodes[address]; !ok { di.seq = append(di.seq, address) di.nodes[address] = true } return di.nodes[address]}8.ValidatorEnabled返回peer.validator.enabled是否可用 func ValidatorEnabled() bool { if !configurationCached { cacheConfiguration() } return validatorEnabled}9.SecurityEnabled 从缓存配置中返回安全功能是否启用 func SecurityEnabled() bool { if !configurationCached { cacheConfiguration() } return securityEnabled}10.Engine接口负责管理Peer通信(Handlers)和交易的处理, GetHandlerFactory 返回一个handler句柄来接收聊天流 type Engine interface { TransactionProccesor GetHandlerFactory() HandlerFactory}11.chatWithSomePeers根据节点是不是验证节点,发起与1个或所有peer的聊天 func (p *Impl) 
chatWithSomePeers(addresses []string) { p.reconnectOnce.Do(func() { go p.ensureConnected() }) if len(addresses) == 0 { peerLogger.Debug("Starting up the first peer of a new network") return // 没什么事做 } for _, address := range addresses { if pe, err := GetPeerEndpoint(); err == nil { if address == pe.Address { peerLogger.Debugf("Skipping own address: %v", address) continue } } else { peerLogger.Errorf("Failed to obtain peer endpoint, %v", err) return } go p.chatWithPeer(address) }}12.确保节点之间是已经连接好的 func (p *Impl) ensureConnected() { touchPeriod := viper.GetDuration("peer.discovery.touchPeriod") touchMaxNodes := viper.GetInt("peer.discovery.touchMaxNodes") tickChan := time.NewTicker(touchPeriod).C peerLogger.Debugf("Starting Peer reconnect service (touch service), with period = %s", touchPeriod) for { // 简单循环并检查是否需要重新连接 <-tickChan peersMsg, err := p.GetPeers() if err != nil { peerLogger.Errorf("Error in touch service: %s", err.Error()) } allNodes := p.discHelper.GetAllNodes() // these will always be returned in random order if len(peersMsg.Peers) < len(allNodes) { peerLogger.Warning("Touch service indicates dropped connections, attempting to reconnect...") delta := util.FindMissingElements(allNodes, getPeerAddresses(peersMsg)) if len(delta) > touchMaxNodes { delta = delta[:touchMaxNodes] } p.chatWithSomePeers(delta) } else { peerLogger.Debug("Touch service indicates no dropped connections") } peerLogger.Debugf("Connected to: %v", getPeerAddresses(peersMsg)) peerLogger.Debugf("Discovery knows about: %v", allNodes) }}13.GetPeerEndpoint 从缓存配置中返回peerEndpoint func GetPeerEndpoint() (*pb.PeerEndpoint, error) { if !configurationCached { cacheConfiguration() } return peerEndpoint, peerEndpointError}14.Peer节点之间通信处理 func (p *Impl) chatWithPeer(address string) error { peerLogger.Debugf("Initiating Chat with peer address: %s", address) conn, err := NewPeerClientConnectionWithAddress(address) if err != nil { peerLogger.Errorf("Error creating connection to peer address %s: %s", 
address, err) return err } serverClient := pb.NewPeerClient(conn) ctx := context.Background() stream, err := serverClient.Chat(ctx) if err != nil { peerLogger.Errorf("Error establishing chat with peer address %s: %s", address, err) return err } peerLogger.Debugf("Established Chat with peer address: %s", address) err = p.handleChat(ctx, stream, true) stream.CloseSend() if err != nil { peerLogger.Errorf("Ending Chat with peer address %s due to error: %s", address, err) return err } return nil}15.返回目前已经注册的PeerEndpoints func (p *Impl) GetPeers() (*pb.PeersMessage, error) { p.handlerMap.RLock() defer p.handlerMap.RUnlock() peers := []*pb.PeerEndpoint{} for _, msgHandler := range p.handlerMap.m { peerEndpoint, err := msgHandler.To() if err != nil { return nil, fmt.Errorf("Error getting peers: %s", err) } peers = append(peers, &peerEndpoint) } peersMessage := &pb.PeersMessage{Peers: peers} return peersMessage, nil}15.1.处理开链消息的标准接口 type MessageHandler interface { RemoteLedger HandleMessage(msg *pb.Message) error SendMessage(msg *pb.Message) error To() (pb.PeerEndpoint, error) Stop() error}16.GetAllNodes返回存储在discovery列表中所有地址的一个数组 func (di *DiscoveryImpl) GetAllNodes() []string { di.RLock() defer di.RUnlock() var addresses []string for address, valid := range di.nodes { if valid { addresses = append(addresses, address) // TODO Expensive, don't quite like it } } return addresses}17.FindMissingElements标识第一个切片中不存在于第二个切片中的元素,第二切片预期为第一切片的子集 func FindMissingElements(all []string, some []string) (delta []string) {all: for _, v1 := range all { for _, v2 := range some { if strings.Compare(v1, v2) == 0 { continue all } } delta = append(delta, v1) } return}23.ChatStream两个Peer之间的流支持接口 type ChatStream interface {
Send(*pb.Message) error
Recv() (*pb.Message, error)
}
转自巴比特论坛,原文地址:http://8btc.com/thread-262561-1-1.html (文章仅作为分享)
|