diff --git a/lib/hexdocs/queue.ex b/lib/hexdocs/queue.ex index ff9fef0..ac65d06 100644 --- a/lib/hexdocs/queue.ex +++ b/lib/hexdocs/queue.ex @@ -48,7 +48,10 @@ defmodule Hexdocs.Queue do end def handle_message(%{data: %{"Records" => records}} = message) do - Enum.each(records, &handle_record/1) + Enum.each(records, fn record -> + run_in_task(fn -> handle_record(record) end) + end) + message end @@ -56,9 +59,9 @@ defmodule Hexdocs.Queue do Sentry.Context.set_extra_context(%{queue_event: "hexdocs:sitemap"}) Logger.info("#{key}: start") - case key_components(key) do - {:ok, repository, package, version} -> - try do + run_in_task(fn -> + case key_components(key) do + {:ok, repository, package, version} -> tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball") case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do @@ -80,13 +83,11 @@ defmodule Hexdocs.Queue do nil -> Logger.error("#{key}: package not found in store") end - after - Hexdocs.TmpDir.cleanup() - end - :error -> - Logger.info("#{key}: skip") - end + :error -> + Logger.info("#{key}: skip") + end + end) message end @@ -109,16 +110,16 @@ defmodule Hexdocs.Queue do Sentry.Context.set_extra_context(%{queue_event: event_name}) Logger.info("#{log_prefix} #{key}") - case key_components(key) do - {:ok, repository, package, version} -> - Sentry.Context.set_extra_context(%{ - queue_event: event_name, - repository: repository, - package: package, - version: version - }) + run_in_task(fn -> + case key_components(key) do + {:ok, repository, package, version} -> + Sentry.Context.set_extra_context(%{ + queue_event: event_name, + repository: repository, + package: package, + version: version + }) - try do tarball_path = Hexdocs.TmpDir.tmp_file("docs-tarball") case Hexdocs.Store.get_to_file(:repo_bucket, key, tarball_path) do @@ -134,13 +135,11 @@ defmodule Hexdocs.Queue do nil -> Logger.error("#{log_prefix} #{key}: package not found in store") end - after - Hexdocs.TmpDir.cleanup() - end - :error -> - Logger.info("#{key}: skip") - end + :error -> + Logger.info("#{key}: skip") + end + end) end defp process_upload(key, repository, package, version, input, start) do @@ -262,23 +261,19 @@ defmodule Hexdocs.Queue do version: version }) - try do - version = Version.parse!(version) - all_versions = all_versions(repository, package) - Hexdocs.Bucket.delete(repository, package, version, all_versions) - update_index_sitemap(repository, key) - - if repository == "hexpm" do - Hexdocs.Search.delete(package, version) - end + version = Version.parse!(version) + all_versions = all_versions(repository, package) + Hexdocs.Bucket.delete(repository, package, version, all_versions) + update_index_sitemap(repository, key) - elapsed = System.os_time(:millisecond) - start - Logger.info("FINISHED DELETING DOCS #{key} #{elapsed}ms") - :ok - after - Hexdocs.TmpDir.cleanup() + if repository == "hexpm" do + Hexdocs.Search.delete(package, version) end + elapsed = System.os_time(:millisecond) - start + Logger.info("FINISHED DELETING DOCS #{key} #{elapsed}ms") + :ok + {:ok, _repository, _package, _version} -> :skip @@ -395,6 +390,17 @@ defmodule Hexdocs.Queue do end end + defp run_in_task(fun) do + task = Task.Supervisor.async(Hexdocs.Tasks, fun) + + case Task.yield(task, :timer.seconds(270)) || Task.shutdown(task) do + {:ok, result} -> result + {:exit, {exception, stacktrace}} -> reraise(exception, stacktrace) + {:exit, reason} -> exit(reason) + nil -> raise "task timeout" + end + end + defp publish_message(map) do queue = Application.fetch_env!(:hexdocs, :queue_id) message = JSON.encode!(map)