Published on

IdempotencyPlug - idempotent POST requests

This week I released a plug library called IdempotencyPlug to make POST (and PATCH) requests idempotent!

While scouring the internet I didn’t find a whole lot of useful resources for this, no libraries and few articles. So in this blog post, I’ll detail what idempotent requests are, why you would need this, and how IdempotencyPlug works.

What is an idempotent request?

An idempotent request ensures that the request affects the resource at most once, no matter how many times it is sent. In REST all methods are idempotent except POST and PATCH.

Take the endpoint POST /api/payments that initiates a new payment charge at a payment processor. If the client experiences a network interruption during the request, how can the client safely retry the request without creating a new charge?

There are many ways this can be solved.

A common approach, and the one I use for IdempotencyPlug, is to have the client send an Idempotency-Key HTTP header in the POST request. We know that we’re dealing with the same request acting on the same resource if the value of that header and the URI match.

If you want a deep dive, then I highly recommend reading Brandur’s blog post on idempotency keys.

The requirements

I’m using the IETF Idempotency-Key Header draft for all assumptions on how to deal with idempotency keys. These are the requirements for idempotent request handling:

  • Require a single Idempotency-Key HTTP header for all POST and PATCH requests
  • Idempotency-Key value MUST be unique for a URI
  • Idempotency-Key value MUST NOT be reused with a different request payload
  • First-time requests MUST be processed normally, and the response cached
  • Duplicate requests MUST return the cached response
  • Concurrent requests MUST return an error
  • We MUST handle unexpected process termination
  • The cached responses SHOULD expire after 24 hours
  • The cache SHOULD be distributed and persisted

I also had this special requirement for the specific environment I was dealing with:

  • Handle globally isolated instances or intermittent isolations

With this in hand, it’s time to build!

The GenServer

We’ll be using a GenServer since we must track unexpected process termination. The request process is monitored in the GenServer, and if the request response is never set we’ll update the cache when the process terminates.

Here’s a snippet of how the logic works:

defmodule IdempotencyPlug.RequestTracker do
  @moduledoc """
  Tracks idempotent requests and caches their outcome.

  The process handling a first-time request is monitored. If that process
  terminates before a response has been stored, the cached entry is updated
  to `{:halted, reason}`.

  This is a simplified snippet: `store` and `expires_at` are not defined here
  and are presumably derived from the options given to `start_link/1`.
  """

  use GenServer

  ## API

  @doc """
  Starts the request tracker.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Tracks a request. This must be called when we run the plug.

  Returns one of:

    * `:init` - first-time request; the calling process is now monitored
    * `:processing` - the store reports the request as still being processed
    * `{:cache, {:ok, response}}` - a cached response with a matching fingerprint
    * `{:cache, {:halted, reason}}` - a cached halted state with a matching fingerprint
    * `:invalid_fingerprint` - a cached entry exists with a different fingerprint
    * `{:error, reason}` - the store failed to insert the initial state
  """
  def track(name_or_pid, request_id, fingerprint) do
    GenServer.call(name_or_pid, {:track, request_id, fingerprint})
  end

  @doc """
  Stores the response for a tracked request and stops monitoring the request
  process. This must be called in a before_send callback.

  Returns `:ok`, or `{:error, reason}` when the store update fails.
  """
  def put_response(name_or_pid, request_id, response) do
    GenServer.call(name_or_pid, {:put_response, request_id, response})
  end

  ## Callbacks

  @impl true
  def init(_opts) do
    # The state must be a map: the callbacks below read `state.monitored` and
    # update it with `%{state | monitored: ...}`, which fails on a keyword list.
    {:ok, %{monitored: []}}
  end

  @impl true
  def handle_call({:track, request_id, fingerprint}, {caller, _}, state) do
    case store.lookup(request_id) do
      :not_found ->
        # Store initial state in cache
        case store.insert(request_id, :processing, fingerprint, expires_at) do
          :ok ->
            # Monitor request process. Entries are only ever filtered by
            # request id or pid, so order is irrelevant and we can prepend.
            entry = {request_id, caller, Process.monitor(caller)}

            {:reply, :init, %{state | monitored: [entry | state.monitored]}}

          {:error, reason} ->
            {:reply, {:error, reason}, state}
        end

      :processing ->
        {:reply, :processing, state}

      {:halted, reason, ^fingerprint} ->
        {:reply, {:cache, {:halted, reason}}, state}

      {:ok, response, ^fingerprint} ->
        {:reply, {:cache, {:ok, response}}, state}

      # When fingerprint is a mismatch for response and halted cache states
      {_any, _data, _fingerprint} ->
        {:reply, :invalid_fingerprint, state}
    end
  end

  def handle_call({:put_response, request_id, response}, _from, state) do
    # Fetch monitored requests from state by request_id
    {unmonitored, monitored} =
      Enum.split_with(state.monitored, fn {id, _, _} -> id == request_id end)

    # Demonitor request processes; `:flush` also drops a `:DOWN` message that
    # may already be waiting in the mailbox for these monitors.
    Enum.each(unmonitored, fn {_, _, ref} -> Process.demonitor(ref, [:flush]) end)
    state = %{state | monitored: monitored}

    # Update with response in cache
    case store.update(request_id, {:ok, response}, expires_at) do
      :ok -> {:reply, :ok, state}
      {:error, reason} -> {:reply, {:error, reason}, state}
    end
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
    # Fetch monitored requests from state by pid
    {terminated, monitored} =
      Enum.split_with(state.monitored, fn {_, caller, _} -> caller == pid end)

    # A process may be monitored once per tracked request. All of its entries
    # are handled here, so flush the `:DOWN` messages of its other monitors.
    Enum.each(terminated, fn {_, _, ref} -> Process.demonitor(ref, [:flush]) end)
    state = %{state | monitored: monitored}

    # Update with halted state in cache (the store result is not checked)
    Enum.each(terminated, fn {request_id, _, _} ->
      store.update(request_id, {:halted, reason}, expires_at)
    end)

    {:noreply, state}
  end
end

The Plug

The plug will parse the Idempotency-Key HTTP header, store responses for first-time requests, and return cached responses for subsequent requests.

defmodule IdempotencyPlug do
  # Plug that makes POST and PATCH requests idempotent using the
  # `Idempotency-Key` request header.
  #
  # NOTE(review): this is a simplified sketch. Branches that only hold a
  # "Raise ..." comment are placeholders, and `tracker` and `sha256_hash/1`
  # are not defined in this snippet.
  alias IdempotencyPlug.RequestTracker

  # Options are passed through to `call/2` unchanged.
  def init(opts), do: opts

  # POST and PATCH requests must carry exactly one `idempotency-key` header;
  # zero or multiple values are treated as an error.
  def call(%{method: method} = conn, opts) when method in ~w(POST PATCH) do
    case Plug.Conn.get_req_header(conn, "idempotency-key") do
      [id] -> handle_idempotent_request(conn, id, opts)
      [_ | _] -> # Raise or return error
      [] -> # Raise or return error
    end
  end

  # All other methods pass through untouched.
  def call(conn, _opts), do: conn

  # Tracks the request and either replays a cached response, signals an error
  # state, or registers a callback that caches the response once it is sent.
  defp handle_idempotent_request(conn, id, opts) do
    # The request identity is derived from the header value and the request path
    idempotent_id = sha256_hash({id, conn.path_info})
    # The fingerprint is derived from the params, sorted so that map ordering
    # does not influence the hash
    fingerprint = sha256_hash(conn.params |> Map.to_list() |> Enum.sort())

    case RequestTracker.track(tracker, idempotent_id, fingerprint) do
      :processing ->
        # Raise or return concurrent request error

      :invalid_fingerprint ->
        # Raise or return fingerprint mismatch error

      {:cache, {:halted, _reason}} ->
        # Raise or return unexpected process termination error

      # Replay the cached response: restore headers, status and body, then
      # halt so downstream plugs do not run
      {:cache, {:ok, %{resp_body: body, resp_headers: headers, status: status}}} ->
        headers
        |> Enum.reduce(conn, fn {key, value}, conn ->
          Plug.Conn.put_resp_header(conn, key, value)
        end)
        |> Plug.Conn.resp(status, body)
        |> Plug.Conn.halt()

      # First-time request: store body, headers and status right before the
      # response is sent
      :init ->
        Plug.Conn.register_before_send(conn, fn conn ->
          data = Map.take(conn, [:resp_body, :resp_headers, :status])

          case RequestTracker.put_response(tracker, idempotent_id, data) do
            :ok -> conn
            {:error, error} -> # Raise error
          end
        end)

      {:error, error} ->
        # Raise error
    end
  end
end

Distribution and persistence

An ETS store is used as the default store in IdempotencyPlug. It requires the least configuration to get going, but it’s neither distributed nor persisted. In my specific use case, I couldn’t depend on cluster nodes to always be connected. So I’m using the Ecto store that’s shipped with IdempotencyPlug for persisted and distributed caching.

There are many more ways distribution and persistence can be dealt with instead, for example:

To that end IdempotencyPlug ships with an IdempotencyPlug.Store behaviour for any store implementations.

Testing with Phoenix

After implementing IdempotencyPlug you may have some custom configuration you want to ensure works. In my case, I had to ensure that the responses were correct since I use OpenAPISpex, and also ensure that the idempotent request was scoped to the authenticated user. The custom :with error handler returned a halted conn with a JSON response instead of raising the error.

I implemented a test controller that was only included for tests:

# Mount the handler test route only when compiling for the test environment
if Mix.env() == :test do
  post("/idempotency-plug-handler-test", IdempotencyPlugHandlerTestController, :create)
end
# test/support/idempotency_plug_handler_test_controller.ex
defmodule MyAppWeb.IdempotencyPlugHandlerTestController do
  use MyAppWeb, :controller

  # Responds with a JSON message. When a callback is stored under `:callback`
  # in the process dictionary, its return value becomes the message;
  # otherwise the message is "OK".
  def create(conn, _params) do
    message =
      case Process.get(:callback) do
        # Both `nil` and `false` count as "no callback"
        absent when absent in [nil, false] -> "OK"
        callback -> callback.()
      end

    json(conn, %{message: message})
  end
end

And then ran through the whole pipeline in tests:

defmodule MyAppWeb.IdempotencyPlugHandlerTest do
  # Runs requests through the whole pipeline against the test-only handler
  # route to check how idempotent requests are handled.
  use MyAppWeb.ConnCase

  # NOTE(review): `setup_authenticated_conn` is not defined in this snippet;
  # presumably it is provided by `MyAppWeb.ConnCase` - confirm.
  setup [:setup_authenticated_conn, :setup_request]

  test "with concurrent request", %{conn: conn} do
    pid = self()

    # The first request runs in a task. Its callback signals the test process
    # and then blocks, keeping the request in flight.
    task =
      Task.async(fn ->
        post_request(conn, fn ->
          send(pid, :continue)
          receive do
            :continue -> :ok
          end

          :ok
        end)
      end)

    # Wait until the first request has reached the controller
    receive do
      :continue -> :ok
    end

    # Second request with the same key while the first is still in flight
    conn = post_request(conn)

    assert conn.halted

    # assert custom json response

    # Release the first request and wait for it to finish
    send(task.pid, :continue)
    Task.await(task)
  end

  @tag capture_log: true
  test "with halted response", %{conn: conn} do
    # Trap exits so the linked task exiting does not take down the test process
    Process.flag(:trap_exit, true)
    task = Task.async(fn -> post_request(conn, fn -> exit(:fatal) end) end)
    {:fatal, _} = catch_exit(Task.await(task))

    # Retry the request after the first request process has exited
    conn = post_request(conn)

    assert conn.halted

    # assert custom json response
  end

  test "with cached response with different request payload", %{conn: conn} do
    # First request carries an extra param; the second reuses the key without it
    _other_conn = post_request(conn, nil, %{"other-key" => "1"})
    conn = post_request(conn)

    assert conn.halted

    # assert custom json response
  end

  test "with cached response with different user", %{conn: conn} do
    # Set up authentication with different user for other_conn
    {:ok, context} = setup_authenticated_conn(%{conn: conn})
    other_conn = post_request(context[:conn], fn -> "OTHER-RESPONSE" end)

    conn = post_request(conn)

    # The second request must not be served the other user's response
    refute conn.halted
    refute conn.resp_body == other_conn.resp_body
  end

  # Prepares the conn with an idempotency key header, method and params.
  defp setup_request(%{conn: conn}) do
    conn =
      conn
      |> put_req_header("idempotency-key", "key")
      |> Map.put(:method, "POST")
      |> Map.put(:params, %{"a" => 1, "b" => 2})

    %{conn: conn}
  end

  # Posts to the test route. The callback is stored in the process dictionary
  # of the calling process, where the test controller reads it back.
  defp post_request(conn, callback \\ nil, params \\ %{}) do
    Process.put(:callback, callback)

    post(conn, ~p"/api/idempotency-plug-handler-test", Map.merge(%{"a" => 1}, params))
  end
end

Hi, I'm Dan Schultzer, I write in this blog, work a lot in Elixir, maintain several open source projects, and help companies streamline their development process