
Distributed Erlang without DNS on AWS ECS

Distributed Erlang is required when running multiple nodes with Phoenix LiveView and using LongPoll fallback. The Phoenix code generator adds DNSCluster by default, which is great when you can set up DNS records. Unfortunately, in our production setup, we have a separation between the network account and the service account, which makes it impossible to set up Service Discovery (see this issue for more).

What if we simply list the private IPs of the ECS tasks using the AWS API? We could use ClusterEcs, but I prefer the simplicity of DNSCluster. ClusterEcs also depends on ExAws, while we exclusively use the AWS (aws-elixir) library.

Prerequisites

You can skip this section if you have already set up your project to be ready for distributed Erlang.

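You must allow internode communication between the tasks in the cluster. Add the following security group to your ECS tasks. Its self-referencing ingress rule lets any task in the group reach any other task in the group on all ports, which covers both EPMD (port 4369) and the dynamically assigned distribution ports: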

resource "aws_security_group" "erlang_distribution" {
  name        = "${data.aws_ecs_cluster.this.cluster_name}-erlang-distribution"
  description = "Communication between Elixir nodes in the cluster"
  vpc_id      = data.aws_vpc.this.id

  ingress {
    description = "Allow distributed Erlang cluster communication"
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    self        = true
  }
}

Set up an entrypoint script that looks up the task's private IP and sets the release node name. I do this in the GitHub workflow that packages the Docker image:

# ...

jobs:
  package:
    # ...

    steps:
      # ...

      # It is necessary to set up an entrypoint script that sets the release node name
      # from the ECS container metadata, to enable distributed Erlang.
      - name: Set up AWS ECS bootstrap entrypoint script for distributed Erlang
        run: |
          cat > .release/rel/overlays/bin/entrypoint <<'SH'
          #!/bin/sh
          set -eu
          export RELEASE_DISTRIBUTION=name
          export RELEASE_NODE=my_app-${{ github.sha }}@$(curl -s "$ECS_CONTAINER_METADATA_URI_V4" | jq -r '.Networks[0].IPv4Addresses[0]')
          # exec replaces this shell so the release receives the SIGTERM that ECS sends on stop
          exec /app/bin/"$1" ${2:-}
          SH

          chmod +x .release/rel/overlays/bin/entrypoint

Ensure that curl and jq are installed in the Docker image:

# Start a new build stage so that the final image will only contain
# the compiled release and other runtime necessities
FROM ${RUNNER_IMAGE}
RUN apt-get update -y && apt-get install -y libstdc++6 openssl libncurses5 locales \
  # For distributed Erlang, the entrypoint needs to look up the ECS task IP.
  # We need curl and jq to fetch and parse the ECS task metadata.
  && apt-get install -y curl jq \
  && apt-get clean && rm -f /var/lib/apt/lists/*_*
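To make the naming scheme from the entrypoint script concrete, here is a minimal Elixir sketch of what it computes. It assumes the task metadata JSON has already been decoded into a map with the Networks/IPv4Addresses shape that the jq filter reads; EntrypointSketch and release_node/2 are hypothetical names used only for illustration.

```elixir
defmodule EntrypointSketch do
  @moduledoc """
  Hypothetical mirror of the entrypoint script: builds the release node name
  from a decoded ECS task metadata (v4) map and the commit SHA.
  """

  # Equivalent of `jq -r '.Networks[0].IPv4Addresses[0]'`
  def task_ip(%{"Networks" => [%{"IPv4Addresses" => [ip | _]} | _]}), do: ip

  # Equivalent of `my_app-<sha>@<ip>` in RELEASE_NODE
  def release_node(metadata, sha), do: "my_app-#{sha}@#{task_ip(metadata)}"
end

metadata = %{"Networks" => [%{"NetworkMode" => "awsvpc", "IPv4Addresses" => ["10.0.1.23"]}]}
EntrypointSketch.release_node(metadata, "3f2a9c1")
# => "my_app-3f2a9c1@10.0.1.23"
```

Because the SHA is part of the basename, only tasks running the same commit share a basename. As I understand DNSCluster, it builds the names it connects to from the local node's basename, so during a rolling deploy the old and new tasks should not try to cluster with each other.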

Attaching to DNSCluster

First, configure DNSCluster in application.ex to use the custom resolver:

{DNSCluster, resolver: DNSCluster.ECSResolver, query: Application.get_env(:my_app, :dns_cluster_ecs_query) || :ignore}

Now set the :dns_cluster_ecs_query config in runtime.exs:

config :my_app, :dns_cluster_ecs_query, System.get_env("DNS_CLUSTER_ECS_QUERY")
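A small sketch of how these two pieces fit together: when DNS_CLUSTER_ECS_QUERY is unset (dev, test), the config value is nil and the child falls back to query: :ignore, which tells DNSCluster not to run. The ClusterConfigSketch.dns_cluster_child/1 function is hypothetical; in the app itself the tuple is written inline in application.ex.

```elixir
defmodule ClusterConfigSketch do
  # Hypothetical helper: builds the DNSCluster child spec tuple from the raw
  # value of DNS_CLUSTER_ECS_QUERY (nil when the variable is unset).
  def dns_cluster_child(env_value) do
    {DNSCluster, resolver: DNSCluster.ECSResolver, query: env_value || :ignore}
  end
end

ClusterConfigSketch.dns_cluster_child(System.get_env("DNS_CLUSTER_ECS_QUERY"))
```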

Here is the awkward part: DNSCluster expects the query option to be a binary, so we’ll set the container environment variable to a JSON string (using Terraform here):

# The DNS_CLUSTER_ECS_QUERY is used for Erlang distribution for nodes to
# discover other ECS tasks using `tags:GetResources`. It must contain the
# `cluster`, `region`, and `params` keys.
{
  name  = "DNS_CLUSTER_ECS_QUERY"
  value = jsonencode({
    "cluster": data.aws_ecs_cluster.this.arn,
    "region": var.region,
    "params": {
      "TagFilters": [
        {
          "Key": "Environment",
          "Values": [var.environment]
        },
        {
          "Key": "Project",
          "Values": [var.project]
        }
      ]
    }
  })
},

You should adjust the TagFilters to the tags you use in your project.
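To see what actually reaches the Tagging API, here is a stdlib-only sketch of the request params the resolver builds from the decoded query: the params map is passed through unchanged, with ResourceTypeFilters pinned to ecs:task. The tag values are placeholders, and QuerySketch.get_resources_params/1 is a hypothetical name for what the resolver does inside list_task_ips/3.

```elixir
defmodule QuerySketch do
  # Hypothetical helper mirroring what the resolver does with the decoded
  # DNS_CLUSTER_ECS_QUERY: keep the user-supplied params and restrict the
  # lookup to ECS tasks.
  def get_resources_params(%{"cluster" => _cluster, "region" => _region, "params" => params}) do
    Map.put(params, "ResourceTypeFilters", ["ecs:task"])
  end
end

# Roughly what the Terraform jsonencode(...) value decodes to (placeholder values)
query = %{
  "cluster" => "arn:aws:ecs:us-east-1:012345678910:cluster/default",
  "region" => "us-east-1",
  "params" => %{
    "TagFilters" => [
      %{"Key" => "Environment", "Values" => ["production"]},
      %{"Key" => "Project", "Values" => ["my-project"]}
    ]
  }
}

# Returns the same TagFilters plus "ResourceTypeFilters" => ["ecs:task"]
QuerySketch.get_resources_params(query)
```

Keep in mind how GetResources combines filters: a resource must match every entry in TagFilters, while multiple values within one entry match if any of them applies.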

You’ll also need to grant the ecs:DescribeTasks and tags:GetResources permissions to the ECS task role. Note that tags:GetResources doesn't support resource-level permissions, so it must be granted on a wildcard (*) resource.

With that out of the way, we can build the ECS resolver.

The resolver will fetch the ECS resources with the AWS Resource Groups Tagging API using the provided params and then fetch the private IPv4 address using the ECS API to describe the tasks. The resolver will recursively query the API if a pagination token is returned.
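The pagination logic is easiest to see in isolation. The sketch below keeps calling a page-fetching function until it gets back an empty or missing token, accumulating results in order, and gives up on the whole lookup if any page fails, as the resolver does. PaginationSketch and the fetch_page callback are hypothetical stand-ins for the GetResources + DescribeTasks round trip.

```elixir
defmodule PaginationSketch do
  @moduledoc """
  Hypothetical sketch of token-based pagination: `fetch_page` receives the
  current token (nil for the first page) and returns
  `{:ok, items, next_token}` or `:error`.
  """

  def collect(fetch_page), do: collect(fetch_page, nil, [])

  defp collect(fetch_page, token, acc) do
    case fetch_page.(token) do
      # An empty or missing token means this was the last page
      {:ok, items, next} when next in [nil, ""] -> acc ++ items
      {:ok, items, next} -> collect(fetch_page, next, acc ++ items)
      # Like the resolver, return no IPs at all if any page fails
      :error -> []
    end
  end
end

pages = %{
  nil => {:ok, [{10, 0, 0, 0}, {10, 0, 0, 1}], "page-2"},
  "page-2" => {:ok, [{10, 0, 0, 3}], ""}
}

PaginationSketch.collect(&Map.fetch!(pages, &1))
# => [{10, 0, 0, 0}, {10, 0, 0, 1}, {10, 0, 0, 3}]
```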

defmodule DNSCluster.ECSResolver do
  @moduledoc """
  `DNSCluster` resolver for ECS.

  This resolver uses the AWS Resource Groups Tagging API and the ECS API to
  look up ECS task IPs. The role(s) running the tasks need the following IAM
  permissions:

  - `ecs:DescribeTasks`
  - `tags:GetResources`

  Note that `tags:GetResources` doesn't support resource-level permissions, so
  it must be granted on a wildcard (`*`) resource.

  Add `DNSCluster` to the supervision tree in your `application.ex` with this
  resolver:

      {DNSCluster, resolver: DNSCluster.ECSResolver, query: Application.get_env(:my_app, :dns_cluster_ecs_query) || :ignore}

  The query should be a JSON encoded binary as `DNSCluster` requires the
  argument to be a binary:

      {
        "cluster": "my-cluster",
        "region": "us-east-1",
        "params": {
          "TagFilters": [
            {"Key": "my-tag", "Values": ["my-tag-value"]}
          ]
        }
      }

  The query must have a cluster, region, and params key. The params key is a
  map of parameters to pass to the AWS Resource Groups Tagging API.
  """

  require Logger

  def basename(node_name) when is_atom(node_name) do
    [basename, _] = String.split(to_string(node_name), "@")
    basename
  end

  def connect_node(node_name) when is_atom(node_name), do: Node.connect(node_name)

  def list_nodes, do: Node.list(:visible)

  def lookup(query, type) when is_binary(query), do: lookup(Jason.decode!(query), type)

  def lookup(%{"cluster" => cluster, "region" => region, "params" => params}, :a), do: list_task_ips(cluster, region, params)

  def lookup(_query, :aaaa), do: []

  defp list_task_ips(cluster, region, params) do
    client = aws_client(region)
    params = Map.put(params, "ResourceTypeFilters", ["ecs:task"])

    list_task_ips(client, cluster, params, [])
  end

  defp list_task_ips(client, cluster, params, ips) do
    case fetch_task_ipv4_addresses(client, cluster, params) do
      {:ok, task_ips, next} -> list_task_ips(client, cluster, params, ips ++ task_ips, next)
      :error -> []
    end
  end

  defp list_task_ips(_client, _cluster, _params, ips, nil), do: ips

  defp list_task_ips(client, cluster, params, ips, next) do
    list_task_ips(client, cluster, Map.put(params, "PaginationToken", next), ips)
  end

  defp aws_client(region) do
    credentials = :aws_credentials.get_credentials()
    access_key = Map.fetch!(credentials, :access_key_id)
    secret_key = Map.fetch!(credentials, :secret_access_key)
    session_token = Map.get(credentials, :token)

    AWS.Client.create(access_key, secret_key, session_token, region)
  end

  defp fetch_task_ipv4_addresses(client, cluster, params) do
    with {:ok, task_arns, next} <- fetch_resource_arns(client, params),
         {:ok, task_ips} <- fetch_ipv4_addresses(client, cluster, task_arns) do
      {:ok, task_ips, next}
    end
  end

  defp fetch_resource_arns(client, params) do
    case AWS.ResourceGroupsTaggingAPI.get_resources(client, params) do
      {:ok, %{"ResourceTagMappingList" => resources} = resp, _} ->
        task_arns = Enum.map(resources, &Map.fetch!(&1, "ResourceARN"))
        next =
          case Map.get(resp, "PaginationToken") do
            # An empty (or missing) token means this was the last page
            token when token in [nil, ""] -> nil
            token -> token
          end

        {:ok, task_arns, next}

      {:error, error} ->
        Logger.warning("Error looking up resources by tags: #{inspect(error)}")

        :error
    end
  end

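  # DescribeTasks needs at least one task ARN, so skip the call when no tasks matched the tag filters
  defp fetch_ipv4_addresses(_client, _cluster, []), do: {:ok, []}

  defp fetch_ipv4_addresses(client, cluster, task_arns) do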
    case AWS.ECS.describe_tasks(client, %{"cluster" => cluster, "tasks" => task_arns}) do
      {:ok, data, _} ->
        {:ok, extract_ipv4s(data)}

      {:error, error} ->
        Logger.warning("Error looking up task IPs: #{inspect(error)}")

        :error
    end
  end

  defp extract_ipv4s(data) do
    data
    |> Map.fetch!("tasks")
    |> Enum.flat_map(&Map.get(&1, "containers", []))
    |> Enum.flat_map(&Map.get(&1, "networkInterfaces", []))
    |> Enum.map(&Map.get(&1, "privateIpv4Address"))
    |> Enum.reject(&is_nil/1)
    # Containers in the same awsvpc task can report the same ENI, so drop duplicate IPs
    |> Enum.uniq()
    |> Enum.map(fn ip ->
      {:ok, ip} = :inet.parse_address(String.to_charlist(ip))
      ip
    end)
  end
end

That’s it! We now have distributed Erlang without having to deal with Service Discovery.
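For context on how the resolver's output gets used: DNSCluster takes the local node's basename (via basename/1), combines it with each IP returned by lookup/2, and tries to connect to the resulting node names. The sketch below reproduces that step with stdlib calls only; NodeNameSketch is a hypothetical module, and DNSCluster's own implementation may differ in details such as which nodes it skips.

```elixir
defmodule NodeNameSketch do
  # Same splitting as DNSCluster.ECSResolver.basename/1
  def basename(node_name) when is_atom(node_name) do
    [basename, _host] = String.split(to_string(node_name), "@")
    basename
  end

  # Hypothetical: turn resolved IP tuples into candidate node names,
  # skipping the local node itself
  def candidate_nodes(self_node, ips) do
    base = basename(self_node)

    ips
    |> Enum.map(fn ip -> :"#{base}@#{:inet.ntoa(ip)}" end)
    |> Enum.reject(&(&1 == self_node))
  end
end

NodeNameSketch.candidate_nodes(:"my_app-3f2a9c1@10.0.0.0", [{10, 0, 0, 0}, {10, 0, 0, 1}])
# => [:"my_app-3f2a9c1@10.0.0.1"]
```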

Testing DNSCluster.ECSResolver

The following test uses TestServer to mock the AWS API.

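Before you can run these tests, you’ll need to change the aws_client function in DNSCluster.ECSResolver so the endpoint and protocol can be overridden in tests. I recommend setting up a helper module that generates the AWS client and having the resolver delegate to it (defp aws_client(region), do: AWS.MyApp.aws_client(region)):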

defmodule AWS.MyApp do
  @moduledoc """
  Module that contains MyApp specific helpers.
  """

  @doc """
  Creates AWS client struct.
  """
  def aws_client(region) do
    aws_credentials = aws_credentials()
    access_key = Map.fetch!(aws_credentials, :access_key_id)
    secret_key = Map.fetch!(aws_credentials, :secret_access_key)
    session_token = Map.get(aws_credentials, :token)

    access_key
    |> AWS.Client.create(secret_key, session_token, region)
    |> put_endpoint()
    |> Map.merge(%{proto: aws_proto()})
  end

  # In tests we don't want to require :aws_credentials to be running
  if Mix.env() == :test do
    defp aws_credentials do
      Application.get_env(:aws, :credentials, %{access_key_id: "test", secret_access_key: "test", token: "test"})
    end
  else
    defp aws_credentials, do: :aws_credentials.get_credentials()
  end

  # In tests we want to replace the whole endpoint while in prod/dev we just want to replace the root
  if Mix.env() == :test do
    defp put_endpoint(client), do: AWS.Client.put_endpoint(client, aws_endpoint())
  else
    defp put_endpoint(client), do: AWS.Client.put_endpoint(client, {:keep_prefixes, aws_endpoint()})
  end

  defp aws_endpoint, do: Application.get_env(:aws, :endpoint, AWS.Client.default_endpoint())

  defp aws_proto, do: Application.get_env(:aws, :proto, "https")
end
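To see which values the test setup hands this helper, here is a small stdlib-only sketch of how start_aws_test_server/0 in the test module further down splits the TestServer URL: the endpoint becomes a bare host:port (which, in test, replaces the whole AWS host) and the scheme becomes the protocol. EndpointSketch.aws_env/1 is a hypothetical name and the URL is a placeholder.

```elixir
defmodule EndpointSketch do
  # Hypothetical helper: split a base URL into the values the AWS.MyApp
  # helper reads from config (:endpoint as "host:port", :proto as the scheme)
  def aws_env(url) do
    uri = URI.parse(url)
    [endpoint: "#{uri.host}:#{uri.port}", proto: uri.scheme]
  end
end

EndpointSketch.aws_env("http://localhost:4001")
# => [endpoint: "localhost:4001", proto: "http"]
```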

We also don’t want :aws_credentials to start outside of production:

# mix.exs
defmodule MyApp.MixProject do
  use Mix.Project

  # ...

  def deps do
    [
      # ...
      {:aws, "~> 1.0.2"},
      # We don't want `aws_credentials` to start in dev and test since
      # the startup will fail if credentials are not available.
      {:aws_credentials, "~> 0.3.1", runtime: Mix.env() == :prod}
    ]
  end

  #...
end

We can now test the ECSResolver (and any other functions that use the aws_client helper):

defmodule DNSCluster.ECSResolverTest do
  use MyApp.DataCase

  alias DNSCluster.ECSResolver
  alias ExUnit.CaptureLog

  @params """
    {
      "cluster": "default",
      "region": "us-east-1",
      "params": {
        "TagFilters": [
          {
            "Key": "Environment",
            "Values": ["test"]
          },
          {
            "Key": "Project",
            "Values": ["my-project"]
          }
        ]
      }
    }
    """

  @task_arns ~w(arn:aws:ecs:us-east-1:012345678910:task/1 arn:aws:ecs:us-east-1:012345678910:task/2 arn:aws:ecs:us-east-1:012345678910:task/3)

  describe "lookup/2" do
    test "with `:aaaa` type" do
      assert ECSResolver.lookup(@params, :aaaa) == []
    end

    test "with `:a` type when fetching resource arns fails" do
      start_aws_test_server()
      expect_aws_post_request(403, aws_api_error_fixture())

      assert CaptureLog.capture_log(fn ->
        assert ECSResolver.lookup(@params, :a) == []
      end) =~ "Error looking up resources by tags"
    end

    test "with `:a` type when describing tasks fails" do
      start_aws_test_server()
      expect_aws_post_request(200, aws_resources_fixture(@task_arns))
      expect_aws_post_request(403, aws_api_error_fixture())

      assert CaptureLog.capture_log(fn ->
        assert ECSResolver.lookup(@params, :a) == []
      end) =~ "Error looking up task IPs"
    end

    test "with `:a` type" do
      start_aws_test_server()

      aws_resources = aws_resources_fixture(@task_arns)

      expect_aws_post_request(200, fn params ->
        assert params["ResourceTypeFilters"] == ["ecs:task"]
        assert params["TagFilters"] == Jason.decode!(@params)["params"]["TagFilters"]

        aws_resources
      end)

      expect_aws_post_request(200, fn params ->
        assert params["cluster"] == Jason.decode!(@params)["cluster"]
        assert params["tasks"] == @task_arns

        aws_tasks_fixture(@task_arns)
      end)

      assert ECSResolver.lookup(@params, :a) == [{10, 0, 0, 0}, {10, 0, 0, 1}, {10, 0, 0, 2}]
    end

    test "with `:a` type when has nextPaginationToken" do
      {task_arns_1, task_arns_2} = Enum.split(@task_arns, 2)

      start_aws_test_server()
      expect_aws_post_request(200, %{aws_resources_fixture(task_arns_1) | "PaginationToken" => "nextPaginationToken"})
      expect_aws_post_request(200, aws_tasks_fixture(task_arns_1))

      expect_aws_post_request(200, fn params ->
        assert params["PaginationToken"] == "nextPaginationToken"

        aws_resources_fixture(task_arns_2)
      end)

      expect_aws_post_request(200, aws_tasks_fixture(task_arns_2, ip_start_at: 3))

      assert ECSResolver.lookup(@params, :a) == [{10, 0, 0, 0}, {10, 0, 0, 1}, {10, 0, 0, 3}]
    end

    defp start_aws_test_server do
      TestServer.start(http_server: {TestServer.HTTPServer.Bandit, http_options: [compress: false]})

      uri = URI.parse(TestServer.url())

      Application.put_env(:aws, :endpoint, "#{uri.host}:#{uri.port}")
      Application.put_env(:aws, :proto, uri.scheme)

      on_exit(fn ->
        Application.delete_env(:aws, :endpoint)
        Application.delete_env(:aws, :proto)
      end)
    end

    defp expect_aws_post_request(status, body_or_function) do
      TestServer.add("/", via: :post, to: fn conn ->
        {:ok, body, _} = Plug.Conn.read_body(conn)
        params = Jason.decode!(body)
        body = is_function(body_or_function) && body_or_function.(params) || body_or_function

        conn
        |> Plug.Conn.put_resp_header("Content-Type", "application/x-amz-json-1.1")
        |> Plug.Conn.send_resp(status, Jason.encode!(body))
      end)
    end

    defp aws_api_error_fixture do
      %{
        "error" => %{
          "code" => "ExpiredToken",
          "message" => "",
          "requestId" => ""
        }
      }
    end

    defp aws_resources_fixture(resource_arns) do
      %{
        "PaginationToken" => "",
        "ResourceTagMappingList" => (
          for resource_arn <- resource_arns do
            %{
              "ComplianceDetails" => %{
                "ComplianceStatus" => true,
                "KeysWithNoncompliantValues" => [],
                "NoncompliantKeys" => []
              },
              "ResourceARN" => resource_arn,
              "Tags" => []
            }
          end
        )
      }
    end

    defp aws_tasks_fixture(resource_arns, opts \\ []) do
      %{
        "failures" => [],
        "tasks" => (
          for {resource_arn, n} <- Enum.with_index(resource_arns) do
            %{
              "taskArn" => resource_arn,
              "overrides" => %{
                "containerOverrides" => [
                  %{
                    "name" => "simple-app"
                  },
                  %{
                    "name" => "busybox"
                  }
                ]
              },
              "lastStatus" => "RUNNING",
              "containerInstanceArn" => "arn:aws:ecs:us-east-1:012345678910:container-instance/default/5991d8da-1d59-49d2-a31f-4230f9e73140",
              "createdAt" => 1_476_822_811.295,
              "version" => 0,
              "clusterArn" => "arn:aws:ecs:us-east-1:012345678910:cluster/default",
              "startedAt" => 1_476_822_833.998,
              "desiredStatus" => "RUNNING",
              "taskDefinitionArn" => "arn:aws:ecs:us-east-1:012345678910:task-definition/console-sample-app-dynamic-ports:1",
              "startedBy" => "ecs-svc/9223370560032507596",
              "containers" => [
                %{
                  "containerArn" => "arn:aws:ecs:us-east-1:012345678910:container/4df26bb4-f057-467b-a079-961675296e64",
                  "taskArn" => "arn:aws:ecs:us-east-1:012345678910:task/default/1dc5c17a-422b-4dc4-b493-371970c6c4d6",
                  "lastStatus" => "RUNNING",
                  "name" => "simple-app",
                  "networkBindings" => [
                    %{
                      "protocol" => "tcp",
                      "bindIP" => "0.0.0.0",
                      "containerPort" => 80,
                      "hostPort" => 32_774
                    }
                  ],
                  "networkInterfaces" => [
                    %{
                      "attachmentId" => "",
                      "ipv6Address" => "",
                      "privateIpv4Address" => "10.0.0.#{Keyword.get(opts, :ip_start_at, 0) + n}"
                    }
                  ]
                },
                %{
                  "containerArn" => "arn:aws:ecs:us-east-1:012345678910:container/e09064f7-7361-4c87-8ab9-8d073bbdbcb9",
                  "taskArn" => "arn:aws:ecs:us-east-1:012345678910:task/default/1dc5c17a-422b-4dc4-b493-371970c6c4d6",
                  "lastStatus" => "RUNNING",
                  "name" => "busybox",
                  "networkBindings" => []
                }
              ]
            }
          end
        )
      }
    end
  end
end

Hi, I'm Dan Schultzer. I write this blog, work a lot in Elixir, maintain several open source projects, and help companies streamline their development process.