A special type of lazy MemoryStream, made to connect a streaming data source to a stream consumer.
using System;
using System.IO;

/// <summary>
/// A special type of lazy MemoryStream, made to connect a streaming data source to a stream consumer.
/// Useful when an output can only be streamed to an existing stream
/// and cannot create its own new stream (for example, Ionic Zip Extract).
///
/// The data is written lazily - this stream does not store any data.
/// The writing is done by copying the data directly into the read buffer provided by the actual reader.
/// The stream is thread-safe.
///
/// When the data writing is done, the stream creator must call the EndWrite() method to mark the data end.
///
/// Sources used for inspiration:
/// https://stackoverflow.com/questions/1475747/is-there-an-in-memory-stream-that-blocks-like-a-file-stream
/// https://stackoverflow.com/questions/27810289/c-sharp-buffered-zip-stream-proxy
///
/// By: @ruzrobert (GitHub)
/// </summary>
public class ThroughStream : Stream
{
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite { get { lock (lockObject) { return isWritable; } } }
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }

    private bool isOpen = true;
    private bool isWritable = true;
    private bool hasBufferToReadIn = false;
    private bool hasLastReadingEnded = true;
    private bool isWriting = false;

    private byte[] readBuffer = null;
    private int readOffset = 0;
    private int readCount = 0;

    private readonly object lockObject = new object();

    /// <summary>
    /// Before closing the stream please call the EndWrite method to mark the data end.
    /// Otherwise the Reader won't be able to read the stream to the end.
    /// </summary>
    public override void Close()
    {
        lock (lockObject)
        {
            isOpen = false;
            EndWriteInternal();
            base.Close();
        }
    }

    /// <summary>
    /// When the data writing is done, call this method to mark the data end.
    /// </summary>
    public void EndWrite()
    {
        lock (lockObject)
        {
            if (!isOpen) throw new InvalidOperationException("The stream is closed");
            if (!isWritable) throw new InvalidOperationException("EndWrite can only be called once");
            EndWriteInternal();
        }
    }

    private void EndWriteInternal()
    {
        isWritable = false;
        hasBufferToReadIn = false;
        readBuffer = null;
    }

    /// <summary>
    /// Writes data to the stream reader. If there is no pending Read request, this method waits until there is one.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        if (!isOpen) throw new InvalidOperationException("Writing to a closed stream is not allowed");

        lock (lockObject) // Check for simultaneous writing
        {
            if (isWriting) throw new InvalidOperationException("Previous Write request is not done yet");
            isWriting = true;
        }

        // Write loop
        while (true) // could have been 'count > 0', but we need the iterative lock to sync properly
        {
            lock (lockObject)
            {
                if (!isWritable) throw new InvalidOperationException("The stream is not writable anymore");

                if (hasBufferToReadIn) // Write if we have a Read request
                {
                    int writeCount = Math.Min(readCount, count);
                    Buffer.BlockCopy(buffer, offset, readBuffer, readOffset, writeCount);

                    offset += writeCount;
                    count -= writeCount;
                    readOffset += writeCount;
                    readCount -= writeCount;

                    if (readCount <= 0) // the Read request is fulfilled
                    {
                        hasBufferToReadIn = false;
                        readBuffer = null;
                    }
                    if (count <= 0) // nothing else to write
                    {
                        isWriting = false;
                        break;
                    }
                }
            }
        }
    }

    /// <summary>
    /// Reads data by registering a read request. Returns the number of bytes read once the writing is done.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        if (buffer.Length - offset < count) throw new ArgumentException("Wrong offset or count");
        if (!isOpen) throw new InvalidOperationException("The stream is closed");
        if (count == 0) return 0;

        // Save the buffer we want to read in to
        lock (lockObject)
        {
            if (!isWritable) return 0; // if there won't be any new data anymore, return 0
            if (!hasLastReadingEnded) throw new InvalidOperationException("Previous Read request is not done yet");

            hasBufferToReadIn = true;
            hasLastReadingEnded = false;
            readBuffer = buffer;
            readOffset = offset;
            readCount = count;
        }

        // Wait until the writing is done
        while (true)
        {
            lock (lockObject)
            {
                if (!hasBufferToReadIn)
                {
                    hasLastReadingEnded = true;
                    return readOffset - offset;
                }
            }
        }
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Flush() { } // nothing to flush: data is copied straight into the reader's buffer
}
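A minimal usage sketch (not part of the gist; the producer API call is a placeholder): one thread writes into the ThroughStream and calls EndWrite() when it is finished, while another thread reads from the same instance as the data arrives.

using System;
using System.Threading.Tasks;

class ThroughStreamUsageExample
{
    static void Main()
    {
        ThroughStream through = new ThroughStream();

        // Producer: any API that can only write into an existing stream goes here
        // (for example zipEntry.Extract(through)). A plain Write is used as a stand-in.
        Task producer = Task.Run(() =>
        {
            byte[] chunk = { 1, 2, 3, 4, 5 };
            through.Write(chunk, 0, chunk.Length);
            through.EndWrite(); // mark the end of the data so Read() can return 0
        });

        // Consumer: read until the stream reports end of data (Read returns 0).
        byte[] buffer = new byte[4096];
        int bytesRead;
        while ((bytesRead = through.Read(buffer, 0, buffer.Length)) > 0)
        {
            Console.WriteLine($"Received {bytesRead} bytes");
        }

        producer.Wait();
    }
}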
Hi @VyacheslavPritykin! Thank you very much for helping to switch away from the raw locks! The performance is identical to (or even better than) reading the whole file into RAM and spiking up to 2 GB, which is really great!
It's a pity that we have to use a separate library for managing the locks, though. What's so special about those async locks? I've checked their internal code, but it seems they still use the default locking mechanism under the hood, and I don't know what makes them so fast.
Thank you for the tests as well! I think we could maybe reformat this gist into a separate GitHub repo, this code is much better than in those answers on StackOverflow IMO :)
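For comparison, here is a rough sketch (an illustration only, not that library; Handoff<T> is a made-up name) of a blocking single-producer / single-consumer handoff built on nothing but the built-in SemaphoreSlim, where both sides sleep instead of spinning:

using System;
using System.Threading;
using System.Threading.Tasks;

// Single-producer / single-consumer rendezvous: both sides block inside the
// semaphores instead of spinning, and wake each other up when ready.
class Handoff<T>
{
    private readonly SemaphoreSlim itemRequested = new SemaphoreSlim(0);
    private readonly SemaphoreSlim itemDelivered = new SemaphoreSlim(0);
    private T slot;

    public T Take()
    {
        itemRequested.Release(); // tell the producer we are ready to receive
        itemDelivered.Wait();    // block until the producer has filled the slot
        return slot;
    }

    public void Put(T item)
    {
        itemRequested.Wait();    // block until a consumer asks for an item
        slot = item;
        itemDelivered.Release(); // wake the waiting consumer
    }
}

class HandoffExample
{
    static void Main()
    {
        var handoff = new Handoff<int>();
        Task.Run(() => handoff.Put(42));
        Console.WriteLine(handoff.Take()); // prints 42
    }
}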
I also updated the unit tests to cover the case where the reader requests more data than the stream will ever have.
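A hypothetical sketch of such a test (assuming xUnit; this is not the actual test from that update):

using System.Threading.Tasks;
using Xunit;

public class ThroughStreamTests
{
    [Fact]
    public void Read_RequestingMoreThanWillEverBeWritten_ReturnsPartialDataThenZero()
    {
        var stream = new ThroughStream();
        byte[] data = new byte[10];

        // Producer: writes 10 bytes and then signals that no more data will come.
        Task producer = Task.Run(() =>
        {
            stream.Write(data, 0, data.Length);
            stream.EndWrite();
        });

        // Consumer asks for far more than the producer will ever deliver.
        byte[] buffer = new byte[100];
        int firstRead = stream.Read(buffer, 0, buffer.Length);
        int secondRead = stream.Read(buffer, 0, buffer.Length);

        Assert.Equal(10, firstRead); // only the bytes that were actually written
        Assert.Equal(0, secondRead); // end of stream after EndWrite()
        producer.Wait();
    }
}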