GO-HPB源码解读--挖矿流程(二)

这章来看下HPB的挖矿流程。在节点启动的时候通过flag参数mine指定是否进行挖矿,当然也可以在节点启动后通过API来调用,启动流程代码调用顺序是go-hpb/cmd/ghpb/main.go-main->ghpb->startNode`。
在startNode方法最后部署代码可以看到,首先判断MiningEnabledFlag和RoleType两个参数后,设置交易的GasPrice后(系统默认为defaultGasPrice = 50 * config.Shannon),开启挖矿。

1
2
3
4
5
6
if ctx.GlobalBool(utils.MiningEnabledFlag.Name) && (conf.Network.RoleType == "") {
stack.TxPool().SetGasPrice(utils.GlobalBig(ctx, utils.GasPriceFlag.Name))
if err := stack.StartMining(true); err != nil {
utils.Fatalf("Failed to start mining: %v", err)
}
}

在StartMining方法中

  1. 设置挖矿地址coinbase,这个地址是在HpbConfig初始化的时候设置的,取的是钱包中第一个帐户地址
  2. 获挖矿引擎Prometheus,并把coinbase和所在钱包的签名方法传递给引擎。这个操作主要是为了从wallet中取出coinbase的私钥,然后进行挖矿签名
  3. 开启接收交易的flag
  4. miner协程启动
    1
    2
    3
    4
    5
    if wallets := hpbnode.AccountManager().Wallets(); len(wallets) > 0 {
    if account := wallets[0].Accounts(); len(account) > 0 {
    hpbnode.hpberbase = account[0].Address
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (s *Node) StartMining(local bool) error {
//read coinbase from node
eb := s.hpberbase

if promeengine, ok := s.Hpbengine.(*prometheus.Prometheus); ok {
wallet, err := s.accman.Find(accounts.Account{Address: eb})
if wallet == nil || err != nil {
log.Error("Hpberbase account unavailable locally", "err", err)
return fmt.Errorf("signer missing: %v", err)
}
promeengine.Authorize(eb, wallet.SignHash)
} else {
log.Error("Cannot start mining without prometheus", "err", s.Hpbengine)
}
if local {
// If local (CPU) mining is started, we can disable the transaction rejection
// mechanism introduced to speed sync times. CPU mining on mainnet is ludicrous
// so noone will ever hit this path, whereas marking sync done on CPU mining
// will ensure that private networks work in single miner mode too.
atomic.StoreUint32(&s.Hpbsyncctr.AcceptTxs, 1)
}
go s.miner.Start(eb)
return nil
}

在Start方法中,

  1. 执行miner.update()方法
  2. 设置状态为shouldStart==1,表示不允许再次挖矿了,因为已经开始了。
  3. 设置挖矿地址coinbase
  4. 判断能否进行挖矿,如果不能则返回,不启动挖矿
  5. worker.start()开使用挖矿,worker.startNewMinerRound()尝试再次启动挖工worker
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    func (self *Miner) Start(coinbase common.Address) {
    //
    go self.update()

    atomic.StoreInt32(&self.shouldStart, 1)
    self.worker.setHpberbase(coinbase)
    self.coinbase = coinbase

    if atomic.LoadInt32(&self.canStart) == 0 {
    log.Info("Network syncing, will start miner afterwards")
    return
    }
    atomic.StoreInt32(&self.mining, 1)

    log.Info("Starting mining operation")
    self.worker.start()
    self.worker.startNewMinerRound()
    }

在miner.update()方法中,miner.mux订阅了三个事件,分别是下载开始事件,下载结束事件,下载失败事件,这个和以太坊的源码有些不同,以太坊是downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{},作用的是一样,在HPB源码中可以看下三个事件的发出都是和信息同步有关的,比如在synfast.go、synfull.go、synlight.go中的syncWithPeer方法中进行事件发布

  • 如果收到下载开始事件,则停止当前的挖矿工作
  • 如查收到下载完成或失败事件,则开启挖矿工作,同时取消事件订阅
    可以看到update中也有可能启动挖矿行为的,所以使用atomic来进行状态管理,实现线程安全。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// update keeps track of the synctrl events. Please be aware that this is a one shot type of update loop.
// It's entered once and as soon as `Done` or `Failed` has been broadcasted the events are unregistered and
// the loop is exited. This to prevent a major security vuln where external parties can DOS you with blocks
// and halt your mining operation for as long as the DOS continues.
func (self *Miner) update() {
events := self.mux.Subscribe(synctrl.StartEvent{}, synctrl.DoneEvent{}, synctrl.FailedEvent{})
out:
for ev := range events.Chan() {
switch ev.Data.(type) {
case synctrl.StartEvent:
atomic.StoreInt32(&self.canStart, 0)
if self.Mining() {
self.Stop()
atomic.StoreInt32(&self.shouldStart, 1)
log.Info("Mining aborted due to sync")
}
case synctrl.DoneEvent, synctrl.FailedEvent:
shouldStart := atomic.LoadInt32(&self.shouldStart) == 1

atomic.StoreInt32(&self.canStart, 1)
atomic.StoreInt32(&self.shouldStart, 0)
if shouldStart {
self.Start(self.coinbase)
}
// unsubscribe. we're only interested in this event once
events.Unsubscribe()
// stop immediately and ignore all further pending events
break out
}
}
}

在worker.start()方法中,首先设置了worker的状态,然后再启动producer.Start()。这里producer是接口类,其实现类只有一个,是CpuAgent。
producer绑定CpuAgent是通过这行代码进行注册的miner.Register(NewCpuAgent(bc.InstanceBlockChain(), engine))。代码调用顺序是
–>ghpb
–>startNode
–>utils.StartNode(stack)
–>stack.Start(stack.Hpbconfig)
–>hpbnode.WorkerInit(conf)
–>hpbnode.miner = worker.New(&conf.BlockChain, hpbnode.NewBlockMux(), hpbnode.Hpbengine, hpbnode.hpberbase)
–>miner.Register(NewCpuAgent(bc.InstanceBlockChain(), engine))

1
2
3
4
5
6
7
8
9
10
11
func (self *worker) start() {
self.mu.Lock()
defer self.mu.Unlock()

atomic.StoreInt32(&self.mining, 1)

// spin up agents
for producer := range self.producers {
producer.Start()
}
}

接下来直接看下CpuAgent.Start()方法

  1. 首先进行CAS状态判断
  2. 协程启动update方法,这个类似于miner的update方法
1
2
3
4
5
6
func (self *CpuAgent) Start() {
if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) {
return // producer already started
}
go self.update()
}

update方法中,在死循环中不断从chan中获取数据,如果数据类型是stop,则退出循环,如果是workCh则开始挖矿。stop事件是在miner.update方法中会传递。work的传递是在miner.Start()方法中startNewMinerRound()<-makeCurrent()传递进去的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (self *CpuAgent) update() {
out:
for {
select {
case work := <-self.workCh:
self.mu.Lock()
if self.quitCurrentOp != nil {
close(self.quitCurrentOp)
}
self.quitCurrentOp = make(chan struct{})
go self.mine(work, self.quitCurrentOp)
self.mu.Unlock()

case <-self.stop:
self.mu.Lock()
if self.quitCurrentOp != nil {
close(self.quitCurrentOp)
self.quitCurrentOp = nil
}
self.mu.Unlock()
break out
}
}
}

go self.mine(work, self.quitCurrentOp)才开始真正的挖矿计算,,下一章节再详细分析。