Ethereum在txpool中的Pending transaction监测方法

Ethereum在txpool中的Pending transaction监测方法,第1张

Pending transaction监测方法

ETH BSC HECO适用:

方法零:解决了方法一导致的qps上限问题

在方法一中,对qps压力最大的环节是逐笔获取每一笔交易的详细信息,而传统的subscribe无法解决这一点,因此这里改为直接使用alchemy的相关功能
alchemy下有一个订阅过滤器:
使用方法:

// WebSocket client constructor exported by the "ws" package.
const WebSocket = require("ws");
// NOTE(review): Web3, url and options are not defined anywhere in this snippet —
// presumably they come from a web3 setup like the one shown for method one; confirm before running.
let web3 = new Web3(new Web3.providers.WebsocketProvider(url, options));
// NOTE(review): routerV2_ABI and pancakeSwapRouter are not defined in this snippet either,
// and RouterContract is not referenced by the init function that follows — TODO confirm it is needed.
let RouterContract = new web3.eth.Contract(routerV2_ABI, pancakeSwapRouter);
export const init = function () {
    const WebSocket = require('ws');
    const ws = new WebSocket('wss://eth-mainnet.alchemyapi.io/v2/【输入你在alchemy上的apikey】');
    ws.onopen = function(e){
        console.log("连接服务器成功");
        // 向服务器发送消息
        ws.send("{\"jsonrpc\":\"2.0\",\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"alchemy_filteredNewFullPendingTransactions\", {\"address\": \"0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F\"}]}");
    }
    ws.onmessage = function(e){
    	let message = eval("(" + e.data + ")")
    	【在这里写入你自己的逻辑】
    	}
};
方法一(原有方法不推荐,使用此方法)

使用web3连接到wss接口,使用js进行pendingtransaction的请求
准备工作:

新建文件夹->npm install->npm install web3 搭建好环境
新建js,名字自定,粘贴以下代码
优点:js直接输出
缺点:此例中数据请求节点使用的第三方infura的wss协议的rpc接口,有tps限制,当前以太坊的pendingtransaction超过了150000个,BSC约为491个,HECO约为301个,此方法比较适用于BSC与HECO
那应该咋办呢,白嫖metamask的
Technical documentation on how to connect to blockchain nodes | GetBlock.io

// Load the web3 library.
var Web3 = require("web3");
// WebSocket endpoint the provider connects to.
// NOTE(review): the Infura project id is embedded in this URL — consider
// loading it from configuration instead of keeping it in source.
var url = "wss://mainnet.infura.io/ws/v3/d0a9fa5da15448b19980eaa9bf0a9b59";

// Options object handed to Web3.providers.WebsocketProvider below.
// NOTE(review): units are not visible here — timeout/delay are presumably
// milliseconds and the size limits presumably bytes; confirm against the web3 docs.
var options = {
    timeout: 30000,
    clientConfig: {
        maxReceivedFrameSize: 100000000,
        maxReceivedMessageSize: 100000000,
    },
    reconnect: {
        auto: true,
        delay: 5000,
        maxAttempts: 15,
        onTimeout: false,
    },
};

// Create the web3 instance over a WebSocket provider built from url and options.
var web3 = new Web3(new Web3.providers.WebsocketProvider(url, options));
// Subscribe to "pendingTransactions"; the callback only logs an error if one is passed.
const subscription = web3.eth.subscribe("pendingTransactions", (err, res) => {
    if (err) console.error(err);
});

/**
 * Registers a "data" listener on the pending-transaction subscription.
 * For each transaction hash received, the lookup is deferred with
 * setTimeout; the full transaction is then fetched through
 * web3.eth.getTransaction and printed. Lookup failures are printed with
 * console.error.
 */
var init = function () {
    // Fetch one transaction by hash and print it, or print the failure.
    const printTransaction = async (pendingHash) => {
        try {
            const details = await web3.eth.getTransaction(pendingHash);
            console.log(details);
        } catch (lookupError) {
            console.error(lookupError);
        }
    };

    subscription.on("data", (pendingHash) => {
        setTimeout(() => printTransaction(pendingHash));
    });
};

// Register the "data" listener so received pending transactions get printed.
init();
方法二(不推荐,弃用)

!!!!!修改前一定要备份文件
1、修改方法:

修改文件(以go语言客户端为例):

1)添加全局函数:

func checkFileIsExist(filename string) bool {
	var exist = true
	if _, err := os.Stat(filename); os.IsNotExist(err) {
		exist = false
	}
	return exist
}

func checkFileIsExist_and_sizebig(filename string) bool{
	if checkFileIsExist(filename){
		fileInfo, _ := os.Stat(filename)
		if fileInfo.Size()>500000000{
			return true
		}else{
			return false
		}
	}else{
		return false
	}
}

2)promoteExecutables函数添加插桩
在pool.promoteTx(addr, hash, tx){后添加:

//============================**记录被pending的txs**==========================
//============================**Author:BradMoon**===========================
//============================**Date:21/7/29**===========================
var filename = ""

//收集pengding txs信息
hashStr := hash.String()
dataStr := hex.EncodeToString(tx.Data())
nonceStr := strconv.FormatUint(tx.Nonce(), 10)
gasStr := strconv.FormatUint(tx.Gas(), 10)
gasPriceStr := tx.GasPrice().String()
toStr := tx.To().String()
valueStr := tx.Value().String()
var f *os.File

// TODO: 这里不能这么用循环搞
for i := 0; i <= 1000; i++ {
	str_i := strconv.Itoa(i)
	filename = "./pendingTxs" + str_i + ".csv"
	if checkFileIsExist_and_sizebig(filename) {
		continue
	} else {
		if checkFileIsExist(filename) {
			f, _ = os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
			break
		} else {
			f, _ = os.Create(filename) //创建文件
			break
		}
	}
}
writeString := hashStr + "," + dataStr + "," + nonceStr + "," + gasStr + "," + gasPriceStr + "," + toStr + "," + valueStr
writeString+="\r\n"
f.WriteString(writeString)
f.Close()
//============================**记录被pending的txs**==========================
修改后的函数:
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
//
// Every transaction for which pool.promoteTx returns true is additionally
// appended to a CSV record file by recordPendingTx before being added to the
// returned slice.
func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
	// Track the promoted transactions to broadcast them at once
	var promoted []*types.Transaction

	// Iterate over all accounts and promote any executable transactions
	for _, addr := range accounts {
		list := pool.queue[addr]
		if list == nil {
			continue // Just in case someone calls with a non existing account
		}
		// Drop all transactions that are deemed too old (low nonce)
		forwards := list.Forward(pool.currentState.GetNonce(addr))
		for _, tx := range forwards {
			hash := tx.Hash()
			pool.all.Remove(hash)
		}
		log.Trace("Removed old queued transactions", "count", len(forwards))
		// Drop all transactions that are too costly (low balance or out of gas)
		drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
		for _, tx := range drops {
			hash := tx.Hash()
			pool.all.Remove(hash)
		}
		log.Trace("Removed unpayable queued transactions", "count", len(drops))
		queuedNofundsMeter.Mark(int64(len(drops)))

		// Gather all executable transactions and promote them
		readies := list.Ready(pool.pendingNonces.get(addr))
		for _, tx := range readies {
			hash := tx.Hash()
			if pool.promoteTx(addr, hash, tx) {
				// Instrumentation (Author: BradMoon, 21/7/29): record the
				// transaction that was just promoted to pending.
				recordPendingTx(hash.String(), tx)
				promoted = append(promoted, tx)
			}
		}
		log.Trace("Promoted queued transactions", "count", len(promoted))
		queuedGauge.Dec(int64(len(readies)))

		// Drop all transactions over the allowed limit
		var caps types.Transactions
		if !pool.locals.contains(addr) {
			caps = list.Cap(int(pool.config.AccountQueue))
			for _, tx := range caps {
				hash := tx.Hash()
				pool.all.Remove(hash)
				log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
			}
			queuedRateLimitMeter.Mark(int64(len(caps)))
		}
		// Mark all the items dropped as removed
		pool.priced.Removed(len(forwards) + len(drops) + len(caps))
		queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
		if pool.locals.contains(addr) {
			localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
		}
		// Delete the entire queue entry if it became empty.
		if list.Empty() {
			delete(pool.queue, addr)
			delete(pool.beats, addr)
		}
	}
	return promoted
}

// recordPendingTx appends one CSV line (hash, data, nonce, gas, gas price,
// recipient, value) for tx to the first ./pendingTxs<i>.csv file, i in
// 0..1000, that checkFileIsExist_and_sizebig does not report as big.
// Failures are logged and otherwise ignored so that record keeping never
// interrupts transaction promotion.
func recordPendingTx(hashStr string, tx *types.Transaction) {
	// go-ethereum documents Transaction.To as returning nil for
	// contract-creation transactions; record an empty recipient instead of
	// dereferencing nil.
	toStr := ""
	if to := tx.To(); to != nil {
		toStr = to.String()
	}
	writeString := hashStr + "," + hex.EncodeToString(tx.Data()) + "," + strconv.FormatUint(tx.Nonce(), 10) + "," + strconv.FormatUint(tx.Gas(), 10) + "," + tx.GasPrice().String() + "," + toStr + "," + tx.Value().String()
	writeString += "\r\n"

	// TODO: probing the file names in a loop for every transaction should be replaced.
	for i := 0; i <= 1000; i++ {
		filename := "./pendingTxs" + strconv.Itoa(i) + ".csv"
		if checkFileIsExist_and_sizebig(filename) {
			continue
		}
		// O_CREATE|O_APPEND covers both cases: a new file is created, an
		// existing one is appended to.
		f, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)
		if err != nil {
			log.Warn("Failed to open pending tx record file", "file", filename, "err", err)
			return
		}
		if _, err := f.WriteString(writeString); err != nil {
			log.Warn("Failed to write pending tx record", "file", filename, "err", err)
		}
		if err := f.Close(); err != nil {
			log.Warn("Failed to close pending tx record file", "file", filename, "err", err)
		}
		return
	}
	log.Warn("No pending tx record file below the size limit", "hash", hashStr)
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/2992521.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-09-23
下一篇 2022-09-23

发表评论

登录后才能评论

评论列表(0条)

保存