Skip to content

Instantly share code, notes, and snippets.

@liggitt
Created July 2, 2025 14:37
Show Gist options
  • Save liggitt/49f9eb909daea6f6e24c97ce73053591 to your computer and use it in GitHub Desktop.
Save liggitt/49f9eb909daea6f6e24c97ce73053591 to your computer and use it in GitHub Desktop.
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pullmanager
import (
"strings"
"sync"
"sync/atomic"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/utils/lru"
)
// LRUCache is a size-bounded least-recently-used cache mapping K to *V.
// Besides the entries themselves it tracks whether its contents may be
// treated as authoritative by its owner.
type LRUCache[K comparable, V any] struct {
	cache   *lru.Cache
	maxSize int

	// authoritative is set by the owner of the cache and is reset to false
	// by the eviction callback whenever an entry leaves the cache for any
	// reason other than an explicit Delete.
	authoritative atomic.Bool

	// deletingKeys holds the keys that currently have an explicit Delete in
	// flight, so the eviction callback can tell an intentional removal apart
	// from an eviction that loses data.
	deletingKeys sync.Map
}

// NewLRUCache returns an empty, non-authoritative cache holding at most size
// entries.
func NewLRUCache[K comparable, V any](size int) *LRUCache[K, V] {
	c := lru.New(size)
	l := &LRUCache[K, V]{
		maxSize: size,
		cache:   c,
	}
	c.SetEvictionFunc(func(key lru.Key, _ any) {
		// An explicit Delete removes a key the caller no longer wants; the
		// cache still mirrors its source afterwards, so it must not lose its
		// authoritative status because of it.
		if _, deleting := l.deletingKeys.Load(key); deleting {
			return
		}
		// any other eviction makes our cache non-authoritative
		l.authoritative.Store(false)
	})
	return l
}

// Get returns the value cached under key and whether the key was present.
// A present key whose stored value is nil yields (nil, true).
func (c *LRUCache[K, V]) Get(key K) (*V, bool) {
	value, found := c.cache.Get(key)
	if !found {
		return nil, false
	}
	if value == nil {
		return nil, true
	}
	return value.(*V), true
}

// Set stores value under key.
func (c *LRUCache[K, V]) Set(key K, value *V) { c.cache.Add(key, value) }

// Delete removes key from the cache without affecting the authoritative flag.
func (c *LRUCache[K, V]) Delete(key K) {
	// Mark the key for the duration of the removal so the eviction callback
	// recognizes this as an intentional delete.
	c.deletingKeys.Store(key, struct{}{})
	defer c.deletingKeys.Delete(key)
	c.cache.Remove(key)
}

// Len returns the number of entries currently cached.
func (c *LRUCache[K, V]) Len() int { return c.cache.Len() }

// Clear removes all entries from the cache.
func (c *LRUCache[K, V]) Clear() { c.cache.Clear() }
// cachedPullRecordsAccessor is a PullRecordsAccessor that keeps in-memory LRU
// caches in front of a delegate PullRecordsAccessor. Writes and deletes go to
// the delegate first and are mirrored into the caches only on success.
type cachedPullRecordsAccessor struct {
	// delegate is the underlying accessor that all operations fall back to.
	delegate PullRecordsAccessor

	// intentsMutex provides per-image locks plus a global lock guarding
	// updates to the intents cache.
	intentsMutex *StripedLockSet
	// intents caches image pull intents keyed by image.
	intents *LRUCache[string, kubeletconfiginternal.ImagePullIntent]

	// pulledRecordsMutex orders delegate reads against delegate writes of
	// pulled records.
	pulledRecordsMutex *sync.RWMutex
	// pulledRecords caches image pulled records keyed by image ref.
	pulledRecords *LRUCache[string, kubeletconfiginternal.ImagePulledRecord]
}
// NewCachedPullRecordsAccessor wraps delegate with in-memory caches (50 pull
// intents, 100 pulled records) and issues an initial list of both record
// kinds before returning.
func NewCachedPullRecordsAccessor(delegate PullRecordsAccessor) *cachedPullRecordsAccessor {
	accessor := new(cachedPullRecordsAccessor)
	accessor.delegate = delegate

	accessor.intentsMutex = NewStripedLockSet(10) // TODO: plumb stripe size
	accessor.intents = NewLRUCache[string, kubeletconfiginternal.ImagePullIntent](50)

	accessor.pulledRecordsMutex = &sync.RWMutex{}
	accessor.pulledRecords = NewLRUCache[string, kubeletconfiginternal.ImagePulledRecord](100)

	// Warm-up is best effort: results and errors are deliberately dropped,
	// a failed list only means the caches start out cold.
	_, _ = accessor.ListImagePullIntents()
	_, _ = accessor.ListImagePulledRecords()

	return accessor
}
// ListImagePullIntents always returns the delegate's listing. When the intents
// cache is not authoritative, the listing is additionally used, under the
// global intents lock, to try to repopulate the cache and make it
// authoritative.
func (c *cachedPullRecordsAccessor) ListImagePullIntents() ([]*kubeletconfiginternal.ImagePullIntent, error) {
	// Already authoritative: nothing to refresh, no locking needed.
	if c.intents.authoritative.Load() {
		return c.delegate.ListImagePullIntents()
	}

	// A full list is our chance to become authoritative, provided the
	// result is error-free and fits in the cache.
	c.intentsMutex.GlobalLock()
	defer c.intentsMutex.GlobalUnlock()

	listed, err := c.delegate.ListImagePullIntents()
	capacity := c.intents.maxSize
	canBeAuthoritative := err == nil && len(listed) < capacity

	// Leave a non-empty cache alone unless replacing it makes it authoritative.
	if !canBeAuthoritative && c.intents.Len() != 0 {
		return listed, err
	}

	c.intents.Clear()
	// Cache at most `capacity` of the listed intents.
	limit := min(len(listed), capacity)
	for i := 0; i < limit; i++ {
		image := listed[i].Image
		c.intents.Set(image, &kubeletconfiginternal.ImagePullIntent{Image: image})
	}
	c.intents.authoritative.Store(canBeAuthoritative)

	return listed, err
}
// ImagePullIntentExists reports whether a pull intent exists for image.
// It answers from the cache when possible and only consults the delegate when
// the cache misses and is not authoritative; positive delegate answers are
// cached.
func (c *cachedPullRecordsAccessor) ImagePullIntentExists(image string) (bool, error) {
	// Fast path: a cache hit needs no lock.
	if _, cached := c.intents.Get(image); cached {
		return true, nil
	}

	// Slow path: serialize on the image before looking any further.
	c.intentsMutex.Lock(image)
	defer c.intentsMutex.Unlock(image)

	_, cached := c.intents.Get(image)
	switch {
	case cached:
		// The entry appeared while we were waiting for the lock.
		return true, nil
	case c.intents.authoritative.Load():
		// An authoritative cache that misses means the intent does not exist.
		return false, nil
	}

	// Expensive lookup against the delegate.
	found, err := c.delegate.ImagePullIntentExists(image)
	if err != nil || !found {
		return found, err
	}
	c.intents.Set(image, &kubeletconfiginternal.ImagePullIntent{Image: image})
	return found, err
}
// WriteImagePullIntent writes the intent for image through the delegate and,
// if that succeeds, records it in the cache. The per-image lock is held for
// the whole operation.
func (c *cachedPullRecordsAccessor) WriteImagePullIntent(image string) error {
	c.intentsMutex.Lock(image)
	defer c.intentsMutex.Unlock(image)

	err := c.delegate.WriteImagePullIntent(image)
	if err == nil {
		// Only mirror writes the delegate accepted.
		c.intents.Set(image, &kubeletconfiginternal.ImagePullIntent{Image: image})
	}
	return err
}
// DeleteImagePullIntent deletes the intent for image through the delegate and,
// if that succeeds, drops it from the cache. The per-image lock is held for
// the whole operation.
func (c *cachedPullRecordsAccessor) DeleteImagePullIntent(image string) error {
	c.intentsMutex.Lock(image)
	defer c.intentsMutex.Unlock(image)

	err := c.delegate.DeleteImagePullIntent(image)
	if err == nil {
		// Keep the cached entry if the delegate could not delete it.
		c.intents.Delete(image)
	}
	return err
}
// ListImagePulledRecords returns the delegate's listing of pulled records.
// The delegate is read under the read lock; the pulled-records cache is
// neither consulted nor populated here.
func (c *cachedPullRecordsAccessor) ListImagePulledRecords() ([]*kubeletconfiginternal.ImagePulledRecord, error) {
	c.pulledRecordsMutex.RLock()
	defer c.pulledRecordsMutex.RUnlock()

	records, err := c.delegate.ListImagePulledRecords()
	return records, err
}
// GetImagePulledRecord returns the pulled record for imageRef and whether it
// exists. Cache hits are returned directly; misses are looked up in the
// delegate under the read lock, and existing non-nil records are cached.
func (c *cachedPullRecordsAccessor) GetImagePulledRecord(imageRef string) (*kubeletconfiginternal.ImagePulledRecord, bool, error) {
	if cached, hit := c.pulledRecords.Get(imageRef); hit {
		return cached, true, nil
	}

	c.pulledRecordsMutex.RLock()
	defer c.pulledRecordsMutex.RUnlock()

	record, found, err := c.delegate.GetImagePulledRecord(imageRef)
	if err != nil || !found || record == nil {
		// Nothing worth caching; hand the delegate's answer back untouched.
		return record, found, err
	}
	c.pulledRecords.Set(imageRef, record)
	return record, found, err
}
// WriteImagePulledRecord writes record through the delegate under the write
// lock and, if that succeeds, caches it under record.ImageRef.
func (c *cachedPullRecordsAccessor) WriteImagePulledRecord(record *kubeletconfiginternal.ImagePulledRecord) error {
	c.pulledRecordsMutex.Lock()
	defer c.pulledRecordsMutex.Unlock()

	err := c.delegate.WriteImagePulledRecord(record)
	if err == nil {
		// Only mirror writes the delegate accepted.
		c.pulledRecords.Set(record.ImageRef, record)
	}
	return err
}
// DeleteImagePulledRecord deletes the record for imageRef through the delegate
// under the write lock and, if that succeeds, drops it from the cache.
func (c *cachedPullRecordsAccessor) DeleteImagePulledRecord(imageRef string) error {
	c.pulledRecordsMutex.Lock()
	defer c.pulledRecordsMutex.Unlock()

	err := c.delegate.DeleteImagePulledRecord(imageRef)
	if err == nil {
		// Keep the cached entry if the delegate could not delete it.
		c.pulledRecords.Delete(imageRef)
	}
	return err
}
// pullIntentsCmp orders two pull intents by their Image field, returning a
// negative, zero or positive value in the manner of strings.Compare.
func pullIntentsCmp(a, b *kubeletconfiginternal.ImagePullIntent) int {
	left, right := a.Image, b.Image
	return strings.Compare(left, right)
}
// pulledRecordsCmp orders two pulled records by their ImageRef field,
// returning a negative, zero or positive value in the manner of
// strings.Compare.
func pulledRecordsCmp(a, b *kubeletconfiginternal.ImagePulledRecord) int {
	left, right := a.ImageRef, b.ImageRef
	return strings.Compare(left, right)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment