Distributed Erlang is required when running multiple nodes with Phoenix LiveView and using the LongPoll fallback. The Phoenix code generator adds DNSCluster by default, which is great when you can set up DNS records. Unfortunately, our production setup separates the network account from the service account, which makes it impossible to set up Service Discovery (see this issue for more).
What if we simply list the private IPs of the ECS tasks using the AWS API? We could use ClusterEcs, but I prefer the simplicity of DNSCluster, and one issue with ClusterEcs is that it uses ExAws instead of AWS, which we use exclusively.
Prerequisites
You can skip this section if you have already set up your project to be ready for distributed Erlang.
You must allow inter-node communication in the cluster. Add the following security group to your ECS tasks:
resource "aws_security_group" "erlang_distribution" {
name = "${data.aws_ecs_cluster.this.cluster_name}-erlang-distribution"
description = "Communication between Elixir nodes in the cluster"
vpc_id = data.aws_vpc.this.id
ingress {
description = "Allow distributed Erlang cluster communication"
from_port = 0
to_port = 0
protocol = "-1"
self = true
}
}
Set up an entrypoint script that pulls the task's private IP and sets the release node name. I do this in the GitHub workflow that packages the Docker image:
# ...
jobs:
package:
# ...
# It is necessary to set up an entrypoint script that sets the release node name
# from the ECS container metadata, to enable distributed Erlang.
- name: Set up AWS ECS bootstrap entrypoint script for distributed Erlang
run: |
cat >> .release/rel/overlays/bin/entrypoint <<'SH'
#!/bin/sh
set -eu
export RELEASE_DISTRIBUTION=name
export RELEASE_NODE=my_app-${{ github.sha }}@`curl -s $ECS_CONTAINER_METADATA_URI_V4 | jq -r ".Networks[0].IPv4Addresses[0]"`
/app/bin/$1 ${2:-}
SH
chmod +x .release/rel/overlays/bin/entrypoint
Ensure that curl and jq are installed in the Docker image:
# Start a new build stage so that the final image will only contain
# the compiled release and other runtime necessities
FROM ${RUNNER_IMAGE}
RUN apt-get update -y && apt-get install -y libstdc++6 openssl libncurses5 locales \
# For distributed Erlang it is necessary to set up an entrypoint that will pull
# the ECS instance IP. We need curl and jq to fetch the ECS instance info.
&& apt-get install -y curl jq \
&& apt-get clean && rm -f /var/lib/apt/lists/*_*
Attaching to DNSCluster
First, configure DNSCluster in application.ex to use the custom resolver:
{DNSCluster, resolver: DNSCluster.ECSResolver, query: Application.get_env(:my_app, :dns_cluster_ecs_query) || :ignore}
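For context, here is a minimal sketch of where that child spec might sit in the supervision tree (the surrounding children are illustrative placeholders):
# application.ex (sketch); the other children are placeholders
def start(_type, _args) do
  children = [
    MyAppWeb.Telemetry,
    {Phoenix.PubSub, name: MyApp.PubSub},
    {DNSCluster, resolver: DNSCluster.ECSResolver, query: Application.get_env(:my_app, :dns_cluster_ecs_query) || :ignore},
    MyAppWeb.Endpoint
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end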
Now set the :dns_cluster_ecs_query config in runtime.exs:
config :my_app, :dns_cluster_ecs_query, System.get_env("DNS_CLUSTER_ECS_QUERY")
Here is the awkward part of using DNSCluster: it expects the query option to be a binary. We'll work around this by setting the container environment variable to a JSON string (using Terraform here):
# The DNS_CLUSTER_ECS_QUERY is used for Erlang distribution, for nodes to
# discover other ECS tasks using `tag:GetResources`. It must contain the
# `cluster`, `region`, and `params` keys.
{
name = "DNS_CLUSTER_ECS_QUERY"
value = jsonencode({
"cluster": data.aws_ecs_cluster.this.arn,
"region": var.region,
"params": {
"TagFilters": [
{
"Key": "Environment"
"Values": [var.environment]
},
{
"Key": "Project"
"Values": [var.project]
}
]
}
})
},
You should adjust the TagFilters to the tags you use in your project.
You'll also need to allow the ecs:DescribeTasks and tag:GetResources actions for the role running the tasks. Note that tag:GetResources does not support resource-level permissions, so you have to grant it with a wildcard resource.
With that out of the way, we can build the ECS resolver.
The resolver fetches the ECS resources from the AWS Resource Groups Tagging API using the provided params, then fetches the private IPv4 addresses by describing the tasks with the ECS API. If a pagination token is returned, the resolver recursively queries the API until all pages have been fetched.
defmodule DNSCluster.ECSResolver do
@moduledoc """
`DNSCluster` resolver for ECS.
This resolver uses the AWS Resource Groups Tagging API to look up ECS task
IPs. The following policies are required for the role(s) running the tasks:
- `ecs:DescribeTasks`
- `tag:GetResources`
Note that `tag:*` actions don't support resource-level permissions, so you
must grant them with a wildcard resource.
Start `DNSCluster` in your `application.ex` supervision tree using this resolver:
{DNSCluster, resolver: DNSCluster.ECSResolver, query: Application.get_env(:my_app, :dns_cluster_ecs_query) || :ignore}
The query should be a JSON encoded binary as `DNSCluster` requires the
argument to be a binary:
{
"cluster": "my-cluster",
"region": "us-east-1",
"params": {
"TagFilters": [
{"Key": "my-tag", "Values": ["my-tag-value"]}
]
}
}
The query must have a cluster, region, and params key. The params key is a
map of parameters to pass to the AWS Resource Groups Tagging API.
"""
require Logger
def basename(node_name) when is_atom(node_name) do
[basename, _] = String.split(to_string(node_name), "@")
basename
end
def connect_node(node_name) when is_atom(node_name), do: Node.connect(node_name)
def list_nodes, do: Node.list(:visible)
def lookup(query, type) when is_binary(query), do: lookup(Jason.decode!(query), type)
def lookup(%{"cluster" => cluster, "region" => region, "params" => params}, :a), do: list_task_ips(cluster, region, params)
def lookup(_query, :aaaa), do: []
defp list_task_ips(cluster, region, params) do
client = aws_client(region)
params = Map.put(params, "ResourceTypeFilters", ["ecs:task"])
list_task_ips(client, cluster, params, [])
end
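# Accumulates task IPs page by page, re-querying with the PaginationToken
# until the API stops returning one.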
defp list_task_ips(client, cluster, params, ips) do
case fetch_task_ipv4_addresses(client, cluster, params) do
{:ok, task_ips, next} -> list_task_ips(client, cluster, params, ips ++ task_ips, next)
:error -> []
end
end
defp list_task_ips(_client, _cluster, _params, ips, nil), do: ips
defp list_task_ips(client, cluster, params, ips, next) do
list_task_ips(client, cluster, Map.put(params, "PaginationToken", next), ips)
end
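# Builds an AWS client from the credentials resolved by the :aws_credentials
# application (e.g. the ECS task role).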
defp aws_client(region) do
credentials = :aws_credentials.get_credentials()
access_key = Map.fetch!(credentials, :access_key_id)
secret_key = Map.fetch!(credentials, :secret_access_key)
session_token = Map.get(credentials, :token)
AWS.Client.create(access_key, secret_key, session_token, region)
end
defp fetch_task_ipv4_addresses(client, cluster, params) do
with {:ok, task_arns, next} <- fetch_resource_arns(client, params),
{:ok, task_ips} <- fetch_ipv4_addresses(client, cluster, task_arns) do
{:ok, task_ips, next}
end
end
defp fetch_resource_arns(client, params) do
case AWS.ResourceGroupsTaggingAPI.get_resources(client, params) do
{:ok, %{"ResourceTagMappingList" => resources} = resp, _} ->
task_arns = Enum.map(resources, &Map.fetch!(&1, "ResourceARN"))
pagination_token = Map.fetch!(resp, "PaginationToken")
next = String.length(pagination_token) > 0 && pagination_token || nil
{:ok, task_arns, next}
{:error, error} ->
Logger.warning("Error looking up resources by tags: #{inspect(error)}")
:error
end
end
defp fetch_ipv4_addresses(client, cluster, task_arns) do
case AWS.ECS.describe_tasks(client, %{"cluster" => cluster, "tasks" => task_arns}) do
{:ok, data, _} ->
{:ok, extract_ipv4s(data)}
{:error, error} ->
Logger.warning("Error looking up task IPs: #{inspect(error)}")
:error
end
end
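# Collects each container's privateIpv4Address and parses it into an :inet
# tuple, skipping containers without a private IPv4 address.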
defp extract_ipv4s(data) do
data
|> Map.fetch!("tasks")
|> Enum.flat_map(&Map.get(&1, "containers", []))
|> Enum.flat_map(&Map.get(&1, "networkInterfaces", []))
|> Enum.map(&Map.get(&1, "privateIpv4Address"))
|> Enum.reduce([], fn
nil, acc ->
acc
ip, acc ->
{:ok, ip} = :inet.parse_address(String.to_charlist(ip))
acc ++ [ip]
end)
end
end
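If you want to sanity-check the resolver by hand, you can call it from a remote console on a running task (the returned IPs below are illustrative):
query = System.fetch_env!("DNS_CLUSTER_ECS_QUERY")
DNSCluster.ECSResolver.lookup(query, :a)
#=> [{10, 0, 0, 12}, {10, 0, 0, 57}]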
That’s it! We got distributed Erlang without having to deal with Service Discovery.
Tests for DNSCluster.ECSResolver
The following test uses TestServer to mock the AWS API. Before you can run these tests, you'll need to update the aws_client function in DNSCluster.ECSResolver. I recommend setting up a helper module that can be used to generate the AWS client:
defmodule AWS.MyApp do
@moduledoc """
Module that contains MyApp specific helpers.
"""
@doc """
Creates AWS client struct.
"""
def aws_client(region) do
aws_credentials = aws_credentials()
access_key = Map.fetch!(aws_credentials, :access_key_id)
secret_key = Map.fetch!(aws_credentials, :secret_access_key)
session_token = Map.get(aws_credentials, :token)
access_key
|> AWS.Client.create(secret_key, session_token, region)
|> put_endpoint()
|> Map.merge(%{proto: aws_proto()})
end
# In tests we don't want to require :aws_credentials to be running
if Mix.env() == :test do
defp aws_credentials do
Application.get_env(:aws, :credentials, %{access_key_id: "test", secret_access_key: "test", token: "test"})
end
else
defp aws_credentials, do: :aws_credentials.get_credentials()
end
# In tests we want to replace the whole endpoint while in prod/dev we just want to replace the root
if Mix.env() == :test do
defp put_endpoint(client), do: AWS.Client.put_endpoint(client, aws_endpoint())
else
defp put_endpoint(client), do: AWS.Client.put_endpoint(client, {:keep_prefixes, aws_endpoint()})
end
defp aws_endpoint, do: Application.get_env(:aws, :endpoint, AWS.Client.default_endpoint())
defp aws_proto, do: Application.get_env(:aws, :proto, "https")
end
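With the helper in place, the private aws_client/1 function in DNSCluster.ECSResolver can simply delegate to it (a small sketch; adjust the module name to your app):
# In DNSCluster.ECSResolver: delegate client creation to the shared helper so
# tests can point the client at TestServer through the :aws app environment.
defp aws_client(region), do: AWS.MyApp.aws_client(region)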
We don't want to start :aws_credentials in tests:
# mix.exs
defmodule MyApp.MixProject do
use Mix.Project
# ...
def deps do
[
# ...
{:aws, "~> 1.0.2"},
# We don't want `aws_credentials` to start in dev and test since
# the startup will fail if credentials are not available.
{:aws_credentials, "~> 0.3.1", runtime: Mix.env() == :prod}
]
end
#...
end
We can now test the ECSResolver (and any other functions you have that use the aws_client helper):
defmodule DNSCluster.ECSResolverTest do
use MyApp.DataCase
alias DNSCluster.ECSResolver
alias ExUnit.CaptureLog
@params """
{
"cluster": "default",
"region": "us-east-1",
"params": {
"TagFilters": [
{
"Key": "Environment",
"Values": ["test"]
},
{
"Key": "Project",
"Values": ["my-project"]
}
]
}
}
"""
@task_arns ~w(arn:aws:ecs:us-east-1:012345678910:task/1 arn:aws:ecs:us-east-1:012345678910:task/2 arn:aws:ecs:us-east-1:012345678910:task/3)
describe "lookup/2" do
test "with `:aaaa` type" do
assert ECSResolver.lookup(@params, :aaaa) == []
end
test "with `:a` type when fetching resource arns fails" do
start_aws_test_server()
expect_aws_post_request(403, aws_api_error_fixture())
assert CaptureLog.capture_log(fn ->
assert ECSResolver.lookup(@params, :a) == []
end) =~ "Error looking up resources by tags"
end
test "with `:a` type when describing tasks fails" do
start_aws_test_server()
expect_aws_post_request(200, aws_resources_fixture(@task_arns))
expect_aws_post_request(403, aws_api_error_fixture())
assert CaptureLog.capture_log(fn ->
assert ECSResolver.lookup(@params, :a) == []
end) =~ "Error looking up task IPs"
end
test "with `:a` type" do
start_aws_test_server()
aws_resources = aws_resources_fixture(@task_arns)
expect_aws_post_request(200, fn params ->
assert params["ResourceTypeFilters"] == ["ecs:task"]
assert params["TagFilters"] == Jason.decode!(@params)["params"]["TagFilters"]
aws_resources
end)
expect_aws_post_request(200, fn params ->
assert params["cluster"] == Jason.decode!(@params)["cluster"]
assert params["tasks"] == @task_arns
aws_tasks_fixture(@task_arns)
end)
assert ECSResolver.lookup(@params, :a) == [{10, 0, 0, 0}, {10, 0, 0, 1}, {10, 0, 0, 2}]
end
test "with `:a` type when has nextPaginationToken" do
{task_arns_1, task_arns_2} = Enum.split(@task_arns, 2)
start_aws_test_server()
expect_aws_post_request(200, %{aws_resources_fixture(task_arns_1) | "PaginationToken" => "nextPaginationToken"})
expect_aws_post_request(200, aws_tasks_fixture(task_arns_1))
expect_aws_post_request(200, fn params ->
assert params["PaginationToken"] == "nextPaginationToken"
aws_resources_fixture(task_arns_2)
end)
expect_aws_post_request(200, aws_tasks_fixture(task_arns_2, ip_start_at: 3))
assert ECSResolver.lookup(@params, :a) == [{10, 0, 0, 0}, {10, 0, 0, 1}, {10, 0, 0, 3}]
end
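# Starts a local TestServer and points the AWS client config at it for the
# duration of the test.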
defp start_aws_test_server do
TestServer.start(http_server: {TestServer.HTTPServer.Bandit, http_options: [compress: false]})
uri = URI.parse(TestServer.url())
Application.put_env(:aws, :endpoint, "#{uri.host}:#{uri.port}")
Application.put_env(:aws, :proto, uri.scheme)
on_exit(fn ->
Application.delete_env(:aws, :endpoint)
Application.delete_env(:aws, :proto)
end)
end
defp expect_aws_post_request(status, body_or_function) do
TestServer.add("/", via: :post, to: fn conn ->
{:ok, body, _} = Plug.Conn.read_body(conn)
params = Jason.decode!(body)
body = is_function(body_or_function) && body_or_function.(params) || body_or_function
conn
|> Plug.Conn.put_resp_header("Content-Type", "application/x-amz-json-1.1")
|> Plug.Conn.send_resp(status, Jason.encode!(body))
end)
end
defp aws_api_error_fixture do
%{
"error" => %{
"code" => "ExpiredToken",
"message" => "",
"requestId" => ""
}
}
end
defp aws_resources_fixture(resource_arns) do
%{
"PaginationToken" => "",
"ResourceTagMappingList" => (
for resource_arn <- resource_arns do
%{
"ComplianceDetails" => %{
"ComplianceStatus" => true,
"KeysWithNoncompliantValues" => [],
"NoncompliantKeys" => []
},
"ResourceARN" => resource_arn,
"Tags" => []
}
end
)
}
end
defp aws_tasks_fixture(resource_arns, opts \\ []) do
%{
"failures" => [],
"tasks" => (
for {resource_arn, n} <- Enum.with_index(resource_arns) do
%{
"taskArn" => resource_arn,
"overrides" => %{
"containerOverrides" => [
%{
"name" => "simple-app"
},
%{
"name" => "busybox"
}
]
},
"lastStatus" => "RUNNING",
"containerInstanceArn" => "arn:aws:ecs:us-east-1:012345678910:container-instance/default/5991d8da-1d59-49d2-a31f-4230f9e73140",
"createdAt" => 1_476_822_811.295,
"version" => 0,
"clusterArn" => "arn:aws:ecs:us-east-1:012345678910:cluster/default",
"startedAt" => 1_476_822_833.998,
"desiredStatus" => "RUNNING",
"taskDefinitionArn" => "arn:aws:ecs:us-east-1:012345678910:task-definition/console-sample-app-dynamic-ports:1",
"startedBy" => "ecs-svc/9223370560032507596",
"containers" => [
%{
"containerArn" => "arn:aws:ecs:us-east-1:012345678910:container/4df26bb4-f057-467b-a079-961675296e64",
"taskArn" => "arn:aws:ecs:us-east-1:012345678910:task/default/1dc5c17a-422b-4dc4-b493-371970c6c4d6",
"lastStatus" => "RUNNING",
"name" => "simple-app",
"networkBindings" => [
%{
"protocol" => "tcp",
"bindIP" => "0.0.0.0",
"containerPort" => 80,
"hostPort" => 32_774
}
],
"networkInterfaces" => [
%{
"attachmentId" => "",
"ipv6Address" => "",
"privateIpv4Address" => "10.0.0.#{Keyword.get(opts, :ip_start_at, 0) + n}"
}
]
},
%{
"containerArn" => "arn:aws:ecs:us-east-1:012345678910:container/e09064f7-7361-4c87-8ab9-8d073bbdbcb9",
"taskArn" => "arn:aws:ecs:us-east-1:012345678910:task/default/1dc5c17a-422b-4dc4-b493-371970c6c4d6",
"lastStatus" => "RUNNING",
"name" => "busybox",
"networkBindings" => []
}
]
}
end
)
}
end
end
end