This week I released a plug library called IdempotencyPlug to make POST (and PATCH) requests idempotent!
While scouring the internet, I didn’t find many useful resources for this: no libraries and only a few articles. So in this blog post, I’ll detail what idempotent requests are, why you would need them, and how IdempotencyPlug works.
What is an idempotent request?
An idempotent request affects the resource at most once, no matter how many times it is sent. Of the HTTP methods commonly used in REST APIs, only POST and PATCH are not idempotent by definition.
Take the endpoint POST /api/payments that initiates a new payment charge at a payment processor. If the client experiences a network interruption during the request, how can it safely retry the request without creating a second charge?
There are many ways this can be solved.
A common approach, and the one I use for IdempotencyPlug, is to have the client send an Idempotency-Key HTTP header in the POST request. We know that we’re dealing with the same request acting on the same resource if both the value of that header and the URI match.
If you want a deep dive, then I highly recommend reading Brandur’s blog post on idempotency keys.
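On the client side, the important detail is that the key is generated once per logical operation and reused for every retry. Here is a minimal sketch of that idea. The module `RetryWithKey` and the `send_fun` callback are hypothetical and not part of IdempotencyPlug; `send_fun` stands in for whatever HTTP client performs the POST.

```elixir
defmodule RetryWithKey do
  @moduledoc "Hypothetical client-side helper, not part of IdempotencyPlug."

  # Generates a random key for one logical operation
  def new_key do
    16 |> :crypto.strong_rand_bytes() |> Base.url_encode64(padding: false)
  end

  # `send_fun` receives the request headers and is assumed to return
  # {:ok, response} or {:error, reason}
  def post_with_retry(send_fun, attempts \\ 3) when attempts > 0 do
    headers = [{"idempotency-key", new_key()}]
    do_post(send_fun, headers, attempts)
  end

  defp do_post(send_fun, headers, attempts_left) do
    case send_fun.(headers) do
      {:ok, response} ->
        {:ok, response}

      {:error, _reason} when attempts_left > 1 ->
        # Retry with the exact same headers so the server sees the same key
        do_post(send_fun, headers, attempts_left - 1)

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

Because the key never changes between attempts, the server can recognize a retry and return the cached response instead of creating another charge.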
The requirements
I’m using the IETF Idempotency-Key Header draft for all assumptions on how to deal with idempotency keys. These are the requirements for idempotent request handling:
- Require a single Idempotency-Key HTTP header for all POST and PATCH requests
- Idempotency-Key value MUST be unique for a URI
- Idempotency-Key value MUST NOT be reused with a different request payload
- First-time requests MUST be processed normally, and the response cached
- Duplicate requests MUST return the cached response
- Concurrent requests MUST return an error
- We MUST handle unexpected process termination
- The cached responses SHOULD expire after 24 hours
- The cache SHOULD be distributed and persisted
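The 24 hour expiry can be expressed as a timestamp stored next to each cached response. The sketch below is only an illustration of that requirement; `ExpirySketch` and its function names are made up for this example and do not come from IdempotencyPlug.

```elixir
defmodule ExpirySketch do
  # 24 hours expressed in seconds
  @ttl_seconds 24 * 60 * 60

  # Timestamp after which a cached response should no longer be served
  def expires_at(now \\ DateTime.utc_now()) do
    DateTime.add(now, @ttl_seconds, :second)
  end

  # An entry counts as expired once `now` has reached its expiry timestamp
  def expired?(expires_at, now \\ DateTime.utc_now()) do
    DateTime.compare(expires_at, now) != :gt
  end
end
```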
I also had this special requirement for the specific environment I was dealing with:
- Handle globally isolated instances or intermittent isolations
With this in hand, it’s time to build!
The GenServer
We’ll be using a GenServer since we must track unexpected process termination. The request process is monitored in the GenServer, and if the response is never set, we’ll mark the cached request as halted when the process terminates.
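If process monitoring is new to you, this small standalone sketch shows the mechanism the tracker relies on: the monitoring process receives a `:DOWN` message carrying the exit reason when the monitored process terminates. `MonitorSketch` is a hypothetical module written for this illustration only.

```elixir
defmodule MonitorSketch do
  # Runs `fun` in a separate process, monitors that process, and returns
  # the reason it terminated with
  def watch(fun, timeout \\ 1_000) do
    pid =
      spawn(fn ->
        # Wait for the go-ahead so the monitor is in place before `fun` runs
        receive do
          :go -> fun.()
        end
      end)

    ref = Process.monitor(pid)
    send(pid, :go)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
    after
      timeout -> :timeout
    end
  end
end
```

Calling `MonitorSketch.watch(fn -> exit(:fatal) end)` returns `{:down, :fatal}`, which is the kind of reason the tracker can store as a halted state.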
Here’s a snippet of how the logic works:
defmodule IdempotencyPlug.RequestTracker do
  use GenServer

  # NOTE: `store` and `expires_at` are not defined in this snippet. Assume
  # they are derived from the tracker's configuration.

  ## API

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  # This must be called when we run the plug
  def track(name_or_pid, request_id, fingerprint) do
    GenServer.call(name_or_pid, {:track, request_id, fingerprint})
  end

  # This must be called in a before_send callback
  def put_response(name_or_pid, request_id, response) do
    GenServer.call(name_or_pid, {:put_response, request_id, response})
  end

  ## Callbacks

  @impl true
  def init(_opts) do
    # The state must be a map since it's updated with map syntax below
    {:ok, %{monitored: []}}
  end

  @impl true
  def handle_call({:track, request_id, fingerprint}, {caller, _}, state) do
    case store.lookup(request_id) do
      :not_found ->
        # Store initial state in cache
        case store.insert(request_id, :processing, fingerprint, expires_at) do
          :ok ->
            # Monitor request process
            monitored =
              state.monitored ++ [{request_id, caller, Process.monitor(caller)}]

            {:reply, :init, %{state | monitored: monitored}}

          {:error, reason} ->
            {:reply, {:error, reason}, state}
        end

      :processing ->
        {:reply, :processing, state}

      {:halted, reason, ^fingerprint} ->
        {:reply, {:cache, {:halted, reason}}, state}

      {:ok, response, ^fingerprint} ->
        {:reply, {:cache, {:ok, response}}, state}

      # When fingerprint is a mismatch for response and halted cache states
      {_any, _data, _fingerprint} ->
        {:reply, :invalid_fingerprint, state}
    end
  end

  def handle_call({:put_response, request_id, response}, _from, state) do
    # Fetch monitored requests from state by request_id
    {unmonitored, monitored} =
      Enum.split_with(state.monitored, fn {id, _, _} ->
        id == request_id
      end)

    # Demonitor request processes
    Enum.each(unmonitored, fn {_, _, ref} -> Process.demonitor(ref) end)
    state = %{state | monitored: monitored}

    # Update with response in cache
    case store.update(request_id, {:ok, response}, expires_at) do
      :ok -> {:reply, :ok, state}
      {:error, reason} -> {:reply, {:error, reason}, state}
    end
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
    # Fetch monitored requests from state by pid
    {terminated, monitored} =
      Enum.split_with(state.monitored, fn {_, caller, _} ->
        caller == pid
      end)

    # Demonitor request processes
    Enum.each(terminated, fn {_, _, ref} -> Process.demonitor(ref) end)
    state = %{state | monitored: monitored}

    # Update with halted state in cache
    Enum.each(terminated, fn {request_id, _, _} ->
      store.update(request_id, {:halted, reason}, expires_at)
    end)

    {:noreply, state}
  end
end
The Plug
The plug will parse the Idempotency-Key HTTP header, store responses for first-time requests, and return cached responses for subsequent requests.
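defmodule IdempotencyPlug do
  @behaviour Plug

  alias IdempotencyPlug.RequestTracker

  @impl true
  def init(opts), do: opts

  @impl true
  def call(%{method: method} = conn, opts) when method in ~w(POST PATCH) do
    case Plug.Conn.get_req_header(conn, "idempotency-key") do
      [id] -> handle_idempotent_request(conn, id, opts)
      # Raising here; a custom handler could return an error response instead
      [_ | _] -> raise "Only one Idempotency-Key header can be sent"
      [] -> raise "No Idempotency-Key header sent"
    end
  end

  def call(conn, _opts), do: conn

  defp handle_idempotent_request(conn, id, opts) do
    # Assumes the tracker name or pid is passed in as the :tracker option
    tracker = Keyword.fetch!(opts, :tracker)
    idempotent_id = sha256_hash({id, conn.path_info})
    fingerprint = sha256_hash(conn.params |> Map.to_list() |> Enum.sort())

    case RequestTracker.track(tracker, idempotent_id, fingerprint) do
      :processing ->
        raise "A request with this Idempotency-Key is currently being processed"

      :invalid_fingerprint ->
        raise "This Idempotency-Key was used with a different request payload"

      {:cache, {:halted, reason}} ->
        raise "The original request terminated unexpectedly: #{inspect(reason)}"

      {:cache, {:ok, %{resp_body: body, resp_headers: headers, status: status}}} ->
        # Replay the cached response and stop the pipeline
        headers
        |> Enum.reduce(conn, fn {key, value}, conn ->
          Plug.Conn.put_resp_header(conn, key, value)
        end)
        |> Plug.Conn.resp(status, body)
        |> Plug.Conn.halt()

      :init ->
        # First-time request: cache the response right before it's sent
        Plug.Conn.register_before_send(conn, fn conn ->
          data = Map.take(conn, [:resp_body, :resp_headers, :status])

          case RequestTracker.put_response(tracker, idempotent_id, data) do
            :ok -> conn
            {:error, error} -> raise "Could not cache the response: #{inspect(error)}"
          end
        end)

      {:error, error} ->
        raise "Could not track the request: #{inspect(error)}"
    end
  end

  # One possible implementation of the hashing helper
  defp sha256_hash(term) do
    :sha256
    |> :crypto.hash(:erlang.term_to_binary(term))
    |> Base.encode16(case: :lower)
  end
end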
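The two hashes in the plug serve different purposes: the first scopes the key to the request path, the second detects a changed payload. The following standalone sketch illustrates both properties. `FingerprintSketch` and the hashing approach (SHA-256 over `:erlang.term_to_binary/1`) are assumptions made for this illustration, not necessarily what IdempotencyPlug does internally.

```elixir
defmodule FingerprintSketch do
  # Hashes any Erlang term into a lowercase hex string
  def sha256_hash(term) do
    :sha256
    |> :crypto.hash(:erlang.term_to_binary(term))
    |> Base.encode16(case: :lower)
  end

  # The same key on a different path yields a different request ID
  def request_id(key, path_info), do: sha256_hash({key, path_info})

  # Params are turned into a sorted list before hashing, so the result
  # only depends on the key-value pairs themselves
  def fingerprint(params) do
    params |> Map.to_list() |> Enum.sort() |> sha256_hash()
  end
end
```

With this, `FingerprintSketch.fingerprint(%{"a" => 1, "b" => 2})` equals the fingerprint of the same pairs built in another order, while changing any value produces a different fingerprint.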
Distribution and persistence
An ETS store is the default store in IdempotencyPlug. It requires the least configuration to get going, but it’s neither distributed nor persisted. In my specific use case, I couldn’t depend on cluster nodes to always be connected, so I’m using the Ecto store that ships with IdempotencyPlug for persisted and distributed caching.
There are many more ways distribution and persistence can be dealt with instead, for example:
- Use Phoenix.PubSub for distribution
- Use disk-based ETS for persistence
- Use a distributed cache library like Cachex
To that end, IdempotencyPlug ships with an IdempotencyPlug.Store behaviour for any store implementations.
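To give a feel for what a store has to do, here is a rough in-memory sketch backed by an Agent. It mirrors the `lookup`, `insert`, and `update` calls from the tracker snippet above, but the module name, function arities, and return values are assumptions for illustration. It does not implement the actual IdempotencyPlug.Store behaviour, so check the library docs for the real callbacks.

```elixir
defmodule AgentStoreSketch do
  @moduledoc "Hypothetical in-memory store, not the IdempotencyPlug.Store API."
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end)
  end

  # Returns the cached data and fingerprint, or :not_found
  def lookup(store, request_id) do
    Agent.get(store, fn entries ->
      case Map.fetch(entries, request_id) do
        {:ok, {data, fingerprint, _expires_at}} -> {data, fingerprint}
        :error -> :not_found
      end
    end)
  end

  # Inserting an existing request ID is an error, so two concurrent
  # first-time requests can't both claim the same key
  def insert(store, request_id, data, fingerprint, expires_at) do
    Agent.get_and_update(store, fn entries ->
      if Map.has_key?(entries, request_id) do
        {{:error, :already_exists}, entries}
      else
        {:ok, Map.put(entries, request_id, {data, fingerprint, expires_at})}
      end
    end)
  end

  # Replaces the data and expiry while keeping the original fingerprint
  def update(store, request_id, data, expires_at) do
    Agent.get_and_update(store, fn entries ->
      case Map.fetch(entries, request_id) do
        {:ok, {_old_data, fingerprint, _old_expires_at}} ->
          {:ok, Map.put(entries, request_id, {data, fingerprint, expires_at})}

        :error ->
          {{:error, :not_found}, entries}
      end
    end)
  end

  # Drops every entry whose expiry timestamp has been reached
  def prune(store, now \\ DateTime.utc_now()) do
    Agent.update(store, fn entries ->
      entries
      |> Enum.reject(fn {_id, {_data, _fingerprint, expires_at}} ->
        DateTime.compare(expires_at, now) != :gt
      end)
      |> Map.new()
    end)
  end
end
```

An Agent serializes all access through one process, which keeps the insert check and write atomic. Like ETS, it is neither distributed nor persisted, so it only illustrates the shape of a store.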
Testing with Phoenix
After implementing IdempotencyPlug, you may have some custom configuration you want to ensure works. In my case, I had to ensure that the responses were correct since I use OpenApiSpex, and also that the idempotent request was scoped to the authenticated user. The custom :with error handler returned a halted conn with a JSON response instead of raising the error.
I implemented a test controller that was only included in tests:
if Mix.env() == :test do
  post "/idempotency-plug-handler-test", IdempotencyPlugHandlerTestController, :create
end
# test/support/idempotency_plug_handler_test_controller.ex
defmodule MyAppWeb.IdempotencyPlugHandlerTestController do
  use MyAppWeb, :controller

  def create(conn, _params) do
    # Tests can set a callback in the process dictionary to control the response
    message = if callback = Process.get(:callback), do: callback.(), else: "OK"

    json(conn, %{message: message})
  end
end
And then ran through the whole pipeline in tests:
defmodule MyAppWeb.IdempotencyPlugHandlerTest do
  use MyAppWeb.ConnCase

  setup [:setup_authenticated_conn, :setup_request]

  test "with concurrent request", %{conn: conn} do
    pid = self()

    task =
      Task.async(fn ->
        post_request(conn, fn ->
          send(pid, :continue)

          receive do
            :continue -> :ok
          end

          :ok
        end)
      end)

    receive do
      :continue -> :ok
    end

    conn = post_request(conn)

    assert conn.halted
    # assert custom json response

    send(task.pid, :continue)
    Task.await(task)
  end

  @tag capture_log: true
  test "with halted response", %{conn: conn} do
    Process.flag(:trap_exit, true)
    task = Task.async(fn -> post_request(conn, fn -> exit(:fatal) end) end)
    {:fatal, _} = catch_exit(Task.await(task))

    conn = post_request(conn)

    assert conn.halted
    # assert custom json response
  end

  test "with cached response with different request payload", %{conn: conn} do
    _other_conn = post_request(conn, nil, %{"other-key" => "1"})

    conn = post_request(conn)

    assert conn.halted
    # assert custom json response
  end

  test "with cached response with different user", %{conn: conn} do
    # Set up authentication with different user for other_conn
    {:ok, context} = setup_authenticated_conn(%{conn: conn})
    other_conn = post_request(context[:conn], fn -> "OTHER-RESPONSE" end)

    conn = post_request(conn)

    refute conn.halted
    refute conn.resp_body == other_conn.resp_body
  end

  defp setup_request(%{conn: conn}) do
    conn =
      conn
      |> put_req_header("idempotency-key", "key")
      |> Map.put(:method, "POST")
      |> Map.put(:params, %{"a" => 1, "b" => 2})

    %{conn: conn}
  end

  defp post_request(conn, callback \\ nil, params \\ %{}) do
    Process.put(:callback, callback)

    post(conn, ~p"/api/idempotency-plug-handler-test", Map.merge(%{"a" => 1}, params))
  end
end
Hi, I'm Dan Schultzer, I write in this blog, work a lot in Elixir, maintain several open source projects, and help companies streamline their development process