Last active
July 6, 2024 21:43
-
-
Save whatyouhide/e7531e10128af58b9830af8938eae478 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%% Persistent TCP connection implemented as a gen_statem with two states:
%% disconnected and connected.
-module(connection).
-behaviour(gen_statem).

-export([start_link/1, request/2]).
-export([callback_mode/0, init/1]).
-export([disconnected/3, connected/3]).
%% Public API.
%% Starts the connection state machine linked to the calling process.
%%
%% Opts is a property list that must contain the `host' and `port' entries;
%% a missing entry fails with a badmatch, since both are required.
%% The two values are handed to init/1 as the tuple {Host, Port}.
start_link(Opts) ->
    Host = required_option(host, Opts),
    Port = required_option(port, Opts),
    InitArg = {Host, Port},
    gen_statem:start_link(?MODULE, InitArg, []).

%% Returns the value stored under Key in Opts, crashing when it is absent.
required_option(Key, Opts) ->
    {Key, Value} = proplists:lookup(Key, Opts),
    Value.
%% Sends Request through the connection process Pid and waits for the reply.
%%
%% This is a synchronous gen_statem call; the message delivered to the state
%% functions is {request, Request}.
request(Pid, Request) ->
    Call = {request, Request},
    gen_statem:call(Pid, Call).
%% gen_statem callbacks
%% Selects one callback function per state (disconnected/3, connected/3) and
%% asks gen_statem to issue an `enter' call whenever a state is entered.
-spec callback_mode() -> [state_functions | state_enter].
callback_mode() ->
    [state_functions, state_enter].
%% Builds the initial state data and starts in the `disconnected' state.
%%
%% The data is a map holding the target host and port plus an empty map of
%% in-flight requests. An internal `connect' event is queued so the first
%% connection attempt happens right after initialisation.
init({Host, Port}) ->
    InitialData = #{
        requests => #{},
        port => Port,
        host => Host
    },
    {ok, disconnected, InitialData, [{next_event, internal, connect}]}.
%% Disconnected state
%% State function for the `disconnected' state.
%%
%% Handled events:
%%   * enter from `disconnected' (the initial enter call): nothing to do.
%%   * enter from `connected': every in-flight request is answered with
%%     {error, disconnected}, the socket and request table are reset, and a
%%     reconnect timeout of 500 ms is armed.
%%   * internal `connect': tries to open the TCP socket (binary, active mode).
%%     On success the machine moves to `connected'; on failure the error is
%%     printed and the reconnect timeout is armed again.
%%   * {timeout, reconnect}: turns into an internal `connect' event.
%%   * {call, From} with a request: answered immediately with
%%     {error, disconnected}.
disconnected(enter, disconnected, _Data) ->
    keep_state_and_data;
disconnected(enter, connected, #{requests := Requests} = Data) ->
    io:format("Connection closed~n"),
    %% Requests is a map (request id => From), so it cannot be handed to
    %% lists:foreach/2 directly; iterate over its values instead.
    lists:foreach(
        fun(From) -> gen_statem:reply(From, {error, disconnected}) end,
        maps:values(Requests)
    ),
    Data1 = Data#{socket => undefined, requests => #{}},
    Actions = [{{timeout, reconnect}, 500, undefined}],
    {keep_state, Data1, Actions};
disconnected(internal, connect, #{host := Host, port := Port} = Data) ->
    case gen_tcp:connect(Host, Port, [binary, {active, true}]) of
        {ok, Socket} ->
            {next_state, connected, Data#{socket => Socket}};
        {error, Error} ->
            io:format("Connection failed: ~ts~n", [inet:format_error(Error)]),
            %% Without re-arming the timer a single failed attempt would
            %% leave the machine disconnected forever.
            {keep_state_and_data, [{{timeout, reconnect}, 500, undefined}]}
    end;
disconnected({timeout, reconnect}, _, Data) ->
    Actions = [{next_event, internal, connect}],
    {keep_state, Data, Actions};
disconnected({call, From}, {request, _}, _Data) ->
    Actions = [{reply, From, {error, disconnected}}],
    {keep_state_and_data, Actions}.
%% Connected state
%% State function for the `connected' state.
%%
%% Handled events:
%%   * enter: nothing to do.
%%   * {tcp_closed, Socket} for the current socket: move to `disconnected'.
%%   * {call, From} with a request: the encoded request is written to the
%%     socket and From is remembered under the request's `id' so the reply
%%     can be sent when the matching response arrives. If the send fails the
%%     socket is closed, the caller is answered with {error, disconnected}
%%     and the machine moves to `disconnected'.
%%   * {tcp, Socket, Packet} for the current socket: the decoded response is
%%     forwarded as {ok, Response} to the caller registered under its `id'.
%%
%% NOTE(review): encode_request/1 and decode_response/1 are not defined in the
%% visible part of this file; they must be provided elsewhere. This clause
%% also assumes each TCP message carries exactly one whole response - confirm
%% against the wire protocol.
connected(enter, _OldState, _Data) ->
    keep_state_and_data;
connected(info, {tcp_closed, Socket}, #{socket := Socket} = Data) ->
    {next_state, disconnected, Data};
connected({call, From}, {request, Request}, #{socket := Socket} = Data) ->
    #{id := RequestId} = Request,
    case gen_tcp:send(Socket, encode_request(Request)) of
        ok ->
            #{requests := Requests} = Data,
            %% maps:put/3 takes (Key, Value, Map): the updated request table
            %% is stored inside the state data, not the other way round.
            Data1 = Data#{requests := Requests#{RequestId => From}},
            {keep_state, Data1};
        {error, _Reason} ->
            ok = gen_tcp:close(Socket),
            %% This caller is not in the request table yet, so it has to be
            %% answered here or it would block until its call times out.
            {next_state, disconnected, Data, [{reply, From, {error, disconnected}}]}
    end;
connected(info, {tcp, Socket, Packet}, #{socket := Socket} = Data) ->
    #{requests := Requests} = Data,
    #{id := Id} = Response = decode_response(Packet),
    case maps:take(Id, Requests) of
        {From, Requests1} ->
            gen_statem:reply(From, {ok, Response}),
            {keep_state, Data#{requests := Requests1}};
        error ->
            %% The id comes from the network; an unknown one must not take
            %% the whole connection down.
            io:format("Dropping response with unknown id: ~p~n", [Id]),
            keep_state_and_data
    end.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Connection do
  @moduledoc """
  Persistent TCP connection implemented as a `:gen_statem` with two states,
  `:disconnected` and `:connected`.

  In-flight requests are kept in `requests`, a map from request id to the
  caller (`from`) waiting for the matching response.

  NOTE(review): `encode_request/1` and `decode_response/1` are called below
  but are not defined in the visible part of this module; they must be
  provided for it to compile.
  """

  @behaviour :gen_statem

  require Logger

  defstruct [:host, :port, :socket, requests: %{}]

  # Delay, in milliseconds, before a reconnection attempt.
  @reconnect_delay 500

  ## Public API

  @doc """
  Starts the connection process linked to the caller.

  `opts` must contain `:host` (a string) and `:port`; `Keyword.fetch!/2`
  raises if either is missing.
  """
  def start_link(opts) do
    host = Keyword.fetch!(opts, :host)
    port = Keyword.fetch!(opts, :port)
    :gen_statem.start_link(__MODULE__, {String.to_charlist(host), port}, [])
  end

  @doc """
  Sends `request` through the connection `pid` and waits for the reply.

  Replies are `{:ok, response}` or `{:error, :disconnected}`.
  """
  def request(pid, request) do
    :gen_statem.call(pid, {:request, request})
  end

  ## :gen_statem callbacks

  @impl true
  def callback_mode(), do: [:state_functions, :state_enter]

  @impl true
  def init({host, port}) do
    data = %__MODULE__{host: host, port: port}
    actions = [{:next_event, :internal, :connect}]
    {:ok, :disconnected, data, actions}
  end

  ## Disconnected state

  @doc false
  def disconnected(:enter, :disconnected, _data), do: :keep_state_and_data

  def disconnected(:enter, :connected, data) do
    Logger.error("Connection closed")

    # Nobody will ever answer the in-flight requests now, so fail them all.
    Enum.each(data.requests, fn {_id, from} ->
      :gen_statem.reply(from, {:error, :disconnected})
    end)

    data = %{data | socket: nil, requests: %{}}
    actions = [{{:timeout, :reconnect}, @reconnect_delay, nil}]
    {:keep_state, data, actions}
  end

  def disconnected(:internal, :connect, data) do
    # We use the socket in active mode for simplicity, but
    # it's often better to use "active: :once" for better control.
    socket_opts = [:binary, active: true]

    case :gen_tcp.connect(data.host, data.port, socket_opts) do
      {:ok, socket} ->
        {:next_state, :connected, %{data | socket: socket}}

      {:error, error} ->
        Logger.error("Connection failed: #{:inet.format_error(error)}")
        # Re-arm the timer; otherwise one failed attempt would leave the
        # connection down forever.
        {:keep_state_and_data, [{{:timeout, :reconnect}, @reconnect_delay, nil}]}
    end
  end

  def disconnected({:timeout, :reconnect}, _, data) do
    actions = [{:next_event, :internal, :connect}]
    {:keep_state, data, actions}
  end

  def disconnected({:call, from}, {:request, _request}, _data) do
    actions = [{:reply, from, {:error, :disconnected}}]
    {:keep_state_and_data, actions}
  end

  ## Connected state

  @doc false
  def connected(:enter, _old_state, _data), do: :keep_state_and_data

  def connected(:info, {:tcp_closed, socket}, %{socket: socket} = data) do
    {:next_state, :disconnected, data}
  end

  def connected({:call, from}, {:request, request}, data) do
    case :gen_tcp.send(data.socket, encode_request(request)) do
      :ok ->
        data = %{data | requests: Map.put(data.requests, request.id, from)}
        {:keep_state, data}

      {:error, _reason} ->
        :ok = :gen_tcp.close(data.socket)
        # This caller is not in the request table yet, so it has to be
        # answered here or it would block until its call times out.
        {:next_state, :disconnected, data, [{:reply, from, {:error, :disconnected}}]}
    end
  end

  def connected(:info, {:tcp, socket, packet}, %{socket: socket} = data) do
    response = decode_response(packet)

    case Map.pop(data.requests, response.id) do
      {nil, _requests} ->
        # The id comes from the network; an unknown one must not take the
        # whole connection down.
        Logger.error("Dropping response with unknown id: #{inspect(response.id)}")
        :keep_state_and_data

      {from, requests} ->
        :gen_statem.reply(from, {:ok, response})
        {:keep_state, %{data | requests: requests}}
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment