Created
July 2, 2025 14:37
-
-
Save liggitt/49f9eb909daea6f6e24c97ce73053591 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Copyright 2025 The Kubernetes Authors. | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
*/ | |
package pullmanager | |
import ( | |
"strings" | |
"sync" | |
"sync/atomic" | |
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config" | |
"k8s.io/utils/lru" | |
) | |
type LRUCache[K comparable, V any] struct { | |
cache *lru.Cache | |
maxSize int | |
authoritative atomic.Bool | |
} | |
func NewLRUCache[K comparable, V any](size int) *LRUCache[K, V] { | |
c := lru.New(size) | |
l := &LRUCache[K, V]{ | |
maxSize: size, | |
cache: c, | |
} | |
c.SetEvictionFunc(func(_ lru.Key, _ any) { | |
// any eviction makes our cache non-authoritative | |
l.authoritative.Store(false) | |
}) | |
return l | |
} | |
func (c *LRUCache[K, V]) Get(key K) (*V, bool) { | |
value, found := c.cache.Get(key) | |
if !found { | |
return nil, false | |
} | |
if value == nil { | |
return nil, true | |
} | |
return value.(*V), true | |
} | |
func (c *LRUCache[K, V]) Set(key K, value *V) { c.cache.Add(key, value) } | |
func (c *LRUCache[K, V]) Delete(key K) { c.cache.Remove(key) } | |
func (c *LRUCache[K, V]) Len() int { return c.cache.Len() } | |
func (c *LRUCache[K, V]) Clear() { c.cache.Clear() } | |
// cachedPullRecordsAccessor implements a write-through cache layer on top | |
// of another PullRecordsAccessor | |
type cachedPullRecordsAccessor struct { | |
delegate PullRecordsAccessor | |
intentsMutex *StripedLockSet | |
intents *LRUCache[string, kubeletconfiginternal.ImagePullIntent] | |
pulledRecordsMutex *sync.RWMutex | |
pulledRecords *LRUCache[string, kubeletconfiginternal.ImagePulledRecord] | |
} | |
func NewCachedPullRecordsAccessor(delegate PullRecordsAccessor) *cachedPullRecordsAccessor { | |
c := &cachedPullRecordsAccessor{ | |
delegate: delegate, | |
intentsMutex: NewStripedLockSet(10), // TODO: plumb stripe size | |
intents: NewLRUCache[string, kubeletconfiginternal.ImagePullIntent](50), | |
pulledRecordsMutex: &sync.RWMutex{}, | |
pulledRecords: NewLRUCache[string, kubeletconfiginternal.ImagePulledRecord](100), | |
} | |
// warm our caches and set authoritative | |
c.ListImagePullIntents() | |
c.ListImagePulledRecords() | |
return c | |
} | |
func (c *cachedPullRecordsAccessor) ListImagePullIntents() ([]*kubeletconfiginternal.ImagePullIntent, error) { | |
wasAuthoritative := c.intents.authoritative.Load() | |
if !wasAuthoritative { | |
// doing a full list gives us an opportunity to become authoritative | |
// if we get back an error-free result that fits in our cache | |
c.intentsMutex.GlobalLock() | |
defer c.intentsMutex.GlobalUnlock() | |
} | |
results, err := c.delegate.ListImagePullIntents() | |
if !wasAuthoritative { | |
resultsAreAuthoritative := err == nil && len(results) < c.intents.maxSize | |
// populate the cache if that would make our cache authoritative or if the cache is currently empty | |
if resultsAreAuthoritative || c.intents.Len() == 0 { | |
c.intents.Clear() | |
// populate up to maxSize results in the cache | |
for _, intent := range results[:min(len(results), c.intents.maxSize)] { | |
c.intents.Set(intent.Image, &kubeletconfiginternal.ImagePullIntent{ | |
Image: intent.Image, | |
}) | |
} | |
c.intents.authoritative.Store(resultsAreAuthoritative) | |
} | |
} | |
return results, err | |
} | |
func (c *cachedPullRecordsAccessor) ImagePullIntentExists(image string) (bool, error) { | |
// do the cheap get lock-free | |
if _, exists := c.intents.Get(image); exists { | |
return true, nil | |
} | |
// on a miss, lock on the image | |
c.intentsMutex.Lock(image) | |
defer c.intentsMutex.Unlock(image) | |
// check again if the image exists in the cache under image lock | |
if _, exists := c.intents.Get(image); exists { | |
return true, nil | |
} | |
// if the cache is authoritative, return false on a miss | |
if c.intents.authoritative.Load() { | |
return false, nil | |
} | |
// fall through to the expensive lookup | |
exists, err := c.delegate.ImagePullIntentExists(image) | |
if err == nil && exists { | |
c.intents.Set(image, &kubeletconfiginternal.ImagePullIntent{ | |
Image: image, | |
}) | |
} | |
return exists, err | |
} | |
func (c *cachedPullRecordsAccessor) WriteImagePullIntent(image string) error { | |
c.intentsMutex.Lock(image) | |
defer c.intentsMutex.Unlock(image) | |
if err := c.delegate.WriteImagePullIntent(image); err != nil { | |
return err | |
} | |
c.intents.Set(image, &kubeletconfiginternal.ImagePullIntent{ | |
Image: image, | |
}) | |
return nil | |
} | |
func (c *cachedPullRecordsAccessor) DeleteImagePullIntent(image string) error { | |
c.intentsMutex.Lock(image) | |
defer c.intentsMutex.Unlock(image) | |
if err := c.delegate.DeleteImagePullIntent(image); err != nil { | |
return err | |
} | |
c.intents.Delete(image) | |
return nil | |
} | |
func (c *cachedPullRecordsAccessor) ListImagePulledRecords() ([]*kubeletconfiginternal.ImagePulledRecord, error) { | |
//TODO: maybe this should be protected by a read lock | |
c.pulledRecordsMutex.RLock() | |
defer c.pulledRecordsMutex.RUnlock() | |
return c.delegate.ListImagePulledRecords() | |
} | |
func (c *cachedPullRecordsAccessor) GetImagePulledRecord(imageRef string) (*kubeletconfiginternal.ImagePulledRecord, bool, error) { | |
pulledRecord, exists := c.pulledRecords.Get(imageRef) | |
if exists { | |
return pulledRecord, true, nil | |
} | |
c.pulledRecordsMutex.RLock() | |
defer c.pulledRecordsMutex.RUnlock() | |
pulledRecord, exists, err := c.delegate.GetImagePulledRecord(imageRef) | |
if err == nil && exists && pulledRecord != nil { | |
c.pulledRecords.Set(imageRef, pulledRecord) | |
} | |
return pulledRecord, exists, err | |
} | |
func (c *cachedPullRecordsAccessor) WriteImagePulledRecord(record *kubeletconfiginternal.ImagePulledRecord) error { | |
c.pulledRecordsMutex.Lock() | |
defer c.pulledRecordsMutex.Unlock() | |
if err := c.delegate.WriteImagePulledRecord(record); err != nil { | |
return err | |
} | |
c.pulledRecords.Set(record.ImageRef, record) | |
return nil | |
} | |
func (c *cachedPullRecordsAccessor) DeleteImagePulledRecord(imageRef string) error { | |
c.pulledRecordsMutex.Lock() | |
defer c.pulledRecordsMutex.Unlock() | |
if err := c.delegate.DeleteImagePulledRecord(imageRef); err != nil { | |
return err | |
} | |
c.pulledRecords.Delete(imageRef) | |
return nil | |
} | |
func pullIntentsCmp(a, b *kubeletconfiginternal.ImagePullIntent) int { | |
return strings.Compare(a.Image, b.Image) | |
} | |
func pulledRecordsCmp(a, b *kubeletconfiginternal.ImagePulledRecord) int { | |
return strings.Compare(a.ImageRef, b.ImageRef) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment