Skip to content

Instantly share code, notes, and snippets.

@borischerkasky
Created February 23, 2020 20:48
Show Gist options
  • Save borischerkasky/c4b38049f8f9d73a0ea569f71e496e49 to your computer and use it in GitHub Desktop.
Save borischerkasky/c4b38049f8f9d73a0ea569f71e496e49 to your computer and use it in GitHub Desktop.
DynamoDB writer with write through cache
import akka.actor.Scheduler
import com.github.blemale.scaffeine.{Cache, Scaffeine}
import play.api.libs.json.OWrites
import scala.reflect.runtime.universe._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
/**
 * A [[DataStore]] decorator that keeps a write-through cache in front of another store.
 *
 * Before a batch is forwarded to the wrapped `store`, every item whose cached copy
 * (looked up by `Indexable[A].indexKey`) is equal to the incoming item is dropped, so only
 * new or changed items are written. Items are added to the cache only after the wrapped
 * store reports a successful write.
 *
 * The `OWrites` context bound and the implicit `scheduler` are part of the constructor
 * signature but are not used by the code in this class.
 *
 * @param store the underlying store that actually persists items
 * @param cache cache of the last successfully written item per index key
 * @tparam A the item type; must have an `Indexable` instance providing its cache key
 */
class CachingDynamoStore[A : Indexable : OWrites](store: DataStore[A], cache: Cache[String, A])(
    implicit executionContext: ExecutionContext,
    scheduler: Scheduler)
    extends DataStore[A] {

  /**
   * Writes the new or changed items of `batch` to the wrapped store and then caches them.
   *
   * @param batch     items to persist
   * @param tableName forwarded unchanged to the wrapped store
   * @param retry     forwarded unchanged to the wrapped store
   * @param ttl       forwarded unchanged to the wrapped store
   * @return a future that completes once the changed items were written and cached, or
   *         immediately when no item differs from its cached copy; it fails with an
   *         `Exception` wrapping the original cause when the wrapped store fails
   */
  override def putBatch(batch: Seq[A],
                        tableName: String,
                        retry: RetryPolicy.Config,
                        ttl: FiniteDuration): Future[Unit] = {
    val newItems = reduceToChangedItems(batch)
    if (newItems.isEmpty) {
      // Nothing changed relative to the cache: skip the downstream call entirely.
      // NOTE(review): DynamoDB's BatchWriteItem rejects empty requests; presumably the
      // wrapped store would surface that as a failure - confirm against its implementation.
      Future.successful(())
    } else {
      store
        .putBatch(newItems, tableName, retry, ttl)
        .transform(
          // Cache only after the write succeeded, so the cache never gets ahead of the store.
          _ => cacheNewItems(newItems),
          cause => new Exception(s"put batch failed: $batch", cause))
    }
  }

  /** Stores every given item in the cache under its index key (last one wins per key). */
  private def cacheNewItems(newItems: Seq[A]): Unit = {
    val toCache = newItems.map(item => Indexable[A].indexKey(item) -> item).toMap
    cache.putAll(toCache)
  }

  /**
   * Keeps only the items that are absent from the cache or differ from their cached copy.
   *
   * NOTE(review): items sharing the same index key within one batch are all kept;
   * `Indexable.maxBy` is not consulted here - confirm whether de-duplication is expected.
   */
  private def reduceToChangedItems(batch: Seq[A]): Seq[A] = {
    val keyOf = Indexable[A].indexKey
    // Compute each key once and reuse it for both the cache lookup and the comparison.
    val keyedBatch = batch.map(item => keyOf(item) -> item)
    val cachedItems = cache.getAllPresent(keyedBatch.map(_._1))
    keyedBatch.collect {
      // `forall` on a missing entry is true, so uncached items always count as changed.
      case (key, item) if cachedItems.get(key).forall(isItemChanged(_, item)) => item
    }
  }

  /** True when the cached `value` is not equal to the incoming `originalItem`. */
  private def isItemChanged(value: A, originalItem: A): Boolean =
    value != originalItem
}
/** Companion holding the `Indexable` type class and the factory for [[CachingDynamoStore]]. */
object CachingDynamoStore {

  /**
   * Type class describing how an item of type `A` is indexed.
   *
   * `indexKey` yields the string under which an item is cached. `maxBy` yields a `Long`
   * for an item; it is not used anywhere in this file.
   */
  trait Indexable[A] {
    val indexKey: A => String
    val maxBy: A => Long
  }

  object Indexable {

    /** Summons the implicit `Indexable` instance for `A`. */
    def apply[A: Indexable]: Indexable[A] = implicitly[Indexable[A]]
  }

  /**
   * Builds a [[CachingDynamoStore]] around `store` with a freshly created Scaffeine cache.
   *
   * The cache records statistics, expires entries `config.ttl` after they were written,
   * is bounded by `config.maxSize` and holds its values through soft references. A
   * periodic statistics reporter is registered for the cache before the store is returned.
   *
   * @param store  the store that performs the actual writes
   * @param config cache sizing, expiry and statistics-reporting settings
   * @return the caching store wrapping `store`
   */
  def apply[A : TypeTag : Indexable](store: DynamoDataStore[A], config: CachingConfig)(
      implicit executionContext: ExecutionContext,
      scheduler: Scheduler,
      writes: OWrites[A]): CachingDynamoStore[A] = {
    val cacheBuilder = Scaffeine()
      .recordStats()
      .expireAfterWrite(config.ttl)
      .maximumSize(config.maxSize)
      .softValues()
    val itemCache = cacheBuilder.build[String, A]()

    registerCacheMonitor(itemCache, config)
    new CachingDynamoStore[A](store, itemCache)
  }

  /**
   * Schedules a recurring job that reports the cache's statistics.
   *
   * Both the initial delay and the repeat interval are `config.statsReportingInterval`.
   * Each run sends hit count, hit rate, eviction count and the item type's name as a
   * "beacon_cache_stats" event. The value returned by the scheduler is this method's
   * result; the factory above does not keep it.
   */
  private def registerCacheMonitor[A: TypeTag](cache: Cache[String, A], config: CachingConfig)(
      implicit scheduler: Scheduler,
      ec: ExecutionContext) = {
    val reportingInterval = config.statsReportingInterval
    scheduler.schedule(reportingInterval, reportingInterval) {
      val snapshot = cache.stats()
      ReportingClient.report(
        "beacon_cache_stats",
        Map[String, Any](
          "hit_count" -> snapshot.hitCount(),
          "hit_rate" -> snapshot.hitRate(),
          "eviction_count" -> snapshot.evictionCount(),
          "cache_name" -> typeOf[A].toString
        )
      )
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment