Last active
May 20, 2020 15:47
// Using a hypothetical channel API, I assume I have channel `in` with a 4096-wide buffer,
// and a channel `out` writing to Kafka.
// lookupNodeInRedis and processOnServer have semaphores of, say, 16 and 8, to *globally* limit
// concurrent use of those resources.
HighLevelLib.processInOrder(50, // how many processors
    in,
    HighLevelLib.batchEvery(Duration.ofSeconds(1), out),
    msg -> {
        try {
            var uri = lookupNodeInRedis(msg.record().value()); // blocking
            var uriDataOffset = new UriDataOffset(uri, msg.record().value(), msg.committableOffset());
            processOnServer(uriDataOffset.uri, uriDataOffset.data); // blocking
            return uriDataOffset.offset;
        } catch (Exception e) { ... }
    });
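
The comments above say that lookupNodeInRedis and processOnServer are guarded by semaphores that limit concurrent use globally, independent of the 50 processors. A minimal sketch of one way such a guard could look, using only `java.util.concurrent.Semaphore`, follows. The `GloballyLimited` class and its method names are hypothetical and not part of the snippet's API. It uses `acquireUninterruptibly()` to keep the sketch free of checked exceptions; real code might prefer the interruptible `acquire()`.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/**
 * Hypothetical helper (not from the original snippet): caps how many
 * threads may be inside the wrapped blocking call at the same time.
 */
final class GloballyLimited<T, R> {
    private final Semaphore permits;
    private final Function<T, R> call;

    GloballyLimited(int maxConcurrent, Function<T, R> call) {
        this.permits = new Semaphore(maxConcurrent);
        this.call = call;
    }

    R apply(T arg) {
        // Blocks the calling thread while maxConcurrent calls are already in flight.
        permits.acquireUninterruptibly();
        try {
            return call.apply(arg);
        } finally {
            // Return the permit even if the wrapped call throws.
            permits.release();
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Under this assumption, a single shared instance created with 16 permits would wrap the Redis lookup and another with 8 permits would wrap the server call. Because each instance is shared by all processors, the limits hold no matter how many processors run the lambda.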