Skip to content

Commit 58c787b

Browse files
authored
Stream tarball downloads to disk and add file-based unpack (#114)
* Stream tarball downloads to disk and add file-based unpack Use hex_core's new request_to_file to stream tarball downloads directly to disk via hackney instead of buffering in memory. Add TmpDir GenServer for process-based temp file cleanup. * Configure hex_core repo_url and repo_public_key from application config The hex_core config was hardcoded to default_config() which always points to production repo.hex.pm. Read repo_url and repo_public_key from application config so staging uses the correct repo and key.
1 parent c1ea423 commit 58c787b

9 files changed

Lines changed: 242 additions & 40 deletions

File tree

config/config.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import Config
99

1010
config :diff,
1111
cache_version: 2,
12-
package_store_impl: Diff.Package.DefaultStore
12+
package_store_impl: Diff.Package.DefaultStore,
13+
repo_url: "https://repo.hex.pm"
1314

1415
# Configures the endpoint
1516
config :diff, DiffWeb.Endpoint,

config/runtime.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ if config_env() == :prod do
55
host: System.fetch_env!("DIFF_HOST"),
66
hexpm_host: System.fetch_env!("DIFF_HEXPM_HOST"),
77
cache_version: String.to_integer(System.fetch_env!("DIFF_CACHE_VERSION")),
8-
bucket: System.fetch_env!("DIFF_BUCKET")
8+
bucket: System.fetch_env!("DIFF_BUCKET"),
9+
repo_url: System.fetch_env!("DIFF_REPO_URL"),
10+
repo_public_key: System.fetch_env!("DIFF_REPO_PUBLIC_KEY")
911

1012
config :diff, DiffWeb.Endpoint,
1113
http: [port: String.to_integer(System.fetch_env!("DIFF_PORT"))],

lib/diff/application.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ defmodule Diff.Application do
1010

1111
# List all child processes to be supervised
1212
children = [
13+
Diff.TmpDir,
1314
goth_spec(),
1415
{Task.Supervisor, name: Diff.Tasks},
1516
{Phoenix.PubSub, name: Diff.PubSub},

lib/diff/hex/adapter.ex

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,50 @@ defmodule Diff.Hex.Adapter do
1111

1212
with {:ok, status, resp_headers, client_ref} <- resp,
1313
{:ok, resp_body} <- :hackney.body(client_ref) do
14-
# :hex_core expects headers to be a Map
1514
resp_headers = Map.new(resp_headers)
1615
{:ok, {status, resp_headers, resp_body}}
1716
end
1817
end
1918

19+
@impl true
20+
def request_to_file(method, uri, req_headers, req_body, filename, _config) do
21+
{content_type, payload} = deconstruct_body(req_body)
22+
req_headers = prepare_headers(req_headers, content_type)
23+
24+
case :hackney.request(method, uri, req_headers, payload, @opts) do
25+
{:ok, 200, resp_headers, ref} ->
26+
resp_headers = Map.new(resp_headers)
27+
28+
case File.open(filename, [:write, :binary], &stream_body_to_file(ref, &1)) do
29+
{:ok, :ok} -> {:ok, {200, resp_headers}}
30+
{:ok, {:error, _} = error} -> error
31+
{:error, reason} -> {:error, reason}
32+
end
33+
34+
{:ok, status, resp_headers, ref} ->
35+
:hackney.skip_body(ref)
36+
resp_headers = Map.new(resp_headers)
37+
{:ok, {status, resp_headers}}
38+
39+
{:error, reason} ->
40+
{:error, reason}
41+
end
42+
end
43+
44+
defp stream_body_to_file(ref, file) do
45+
case :hackney.stream_body(ref) do
46+
{:ok, data} ->
47+
:ok = IO.binwrite(file, data)
48+
stream_body_to_file(ref, file)
49+
50+
:done ->
51+
:ok
52+
53+
{:error, reason} ->
54+
{:error, reason}
55+
end
56+
end
57+
2058
defp prepare_headers(req_headers, content_type) do
2159
if content_type do
2260
req_headers

lib/diff/hex/hex.ex

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,27 @@
11
defmodule Diff.Hex do
22
@behaviour Diff.Hex.Behaviour
33

4-
@config %{
5-
:hex_core.default_config()
6-
| http_adapter: {Diff.Hex.Adapter, %{}},
7-
http_user_agent_fragment: "hexpm_diff"
8-
}
4+
defp config() do
5+
config = %{
6+
:hex_core.default_config()
7+
| http_adapter: {Diff.Hex.Adapter, %{}},
8+
http_user_agent_fragment: "hexpm_diff",
9+
repo_url: Application.fetch_env!(:diff, :repo_url)
10+
}
11+
12+
if repo_public_key = Application.get_env(:diff, :repo_public_key) do
13+
%{config | repo_public_key: repo_public_key}
14+
else
15+
%{config | repo_verify: false}
16+
end
17+
end
918

1019
@max_file_size 1024 * 1024
1120

1221
require Logger
1322

1423
def get_versions() do
15-
with {:ok, {200, _, results}} <- :hex_repo.get_versions(@config) do
24+
with {:ok, {200, _, results}} <- :hex_repo.get_versions(config()) do
1625
{:ok, results}
1726
else
1827
{:ok, {status, _, _}} ->
@@ -26,14 +35,17 @@ defmodule Diff.Hex do
2635
end
2736

2837
def get_tarball(package, version) do
29-
with {:ok, {200, _, tarball}} <- :hex_repo.get_tarball(@config, package, version) do
30-
{:ok, tarball}
31-
else
32-
{:ok, {403, _, _}} ->
38+
path = Diff.TmpDir.tmp_file("tarball")
39+
40+
case :hex_repo.get_tarball_to_file(config(), package, version, to_charlist(path)) do
41+
{:ok, {200, _headers}} ->
42+
{:ok, path}
43+
44+
{:ok, {403, _}} ->
3345
{:error, :not_found}
3446

35-
{:ok, {status, _, _}} ->
36-
Logger.error("Failed to get package versions. Status: #{status}.")
47+
{:ok, {status, _}} ->
48+
Logger.error("Failed to get tarball for package: #{package}. Status: #{status}.")
3749
{:error, :not_found}
3850

3951
{:error, reason} ->
@@ -42,16 +54,15 @@ defmodule Diff.Hex do
4254
end
4355
end
4456

45-
def unpack_tarball(tarball, path) when is_binary(path) do
46-
path = to_charlist(path)
47-
48-
with {:ok, _} <- :hex_tarball.unpack(tarball, path) do
57+
def unpack_tarball(tarball_path, output_path) do
58+
with {:ok, _} <-
59+
:hex_tarball.unpack({:file, to_charlist(tarball_path)}, to_charlist(output_path)) do
4960
:ok
5061
end
5162
end
5263

5364
def get_checksums(package, versions) do
54-
with {:ok, {200, _, releases}} <- :hex_repo.get_package(@config, package) do
65+
with {:ok, {200, _, releases}} <- :hex_repo.get_package(config(), package) do
5566
checksums =
5667
for release <- releases.releases, release.version in versions do
5768
release.outer_checksum
@@ -73,12 +84,11 @@ defmodule Diff.Hex do
7384
end
7485

7586
def diff(package, from, to) do
76-
path_from = tmp_path("package-#{package}-#{from}-")
77-
path_to = tmp_path("package-#{package}-#{to}-")
78-
7987
with {:ok, tarball_from} <- get_tarball(package, from),
88+
path_from = Diff.TmpDir.tmp_dir("package-#{package}-#{from}"),
8089
:ok <- unpack_tarball(tarball_from, path_from),
8190
{:ok, tarball_to} <- get_tarball(package, to),
91+
path_to = Diff.TmpDir.tmp_dir("package-#{package}-#{to}"),
8292
:ok <- unpack_tarball(tarball_to, path_to) do
8393
from_files = tree_files(path_from)
8494
to_files = tree_files(path_to)
@@ -92,8 +102,7 @@ defmodule Diff.Hex do
92102
all_files = (from_files ++ to_files) |> Enum.uniq() |> Enum.sort()
93103

94104
stream =
95-
all_files
96-
|> Stream.flat_map(fn file ->
105+
Stream.flat_map(all_files, fn file ->
97106
{path_old, path_new} =
98107
cond do
99108
file in new_files -> {"/dev/null", Path.join(path_to, file)}
@@ -120,14 +129,6 @@ defmodule Diff.Hex do
120129
[{:error, {error, reason}}]
121130
end
122131
end)
123-
|> Stream.transform(
124-
fn -> :ok end,
125-
fn elem, :ok -> {[elem], :ok} end,
126-
fn :ok ->
127-
File.rm_rf(path_from)
128-
File.rm_rf(path_to)
129-
end
130-
)
131132

132133
{:ok, stream}
133134
else
@@ -166,9 +167,4 @@ defmodule Diff.Hex do
166167
|> Enum.filter(&File.regular?(&1, raw: true))
167168
|> Enum.map(&Path.relative_to(&1, directory))
168169
end
169-
170-
defp tmp_path(prefix) do
171-
random_string = Base.encode16(:crypto.strong_rand_bytes(4))
172-
Path.join([System.tmp_dir!(), "diff", prefix <> random_string])
173-
end
174170
end

lib/diff/tmp_dir.ex

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
defmodule Diff.TmpDir do
2+
use GenServer
3+
4+
@table __MODULE__
5+
6+
def start_link(opts \\ []) do
7+
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
8+
end
9+
10+
def tmp_file(prefix) do
11+
path = path(prefix)
12+
File.touch!(path)
13+
track(path)
14+
path
15+
end
16+
17+
def tmp_dir(prefix) do
18+
path = path(prefix)
19+
File.mkdir_p!(path)
20+
track(path)
21+
path
22+
end
23+
24+
defp path(prefix) do
25+
random = Base.encode16(:crypto.strong_rand_bytes(4))
26+
Path.join(base_dir(), prefix <> "-" <> random)
27+
end
28+
29+
defp base_dir() do
30+
dir = Path.join(System.tmp_dir!(), "diff")
31+
File.mkdir_p!(dir)
32+
dir
33+
end
34+
35+
defp track(path) do
36+
pid = self()
37+
:ets.insert(@table, {pid, path})
38+
GenServer.cast(__MODULE__, {:monitor, pid})
39+
end
40+
41+
@impl true
42+
def init(_opts) do
43+
Process.flag(:trap_exit, true)
44+
:ets.new(@table, [:named_table, :duplicate_bag, :public])
45+
{:ok, %{monitors: MapSet.new()}}
46+
end
47+
48+
@impl true
49+
def handle_cast({:monitor, pid}, state) do
50+
if pid in state.monitors do
51+
{:noreply, state}
52+
else
53+
Process.monitor(pid)
54+
{:noreply, %{state | monitors: MapSet.put(state.monitors, pid)}}
55+
end
56+
end
57+
58+
@impl true
59+
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
60+
cleanup_pid(pid)
61+
{:noreply, %{state | monitors: MapSet.delete(state.monitors, pid)}}
62+
end
63+
64+
@impl true
65+
def terminate(_reason, _state) do
66+
:ets.foldl(
67+
fn {_pid, path}, :ok ->
68+
File.rm_rf(path)
69+
:ok
70+
end,
71+
:ok,
72+
@table
73+
)
74+
end
75+
76+
defp cleanup_pid(pid) do
77+
entries = :ets.lookup(@table, pid)
78+
79+
Enum.each(entries, fn {_pid, path} ->
80+
File.rm_rf(path)
81+
end)
82+
83+
:ets.delete(@table, pid)
84+
end
85+
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ defmodule Diff.MixProject do
3838
{:git_diff, github: "ericmj/git_diff", branch: "ericmj/fix-modes"},
3939
{:goth, "~> 1.0"},
4040
{:hackney, "~> 1.15"},
41-
{:hex_core, "~> 0.11.0"},
41+
{:hex_core, "~> 0.15.0"},
4242
{:jason, "~> 1.0"},
4343
{:logster, "~> 1.0.0"},
4444
{:mox, "~> 1.0", only: :test},

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"git_diff": {:git, "https://github.com/ericmj/git_diff.git", "e4ee06cfd139b8a911d07e42d0ff3b15eee2b740", [branch: "ericmj/fix-modes"]},
1515
"goth": {:hex, :goth, "1.4.3", "80e86225ae174844e6a77b61982fafadfc715277db465e0956348d8bdd56b231", [:mix], [{:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:jose, "~> 1.11", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm", "34e2012ed1af2fe2446083da60a988fd9829943d30e4447832646c3a6863a7e6"},
1616
"hackney": {:hex, :hackney, "1.25.0", "390e9b83f31e5b325b9f43b76e1a785cbdb69b5b6cd4e079aa67835ded046867", [:rebar3], [{:certifi, "~> 2.15.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.4", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "7209bfd75fd1f42467211ff8f59ea74d6f2a9e81cbcee95a56711ee79fd6b1d4"},
17-
"hex_core": {:hex, :hex_core, "0.11.0", "d1c6bbf2a4ee6b5f002bec6fa52b5080c53c8b63b7caf6eb88b943687547bff4", [:rebar3], [], "hexpm", "707893677a425491962a2db522f1d2b1f85f97ea27418b06f7929f1d30cde0b0"},
17+
"hex_core": {:hex, :hex_core, "0.15.0", "8eadc0ccb08e3742f2313073d04f39eaa7904617329039e9d3c402f5dd227673", [:rebar3], [], "hexpm", "c2093764c7af8ef0818c104fa141eba431e7be93f8374638c45c7037b26a52f8"},
1818
"hpax": {:hex, :hpax, "1.0.1", "c857057f89e8bd71d97d9042e009df2a42705d6d690d54eca84c8b29af0787b0", [:mix], [], "hexpm", "4e2d5a4f76ae1e3048f35ae7adb1641c36265510a2d4638157fbcb53dda38445"},
1919
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
2020
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},

test/diff/tmp_dir_test.exs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
defmodule Diff.TmpDirTest do
2+
use ExUnit.Case, async: true
3+
4+
test "tmp_file/1 creates a file" do
5+
path = Diff.TmpDir.tmp_file("test")
6+
assert File.exists?(path)
7+
assert File.regular?(path)
8+
end
9+
10+
test "tmp_dir/1 creates a directory" do
11+
path = Diff.TmpDir.tmp_dir("test")
12+
assert File.dir?(path)
13+
end
14+
15+
test "cleanup on normal process exit" do
16+
test_pid = self()
17+
18+
Task.start(fn ->
19+
file = Diff.TmpDir.tmp_file("test")
20+
dir = Diff.TmpDir.tmp_dir("test")
21+
send(test_pid, {:paths, file, dir})
22+
end)
23+
24+
assert_receive {:paths, file, dir}
25+
Process.sleep(100)
26+
27+
refute File.exists?(file)
28+
refute File.exists?(dir)
29+
end
30+
31+
@tag :capture_log
32+
test "cleanup on process crash" do
33+
test_pid = self()
34+
35+
Task.start(fn ->
36+
file = Diff.TmpDir.tmp_file("test")
37+
dir = Diff.TmpDir.tmp_dir("test")
38+
send(test_pid, {:paths, file, dir})
39+
raise "crash"
40+
end)
41+
42+
assert_receive {:paths, file, dir}
43+
Process.sleep(100)
44+
45+
refute File.exists?(file)
46+
refute File.exists?(dir)
47+
end
48+
49+
test "multiple paths for one process" do
50+
test_pid = self()
51+
52+
Task.start(fn ->
53+
paths =
54+
for i <- 1..5 do
55+
file = Diff.TmpDir.tmp_file("test-#{i}")
56+
dir = Diff.TmpDir.tmp_dir("test-#{i}")
57+
{file, dir}
58+
end
59+
60+
send(test_pid, {:paths, paths})
61+
end)
62+
63+
assert_receive {:paths, paths}
64+
Process.sleep(100)
65+
66+
for {file, dir} <- paths do
67+
refute File.exists?(file)
68+
refute File.exists?(dir)
69+
end
70+
end
71+
72+
test "paths persist while process is alive" do
73+
file = Diff.TmpDir.tmp_file("test")
74+
dir = Diff.TmpDir.tmp_dir("test")
75+
76+
assert File.exists?(file)
77+
assert File.dir?(dir)
78+
end
79+
end

0 commit comments

Comments
 (0)