Using NimbleCSV to export large CSV files is simple, but I encountered a few issues along the way as the size of the exported data grew. I’ll go through a few iterations I saw, starting with the most naive implementation.
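In case it’s not already in the project, NimbleCSV is pulled in as a regular Mix dependency (the version requirement below is an assumption, check hex.pm for the current release):

# mix.exs
defp deps do
  [
    {:nimble_csv, "~> 1.2"}
  ]
end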
Let us begin with streaming data from our database:
defmodule MyApp.Resources do
  alias MyApp.{Repo, Resources.Resource}

  def stream_resources(opts \\ []) do
    Repo.stream(Resource, opts)
  end
end
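Note that Repo.stream/2 must be run inside a transaction, so a caller wraps it roughly like this (a minimal usage sketch):

# Repo.stream/2 only works inside a transaction;
# opts such as :max_rows are passed through to Repo.stream/2
Repo.transaction(fn ->
  MyApp.Resources.stream_resources(max_rows: 500)
  |> Enum.each(&IO.inspect/1)
end)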
With this, we’ll now set up the controller action to export our CSV:
defmodule MyAppWeb.CSVController do
  use MyAppWeb, :controller

  alias NimbleCSV.RFC4180, as: CSV
  alias MyApp.{Resources, Repo}

  def export(conn, _params) do
    conn =
      conn
      |> put_resp_content_type("text/csv")
      |> put_resp_header("content-disposition", "attachment; filename=\"export.csv\"")
      |> send_chunked(:ok)

    # Dump header
    rows = CSV.dump_to_iodata([["id", "field1", "field2"]])
    {:ok, _conn} = chunk(conn, rows)

    # Stream data
    Repo.transaction(fn ->
      rows =
        Resources.stream_resources()
        |> Enum.map(fn resource ->
          # Do something potentially expensive here
          [resource.id, resource.field1, resource.field2]
        end)
        |> CSV.dump_to_iodata()

      {:ok, _conn} = chunk(conn, rows)
    end)

    conn
  end
end
Immediately there are obvious issues here. Can you spot them?

Enum.map/2 is greedy, which means all of the data will be held in memory (the same is true for the rows variable). In the above, the chunk sent will be ALL the data! Chunking the response doesn’t make sense if we do that.

Instead of Enum.map/2 we should use Stream.each/2 or Stream.transform/3, dumping the data as each row is processed:
# Stream data
Repo.transaction(fn ->
  Resources.stream_resources()
  |> Stream.each(fn resource ->
    # Do something potentially expensive here
    row = [resource.id, resource.field1, resource.field2]

    {:ok, _conn} = chunk(conn, CSV.dump_to_iodata([row]))
  end)
  |> Stream.run()
end)
Now each row is pushed as a chunk.
DBConnection timeouts
As we began exporting larger amounts of data (or doing more expensive data transformations), we experienced db connection timeouts:
[error] Postgrex.Protocol (#PID<0.2180.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.1113969.0> timed out because it queued and checked out the connection for longer than 15000ms
Since we are streaming inside a transaction, which is required for Repo.stream/2, we get the default timeout of 15 seconds. We could increase the timeout for Repo.transaction/2, but we are doing somewhat expensive data transformation for each row (on the order of 5-10+ ms), and we’re dealing with ever-increasing datasets, now in the hundreds of thousands of rows.
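For reference, bumping the timeout would look something like this (a sketch; the five-minute value is an arbitrary assumption):

Repo.transaction(
  fn ->
    # ... stream and chunk rows as above ...
  end,
  timeout: :timer.minutes(5)
)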
I don’t want to just add timeout: :infinity, and I don’t think we need this to run in a transaction at all. We should stop using Repo.stream/2 and instead paginate through the results:
# In MyApp.Resources (requires Ecto.Query to be imported)
def stream_resources(opts \\ []) do
  cursor_stream(Resource, opts)
end

defp cursor_stream(query, opts) do
  starting_after = Keyword.get(opts, :starting_after)
  limit = Keyword.get(opts, :max_rows, 500)

  starting_after
  |> Stream.unfold(fn starting_after ->
    case stream_chunk(query, limit, starting_after) do
      [] -> nil
      rows -> {rows, List.last(rows)}
    end
  end)
  |> Stream.flat_map(& &1)
end

defp stream_chunk(query, limit, starting_after) do
  query
  |> order_by(asc: :inserted_at)
  |> limit(^limit)
  |> starting_after(starting_after)
  |> Repo.all()
end

defp starting_after(query, nil), do: query

defp starting_after(query, starting_after) do
  where(query, [r], r.id != ^starting_after.id and r.inserted_at >= ^starting_after.inserted_at)
end
In the above, we got cursor pagination to work as an Elixir stream, so we can use it the same way we were using Repo.stream/2. With this, we no longer see db connection timeouts!
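The options mirror what we had before: :max_rows controls the page size, and :starting_after lets us resume from a known record. Usage looks like this, minus the transaction (a sketch; last_seen is assumed to be a previously fetched %Resource{}):

# No transaction needed anymore
Resources.stream_resources(max_rows: 1_000)
|> Stream.each(&IO.inspect/1)
|> Stream.run()

# Resume after a previously seen record
Resources.stream_resources(starting_after: last_seen)
|> Enum.take(10)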
Cowboy timeouts
However, we began to experience another issue. For some reason, the CSV just stopped streaming data after about a minute, with the connection being closed.
This failure doesn’t reveal itself at all: everything runs as it should with no errors, and the CSV file looks correct. So we should first make sure that the end-user can confirm whether they got the full dataset, by adding comments to the header and footer of the CSV file:
def export(conn, _params) do
  conn =
    conn
    |> put_resp_content_type("text/csv")
    |> put_resp_header("content-disposition", "attachment; filename=\"export.csv\"")
    |> send_chunked(:ok)

  {:ok, _conn} = chunk(conn, CSV.dump_to_iodata([["# Sometimes the browser may truncate the file, verify that END OF FILE exists at the bottom"]]))

  # Dump header
  rows = CSV.dump_to_iodata([["id", "field1", "field2"]])
  {:ok, _conn} = chunk(conn, rows)

  # Stream data
  Resources.stream_resources()
  |> Stream.each(fn resource ->
    # Do something potentially expensive here
    row = [resource.id, resource.field1, resource.field2]

    {:ok, _conn} = chunk(conn, CSV.dump_to_iodata([row]))
  end)
  |> Stream.run()

  {:ok, _conn} = chunk(conn, CSV.dump_to_iodata([["# END OF FILE"]]))

  conn
end
The culprit for the streaming being halted was the idle_timeout setting in cowboy:
idle_timeout (60000)
Time in ms with no data received before Cowboy closes the connection.
We won’t receive any data from the client, though we’re still sending a bunch of data to the client! I don’t know why cowboy does this, as the connection is very much still active with the data being streamed.
My solution was to switch to Bandit, which I had already planned to do. Otherwise, I would have had to find a way to get rid of idle_timeout for the CSV export, which I’m not even sure is possible with cowboy.
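For reference, the switch is mostly a drop-in change on Phoenix 1.7+ (a sketch; the version requirement is an assumption, and the app/endpoint names follow the example app in this post):

# mix.exs
{:bandit, "~> 1.0"}

# config/config.exs
config :my_app, MyAppWeb.Endpoint,
  adapter: Bandit.PhoenixAdapter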
Connection halt
A small detail in the above logic is that we only handle happy paths. If the end-user stops the download midway, we will get a match error, as Plug.Conn.chunk/2 will return {:error, :closed}. If we don’t want the process to blow up, we should handle it by halting the stream (i.e. returning {:halt, acc} from Stream.transform/3). For our particular use case, it was fine that it blew up.
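A sketch of what that could look like, threading the conn through as the accumulator (the row transformation is the same placeholder as above):

Resources.stream_resources()
|> Stream.transform(conn, fn resource, conn ->
  # Do something potentially expensive here
  row = [resource.id, resource.field1, resource.field2]

  case chunk(conn, CSV.dump_to_iodata([row])) do
    # Nothing to emit downstream, we only care about pushing chunks
    {:ok, conn} -> {[], conn}
    # The client went away, stop streaming instead of crashing
    {:error, :closed} -> {:halt, conn}
  end
end)
|> Stream.run()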
Addendum: Sequence data in Postgres
A small addendum: how I tested this in our staging environment. We have no shell access there, so I couldn’t use the IEx console to insert a large amount of data. I did have access to Postgres though, so I ran a query with generate_series:
INSERT INTO resources(id, field1, field2, inserted_at, updated_at)
SELECT
  gen_random_uuid(),
  'value1-' || a.n,
  'email-' || a.n || '@example.com',
  '2024-01-12 03:09:53.549195'::timestamp + (a.n || ' ms')::interval,
  '2024-01-12 03:09:53.549195'::timestamp + (a.n || ' ms')::interval
FROM generate_series(1, 100000) AS a(n);
Future
In the above, we’re streaming the data directly to the end-user. The proper way would instead be to push this to a background task, storing the CSV file somewhere it can be served as static content (e.g. S3). Streaming data directly could become expensive, error-prone, and potentially a vulnerability.

We may also want to speed up the CSV export by parallelizing the row transformations (iterating through chunks of rows instead of individual rows).
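A rough sketch of the latter, batching rows and transforming each batch in a separate task (the batch size and options are assumptions):

Resources.stream_resources()
|> Stream.chunk_every(100)
|> Task.async_stream(
  fn resources ->
    Enum.map(resources, fn resource ->
      # Do something potentially expensive here
      [resource.id, resource.field1, resource.field2]
    end)
  end,
  ordered: true,
  timeout: :timer.minutes(1)
)
|> Stream.each(fn {:ok, rows} ->
  {:ok, _conn} = chunk(conn, CSV.dump_to_iodata(rows))
end)
|> Stream.run()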