概述

我们在上一篇以太坊机制详解:Gas Price计算文章内介绍了以太坊中交易的具体gas计算相关规则。本篇文章将在此基础上介绍以下内容:

  1. 客户端如何构建一笔交易
  2. 服务端如何处理交易并进行打包

本文以一笔交易的生命周期为主线安排全文,从交易池初始化、交易构造到交易执行,尽可能为读者全景展示以太坊中的交易所涉及的方方面面。

State with Tx

对于交易的打包涉及状态树、数据库等内容,为强化文章专题性,我们将这一部分内容放到下一篇文章内。

交易池初始化

在交易进行之前,节点首先需要完成节点初始化,本节内容主要聚焦于此。

节点需要对以下数据进行初始化:

type TxPoolConfig struct {
	Locals    []common.Address // Addresses that should be treated by default as local
	NoLocals  bool             // Whether local transaction handling should be disabled
	Journal   string           // Journal of local transactions to survive node restarts
	Rejournal time.Duration    // Time interval to regenerate the local transaction journal

	PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
	PriceBump  uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)

	AccountSlots uint64 // Number of executable transaction slots guaranteed per account
	GlobalSlots  uint64 // Maximum number of executable transaction slots for all accounts
	AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
	GlobalQueue  uint64 // Maximum number of non-executable transaction slots for all accounts

	Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

各参数含义如下:

  • Locals 本地账户地址列表。在以太坊执行层节点内,本地交易拥有一系列特权,节点会监控交易来源地址并确定是否将其列为本地交易
  • NoLocals 禁止本地交易处理,将所有交易均列为远程交易(即无特权交易)
  • Journal 对交易池内的本地交易进行数据持久化的文件名
  • Rejournal 进行数据持久化的时间间隔
  • PriceLimit 交易池接受到最小gas价格,在EIP1559交易中代表最小Priority Fee价格
  • PriceBump 交易gas价格提升的最小百分比
  • AccountSlots 单个账户可执行交易的最大容量
  • GlobalSlots 所有账户可执行交易的最大容量
  • AccountQueue 最大单个账户非可执行交易容量
  • GlobalQueue 全局最大非可执行交易容量
  • Lifetime 非可执行交易的存活时间

我们在上文中使用了交易容量而非交易数量,两者具有一定的区别,一般来说,一笔交易占用 1 单位交易容量,具体计算公式如下:

func numSlots(tx *types.Transaction) int {
	return int((tx.Size() + txSlotSize - 1) / txSlotSize)
}

其中txSlotSize为常量,其值为32768tx.Size()代表一笔交易经过RLP编码后的体积,单位为bytes。正常交易都仅占用 1 单位交易容量。

设置上述规则的原因在于避免部分用户构建超大交易占用交易池资源,正常来说一笔交易仅占用2000 bytes左右。

上述各值的初始化如下:

var DefaultTxPoolConfig = TxPoolConfig{
	Journal:   "transactions.rlp",
	Rejournal: time.Hour,

	PriceLimit: 1,
	PriceBump:  10,

	AccountSlots: 16,
	GlobalSlots:  4096 + 1024, // urgent + floating queue capacity with 4:1 ratio
	AccountQueue: 64,
	GlobalQueue:  1024,

	Lifetime: 3 * time.Hour,
}

在交易池中,我们把交易分为远程交易(remote transactions)和本地交易(local transaction)。其中,后者具有以下优先级:

  1. 写入Journal文件中,在节点启动时直接载入交易池
  2. 不受交易池中的PriceLimit等限制
  3. 在交易排序时优先级最高
  4. 不会因为交易队列已满等原因被交易队列剔除

我们使用RPC API向节点发送的交易也属于local transactions(前提为节点将NoLocals设置为False)。

此处的PriceLimit参数对应EIP1559交易中的Max Fee参数,这意味着只要Max Fee大于PriceLimit即可进入交易池

在此处也出现了两种交易类型,如下:

  1. 可执行交易(executable transaction),此交易位于pending队列中,极有可能被节点封装进入下一个区块
  2. 不可执行交易(non-executable transaction),此交易位于queued队列中,不太可能被节点封装进入下一个区块

封装(seal)是目前以太坊对于区块打包的描述

我们通过NewTxPool函数使用上述初始化配置实现交易池的初始化。由于篇幅限制,我们不对其进行详细介绍。

初始化完成后,我们通过SubscribeNewTxsEvent函数订阅在以太坊网络中广播的新的交易项目。

交易构造

本节内容主要介绍客户端如何构造一笔以太坊交易。鉴于本文的很多读者并没有独立运行的以太坊节点,我们在此处主要介绍通过以太坊的API构建交易。

在一笔交易的最初阶段,用户需要完成交易的初始化,设置一笔交易的各个参数。在此处,我们以以太坊标准APIeth_sendTransaction为例向大家介绍一笔交易的具体构成,具体构成如下:

  • type 交易类型,如果使用EIP1559类型交易,则设置为0x02
  • nonce 用户的nonce,此数值会在用户完成每一笔交易后增加1
  • to 交易目标地址
  • from 交易来源地址
  • gasgas limit,具体参考Gas Limit 的获取
  • value 交易转账的ETH数量(单位为wei)
  • input 交易包含的合约运行数据,如果交互对象不是合约,可置为0x
  • gasPrice 如果使用EIP1559,此项可置为空
  • maxPriorityFeePerGas 设置的Max Priority Fee
  • maxFeePerGas 设置Max Fee
  • accessListEIP2930进行了一些规定,由于目前使用较少,我们不进行介绍
  • chainID 链ID,可通过ChainList获得相关数据

接下来,我们尝试使用MetaMask提供的API构建一笔交易。

首先,我们需要任一已被MetaMask授权的网站进行测试。在此处我们以MetaMask的演示网站MetaMask Test Dapp为例。点击CONNECT进行账户授权,授权完成后,可以点击ETH_ACCOUNTS与自己的地址进行对比。完成上述准备后,点击F12进入开发者模式后选择Console进入Javascript终端。

输入以下内容初始化交易参数:

const transactionParameters = {
  to: '0x0000000000000000000000000000000000000000', 
  from: ethereum.selectedAddress, 
  value: '0x00', 
  data:
    '0x7f7465737432000000000000000000000000000000000000000000000000000000600057'
};

我们在此处省略了很多字段,这些字段会被MetaMask自动补齐。

输入内容并运行:

const txHash = await ethereum.request({
  method: 'eth_sendTransaction',
  params: [transactionParameters],
});

如果一切顺利,读者可以看到如下内容: MetaMask API

如果读者需要设置gasvalue等参数,需要注意这些参数均使用wei作为单位,同时使用 16 进制进行编码。1 wei 为 0.000000000000000001 eth

更加详细的对于此API的说明,读者可以自行参考文档或者前往MetaMask JSON-RPC API Reference

值得注意的是,大部分RPC服务商均不支持此API。读者可以发现上述交易中不包含签名,但由于RPC服务商不托管用户私钥,不能对交易进行签名,所以不能进行交易提交。MetaMask钱包中包含用户私钥所以可以调用此函数。

RPC服务商一般允许使用eth_sendRawTransaction接口,此接口需要提交已经完成签名的并使用RLP编码的交易。本质上,MetaMask也调用了此接口。

如果读者希望通过命令行提交交易,可以使用Foundry提供的cast命令,具体可以参考cast send命令,支持上述所有参数。一个最简单的案例如下:

cast send 0x11475691C2CAA465E19F99c445abB31A4a64955C --value 0.001ether --gas-limit 21000 --gas-price 5gwei --priority-gas-price 1.5gwei --private-key $pk --rpc-url https://goerli.infura.io/v3/9aa3d95b3bc440fa88ea12eaa4456161

其中$pk需要替换为用户自己的私钥。--gas-price的含义为Max Fee--priority-gas-price含义为Max priority fee,详细介绍请参考上文给出的文档

由于后文会使用到以太坊内的交易类型,在此处,我们一并给出交易在go-ethereum中的接口,如下:

type TxData interface {
	txType() byte // returns the type ID
	copy() TxData // creates a deep copy and initializes all fields

	chainID() *big.Int
	accessList() AccessList
	data() []byte
	gas() uint64
	gasPrice() *big.Int
	gasTipCap() *big.Int
	gasFeeCap() *big.Int
	value() *big.Int
	nonce() uint64
	to() *common.Address

	rawSignatureValues() (v, r, s *big.Int)
	setSignatureValues(chainID, v, r, s *big.Int)
}

各个参数的含义如下:

  • txType 返回交易的类型,对应type参数
  • gasTipCap 对应交易设置中的maxPriorityFeePerGas,即Max Priority Fee
  • gasFeeCap 对应交易设置中的maxFeePerGas,即Max Fee

其他参数较为简单,读者可以直接通过名称推断其含义,故不再进行介绍。

交易池添加交易

当交易完成设置并通过API发送后,节点中的交易池会接受到此交易,并将其纳入自己的交易队列中。在详细分析交易进入队列之前,我们首先讨论一下以太坊中交易队列的类型。

在以太坊中,我们可以将交易队列使用以下Venn图表示:

Tx Types

所有交易可以根据来源首先被划分为两类:

  1. 本地交易 local transaction
  2. 远程交易 remote transaction

正如前文所言,前者在优先级上高于后者所有交易,所以在以太坊交易池中属于最高等级,需要独立对待。

而远程交易remote transaction则被细分为了以下两个队列:

  1. pending队列 - 此队列数据基本可以保证会被纳入下一个区块,而在交易广播时也只广播此队列内的交易
  2. queued队列(亦称queue队列) - 此队列内的数据只能在交易池刷新队列时可能被纳入pending队列,我们会在后文进行介绍具体的更新规则

但在go-ethereum中,将local transaction保存在了pending队列中,但另一方面保证了local transaction不会被pending队列剔除

go-ethereum中,上述队列定义如下:

pending map[common.Address]*txList   // All currently processable transactions
queue   map[common.Address]*txList   // Queued but non-processable transactions

在正常情况下,当一个新的交易进入节点交易池后,此交易有以下去向:

  1. 大部分情况下直接进入queue队列等待刷新
  2. 少部分用于增加gas费用的交易替换pending中的原有交易被纳入pending队列
  3. 因不满足交易条件而被删除

我们首先分析用于在交易池中加入单个交易的add函数,其代码非常长,我们将逐块分析:

func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error)

函数定义说明,此函数接受交易和标识交易是否为本地交易的local标识符作为输入,返回代表此交易是否替换了其他交易的replaced标识和错误err

整个流程可以使用以下流程图说明:

Tx Add Flow

hash := tx.Hash()
if pool.all.Get(hash) != nil {
	log.Trace("Discarding already known transaction", "hash", hash)
	knownTxMeter.Mark(1)
	return false, ErrAlreadyKnown
}

首先获得交易的哈希值,如果交易与交易池内的任何交易的哈希值相同,我们则丢弃此交易,并抛出异常

isLocal := local || pool.locals.containsTx(tx)

此代码判断交易是否为local transaction。如果满足local标识符为True或交易发送者位于Locals地址列表内条件,则认定此交易为local transaction

if err := pool.validateTx(tx, isLocal); err != nil {
	log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
	invalidTxMeter.Mark(1)
	return false, err
}

使用validateTx函数验证交易是否符合交易池的要求,限于篇幅,我们在此处直接给出validateTx函数验证的内容,具体代码实现请自行查找。具体验证内容如下:

  1. 交易进行RLP编码后体积不大于131072 bytes
  2. 交易的Gas Limit不大于 3000 万(即当前区块GasLimit)
  3. 交易的Max FeeMax Priority Fee不大于2 ^ 256
  4. 交易的Max Priority Fee小于Max Fee
  5. 交易签名正确
  6. 交易的Max Priority Fee大于交易池设置的PriceLimit
  7. 交易的nonce大于交易者当前的nonce
  8. 交易者账户余额可以支付交易的Gas费用
  9. 满足AccessList的一些gas要求

如果用户提交给节点的交易无法满足上述条件,则直接被丢弃。

当交易经过校验后,交易或被纳入queuedpending队列中,这一部分逻辑较为复杂。

首先,我们分析交易池容量已满的情况,我们使用以下的代码判定此情况:

uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue

其中pool.all.Slots()会返回目前交易池内所有交易所占用的交易容量,numSlots(tx)计算准备进入交易池的交易的所占用的交易容量,如果两者之和大于GlobalSlots(所有账户可执行交易的最大容量)和GlobalQueue(全局最大非可执行交易容量),我们可以判断交易池已满。

if !isLocal && pool.priced.Underpriced(tx) {
	log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
	underpricedTxMeter.Mark(1)
	return false, ErrUnderpriced
}

当交易不是优先级最高的本地交易,且交易的gasTipCap(Max Priority Fee)低于交易池内最低gasTipCap时,我们直接丢弃此交易。

交易池内维护有一个由交易价格构成的堆heap,可以快速查找价格最低的交易

if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) {
	throttleTxMeter.Mark(1)
	return false, ErrTxPoolOverflow
}

此处涉及到一个名词Reorg,此名词表示交易池重排。每当一个新的区块被生成,交易池会根据区块中的交易信息对交易池内的交易进行重组,包括在交易池内删除已被打包的交易(这部分交易往往位于pending队列中)、升级符合条件的queued队列中的交易、在已满的队列中删除交易以及广播交易。该部分核心实现为runReorg函数,此函数会在后文多次出现。我们一般使用channel这种特殊的go数据类型与作为单独线程的runReorg函数进行通信。

在此代码中changesSinceReorg代表现在需要重组的交易数量,如果此交易数量大于GlobalQueue(所有账户可执行交易的最大容量)的1 / 4,我们则认为交易池非常拥挤,直接丢弃新的交易。

在交易池启动后,runReorg函数会自动清除已满队列中的交易,开发者认为通过add函数删除太多交易并不合适,具体可参考#23095

drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)

// Special case, we still can't make the room for the new remote one.
if !isLocal && !success {
	log.Trace("Discarding overflown transaction", "hash", hash)
	overflowedTxMeter.Mark(1)
	return false, ErrTxPoolOverflow
}
// Bump the counter of rejections-since-reorg
pool.changesSinceReorg += len(drop)
// Kick out the underpriced remote transactions.
for _, tx := range drop {
	log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
	underpricedTxMeter.Mark(1)
	pool.removeTx(tx.Hash(), false)
}

当我们认为交易可以被删除,我们则进行真正的交易删除步骤。首先通过pool.priced.Discard函数移除占用pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx)单位的交易容量的交易。此函数不会真正删除交易,而是会返回待删除交易的列表,待删除交易的筛选规则为交易由高到低进行排序,优先删除价格最低的交易(本地交易不会被删除)。

如果无法获得待删除列表,且交易不是本地交易,则返回错误。如果获得待删除交易列表,我们会更新changesSinceReorg变量。然后使用pool.removeTx真正执行删除步骤。

当我们完成交易池容量方面的处理后,我们接下来处理一部分特殊的交易,即用于替换交易池pending队列的交易。这种替换交易往往用于增加已经在队列中的交易的gas,保证交易可以尽快完成。

我们可以通过以下代码判断此交易是否为替换交易:

from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx)

首先使用types.Sender(pool.signer, tx)获得交易的具体签名人,然后前往pool.pending队列中查询此用户名下的所有交易,并使用Overlaps函数判断交易是否存在重复。

pool.pending是一个映射pending map[common.Address]*txList,我们可以通过用户地址在其内部快速检索相关交易

当我们发现交易池内已包含此笔交易后,我们会尝试将交易加入交易池,代码如下:

inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
	pendingDiscardMeter.Mark(1)
	return false, ErrReplaceUnderpriced
}

此处使用到了Add函数,此函数接受交易tx后会在交易队列中查询与txnonce相同的交易。获得交易列表内的旧交易后,函数会校验gasTipCapgasFeeCap相较于旧交易的增加幅度是否符合要求,如果满足上述条件,则直接在交易列表内替换旧的交易。同时返回需要替换的旧交易,以满足后续处理流程。当然,如果此处发现Add函数返回替换失败的标识,我们直接放弃替换。

关于Add函数,其具体代码如下:

func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Transaction) {
	// 获取交易列表内 Nonce 相同的旧交易
	old := l.txs.Get(tx.Nonce())
	if old != nil {
		// 要求新交易的 GasFeeCap 和 GasTipCap 大于旧交易
		if old.GasFeeCapCmp(tx) >= 0 || old.GasTipCapCmp(tx) >= 0 {
			return false, nil
		}
		// thresholdFeeCap = oldFC  * (100 + priceBump) / 100
		a := big.NewInt(100 + int64(priceBump))
		aFeeCap := new(big.Int).Mul(a, old.GasFeeCap())
		aTip := a.Mul(a, old.GasTipCap())

		// thresholdTip    = oldTip * (100 + priceBump) / 100
		b := big.NewInt(100)
		thresholdFeeCap := aFeeCap.Div(aFeeCap, b)
		thresholdTip := aTip.Div(aTip, b)

		// 要求新交易的 GasFeeCap 和 GasTipCapCmp 增加幅度大于 PriceBump
		if tx.GasFeeCapIntCmp(thresholdFeeCap) < 0 || tx.GasTipCapIntCmp(thresholdTip) < 0 {
			return false, nil
		}
	}
	// 使用新交易覆盖旧交易
	l.txs.Put(tx)

	// 以下内容为刷新参数
	if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
		l.costcap = cost
	}
	if gas := tx.Gas(); l.gascap < gas {
		l.gascap = gas
	}
	return true, old
}

读者可自行查阅上述代码及其注释理解实现过程

读者可能发现上述流程仅对交易列表进行了替换,而不是对pool交易池的其他参数进行同步更新,所以我们使用以下代码更新交易池内的其他参数:

if old != nil {
	pool.all.Remove(old.Hash())
	pool.priced.Removed(1)
	pendingReplaceMeter.Mark(1)
}
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.queueTxEvent(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

首先在用于维护交易池内所有交易的pool.all队列中删除此交易,同时在更新交易价格构成的pool.priced队列。使用pool.journalTx方法尝试将此替换交易纳入用于交易数据存储Journal内。

此处使用了尝试是因为在journalTx函数中会对交易的from进行审查,如果交易不来自local地址,则不会进行存储。

在此处,使用了一个较为特殊的函数queueTxEvent,此函数会将交易推送给pool.queueTxEventCh通道,此通道的最终目的地为runReorg函数,并对queueTxEvent队列进行修改。

case tx := <-pool.queueTxEventCh:
	addr, _ := types.Sender(pool.signer, tx)
	if _, ok := queuedEvents[addr]; !ok {
		queuedEvents[addr] = newTxSortedMap()
	}
	queuedEvents[addr].Put(tx)

此处没有设置scheduleReorgLoop中的一个重要参数launchNextRun,此参数用于判断runReorg,即重排过程是否立即执行,若不设置,则意味着重排过程不会立即进行。

queueTxEvents定义如下:

queuedEvents  = make(map[common.Address]*txSortedMap)

其中txSortedMap是一个nonce到交易transaction的堆(heap)。最终,queuedEvents映射会被用于交易广播,以下给出的代码摘自runReorg函数的最后,其中events即此处的queuedEvents

if len(events) > 0 {
	var txs []*types.Transaction
	for _, set := range events {
		txs = append(txs, set.Flatten()...)
	}
	pool.txFeed.Send(NewTxsEvent{txs})
}

完成上述步骤后,我们会更新账户最新的活动时间,完成整个替换交易流程。代码如下:

pool.beats[from] = time.Now()
return old != nil, nil

如果一笔交易既不是对现有交易的替换,我们会使用使用以下代码直接将其推入queue队列中,代码如下:

replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
if err != nil {
	return false, err
}

此处的enqueueTx会将交易推送进入queue队列,限于篇幅,我们在此处以注释的形式解释此函数的源代码:

func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
	from, _ := types.Sender(pool.signer, tx) // already validated
	// 如果 queue 队列内没有此地址记录,则创建一个新的映射
	// queue 的定义为 map[common.Address]*txList
	if pool.queue[from] == nil {
		pool.queue[from] = newTxList(false)
	}
	// 使用 (l *txList) Add 直接加入交易
	// 详情请参考上文给出的代码解析
	inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)

	// 处理插入失败的情况
	if !inserted {
		queuedDiscardMeter.Mark(1)
		return false, ErrReplaceUnderpriced
	}

	if old != nil {
		// 发生替换交易情况,刷新 pool 参数
		// 类似上文提到的 替换交易 的后续处理
		pool.all.Remove(old.Hash())
		pool.priced.Removed(1)
		queuedReplaceMeter.Mark(1)
	} else {
		// 没有发生替换交易的情况,增加计数器
		// 计数器用于评估等功能
		queuedGauge.Inc(1)
	}
	// If the transaction isn't in lookup set but it's expected to be there,
	// show the error log.
	if pool.all.Get(hash) == nil && !addAll {
		log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
	}
	// 刷新 pool 中的 all 和 priced 变量
	if addAll {
		pool.all.Add(tx, local)
		pool.priced.Put(tx, local)
	}
	// 刷新交易账户的生命周期
	if _, exist := pool.beats[from]; !exist {
		pool.beats[from] = time.Now()
	}
	return old != nil, nil
}

完成上述重要任务后,最后我们处理本地交易的问题,代码如下:

if local && !pool.locals.contains(from) {
	log.Info("Setting new local account", "address", from)
	pool.locals.add(from)
	pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
}
if isLocal {
	localGauge.Inc(1)
}
pool.journalTx(from, tx)

log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replaced, nil

如果发现交易被标记为local,但账户没有被标记为local,则直接将交易发生账户列入locals名单内并对此地址下的所有交易进行提权至本地交易(local transaction),最终使用journalTx存储本地交易。

当然,正常情况下我们更有可能一次性增加大量交易,所以在源代码中,我们可以看到大量函数都使用了addTxs函数,而此函数中的一个核心部分是addTxsLocked函数,我们首先介绍此函数。

代码如下:

func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
	dirty := newAccountSet(pool.signer)
	errs := make([]error, len(txs))
	for i, tx := range txs {
		replaced, err := pool.add(tx, local)
		errs[i] = err
		if err == nil && !replaced {
			dirty.addTx(tx)
		}
	}
	validTxMeter.Mark(int64(len(dirty.accounts)))
	return errs, dirty
}

此处使用到了pool.add函数用于向交易池内增加交易,但为了方便使用runReorg函数进行交易重排,在此处定义了一个较为特殊的dirty变量,此变量为一个交易发送者地址列表,我们使用了dirty.addTx(tx)向此地址列表内增加新的交易发送者信息。

此函数名内包含Locked字样,这意味着此函数必须在交易池拿到线程锁时才能使用

我们也对此函数进行分析。

func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error

在此函数中,各参数含义如下:

  • txs代表需要加入交易池的交易集合
  • local用于标识此交易集合内的交易是否为本地交易
  • sync用于标识此交易集合内的交易是否立即用于提权,用于测试

正如前文所述,一般来说交易会被直接推入queue队列内,而函数runReorg会定期运行对交易进行提权至pending队列内。如果将addTxs中的sync设置为True,则意味着在下一次runReorg运行时会直接进行提权

此函数的流程图如下:

AddTxs

addTxs函数首先对交易进行了一个简单的验证,具体代码如下:

var (
	errs = make([]error, len(txs))
	news = make([]*types.Transaction, 0, len(txs))
)
for i, tx := range txs {
	// 验证交易是否已经存在在交易池内
	if pool.all.Get(tx.Hash()) != nil {
		errs[i] = ErrAlreadyKnown
		knownTxMeter.Mark(1)
		continue
	}
	// 验证交易签名是否正确
	_, err := types.Sender(pool.signer, tx)
	if err != nil {
		errs[i] = ErrInvalidSender
		invalidTxMeter.Mark(1)
		continue
	}
	// 将交易添加到 news 列表内
	news = append(news, tx)
}
if len(news) == 0 {
	return errs
}

在完成基本的交易校验后,使用pool.addTxsLocked函数将交易加入交易池内,代码如下:

pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
pool.mu.Unlock()

此处使用了pool.mu.Lock()pool.mu.Unlock()实现交易池线程锁定和解锁。

var nilSlot = 0
for _, err := range newErrs {
	for errs[nilSlot] != nil {
		nilSlot++
	}
	errs[nilSlot] = err
	nilSlot++
}

一个简单的for循环实现newErrs中的错误到err的转移。

done := pool.requestPromoteExecutables(dirtyAddrs)
if sync {
	<-done
}
return errs

此函数实际上实现了将交易通过channel推送给runReorg的作用。其中requestPromoteExecutables的定义如下:

func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
	select {
	case pool.reqPromoteCh <- set:
		return <-pool.reorgDoneCh
	case <-pool.reorgShutdownCh:
		return pool.reorgShutdownCh
	}
}

对于具体的channel作用,我们会在下一节进行介绍。

交易重排

我们在上一节内大量提到了runReorg函数及其作用,在本节我们将介绍runReorg实现交易重排的具体原理及其实现。

runReorg函数是由scheduleReorgLoop函数启动,而scheduleReorgLoop函数在交易池初始化时就被调用,具体可以参考NewTxPool函数中的下述代码:

go pool.scheduleReorgLoop()

而在scheduleReorgLoop函数内,我们可以看到大量的channel的使用。我对于golang语言中的channel使用并不是非常熟悉。所以在后文内可能出现错误,发现错误的读者可以通过我的博客中给出的邮箱地址向我反馈。

我们首先给出一系列的channel定义:

reqResetCh      chan *txpoolResetRequest
reqPromoteCh    chan *accountSet
queueTxEventCh  chan *types.Transaction
reorgDoneCh     chan chan struct{}
reorgShutdownCh chan struct{}  // requests shutdown of scheduleReorgLoop

其传输的信息主要为:

  • reqResetCh 传输用于区块更新的相关信息
  • reqPromoteCh 传输用于更新的指定地址集合
  • queueTxEventCh 传输用于加入queued队列交易的信息
  • reorgDoneCh 传输由空结构体构成的channel
  • reorgShutdownCh 传输reorg停止信号

一个简单的示例图,如下:

scheduleReorgLoop

在这些channel中,较难理解的是reorgDoneChreorgShutdownCh,这两个变量的设计是为了保证并发的正确性。我们首先介绍reorgDoneCh变量,此变量非常奇怪属于chan chan struct{}类型。

对于任何一个channel,分析其作用的最好方法就是分析其数据发送者和数据接收者。reorgDoneCh的数据发送者代码如下:

case req := <-pool.reqResetCh:
	// Reset request: update head if request is already pending.
	if reset == nil {
		reset = req
	} else {
		reset.newHead = req.newHead
	}
	launchNextRun = true
	pool.reorgDoneCh <- nextDone

case req := <-pool.reqPromoteCh:
	// Promote request: update address set if request is already pending.
	if dirtyAccounts == nil {
		dirtyAccounts = req
	} else {
		dirtyAccounts.merge(req)
	}
	launchNextRun = true
	pool.reorgDoneCh <- nextDone

上述代码均来自scheduleReorgLoop函数内,我们可以看到都是在进行一系列数据处理后在进行推送nextDone。其中nextDone的定义为make(chan struct{}),是符合channel的类型要求的。

上文给出的case代码块内,我们可以看到launchNextRun被设置为true,这意味着重排会立即进行。当然,此时进行的重排也会对上文介绍的queuedEvents中的内容一并进行重排。

我们进一步分析数据接收者,代码如下:

func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
	select {
	case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
		return <-pool.reorgDoneCh
	case <-pool.reorgShutdownCh:
		return pool.reorgShutdownCh
	}
}

func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} {
	select {
	case pool.reqPromoteCh <- set:
		return <-pool.reorgDoneCh
	case <-pool.reorgShutdownCh:
		return pool.reorgShutdownCh
	}
}

这些接收函数均直接选择将pool.reorgDoneCh内的空channel作为return返回,如果读者进一步研究这两个函数的应用会发现函数的return值并没有被具体的运行逻辑使用。

造成这种情况的原因是reorgDoneCh的目的仅是保证reqResetChreqPromoteCh函数发送给reqResetChreqPromoteCh的数据会被scheduleReorgLoop正确处理后关闭。更加详细的解释是当我们通过return <-pool.reorgDoneCh获得一个channel(即nextDone)时,由于channel自身具有阻塞性,主函数只有在scheduleReorgLoop进行完数据处理(即上文给出的case块)运行后退出。这一行为有效保障函数运行的同步。这种运行逻辑与async/await类似,在golang中,类似reorgDoneChchan chan struct{}是一种重要的无锁队列结构,

假如我们不进行reorgDoneCh队列操作,那么使用requestPromoteExecutablesaddTxs函数就可以无视scheduleReorgLoop的数据处理流程而自行工作,这可能导致数据在scheduleReorgLoop进行数据处理操作时被推入函数,造成并发冲突。

reorgDoneCh代表的chan chan struct{}是 无锁 Channel 的重要实现方式,其他实现方式可以参考Go channels on steroids。如果读者想进一步深入学习,建议阅读Go语言设计与实现

当然,读者可以分析nextDone也是一个channel,在addTxs函数中,我们使用了以下代码:

done := pool.requestPromoteExecutables(dirtyAddrs)
if sync {
	<-done
}

此代码从done中获取成员,可以保证在addTxs一定在runReorg运行完后结束。

上述流程都使用了channel的阻塞特性,如果读者对于go语言的此部分不熟悉,可以参考Go Channel 详解

另一个较难理解的队列为reorgShutdownCh队列,此队列用于系统关闭。但我们分析此队列没有数据发送者,但存在大量的数据接收者,如下:

case <-pool.reorgShutdownCh:
	// Wait for current run to finish.
	if curDone != nil {
		<-curDone
	}
	close(nextDone)
	return

以上代码块来自scheduleReorgLoop函数,此函数尝试早reorgShutdownCh队列中获得内容,但由于没有数据发送者,这会导致阻塞情况的发送,此case不会被激活。继续查找相关代码,我们可以查询到以下代码:

case <-pool.chainHeadSub.Err():
	close(pool.reorgShutdownCh)
	return

此代码的关键在于close(pool.reorgShutdownCh),在go语言中,当我们关闭channel时,channel的阻塞状态消失,接收者可以在channel中获得nil类型的成员。这意味着当我们关闭reorgShutdownCh时,上文给出的case <-pool.reorgShutdownCh会被激活,在此处依旧使用了channel的阻塞特性,通过阻塞curDone空队列保证了runReorg运行结束后,再进行return操作。

在上文中,我们讨论了scheduleReorgLoop与一系列channel之间的相互关系和数据流动。简单来说,所有对于交易重排的数据都会使用channel推送给的scheduleReorgLoop函数,此函数通过select结构进行分类处理,这些处理后的数据最终都会被用于一个关键函数,也是我们下文主要介绍的函数,即runReorg函数。

runReorg的开始设定了一些用于记录和关闭channeldefer部分,由于这些内容并不重要且主要涉及golang语言特性,所以在此处为我们直接跳过此部分。同时,在下文中我们认为reset结构体为空,因为此结构体主要用于接受新的区块后进行交易池更新,与我们此处设定的单笔交易进入交易池流程不符,我们会在本文的在介绍交易池更新时给出对于此部分的解析。

我们首先给出函数的定义,代码如下:

func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap)

各参数含义如下:

  1. done 用于阻塞状态的channel,实现addTxs中的sync作用
  2. reset 用于区块更新后刷新交易池的结构体,在本节我们不会详细说明
  3. dirtyAccounts 此变量为需要提权交易的账户集合,由上文给出的addTxs函数增加账户
  4. events 用于广播的替换交易,其成员主要来自上文给出的add函数中的替换交易池内已有交易部分

我们首先处理对账户集合内的账户进行提权的过程。代码如下:

var promoteAddrs []common.Address
if dirtyAccounts != nil && reset == nil {
	promoteAddrs = dirtyAccounts.flatten()
}
pool.mu.Lock()
promoted := pool.promoteExecutables(promoteAddrs)

我们在上述代码中省略了reset部分。其中的核心函数是promoteExecutables,其代码及对应的注释如下:

func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
	var promoted []*types.Transaction

	// 循环 accounts 并寻找可提权交易
	for _, addr := range accounts {
		list := pool.queue[addr]
		if list == nil {
			continue // Just in case someone calls with a non existing account
		}
		// 丢弃所有低于当前状态账户 nonce 的交易
		forwards := list.Forward(pool.currentState.GetNonce(addr))
		for _, tx := range forwards {
			hash := tx.Hash()
			pool.all.Remove(hash)
		}
		log.Trace("Removed old queued transactions", "count", len(forwards))
		// 丢弃所有账户余额无法满足 gas 消耗的交易
		drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
		for _, tx := range drops {
			hash := tx.Hash()
			pool.all.Remove(hash)
		}
		log.Trace("Removed unpayable queued transactions", "count", len(drops))
		queuedNofundsMeter.Mark(int64(len(drops)))

		// 使用 pool.pendingNonces.get(addr) 获得 pending 队列中的最大 nonce
		// 使用 list.Ready 在 quenue 队列中获得上述 nonce 后的交易
		readies := list.Ready(pool.pendingNonces.get(addr))
		for _, tx := range readies {
			hash := tx.Hash()
			// 使用 pool.promoteTx 函数对交易进行提权
			// 即将交易自 quenue 转移到 pending
			if pool.promoteTx(addr, hash, tx) {
				promoted = append(promoted, tx)
			}
		}
		log.Trace("Promoted queued transactions", "count", len(promoted))
		queuedGauge.Dec(int64(len(readies)))

		// 判断用户的 quenue 队列中交易数量是否符合需求
		// 如不符合则删除部分交易
		var caps types.Transactions
		if !pool.locals.contains(addr) {
			caps = list.Cap(int(pool.config.AccountQueue))
			for _, tx := range caps {
				hash := tx.Hash()
				pool.all.Remove(hash)
				log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
			}
			queuedRateLimitMeter.Mark(int64(len(caps)))
		}
		// Mark all the items dropped as removed
		pool.priced.Removed(len(forwards) + len(drops) + len(caps))
		queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
		if pool.locals.contains(addr) {
			localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
		}
		// Delete the entire queue entry if it became empty.
		if list.Empty() {
			delete(pool.queue, addr)
			delete(pool.beats, addr)
		}
	}
	return promoted
}

综上所述,对于交易在quenuepending的转换并没有及其严格的审查,更不会校验交易中的内容是否可以正常运行。

可能有读者发现在promoteExecutables函数内,我们没有对交易池pending队列长度等内容进行检测,原因在于这一部分是通过truncate系列函数实现的,代码如下:

pool.truncatePending()
pool.truncateQueue()

限于篇幅,我们无法具体分析这两个函数的代码实现,但我们仍会给出相关的逻辑实现。

truncatePending会将所有交易数量超过交易池限制AccountSlots且不在本地账户的账户地址构成一个以交易数量为权重的prque。完成队列构建后,代码会对此队列按权重进行循环,即权重较大者首先进入循环,并在每一次循环中,将权重较大的部分交易数量依次减去 1 ,直至总的交易数量满足交易池要求为止。值得注意的是,此流程没有考虑账户本身的交易数量限制。

如果经过上述循环依旧不满足交易池要求,则以账户最大可执行交易数量为限制进行循环,直至满足要求。

建议阅读源代码理解部分

truncatePending函数删除账户交易时,首先按账户进入交易池的顺序进行排列,较晚进入交易池的交易被优先清理。如果此地址内的交易小于需要丢弃的交易总量,则删除此账户下的所有交易。否则,则仅删除满足要求的交易。

关于以上内容,一个较好的参考资料是以太坊技术与实现,读者可以自行参考阅读

完成上述流程后,我们进行数据统计和变量重置工作,代码如下:

dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
pool.changesSinceReorg = 0 // Reset change counter
pool.mu.Unlock()

dropBetweenReorgHistogram用于统计在两次reorg之间升级的交易数量。统计后重置changesSinceReorg变量,并释放线程锁。

上述步骤完成了对于dirtyAccounts中的交易的提权,我们还需要对events中的单个交易进行广播。此过程的代码我们已在上文解释add函数中的交易替换部分时给出,此处不再赘述。

区块打包

在完成交易进入交易池、交易提升至pending队列后,我们需要处理区块打包问题,考虑到文章的专题性,本节不会讨论以下问题:

  1. 区块的具体结构和生成方法
  2. PoS共识算法

本节仅关注交易池与区块打包的对接部分。此部分主要位于miner/worker.go文件内,我们所需要介绍内容的核心函数为fillTransactions,代码如下:

func (w *worker) fillTransactions(interrupt *int32, env *environment) error {
	// Split the pending transactions into locals and remotes
	// Fill the block with all available pending transactions.
	pending := w.eth.TxPool().Pending(true)
	localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
	for _, account := range w.eth.TxPool().Locals() {
		if txs := remoteTxs[account]; len(txs) > 0 {
			delete(remoteTxs, account)
			localTxs[account] = txs
		}
	}
	if len(localTxs) > 0 {
		txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
		if err := w.commitTransactions(env, txs, interrupt); err != nil {
			return err
		}
	}
	if len(remoteTxs) > 0 {
		txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
		if err := w.commitTransactions(env, txs, interrupt); err != nil {
			return err
		}
	}
	return nil
}

这一部分代码中使用了types.NewTransactionsByPriceAndNonce函数。调用此函数会构造以下类型:

type TransactionsByPriceAndNonce struct {
	txs     map[common.Address]Transactions // Per account nonce-sorted list of transactions
	heads   TxByPriceAndTime                // Next transaction for each unique account (price heap)
	signer  Signer                          // Signer for the set of transactions
	baseFee *big.Int                        // Current base fee
}

其中,heads属于TxByPriceAndTime类型构成堆(heap),其具体定义为[]*TxWithMinerFee,进一步TxWithMinerFee的定义如下:

type TxWithMinerFee struct {
	tx       *Transaction
	minerFee *big.Int
}

此处出现了一个变量minerFee,此变量与gas费用有关,其具体的计算公式如下:

min(GasTipCap, gasFeeCap-baseFee)
等价于
min(maxPriorityFeePerGas, maxFeePerGas-BaseFee)

在后文中,我们经常使用func (*TransactionsByPriceAndNonce).Pop()函数,此函数会返回交易队列中minerFee最大的交易,如果minerFee相同则返回发现时间较早的交易。

在完成NewTransactionsByPriceAndNonce函数后,我们将使用commitTransactions函数,此函数在本节中属于核心地位,我们将着重介绍。

此函数的定义如下:

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error

在此处,我们忽略用于用于传输中断信息的interrupt变量,而另一个变量env则存储有封装区块所需要的一系列其他参数,在此次我们仍将省略不谈。

接下来,我们分析其代码构成:

第一步,初始化gas pool,并限定区块可用gas,代码如下:

gasLimit := env.header.GasLimit
if env.gasPool == nil {
	env.gasPool = new(core.GasPool).AddGas(gasLimit)
}

其中,函数AddGas的功能是提供gas限额。关于区块的GasLimit的讨论,可以参考以太坊机制详解:Gas Price计算中的内容。

接下来,我们会进入到一个for循环,此循环没有限定条件,仅能依靠循环体内的break跳出。

在此循环内首先检查了interrupt变量,我们跳过此部分。然后,检查了gasPool的余额,代码如下:

if env.gasPool.Gas() < params.TxGas {
	log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
	break
}

如果检测到gasPool内的gas剩余小于21000,我们认为已达到区块的gasLimit,跳出循环。

检测完gas限制后,我们进一步检测交易队列的情况,若此笔交易位于队列最后,则退出循环。代码如下:

tx := txs.Peek()
if tx == nil {
	break
}

其中,tx.Peek()会返回TransactionsByPriceAndNonce堆中的下一个元素,但与pop不同的是此操作不会影响堆的结构。

在完成上述步骤后,我们对一项非常重要的参数进行校验,即判断交易的签名是否符合EIP155的规定,关于EIP155签名的详细内容,可以参考基于链下链上双视角深入解析以太坊签名与验证。代码如下:

if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
	log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)

	txs.Pop()
	continue
}

如果交易不符合EIP155的规定,交易会不配剔除打包序列。

完成基本的校验后,我们准备执行交易,执行交易的代码如下:

env.state.Prepare(tx.Hash(), env.tcount)

logs, err := w.commitTransaction(env, tx)

在此段代码内,我们首先初始化state,即用来记录状态变化的数据库。在第二行里,我们通过commitTransaction正式提交交易。运行交易步骤包含大量的函数调用,限于篇幅,我们无法完整介绍。在此处,我们仅给出一系列函数调用中最核心的代码,如下:

if contractCreation {
	ret, _, st.gas, vmerr = st.evm.Create(sender, st.data, st.gas, st.value)
} else {
	// Increment the nonce for the next transaction
	st.state.SetNonce(msg.From(), st.state.GetNonce(sender.Address())+1)
	ret, st.gas, vmerr = st.evm.Call(sender, st.to(), st.data, st.gas, st.value)
}

如果交易涉及合约创建,则调用st.evm.Create,否则则调用st.evm.Call函数,所以即使交易仅是一笔转账交易,以太坊节点依旧会调用EVM,这与一般的认识是不相符的。

完成交易提交后,我们通过switch语句处理交易运行失败的各种情况,读者可以阅读相关代码。如果一切正常,我们运行以下代码块:

case errors.Is(err, nil):
	// Everything ok, collect the logs and shift in the next transaction from the same account
	coalescedLogs = append(coalescedLogs, logs...)
	env.tcount++
	txs.Shift()

此代码会将日志推送到coalescedLogs日志中,然后运行tx.Shift()执行下一个交易。

在完成交易的执行后,我们使用以下代码将运行日志作为订阅源供用户使用:

if !w.isRunning() && len(coalescedLogs) > 0 {
	cpy := make([]*types.Log, len(coalescedLogs))
	for i, l := range coalescedLogs {
		cpy[i] = new(types.Log)
		*cpy[i] = *l
	}
	w.pendingLogsFeed.Send(cpy)
}
return nil

我们首先对coalescedLogs进行复制,然后直接使用Send发送订阅源。此订阅源并不是在广播交易,而只是供终端用户使用。

在进行使用前,请确保拥有一个ws以太坊节点,在此处,我们使用了infura提供的服务。除此之外,读者应安装ws的客户端,在此处,我使用了utws作为客户端。输入以下命令:

uwsc wss://mainnet.infura.io/ws/v3/YOUR_API_KEY

回车后,在>后键入以下内容:

{"jsonrpc":"2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}

输入如下图: utws input

输出如下图: utws output

在输出中,result代表交易的哈希值,读者可https://etherscan.io/tx/{result}形式的网址访问到交易详情。

最后,我们介绍用于区块密封的函数,代码如下:

func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
	work, err := w.prepareWork(params)
	if err != nil {
		return nil, err
	}
	defer work.discard()

	if !params.noTxs {
		interrupt := new(int32)
		timer := time.AfterFunc(w.newpayloadTimeout, func() {
			atomic.StoreInt32(interrupt, commitInterruptTimeout)
		})
		defer timer.Stop()

		err := w.fillTransactions(interrupt, work)
		if errors.Is(err, errBlockInterruptedByTimeout) {
			log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout))
		}
	}
	return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}

目前以太坊已经完成了合并,所以此处最终的出块是由engine完成,即共识引擎。

简单给出一个当前以太坊的节点架构图: Ethereum Node Chart

简单来说,当共识客户端(Consensus Client)被选为区块提案者(proposer)后,会在执行客户端(Exection Client)的交易池内筛选交易,并由执行客户端运行交易。最终,共识客户端将打包后的交易进行广播,由其他节点进行投票,最终确定一个区块。

目前对于合并后的基础架构,概览性资料较少,我目前仍在研究相关内容,上述的流程可能有错误。如果您发现错误,请通过我的博客给出的邮箱与我联系。

在此处,我们基本完成了一笔交易在以太坊执行层内的完整流程,接下来,我们介绍当区块到达交易池后,交易池重构的相关内容。

交易池重构

我们在上文仅考虑了节点直接生产区块的区块,但在实际情况中,节点更有可能无法生产区块,而仅仅作为区块的接受方,接受其他节点生产的区块。我们势必限需要讨论节点在接受到其他节点发送的区块时如何进行交易池重构的问题 。

在交易池主循环内,我们可以找到如下代码:

case ev := <-pool.chainHeadCh:
	if ev.Block != nil {
		pool.requestReset(head.Header(), ev.Block.Header())
		head = ev.Block
	}

当交易池在chainHeadCh内获得新的区块头后,交易池会启动requestReset函数,此函数我们在上文已有所介绍,requestReset函数会将请求发送到reqResetCh的通道内,最终由scheduleReorgLoop函数接受,代码如下:

case req := <-pool.reqResetCh:
	// Reset request: update head if request is already pending.
	if reset == nil {
		reset = req
	} else {
		reset.newHead = req.newHead
	}
	launchNextRun = true
	pool.reorgDoneCh <- nextDone

实际最终还是由runReorg运行,我们在此处仅接受上文未介绍的Reset部分,第一部分的代码如下:

if reset != nil {
	// Reset from the old head to the new, rescheduling any reorged transactions
	pool.reset(reset.oldHead, reset.newHead)

	// Nonces were reset, discard any events that became stale
	for addr := range events {
		events[addr].Forward(pool.pendingNonces.get(addr))
		if events[addr].Len() == 0 {
			delete(events, addr)
		}
	}
	// Reset needs promote for all addresses
	promoteAddrs = make([]common.Address, 0, len(pool.queue))
	for addr := range pool.queue {
		promoteAddrs = append(promoteAddrs, addr)
	}
}

我们首先介绍除了reset外的其他部分,在完成reset函数后,我们首先删除events(内部含有准备加入quenue队列的交易)内所有小于当前状态数据内的nonce的交易。小于当前状态数据库内的nonce意味着此交易已被打包,不需要进行进一步处理。

然后,我们将当前交易池内quenue队列中的所有交易列入promoteAddrs中,这意味着在之后的代码运行中,这些交易均会被升级为pending队列内的交易。

最后,我们介绍较为复杂的reset函数,此函数负责向交易池内提交差异交易,而不负责删除交易,删除交易的代码我们会在第二部分进行介绍。

我们首先介绍第一个if语句内的交易:

if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
	// 获得区块编号
	oldNum := oldHead.Number.Uint64()
	newNum := newHead.Number.Uint64()
	// 当新旧区块差异过大时,不进行处理
	// 此种区块一般发生在节点同步数据时
	if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
		log.Debug("Skipping deep transaction reorg", "depth", depth)
	} else {
		// 声明需要丢弃和包含的交易
		var discarded, included types.Transactions
		var (
			// 获取区块完整数据
			rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
			add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
		)
		if rem == nil {
			// 此情况属于特殊情况,即旧区块无法检索
			if newNum >= oldNum {
				log.Warn("Transaction pool reset with missing oldhead",
					"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
				return
			}
			log.Debug("Skipping transaction reset caused by setHead",
				"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
		} else {
			for rem.NumberU64() > add.NumberU64() {
				discarded = append(discarded, rem.Transactions()...)
				if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
					log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
					return
				}
			}
			for add.NumberU64() > rem.NumberU64() {
				included = append(included, add.Transactions()...)
				if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
					log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
					return
				}
			}
			for rem.Hash() != add.Hash() {
				discarded = append(discarded, rem.Transactions()...)
				if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
					log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
					return
				}
				included = append(included, add.Transactions()...)
				if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
					log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
					return
				}
			}
			reinject = types.TxDifference(discarded, included)
		}
	}
}

我们在源代码中给出了部分注释,但对于一些特殊情况,我们在此进行解释。

rem == nil情况,此情况较为特殊,我们无法在数据库内检索到旧的区块,此种情况可细分:

  1. newNum >= oldNum 新区块编号大于或等于旧区块,此种情况下意味着我们之前使用的链并不位于主链上,而是位于分支链上。此种情况下,我们不需要对交易池进行特别处理,等待再获得一个区块重置变量即可。
  2. 其他情况,这些情况都较为玄学,可能是函数运行出现问题,也可以通过等待区块重置变量解决问题

除了上面这种情况,我们还会遇到以下情况:

  1. 旧区块大于新区块 这意味着节点获得了一个可能来自分支链的块广播,我们将旧区块的交易列入discarded序列内。正如上文所述,此函数其实不会丢弃交易,discarded仅作为交易序列名存在。我们认为旧区块内的交易都应该删除,无论旧区块是否位于主分支等情况

  2. 新区块大于旧区块 正常情况,将新区块内的交易列入included列表内

在当前以太坊PoS情况下,很难出现分支链等情况

当然,只要新区块与旧区块的哈希值不同,我们就需要处理其内部的交易,即rem.Hash() != add.Hash()情况,在这种情况下,我们将旧区块内的交易列入discarded,并将新区块内的交易列入included。我们也做了两个校验,代码如下:

rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil

上述代码用于判断旧区块是否存在上一个区块,如果没有,则说明此区块不可信,应该直接抛弃。

还有一种极其特殊的情况,代码如下:

if newHead == nil {
	newHead = pool.chain.CurrentBlock().Header() // Special case during testing
}

正如注释,此情况仅用于测试。

在完成上述步骤后,我们通过reinject = types.TxDifference(discarded, included)获得两个区块交易集合之间的差集。这也是我们需要补充到交易池内的交易,并同时根据新的区块更新部分设置,代码如下:

statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
	log.Error("Failed to reset txpool state", "err", err)
	return
}
pool.currentState = statedb
pool.pendingNonces = newTxNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit

// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)

// Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1))
pool.istanbul = pool.chainconfig.IsIstanbul(next)
pool.eip2718 = pool.chainconfig.IsBerlin(next)
pool.eip1559 = pool.chainconfig.IsLondon(next)

上述内容基本就是根据新区块对各个变量进行重新设置,较为简单,不再赘述。

代码中出现的statedb是以太坊的状态数据库,存储有账户等信息,我们会在未来介绍

接下来,我们分析runReorg的第二部分,此部分会删除部分交易。代码如下:

if reset != nil {
	pool.demoteUnexecutables()
	if reset.newHead != nil && pool.chainconfig.IsLondon(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) {
		pendingBaseFee := misc.CalcBaseFee(pool.chainconfig, reset.newHead)
		pool.priced.SetBaseFee(pendingBaseFee)
	}
	// Update all accounts to the latest known pending nonce
	nonces := make(map[common.Address]uint64, len(pool.pending))
	for addr, list := range pool.pending {
		highestPending := list.LastElement()
		nonces[addr] = highestPending.Nonce() + 1
	}
	pool.pendingNonces.setAll(nonces)
}

我们首先分析demoteUnexecutables函数,代码如下:

func (pool *TxPool) demoteUnexecutables() {
	// 迭代交易池内的 pending 队列
	for addr, list := range pool.pending {
		nonce := pool.currentState.GetNonce(addr)

		// 删除交易中 nonce 较低的交易
		olds := list.Forward(nonce)
		for _, tx := range olds {
			hash := tx.Hash()
			pool.all.Remove(hash)
			log.Trace("Removed old pending transaction", "hash", hash)
		}
		// 丢弃到所有账户无法支付 gas 费用的交易
		// drops 是所有无法支付 gas 费用的交易
		// invalids 是指低于 drops 中最低 nonce 交易的列表
		drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
		for _, tx := range drops {
			hash := tx.Hash()
			log.Trace("Removed unpayable pending transaction", "hash", hash)
			pool.all.Remove(hash)
		}
		pendingNofundsMeter.Mark(int64(len(drops)))
		// invalids 内包含的交易可能符合要求,也有可能不符合要求
		// 所以交易应该进入 queued 队列
		for _, tx := range invalids {
			hash := tx.Hash()
			log.Trace("Demoting pending transaction", "hash", hash)

			// Internal shuffle shouldn't touch the lookup set.
			pool.enqueueTx(hash, tx, false, false)
		}
		// 更新计数器
		pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
		if pool.locals.contains(addr) {
			localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
		}
		// 交易列表长度大于 0 ,但交易列表内没有找到 nonce 为最新 nonce 的交易
		// 这意味着交易列表内的所有交易都是可能过时的
		if list.Len() > 0 && list.txs.Get(nonce) == nil {
			// 利用 Cap 返回交易列表内所有交易
			// Cap 的功能是限制列表内的交易数,此处限制为 0
			gapped := list.Cap(0)
			// 执行交易降级
			for _, tx := range gapped {
				hash := tx.Hash()
				log.Error("Demoting invalidated transaction", "hash", hash)

				// Internal shuffle shouldn't touch the lookup set.
				pool.enqueueTx(hash, tx, false, false)
			}
			// 重置计数器
			pendingGauge.Dec(int64(len(gapped)))
			blockReorgInvalidatedTx.Mark(int64(len(gapped)))
		}
		// 如果发现 list(账户地址对应的交易列表)为空,则直接在交易池内删除此账户
		if list.Empty() {
			delete(pool.pending, addr)
		}
	}
}

我们通过注释分析了以上代码,此代码的一大特点是在新区块到来时会对pending队列中不符合新区块要求的交易进行降级处理。

在完成交易降级流程后,我们使用以下代码进行nonce的重置:

nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
	highestPending := list.LastElement()
	nonces[addr] = highestPending.Nonce() + 1
}
pool.pendingNonces.setAll(nonces)

我们通过list.LastElement()获得最新的交易,然后将交易的nonce + 1作为我们目前跟踪的nonce

此处使用的nonce是在区块生产期间内使用,所以我们无法通过statedb内的数据获得。

总结

本文主要介绍了一个交易从构建到打包进入区块的完整过程,基本分析了以太坊交易池的实现和构成,也设计了部分交易打包和执行的内容。主要内容列表如下:

  1. 以太坊交易池的基本参数和初始化
  2. 交易参数的含义与使用MetaMask API构建交易
  3. 交易池内交易队列
  4. 交易池增加交易使用的函数
  5. 交易池内的scheduleReorgLoop调度函数及相关channel
  6. runReorg函数实现交易提权的过程
  7. 交易打包和执行的基本情况
  8. runReorg在新区块到达情况下重置交易池状态的情况

考虑到读者可以希望自己阅读源代码,此处给出关于交易的核心函数流程图,为了简单,此流程图省略了部分数据结构,如下:

Tx Function Flow