CSV Export using NimbleCSV

Using NimbleCSV to export large CSV files is simple, but I ran into a few issues as the size of the exported data grew. I’ll go through the iterations I went through, starting with the most naive implementation.

Let us begin with streaming data from our database:

defmodule MyApp.Resources do
  alias MyApp.{Repo, Resources.Resource}

  def stream_resources(opts \\ []) do
    Repo.stream(Resource, opts)
  end
end

With this, we’ll now set up the controller action to export our CSV:

defmodule MyAppWeb.CSVController do
  use MyAppWeb, :controller

  alias NimbleCSV.RFC4180, as: CSV
  alias MyApp.{Resources, Repo}

  def export(conn, _params) do
    conn =
      conn
      |> put_resp_content_type("text/csv")
      |> put_resp_header("content-disposition", "attachment; filename=\"export.csv\"")
      |> send_chunked(:ok)

    # Dump header
    rows = CSV.dump_to_iodata([["id", "field1", "field2"]])
    {:ok, _conn} = chunk(conn, rows)

    # Stream data
    Repo.transaction(fn ->
      rows =
        Resources.stream_resources()
        |> Enum.map(fn resource ->
          # Do something potentially expensive here; this placeholder just reads the fields
          [resource.id, resource.field1, resource.field2]
        end)
        |> CSV.dump_to_iodata()

      {:ok, _conn} = chunk(conn, rows)
    end)

    conn
  end
end

Immediately there are obvious issues here. Can you spot them?

Enum.map/2 is eager, which means every transformed row is held in memory at once (the same goes for the rows variable). The single chunk we send contains ALL the data, so chunking the response is pointless.

Instead of Enum.map/2 we should use Stream.each/2 or Stream.transform/3, dumping and sending each row as soon as it has been transformed:

# Stream data
Repo.transaction(fn ->
  Resources.stream_resources()
  |> Stream.each(fn resource ->
    # Do something potentially expensive here; this placeholder just reads the fields
    row = [resource.id, resource.field1, resource.field2]

    {:ok, _conn} = chunk(conn, CSV.dump_to_iodata([row]))
  end)
  |> Stream.run()
end)

Now each row is pushed as a chunk.
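
To see the difference in evaluation order without a database or a connection, here is a minimal sketch. LazinessDemo, transform/2, and send_chunk/2 are hypothetical stand-ins (not part of the app above) for the row transformation and Plug.Conn.chunk/2; they only record the order in which things happen.

```elixir
defmodule LazinessDemo do
  # Hypothetical stand-in for the expensive per-row transformation.
  defp transform(n, log) do
    Agent.update(log, &[{:transformed, n} | &1])
    [n, n * 2]
  end

  # Hypothetical stand-in for Plug.Conn.chunk/2; it only records what was sent.
  defp send_chunk(data, log), do: Agent.update(log, &[{:sent, data} | &1])

  # Eager: every row is transformed before anything is sent.
  def eager(source) do
    {:ok, log} = Agent.start_link(fn -> [] end)
    rows = Enum.map(source, &transform(&1, log))
    send_chunk(rows, log)
    events(log)
  end

  # Lazy: each row is transformed and sent before the next one is read.
  def lazy(source) do
    {:ok, log} = Agent.start_link(fn -> [] end)

    source
    |> Stream.each(fn n -> n |> transform(log) |> send_chunk(log) end)
    |> Stream.run()

    events(log)
  end

  # Returns the recorded events in the order they occurred.
  defp events(log) do
    events = Agent.get(log, &Enum.reverse/1)
    Agent.stop(log)
    events
  end
end

IO.inspect(LazinessDemo.eager(1..2))
IO.inspect(LazinessDemo.lazy(1..2))
```

In the eager version the single :sent event comes after all :transformed events. In the lazy version they alternate, which is what keeps memory flat.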

DBConnection timeouts

As we began exporting larger amounts of data (or more expensive data transformations) we experienced db connection timeouts:

[error] Postgrex.Protocol (#PID<0.2180.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.1113969.0> timed out because it queued and checked out the connection for longer than 15000ms	

Repo.stream/2 has to run inside a transaction, and Repo.transaction/2 has a default timeout of 15 seconds. We could increase that timeout, but we are doing a somewhat expensive data transformation for each row (on the order of 5-10+ ms), and we’re dealing with ever-increasing datasets, now in the hundreds of thousands of rows. At that rate, 100,000 rows alone take roughly 8 to 17 minutes.

I don’t want to just add timeout: :infinity, and I don’t think we need this to run in a transaction at all. We should stop using Repo.stream/2 and instead paginate through the results:

def stream_resources(opts \\ []) do
  cursor_stream(Resource, opts)
end

defp cursor_stream(query, opts) do
  starting_after = Keyword.get(opts, :starting_after)
  limit = Keyword.get(opts, :max_rows, 500)

  starting_after
  |> Stream.unfold(fn starting_after ->
    case stream_chunk(query, limit, starting_after) do
      [] -> nil
      rows -> {rows, List.last(rows)}
    end
  end)
  |> Stream.flat_map(& &1)
end

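# The query macros used below require `import Ecto.Query` in the module.
defp stream_chunk(query, limit, starting_after) do
  query
  # Order by id as well so rows sharing an inserted_at have a stable order
  |> order_by(asc: :inserted_at, asc: :id)
  |> limit(^limit)
  |> starting_after(starting_after)
  |> Repo.all()
end

defp starting_after(query, nil), do: query

defp starting_after(query, starting_after) do
  # Strictly after the cursor in (inserted_at, id) order. Matching on
  # `id != cursor.id and inserted_at >= cursor.inserted_at` could return rows
  # from earlier pages again when timestamps collide.
  where(
    query,
    [r],
    r.inserted_at > ^starting_after.inserted_at or
      (r.inserted_at == ^starting_after.inserted_at and r.id > ^starting_after.id)
  )
end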

In the above, cursor pagination is wrapped in an Elixir stream, so we can use it the same way we were using Repo.stream/2. Each page is its own short query, so no connection is held for the duration of the export. With this, we no longer see db connection timeouts!
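
The Stream.unfold/2 pattern can be tried without a database. The sketch below is an assumption-laden stand-in: CursorDemo and fetch_page/2 are hypothetical, and an in-memory list plays the role of the table, with the last row of each page acting as the cursor for the next one.

```elixir
defmodule CursorDemo do
  # Hypothetical in-memory "table", already sorted the way the query would sort it.
  @rows Enum.map(1..7, fn id -> %{id: id} end)

  # Stand-in for the paginated query: up to `limit` rows strictly after the cursor.
  def fetch_page(nil, limit), do: Enum.take(@rows, limit)

  def fetch_page(cursor, limit) do
    @rows
    |> Enum.filter(&(&1.id > cursor.id))
    |> Enum.take(limit)
  end

  # Lazily emits rows page by page; the next page is fetched only when needed.
  def stream(limit) do
    nil
    |> Stream.unfold(fn cursor ->
      case fetch_page(cursor, limit) do
        [] -> nil
        rows -> {rows, List.last(rows)}
      end
    end)
    |> Stream.flat_map(& &1)
  end
end

CursorDemo.stream(3) |> Enum.map(& &1.id) |> IO.inspect()
```

Because the stream is lazy, taking only the first two rows fetches a single page; the remaining pages are never requested.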

Cowboy timeouts

However, we began to experience another issue. For some reason, the CSV just stopped streaming data after about a minute, with the connection being closed.

The failure is silent: nothing is logged, and the truncated CSV file looks valid. So we should first make sure the end-user can tell whether they got the full dataset, by adding comment rows at the top and bottom of the CSV file:

def export(conn, _params) do
  conn =
    conn
    |> put_resp_content_type("text/csv")
    |> put_resp_header("content-disposition", "attachment; filename=\"export.csv\"")
    |> send_chunked(:ok)

  {:ok, _conn} = chunk(conn, CSV.dump_to_iodata([["# Sometimes the browser may truncate the file, verify that END OF FILE exists at the bottom"]]))

  # Dump header
  rows = CSV.dump_to_iodata([["id", "field1", "field2"]])
  {:ok, _conn} = chunk(conn, rows)

  # Stream data
  Resources.stream_resources()
  |> Stream.each(fn resource ->
    # Do something potentially expensive here; this placeholder just reads the fields
    row = [resource.id, resource.field1, resource.field2]

    {:ok, _conn} = chunk(conn, CSV.dump_to_iodata([row]))
  end)
  |> Stream.run()

  {:ok, _conn} = chunk(conn, CSV.dump_to_iodata([["# END OF FILE"]]))

  conn
end

The culprit for the streaming being halted was the idle_timeout setting in cowboy:

idle_timeout (60000)

Time in ms with no data received before Cowboy closes the connection.

During a download we won’t receive any data from the client, even though we’re still sending it a bunch of data, so the timer runs out. I don’t know why cowboy does this, as the connection is very much still active with the data being streamed.

My solution was to switch to Bandit, which I had already planned to do. Otherwise, I would have had to find a way to get rid of idle_timeout for the CSV export, which I’m not even sure is possible with cowboy.

Connection halt

A small detail in the above logic is that we only handle the happy path. If the end-user stops the download midway, we will get a match error, as Plug.Conn.chunk/2 will return {:error, :closed}. If we don’t want the process to blow up, we should handle it by halting the stream (i.e. returning {:halt, acc} from Stream.transform/3). For our particular use case, it was fine that it blew up.
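
A minimal sketch of that halting approach, assuming a hypothetical send function in place of Plug.Conn.chunk/2. HaltDemo and closing_after/1 are illustrative names only, and the "conn" here is just a counter of chunks sent.

```elixir
defmodule HaltDemo do
  # `send_fun` is a hypothetical stand-in for Plug.Conn.chunk/2: it returns
  # {:ok, conn} on success and {:error, :closed} once the client is gone.
  # Returns the number of rows that were sent successfully.
  def export(rows, conn, send_fun) do
    rows
    |> Stream.transform(conn, fn row, conn ->
      case send_fun.(conn, row) do
        {:ok, conn} -> {[row], conn}
        # Stop pulling rows from the source instead of raising a MatchError
        {:error, :closed} -> {:halt, conn}
      end
    end)
    |> Enum.count()
  end

  # Simulates a client that disconnects after `limit` chunks.
  def closing_after(limit) do
    fn sent, _row ->
      if sent < limit, do: {:ok, sent + 1}, else: {:error, :closed}
    end
  end
end

IO.inspect(HaltDemo.export(1..1_000_000, 0, HaltDemo.closing_after(3)))
```

Once the simulated client closes, the remaining rows are never read from the source, so no further queries or transformations would run.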

Addendum: Sequence data in Postgres

A small addendum to this was how I tested this in our staging environment. With our staging environment, we have no shell access so I couldn’t use the IEx console to insert a large amount of data.

I did have access to Postgres though so I ran a query with generate_series:

INSERT INTO responses(id, field1, field2, inserted_at, updated_at)
SELECT
  gen_random_uuid(),
  'value1-' || a.n,
  'email-' || a.n || '@example.com',
  '2024-01-12 03:09:53.549195'::timestamp + (a.n || ' ms')::interval,
  '2024-01-12 03:09:53.549195'::timestamp + (a.n || ' ms')::interval
FROM generate_series(1, 100000) as a(n)

Future

In the above, we’re streaming the data directly to the end-user. The proper way would instead be to push this to a background task that stores the CSV file somewhere it can be served as static content (e.g. S3). Streaming data directly could become expensive, error-prone, and potentially a vulnerability.

We may also want to speed up the CSV export by parallelizing transformations of rows (iterate through chunks of rows instead of each row).
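
One possible shape for that, sketched with the standard library only: split the row stream into chunks with Stream.chunk_every/2 and transform the chunks concurrently with Task.async_stream/3. ParallelDemo and transform/1 are hypothetical placeholders, and this is not something the export above currently does.

```elixir
defmodule ParallelDemo do
  # Hypothetical stand-in for the expensive per-row transformation.
  def transform(n), do: [n, n * n]

  # Transforms chunks of rows in parallel and emits the rows in source order.
  def rows(source, chunk_size \\ 100) do
    source
    |> Stream.chunk_every(chunk_size)
    |> Task.async_stream(
      fn chunk -> Enum.map(chunk, &transform/1) end,
      max_concurrency: System.schedulers_online(),
      # Keep results in input order so the CSV rows stay sorted
      ordered: true
    )
    |> Stream.flat_map(fn {:ok, rows} -> rows end)
  end
end

ParallelDemo.rows(1..5, 2) |> Enum.to_list() |> IO.inspect()
```

Note that Task.async_stream/3 applies a default timeout of 5 seconds per task, so the chunk size should be small enough for a chunk to finish within it, or the :timeout option should be raised.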

Hi, I'm Dan Schultzer, I write in this blog, work a lot in Elixir, maintain several open source projects, and help companies streamline their development process