Cache缓存项目学习3

张开发
2026/5/8 22:05:37 15 分钟阅读

分享文章

Cache缓存项目学习3
Group设计type Group struct { name string getter Getter mainCache *Cache peers PeerPicker loader *singleflight.Group expiration time.Duration // 缓存过期时间0表示永不过期 closed int32 // 原子变量标记组是否已关闭 stats groupStats // 统计信息 }Getter作为当缓存中拿不到数据后与数据库的连接接口。type Getter interface { Get(ctx context.Context, key string) ([]byte, error) }Cache是对存储的封装通过store接口提供可插拔底层缓存设计。支持LRU2、LRU等缓存接口。Options提供缓存配置。其余是辅助记录信息。PeerPicker提供peer选择器接口。type ClientPicker struct { selfAddr string svcName string mu sync.RWMutex consHash *consistenthash.Map clients map[string]*Client etcdCli *clientv3.Client ctx context.Context cancel context.CancelFunc }通过一致性哈希来选择节点。从clients中获取grpc客户端并与之通信。Loader负责单飞机制一批请求只允许放行一个其他请求共享结果。// Do 针对相同的key保证多次调用Do()都只会调用一次fn func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { // Check if there is already an ongoing call for this key if existing, ok : g.m.Load(key); ok { c : existing.(*call) c.wg.Wait() // Wait for the existing request to finish return c.val, c.err // Return the result from the ongoing call } // If no ongoing request, create a new one c : call{} c.wg.Add(1) g.m.Store(key, c) // Store the call in the map // Execute the function and set the result c.val, c.err fn() c.wg.Done() // Mark the request as done // After the request is done, clean up the map g.m.Delete(key) return c.val, c.err }各模块详细介绍Gettertype Getter interface { Get(ctx context.Context, key string) ([]byte, error) } // GetterFunc 函数类型实现 Getter 接口 type GetterFunc func(ctx context.Context, key string) ([]byte, error) // Get 实现 Getter 接口 func (f GetterFunc) Get(ctx context.Context, key string) ([]byte, error) { return f(ctx, key) }Getter使用策略模式定义了使用接口并使用GetterFunc类型自动实现了Get接口让传参既可以是简单方法也可以是结构体。Getter是作为数据库接口当加载数据时发现缓存没有数据时对远程数据库进行数据加载。func (g *Group) loadData(ctx context.Context, key string) (value ByteView, err error) { // 尝试从远程节点获取 ........ // 从数据源加载 bytes, err : g.getter.Get(ctx, key) if err ! nil { return ByteView{}, fmt.Errorf(failed to get data: %w, err) } return ByteView{b: cloneBytes(bytes)}, nil }CacheCache是对本地缓存存储实例的封装。type Cache struct { mu sync.RWMutex store store.Store // 底层存储实现 opts CacheOptions // 缓存配置选项 hits int64 // 缓存命中次数 misses int64 // 缓存未命中次数 initialized int32 // 原子变量标记缓存是否已初始化 closed int32 // 原子变量标记缓存是否已关闭 }Store接口是底层缓存实例的封装opts是与store相关的配置信息。store//根据缓存类型和具体配置进行实例化 c.store store.NewStore(c.opts.CacheType, storeOpts) func NewStore(cacheType CacheType, opts Options) Store { switch cacheType { case LRU2: return newLRU2Cache(opts) case LRU: return newLRUCache(opts) default: return newLRUCache(opts) } } //store 接口声明 type Store interface { Get(key string) (Value, bool) Set(key string, value Value) error SetWithExpiration(key string, value Value, expiration time.Duration) error Delete(key string) bool Clear() Len() int Close() }store接口实例type lruCache struct { mu sync.RWMutex list *list.List // 双向链表用于维护 LRU 顺序 items map[string]*list.Element // 键到链表节点的映射 expires map[string]time.Time // 过期时间映射 maxBytes int64 // 最大允许字节数 usedBytes int64 // 当前使用的字节数 onEvicted func(key string, value Value) cleanupInterval time.Duration cleanupTicker *time.Ticker closeCh chan struct{} // 用于优雅关闭清理协程 }cleanupInterval:清理时间间隔超过则启动清理cleanupTicker:根据cleanupInterval生成的定时器定时执行清理任务func newLRUCache{ c.cleanupTicker time.NewTicker(c.cleanupInterval) go c.cleanupLoop() } func (c *lruCache) cleanupLoop() { for { select { case -c.cleanupTicker.C: c.mu.Lock() c.evict() //清理 c.mu.Unlock() case -c.closeCh: return } } }list维护元素顺序便于淘汰时进行针对性删除items映射键与元素expires映射键与过期时间func (c *lruCache) Get(key string) (Value, bool) { c.mu.RLock() elem, ok : c.items[key] if !ok { c.mu.RUnlock() return nil, false } // 检查是否过期 if expTime, hasExp : c.expires[key]; hasExp time.Now().After(expTime) { c.mu.RUnlock() // 异步删除过期项避免在读锁内操作 go c.Delete(key) return nil, false } // 获取值并释放读锁 entry : elem.Value.(*lruEntry) value : entry.value c.mu.RUnlock() // 更新 LRU 位置需要写锁 c.mu.Lock() // 再次检查元素是否仍然存在可能在获取写锁期间被其他协程删除 if _, ok : c.items[key]; ok { c.list.MoveToBack(elem) } c.mu.Unlock() return value, true } func (c *lruCache) SetWithExpiration(key string, value Value, expiration time.Duration) error { if value nil { c.Delete(key) return nil } c.mu.Lock() defer c.mu.Unlock() // 计算过期时间 var expTime time.Time if expiration 0 { expTime time.Now().Add(expiration) c.expires[key] expTime } else { delete(c.expires, key) } // 如果键已存在更新值 if elem, ok : c.items[key]; ok { oldEntry : elem.Value.(*lruEntry) c.usedBytes int64(value.Len() - oldEntry.value.Len()) oldEntry.value value c.list.MoveToBack(elem) return nil } // 添加新项 entry : lruEntry{key: key, value: value} elem : c.list.PushBack(entry) c.items[key] elem c.usedBytes int64(len(key) value.Len()) // 检查是否需要淘汰旧项 c.evict() return nil }PeerPicker// PeerPicker 定义了peer选择器的接口 type PeerPicker interface { PickPeer(key string) (peer Peer, ok bool, self bool) Close() error } type ClientPicker struct { selfAddr string svcName string mu sync.RWMutex consHash *consistenthash.Map clients map[string]*Client etcdCli *clientv3.Client ctx context.Context cancel context.CancelFunc }PeerPicker核心通过consHash一致性哈希来解决问题。一致性Hash并不知道Key在哪里也不存储具体的K-V数据只负责计算。对于输入的Key它输出应该去哪个节点寻找。纯本地计算没有网络通信。具体一致性哈希算法详解见下一节。流程loader单飞设计防止击穿。的实现使用 sync.Map 对相同 Key 的并发请求只执行一次加载有效避免缓存雪崩。// 代表正在进行或已结束的请求 type call struct { wg sync.WaitGroup val interface{} err error } // Group manages all kinds of calls type Group struct { m sync.Map // 使用sync.Map来优化并发性能 } // Do 针对相同的key保证多次调用Do()都只会调用一次fn func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { // Check if there is already an ongoing call for this key if existing, ok : g.m.Load(key); ok { c : existing.(*call) c.wg.Wait() // Wait for the existing request to finish return c.val, c.err // Return the result from the ongoing call } // If no ongoing request, create a new one c : call{} c.wg.Add(1) g.m.Store(key, c) // Store the call in the map // Execute the function and set the result c.val, c.err fn() c.wg.Done() // Mark the request as done // After the request is done, clean up the map g.m.Delete(key) return c.val, c.err }

更多文章