Skip to content

Commit 00db00a

Browse files
authored
Run queue message processing in short-lived task processes (#92)
Broadway processors are long-lived, so their heap memory accumulates across messages. By running each message's work in a task under Task.Supervisor, the task's entire heap is freed immediately on exit and TmpDir cleanup triggers automatically via the :DOWN monitor.
1 parent 0cda79e commit 00db00a

1 file changed

Lines changed: 45 additions & 39 deletions

File tree

lib/hexdocs/queue.ex

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,20 @@ defmodule Hexdocs.Queue do
4848
end
4949

5050
def handle_message(%{data: %{"Records" => records}} = message) do
51-
Enum.each(records, &handle_record/1)
51+
Enum.each(records, fn record ->
52+
run_in_task(fn -> handle_record(record) end)
53+
end)
54+
5255
message
5356
end
5457

5558
def handle_message(%{data: %{"hexdocs:sitemap" => key}} = message) do
5659
Sentry.Context.set_extra_context(%{queue_event: "hexdocs:sitemap"})
5760
Logger.info("#{key}: start")
5861

59-
case key_components(key) do
60-
{:ok, repository, package, version} ->
61-
try do
62+
run_in_task(fn ->
63+
case key_components(key) do
64+
{:ok, repository, package, version} ->
6265
tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball")
6366

6467
case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do
@@ -80,13 +83,11 @@ defmodule Hexdocs.Queue do
8083
nil ->
8184
Logger.error("#{key}: package not found in store")
8285
end
83-
after
84-
Hexdocs.TmpDir.cleanup()
85-
end
8686

87-
:error ->
88-
Logger.info("#{key}: skip")
89-
end
87+
:error ->
88+
Logger.info("#{key}: skip")
89+
end
90+
end)
9091

9192
message
9293
end
@@ -109,16 +110,16 @@ defmodule Hexdocs.Queue do
109110
Sentry.Context.set_extra_context(%{queue_event: event_name})
110111
Logger.info("#{log_prefix} #{key}")
111112

112-
case key_components(key) do
113-
{:ok, repository, package, version} ->
114-
Sentry.Context.set_extra_context(%{
115-
queue_event: event_name,
116-
repository: repository,
117-
package: package,
118-
version: version
119-
})
113+
run_in_task(fn ->
114+
case key_components(key) do
115+
{:ok, repository, package, version} ->
116+
Sentry.Context.set_extra_context(%{
117+
queue_event: event_name,
118+
repository: repository,
119+
package: package,
120+
version: version
121+
})
120122

121-
try do
122123
tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball")
123124

124125
case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do
@@ -134,13 +135,11 @@ defmodule Hexdocs.Queue do
134135
nil ->
135136
Logger.error("#{log_prefix} #{key}: package not found in store")
136137
end
137-
after
138-
Hexdocs.TmpDir.cleanup()
139-
end
140138

141-
:error ->
142-
Logger.info("#{key}: skip")
143-
end
139+
:error ->
140+
Logger.info("#{key}: skip")
141+
end
142+
end)
144143
end
145144

146145
defp process_upload(key, repository, package, version, input, start) do
@@ -262,23 +261,19 @@ defmodule Hexdocs.Queue do
262261
version: version
263262
})
264263

265-
try do
266-
version = Version.parse!(version)
267-
all_versions = all_versions(repository, package)
268-
Hexdocs.Bucket.delete(repository, package, version, all_versions)
269-
update_index_sitemap(repository, key)
270-
271-
if repository == "hexpm" do
272-
Hexdocs.Search.delete(package, version)
273-
end
264+
version = Version.parse!(version)
265+
all_versions = all_versions(repository, package)
266+
Hexdocs.Bucket.delete(repository, package, version, all_versions)
267+
update_index_sitemap(repository, key)
274268

275-
elapsed = System.os_time(:millisecond) - start
276-
Logger.info("FINISHED DELETING DOCS #{key} #{elapsed}ms")
277-
:ok
278-
after
279-
Hexdocs.TmpDir.cleanup()
269+
if repository == "hexpm" do
270+
Hexdocs.Search.delete(package, version)
280271
end
281272

273+
elapsed = System.os_time(:millisecond) - start
274+
Logger.info("FINISHED DELETING DOCS #{key} #{elapsed}ms")
275+
:ok
276+
282277
{:ok, _repository, _package, _version} ->
283278
:skip
284279

@@ -395,6 +390,17 @@ defmodule Hexdocs.Queue do
395390
end
396391
end
397392

393+
defp run_in_task(fun) do
394+
task = Task.Supervisor.async(Hexdocs.Tasks, fun)
395+
396+
case Task.yield(task, :timer.seconds(270)) || Task.shutdown(task) do
397+
{:ok, result} -> result
398+
{:exit, {exception, stacktrace}} -> reraise(exception, stacktrace)
399+
{:exit, reason} -> exit(reason)
400+
nil -> raise "task timeout"
401+
end
402+
end
403+
398404
defp publish_message(map) do
399405
queue = Application.fetch_env!(:hexdocs, :queue_id)
400406
message = JSON.encode!(map)

0 commit comments

Comments
 (0)