上一节看到交易通过pool.promoteTx(addr, hash, tx)方法进行了传递,在该方法的最后一步通过pool.txFeed.Send(bc.TxPreEvent{tx}),将交易封装成bc.TxPreEvent发送了出去。而这个Feed实现了订阅发布模式,Send方法是发布消息,SubscribeTxPreEvent是订阅消息,订单的时候只需要把订单者的chan发进来就可以了。 (Feed通过反射包reflect中的selectcase、trySend和tryReceive方法进行的,操作包装在Feed文件中,golang的源码包中有all_test.go测试文件可以详细看下)
订阅这个消费的一共有三个地方,分中在api_backend.go、synctrl.go、worker.go中。这里主要看一下广播和evn执行。
- api_backend.go中只是订单并没处理
- synctrl.go中进行消费的广播
- worker.go中进行了evn执行
1 | func (pool *TxPool) SubscribeTxPreEvent(ch chan<-bc.TxPreEvent) sub.Subscription { |
在synctrl.go的调用顺序是
–>main–>ghpb–>startNode–>
–>utils.StartNode(stack)–>hpbnode.Start()–>
–>hpbnode.Hpbsyncctr.Start()
- 在Start方法中初始化this.txCh,并进行消息订单
- 消息的消费是在txRoutingLoop协程中
- 在routTx方法中,找出不知道该交易的peer,然后逐个发出去了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24func (this *SynCtrl) Start() {
// broadcast transactions
this.txCh = make(chan bc.TxPreEvent, txChanSize)
this.txSub = this.txpool.SubscribeTxPreEvent(this.txCh)
go this.txRoutingLoop()
// broadcast mined blocks
this.minedBlockSub = this.newBlockMux.Subscribe(bc.NewMinedBlockEvent{})
go this.minedRoutingLoop()
// start sync handlers
go this.sync()
go this.txsyncLoop()
}
func (this *SynCtrl) txRoutingLoop() {
for {
select {
case event := <-this.txCh:
routTx(event.Tx.Hash(), event.Tx)
}
}
}
在worker.go的调用顺序是
–>main–>ghpb–>startNode–>
–>hpbnode.WorkerInit–>worker.New–>newWorker
在newWorker中初始化worker.txCh,并订阅消息,消息的处理是在协程worker.eventListener()中进行的。在系统没有挖矿的状态下,把消息发送到commitTransactions进行处理。commitTransactions会把交易提交到evm进行执行,这个单独解析下吧。
1 | func newWorker(config *config.ChainConfig, engine consensus.Engine, coinbase common.Address, /*eth Backend,*/ mux *sub.TypeMux) *worker { |