Skip to content

Instantly share code, notes, and snippets.

@dustingetz
Created December 25, 2021 21:05
Show Gist options
  • Save dustingetz/d3fb5338b43ef86df5f97387f1d5e249 to your computer and use it in GitHub Desktop.
Save dustingetz/d3fb5338b43ef86df5f97387f1d5e249 to your computer and use it in GitHub Desktop.
(ns dustin.y2021.missionary_promise
(:require [hyperfiddle.rcf :as rcf :refer [tests % !]]
[missionary.core :as m]))
; We want to turn a promise-thing into a Task
; Leo: The problem with Promise and CompletableFuture is no cancellation
; What task do we want? Is the task listening to the promise? Or is the task the process backing the promise?
; Background: when you get a promise, there is a process in the background which eventually completes the promise.
; Do you want to await the result of an already running promise
; or do you want to run the process of the promise when the task is run?
; This is the first ambiguity
; What is a promise? Promise is an object with a lifecycle that completes with a value
; Leo: "single-assignment variable" aka a "dataflow variable", from the
; book *Concepts, Techniques and Models of Computer Programming*
(defn await-promise-broken "Naive promise to task adapter that doesn't implement cancellation"
[p]
(fn [s f]
; If the promise doesn't ever complete, S or F is never called
(.then p s f)
; If the task is cancelled, S or F is never called, so the task never terminates. It hangs forever
; This is a violation of the task contract
#()
))
; You have to succeed or fail. But what if it never completes or is cancelled before completion?
; if you throw away cancellation you must be sure the callback will complete in reasonable time, it's dangerous
(tests
"this program hangs forever"
((m/race (fn [s f]
; ignore cancellation signal
#()
; Violation of task contract! does not propogate success or failure ever!
)
(m/sleep 800 42)) ! !)
% := ::rcf/timeout) ; Should return 42 when the sleep completes!
; The only time it is OK to ignore the cancellation signal is if the underlying process (e.g. http request) has a timeout thus the F callback
; does get called in a reasonable timeframe.
; How do we fix it?
(tests
""
; This program is malformed because the task is malformed.
; The contract of task is broken because in reaction to termination signal we must terminate
; in a reasonable amount of time.
((m/race (fn [s f]
; What should happens when canceled? The task can succeed or fail
; This is use case dependent, it doesn't matter in this test because the race discards the value
#(f nil))
(m/sleep 800 42))
! !) ; passes, but still malformed
% := 42
; Leo says this program is also malformed because the cancellation callback must be idempotent.
)
(tests
"m/never"
; A task never succeeding. Cancelling makes it fail immediately.
((m/race m/never
(m/sleep 800 42))
! !)
% := 42)
; Better answer:
; To make it idempotent you need state
(tests
((m/race (fn [s f]
; clojure.core/delay will ensure the callback is called once.
(let [x (delay (s nil))] ; memoized
(fn [] @x)))
(m/sleep 800 42))
! !)
% := 42)
; The problem is state.
; No DFV which is a problem.
; Javascript promises have another feature which is their status (success, pending, failure)- not just a result.
; clojure.core/promise (JVM only) does not have a status, only completion effect.
; Tasks do not have this state (tasks are programs, they have no identity, and they are pure values)
; In JS if promises had no status they would be equivalent to m/dfv but since they do have status we need
; attempt/absolve (unless they never fail or succeed). JS Promises are DVF + Either. A result + status.
; Final answer
; Derive the task from a DFV, inheriting the DFV's cancellation semantics
; Use the promise process's effect to set value in the DFV or fail the DFV.
; Cancellation is defined on this task even if the promise never completes.
(defn await-promise "Returns a task completing with the result of given promise"
[p]
; p is already running, we have no control of that
; (Note that promise guarantees that even if already completed, a new .then chain will fire)
(let [v (m/dfv)] ; dataflow "atom"
(.then p
#(v (fn [] %)) ; wrap result in closure and put closure in atom
#(v (fn [] (throw %)))) ; delayed throw
(m/absolve v))) ; run whichever closure once it is set, yielding an async value or async exception
; m/absolve just lets just fail inside the context of the DFV, this is just machinery
@dustingetz
Copy link
Author

Leo: Here is an example with CompletableFuture

(defn from-cf-call [->cf]
  (fn [s f]
    (try (let [^CompletableFuture cf (->cf)]
           (.whenComplete cf
             (reify BiConsumer
               (accept [_ r e]
                 (if (nil? e)
                   (s r) (f e)))))
           #(.cancel cf true))
         (catch Throwable e
           (f e) #(do)))))

(defmacro from-cf [& body]
  `(from-cf-call (fn [] ~@body)))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment