Last active
May 22, 2020 05:42
-
-
Save pron/e492d6f69a224c3c6dd7edaa07a54245 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sketch written against a hypothetical channel API. Assumed to exist outside this snippet:
//   - `in`:  an input channel with a 4096-wide buffer
//   - `out`: an output channel writing to Kafka
// lookupNodeInRedis and processOnServer are assumed to be guarded by semaphores of, say,
// 16 and 8 permits, to *globally* limit concurrent use of those resources.
var processingQueue = new BufferedChannel(50); // this buffer bounds the number of processing threads
// Consume and process input.
// A virtual thread loops forever: it receives a message from `in`, submits one processing task
// for it, and sends the task's future to processingQueue.
Thread.startVirtualThread(() -> {
    try (var executor = Executors.newUnboundedVirtualThreadExecutor()) {
        while (true) {
            var msg = in.receive(); // for each incoming message we start a processing thread
            var future = executor.submit(() -> {
                try {
                    var uri = lookupNodeInRedis(msg.record().value()); // blocking
                    var uriDataOffset = new UriDataOffset(uri, msg.record().value(), msg.committableOffset());
                    processOnServer(uriDataOffset.uri, uriDataOffset.data); // blocking
                    return uriDataOffset.offset; // the task's result is the message's offset
                } catch (Exception ex) { ... } // `ex` must not reuse the executor's name; handling is elided in this sketch
            });
            processingQueue.send(future); // we block here if there are too many processing threads
        }
    }
});
// Collect and batch output.
// A virtual thread loops forever: each iteration starts a fresh batch, fills it from
// processingQueue inside an executor whose deadline is one second from now, then sends the
// batch to `out`. The trailing join() blocks the caller on this thread.
Thread.startVirtualThread(() -> {
    while (true) {
        // NOTE(review): the element type is whatever Future::get yields for the queued futures;
        // confirm that it matches Result.
        var batch = new ArrayList<Result>();
        try (var executor = Executors.newUnboundedVirtualThreadExecutor()
                .withDeadline(Instant.now().plusSeconds(1))) {
            executor.submit(() -> {
                while (true) {
                    batch.add(processingQueue.map(Future::get).receive()); // WARNING: map must be atomic
                }
            });
        } catch (Exception e) { ... } // handling is elided in this sketch
        out.send(batch);
    }
}).join();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment