Last active
March 1, 2017 14:58
-
-
Save emilsoman/bb83608f5d0ad456657cf024f859f2fe to your computer and use it in GitHub Desktop.
Inactivity-based trigger for Flow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule MyFlow.Producer do
  @moduledoc """
  GenStage producer that buffers pushed user ids and signals inactivity.

  Every `push/1` (re)arms a timer. When no further push arrives before the
  timer fires, the producer sends `{:producer, :done}` to its consumers via
  `GenStage.async_notify/2`.

  The process state is `{{user_ids, pending_demand}, timer}`:

    * `user_ids` - ids that have been pushed but not yet dispatched. New ids
      are prepended, so buffered ids are emitted newest first.
    * `pending_demand` - demand received from consumers that could not be
      satisfied yet.
    * `timer` - reference of the currently armed inactivity timer, or `nil`
      when no timer is pending.
  """

  use GenStage

  # Milliseconds without a push before consumers are notified.
  @inactivity_timeout 3000

  @doc """
  Starts the producer, registered under the module name.
  """
  def start_link do
    GenStage.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @doc """
  Hands `user_id` to the producer and restarts the inactivity timer.

  This is a synchronous call to the registered producer; it returns `:ok`.
  """
  def push(user_id) do
    GenStage.call(__MODULE__, {:push, user_id})
  end

  # Starts with an empty buffer, no pending demand and no timer.
  def init(nil) do
    {:producer, {{[], 0}, nil}}
  end

  # Buffers the pushed id, emits whatever the pending demand allows and
  # re-arms the inactivity timer.
  def handle_call({:push, user_id}, _from, {{existing_user_ids, demand}, timer}) do
    cancel_timer(timer)

    {events, buffer} = dispatch({[user_id | existing_user_ids], demand})
    timer = Process.send_after(self(), :trigger, @inactivity_timeout)

    {:reply, :ok, events, {buffer, timer}}
  end

  # The inactivity timer fired: tell consumers that the producer went quiet.
  # The timer reference is cleared because it no longer refers to a pending
  # timer.
  #
  # NOTE(review): GenStage.async_notify/2 appears to exist only in older
  # gen_stage releases - confirm against the pinned version before upgrading.
  def handle_info(:trigger, {buffer, _timer}) do
    GenStage.async_notify(self(), {:producer, :done})
    {:noreply, [], {buffer, nil}}
  end

  # Adds the new demand to whatever was still outstanding and emits as many
  # buffered ids as that total allows.
  def handle_demand(demand, {{user_ids, existing_demand}, timer}) when demand > 0 do
    {events, buffer} = dispatch({user_ids, demand + existing_demand})
    {:noreply, events, {buffer, timer}}
  end

  # Cancels a pending inactivity timer, if any.
  defp cancel_timer(nil), do: :ok

  defp cancel_timer(timer) do
    case Process.cancel_timer(timer) do
      false ->
        # The timer already fired, so its :trigger message may be waiting in
        # the mailbox. Drop it; otherwise consumers would be told the producer
        # is done right after this push.
        receive do
          :trigger -> :ok
        after
          0 -> :ok
        end

      _remaining_ms ->
        :ok
    end
  end

  # Splits the buffer into the events to emit now and the remaining state.
  # Returns `{events, {remaining_user_ids, remaining_demand}}`.
  defp dispatch({[], _demand} = state) do
    {[], state}
  end

  defp dispatch({user_ids, demand}) do
    {events, rest} = Enum.split(user_ids, demand)
    {events, {rest, demand - length(events)}}
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment