GO-HPB源码解读--交易入池(一)

在本地启动节点后,通过控制台输入命令hpb.sendTransaction({from:"db3cf5c98af974d3cccdc8662f1ab3c27c6e47ee",to:"742c8a59152a397ad09f7fb280c4fc378598a4ba",value:web3.toWei(10000,"hpb")})来完成转账交易。交易内容最初始会进入到SendTransaction。

  1. 打印了一下传进来的参数args
    {From:[219 60 245 201 138 249 116 211 204 205 200 102 47 26 179 194 124 110 71 238] To:[116 44 138 89 21 42 57 122 208 159 127 178 128 196 252 55 133 152 164 186] Gas: GasPrice: Value:0x21e19e0c9bab2400000 Data:0x Nonce:}
  2. 首先检查一下from地址是不是存在钱包中,如果不存在就返回错误
  3. 锁定交易的nonce,因为nonce是唯一的,防止交易的重播攻击
  4. 对交易需要的其他参数进行设置,Gas默认值为90000,GasPrice和Nonce经由计算获得,这个后续再分析
  5. 创建一个交易对象,将以上参数进行赋值
  6. 使用from的帐户钱包对交易进行签名,来证明这签交易确实是from发出的。
    签名过程:
    (1). 首先使用from的私钥对nonce、price、gaslimit、recipient、amount、payload进行secp256k1签名
    (2). 然后签名结果分解成R、S、V,并赋给交易对象,注意这个时候交易对象中并没有from地址
  7. 最后提交交易
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// SendTransaction creates a transaction for the given argument, sign it and submit it to the
// transaction pool.
func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) {

// Look up the wallet containing the requested signer
account := accounts.Account{Address: args.From}

wallet, err := s.b.AccountManager().Find(account)
if err != nil {
return common.Hash{}, err
}

if args.Nonce == nil {
// Hold the addresse's mutex around signing to prevent concurrent assignment of
// the same nonce to multiple accounts.
s.nonceLock.LockAddr(args.From)
defer s.nonceLock.UnlockAddr(args.From)
}

// Set some sanity defaults and terminate on failure
if err := args.setDefaults(ctx, s.b); err != nil {
return common.Hash{}, err
}
// Assemble the transaction and sign with the wallet
tx := args.toTransaction()

var chainID *big.Int
if config := s.b.ChainConfig(); true {
chainID = config.ChainId
}
signed, err := wallet.SignTx(account, tx, chainID)
if err != nil {
return common.Hash{}, err
}
return submitTransaction(ctx, s.b, signed)
}

submitTransaction方法把交易提交到交易池txpool

  1. 首先计算交易的hash,这个交易在输入交易命令后显示的那个hash值
  2. 判断交易池中是否包含该交易,包含的话就直接回退失败
  3. 验证交易
  4. 锁定交易,也就是把交易保存到交易池的过程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // AddTx attempts to queue a transactions if valid.
    func (pool *TxPool) AddTx(tx *types.Transaction) error {
    pool.mu.Lock()
    defer pool.mu.Unlock()

    hash := tx.Hash()
    if pool.all[hash] != nil {
    log.Trace("Discarding already known transaction", "hash", hash)
    return fmt.Errorf("known transaction: %x", hash)
    }
    // If the transaction fails basic validation, discard it
    if err := pool.validateTx(tx); err != nil {
    log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
    return err
    }

    return pool.addTxLocked(tx)
    }

validateTx对交易进行验证

  1. 交易的大小是否超过32k,其中的tx.Size主要计算的是交易的data部分的数据大小
  2. 交易金额不能小于0
  3. 当前交易的gasLimit是否大于当前区块的最大gas值,每个区块块都有一个总的gas限制,也就是一个区块中的所有交易的gas总和不能超过区块的gas限制
  4. 验证签名是否有效,并还原出地址from(这块没细看,之后再看下)
  5. 交易给出的gas价格必须大于txpool的最低价格,要不然挖工收益会少
  6. 当前交易的nonce必须要大于上一次的交易nonce。帐户信息直接保存在stateDB中,可以从中获取
  7. 交易花费gasprice*gasLimit必须大于帐户余额
  8. 如果交易是创建合约或者调用合约,需要计算一下需要多少gas,如果交易支付的最大gas不够多也是拒绝的(这块没细看,之后再看下)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    // validateTx checks whether a transaction is valid according to the consensus
    // rules and adheres to some heuristic limits of the local node (price and size).
    func (pool *TxPool) validateTx(tx *types.Transaction) error {
    // Heuristic limit, reject transactions over 32KB to prevent DOS attacks
    if tx.Size() > maxTransactionSize {
    log.Trace("ErrOversizedData maxTransactionSize", "ErrOversizedData",ErrOversizedData)
    return ErrOversizedData
    }
    // Transactions can't be negative. This may never happen using RLP decoded
    // transactions but may occur if you create a transaction using the RPC.
    if tx.Value().Sign() < 0 {
    log.Trace("ErrNegativeValue", "ErrNegativeValue",ErrNegativeValue)
    return ErrNegativeValue
    }
    // Ensure the transaction doesn't exceed the current block limit gas.
    if pool.currentMaxGas.Cmp(tx.Gas()) < 0 {
    log.Trace("ErrGasLimit", "ErrGasLimit",ErrGasLimit)
    return ErrGasLimit
    }
    // Call BOE recover sender.
    from, err := types.Sender(pool.signer, tx)
    if err != nil {
    log.Trace("ErrInvalidSender", "ErrInvalidSender",ErrInvalidSender)
    return ErrInvalidSender
    }
    // Check gasPrice.
    if pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
    log.Trace("ErrUnderpriced", "ErrUnderpriced",ErrUnderpriced)
    return ErrUnderpriced
    }
    // Ensure the transaction adheres to nonce ordering
    if pool.currentState.GetNonce(from) > tx.Nonce() {
    log.Trace("ErrNonceTooLow", "tx.Nonce()",tx.Nonce())
    return ErrNonceTooLow
    }
    // Transactor should have enough funds to cover the costs
    // cost == V + GP * GL
    if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
    log.Trace("ErrInsufficientFunds", "ErrInsufficientFunds",ErrInsufficientFunds)
    return ErrInsufficientFunds
    }
    intrGas := types.IntrinsicGas(tx.Data(), tx.To() == nil)
    if tx.Gas().Cmp(intrGas) < 0 {
    log.Trace("ErrIntrinsicGas", "ErrIntrinsicGas",ErrIntrinsicGas)
    return ErrIntrinsicGas
    }
    return nil
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// addTx enqueues a single transaction into the pool if it is valid.
func (pool *TxPool) addTxLocked(tx *types.Transaction) error {
// Try to inject the transaction and update any state
replace, err := pool.add(tx)
if err != nil {
return err
}
// If we added a new transaction, run promotion checks and return
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
pool.promoteExecutables([]common.Address{from})
}
return nil
}

pool.add方法中

  1. 首先计算交易的hash,并获取from地址
  2. 判断交易池是否满了,如果满了就返回失败。这里需要注意一下交易池维护着两个集合一个是pending,一个是queue,pending存放的是可以处理的交易,queue存放的是不可以处理的交易。
  3. 把交易池中pending所有from地址的交易都取出来赋到list,看看是不是包含相同的nonce的交易。(上边在创建交易的时候判断了nonce大于当前的,什么情况下会重复呢?)如果包含的话就往list里插入,但是插入的时候要判断一下这笔交易值不值替换上一笔相同nonce的交易。
    交易池有个参数priceBump,表示上浮价格,比如是20,当旧交易的old_gasprice*(100+20)/100=1.2倍的old_gaspreice,如果当前交易的价格大于1.2倍的old_gaspreice,则进行替换。同时list也会更新一下最大交易额和最大gas
    这时txpool才会把当前交易缓存起来
  4. 如果list也就是pending中没有重复的交易,那么对于queue也做同样的替换操作。注意如果queue中不存在该交易是会插入的,pending是不会的。(txpool.queue是个map,key是from地址,value是个list,保存着所有from地址的交易信息,也就是上边提到的list)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (pool *TxPool) add(tx *types.Transaction) (bool, error) {
// If the transaction is already known, discard it
hash := tx.Hash()
from, _ := types.Sender(pool.signer, tx) // already validated

// If the transaction pool is full, reject
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
log.Warn("TxPool is full, reject tx", "current size", len(pool.all),
"max size", pool.config.GlobalSlots+pool.config.GlobalQueue, "hash", hash, "from", from, "to", tx.To())
return false, ErrTxPoolFull
}

// If the transaction is replacing an already pending one, do directly
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {

// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
delete(pool.all, old.Hash())
}
pool.all[tx.Hash()] = tx
pool.tmpqueue[tx.Hash()] = tx

log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

// We've directly injected a replacement transaction, notify subsystems
//TODO why inject event here
//event.FireEvent(&event.Event{Trigger: pool.txPreTrigger, Payload: event.TxPreEvent{tx}, Topic: event.TxPreTopic})

return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
replace, err := pool.enqueueTx(hash, tx)
if err != nil {
return false, err
}
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replace, nil
}

如果交易池中的queue插入成功的话,queue数据发生变化,接下来promoteExecutables方法就是要把queue中某些交易移到pending中,从不可以处理的状态变成可处理的状态

  1. 删除所有nonce太低的交易
  2. 删除帐户余额太低或gas超过交易池限制的交易
  3. 将queue交易添加到pending中,同时会把交易发送到chan通道,chan接收之后广播到网络中的其他peer节点
  4. 删除超过queue容量限制的交易
  5. 最后keepFit处理超过pending容量限制的交易
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    func (pool *TxPool) promoteExecutables(accounts []common.Address) {

    // Gather all the accounts potentially needing updates
    if accounts == nil {
    accounts = make([]common.Address, 0, len(pool.queue))
    for addr := range pool.queue {
    accounts = append(accounts, addr)
    }
    }

    // Iterate over all accounts and promote any executable transactions
    for _, addr := range accounts {
    list := pool.queue[addr]

    if list == nil {

    continue // Just in case someone calls with a non existing account
    }

    // Drop all transactions that are deemed too old (low nonce)
    for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
    hash := tx.Hash()
    log.Trace("Removed old queued transaction", "hash", hash)
    delete(pool.all, hash)
    }

    // Drop all transactions that are too costly (low balance or out of gas)
    drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
    for _, tx := range drops {
    hash := tx.Hash()
    log.Trace("Removed unpayable queued transaction", "hash", hash)
    delete(pool.all, hash)
    }

    // Gather all executable transactions and promote them
    for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
    hash := tx.Hash()
    log.Trace("Promoting queued transaction", "hash", hash,"pool.pendingState.GetNonce(addr)",pool.pendingState.GetNonce(addr))
    pool.promoteTx(addr, hash, tx)

    // Delete a single queue transaction
    if list != nil {
    list.Remove(tx)
    }
    }

    // Drop all transactions over the allowed limit
    for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
    hash := tx.Hash()
    delete(pool.all, hash)
    log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
    }

    // Delete the entire queue entry if it became empty.
    if list.Empty() {
    log.Trace("promoteExecutables list.Empty()")
    delete(pool.queue, addr)
    }
    }

    pool.keepFit()

    }