Last active
September 14, 2021 12:43
-
-
Save slashmili/67db483616ad93be3033f95ce7babe92 to your computer and use it in GitHub Desktop.
Telemetry Metrics Reporter for Fluxter & InfluxDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Metrics.InfluxReporter do
  @moduledoc """
  A `Telemetry.Metrics` reporter that writes events through an `:influx_writer` function.

  The metric type (counter, summary, ...) is ignored: every event that yields a
  non-nil measurement and passes the metric's `:keep` filter is handed to the writer
  as-is.

  ## Options

    * `:metrics` (required) - list of `Telemetry.Metrics` definitions to report.
    * `:influx_writer` (required) - 3-arity function invoked as
      `influx_writer.(event_name, tags, measurement)`, where `event_name` is the event
      name joined with `"."` (e.g. `"my_event.request.stop"`), `tags` is a keyword list
      restricted to the metric's `:tags`, and `measurement` is the extracted value.
    * `:name` (optional) - passed to `GenServer.start_link/3`.

  ## Usage

  The reporter can be started under a supervisor:

      children = [
        {Metrics.InfluxReporter, metrics: metrics(), influx_writer: &MyApp.Fluxter.write/3}
      ]

  On start it attaches one telemetry handler per distinct event name found in the
  metrics, and it detaches them in `terminate/2`. Read more about metrics at
  https://hexdocs.pm/telemetry_metrics/Telemetry.Metrics.html

  This module is based on `Telemetry.Metrics.ConsoleReporter`, which is released under
  the Apache License 2.0:
  https://github.com/beam-telemetry/telemetry_metrics/blob/main/lib/telemetry_metrics/console_reporter.ex
  """
  use GenServer
  require Logger

  @doc """
  Starts the reporter process linked to the caller.

  Raises `ArgumentError` when `:influx_writer` is missing or is not a 3-arity
  function, or when `:metrics` is missing.
  """
  def start_link(opts) do
    server_opts = Keyword.take(opts, [:name])

    influx_writer =
      opts[:influx_writer] ||
        raise ArgumentError, "the :influx_writer option is required by #{inspect(__MODULE__)}"

    is_function(influx_writer, 3) ||
      raise ArgumentError,
            "#{inspect(__MODULE__)} requires :influx_writer to be a function with 3 arity"

    metrics =
      opts[:metrics] ||
        raise ArgumentError, "the :metrics option is required by #{inspect(__MODULE__)}"

    GenServer.start_link(__MODULE__, {metrics, influx_writer}, server_opts)
  end

  @impl true
  def init({metrics, influx_writer}) do
    # Trapping exits makes terminate/2 run (and detach the handlers) when the parent
    # shuts this process down.
    Process.flag(:trap_exit, true)
    groups = Enum.group_by(metrics, & &1.event_name)

    for {event, event_metrics} <- groups do
      id = {__MODULE__, event, self()}
      # An external capture (&__MODULE__.fun/arity) avoids telemetry's warning and
      # performance penalty for local-function handlers.
      :telemetry.attach(id, event, &__MODULE__.handle_event/4, {event_metrics, influx_writer})
    end

    {:ok, Map.keys(groups)}
  end

  @impl true
  def terminate(_reason, events) do
    for event <- events do
      :telemetry.detach({__MODULE__, event, self()})
    end

    :ok
  end

  @doc false
  # Telemetry handler. It is public only so it can be attached as an external
  # capture; it is not part of the API. It follows
  # https://hexdocs.pm/telemetry_metrics/writing_reporters.html#reacting-to-events
  # Any exception raised while handling a metric is logged, so one bad metric
  # neither stops the others nor gets the handler detached by telemetry.
  def handle_event(event_name, measurements, metadata, {metrics, influx_writer}) do
    event_name_in_string = Enum.join(event_name, ".")

    for %{} = metric <- metrics do
      try do
        measurement = extract_measurement(metric, measurements, metadata)

        cond do
          is_nil(measurement) ->
            Logger.debug(
              "No measurement is detected for #{inspect(metric)}/#{event_name_in_string}"
            )

          not keep?(metric, metadata, measurements) ->
            Logger.debug("Measurement for #{event_name_in_string} should not be recorded")

          true ->
            # Tags are computed only for events that will be written, so tag_values
            # never runs on metadata of skipped/dropped events.
            tags = extract_tags(metric, metadata)
            influx_writer.(event_name_in_string, Keyword.new(tags), measurement)
        end
      rescue
        e ->
          Logger.error([
            "Could not report metric #{inspect(metric)}\n",
            Exception.format(:error, e, __STACKTRACE__)
          ])
      end
    end
  end

  # A nil :keep keeps every event. Otherwise :keep is called with the metadata, or
  # with the metadata and measurements when it is a 2-arity function.
  defp keep?(%{keep: nil}, _metadata, _measurements), do: true

  defp keep?(%{keep: keep}, metadata, _measurements) when is_function(keep, 1),
    do: keep.(metadata)

  defp keep?(%{keep: keep}, metadata, measurements) when is_function(keep, 2),
    do: keep.(metadata, measurements)

  # The metric's :measurement is either a function of the measurements (optionally
  # with metadata) or a key looked up in the measurements map. It is nil when absent.
  defp extract_measurement(metric, measurements, metadata) do
    case metric.measurement do
      fun when is_function(fun, 2) -> fun.(measurements, metadata)
      fun when is_function(fun, 1) -> fun.(measurements)
      key -> measurements[key]
    end
  end

  # Applies the metric's tag_values function to the metadata and keeps only the
  # keys listed in the metric's :tags.
  defp extract_tags(metric, metadata) do
    tag_values = metric.tag_values.(metadata)
    Map.take(tag_values, metric.tags)
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Metrics.InfluxReporterTest do
  use ExUnit.Case, async: true
  import ExUnit.CaptureLog

  # Order matters: once `Telemetry.Metrics` is aliased as `Metrics`, a later
  # `Metrics.InfluxReporter` would expand to `Telemetry.Metrics.InfluxReporter`.
  alias Metrics.InfluxReporter, as: SUT
  alias Telemetry.Metrics

  # Starts the reporter under the test supervisor, so it is stopped (and its
  # telemetry handlers detached) before the next test starts.
  defp start_reporter!(opts), do: start_supervised!({SUT, opts})

  # Runs init/1 and the matching terminate/2 in a short-lived process. This keeps the
  # test process from trapping exits and ensures no handler outlives the test. Returns
  # the events from init/1 and the result of `fun`, which runs while handlers are attached.
  defp with_initialized(metrics, fun) do
    task =
      Task.async(fn ->
        {:ok, events} = SUT.init({metrics, fn _, _, _ -> :ok end})

        try do
          {events, fun.()}
        after
          SUT.terminate(:normal, events)
        end
      end)

    Task.await(task)
  end

  describe "start_link/1" do
    test "with valid arguments" do
      assert {:ok, _pid} =
               start_supervised({SUT, metrics: [], influx_writer: fn _, _, _ -> :ok end})
    end

    test "has a mandatory influx_writer option" do
      assert_raise ArgumentError,
                   "the :influx_writer option is required by Metrics.InfluxReporter",
                   fn ->
                     SUT.start_link([])
                   end
    end

    test "with invalid influx_writer option" do
      assert_raise ArgumentError,
                   "Metrics.InfluxReporter requires :influx_writer to be a function with 3 arity",
                   fn ->
                     SUT.start_link(influx_writer: :boo)
                   end
    end

    test "has a mandatory metrics option" do
      assert_raise ArgumentError,
                   "the :metrics option is required by Metrics.InfluxReporter",
                   fn ->
                     SUT.start_link(influx_writer: fn _, _, _ -> :ok end)
                   end
    end
  end

  describe "init/1" do
    test "attach to telemetry event once for duplicated metrics with same event name" do
      assert :telemetry.list_handlers([:my_event, :request, :stop]) == []

      metrics = [
        Metrics.summary("my_event.request.stop.duration", unit: {:native, :millisecond}),
        Metrics.counter("my_event.request.stop.duration", unit: {:native, :millisecond})
      ]

      {_events, handlers} =
        with_initialized(metrics, fn ->
          :telemetry.list_handlers([:my_event, :request, :stop])
        end)

      assert [_] = handlers
      assert :telemetry.list_handlers([:my_event, :request, :stop]) == []
    end

    test "returns non-duplicate list of events" do
      metrics = [
        Metrics.summary("phoenix.endpoint.stop.duration", unit: {:native, :millisecond}),
        Metrics.counter("phoenix.endpoint.stop.duration", unit: {:native, :millisecond}),
        Metrics.counter("phoenix.endpoint.start.time", unit: {:native, :millisecond})
      ]

      {events, _} = with_initialized(metrics, fn -> nil end)

      assert Enum.sort(events) == [[:phoenix, :endpoint, :start], [:phoenix, :endpoint, :stop]]
    end
  end

  describe "telemetry callback" do
    setup do
      test_pid = self()

      # The reporter calls influx_writer.(event_name, tags, measurement).
      self_writer = fn event_name, tags, measurement ->
        send(test_pid, {event_name, tags, measurement})
      end

      metrics = [
        Metrics.counter("my_event.request.stop.duration", tags: [:my_tag])
      ]

      %{influx_writer: self_writer, metrics: metrics}
    end

    test "reports an event to influxdb", %{influx_writer: influx_writer, metrics: metrics} do
      start_reporter!(metrics: metrics, influx_writer: influx_writer)
      :telemetry.execute([:my_event, :request, :stop], %{duration: 1}, %{my_tag: "hello"})
      assert_receive {"my_event.request.stop", [my_tag: "hello"], 1}
    end

    test "does not report when the measurement is nil", %{
      influx_writer: influx_writer,
      metrics: metrics
    } do
      start_reporter!(metrics: metrics, influx_writer: influx_writer)
      :telemetry.execute([:my_event, :request, :stop], %{}, %{my_tag: "hello"})
      # telemetry runs handlers synchronously, so nothing can arrive later.
      refute_received {"my_event.request.stop", _, _}
    end

    test "reports only when keep function allows it", %{influx_writer: influx_writer} do
      metrics = [
        Metrics.counter("my_event.request.stop.duration",
          keep: fn metadata -> match?(%{keep_it: true}, metadata) end
        )
      ]

      start_reporter!(metrics: metrics, influx_writer: influx_writer)
      :telemetry.execute([:my_event, :request, :stop], %{duration: 1}, %{keep_it: false})
      refute_received {"my_event.request.stop", _, _}
      :telemetry.execute([:my_event, :request, :stop], %{duration: 1}, %{keep_it: true})
      assert_receive {"my_event.request.stop", [], 1}
    end

    test "reports measurement based on measurement/1 function", %{influx_writer: influx_writer} do
      measurement_convertor = fn measurements -> measurements.duration / 60 end

      metrics = [
        Metrics.counter("my_event.request.stop.duration", measurement: measurement_convertor)
      ]

      start_reporter!(metrics: metrics, influx_writer: influx_writer)
      :telemetry.execute([:my_event, :request, :stop], %{duration: 120})
      assert_receive {"my_event.request.stop", [], 2.0}
    end

    test "reports measurement based on measurement/2 function", %{influx_writer: influx_writer} do
      measurement_multiplier = fn measurements, metadata ->
        measurements.duration * metadata.scale
      end

      metrics = [
        Metrics.counter("my_event.request.stop.duration", measurement: measurement_multiplier)
      ]

      start_reporter!(metrics: metrics, influx_writer: influx_writer)
      :telemetry.execute([:my_event, :request, :stop], %{duration: 120}, %{scale: 1000})
      assert_receive {"my_event.request.stop", [], 120_000}
    end

    test "logs the error when is not able to write to influx_writer", %{metrics: metrics} do
      capture_runtime_error = fn ->
        faulty_writer = fn _, _, _ -> raise RuntimeError end
        start_reporter!(metrics: metrics, influx_writer: faulty_writer)
        :telemetry.execute([:my_event, :request, :stop], %{duration: 120}, %{scale: 1000})
      end

      assert capture_log(capture_runtime_error) =~ "RuntimeError"
    end

    test "tags value", %{influx_writer: influx_writer} do
      metrics = [
        Metrics.summary("my_client.request.stop",
          event_name: [:my_client, :request, :stop],
          measurement: :duration,
          tags: [:http_status_code],
          tag_values: fn meta ->
            case meta.response do
              {:ok, %{status_code: status_code}} -> %{http_status_code: status_code}
              {:error, _} -> %{http_status_code: 0}
            end
          end
        )
      ]

      start_reporter!(metrics: metrics, influx_writer: influx_writer)

      :telemetry.execute([:my_client, :request, :stop], %{duration: 120}, %{
        response: {:ok, %{status_code: 200}}
      })

      assert_receive {"my_client.request.stop", [http_status_code: 200], 120}
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment