Skip to content

Commit 0cda79e

Browse files
authored
Clean up temp files after each queue message (#91)
TmpDir only cleans up on process exit, but Broadway processors are long-lived. Accumulated temp files cause page cache and kernel slab memory to grow, counted against the container's cgroup memory limit, eventually triggering OOM kills. Add TmpDir.cleanup/0 and call it after each message is processed.
1 parent f4bda55 commit 0cda79e

3 files changed

Lines changed: 95 additions & 41 deletions

File tree

lib/hexdocs/queue.ex

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -58,26 +58,30 @@ defmodule Hexdocs.Queue do
5858

5959
case key_components(key) do
6060
{:ok, repository, package, version} ->
61-
tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball")
62-
63-
case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do
64-
:ok ->
65-
case Hexdocs.Tar.unpack_to_dir({:file, tarball_path},
66-
repository: repository,
67-
package: package,
68-
version: version
69-
) do
70-
{:ok, _dir, files} ->
71-
update_index_sitemap(repository, key)
72-
update_package_sitemap(repository, key, package, files)
73-
Logger.info("#{key}: done")
74-
75-
{:error, reason} ->
76-
Logger.error("Failed unpack #{repository}/#{package} #{version}: #{reason}")
77-
end
78-
79-
nil ->
80-
Logger.error("#{key}: package not found in store")
61+
try do
62+
tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball")
63+
64+
case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do
65+
:ok ->
66+
case Hexdocs.Tar.unpack_to_dir({:file, tarball_path},
67+
repository: repository,
68+
package: package,
69+
version: version
70+
) do
71+
{:ok, _dir, files} ->
72+
update_index_sitemap(repository, key)
73+
update_package_sitemap(repository, key, package, files)
74+
Logger.info("#{key}: done")
75+
76+
{:error, reason} ->
77+
Logger.error("Failed unpack #{repository}/#{package} #{version}: #{reason}")
78+
end
79+
80+
nil ->
81+
Logger.error("#{key}: package not found in store")
82+
end
83+
after
84+
Hexdocs.TmpDir.cleanup()
8185
end
8286

8387
:error ->
@@ -114,20 +118,24 @@ defmodule Hexdocs.Queue do
114118
version: version
115119
})
116120

117-
tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball")
121+
try do
122+
tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball")
118123

119-
case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do
120-
:ok ->
121-
case type do
122-
:upload ->
123-
process_upload(key, repository, package, version, {:file, tarball_path}, start)
124+
case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do
125+
:ok ->
126+
case type do
127+
:upload ->
128+
process_upload(key, repository, package, version, {:file, tarball_path}, start)
124129

125-
:search ->
126-
process_search(key, repository, package, version, {:file, tarball_path}, start)
127-
end
130+
:search ->
131+
process_search(key, repository, package, version, {:file, tarball_path}, start)
132+
end
128133

129-
nil ->
130-
Logger.error("#{log_prefix} #{key}: package not found in store")
134+
nil ->
135+
Logger.error("#{log_prefix} #{key}: package not found in store")
136+
end
137+
after
138+
Hexdocs.TmpDir.cleanup()
131139
end
132140

133141
:error ->
@@ -254,18 +262,22 @@ defmodule Hexdocs.Queue do
254262
version: version
255263
})
256264

257-
version = Version.parse!(version)
258-
all_versions = all_versions(repository, package)
259-
Hexdocs.Bucket.delete(repository, package, version, all_versions)
260-
update_index_sitemap(repository, key)
265+
try do
266+
version = Version.parse!(version)
267+
all_versions = all_versions(repository, package)
268+
Hexdocs.Bucket.delete(repository, package, version, all_versions)
269+
update_index_sitemap(repository, key)
261270

262-
if repository == "hexpm" do
263-
Hexdocs.Search.delete(package, version)
264-
end
271+
if repository == "hexpm" do
272+
Hexdocs.Search.delete(package, version)
273+
end
265274

266-
elapsed = System.os_time(:millisecond) - start
267-
Logger.info("FINISHED DELETING DOCS #{key} #{elapsed}ms")
268-
:ok
275+
elapsed = System.os_time(:millisecond) - start
276+
Logger.info("FINISHED DELETING DOCS #{key} #{elapsed}ms")
277+
:ok
278+
after
279+
Hexdocs.TmpDir.cleanup()
280+
end
269281

270282
{:ok, _repository, _package, _version} ->
271283
:skip

lib/hexdocs/tmp_dir.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,17 @@ defmodule Hexdocs.TmpDir do
3232
dir
3333
end
3434

35+
def cleanup do
36+
pid = self()
37+
entries = :ets.lookup(@table, pid)
38+
39+
Enum.each(entries, fn {_pid, path} ->
40+
File.rm_rf(path)
41+
end)
42+
43+
:ets.delete(@table, pid)
44+
end
45+
3546
def await_cleanup(pid) do
3647
GenServer.call(__MODULE__, {:await_cleanup, pid}, 5000)
3748
end

test/hexdocs/tmp_dir_test.exs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,37 @@ defmodule Hexdocs.TmpDirTest do
7272
end
7373
end
7474

75+
test "cleanup removes paths for calling process" do
76+
file = Hexdocs.TmpDir.tmp_file("test")
77+
dir = Hexdocs.TmpDir.tmp_dir("test")
78+
79+
assert File.exists?(file)
80+
assert File.dir?(dir)
81+
82+
Hexdocs.TmpDir.cleanup()
83+
84+
refute File.exists?(file)
85+
refute File.exists?(dir)
86+
end
87+
88+
test "cleanup only removes paths for calling process" do
89+
test_pid = self()
90+
91+
Task.start(fn ->
92+
other_file = Hexdocs.TmpDir.tmp_file("other")
93+
send(test_pid, {:other_path, other_file})
94+
Process.sleep(:infinity)
95+
end)
96+
97+
assert_receive {:other_path, other_file}
98+
99+
file = Hexdocs.TmpDir.tmp_file("test")
100+
Hexdocs.TmpDir.cleanup()
101+
102+
refute File.exists?(file)
103+
assert File.exists?(other_file)
104+
end
105+
75106
test "paths persist while process is alive" do
76107
file = Hexdocs.TmpDir.tmp_file("test")
77108
dir = Hexdocs.TmpDir.tmp_dir("test")

0 commit comments

Comments
 (0)