Created
March 31, 2014 08:02
-
-
Save halfelf/9887429 to your computer and use it in GitHub Desktop.
Reliable word-count example for Storm in Clojure. Note the `ack` and `fail` handlers in the spout.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns storm.starter.clj.word-count2 | |
(:import [backtype.storm StormSubmitter LocalCluster]) | |
(:use [backtype.storm clojure config log]) | |
(:gen-class)) | |
;; Process-wide counter the spout draws message ids from (handy when reading
;; debug logs). Each emitted sentence takes the next integer, and that id is
;; what the spout's ack/fail callbacks receive.
(def id-count
  (atom 0))
;; Reliable spout: emits a random canned sentence roughly every 10 ms on the
;; "sentence" stream, tagged with a message id taken from `id-count`.
;;
;; Reliability: every emitted sentence is remembered in `pending` under its
;; message id until Storm reports the outcome of its tuple tree.
;;   - ack  -> the sentence was fully processed; forget it.
;;   - fail -> replay the same sentence under the same message id, so a
;;             failure downstream does not silently lose the sentence.
(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [sentences ["a little brown dog"
                   "the man petted the dog"
                   "four score and seven years ago"
                   "an apple a day keeps the doctor away"]
        ;; message id -> sentence, for tuples that are neither acked nor failed yet
        pending (atom {})]
    (spout
     (nextTuple []
       (Thread/sleep 10)
       (let [sentence (rand-nth sentences)
             id (swap! id-count inc)]
         ;; Record before emitting so the sentence is available for replay.
         (swap! pending assoc id sentence)
         (emit-spout! collector [sentence] :id id)))
     (ack [id]
       (log-message "Acking " id)
       (swap! pending dissoc id))
     (fail [id]
       (log-message "Failing " id)
       ;; Re-emit with the same id; the entry stays pending until acked.
       (when-let [sentence (get @pending id)]
         (emit-spout! collector [sentence] :id id))))))
;; Splits the incoming sentence (field 0) on single spaces and emits one
;; "word" tuple per token. Each tuple is anchored to the input so that a
;; downstream failure is reported back to the spout. The input is then acked.
(defbolt split-sentence ["word"] [tuple collector]
  (let [tokens (-> tuple (.getString 0) (.split " "))]
    (doseq [token tokens]
      (emit-bolt! collector [token] :anchor tuple))
    (ack! collector tuple)))
;; Counting bolt: keeps a running per-task tally of each word it receives
;; (field 0) and acks every input tuple.
;; The ["word" "count"] stream is declared, but emitting to it is
;; intentionally disabled, so this bolt currently produces no output.
(defbolt word-count ["word" "count"] {:prepare true}
  [conf context collector]
  (let [tally (atom {})]
    (bolt
     (execute [tuple]
       (let [w (.getString tuple 0)]
         ;; Missing words start at 0, so the first sighting records 1.
         (swap! tally update-in [w] (fnil inc 0))
         ;; Disabled emit (kept from the original):
         ;; (emit-bolt! collector [w (@tally w)] :anchor tuple)
         (ack! collector tuple))))))
(defn mk-topology
  "Builds the word-count topology:
  spout \"1\" (sentence-spout)
    -> bolt \"3\" (split-sentence, 5 tasks, shuffle grouping)
    -> bolt \"4\" (word-count, 6 tasks, fields grouping on \"word\")."
  []
  (let [spouts {"1" (spout-spec sentence-spout)}
        bolts  {"3" (bolt-spec {"1" :shuffle} split-sentence :p 5)
                "4" (bolt-spec {"3" ["word"]} word-count :p 6)}]
    (topology spouts bolts)))
(defn run-local!
  "Submits the topology as \"word-count\" to an in-process LocalCluster with
  TOPOLOGY-DEBUG enabled, lets it run for `run-ms` milliseconds (default
  10000), then shuts the cluster down.

  The shutdown is in a `finally`, so the local cluster is released even if
  submission or the wait throws. Returns nil."
  ([] (run-local! 10000))
  ([run-ms]
   (let [cluster (LocalCluster.)]
     (try
       (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
       (Thread/sleep run-ms)
       (finally
         (.shutdown cluster))))))
(defn -main
  "Entry point.
  - No arguments: run the topology locally via `run-local!`.
  - One argument: submit the topology to the cluster under that name,
    with debug off and 3 workers."
  ([]
   (run-local!))
  ([topology-name]
   (StormSubmitter/submitTopology
    topology-name
    {TOPOLOGY-DEBUG false
     TOPOLOGY-WORKERS 3}
    (mk-topology))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment