Published on

Normalize chart data using Ecto and PostgreSQL

If you don’t yet have charts in your Phoenix app, you may want to set up ECharts with Phoenix LiveView.

Producing good datasets for charts is essential. A graph is only as good as the data it uses. Here is a collection of helpful snippets I have used to chart data.

Finding the right step size

Numerical

For numerical steps, you can do a simple calculation with (end - start) / steps to find the step size, but unless you have fixed start and end values, you will end up with odd numbers (e.g. 167.3998).

Instead, we want to snap the steps to 1, 2.5, 5 (scaled to the proper size) so it reads well for the end-user. We’re using Decimal to calculate the step size below:

defmodule MyApp.ChartUtils do
  @moduledoc """
  Utility functions for charts.
  """

  @doc """
  Finds an appropriate numerical step size.
  """
  def step_size(min, max, steps \\ 100) do
    step_size =
      max
      |> Decimal.sub(min)
      |> Decimal.div(steps)
      # We don't want a step size lower than `1` adjusted to the lowest value exponent
      |> Decimal.max(Decimal.new(1, 1, min.exp))

    # Calculating the exponent we need for our step size
    pow10 = Integer.pow(10, length(Integer.digits(step_size.coef)) - 1)
    exp = - 1 - Decimal.scale(step_size)

    # Find the right fit in our list of step sizes to snap to
    [
      Decimal.new(1, 10 * pow10, exp),
      Decimal.new(1, 25 * pow10, exp),
      Decimal.new(1, 50 * pow10, exp),
      Decimal.new(1, 100 * pow10, exp)
    ]
    |> Enum.find(&Decimal.lte?(step_size, &1))
    |> Decimal.normalize()
  end
end
Tests for step_size/3.
defmodule MyApp.ChartUtilsTest do
  use MyApp.DataCase

  alias MyApp.ChartUtils

  test "step_size/3" do
    assert ChartUtils.step_size(Decimal.new(1), Decimal.new(100)) == Decimal.new("1")
    assert ChartUtils.step_size(Decimal.new(-100), Decimal.new(1)) == Decimal.new("2.5")
    assert ChartUtils.step_size(Decimal.new(100), Decimal.new(1_000)) == Decimal.new("1E+1")
    assert ChartUtils.step_size(Decimal.new(100), Decimal.new(1_000), 3) == Decimal.new("5E+2")
    assert ChartUtils.step_size(Decimal.from_float(331.32), Decimal.from_float(7_893.47)) == Decimal.new("1E+2")
  end
end

Datetime

Usually, we don’t have a fixed time frame for our graphs. Some data may have much lower or higher time ranges than others.

One difference from numerical step sizes is that minutes, hours, and days are not 1:1 like the numerical values above, and there isn’t a good way to calculate it. We don’t think of time in base-10, but rather in terms of 5 minutes, half hours, daily, biweekly, quarterly, and so on.

Therefore, we don’t pass in the desired number of steps but simply hardcode the breakpoints for each time interval:

defmodule MyApp.ChartUtils do
  # ...

  @doc """
  Finds an appropriate time interval.
  """
  def time_interval(begins_at, ends_at \\ nil) do
    ends_at = ends_at || DateTime.utc_now()

    ends_at
    |> DateTime.diff(begins_at, :millisecond)
    |> do_time_interval()
  end

  defp do_time_interval(diff) when diff < unquote(:timer.minutes(60)), do: {:minute, 1}
  defp do_time_interval(diff) when diff < unquote(:timer.hours(4)), do: {:minute, 5}
  defp do_time_interval(diff) when diff < unquote(:timer.hours(12)), do: {:minute, 30}
  defp do_time_interval(diff) when diff < unquote(:timer.hours(24) * 2), do: {:hour, 1}
  defp do_time_interval(diff) when diff < unquote(:timer.hours(24) * 3), do: {:hour, 6}
  defp do_time_interval(_diff), do: {:day, 1}
end

You will need to adjust it to your specific use case, as these time intervals are context-dependent. The above example is used for running events where, after a few days, you want to see daily intervals even though it only produces three data points.

With the above, we can now use the interval for grouping in our Ecto query:

defmodule MyApp.Records do
  alias MyApp.{Records.Record, Repo}

  import Ecto.Query

  def list_record_counts_over_time(time_interval) do
    Record
    |> group_by([r], fragment("timestamp"))
    |> select([r], %{
      timestamp: fragment("date_bin(?, ?, 'epoch') as timestamp", ^to_interval(time_interval), r.inserted_at),
      count: count(r.id)
    })
    |> order_by([r], asc: fragment("timestamp"))
    |> Repo.all()
  end

  defp to_interval({:minute, n}), do: %Postgrex.Interval{secs: n * 60}
  defp to_interval({:hour, n}), do: %Postgrex.Interval{secs: n * 60 * 60}
  defp to_interval({:day, n}), do: %Postgrex.Interval{days: n}
end
Tests for time_interval/2 and list_record_counts_over_time/1.
defmodule MyApp.ChartUtilsTest do
  # ...

  test "time_interval/2" do
    now = DateTime.utc_now()

    assert ChartUtils.time_interval(now) == {:minute, 1}
    assert ChartUtils.time_interval(now, DateTime.add(now, -1, :hour)) == {:minute, 1}
    assert ChartUtils.time_interval(DateTime.add(now, -1, :hour)) == {:minute, 5}
    assert ChartUtils.time_interval(DateTime.add(now, -365, :day)) == {:day, 1}
  end
end
defmodule MyApp.RecordsTest do
  use MyApp.DataCase

  alias MyApp.{Records, Records.Record, Repo}

  test "list_record_counts_over_time/1" do
    beginning_of_day = %{DateTime.utc_now() | hour: 0, minute: 0, second: 0, microsecond: {0, 6}}

    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -2, :day)})
    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -1, :day)})
    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -1, :hour)})
    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -1, :minute)})
    record_fixture(%{inserted_at: beginning_of_day})

    assert [count_1, count_2, count_3] = Records.list_record_counts_over_time({:day, 1})
    assert count_1.timestamp == NaiveDateTime.beginning_of_day(NaiveDateTime.add(beginning_of_day, -2, :day))
    assert count_1.count == 1
    assert count_2.timestamp == NaiveDateTime.beginning_of_day(NaiveDateTime.add(beginning_of_day, -1, :day))
    assert count_2.count == 3
    assert count_3.timestamp == NaiveDateTime.beginning_of_day(beginning_of_day)
    assert count_3.count == 1
  end

  defp record_fixture(attrs) do
    Record
    |> struct!(attrs)
    |> Repo.insert!()
  end
end

Summarizations

Ecto with PostgreSQL makes it easy to calculate summarizations such as p-values when querying values over time:

defmodule MyApp.Records do
  # ...

  def list_record_percentiles_over_time(time_interval) do
    Record
    |> select([r], %{
      ended_at: r.ended_at,
      duration: fragment("extract(epoch from ? - ?)", r.ended_at, r.began_at),
    })
    |> subquery()
    |> from()
    |> group_by([r], [fragment("timestamp")])
    |> select([r], %{
      timestamp: fragment("date_bin(?, ?, 'epoch') as timestamp", ^to_interval(time_interval), r.ended_at),
      min: min(r.duration),
      p25: type(fragment("percentile_cont (0.25) WITHIN GROUP (ORDER BY ?)", r.duration), :decimal),
      p50: type(fragment("percentile_cont (0.5) WITHIN GROUP (ORDER BY ?)", r.duration), :decimal),
      p75: type(fragment("percentile_cont (0.75) WITHIN GROUP (ORDER BY ?)", r.duration), :decimal),
      max: max(r.duration)
    })
    |> order_by([r], asc: fragment("timestamp"))
    |> Repo.all()
  end
end
Tests for time_interval/2 and list_record_counts_over_time/1.
defmodule MyApp.RecordsTest do
  use MyApp.DataCase

  # ...

  test "list_record_percentiles_over_time/1" do
    beginning_of_day = %{DateTime.utc_now() | hour: 0, minute: 0, second: 0, microsecond: {0, 6}}

    record_fixture(%{began_at: DateTime.add(beginning_of_day, -2, :day), ended_at: DateTime.add(beginning_of_day, -1, :day)})
    record_fixture(%{began_at: DateTime.add(beginning_of_day, -2, :day), ended_at: beginning_of_day})
    record_fixture(%{began_at: DateTime.add(beginning_of_day, -1, :day), ended_at: DateTime.add(beginning_of_day, 12, :hour)})

    assert [precentile_1, precentile_2] = Records.list_record_percentiles_over_time({:day, 1})
    assert precentile_1.timestamp == NaiveDateTime.add(beginning_of_day, -1, :day)
    assert Decimal.eq?(precentile_1.min, 60 * 60 * 24)
    assert Decimal.eq?(precentile_1.max, 60 * 60 * 24)
    assert precentile_2.timestamp == DateTime.to_naive(beginning_of_day)
    assert Decimal.eq?(precentile_2.min, 60 * 60 * 36)
    assert Decimal.eq?(precentile_2.max, 60 * 60 * 48)
    assert Decimal.eq?(precentile_2.p25, 60 * 60 * 39)
    assert Decimal.eq?(precentile_2.p50, 60 * 60 * 42)
    assert Decimal.eq?(precentile_2.p75, 60 * 60 * 45)
  end
end

Filling the gaps

It is common to experience gaps in your data with the above queries. This is not a problem for all charts, but for continuous charts like line charts, you’ll need to fill them in.

You have two options:

1. Use series generation in the PostgreSQL query

You’ll need to fetch the minimum and maximum values before you can generate the series:

defmodule MyApp.Records do
  # ...

  def list_record_counts_over_time(time_interval) do
    interval = to_interval(time_interval)

    Record
    |> select([r], %{min: min(r.inserted_at), max: max(r.inserted_at)})
    |> Repo.one()
    |> case do
      nil ->
        []

      %{min: min, max: max} ->
        query =
          Record
          |> group_by([r], fragment("timestamp"))
          |> select([r], %{
            timestamp: fragment("date_bin(?, ?, 'epoch')", ^interval, r.inserted_at),
            count: count(r.id)
          })

        from(
          fragment("GENERATE_SERIES(?::timestamp, ?::timestamp, ?::interval)", ^min, ^max, ^interval)
        )
        |> select([s], %{timestamp: fragment("DATE_BIN(?, ?::timestamp, 'epoch')", ^interval, s)})
        |> subquery()
        |> join(:left, [s], r in subquery(query), on: s.timestamp == r.timestamp)
        |> order_by([s], asc: s.timestamp)
        |> select([s, r], %{
          timestamp: s.timestamp,
          count: coalesce(r.count, 0)
        })
        |> Repo.all()
    end
  end
end
Tests for list_record_counts_over_time/1.
defmodule MyApp.RecordsTest do
  use MyApp.DataCase

  # ...

  test "list_record_counts_over_time/1" do
    assert Records.list_record_counts_over_time({:day, 1}) == []

    beginning_of_day = %{DateTime.utc_now() | hour: 0, minute: 0, second: 0, microsecond: {0, 6}}

    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -3, :day)})
    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -1, :day)})
    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -1, :hour)})
    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -1, :minute)})
    record_fixture(%{inserted_at: beginning_of_day})

    assert [count_1, count_2, count_3, count_4] = Records.list_record_counts_over_time_2({:day, 1})
    assert count_1.timestamp == NaiveDateTime.beginning_of_day(NaiveDateTime.add(beginning_of_day, -3, :day))
    assert count_1.count == 1
    assert count_2.timestamp == NaiveDateTime.beginning_of_day(NaiveDateTime.add(beginning_of_day, -2, :day))
    assert count_2.count == 0
    assert count_3.timestamp == NaiveDateTime.beginning_of_day(NaiveDateTime.add(beginning_of_day, -1, :day))
    assert count_3.count == 3
    assert count_4.timestamp == NaiveDateTime.beginning_of_day(beginning_of_day)
    assert count_4.count == 1
  end
end

It’ll be the similar process for numeric steps, but you must ensure the upper and lower boundaries as otherwise the steps may start at odd number (e.g. 435.43, 535.43, 635.43). I’m also guarding against extreme tail end by collecting p99 as a max, and including anything above the p99 in the last step.

defmodule MyApp.Records do
  alias MyApp.ChartUtils

  # ...

  def list_record_value_distribution(steps \\ 100) do
    Record
    |> select([r], %{
      min: min(r.value),
      max: max(r.value),
      p99: type(fragment("percentile_cont (0.99) WITHIN GROUP (ORDER BY ?)", r.value), :decimal)
      })
    |> Repo.one()
    |> case do
      %{min: nil, max: nil} ->
        []

      %{min: min, max: max, p99: p99} ->
        step_size = ChartUtils.step_size(min, p99, steps)
        floor_min = lower_bound(min, step_size)
        ceil_max = upper_bound(max, step_size)
        floor_p99 = lower_bound(p99, step_size)

        from(
          fragment("GENERATE_SERIES(?::numeric, ?::numeric, ?::numeric)", ^floor_min, ^floor_p99, ^step_size)
        )
        |> select([s], %{
            min: fragment("?", s),
            # We want to ensure that the last step includes everything beyond p99
            max: fragment("CASE WHEN ? = ? THEN ? ELSE ? + ? END", s, ^floor_p99, ^ceil_max, s, ^step_size)
          })
        |> subquery()
        |> join(:left, [s], r in Record, on: r.value >= s.min and r.value < s.max)
        |> group_by([s], [s.min, s.max])
        |> order_by([s], asc: s.min)
        |> select([s, r], %{
          min: s.min,
          max: s.max,
          count: count(r.id)
        })
        |> Repo.all()
    end
  end

  defp lower_bound(value, step_size), do: Decimal.sub(value, Decimal.rem(value, step_size))

  defp upper_bound(value, step_size), do: Decimal.add(lower_bound(value, step_size), step_size)
end
Tests for list_record_value_distribution/1.
defmodule MyApp.RecordsTest do
  use MyApp.DataCase

  # ...

  test "list_record_counts_over_time/1" do
    assert Records.list_record_counts_over_time({:day, 1}) == []

    beginning_of_day = %{DateTime.utc_now() | hour: 0, minute: 0, second: 0, microsecond: {0, 6}}

    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -3, :day)})
    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -1, :day)})
    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -1, :hour)})
    record_fixture(%{inserted_at: DateTime.add(beginning_of_day, -1, :minute)})
    record_fixture(%{inserted_at: beginning_of_day})

    assert [count_1, count_2, count_3, count_4] = Records.list_record_counts_over_time_2({:day, 1})
    assert count_1.timestamp == NaiveDateTime.beginning_of_day(NaiveDateTime.add(beginning_of_day, -3, :day))
    assert count_1.count == 1
    assert count_2.timestamp == NaiveDateTime.beginning_of_day(NaiveDateTime.add(beginning_of_day, -2, :day))
    assert count_2.count == 0
    assert count_3.timestamp == NaiveDateTime.beginning_of_day(NaiveDateTime.add(beginning_of_day, -1, :day))
    assert count_3.count == 3
    assert count_4.timestamp == NaiveDateTime.beginning_of_day(beginning_of_day)
    assert count_4.count == 1
  end

  test "list_record_value_distribution/1" do
    assert Records.list_record_value_distribution() == []

    for _ <- 1..100, do: record_fixture(%{value: 3})
    record_fixture(%{value: Decimal.new("1.1")})
    record_fixture(%{value: Decimal.new("1.2")})
    record_fixture(%{value: 3})
    record_fixture(%{value: 5})
    record_fixture(%{value: Decimal.new("10.5")})
    record_fixture(%{value: 10_000})

    [value_1, value_2, value_3 | rest] = Records.list_record_value_distribution(10)
    value_10 = List.last(rest)

    assert length(rest) == 7

    assert value_1.count == 2
    assert value_1.min == Decimal.new("1.0")
    assert value_1.max == Decimal.new("2.0")

    assert value_2.count == 0
    assert value_2.min == Decimal.new("2.0")
    assert value_2.max == Decimal.new("3.0")

    assert value_3.count == 101
    assert value_3.min == Decimal.new("3.0")
    assert value_3.max == Decimal.new("4.0")

    assert value_10.count == 2
    assert value_10.min == Decimal.new("10.0")
    assert value_10.max == Decimal.new("10001")
  end
end

2. Fill in using Elixir

The second way is to just fill it with Elixir. Assuming the list is always sorted in ascending order you can simply iterate through it and fill in the missing data:

defmodule MyApp.ChartUtils do
  # ...

  def fill_gaps([], _), do: []
  def fill_gaps([el], _), do: [el]

  def fill_gaps([first | list], {unit, amount}) do
    {data, _} =
      Enum.reduce(list, {[first], first.timestamp}, fn data, {acc, prev_timestamp} ->
        {
          acc ++ gen_empty_range(prev_timestamp, data.timestamp, unit, amount) ++ [data],
          data.timestamp
        }
      end)

    data
  end

  defp gen_empty_range(from, till, unit, amount, acc \\ []) do
    case NaiveDateTime.add(from, amount, unit) do
      ^till -> acc
      timestamp -> acc ++ gen_empty_range(timestamp, till, unit, amount, [%{timestamp: timestamp, count: 0}])
    end
  end
end
Tests for fill_gaps/2.
defmodule MyApp.ChartUtilsTest do
  use MyApp.DataCase

  # ...

  test "fill_gaps/2" do
    beginning_of_day = NaiveDateTime.beginning_of_day(DateTime.utc_now())

    counts = [
      first = %{timestamp: NaiveDateTime.add(beginning_of_day, -3, :day), count: 1},
      %{timestamp: NaiveDateTime.add(beginning_of_day, -1, :day), count: 3},
      %{timestamp: beginning_of_day, count: 2}
    ]

    assert ChartUtils.fill_gaps([], {:day, 1}) == []
    assert ChartUtils.fill_gaps([first], {:day, 1}) == [first]
    assert [^first, gap, _, _] = ChartUtils.fill_gaps(counts, {:day, 1})
    assert gap.timestamp == NaiveDateTime.add(beginning_of_day, -2, :day)
    assert gap.count == 0
    assert length(ChartUtils.fill_gaps(counts, {:hour, 12})) == 7
  end
end

Wrap up

There are no one solution for charts, which is why you see so many charting libraries. The same goes for the dataset used for charts. The reason is that the charts/data needs context to have meaning. This post gives you ideas of how to deal with aggregation and data transformation, but you’ll have to find the best presentation for your data by experimenting.

I may add more code snippets to this in the future. Happy coding!

Hi, I'm Dan Schultzer, I write in this blog, work a lot in Elixir, maintain several open source projects, and help companies streamline their development process