P2P networking is started as part of node startup. The call that starts it is hpbnode.Hpbpeermanager.Start(hpbnode.hpberbase), made from the node's Start method:
```go
func (hpbnode *Node) Start(conf *config.HpbConfig) error {
```
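The peer manager's Start method performs these steps:
- Configure the network startup parameters.
- Register the handler functions for the node messages (ReqNodesMsg, ResNodesMsg, ReqBWTestMsg, ResBWTestMsg).
- Set the local node type: candidate node (PreNode, the default), boot node (BootNode), or sync node (SynNode).
- Start the server (prm.server.Start()).
- Check whether the local node is in the BootstrapNodes list, that is, the nodes the program connects to when it is first installed. If it is in the list but was not started as a boot node, the program panics with "Need BOOTNODE flag."
- Start the iperf bandwidth-test server. Its port is the peer port plus 100. iperf is launched by calling a shell command, and the iperf binary sits in the ghpb-bin installation directory. The command is: bin/bash iperf3 -s -p port
- If the node is a mining node (neither a boot node nor a sync node), start the iperf bandwidth-test client asynchronously.
- If the node is a boot node, parse the binding.json file, which specifies the peer addresses that may connect at startup.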
```go
func (prm *PeerManager) Start(coinbase common.Address) error {
	config := config.GetHpbConfigInstance()
	prm.server.Config = Config{
		NAT:             config.Network.NAT,
		Name:            config.Network.Name,
		TestMode:        config.Node.TestMode == 1,
		PrivateKey:      config.Node.PrivateKey,
		NetworkId:       config.Node.NetworkId,
		ListenAddr:      config.Network.ListenAddr,
		NetRestrict:     config.Network.NetRestrict,
		NodeDatabase:    config.Network.NodeDatabase,
		BootstrapNodes:  config.Network.BootstrapNodes,
		EnableMsgEvents: config.Network.EnableMsgEvents,
		Protocols:       prm.hpbpro.Protocols(),
	}
	prm.server.Config.CoinBase = coinbase
	log.Info("Set coinbase address by start", "address", coinbase.String())
	if coinbase.String() == "0x0000000000000000000000000000000000000000" {
		panic("coinbase address is nil.")
	}
	prm.hpbpro.networkId = prm.server.NetworkId
	prm.hpbpro.regMsgProcess(ReqNodesMsg, HandleReqNodesMsg)
	prm.hpbpro.regMsgProcess(ResNodesMsg, HandleResNodesMsg)
	prm.hpbpro.regMsgProcess(ReqBWTestMsg, prm.HandleReqBWTestMsg)
	prm.hpbpro.regMsgProcess(ResBWTestMsg, prm.HandleResBWTestMsg)
	copy(prm.server.Protocols, prm.hpbpro.Protocols())
	localType := discover.PreNode
	if config.Network.RoleType == "bootnode" {
		localType = discover.BootNode
	} else if config.Network.RoleType == "synnode" {
		localType = discover.SynNode
	}
	prm.SetLocalType(localType)
	log.Info("Set Init Local Type by p2p", "type", localType.ToString())
	if err := prm.server.Start(); err != nil {
		log.Error("Hpb protocol", "error", err)
		return err
	}
	////////////////////////////////////////////////////////////////////////////////////////
	//for bootnode check
	self := prm.server.Self()
	for _, n := range config.Network.BootstrapNodes {
		if self.ID == n.ID && prm.server.localType != discover.BootNode {
			panic("Need BOOTNODE flag.")
		}
	}
	/////////////////////////////////////////////////////////////////////////////////////////
	add, _ := net.ResolveUDPAddr("udp", prm.server.ListenAddr)
	prm.iport = add.Port + 100
	log.Debug("Iperf server start", "port", prm.iport)
	prm.startServerBW(strconv.Itoa(prm.iport))
	if prm.server.localType != discover.BootNode && prm.server.localType != discover.SynNode {
		go prm.startClientBW()
	}
	/////////////////////////////////////////////////////////////////////////////////////////
	//for bing info
	if prm.server.localType == discover.BootNode {
		filename := filepath.Join(config.Node.DataDir, bindInfoFileName)
		log.Debug("bootnode load bindings", "filename", filename)
		prm.parseBindInfo(filename)
	}
	return nil
}
```
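To make the "peer port plus 100" rule concrete, here is a minimal sketch of how the iperf port could be derived from the listen address. The helper names iperfPort and iperfServerCmd and the listen address ":30303" are my own assumptions for illustration, not HPB code. Unlike the snippet above, which discards the error from net.ResolveUDPAddr, the sketch checks it, since a malformed address would otherwise lead to a nil pointer dereference. The command is only built, not executed.

```go
package main

import (
	"fmt"
	"net"
	"os/exec"
	"strconv"
)

// iperfPortOffset mirrors the "+ 100" used in PeerManager.Start.
const iperfPortOffset = 100

// iperfPort derives the bandwidth-test port from the P2P listen address.
// (Hypothetical helper; the original computes this inline.)
func iperfPort(listenAddr string) (int, error) {
	addr, err := net.ResolveUDPAddr("udp", listenAddr)
	if err != nil {
		return 0, err
	}
	return addr.Port + iperfPortOffset, nil
}

// iperfServerCmd builds, but does not run, a command that would start
// an iperf3 server on the given port. (Hypothetical helper.)
func iperfServerCmd(port int) *exec.Cmd {
	return exec.Command("iperf3", "-s", "-p", strconv.Itoa(port))
}

func main() {
	// ":30303" is an assumed listen address used only for this example.
	port, err := iperfPort(":30303")
	if err != nil {
		fmt.Println("cannot resolve listen address:", err)
		return
	}
	cmd := iperfServerCmd(port)
	fmt.Println(port, cmd.Args)
}
```

Building the command with separate arguments rather than a single shell string avoids quoting problems if the port ever comes from configuration.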
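The prm.server.Start() method starts the P2P service:
- Check whether the server is already running.
- Create the RLPX transport, which encrypts data sent over the network (a translated document on it gives more detail).
- Initialize a number of Server fields.
- Start the UDP service, which is mainly used for node discovery.
- Set the boot node addresses and add them to the table.
- Build the node's own protocol handshake (ourHandshake).
- Start the TCP service with srv.startListening().
- Create the dial state with newDialState and run srv.run(dialer) asynchronously.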
```go
// Start starts running the server.
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {
	srv.lock.Lock()
	defer srv.lock.Unlock()
	if srv.running {
		return errors.New("server already running")
	}
	srv.running = true
	log.Info("Starting P2P networking")
	rand.Seed(time.Now().Unix())
	// static fields
	if srv.PrivateKey == nil {
		return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
	}
	if srv.newTransport == nil {
		srv.newTransport = newRLPX
	}
	srv.quit = make(chan struct{})
	srv.addpeer = make(chan *conn)
	srv.delpeer = make(chan peerDrop)
	srv.posthandshake = make(chan *conn)
	srv.addstatic = make(chan *discover.Node)
	srv.removestatic = make(chan *discover.Node)
	srv.peerOp = make(chan peerOpFunc)
	srv.peerOpDone = make(chan struct{})
	srv.peerEvent = event.NewEvent()
	srv.delHist = new(dialHistory)
	srv.dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
	// node table
	ntab, ourend, err := discover.ListenUDP(srv.PrivateKey, srv.localType, srv.ListenAddr, srv.NAT, srv.NodeDatabase, srv.NetRestrict)
	if err != nil {
		return err
	}
	if err := ntab.SetFallbackNodes(srv.BootstrapNodes); err != nil {
		return err
	}
	srv.ntab = ntab
	// handshake
	srv.ourHandshake = &protoHandshake{Version: MsgVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey), End: ourend}
	for _, p := range srv.Protocols {
		srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
	}
	srv.ourHandshake.CoinBase = srv.CoinBase
	if srv.ListenAddr == "" {
		log.Error("P2P server start, listen address is nil")
	}
	if err := srv.startListening(); err != nil {
		return err
	}
	//////////////////////////////////////////////////////////////////////////////////////////////
	if srv.TestMode {
		srv.parseSynnode()
	}
	//////////////////////////////////////////////////////////////////////////////////////////////
	log.Info("Server start with type.", "NodeType", srv.localType.ToString())
	dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, srv.NetRestrict)
	srv.loopWG.Add(1)
	go srv.run(dialer)
	srv.running = true
	return nil
}
```
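The startListening method first starts listening on the port and then handles incoming connections; the part to look at is srv.listenLoop(). It also sets up the NAT port mapping.
- maxAcceptConns=100 is the maximum number of connections that may be in the handshake phase at once, not the number of established connections. For example, a node can be connected to 200 peers, but at any given moment it can be handshaking with only 100 of them. This is why the code creates 100 slot "tokens": the for loop keeps taking a slot (consuming one), and once connection setup has finished it puts a slot back.
- The for loop accepts a connection and checks its address against NetRestrict. Note that NetRestrict is a whitelist rather than a blacklist: a connection whose address is not in it is closed. Otherwise SetupConn runs asynchronously.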
```go
func (srv *Server) startListening() error {
	// Launch the TCP listener.
	listener, err := net.Listen("tcp", srv.ListenAddr)
	if err != nil {
		return err
	}
	laddr := listener.Addr().(*net.TCPAddr)
	srv.ListenAddr = laddr.String()
	srv.listener = listener
	srv.loopWG.Add(1)
	go srv.listenLoop()
	// Map the TCP listening port if NAT is configured.
	if !laddr.IP.IsLoopback() && srv.NAT != nil {
		srv.loopWG.Add(1)
		go func() {
			nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "hpb p2p")
			srv.loopWG.Done()
		}()
	}
	return nil
}

// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
	defer srv.loopWG.Done()
	//log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener))
	// This channel acts as a semaphore limiting
	// active inbound connections that are lingering pre-handshake.
	// If all slots are taken, no further connections are accepted.
	tokens := maxAcceptConns
	slots := make(chan struct{}, tokens)
	for i := 0; i < tokens; i++ {
		slots <- struct{}{}
	}
	for {
		// Wait for a handshake slot before accepting.
		<-slots
		var (
			fd  net.Conn
			err error
		)
		for {
			fd, err = srv.listener.Accept()
			if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
				log.Debug("Temporary read error", "err", err)
				continue
			} else if err != nil {
				log.Debug("Read error", "err", err)
				return
			}
			break
		}
		// Reject connections that do not match NetRestrict.
		if srv.NetRestrict != nil {
			if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
				log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
				fd.Close()
				slots <- struct{}{}
				continue
			}
		}
		fd = newMeteredConn(fd, true)
		log.Info("Accepted connection", "addr", fd.RemoteAddr())
		// Spawn the handler. It will give the slot back when the connection
		// has been established.
		go func() {
			srv.SetupConn(fd, inboundConn, nil)
			slots <- struct{}{}
		}()
	}
}
```
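The slot mechanism in listenLoop is a counting semaphore built from a buffered channel. The sketch below isolates that pattern so it can be run on its own. The function runWithSlots, the limit of 3 and the 20 jobs are assumptions made up for this example; a plain function stands in for SetupConn.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runWithSlots runs total jobs but lets at most limit of them be in flight
// at once, using a pre-filled channel as a counting semaphore in the same
// way listenLoop does. It returns the highest concurrency it observed.
// (Hypothetical helper for illustration only.)
func runWithSlots(limit, total int, job func()) int32 {
	slots := make(chan struct{}, limit)
	for i := 0; i < limit; i++ {
		slots <- struct{}{}
	}
	var inFlight, peak int32
	var wg sync.WaitGroup
	for i := 0; i < total; i++ {
		<-slots // take a slot; blocks while limit jobs are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			n := atomic.AddInt32(&inFlight, 1)
			// Record the peak concurrency with a compare-and-swap loop.
			for {
				old := atomic.LoadInt32(&peak)
				if n <= old || atomic.CompareAndSwapInt32(&peak, old, n) {
					break
				}
			}
			job()
			atomic.AddInt32(&inFlight, -1)
			slots <- struct{}{} // hand the slot back, as listenLoop does after SetupConn
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runWithSlots(3, 20, func() { time.Sleep(time.Millisecond) })
	fmt.Println("peak concurrency within limit:", peak <= 3)
}
```

Because the slot is taken before the goroutine is spawned and returned only when the job finishes, the limit bounds work in progress, not the total number of jobs ever processed. That matches the distinction the text draws between handshaking connections and established ones.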
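SetupConn does the following:
- Check whether the server is running.
- In the line c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}, the transport comes from srv.newTransport, which is the RLPX constructor set above.
- c.doEncHandshake performs the encryption handshake; the implementation is in RLPX.
- c.doProtoHandshake performs the protocol handshake; the implementation is also in RLPX.
- I did not fully understand the BOE-related logic that follows.
- Finally, srv.checkpoint sends the connection object conn to the stage channel.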
```go
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
	// Prevent leftover pending conns from entering the handshake.
	srv.lock.Lock()
	running := srv.running
	srv.lock.Unlock()
	c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
	if !running {
		c.close(errServerStopped)
		return
	}
	srv.setupLock.Lock()
	defer srv.setupLock.Unlock()
	// Run the encryption handshake.
	var err error
	var ourRand, theirRand []byte
	if c.id, ourRand, theirRand, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
		log.Debug("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "reason", err)
		c.close(err)
		return
	}
	clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
	// For dialed connections, check that the remote public key matches.
	if dialDest != nil && c.id != dialDest.ID {
		c.close(DiscUnexpectedIdentity)
		clog.Error("Dialed identity mismatch", "want", c, dialDest.ID)
		return
	}
	if err := srv.checkpoint(c, srv.posthandshake); err != nil {
		clog.Trace("Rejected peer before protocol handshake", "err", err)
		c.close(err)
		return
	}
	log.Debug("Do enc handshake OK.", "id", c.id)
	/////////////////////////////////////////////////////////////////////////////////
	// Run the protocol handshake
	c.our = *srv.ourHandshake
	c.our.RandNonce = ourRand
	if c.our.Sign, err = boe.BoeGetInstance().HW_Auth_Sign(theirRand); err != nil {
		clog.Debug("Do hardware sign error.", "err", err)
		//todo close and return
	}
	clog.Debug("Hardware has signed remote rand.", "rand", theirRand, "sign", c.our.Sign)
	their, err := c.doProtoHandshake(&c.our)
	if err != nil {
		clog.Debug("Failed proto handshake", "err", err)
		c.close(err)
		return
	}
	if their.ID != c.id {
		clog.Error("Wrong devp2p handshake identity", "err", their.ID)
		c.close(DiscUnexpectedIdentity)
		return
	}
	c.their = *their
	clog.Debug("Do protocol handshake OK.", "id", c.id)
	clog.Trace("Do protocol handshake.", "our", c.our, "their", c.their)
	/////////////////////////////////////////////////////////////////////////////////
	isRemoteBoot := false
	hdtab := srv.getHdtab()
	for _, n := range srv.BootstrapNodes {
		if c.id == n.ID {
			clog.Info("Remote node is boot.", "id", c.id)
			c.isboe = true
			isRemoteBoot = true
		}
	}
	if !c.isboe {
		remoteCoinbase := strings.ToLower(c.their.CoinBase.String())
		clog.Trace("Remote coinbase", "address", remoteCoinbase)
		if len(hdtab) == 0 {
			clog.Debug("Do not ready for connected.", "id", c.id.TerminalString())
			c.close(DiscHwSignError)
			return
		}
		for _, hw := range hdtab {
			if hw.Adr == remoteCoinbase {
				clog.Debug("Input to boe paras", "rand", c.our.RandNonce, "hid", hw.Hid, "cid", hw.Cid, "sign", c.their.Sign)
				c.isboe = boe.BoeGetInstance().HW_Auth_Verify(c.our.RandNonce, hw.Hid, hw.Cid, c.their.Sign)
				clog.Info("Boe verify the remote.", "id", c.id.TerminalString(), "result", c.isboe)
			}
		}
	}
	clog.Info("Verify the remote hardware.", "id", c.id.TerminalString(), "result", c.isboe)
	if !srv.TestMode && srv.localType == discover.SynNode && c.isboe == false {
		clog.Debug("SynNode peer SynNode, dorp peer.")
		c.close(DiscHwSignError)
		return
	}
	if isRemoteBoot || srv.localType == discover.BootNode {
		ourHdtable := &hardwareTable{Version: 0x00, Hdtab: hdtab}
		theirHdtable, err := c.doHardwareTable(ourHdtable)
		if err != nil {
			clog.Debug("Failed hardware table handshake", "reason", err)
			c.close(err)
			return
		}
		clog.Trace("Exchange hardware table.", "our", ourHdtable, "their", theirHdtable)
		if isRemoteBoot {
			srv.updateHdtab(theirHdtable.Hdtab, true)
			clog.Trace("Update hardware table from boot.", "srv hdtab", srv.getHdtab())
		}
	}
	/////////////////////////////////////////////////////////////////////////////////
	if err := srv.checkpoint(c, srv.addpeer); err != nil {
		clog.Warn("Rejected peer", "err", err, "dialDest", dialDest)
		c.close(err)
		return
	}
}
```
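Reading the BOE part of SetupConn again, it looks like a challenge-response exchange: each side signs the random value it received from the other (HW_Auth_Sign(theirRand)), and the receiver checks that signature against its own random value (HW_Auth_Verify(c.our.RandNonce, ...)). The sketch below shows that general pattern only. It swaps the BOE hardware for ed25519 keys from the Go standard library, and the party type and its methods are names I made up, so it should not be read as how the BOE board actually signs or verifies.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
)

// party stands in for one node. The ed25519 key pair is an assumption that
// replaces the BOE hardware identity (hid/cid) used in the real code.
type party struct {
	pub   ed25519.PublicKey
	priv  ed25519.PrivateKey
	nonce []byte // random challenge this side sends to the peer
}

func newParty() (*party, error) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, 32)
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return &party{pub: pub, priv: priv, nonce: nonce}, nil
}

// signChallenge plays the role of HW_Auth_Sign(theirRand).
func (p *party) signChallenge(theirNonce []byte) []byte {
	return ed25519.Sign(p.priv, theirNonce)
}

// verifyPeer plays the role of HW_Auth_Verify(ourRand, ..., theirSign):
// the signature must cover the challenge that this side issued.
func (p *party) verifyPeer(peerPub ed25519.PublicKey, peerSig []byte) bool {
	return ed25519.Verify(peerPub, p.nonce, peerSig)
}

func main() {
	local, err := newParty()
	if err != nil {
		fmt.Println(err)
		return
	}
	remote, err := newParty()
	if err != nil {
		fmt.Println(err)
		return
	}
	// The remote signs the challenge it received from the local node.
	sig := remote.signChallenge(local.nonce)
	fmt.Println("fresh challenge accepted:", local.verifyPeer(remote.pub, sig))
	// A signature over any other value must be rejected.
	stale := remote.signChallenge(remote.nonce)
	fmt.Println("wrong challenge accepted:", local.verifyPeer(remote.pub, stale))
}
```

In SetupConn the counterpart of looking up peerPub appears to be the scan over hdtab for the entry whose Adr matches the remote coinbase, which supplies the Hid and Cid passed to the verifier.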
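Once an accepted connection has been sent to the stage channel, it is handled in srv.run, the goroutine launched in the last step of prm.server.Start(). In its case c := <-srv.addpeer: branch, go srv.runPeer(p) starts heartbeat checking for the connection and also broadcasts the node.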
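The hand-off between SetupConn and srv.run can be sketched as follows. The body of checkpoint is not shown in this post, so the version here is an assumption inferred from the cont: make(chan error) field on conn: the connection is sent on the stage channel and the caller then waits on cont for the run loop's verdict. The server type, the toy run loop and the "already connected" rule are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

var errServerStopped = errors.New("server stopped")

// conn carries only what the sketch needs; cont is where the run loop
// reports whether the connection was accepted.
type conn struct {
	id   string
	cont chan error
}

// server is a stripped-down, hypothetical stand-in for the P2P Server.
type server struct {
	addpeer chan *conn
	quit    chan struct{}
}

func newServer() *server {
	return &server{addpeer: make(chan *conn), quit: make(chan struct{})}
}

// checkpoint hands c to the run loop via stage, then waits for the verdict.
// (Assumed shape; the real implementation is not quoted in this post.)
func (s *server) checkpoint(c *conn, stage chan<- *conn) error {
	select {
	case stage <- c:
	case <-s.quit:
		return errServerStopped
	}
	select {
	case err := <-c.cont:
		return err
	case <-s.quit:
		return errServerStopped
	}
}

// run is a toy version of srv.run: it owns the peer set, so no lock is
// needed, and it rejects a connection whose id is already known.
func (s *server) run() {
	peers := make(map[string]bool)
	for {
		select {
		case c := <-s.addpeer:
			if peers[c.id] {
				c.cont <- errors.New("already connected")
				continue
			}
			peers[c.id] = true
			c.cont <- nil
		case <-s.quit:
			return
		}
	}
}

func main() {
	s := newServer()
	go s.run()
	first := s.checkpoint(&conn{id: "peer-a", cont: make(chan error)}, s.addpeer)
	second := s.checkpoint(&conn{id: "peer-a", cont: make(chan error)}, s.addpeer)
	fmt.Println(first, second)
	close(s.quit)
}
```

The point of the design is that all peer bookkeeping happens in one goroutine; SetupConn only asks that goroutine a question and blocks for the answer.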
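Everything analyzed above concerns connections that this node accepts passively as a listening server. Outbound connections to other nodes are also made in srv.run(dialer), whose dialer argument carries the connection information of the remote nodes, including the boot node addresses.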
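- First, three task functions are created: delTask, startTasks and scheduleTasks. Two slices are created as well: one holds the tasks that are running, the other queues the tasks waiting to run. A task can be deleted only from the running slice, and at most maxActiveDialTasks=16 tasks run at the same time. scheduleTasks creates the tasks that are waiting to run. The nodes to dial fall into boot nodes, static nodes and dynamic nodes: static nodes are configured by hand, while dynamic nodes are computed by the program's KAD algorithm.
- startTasks connects to a node by calling t.Do(srv), which in the end runs srv.SetupConn, the connection setup analyzed above.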
```go
func (srv *Server) run(dialstate dialer) {
```
This code is quite complex, with goroutines and channels used all over the place. 😂
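To untangle the bookkeeping described for delTask, startTasks and scheduleTasks, here is a simplified model of the two slices and the limit of 16. It only tracks which tasks are running or queued. The real code launches each task in a goroutine via t.Do(srv) and asks the dial state for new tasks, both of which are left out. The scheduler and task types are my own names.

```go
package main

import "fmt"

// maxActiveDialTasks is the limit quoted in the text.
const maxActiveDialTasks = 16

// task is a placeholder for a dial task; only an id is modelled here.
type task struct{ id int }

// scheduler is a hypothetical container for the two slices that srv.run
// keeps as local variables.
type scheduler struct {
	running []task // tasks currently executing
	queued  []task // tasks waiting for a free slot
}

// startTasks moves tasks into running until the limit is hit and returns
// the tasks that did not fit.
func (s *scheduler) startTasks(ts []task) []task {
	i := 0
	for ; i < len(ts) && len(s.running) < maxActiveDialTasks; i++ {
		s.running = append(s.running, ts[i])
	}
	return ts[i:]
}

// scheduleTasks first gives queued tasks a chance to start, then admits
// fresh ones; whatever still does not fit stays queued.
func (s *scheduler) scheduleTasks(fresh []task) {
	s.queued = s.startTasks(s.queued)
	s.queued = append(s.queued, s.startTasks(fresh)...)
}

// delTask removes a finished task; it only ever looks at the running slice.
func (s *scheduler) delTask(id int) {
	for i, t := range s.running {
		if t.id == id {
			s.running = append(s.running[:i], s.running[i+1:]...)
			return
		}
	}
}

func main() {
	s := &scheduler{}
	var fresh []task
	for i := 0; i < 20; i++ {
		fresh = append(fresh, task{id: i})
	}
	s.scheduleTasks(fresh)
	fmt.Println(len(s.running), len(s.queued))

	// When a task finishes, its slot is refilled from the queue.
	s.delTask(0)
	s.scheduleTasks(nil)
	fmt.Println(len(s.running), len(s.queued))
}
```

Queued tasks are started before fresh ones, so a burst of new dial candidates cannot starve tasks that were already waiting.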