以太坊bloom和logs及代码解析

以太坊bloom和logs及代码解析,第1张

一. 以太坊中的Bloom filter 的使用

以太坊中,通过eth_getTransactionReceipt方法获取交易收据。

eth_getTransactionReceipt 得到的结果如下:

{
    "jsonrpc": "2.0",
    "id": 1,
    "result": {
        "blockHash": "0x04099d9fff6403bf6518ccc94a1d4ed19e5e00f61a1fa43b1ef528e97a8c2ad4",
        "blockNumber": "0x260",
        "contractAddress": "0xdbd3c12200b337791eaa7acf28fce3ab96dedab9",
        "cumulativeGasUsed": "0x408a89",
        "from": "0x385df4353966d91fbad2d287c46d4e89c4934c7e",
        "gasUsed": "0x408a89",
        "logs": [
            {
                "address": "0xdbd3c12200b337791eaa7acf28fce3ab96dedab9",
                "topics": [
                    "0x8be0079c531659141344cd1fd0a4f28419497f9722a3daafe3b4186f6b6457e0",
                    "0x0000000000000000000000000000000000000000000000000000000000000000",
                    "0x000000000000000000000000385df4353966d91fbad2d287c46d4e89c4934c7e"
                ],
                "data": "0x",
                "blockNumber": "0x260",
                "transactionHash": "0x7462fc45f454ccdac2fc166ae05f91e800c5d2bfa5a65d3a6c3732eefdbc94d9",
                "transactionIndex": "0x0",
                "blockHash": "0x04099d9fff6403bf6518ccc94a1d4ed19e5e00f61a1fa43b1ef528e97a8c2ad4",
                "logIndex": "0x0",
                "removed": false
            }
        ],
        "logsBloom": "0x00000000000000000000200000000800000000000000000000800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000020000000000000000000800000000000000000000000000000000400000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000020000000000000000000080000000040000000000000000000000000000000000040",
        "status": "0x1",
        "to": null,
        "transactionHash": "0x7462fc45f454ccdac2fc166ae05f91e800c5d2bfa5a65d3a6c3732eefdbc94d9",
        "transactionIndex": "0x0"
    }
}

在reciept中会为日志数据生成一个Bloom过滤器,在func (pre *Prestate) Apply 方法中:

CreateBloom方法:

// CreateBloom creates a bloom filter out of the give Receipts (+Logs)
func CreateBloom(receipts Receipts) Bloom {
	buf := make([]byte, 6)
	var bin Bloom
	for _, receipt := range receipts {
		for _, log := range receipt.Logs {
			bin.add(log.Address.Bytes(), buf)
			for _, b := range log.Topics {
				bin.add(b[:], buf)
			}
		}
	}
	return bin
}

那么,bloom的作用是什么呢?
布隆过滤器并不是以太坊的专属,Bloom Filter是1970年由布隆提出的,特点是非常节省空间的概率数据结构,运行速度快,占用内存小,缺点是有一定的误判率。

很多场景中,都存在判断一个元素是否存在的需求,比较熟知的方法是采用 HashMap的数据结构,使用put方法存储数据,然后通过取值或者判断存在等方式来确定,在区块链中,在链式结构存储了大量数据以后,采用这种方式需要大量查询,就会非常耗费资源,检索的速度会越来越慢。所以以太坊使用了bloom过滤器来快速查询日志。

我们来看一下以太坊中Bloom:

// Bloom represents a 2048 bit bloom filter.
type Bloom [BloomByteLength]byte

它实际上是一个很长的二进制向量和一系列随机映射函数组成,主要用于判断一个元素是否在一个集合中。

需要注意的是,以太坊的bloom过滤器适合大量数据中高效判断元素是否存在,但是并不存储数据本身,仅存储hash结果取模运算后的位标记。过滤结果如果是存在于,实际情况不一定存在,但是判断结果为不存在时,则一定不存在,基于这个特点,以太坊中的判断逻辑是,根据bloom和过滤的参数判定是否存在,不存在则直接返回。

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
	if bloomFilter(header.Bloom, f.addresses, f.topics) {
		found, err := f.checkMatches(ctx, header)
		if err != nil {
			return logs, err
		}
		logs = append(logs, found...)
	}
	return logs, nil
}
二. Bloom 过滤logs和源码分析

以太坊eth_getLogs方法获取交易的logs

// GetLogs returns logs matching the given argument that are stored within the state.
//
// https://eth.wiki/json-rpc/API#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
	var filter *Filter
	if crit.BlockHash != nil {
		// Block filter requested, construct a single-shot filter
		filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
	} else {
		// Convert the RPC block numbers into internal representations
		begin := rpc.LatestBlockNumber.Int64()
		if crit.FromBlock != nil {
			begin = crit.FromBlock.Int64()
		}
		end := rpc.LatestBlockNumber.Int64()
		if crit.ToBlock != nil {
			end = crit.ToBlock.Int64()
		}
		// Construct the range filter
		filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
	}
	// Run the filter and return all the logs
	logs, err := filter.Logs(ctx)
	if err != nil {
		return nil, err
	}
	return returnLogs(logs), err
}

在区块链中搜索匹配的logs,并从包含匹配项的第一个块,相应地更新过滤器的Start。

// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
// canonical chain for fast logs filtering.
// Logs searches the blockchain for matching log entries, returning all from the
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
	// If we're doing singleton block filtering, execute and return
	if f.block != (common.Hash{}) {
		header, err := f.backend.HeaderByHash(ctx, f.block)
		if err != nil {
			return nil, err
		}
		if header == nil {
			return nil, errors.New("unknown block")
		}
		return f.blockLogs(ctx, header)
	}
	// Short-cut if all we care about is pending logs
	if f.begin == rpc.PendingBlockNumber.Int64() {
		if f.end != rpc.PendingBlockNumber.Int64() {
			return nil, errors.New("invalid block range")
		}
		return f.pendingLogs()
	}
	// Figure out the limits of the filter range
	header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
	if header == nil {
		return nil, nil
	}
	var (
		head    = header.Number.Uint64()
		end     = uint64(f.end)
		pending = f.end == rpc.PendingBlockNumber.Int64()
	)
	if f.begin == rpc.LatestBlockNumber.Int64() {
		f.begin = int64(head)
	}
	if f.end == rpc.LatestBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
		end = head
	}
	// Gather all indexed logs, and finish with non indexed ones
	var (
		logs           []*types.Log
		err            error
		size, sections = f.backend.BloomStatus()
	)
	if indexed := sections * size; indexed > uint64(f.begin) {
		if indexed > end {
			logs, err = f.indexedLogs(ctx, end)
		} else {
			logs, err = f.indexedLogs(ctx, indexed-1)
		}
		if err != nil {
			return logs, err
		}
	}
	rest, err := f.unindexedLogs(ctx, end)
	logs = append(logs, rest...)
	if pending {
		pendingLogs, err := f.pendingLogs()
		if err != nil {
			return nil, err
		}
		logs = append(logs, pendingLogs...)
	}
	return logs, err
}

我们先看Logs方法中的上边部分,根据区块hash从区块头中获取到区块bloom值,调用blockLogs方法:

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
	if bloomFilter(header.Bloom, f.addresses, f.topics) {
		found, err := f.checkMatches(ctx, header)
		if err != nil {
			return logs, err
		}
		logs = append(logs, found...)
	}
	return logs, nil
}

先通过bloomFilter方法判断判定给定参数所要查询的logs是否存在:

func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
	if len(addresses) > 0 {
		var included bool
		for _, addr := range addresses {
			if types.BloomLookup(bloom, addr) {
				included = true
				break
			}
		}
		if !included {
			return false
		}
	}

	for _, sub := range topics {
		included := len(sub) == 0 // empty rule set == wildcard
		for _, topic := range sub {
			if types.BloomLookup(bloom, topic) {
				included = true
				break
			}
		}
		if !included {
			return false
		}
	}
	return true
}

然后通过checkMatches方法,得到符合筛选条件的日志:

// checkMatches checks if the receipts belonging to the given header contain any log events that
// match the filter criteria. This function is called when the bloom filter signals a potential match.
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
	// Get the logs of the block
	logsList, err := f.backend.GetLogs(ctx, header.Hash())
	if err != nil {
		return nil, err
	}
	var unfiltered []*types.Log
	for _, logs := range logsList {
		unfiltered = append(unfiltered, logs...)
	}
	logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
	if len(logs) > 0 {
		// We have matching logs, check if we need to resolve full logs via the light client
		if logs[0].TxHash == (common.Hash{}) {
			receipts, err := f.backend.GetReceipts(ctx, header.Hash())
			if err != nil {
				return nil, err
			}
			unfiltered = unfiltered[:0]
			for _, receipt := range receipts {
				unfiltered = append(unfiltered, receipt.Logs...)
			}
			logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
		}
		return logs, nil
	}
	return nil, nil
}

在Logs方法的下半部分:

indexedLogs根据本地或通过网络索引的bloom位返回与筛选条件匹配的日志。

// indexedLogs returns the logs matching the filter criteria based on the bloom
// bits indexed available locally or via the network.
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
	// Create a matcher session and request servicing from the backend
	matches := make(chan uint64, 64)

	session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
	if err != nil {
		return nil, err
	}
	defer session.Close()

	f.backend.ServiceFilter(ctx, session)

	// Iterate over the matches until exhausted or context closed
	var logs []*types.Log

	for {
		select {
		case number, ok := <-matches:
			// Abort if all matches have been fulfilled
			if !ok {
				err := session.Error()
				if err == nil {
					f.begin = int64(end) + 1
				}
				return logs, err
			}
			f.begin = int64(number) + 1

			// Retrieve the suggested block and pull any truly matching logs
			header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
			if header == nil || err != nil {
				return logs, err
			}
			found, err := f.checkMatches(ctx, header)
			if err != nil {
				return logs, err
			}
			logs = append(logs, found...)

		case <-ctx.Done():
			return logs, ctx.Err()
		}
	}
}

先通过Start方法启动匹配过程并返回给定块范围内的bloom匹配流。如果该范围内没有更多匹配项,则结果通道将关闭。然后调用ServiceFilter方法,执行找到Multiplex方法,通过轮询获取检索任务,并将其多路复用到请求的检索队列中,以便与其他会话一起提供服务。

总的来说,就是根据以上两种方式先通过bloom判定在给定参数条件下,是否存在logs,然后继续检索logs,返回。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/1498169.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-25
下一篇 2022-06-25

发表评论

登录后才能评论

评论列表(0条)

保存