Created
May 4, 2021 07:12
-
-
Save evilz/248e4469ff17bf0b341111acf89414e0 to your computer and use it in GitHub Desktop.
ProtbufConnection
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Request/response exchange of protobuf messages over a `ClientWebSocket`:
/// a `Request` is serialized into a single binary WebSocket message and the
/// reply is parsed as a `Response`.
module ProtbufConnection2 =

    /// Timeout applied to each individual send/receive call. The value is
    /// handed to `CancellationTokenSource.CancelAfter`, i.e. it is in milliseconds.
    let readWriteTimeout = 120000

    /// Size of the receive buffer before any growth (1 MiB).
    let private initialReceiveBufferSize = 1024 * 1024

    /// Upper bound for the receive buffer (64 MiB). A reply that does not fit
    /// is rejected instead of letting the peer make us allocate without limit.
    let private maxReceiveBufferSize = 64 * 1024 * 1024

    /// Returns a buffer that has free space after the first `used` bytes.
    /// The input buffer is returned untouched while it still has room; otherwise
    /// a buffer of twice the size (capped at `maxReceiveBufferSize`) is allocated
    /// and the first `used` bytes are copied into it.
    /// Raises (via `failwith`) when the buffer is full and already at the cap.
    let private ensureCapacity (buffer: byte[]) (used: int) : byte[] =
        if used < buffer.Length then
            buffer
        elif buffer.Length >= maxReceiveBufferSize then
            failwith "Our buffer wasn't large enough for the current message!"
        else
            let grown = Array.zeroCreate (min maxReceiveBufferSize (buffer.Length * 2))
            Array.blit buffer 0 grown 0 used
            grown

    /// Serializes `req` and sends it as one complete binary WebSocket message.
    ///
    /// clientSocket: socket to send on.
    /// token: caller's cancellation token; the send is cancelled when it fires
    ///        or when `readWriteTimeout` elapses, whichever happens first.
    /// req: the protobuf request to send.
    ///
    /// Returns a `Task` that completes once the send has finished.
    let writeMessage (clientSocket: ClientWebSocket) (token: Threading.CancellationToken) (req: #IMessage<Request>) =
        task {
            // Size the buffer from the message itself, so there is neither a fixed
            // 1 MiB allocation per call nor a hard limit on the request size.
            let sendBuf: byte[] = Array.zeroCreate (req.CalculateSize())

            use co =
                new Google.Protobuf.CodedOutputStream(sendBuf)

            req.WriteTo(co)
            let written = int co.Position
            let send = ArraySegment<byte>(sendBuf, 0, written)

            // Link the timeout to the caller's token so either one cancels the send.
            use cancellationSource =
                CancellationTokenSource.CreateLinkedTokenSource(token)

            cancellationSource.CancelAfter(readWriteTimeout)

            // Await inside the task: the timeout source must stay alive (not be
            // disposed) until the send has actually completed.
            do! clientSocket.SendAsync(send, WebSocketMessageType.Binary, true, cancellationSource.Token)
        }
        :> Threading.Tasks.Task

    /// Receives one complete binary WebSocket message and parses it as a `Response`.
    ///
    /// clientSocket: socket to read from.
    /// token: caller's cancellation token; every receive call is cancelled when
    ///        it fires or when `readWriteTimeout` elapses for that call.
    ///
    /// Raises (via `failwith`) when a non-binary frame arrives or when the
    /// message is larger than `maxReceiveBufferSize`.
    let readMessage (clientSocket: ClientWebSocket) (token: Threading.CancellationToken) =
        task {
            let mutable finished = false
            let mutable curPos = 0
            let mutable receiveBuf: byte[] = Array.zeroCreate initialReceiveBufferSize

            // A message may arrive in several fragments; keep appending until the
            // socket reports the end of the message.
            while not finished do
                receiveBuf <- ensureCapacity receiveBuf curPos

                let segment =
                    ArraySegment<byte>(receiveBuf, curPos, receiveBuf.Length - curPos)

                // Fresh timeout per receive call, linked to the caller's token.
                use cancellationSource =
                    CancellationTokenSource.CreateLinkedTokenSource(token)

                cancellationSource.CancelAfter(readWriteTimeout)

                let! result = clientSocket.ReceiveAsync(segment, cancellationSource.Token)

                match result.MessageType with
                | WebSocketMessageType.Binary ->
                    curPos <- curPos + result.Count
                    finished <- result.EndOfMessage
                | _ -> failwith "Expected a binary response!"

            // Parse only the bytes that were actually received.
            use stream =
                new System.IO.MemoryStream(receiveBuf, 0, curPos)

            return Response.Parser.ParseFrom(stream)
        }

    /// Sends `request` and waits for the matching `Response`.
    ///
    /// Errors carried by the response are written to `logger` at error level.
    /// Returns the pair `(response.Status.Value, response.Response)`.
    /// Raises (via `failwith`) when the response has neither a payload nor any
    /// error entries.
    let sendRequest (clientSocket: ClientWebSocket) (token: Threading.CancellationToken) (logger: ILogger<_>) request =
        task {
            do! request |> writeMessage clientSocket token
            let! response = readMessage clientSocket token

            return
                match response.Response with
                | ValueNone when response.Error.Count = 0 ->
                    failwith "Unexpected result and no error information!"
                | ValueNone ->
                    // Pass the text as an argument, never as the message template:
                    // braces in server-provided text would be read as placeholders.
                    logger.LogError("{Errors}", response.Error.ToString())
                    (response.Status.Value, response.Response)
                | ValueSome _ ->
                    for error in response.Error do
                        logger.LogError("Response warning: {Error}", error)

                    (response.Status.Value, response.Response)
        }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment