Skip to content

Commit 92e7903

Browse files
committed
Stream tarball downloads to disk and add file-based unpack
Use hex_core's new request_to_file to stream tarball downloads directly to disk via hackney instead of buffering in memory. Add TmpDir GenServer for process-based temp file cleanup.
1 parent c1ea423 commit 92e7903

7 files changed

Lines changed: 221 additions & 31 deletions

File tree

lib/diff/application.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ defmodule Diff.Application do
1010

1111
# List all child processes to be supervised
1212
children = [
13+
Diff.TmpDir,
1314
goth_spec(),
1415
{Task.Supervisor, name: Diff.Tasks},
1516
{Phoenix.PubSub, name: Diff.PubSub},

lib/diff/hex/adapter.ex

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,50 @@ defmodule Diff.Hex.Adapter do
1111

1212
with {:ok, status, resp_headers, client_ref} <- resp,
1313
{:ok, resp_body} <- :hackney.body(client_ref) do
14-
# :hex_core expects headers to be a Map
1514
resp_headers = Map.new(resp_headers)
1615
{:ok, {status, resp_headers, resp_body}}
1716
end
1817
end
1918

19+
@impl true
20+
def request_to_file(method, uri, req_headers, req_body, filename, _config) do
21+
{content_type, payload} = deconstruct_body(req_body)
22+
req_headers = prepare_headers(req_headers, content_type)
23+
24+
case :hackney.request(method, uri, req_headers, payload, @opts) do
25+
{:ok, 200, resp_headers, ref} ->
26+
resp_headers = Map.new(resp_headers)
27+
28+
case File.open(filename, [:write, :binary], &stream_body_to_file(ref, &1)) do
29+
{:ok, :ok} -> {:ok, {200, resp_headers}}
30+
{:ok, {:error, _} = error} -> error
31+
{:error, reason} -> {:error, reason}
32+
end
33+
34+
{:ok, status, resp_headers, ref} ->
35+
:hackney.skip_body(ref)
36+
resp_headers = Map.new(resp_headers)
37+
{:ok, {status, resp_headers}}
38+
39+
{:error, reason} ->
40+
{:error, reason}
41+
end
42+
end
43+
44+
defp stream_body_to_file(ref, file) do
45+
case :hackney.stream_body(ref) do
46+
{:ok, data} ->
47+
:ok = IO.binwrite(file, data)
48+
stream_body_to_file(ref, file)
49+
50+
:done ->
51+
:ok
52+
53+
{:error, reason} ->
54+
{:error, reason}
55+
end
56+
end
57+
2058
defp prepare_headers(req_headers, content_type) do
2159
if content_type do
2260
req_headers

lib/diff/hex/hex.ex

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@ defmodule Diff.Hex do
2626
end
2727

2828
def get_tarball(package, version) do
29-
with {:ok, {200, _, tarball}} <- :hex_repo.get_tarball(@config, package, version) do
30-
{:ok, tarball}
31-
else
32-
{:ok, {403, _, _}} ->
29+
path = Diff.TmpDir.tmp_file("tarball")
30+
31+
case :hex_repo.get_tarball_to_file(@config, package, version, to_charlist(path)) do
32+
{:ok, {200, _headers}} ->
33+
{:ok, path}
34+
35+
{:ok, {403, _}} ->
3336
{:error, :not_found}
3437

35-
{:ok, {status, _, _}} ->
36-
Logger.error("Failed to get package versions. Status: #{status}.")
38+
{:ok, {status, _}} ->
39+
Logger.error("Failed to get tarball for package: #{package}. Status: #{status}.")
3740
{:error, :not_found}
3841

3942
{:error, reason} ->
@@ -42,10 +45,9 @@ defmodule Diff.Hex do
4245
end
4346
end
4447

45-
def unpack_tarball(tarball, path) when is_binary(path) do
46-
path = to_charlist(path)
47-
48-
with {:ok, _} <- :hex_tarball.unpack(tarball, path) do
48+
def unpack_tarball(tarball_path, output_path) do
49+
with {:ok, _} <-
50+
:hex_tarball.unpack({:file, to_charlist(tarball_path)}, to_charlist(output_path)) do
4951
:ok
5052
end
5153
end
@@ -73,12 +75,11 @@ defmodule Diff.Hex do
7375
end
7476

7577
def diff(package, from, to) do
76-
path_from = tmp_path("package-#{package}-#{from}-")
77-
path_to = tmp_path("package-#{package}-#{to}-")
78-
7978
with {:ok, tarball_from} <- get_tarball(package, from),
79+
path_from = Diff.TmpDir.tmp_dir("package-#{package}-#{from}"),
8080
:ok <- unpack_tarball(tarball_from, path_from),
8181
{:ok, tarball_to} <- get_tarball(package, to),
82+
path_to = Diff.TmpDir.tmp_dir("package-#{package}-#{to}"),
8283
:ok <- unpack_tarball(tarball_to, path_to) do
8384
from_files = tree_files(path_from)
8485
to_files = tree_files(path_to)
@@ -92,8 +93,7 @@ defmodule Diff.Hex do
9293
all_files = (from_files ++ to_files) |> Enum.uniq() |> Enum.sort()
9394

9495
stream =
95-
all_files
96-
|> Stream.flat_map(fn file ->
96+
Stream.flat_map(all_files, fn file ->
9797
{path_old, path_new} =
9898
cond do
9999
file in new_files -> {"/dev/null", Path.join(path_to, file)}
@@ -120,14 +120,6 @@ defmodule Diff.Hex do
120120
[{:error, {error, reason}}]
121121
end
122122
end)
123-
|> Stream.transform(
124-
fn -> :ok end,
125-
fn elem, :ok -> {[elem], :ok} end,
126-
fn :ok ->
127-
File.rm_rf(path_from)
128-
File.rm_rf(path_to)
129-
end
130-
)
131123

132124
{:ok, stream}
133125
else
@@ -166,9 +158,4 @@ defmodule Diff.Hex do
166158
|> Enum.filter(&File.regular?(&1, raw: true))
167159
|> Enum.map(&Path.relative_to(&1, directory))
168160
end
169-
170-
defp tmp_path(prefix) do
171-
random_string = Base.encode16(:crypto.strong_rand_bytes(4))
172-
Path.join([System.tmp_dir!(), "diff", prefix <> random_string])
173-
end
174161
end

lib/diff/tmp_dir.ex

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
defmodule Diff.TmpDir do
2+
use GenServer
3+
4+
@table __MODULE__
5+
6+
def start_link(opts \\ []) do
7+
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
8+
end
9+
10+
def tmp_file(prefix) do
11+
path = path(prefix)
12+
File.touch!(path)
13+
track(path)
14+
path
15+
end
16+
17+
def tmp_dir(prefix) do
18+
path = path(prefix)
19+
File.mkdir_p!(path)
20+
track(path)
21+
path
22+
end
23+
24+
defp path(prefix) do
25+
random = Base.encode16(:crypto.strong_rand_bytes(4))
26+
Path.join(base_dir(), prefix <> "-" <> random)
27+
end
28+
29+
defp base_dir() do
30+
dir = Path.join(System.tmp_dir!(), "diff")
31+
File.mkdir_p!(dir)
32+
dir
33+
end
34+
35+
defp track(path) do
36+
pid = self()
37+
:ets.insert(@table, {pid, path})
38+
GenServer.cast(__MODULE__, {:monitor, pid})
39+
end
40+
41+
@impl true
42+
def init(_opts) do
43+
Process.flag(:trap_exit, true)
44+
:ets.new(@table, [:named_table, :duplicate_bag, :public])
45+
{:ok, %{monitors: MapSet.new()}}
46+
end
47+
48+
@impl true
49+
def handle_cast({:monitor, pid}, state) do
50+
if pid in state.monitors do
51+
{:noreply, state}
52+
else
53+
Process.monitor(pid)
54+
{:noreply, %{state | monitors: MapSet.put(state.monitors, pid)}}
55+
end
56+
end
57+
58+
@impl true
59+
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
60+
cleanup_pid(pid)
61+
{:noreply, %{state | monitors: MapSet.delete(state.monitors, pid)}}
62+
end
63+
64+
@impl true
65+
def terminate(_reason, _state) do
66+
:ets.foldl(
67+
fn {_pid, path}, :ok ->
68+
File.rm_rf(path)
69+
:ok
70+
end,
71+
:ok,
72+
@table
73+
)
74+
end
75+
76+
defp cleanup_pid(pid) do
77+
entries = :ets.lookup(@table, pid)
78+
79+
Enum.each(entries, fn {_pid, path} ->
80+
File.rm_rf(path)
81+
end)
82+
83+
:ets.delete(@table, pid)
84+
end
85+
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ defmodule Diff.MixProject do
3838
{:git_diff, github: "ericmj/git_diff", branch: "ericmj/fix-modes"},
3939
{:goth, "~> 1.0"},
4040
{:hackney, "~> 1.15"},
41-
{:hex_core, "~> 0.11.0"},
41+
{:hex_core, "~> 0.15.0"},
4242
{:jason, "~> 1.0"},
4343
{:logster, "~> 1.0.0"},
4444
{:mox, "~> 1.0", only: :test},

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"git_diff": {:git, "https://github.com/ericmj/git_diff.git", "e4ee06cfd139b8a911d07e42d0ff3b15eee2b740", [branch: "ericmj/fix-modes"]},
1515
"goth": {:hex, :goth, "1.4.3", "80e86225ae174844e6a77b61982fafadfc715277db465e0956348d8bdd56b231", [:mix], [{:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "34e2012ed1af2fe2446083da60a988fd9829943d30e4447832646c3a6863a7e6"},
1616
"hackney": {:hex, :hackney, "1.25.0", "390e9b83f31e5b325b9f43b76e1a785cbdb69b5b6cd4e079aa67835ded046867", [:rebar3], [{:certifi, "~> 2.15.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.4", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "7209bfd75fd1f42467211ff8f59ea74d6f2a9e81cbcee95a56711ee79fd6b1d4"},
17-
"hex_core": {:hex, :hex_core, "0.11.0", "d1c6bbf2a4ee6b5f002bec6fa52b5080c53c8b63b7caf6eb88b943687547bff4", [:rebar3], [], "hexpm", "707893677a425491962a2db522f1d2b1f85f97ea27418b06f7929f1d30cde0b0"},
17+
"hex_core": {:hex, :hex_core, "0.15.0", "8eadc0ccb08e3742f2313073d04f39eaa7904617329039e9d3c402f5dd227673", [:rebar3], [], "hexpm", "c2093764c7af8ef0818c104fa141eba431e7be93f8374638c45c7037b26a52f8"},
1818
"hpax": {:hex, :hpax, "1.0.1", "c857057f89e8bd71d97d9042e009df2a42705d6d690d54eca84c8b29af0787b0", [:mix], [], "hexpm", "4e2d5a4f76ae1e3048f35ae7adb1641c36265510a2d4638157fbcb53dda38445"},
1919
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
2020
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},

test/diff/tmp_dir_test.exs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
defmodule Diff.TmpDirTest do
2+
use ExUnit.Case, async: true
3+
4+
test "tmp_file/1 creates a file" do
5+
path = Diff.TmpDir.tmp_file("test")
6+
assert File.exists?(path)
7+
assert File.regular?(path)
8+
end
9+
10+
test "tmp_dir/1 creates a directory" do
11+
path = Diff.TmpDir.tmp_dir("test")
12+
assert File.dir?(path)
13+
end
14+
15+
test "cleanup on normal process exit" do
16+
test_pid = self()
17+
18+
Task.start(fn ->
19+
file = Diff.TmpDir.tmp_file("test")
20+
dir = Diff.TmpDir.tmp_dir("test")
21+
send(test_pid, {:paths, file, dir})
22+
end)
23+
24+
assert_receive {:paths, file, dir}
25+
Process.sleep(100)
26+
27+
refute File.exists?(file)
28+
refute File.exists?(dir)
29+
end
30+
31+
@tag :capture_log
32+
test "cleanup on process crash" do
33+
test_pid = self()
34+
35+
Task.start(fn ->
36+
file = Diff.TmpDir.tmp_file("test")
37+
dir = Diff.TmpDir.tmp_dir("test")
38+
send(test_pid, {:paths, file, dir})
39+
raise "crash"
40+
end)
41+
42+
assert_receive {:paths, file, dir}
43+
Process.sleep(100)
44+
45+
refute File.exists?(file)
46+
refute File.exists?(dir)
47+
end
48+
49+
test "multiple paths for one process" do
50+
test_pid = self()
51+
52+
Task.start(fn ->
53+
paths =
54+
for i <- 1..5 do
55+
file = Diff.TmpDir.tmp_file("test-#{i}")
56+
dir = Diff.TmpDir.tmp_dir("test-#{i}")
57+
{file, dir}
58+
end
59+
60+
send(test_pid, {:paths, paths})
61+
end)
62+
63+
assert_receive {:paths, paths}
64+
Process.sleep(100)
65+
66+
for {file, dir} <- paths do
67+
refute File.exists?(file)
68+
refute File.exists?(dir)
69+
end
70+
end
71+
72+
test "paths persist while process is alive" do
73+
file = Diff.TmpDir.tmp_file("test")
74+
dir = Diff.TmpDir.tmp_dir("test")
75+
76+
assert File.exists?(file)
77+
assert File.dir?(dir)
78+
end
79+
end

0 commit comments

Comments
 (0)