GO-HPB源码解读--交易入池(二)

上一节看到交易通过pool.promoteTx(addr, hash, tx)方法进行了传递,在该方法的最后一步通过pool.txFeed.Send(bc.TxPreEvent{tx}),将交易封装成bc.TxPreEvent发送了出去。而这个Feed实现了订阅发布模式,Send方法是发布消息,SubscribeTxPreEvent是订阅消息,订单的时候只需要把订单者的chan发进来就可以了。 (Feed通过反射包reflect中的selectcase、trySend和tryReceive方法进行的,操作包装在Feed文件中,golang的源码包中有all_test.go测试文件可以详细看下)
订阅这个消费的一共有三个地方,分中在api_backend.go、synctrl.go、worker.go中。这里主要看一下广播和evn执行。

  • api_backend.go中只是订单并没处理
  • synctrl.go中进行消费的广播
  • worker.go中进行了evn执行
1
2
3
func (pool *TxPool) SubscribeTxPreEvent(ch chan<-bc.TxPreEvent) sub.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

在synctrl.go的调用顺序是
–>main–>ghpb–>startNode–>
–>utils.StartNode(stack)–>hpbnode.Start()–>
–>hpbnode.Hpbsyncctr.Start()

  1. 在Start方法中初始化this.txCh,并进行消息订单
  2. 消息的消费是在txRoutingLoop协程中
  3. 在routTx方法中,找出不知道该交易的peer,然后逐个发出去了
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    func (this *SynCtrl) Start() {
    // broadcast transactions
    this.txCh = make(chan bc.TxPreEvent, txChanSize)
    this.txSub = this.txpool.SubscribeTxPreEvent(this.txCh)

    go this.txRoutingLoop()

    // broadcast mined blocks
    this.minedBlockSub = this.newBlockMux.Subscribe(bc.NewMinedBlockEvent{})
    go this.minedRoutingLoop()

    // start sync handlers
    go this.sync()
    go this.txsyncLoop()
    }

    func (this *SynCtrl) txRoutingLoop() {
    for {
    select {
    case event := <-this.txCh:
    routTx(event.Tx.Hash(), event.Tx)
    }
    }
    }

在worker.go的调用顺序是
–>main–>ghpb–>startNode–>
–>hpbnode.WorkerInit–>worker.New–>newWorker
在newWorker中初始化worker.txCh,并订阅消息,消息的处理是在协程worker.eventListener()中进行的。在系统没有挖矿的状态下,把消息发送到commitTransactions进行处理。commitTransactions会把交易提交到evm进行执行,这个单独解析下吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func newWorker(config *config.ChainConfig, engine consensus.Engine, coinbase common.Address, /*eth Backend,*/ mux *sub.TypeMux) *worker {
worker := &worker{
config: config,
engine: engine,
mux: mux,
/*txCh: make(chan bc.TxPreEvent, txChanSize),*/
chainHeadCh: make(chan bc.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan bc.ChainSideEvent, chainSideChanSize),
chainDb: nil,//hpbdb.ChainDbInstance(),
recv: make(chan *Result, resultQueueSize),
chain: bc.InstanceBlockChain(),
proc: bc.InstanceBlockChain().Validator(),
possibleUncles: make(map[common.Hash]*types.Block),
coinbase: coinbase,
producers: make(map[Producer]struct{}),
unconfirmed: newUnconfirmedBlocks(bc.InstanceBlockChain(), miningLogAtDepth),
}

worker.pool = txpool.GetTxPool()
worker.txCh = make(chan bc.TxPreEvent, txChanSize)
worker.txSub = worker.pool.SubscribeTxPreEvent(worker.txCh)
worker.chainHeadSub = bc.InstanceBlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = bc.InstanceBlockChain().SubscribeChainSideEvent(worker.chainSideCh)
//对以上事件的监听
go worker.eventListener()
go worker.handlerSelfMinedBlock()

return worker
}

func (self *worker) eventListener() {

defer self.txSub.Unsubscribe()
defer self.chainHeadSub.Unsubscribe()
defer self.chainSideSub.Unsubscribe()

for {
// A real event arrived, process interesting content
select {
// Handle ChainHeadEvent
case <-self.chainHeadCh:
self.startNewMinerRound()

// Handle ChainSideEvent
case ev := <-self.chainSideCh:
self.uncleMu.Lock()
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
// Handle TxPreEvent
case ev := <-self.txCh:
// Apply transaction to the pending state if we're not mining
if atomic.LoadInt32(&self.mining) == 0 && self.current != nil {
self.currentMu.Lock()
acc, _ := types.Sender(self.current.signer, ev.Tx)
txs := map[common.Address]types.Transactions{acc: {ev.Tx}}
txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)

self.current.commitTransactions(self.mux, txset, self.coinbase)
self.currentMu.Unlock()
}

case <-self.chainHeadSub.Err():
return
case <-self.chainSideSub.Err():
return
}
}
}