Last active
October 27, 2019 14:44
-
-
Save quelgar/f34dde2d33e3ea58a5ef4f0b2ab193a8 to your computer and use it in GitHub Desktop.
Inputstream to ZIO Stream of byte chunks.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.InputStream | |
import java.nio.file.{Files, Path, Paths} | |
import scalaz.zio.console.putStrLn | |
import scalaz.zio.duration._ | |
import scalaz.zio.stream.Stream | |
import scalaz.zio.stream.Stream.Fold | |
import scalaz.zio.{App, Chunk, IO, Managed} | |
object ZioInputStreamTest extends App { | |
/** | |
* Constructs a stream the repeatedly reads from a resource that must be acquired and released. | |
* | |
* Reading stops when the reading function returns None. | |
*/ | |
def bracketLoop[E, R, A](acquire: IO[E, R])(release: R => IO[Nothing, Unit])(read: R => IO[E, Option[A]]): Stream[E, A] = { | |
managedLoop(Managed.make(acquire)(release))(read) | |
} | |
/** | |
* Constructs a stream that repeatedly reads from a managed resource. | |
* | |
* Reading stops when the reading function returns None. | |
*/ | |
def managedLoop[E, R, A](m: Managed[E, R])(read: R => IO[E, Option[A]]): Stream[E, A] = { | |
new Stream[E, A] { | |
override def fold[E1 >: E, A1 >: A, S]: Fold[E1, A1, S] = { | |
IO.succeed { (s, cont, f) => | |
if (cont(s)) { | |
m.use { r => | |
def loop(s2: S): IO[E1, S] = { | |
read(r).flatMap { | |
case Some(a) => f(s2, a).flatMap(loop) | |
case None => IO.succeed(s2) | |
} | |
} | |
loop(s) | |
} | |
} else { | |
IO.succeed(s) | |
} | |
} | |
} | |
} | |
} | |
private val terminateWait = 5.seconds | |
private val errMsg = (_: Exception).toString | |
private def openFile(p: Path): IO[String, InputStream] = IO.syncException(Files.newInputStream(p)).mapError(errMsg) | |
override def run(args: List[String]): IO[Nothing, ExitStatus] = { | |
val program = for { | |
file <- args.headOption.map(x => IO.succeed(Paths.get(x))).getOrElse(IO.fail("File to read must be specified")) | |
size <- IO.syncException(Files.size(file)).mapError(errMsg) | |
_ <- putStrLn(s"File size is $size bytes") | |
// a stream of byte chunks read from the specified file | |
stream = bracketLoop(openFile(file))(in => IO.syncException(in.close()).attempt.void) { in => | |
IO.syncException { | |
val buf = Array.ofDim[Byte](2000) | |
val read = in.read(buf) | |
if (read < 0) { | |
None | |
} else { | |
Some(Chunk.fromArray(buf).take(read)) | |
} | |
}.mapError(errMsg) | |
} | |
// print the size of each chunk read, and track the total size and number of chunks | |
// requires https://github.com/oleg-py/better-monadic-for | |
(count, totalSize) <- stream | |
.withEffect(chunk => putStrLn(s"Read Chunk of size: ${chunk.length}")) | |
.foldLeft((0, 0))((acc, a) => (acc._1 + 1, acc._2 + a.length)) | |
_ <- putStrLn(s"Done: $count chunks, total bytes $totalSize") | |
} yield () | |
program.redeem( | |
error => putStrLn(error) *> IO.succeed(ExitStatus.ExitWhenDone(1, terminateWait)), | |
Function.const(IO.succeed(ExitStatus.ExitWhenDone(0, terminateWait))) | |
) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment