Created
May 23, 2022 05:34
-
-
Save quelgar/d6b6014eba48727062ba7d2b7ad2b328 to your computer and use it in GitHub Desktop.
A ring buffer implementation for ZIO STM
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zio.stm.* | |
import zio.prelude.* | |
final class TRingBuffer[A] private (array: TArray[A], stateRef: TRef[TRingBuffer.State]) { | |
import TRingBuffer.* | |
private def uncheckedRead(state: State): USTM[A] = array(state.start) <* | |
stateRef.set(state.copy(start = (state.start + 1) % capacity, size = state.size - 1)) | |
def put(a: A): USTM[Unit] = for { | |
current <- stateRef.get | |
_ <- array.update(current.end, _ => a) | |
newState = | |
if (current.size == capacity) | |
current.copy( | |
end = (current.end + 1) % capacity, | |
start = (current.start + 1) % capacity | |
) | |
else | |
current.copy(end = (current.end + 1) % capacity, size = current.size + 1) | |
_ <- stateRef.set(newState) | |
} yield () | |
def poll: USTM[Option[A]] = { | |
val changes = for { | |
current <- stateRef.get | |
_ <- ZSTM.when(current.size == 0)(ZSTM.fail(None)) | |
value <- uncheckedRead(current) | |
} yield value | |
changes.optional | |
} | |
def take: USTM[A] = stateRef.get.flatMap { current => | |
if (current.size == 0) STM.retry | |
else uncheckedRead(current) | |
} | |
def peekOption: USTM[Option[A]] = stateRef.get.flatMap { current => | |
if (current.size == 0) STM.none else array(current.start).asSome | |
} | |
def peek: USTM[A] = stateRef.get.flatMap { current => | |
if (current.size == 0) STM.retry else array(current.start) | |
} | |
def drop: USTM[Boolean] = stateRef.get.flatMap { current => | |
if (current.size == 0) ZSTM.succeed(false) | |
else stateRef.set( | |
current.copy(start = (current.start + 1) % capacity, size = current.size - 1) | |
).as(true) | |
} | |
def capacity: Int = array.size | |
def size: USTM[Int] = stateRef.get.map(_.size) | |
def clear: USTM[Unit] = stateRef.set(State(0, 0, 0)) | |
def isFull: USTM[Boolean] = size.map(_ == capacity) | |
def isEmpty: USTM[Boolean] = size.map(_ == 0) | |
def apply(index: Int): STM[Option[Nothing], A] = | |
size.flatMap { currentSize => | |
if (index >= currentSize) STM.fail(None) | |
else stateRef.get.flatMap(current => array((current.start + index) % capacity)) | |
} | |
def fold[B](initial: B)(f: (B, A) => B): USTM[B] = stateRef.get.flatMap { currentState => | |
def help(index: Int, b: B): USTM[B] = | |
if (index >= currentState.size) STM.succeed(b) | |
else array((currentState.start + index) % capacity).flatMap(a => help(index + 1, f(b, a))) | |
help(0, initial) | |
} | |
def sum(implicit ev: Numeric[A]): USTM[A] = fold(ev.zero)(ev.plus) | |
} | |
object TRingBuffer { | |
final private case class State(start: Int, end: Int, size: Int) | |
def make[A](initial: A*): USTM[TRingBuffer[A]] = TArray.make(initial: _*).flatMap(array => | |
TRef.make(State(0, 0, initial.size)).map(new TRingBuffer[A](array, _)) | |
) | |
def fromIterable[A]( | |
initial: Iterable[A] | |
): USTM[TRingBuffer[A]] = TArray.fromIterable(initial).flatMap(array => | |
TRef.make(State(0, 0, initial.size)).map(new TRingBuffer[A](array, _)) | |
) | |
def empty[A](capacity: Int, initial: A): USTM[TRingBuffer[A]] = | |
TArray.fromIterable(Iterable.fill(capacity)(initial)).flatMap(array => | |
TRef.make(State(0, 0, 0)).map(new TRingBuffer[A](array, _)) | |
) | |
def empty[A: Identity](capacity: Int): USTM[TRingBuffer[A]] = | |
empty(capacity, Identity[A].identity) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment