@stuarthalloway
Created June 7, 2025 13:48
Confused by :chan-opts
(require '[clojure.core.async.flow :as flow]
         '[clojure.core.async.flow-monitor :as mon])
(defn tap-n-drop [x] (tap> x) nil)
;; chan-opts on out channel seem to be ignored
;; override buf size not visible in flow-monitor
;; transform not applied
(def flow
  "Simple flow connecting credit-core to an event sink that taps"
  (flow/create-flow
   {:procs {:source {:proc (-> identity flow/lift1->step flow/process)
                     :chan-opts {:out {:buf-or-n 11
                                       :xform (map (fn [x] (str "Saw " x)))}}}
            :sink {:proc (-> #'tap-n-drop flow/lift1->step flow/process)}}
    :conns [[[:source :out] [:sink :in]]]}))
;; chan-opts on in channel seem to work
;; override buf size visible in flow-monitor
;; transform applied
(def flow
  "Simple flow connecting credit-core to an event sink that taps"
  (flow/create-flow
   {:procs {:source {:proc (-> identity flow/lift1->step flow/process)}
            :sink {:proc (-> #'tap-n-drop flow/lift1->step flow/process)
                   :chan-opts {:in {:buf-or-n 11
                                    :xform (map (fn [x] (str "Saw " x)))}}}}
    :conns [[[:source :out] [:sink :in]]]}))
;; start
(do
  (flow/start flow)
  (flow/resume flow)
  (def server (mon/start-server {:flow flow})))
;; test
@(flow/inject flow [:sink :in] ["hello"])
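;; A possible cross-check, offered as a sketch rather than a confirmed diagnosis:
;; the inject above puts "hello" directly on [:sink :in], so it never passes
;; through :source. Injecting at [:source :in] instead (assuming injection at an
;; otherwise unconnected input is accepted) sends the message through :source
;; and out of its :out, which is the path the :out chan-opts would have to affect.
@(flow/inject flow [:source :in] ["hello via source"])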
;; stop
(do
  (mon/stop-server server)
  (flow/stop flow))