Gist amit1rrr/205d04bd702ee7fde6d1780e9b0eb24d, created February 28, 2018 17:13
Word count demo using the Apache Kafka Streams API.
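import java.util.Arrays;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// Builder that assembles the processing topology.
final StreamsBuilder builder = new StreamsBuilder();
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
final KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));
final KTable<String, Long> wordCounts = textLines
    // Split each text line into lowercase words on runs of non-word characters.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the words as message keys; Grouped supplies the serdes for the repartition topic.
    .groupBy((key, value) -> value, Grouped.with(stringSerde, stringSerde))
    // Count the occurrences of each word (message key).
    .count();
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));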
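Observing the topology above requires a running Kafka broker. As a hedged illustration only, the following plain-Java sketch (no Kafka dependency; the class name `WordCountSimulation` and its `process` method are hypothetical) reproduces the split, group, and count steps in memory and records each count update in the way the `KTable` changelog would. It is an approximation: Kafka Streams buffers updates in a record cache by default, so the output topic may show only the latest count per word rather than every intermediate value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free simulation of the word-count topology: lines are lowercased
// and split on runs of non-word characters, and a running count is kept per word.
class WordCountSimulation {

    // Running count per word; stands in for the KTable's state store.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one input line and returns the "word=count" updates it produces, in order,
    // like the changelog records the KTable would forward with record caching disabled.
    List<String> process(String line) {
        List<String> updates = new ArrayList<>();
        // Same tokenization as the topology's flatMapValues step.
        for (String word : line.toLowerCase().split("\\W+")) {
            // merge() starts the count at 1 or adds 1 to the existing count.
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(word + "=" + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        WordCountSimulation sim = new WordCountSimulation();
        System.out.println(sim.process("all streams lead to kafka"));
        System.out.println(sim.process("hello kafka streams"));
    }
}
```

Running `main` prints `[all=1, streams=1, lead=1, to=1, kafka=1]` followed by `[hello=1, kafka=2, streams=2]`; the second line bumps the existing counts for `kafka` and `streams`. A line that begins with whitespace or punctuation makes `split("\\W+")` return a leading empty string, which both this sketch and the topology above would count as a word; filtering out empty tokens is one possible refinement.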