Mix.install([
{:kino, "~> 0.6.2"},
{:telemetry, "~> 1.2.1"}
])
If you want to know more about the book:
- You can download two free chapters + accompanying Livebook notebooks.
- Or, you can buy the complete book together with its accompanying Livebook notebooks.
We'll learn how to run initialization processes within our supervision trees. This technique is useful in a wide array of scenarios when you need to perform some unit of work as part of the supervision tree startup.
In this section, we'll be running our initialization jobs synchronously. You will need to lean on this technique when there is a dependency between your processes, and running your init job asynchronously would introduce race conditions.
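The sketch below shows what synchronous initialization looks like. It uses two hypothetical modules, `SeedConfigJob` and `ConfigReader`, which are not part of this notebook. The init job does its work inside `init/1` and returns `:ignore`. A supervisor starts its children one at a time and waits for each `init/1` to return, so the dependent worker can safely read the seeded data in its own `init/1`.

```elixir
# Hypothetical one-shot init job: it does its work inside init/1 and returns
# :ignore, so the supervisor waits for the work to finish but keeps no
# process running for it afterwards.
defmodule SeedConfigJob do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(_) do
    # Stand-in for real work (reading files, querying a database, etc.)
    :persistent_term.put({__MODULE__, :config}, %{region: "us-east"})
    :ignore
  end
end

# Hypothetical dependent worker that reads the seeded data in its own init/1.
defmodule ConfigReader do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  def config, do: GenServer.call(__MODULE__, :config)

  @impl true
  def init(_) do
    # Safe because SeedConfigJob.init/1 has already returned by the time
    # the supervisor starts this child.
    {:ok, :persistent_term.get({SeedConfigJob, :config})}
  end

  @impl true
  def handle_call(:config, _from, state), do: {:reply, state, state}
end

# Children are started in list order, each one only after the previous init/1 returns.
{:ok, _sup} = Supervisor.start_link([SeedConfigJob, ConfigReader], strategy: :one_for_one)

IO.inspect(ConfigReader.config())
```

Returning `:ignore` is fine here because `:persistent_term` data is not tied to the lifetime of any process.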
This may seem like an esoteric technique, but we have used this approach in production when the need arose. Data that rarely changes but is read often is a good candidate to persist to DETS and then load into ETS when your application starts. That way, you have access to the data in its fastest form instead of having to make network requests to your database. In addition, you can lean on something like Git LFS to store large DETS files alongside your code, so that they can be deployed together when the data changes.
Let's start by learning how we can create DETS files with our data. Then we'll see how to use an init GenServer to load that data into ETS when the application starts.
Let's walk through how to create a DETS file so you will know how to create them going forward. Like ETS, DETS is a key-value store, and the Erlang `:dets` module implements many of the same functions that the `:ets` module has. This should make it feel familiar, and the code should look similar to what you saw in Chapter 2.
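To show how closely the two APIs line up, the following sketch writes the same tuple to an ETS table and to a DETS table, then looks it up in both. The scratch file under `System.tmp_dir!/0` and the table names are assumptions made for this example. One small difference shows up right away: `:ets.insert/2` returns `true`, while `:dets.insert/2` returns `:ok`.

```elixir
# Scratch DETS file in the system temp directory (an assumption for this sketch).
path_string = Path.join(System.tmp_dir!(), "dets_vs_ets_demo.dets")
# Start from a clean file
File.rm(path_string)
path = String.to_charlist(path_string)

# ETS: an in-memory table owned by the current process
ets = :ets.new(:demo_ets, [:set])
true = :ets.insert(ets, {:answer, 42})

# DETS: a disk-backed table opened (or created) from a file
{:ok, dets} = :dets.open_file(:demo_dets, file: path, type: :set)
:ok = :dets.insert(dets, {:answer, 42})

# Both lookups return a list of matching objects
ets_result = :ets.lookup(ets, :answer)
dets_result = :dets.lookup(dets, :answer)
:ok = :dets.close(dets)

IO.inspect({ets_result, dets_result})
```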
# We define the path to our DETS file as a charlist. Often when we work with
# Erlang libraries, we'll need to send charlists instead of strings.
dets_file_path =
__DIR__
|> Path.join("./exotic_car_filter.dets")
|> String.to_charlist()
# If the DETS file we specify does not exist, we create it. If it does
# exist, it is opened, and we can update its contents. If you want to clear the
# contents of a DETS file to ensure you are starting from a clean state, you can
# use the `:dets.delete_all_objects/1` function.
{:ok, dets_table} =
:dets.open_file(
:vehicles_to_track,
file: dets_file_path,
type: :set,
auto_save: 10_000
)
# ETS can hold gigabytes of data, and a single DETS file can grow up to 2 GB,
# so our little map of data should be no problem at all.
vehicles_to_track = %{
2023 => %{
"Bugatti" => ["Chiron", "Veyron"],
"Ferrari" => ["488", "F90"],
"Lamborghini" => ["Aventador", "Huracan"],
"Pagani" => ["Huayra", "Zonda"],
"Porsche" => ["918 Spyder", "911 GT3 RS"]
},
2022 => %{
"Bugatti" => ["Chiron", "Veyron"],
"Ferrari" => ["488", "F90"],
"Lamborghini" => ["Aventador", "Huracan"],
"Pagani" => ["Huayra", "Zonda"],
"Porsche" => ["918 Spyder", "911 GT3 RS"]
}
}
# We insert all of our key-value data into the DETS file.
Enum.each(vehicles_to_track, fn {year, makes_models} ->
Enum.each(makes_models, fn {make, models} ->
:ok = :dets.insert(dets_table, {{year, make}, MapSet.new(models)})
end)
end)
# Here, we flush any pending writes to disk.
:ok = :dets.sync(dets_table)
# We close our handle to the DETS file properly so that a later unexpected
# shutdown cannot leave it in a bad state. This avoids having to repair the
# file the next time it is opened.
:ok = :dets.close(dets_table)
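To confirm what ended up on disk, you can reopen a DETS file read-only and inspect it. The sketch below is self-contained, so it first writes a tiny file in the same `{{year, make}, MapSet}` shape used above. The file name and table names are hypothetical.

```elixir
# Hypothetical scratch file for this sketch
path_string = Path.join(System.tmp_dir!(), "readback_demo.dets")
File.rm(path_string)
path = String.to_charlist(path_string)

# Write a couple of entries in the same {{year, make}, MapSet} shape used above
{:ok, writer} = :dets.open_file(:readback_writer, file: path, type: :set)
:ok = :dets.insert(writer, {{2023, "Pagani"}, MapSet.new(["Huayra", "Zonda"])})
:ok = :dets.insert(writer, {{2023, "Bugatti"}, MapSet.new(["Chiron", "Veyron"])})
:ok = :dets.close(writer)

# Reopen read-only, as ExoticCarLookup does later, and inspect the contents
{:ok, reader} = :dets.open_file(:readback_reader, file: path, type: :set, access: :read)
# Number of objects stored in the file
size = :dets.info(reader, :size)
[{_year_make, pagani_models}] = :dets.lookup(reader, {2023, "Pagani"})
:ok = :dets.close(reader)

IO.inspect({size, MapSet.member?(pagani_models, "Zonda")})
```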
After running the above code, you should have a DETS file named `exotic_car_filter.dets` in the same directory as this notebook, wherever you unzipped Elixir Patterns. With that in place, we can create the GenServer module that will transfer all the contents of the DETS file into ETS. There is one thing to note since we are using ETS: an ETS table is deleted as soon as the process that owns it terminates, unless an heir has been configured. As a result, our init GenServer will not return `:ignore` from the `init/1` callback this time. Instead, it will continue to run so that our ETS table always has an owner.
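You can observe the ownership rule directly. In this sketch, a short-lived process creates a named ETS table (the table name is hypothetical). Once that process exits, `:ets.info/1` reports the table as `:undefined`.

```elixir
parent = self()

# Spawn a process that creates and owns a named ETS table
owner =
  spawn(fn ->
    :ets.new(:short_lived_table, [:set, :public, :named_table])
    :ets.insert(:short_lived_table, {:key, :value})
    send(parent, :table_ready)

    # Keep the owner alive until we tell it to stop
    receive do
      :stop -> :ok
    end
  end)

receive do
  :table_ready -> :ok
end

# While the owner is alive, the table is readable from other processes
before_exit = :ets.lookup(:short_lived_table, :key)

# Monitor the owner so we know when it has actually terminated
ref = Process.monitor(owner)
send(owner, :stop)

receive do
  {:DOWN, ^ref, :process, ^owner, _reason} -> :ok
end

# The table disappeared together with its owner
after_exit = :ets.info(:short_lived_table)
IO.inspect({before_exit, after_exit})
```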
defmodule ExoticCarLookup do
# If this process terminates, we want it to restart, given that
# the ETS table will also be deleted since it is tied to the
# process.
use GenServer
@ets_table_name __MODULE__.ETS
# +--------------------+
# | Public API |
# +--------------------+
def start_link(_) do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
# Other modules can use this function to determine
# if the provided year+make+model is in the list of exotic
# cars.
def exotic_car?(year, make, model) do
case :ets.lookup(@ets_table_name, {year, make}) do
[{_year_make, models}] ->
MapSet.member?(models, model)
_ ->
false
end
end
# +---------------------+
# | Callbacks |
# +---------------------+
@impl true
def init(_) do
# Create the ETS table before hydrating it
:ets.new(
@ets_table_name,
[:set, :protected, :named_table, {:read_concurrency, true}]
)
# We call the `hydrate_ets/0` function in the `init/1` callback
# so that the supervision tree is blocked until the hydration process
# is complete. That way, we ensure that there are no race conditions
# when our application starts receiving requests.
hydrate_ets()
# Instead of returning `:ignore` and terminating the process, we
# allow the process to keep running since the ETS table will be tied
# to the lifecycle of the process.
{:ok, nil}
end
# +---------------------------+
# | Private Helpers |
# +---------------------------+
# In this function, we hydrate our ETS table using a DETS file. We can
# hydrate the ETS table from anywhere (CSV, database, HTTP call, etc.),
# but it is useful to know how you can lean on DETS to restore ETS
# tables.
defp hydrate_ets do
# This specifies where the DETS file is
exotic_car_dets_file =
__DIR__
|> Path.join("./exotic_car_filter.dets")
|> String.to_charlist()
# Open the existing DETS file
{:ok, dets_instance} =
:dets.open_file(
:exotic_car_dets_backup,
file: exotic_car_dets_file,
type: :set,
access: :read
)
# Copy all of the contents of the DETS file to the ETS table that
# we created in the `init/1` callback so that all of our operations
# take place in memory as opposed to on disk.
:dets.to_ets(dets_instance, @ets_table_name)
# Close the DETS file, as we no longer need it
:ok = :dets.close(dets_instance)
end
end
The layout of this module looks similar to what we had in the previous Livebook. We have the `exotic_car?/3` public function that other modules can use to determine whether the car in question is an exotic car. Granted, the implementation has switched from querying `:persistent_term` to querying `:ets`, but downstream consumers of this module need not be concerned with the implementation details. Then we have our `init/1` callback, which creates an empty ETS table and then calls the `hydrate_ets/0` private function. That function copies the DETS file we created previously into the newly created ETS table. Finally, our `init/1` callback returns an ok tuple with a `nil` state. We do not need to track any state in this GenServer, but we need it to stick around because it owns the ETS table where our data is held.
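Without the GenServer, the create-then-hydrate sequence in `init/1` boils down to the following sketch. It builds its own tiny DETS file, and the file and table names are assumptions for this example. `:dets.to_ets/2` copies objects into an ETS table that must already exist. That is why `init/1` calls `:ets.new/2` before `hydrate_ets/0`.

```elixir
# Hypothetical scratch file for this sketch
path_string = Path.join(System.tmp_dir!(), "hydrate_demo.dets")
File.rm(path_string)
path = String.to_charlist(path_string)

# Build a tiny DETS file in the same shape as exotic_car_filter.dets
{:ok, writer} = :dets.open_file(:hydrate_demo_writer, file: path, type: :set)
:ok = :dets.insert(writer, {{2022, "Ferrari"}, MapSet.new(["488", "F90"])})
:ok = :dets.close(writer)

# 1. Create the empty ETS table first; :dets.to_ets/2 does not create it.
#    For a named table, :ets.new/2 returns the table name.
ets = :ets.new(:hydrate_demo_ets, [:set, :protected, :named_table, {:read_concurrency, true}])

# 2. Copy every DETS object into the ETS table, then close the file.
#    On success, :dets.to_ets/2 returns the ETS table it was given.
{:ok, reader} = :dets.open_file(:hydrate_demo_reader, file: path, type: :set, access: :read)
^ets = :dets.to_ets(reader, ets)
:ok = :dets.close(reader)

# 3. Query purely in memory, the same way exotic_car?/3 does
is_exotic =
  case :ets.lookup(:hydrate_demo_ets, {2022, "Ferrari"}) do
    [{_year_make, models}] -> MapSet.member?(models, "488")
    _ -> false
  end

IO.inspect(is_exotic)
```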
With that in place, we can wholesale copy the code that we had in the previous Livebook and fire up the supervision tree.
# Copied from the previous Livebook without change
defmodule ExoticCarSaleTracker do
use GenServer
# +--------------------+
# | Public API |
# +--------------------+
# Start this process as a singleton process since we
# only need one instance of it running and processing data.
def start_link(_) do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
def get_avg_price_stats do
GenServer.call(__MODULE__, :get_avg_price_stats)
end
# We lean on `GenServer.cast/2` here since we don't want to block
# the calling process in order to run our analytics processing.
def track_sale(year, make, model, price) do
GenServer.cast(__MODULE__, {:track_sale, {year, make, model, price}})
end
# +---------------------+
# | Callbacks |
# +---------------------+
@impl true
def init(_) do
{:ok, %{}}
end
@impl true
def handle_call(:get_avg_price_stats, _from, state) do
reply =
Enum.map(state, fn {year_make_model, {count, total_price}} ->
{year_make_model, total_price / count}
end)
|> Map.new()
{:reply, reply, state}
end
@impl true
def handle_cast({:track_sale, {year, make, model, price}}, state) do
vehicle = {year, make, model}
# Only add the vehicle sale to the GenServer state if it is an
# exotic vehicle that we are interested in tracking.
new_state =
if ExoticCarLookup.exotic_car?(year, make, model) do
Map.update(state, vehicle, {1, price}, fn {count, total_price} ->
{count + 1, total_price + price}
end)
else
state
end
{:noreply, new_state}
end
end
# Copied from the previous Livebook without change
defmodule ExoticCarSaleTrackerSupervisor do
use Supervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_) do
# To ensure that `ExoticCarSaleTracker` has access to all of
# the necessary data in ETS, we need to start `ExoticCarLookup`
# first.
children = [
ExoticCarLookup,
ExoticCarSaleTracker
]
Supervisor.init(children, strategy: :one_for_one)
end
end
With all those pieces in place, we are ready to fire up the supervision tree and see how it performs. We'll actually use the same test code from the previous Livebook so you can see that the output lines up exactly with what we previously had.
mock_sales_list = [
{2000, "Honda", "Accord", 22_000},
{2019, "Acura", "Integra", 32_000},
{2022, "Pagani", "Zonda", 4_181_000},
{2023, "Lamborghini", "Huracan", 324_000},
{2022, "Ferrari", "488", 450_000},
{2018, "Mazda", "MX-5", 29_000},
{2015, "Chevy", "Camaro", 65_000},
{2023, "Lamborghini", "Huracan", 349_500},
{2015, "Chevy", "Camaro", 62_000},
{2022, "Ferrari", "488", 492_600}
]
# Stop the supervisor if it is already running
if is_pid(Process.whereis(ExoticCarSaleTrackerSupervisor)) do
Supervisor.stop(ExoticCarSaleTrackerSupervisor)
end
# Start the supervisor
ExoticCarSaleTrackerSupervisor.start_link([])
# Mock out a list of sales that could be coming off of a
# stream of data
mock_sales_list
|> Enum.each(fn {year, make, model, price} ->
ExoticCarSaleTracker.track_sale(year, make, model, price)
end)
ExoticCarSaleTracker.get_avg_price_stats()
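You can cross-check the averages returned above without any processes at all. This sketch works from a copy of the `vehicles_to_track` makes and models, which are the same for both years. It uses plain `Enum` functions to compute the average price per vehicle for the mock sales that pass the exotic-car filter.

```elixir
# Copy of the makes and models from vehicles_to_track (same for 2022 and 2023)
makes_models = %{
  "Bugatti" => ["Chiron", "Veyron"],
  "Ferrari" => ["488", "F90"],
  "Lamborghini" => ["Aventador", "Huracan"],
  "Pagani" => ["Huayra", "Zonda"],
  "Porsche" => ["918 Spyder", "911 GT3 RS"]
}

# Same {year, make} => MapSet shape as the ETS table
lookup =
  for year <- [2022, 2023], {make, models} <- makes_models, into: %{} do
    {{year, make}, MapSet.new(models)}
  end

mock_sales_list = [
  {2000, "Honda", "Accord", 22_000},
  {2019, "Acura", "Integra", 32_000},
  {2022, "Pagani", "Zonda", 4_181_000},
  {2023, "Lamborghini", "Huracan", 324_000},
  {2022, "Ferrari", "488", 450_000},
  {2018, "Mazda", "MX-5", 29_000},
  {2015, "Chevy", "Camaro", 65_000},
  {2023, "Lamborghini", "Huracan", 349_500},
  {2015, "Chevy", "Camaro", 62_000},
  {2022, "Ferrari", "488", 492_600}
]

averages =
  mock_sales_list
  # Keep only the exotic cars, as ExoticCarSaleTracker does
  |> Enum.filter(fn {year, make, model, _price} ->
    lookup |> Map.get({year, make}, MapSet.new()) |> MapSet.member?(model)
  end)
  # Group the prices by {year, make, model}
  |> Enum.group_by(
    fn {year, make, model, _price} -> {year, make, model} end,
    fn {_year, _make, _model, price} -> price end
  )
  # Average each group
  |> Map.new(fn {vehicle, prices} -> {vehicle, Enum.sum(prices) / length(prices)} end)

IO.inspect(averages)
```

For this list, that works out to 471,300.0 for the 2022 Ferrari 488 ((450,000 + 492,600) / 2), 336,750.0 for the 2023 Lamborghini Huracan ((324,000 + 349,500) / 2), and 4,181,000.0 for the single 2022 Pagani Zonda sale.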
As you can see, by using the same list of mock sales, we can generate the same output as before. The nice thing is that the `ExoticCarSaleTracker` GenServer didn't have to change in the slightest to accommodate the changed `ExoticCarLookup` implementation. So long as the public interface remains the same, the supervisor is able to start all the necessary components, and they work together harmoniously. This is one of the niceties of breaking down your problem space into self-contained components. As needed, you can adjust implementations, and downstream-dependent processes can continue to operate without needing to know the internal implementation details of other processes.
You may be wondering: "This setup does not look very complicated...how fast could it possibly be?" Let's put a number to that question and stress test this simple supervision tree. We'll create a list of 1,000,000 mock events and see how long our supervision tree takes to work through the data.
mock_sales_list = [
{2000, "Honda", "Accord", 22_000},
{2019, "Acura", "Integra", 32_000},
{2022, "Pagani", "Zonda", 4_181_000},
{2023, "Lamborghini", "Huracan", 324_000},
{2022, "Ferrari", "488", 450_000},
{2018, "Mazda", "MX-5", 29_000},
{2015, "Chevy", "Camaro", 65_000},
{2023, "Lamborghini", "Huracan", 349_500},
{2015, "Chevy", "Camaro", 62_000},
{2022, "Ferrari", "488", 492_600}
]
num_sale_events = 1_000_000
# Stop the supervisor if it is already running
if is_pid(Process.whereis(ExoticCarSaleTrackerSupervisor)) do
Supervisor.stop(ExoticCarSaleTrackerSupervisor)
end
# Start the supervisor
ExoticCarSaleTrackerSupervisor.start_link([])
# Mock out a list of sales that could be coming off of a
# stream of data
# :timer.tc/1 returns {elapsed_time_in_microseconds, result}
{time_in_microseconds, output} =
:timer.tc(fn ->
# These events will pile up on the process message queue since they are
# all casts
mock_sales_list
|> Stream.cycle()
|> Enum.take(num_sale_events)
|> Enum.each(fn {year, make, model, price} ->
ExoticCarSaleTracker.track_sale(year, make, model, price)
end)
# This will be the last message that the process receives, and it is synchronous
# since it is a call
output_result = ExoticCarSaleTracker.get_avg_price_stats()
# This will always return 0 since the final call makes sure that all of the
# async messages are processed
{:message_queue_len, message_queue_length} =
ExoticCarSaleTracker
|> Process.whereis()
|> Process.info(:message_queue_len)
IO.puts("Message queue length: #{message_queue_length}")
output_result
end)
time_in_ms = System.convert_time_unit(time_in_microseconds, :microsecond, :millisecond)
IO.puts("Worked through #{num_sale_events} mock sale events in #{time_in_ms}ms")
output
On my Mac Studio, I was able to work through 1,000,000 mock sales events in just over one second. That is with just one process working through sales events and one ETS lookup for every mock sale event that flows through the process. To ensure that our timer is measuring correctly, we call `ExoticCarSaleTracker.get_avg_price_stats/0` as part of the timing function. This will be the last message that the `ExoticCarSaleTracker` process receives. It is also a call, which blocks the calling process until a response is sent or until the call times out (after 5 seconds by default). Messages sent from one process to another arrive in the order they were sent, so the reply can only come back after every earlier cast has been handled. To make sure that we are not measuring the wrong things, we also log how many messages are in the process mailbox after running through everything. This will always come back as zero because the synchronous call happens right before it and ensures that we have worked through all the mock data.
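The ordering guarantee that the timing relies on can be seen in isolation. This sketch uses a hypothetical `CastCounter` GenServer. The caller sends a batch of casts followed by one call. Messages between a single pair of processes arrive in the order they were sent, so the call's reply reflects every earlier cast, and the mailbox is empty afterwards.

```elixir
# Hypothetical counter used only to demonstrate cast/call ordering
defmodule CastCounter do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, 0, name: __MODULE__)

  # Asynchronous: returns immediately, the message waits in the mailbox
  def increment, do: GenServer.cast(__MODULE__, :increment)

  # Synchronous: blocks until the server replies
  def count, do: GenServer.call(__MODULE__, :count)

  @impl true
  def init(count), do: {:ok, count}

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:count, _from, count), do: {:reply, count, count}
end

{:ok, _pid} = CastCounter.start_link(nil)

Enum.each(1..100_000, fn _ -> CastCounter.increment() end)

# The call is queued behind all 100_000 casts, so its reply sees all of them
final_count = CastCounter.count()

{:message_queue_len, queue_len} =
  CastCounter
  |> Process.whereis()
  |> Process.info(:message_queue_len)

IO.inspect({final_count, queue_len})
```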
As you can see, by just using the primitives available to us via Elixir, Erlang, and the BEAM, we can create very performant application components with very little code. No messy microservices, external key-value stores, or caching layers.
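Another option is to call `hydrate_ets()` from a `handle_continue/2` callback. In that case, the supervision tree is not blocked while the table is hydrated. Because `handle_continue/2` runs right after `init/1`, before the GenServer handles any other message, requests sent to the GenServer itself cannot race with hydration. Be aware, though, that `exotic_car?/3` reads the ETS table directly from the caller's process instead of messaging the GenServer. With this approach, a caller such as `ExoticCarSaleTracker` could therefore see an empty or partially filled table while hydration is still running. Overall, which option you choose depends on how you want your GenServers to behave.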
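Here is a minimal sketch of the `handle_continue/2` variant. The module name `ContinueLookup` is hypothetical, and the hard-coded insert stands in for `hydrate_ets/0` so that the example runs without the DETS file. Because `init/1` returns `{:ok, nil, {:continue, :hydrate}}`, `start_link/1` returns before the table is populated.

```elixir
# Hypothetical variant of ExoticCarLookup that hydrates in handle_continue/2
defmodule ContinueLookup do
  use GenServer

  @table __MODULE__.ETS

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  # Reads ETS directly from the caller's process, like exotic_car?/3
  def exotic_car?(year, make, model) do
    case :ets.lookup(@table, {year, make}) do
      [{_year_make, models}] -> MapSet.member?(models, model)
      _ -> false
    end
  end

  @impl true
  def init(_) do
    :ets.new(@table, [:set, :protected, :named_table, {:read_concurrency, true}])
    # Return right away so the supervisor can start the next child
    {:ok, nil, {:continue, :hydrate}}
  end

  @impl true
  def handle_continue(:hydrate, state) do
    # Stand-in for hydrate_ets/0: hard-coded data instead of a DETS file
    :ets.insert(@table, {{2022, "Ferrari"}, MapSet.new(["488", "F90"])})
    {:noreply, state}
  end
end

{:ok, pid} = ContinueLookup.start_link(nil)

# Used only as a synchronization point: the GenServer handles no message,
# system messages included, until handle_continue/2 has returned.
_ = :sys.get_state(pid)

hydrated = ContinueLookup.exotic_car?(2022, "Ferrari", "488")
IO.inspect(hydrated)
```

Processes that read the ETS table directly, without first synchronizing with the GenServer like this, get no such guarantee.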