Created
August 30, 2015 23:23
-
-
Save fiadliel/92c1c37880f916a89df9 to your computer and use it in GitHub Desktop.
Reading Zip files with scalaz-stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{File, FileNotFoundException, IOException, InputStream} | |
import java.util.zip.{ZipEntry, ZipException, ZipFile} | |
import org.http4s.DateTime | |
import scodec.bits.ByteVector | |
import scalaz._ | |
import scalaz.Scalaz._ | |
import scalaz.concurrent.Task | |
import scalaz.stream._ | |
/**
 * Provides utilities for reading Zip files, and safely providing streams
 * of data from those Zip files to a user.
 *
 * Failures that the underlying Java APIs document (missing file, corrupt
 * archive, I/O errors, missing entry) are reported as [[ReadError]] values
 * inside a [[ZipReader]]; any other exception still fails the `Task`.
 */
object ZipStream {

  /** A `Task` producing either a [[ReadError]] or a value of type `A`. */
  type ZipReader[A] = EitherT[Task, ReadError, A]

  /** Constructors for [[ZipReader]] values. */
  object ZipReader {
    /** Wraps a task that already yields a disjunction. */
    def apply[A](t: Task[ReadError \/ A]): ZipReader[A] = EitherT(t)

    /** Wraps an already-computed disjunction. */
    def apply[A](t: ReadError \/ A): ZipReader[A] = EitherT(Task.delay(t))

    /** Wraps an already-computed successful value. */
    def apply[A](t: A): ZipReader[A] = EitherT(Task.delay(t.right))

    /**
     * Runs `t`, turning any exception matched by `pf` into a `ReadError`.
     * Exceptions that `pf` does not match still fail the underlying `Task`.
     */
    def handle[A](t: Task[A])(pf: PartialFunction[Throwable, ReadError]): ZipReader[A] = {
      ZipReader(t.map(_.right).handle(pf.andThen(_.left)))
    }

    /** Runs `t`, reporting `none` as the error when it yields `None`. */
    def getOrError[A](t: Task[Option[A]], none: ReadError): ZipReader[A] = {
      ZipReader(t.map(_.map(_.right).getOrElse(none.left)))
    }
  }

  /**
   * Opens `file` as a Zip archive. The caller owns the returned `ZipFile`
   * and is responsible for closing it.
   */
  def getZipFile(file: File): ZipReader[ZipFile] =
    ZipReader.handle(Task.delay(new ZipFile(file))) {
      case _: FileNotFoundException => FileNotFound
      // Depending on the JDK, a missing file may surface as a ZipException
      // rather than a FileNotFoundException; disambiguate via the file system.
      case _: ZipException if !file.isFile => FileNotFound
      // ZipException and FileNotFoundException both extend IOException, so
      // they must be matched before the general IOException case.
      case _: ZipException => FormatError
      case _: IOException  => IOError
    }

  /**
   * Opens `file` as a Zip archive. If `ifModifiedSince` is given, the archive
   * is opened only when the file was modified after that time; otherwise the
   * result is `ContentUnchanged`.
   */
  def getZipFileOrNone(file: File, ifModifiedSince: Option[DateTime]): ZipReader[ZipFile] =
    ifModifiedSince match {
      case None => getZipFile(file)
      case Some(since) =>
        ZipReader(Task.delay(file.lastModified.right[ReadError])) flatMap { modifiedMillis =>
          // File.lastModified returns 0L when the file is missing or cannot be
          // read; open it anyway so the caller sees the real error rather than
          // a spurious ContentUnchanged.
          if (modifiedMillis == 0L || since < DateTime(modifiedMillis))
            getZipFile(file)
          else
            ZipReader[ZipFile](ContentUnchanged.left)
        }
    }

  /** Looks up entry `name`, reporting `EntryNotFound` if it is absent. */
  def getZipEntry(zipFile: ZipFile, name: String): ZipReader[ZipEntry] =
    ZipReader.getOrError(Task.delay(Option(zipFile.getEntry(name))), EntryNotFound)

  /** Opens the data of `entry`; the stream is closed when `zipFile` is closed. */
  def getInputStream(zipFile: ZipFile, entry: ZipEntry): ZipReader[InputStream] =
    ZipReader.handle(Task.delay(zipFile.getInputStream(entry))) {
      // ZipException extends IOException, so it must be matched first.
      case _: ZipException => FormatError
      case _: IOException  => IOError
    }

  /**
   * Wraps `is` as a stream of chunks of at most `chunkSize` bytes, ending at
   * EOF. The stream does not close `is` itself.
   */
  def getChunkStream(is: InputStream, chunkSize: Int): ZipReader[Process[Task, ByteVector]] = {
    ZipReader(Process.constant(chunkSize).through(io.chunkR(is)))
  }

  /**
   * Streams entry `name` out of an already-open `zipFile`.
   *
   * Ownership of `zipFile` passes to this method. It is closed immediately if
   * the entry cannot be opened, and otherwise when the returned stream halts
   * (normally, with an error, or by being killed).
   */
  private def streamEntry(zipFile: ZipFile, name: String, chunkSize: Int): Task[ReadError \/ Process[Task, ByteVector]] = {
    val close: Task[Unit] = Task.delay(zipFile.close())
    val opened = (for {
      entry  <- getZipEntry(zipFile, name)
      is     <- getInputStream(zipFile, entry)
      stream <- getChunkStream(is, chunkSize)
    } yield stream.onComplete(Process.eval_(close))).run
    opened.attempt.flatMap {
      // Success: the stream now owns the zip file; error: release it now.
      case \/-(result) => if (result.isLeft) close.map(_ => result) else Task.now(result)
      // Unexpected exception: release the zip file, then re-raise the original.
      case -\/(t) => close.attempt.flatMap(_ => Task.fail(t))
    }
  }

  /**
   * Opens entry `name` of the Zip file `file` as a stream of bytes.
   *
   * The Zip file is closed if any step fails, and otherwise when the returned
   * stream halts (normally, with an error, or by being killed). A stream that
   * is never run keeps the Zip file open, so callers should always run (or
   * kill) it.
   *
   * @param file The [[java.io.File]] representing the Zip file to read from.
   * @param name The name of the entry in the Zip file to retrieve.
   * @param ifModifiedSince Only return data if the file was modified after
   *                        this time; otherwise the result is `ContentUnchanged`.
   * @param chunkSize Data is returned in batches of at most this many bytes;
   *                  must be positive.
   * @return A [[scalaz.concurrent.Task]] which will give either an error, or
   *         a [[scalaz.stream.Process]] providing a stream of [[scodec.bits.ByteVector]]
   *         which contains the content of the Zip entry.
   * @throws IllegalArgumentException if `chunkSize` is not positive.
   */
  def fromZip(file: File, name: String, ifModifiedSince: Option[DateTime], chunkSize: Int = 10240): Task[ReadError \/ Process[Task, ByteVector]] = {
    // InputStream.read on an empty buffer returns 0, never -1, so a
    // non-positive chunk size would produce an endless stream of empty chunks.
    require(chunkSize > 0, s"chunkSize must be positive, but was $chunkSize")
    getZipFileOrNone(file, ifModifiedSince)
      .flatMap(zipFile => ZipReader(streamEntry(zipFile, name, chunkSize)))
      .run
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment