Created
February 18, 2019 20:17
-
-
Save leonoel/c72a3f57885c9108203c49dad3142292 to your computer and use it in GitHub Desktop.
happy eyeballs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{:paths ["."] | |
:deps {missionary {:mvn/version "a.3"}}} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns happyeyeballs | |
(:require [missionary.core :as m]) | |
(:import (java.net InetAddress InetSocketAddress) | |
(java.nio.channels AsynchronousSocketChannel CompletionHandler) | |
(java.util.concurrent.atomic AtomicBoolean))) | |
(defn happyeyeballs | |
"Tries to establish a connection to a host associated with multiple endpoints, following the Happy Eyeballs protocol. | |
Given a sequence of tasks performing the connection to each endpoint, returns a task sequentially performing connection | |
attempts, staggered by given delay (in milliseconds). The first connector is tried, then if it's still pending after | |
staggering delay or when it fails the second connector is tried, and so on until the connector sequence is exhausted. | |
The first successful connection attempt triggers cancellation of other pending connection attempts and makes the whole | |
task terminate with returned socket. If another connection succeeds in the interim, returned sockets are immediately | |
closed with given close! function. If every attempt fails, the whole task fails. | |
Cancelling the task triggers cancellation of all pending connection attempts." | |
[delay close! connectors] | |
(m/sp | |
(try | |
(let [chosen (m/dfv)] | |
(m/? ((fn attempt [cs] | |
(if cs | |
(let [try-next (m/dfv)] | |
(m/race | |
(m/sp (try (let [x (m/? (first cs)) | |
y (chosen x)] | |
(when-not (identical? x y) (close! x)) y) | |
(catch Throwable e | |
(m/?) | |
(try-next nil) | |
(throw e)))) | |
(m/sp (m/? (m/sleep delay)) | |
(try-next nil) | |
(m/? m/never)) | |
(m/sp (m/? try-next) | |
(m/? (attempt (next cs)))))) | |
(m/race))) (seq connectors)))) | |
(catch Throwable _ | |
(throw (ex-info "Unable to reach target." {:connectors connectors})))))) | |
(defn connector | |
"Given an InetAddress and a port (int), returns a task connecting to this endpoint and returning a connected | |
AsynchronousSocketChannel if successful." | |
[^InetAddress addr port] | |
(fn [s! f!] | |
(let [channel (AsynchronousSocketChannel/open) | |
pending (AtomicBoolean. true)] | |
(.connect channel (InetSocketAddress. addr (int port)) nil | |
(reify CompletionHandler | |
(completed [_ _ _] (.set pending false) (s! channel)) | |
(failed [_ _ e] (.set pending false) (f! e)))) | |
#(when (.getAndSet pending false) (.close channel))))) | |
(defn close! | |
"Closes given AsynchronousSocketChannel." | |
[^AsynchronousSocketChannel channel] | |
(.close channel)) | |
(defn happy-connect!! | |
"Connects to given host/port, synchronously returning an AsynchronousSocketChannel." | |
[^String host port] | |
(m/? (happyeyeballs 300 close! (map #(connector % port) (InetAddress/getAllByName host))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment