Last active
October 7, 2015 09:57
-
-
Save kachayev/3146759 to your computer and use it in GitHub Desktop.
Channels-driven concurrency with Clojure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Channels-driven concurrency with Clojure | |
;; Clojure variant for code examples from this gist: | |
;; https://gist.github.com/3124594 | |
;; Primarily taken from Rob Pike's talk on Google I/O 2012: | |
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be | |
;; | |
;; Concurrency is the key to designing high performance network services. | |
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc. | |
;; There is no implementation for "Go channels" in core, but we can use | |
;; 3rd-party library Lamina to do the same things. | |
;; | |
;; https://github.com/ztellman/lamina | |
;; | |
;; I should also mention, that this is not a simple copy of syntax/semantic notations | |
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with | |
;; functional approach of data transformation from initial to desired state). | |
;; (1) Generator: function that returns the channel | |
(use 'lamina.core) | |
(defn boring | |
[name] | |
(let [ch (channel)] | |
;; future will run separately from main control flow | |
(future | |
;; emit string message five times with random delay | |
(dotimes [_ 5] | |
(let [after (int (rand 500))] | |
(Thread/sleep after) | |
(enqueue ch (str name ": I'm boring after " after))))) | |
;; return the channel to caller | |
ch)) | |
;; With single instance | |
(let [joe (boring "Joe")] | |
(doseq [msg (lazy-channel-seq (take* 5 joe))] (println msg))) | |
(println "You're boring: I'm leaving.") | |
;; Process all messages from channel | |
;; Please, note this notation is asynchronous, so... | |
(let [joe (boring "Joe")] (receive-all joe println)) | |
;; you will see this message first :) | |
(println "You're boring: I'm leaving.") | |
;; More instances... | |
;; Actually, this is little bit tricky and it's definitely other | |
;; mechanism than we use in Go for this example. It's more | |
;; likely what we do in "#2 Fan-in" code examples. | |
(let [joe (boring "Joe") ann (boring "Ann") chs (channel)] | |
(doseq [ch [joe ann]] (join ch chs)) | |
(receive-all chs println)) | |
(println "You're boring: I'm leaving.") | |
;; More instances... | |
;; Read from one channel, than - from second | |
(let [joe (boring "Joe") ann (boring "Ann")] | |
(loop [] | |
(doseq [ch [joe ann]] | |
;; TODO: Fix checking for channel closing (this is wrong way) | |
(when-not (closed? ch) (println @(read-channel ch)))) | |
(recur))) | |
;; Read from one channel, than - from second | |
;; Several improvements in order to stop execution, | |
;; when both channels are closed (without any information | |
;; about total count of messages) | |
(let [joe (boring "Joe") ann (boring "Ann") run (atom 2)] | |
(loop [] | |
(doseq [ch [joe ann]] | |
;; TODO: Fix checking for channel closing (this is wrong way) | |
(if (closed? ch) | |
(swap! run dec) | |
(println @(read-channel ch)))) | |
(if (> @run 0) (recur)))) | |
(println "You're boring: I'm leaving.") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Channels-driven concurrency with Clojure | |
;; Clojure variant for code examples from this gist: | |
;; https://gist.github.com/3124594 | |
;; Primarily taken from Rob Pike's talk on Google I/O 2012: | |
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be | |
;; | |
;; Concurrency is the key to designing high performance network services. | |
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc. | |
;; There is no implementation for "Go channels" in core, but we can use | |
;; 3rd-party library Lamina to do the same things. | |
;; | |
;; https://github.com/ztellman/lamina | |
;; | |
;; I should also mention, that this is not a simple copy of syntax/semantic notations | |
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with | |
;; functional approach of data transformation from initial to desired state). | |
;; (2) Fan-in | |
;; "Hand-made" one | |
(defn fan-in | |
[input1 input2] | |
(let [ch (channel) pusher (partial enqueue ch)] | |
(doseq [x [input1 input2]] (receive-all x pusher)) ch)) | |
;; Or any count of inputs instead of just 2 | |
(defn fan-in | |
[& inputs] | |
(let [ch (channel) pusher (partial enqueue ch)] | |
(doseq [x inputs] (receive-all x pusher)) ch)) | |
;; Or more "clojurian" approach with join | |
(defn fan-in | |
[& inputs] | |
(let [ch (channel)] (doseq [x inputs] (join x ch)) ch)) | |
;; Do printing only 10 times | |
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))] | |
(receive-all (take* 10 ch) println)) | |
;; Or any times something will be pushed to channel | |
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))] (receive-all ch println)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Channels-driven concurrency with Clojure | |
;; Clojure variant for code examples from this gist: | |
;; https://gist.github.com/3124594 | |
;; Primarily taken from Rob Pike's talk on Google I/O 2012: | |
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be | |
;; | |
;; Concurrency is the key to designing high performance network services. | |
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc. | |
;; There is no implementation for "Go channels" in core, but we can use | |
;; 3rd-party library Lamina to do the same things. | |
;; | |
;; https://github.com/ztellman/lamina | |
;; | |
;; I should also mention, that this is not a simple copy of syntax/semantic notations | |
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with | |
;; functional approach of data transformation from initial to desired state). | |
;; (3) Select | |
;; Clojure doesn't have "select" (mostly cause of functional approach), | |
;; but we can simulate it using map* and case calls | |
(let [joe (boring "Joe") | |
;; Will generate messages each 60 ms | |
timer (periodically 60 (fn [] "You're too slow!")) | |
;; All channels will be joined with this one | |
select (channel)] | |
(doseq | |
[[t ch] [["joe" joe] ["timer" timer]]] | |
;; Map message to struct [type message] | |
;; TODO: Check if I can you (named-channel) for this | |
(join (map* (partial conj [t]) ch) select)) | |
;; Read from channel until it's not closed (in blocking mode) | |
(receive-all select | |
(fn [[name msg]] | |
(println (str msg | |
(case name | |
"joe" " <== Message from Joe" | |
"timer" " <== Timeout")))))) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Channels-driven concurrency with Clojure | |
;; Clojure variant for code examples from this gist: | |
;; https://gist.github.com/3124594 | |
;; Primarily taken from Rob Pike's talk on Google I/O 2012: | |
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be | |
;; | |
;; Concurrency is the key to designing high performance network services. | |
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc. | |
;; There is no implementation for "Go channels" in core, but we can use | |
;; 3rd-party library Lamina to do the same things. | |
;; | |
;; https://github.com/ztellman/lamina | |
;; | |
;; I should also mention, that this is not a simple copy of syntax/semantic notations | |
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with | |
;; functional approach of data transformation from initial to desired state). | |
;; (4) Timeouts | |
;; To test timeouts let add one line into boring generator | |
(defn boring | |
[name] | |
(let [ch (channel)] | |
;; future will run separately from main control flow | |
(future | |
;; emit string message five times with random delay | |
(dotimes [_ 5] | |
(let [after (int (rand 100))] | |
(Thread/sleep after) | |
(enqueue ch (str name ": I'm boring after " after)))) | |
(close ch)) ;; <--- Here. Let's close channel after 5 messages. | |
;; return the channel to caller | |
ch)) | |
;; Timeout for whole conversation | |
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))] | |
;; note 3rd param for lazy-channel-seq function | |
(doseq [msg (lazy-channel-seq (take* 10 ch) 500)] (println msg))) | |
(println "You're boring: I'm leaving.") | |
;; Timeout for each message | |
;; There are multiple ways of how to perform such functionality. | |
;; Here you can find approach using (with-timeout _) wrapper for ResultChannel. | |
(let [ch (apply fan-in (map boring ["Joe" "Ann"]))] | |
(loop [] | |
;; TODO: Fix checking for channel closing (this is wrong way) | |
(when-not (closed? ch) | |
(println @(with-timeout 50 (read-channel ch))) (recur)))) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Channels-driven concurrency with Clojure | |
;; Clojure variant for code examples from this gist: | |
;; https://gist.github.com/3124594 | |
;; Primarily taken from Rob Pike's talk on Google I/O 2012: | |
;; http://www.youtube.com/watch?v=f6kdp27TYZs&feature=youtu.be | |
;; | |
;; Concurrency is the key to designing high performance network services. | |
;; Clojure provides several concurrency primitives, like futures/promises, atom, agent etc. | |
;; There is no implementation for "Go channels" in core, but we can use | |
;; 3rd-party library Lamina to do the same things. | |
;; | |
;; https://github.com/ztellman/lamina | |
;; | |
;; I should also mention, that this is not a simple copy of syntax/semantic notations | |
;; from Go, I tried to create "clojure-like" variant of how to do the same things (with | |
;; functional approach of data transformation from initial to desired state). | |
;; (7) Fake Google search | |
(defn fake-search | |
[kind] | |
(fn [query ch] | |
;; Fake seach will work in async mode and will push result to channel | |
(future | |
(Thread/sleep (int (rand 100))) | |
(enqueue ch (str kind " result for " query)) (close ch)) | |
ch)) | |
(defn fan-in | |
[& inputs] | |
(let [ch (channel)] | |
(doseq [x inputs] (join x ch)) ch)) | |
(defn fastest | |
[query & replicas] | |
(let [chs (map #(% query (channel)) replicas) | |
ch (apply fan-in chs)] | |
@(read-channel ch))) | |
(defn google | |
[query] | |
(println "Start searching") | |
(let [ch (channel)] | |
(doseq [s '(["Web1" "Web2"] | |
["Image1" "Image2"] | |
["Video1" "Video2"])] | |
(future | |
(enqueue ch | |
(apply fastest (conj (map fake-search s) query))))) | |
(time ;; This macro will check elapsed time for calculations | |
(doseq [msg (lazy-channel-seq (take* 3 ch) 80)] | |
(println msg))))) | |
;; Output result for calling | |
;; (google "Go & Clojure channels") | |
;; | |
;; Start searching | |
;; Video1 result for Go & Clojure channels | |
;; Image2 result for Go & Clojure channels | |
;; Web2 result for Go & Clojure channels | |
;; "Elapsed time: 74.172 msecs" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment