快速、并发、驱逐的内存缓存写入以保留大量条目而不影响性能。 BigCache 将条目保留在堆上,但省略了 GC。 为了实现这一点,需要对字节片进行 *** 作,因此在大多数用例中都需要缓存前面的条目(反)序列化。
简单使用package main
import (
"fmt"
"time"
"github.com/allegro/bigcache"
)
func main() {
cache, _ := bigcache.NewBigCache(bigcache.DefaultConfig(10 * time.Minute))
cache.Set("my-key", []byte("value"))
entry, _ := cache.Get("my-key")
fmt.Println(string(entry))
}
数据结构
type BigCache struct {
shards []*cacheShard // 缓存分片数据
lifeWindow uint64
clock clock // 时间计算函数
hash Hasher // hash 函数,在计算Key的Hash使用
config Config // Cache 配置
shardMask uint64 // 寻找分片位置时使用
maxShardSize uint32
close chan struct{}
}
type cacheShard struct {
hashmap map[uint64]uint32 // 索引列表,对应KEY在entries中的位置
entries queue.BytesQueue // 实际数据存储
lock sync.RWMutex
entryBuffer []byte
onRemove onRemoveCallback
isVerbose bool
logger Logger
clock clock
lifeWindow uint64
stats Stats
}
可以用下图来表示
BigCache 中有数组 cacheShard 里面是 *** 作实际的地方,cacheShard 中有两个比较重要的数据结构,HashMap 用来存储Key的索引地址,Entries 用来存储实际的数据,底层是[]byte
设置缓存的时候,会先计算出key对应的hash值,找到key对应的分片位置。之后的 *** 作就在分片shard中 *** 作
// Set saves entry under the key
func (c *BigCache) Set(key string, entry []byte) error {
hashedKey := c.hash.Sum64(key) // 计算hash值
shard := c.getShard(hashedKey) // 找到分片
return shard.set(key, hashedKey, entry) //在分片中 *** 作
}
func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
currentTimestamp := uint64(s.clock.epoch())
s.lock.Lock()
// 冲突检查,将之前的key对应value置为空
if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 {
if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil {
resetKeyFromEntry(previousEntry)
}
}
// 每次插入新数据时,bigCache 都会获取 BytesQueue 头部数据,
if oldestEntry, err := s.entries.Peek(); err == nil {
// 然后判断数据是否过期,如果过期则删除
s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry)
}
// 设置缓存的值,使用 wrapEntry 对用户传入的 key 和 value 进行序列化
w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer)
for {
// 找到插入位置,记录在 hashmap 之后返回
// Push 会判断是不是有足够的空间,没有,则返回 err = &queueError{"Full queue. Maximum size limit reached."}
if index, err := s.entries.Push(w); err == nil {
s.hashmap[hashedKey] = uint32(index)
s.lock.Unlock()
return nil
}
// 没有找到插入位置时因为没有空间了,所以要删除。
if s.removeOldestEntry(NoSpace) != nil {
s.lock.Unlock()
return fmt.Errorf("entry is bigger than max shard size")
}
}
}
查询
查询缓存的时候,会先计算出key对应的hash值,找到key对应的分片位置。之后的 *** 作就在分片shard中 *** 作, 分片shard 会根据计算出来的 index,去 entries 里面找对应的 entry
func (c *BigCache) Get(key string) ([]byte, error) {
hashedKey := c.hash.Sum64(key)
shard := c.getShard(hashedKey)
return shard.get(key, hashedKey)
}
func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) {
s.lock.RLock()
itemIndex := s.hashmap[hashedKey] // 找到key的index
if itemIndex == 0 {
s.lock.RUnlock()
s.miss()
return nil, ErrEntryNotFound
}
wrappedEntry, err := s.entries.Get(int(itemIndex)) // 根据 index 获取到array中拿到实际数据
if err != nil {
s.lock.RUnlock()
s.miss()
return nil, err
}
// 处理key冲突情况
if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
if s.isVerbose {
s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey)
}
s.lock.RUnlock()
s.collision()
return nil, ErrEntryNotFound
}
entry := readEntry(wrappedEntry)
s.lock.RUnlock()
s.hit()
// 返回的实际数据
return entry, nil
}
删除
对应的Key的删除其实就是把 entries 中 arrary 对应的 itemIndex 上置为0。其实这种做法并没有正在删除数据,只是置为0,实际的内存并没有归还,但是把 s.hashmap 中的key对应的 index 删除了。这就做了假删除。用户已经查询不到这个数据了
func (s *cacheShard) del(key string, hashedKey uint64) error {
// Optimistic pre-check using only readlock
s.lock.RLock()
itemIndex := s.hashmap[hashedKey]
if itemIndex == 0 {
s.lock.RUnlock()
s.delmiss()
return ErrEntryNotFound
}
// 检查一下 itemIndex 是否是合法的,否则直接就返回了
if err := s.entries.CheckGet(int(itemIndex)); err != nil {
s.lock.RUnlock()
s.delmiss()
return err
}
s.lock.RUnlock()
s.lock.Lock()
{
// After obtaining the writelock, we need to read the same again,
// since the data delivered earlier may be stale now
itemIndex = s.hashmap[hashedKey]
if itemIndex == 0 {
s.lock.Unlock()
s.delmiss()
return ErrEntryNotFound
}
wrappedEntry, err := s.entries.Get(int(itemIndex))
if err != nil {
s.lock.Unlock()
s.delmiss()
return err
}
delete(s.hashmap, hashedKey) // 将索引删除,
s.onRemove(wrappedEntry, Deleted) // key 删除时的回调函数
resetKeyFromEntry(wrappedEntry) // 将对应的array上的数据置为0,就算完成了删除
}
s.lock.Unlock()
s.delhit()
return nil
}
缓存过期
bigCache 可以为插入的数据设置过期时间,但是缺点是所有数据的过期时间都是一样的。bigCache 中自动删除数据有两种场景:
在插入数据时删除过期数据通过设置 CleanWindow,启动 goroutine 后台定时批量删除过期数据if config.CleanWindow > 0 {
go func() {
ticker := time.NewTicker(config.CleanWindow)
defer ticker.Stop()
for {
select {
case t := <-ticker.C:
cache.cleanUp(uint64(t.Unix()))
case <-cache.close:
return
}
}
}()
}
// 真正执行删除的逻辑比较简单,就是遍历各个 cacheShard,从 q.head 的位置(这个位置指向的数据必定是最先插入的)开始检查,如果有超时的 item 就直接删除:
func (c *BigCache) cleanUp(currentTimestamp uint64) {
for _, shard := range c.shards {
shard.cleanUp(currentTimestamp)
}
}
func (s *cacheShard) cleanUp(currentTimestamp uint64) {
s.lock.Lock()
for {
if oldestEntry, err := s.entries.Peek(); err != nil {
break
} else if evicted := s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry); !evicted {
break
}
}
s.lock.Unlock()
}
// Peek reads the oldest entry from list without moving head pointer
func (q *BytesQueue) Peek() ([]byte, error) {
data, _, err := q.peek(q.head)
return data, err
}
实际存储设计
bigCache并没有直接使用类似map中的value中进行存储,而是存储在自己写的BytesQueue中
bigCache 存储type BytesQueue struct {
array []byte // 一个字节数组,实际数据存储的地方
capacity int // 容量
maxCapacity int // 最大容量
head int
tail int // 下次可以插入 item 的位置
count int // 当前插入的 item 数量
rightMargin int
headerBuffer []byte // 插入前做临时 buffer 所用(slice-copy)
verbose bool // 打印 log 开关
initialCapacity int // BytesQueue 创建时,array 的初始容量
}
bigCache 扩容
// Push copies entry at the end of queue and moves tail pointer. Allocates more space if needed.
// Returns index for pushed data or error if maximum size queue limit is reached.
func (q *BytesQueue) Push(data []byte) (int, error) {
dataLen := len(data)
if q.availableSpaceAfterTail() < dataLen+headerEntrySize {
if q.availableSpaceBeforeHead() >= dataLen+headerEntrySize {
q.tail = leftMarginIndex
} else if q.capacity+headerEntrySize+dataLen >= q.maxCapacity && q.maxCapacity > 0 {
return -1, &queueError{"Full queue. Maximum size limit reached."}
} else {
// 如果可用的资源<要申请的资源,这个时候就要开始进行扩容
q.allocateAdditionalMemory(dataLen + headerEntrySize)
}
}
index := q.tail
q.push(data, dataLen)
return index, nil
}
func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
start := time.Now()
if q.capacity < minimum {
q.capacity += minimum
}
// 按照2倍大小扩容
q.capacity = q.capacity * 2
if q.capacity > q.maxCapacity && q.maxCapacity > 0 {
q.capacity = q.maxCapacity
}
oldArray := q.array
// 申请内从
q.array = make([]byte, q.capacity)
if leftMarginIndex != q.rightMargin {
// 将老的数据赋值到新的内存中
copy(q.array, oldArray[:q.rightMargin])
if q.tail < q.head {
emptyBlobLen := q.head - q.tail - headerEntrySize
q.push(make([]byte, emptyBlobLen), emptyBlobLen)
q.head = leftMarginIndex
q.tail = q.rightMargin
}
}
if q.verbose {
log.Printf("Allocated new queue in %s; Capacity: %d \n", time.Since(start), q.capacity)
}
}
如何做到高性能
加速并发访问
在 bigCache 中, 缓存使用了分段(shard)存储,每一个shard有一个lock,减小了lock的粒度
避免高额的GC开销在bigCache中,map中没有使用指针,在 Golang(>1.4) 中,如果map中不包含指针的话,GC 便会忽略这个 map。
在bigCache中,bigCache将数据存储在BytesQueue中,BytesQueue的底层结构是[]byte ,这样只给GC增加了一个额外对象,
由于字节切片除了自身对象并不包含其他指针数据,所以GC对于整个对象的标记时间是O(1)的。
参考 [译] Go开源项目BigCache如何加速并发访问以及避免高额的GC开销BigCache妙到颠毫: bigcache优化技巧欢迎分享,转载请注明来源:内存溢出
评论列表(0条)