Skip to content

Commit 8823eaf

Browse files
authored
Partition upload server (#1279)
1 parent 1029c62 commit 8823eaf

6 files changed

Lines changed: 122 additions & 25 deletions

File tree

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ jobs:
1717
matrix:
1818
include:
1919
# Earliest-supported Elixir/Erlang pair.
20-
- elixir: "1.12"
20+
- elixir: "1.14"
2121
otp: "24.3"
2222
PLUG_CRYPTO_2_0: "false"
2323

2424
# Latest-supported Elixir/Erlang pair.
2525
- elixir: "1.18"
26-
otp: "27.2"
26+
otp: "27.3"
2727
lint: lint
2828
PLUG_CRYPTO_2_0: "true"
2929

lib/plug/application.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ defmodule Plug.Application do
88
Plug.Keys = :ets.new(Plug.Keys, [:named_table, :public, read_concurrency: true])
99

1010
children = [
11-
Plug.Upload
11+
Plug.Upload.Supervisor
1212
]
1313

1414
Supervisor.start_link(children, name: __MODULE__, strategy: :one_for_one)

lib/plug/upload.ex

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ defmodule Plug.Upload do
4040
@dir_table __MODULE__.Dir
4141
@path_table __MODULE__.Path
4242
@max_attempts 10
43-
@temp_env_vars ~w(PLUG_TMPDIR TMPDIR TMP TEMP)s
4443

4544
@doc """
4645
Requests a random file to be created in the upload directory
@@ -85,7 +84,7 @@ defmodule Plug.Upload do
8584
:ok
8685

8786
[] ->
88-
server = plug_server()
87+
server = plug_server(to_pid)
8988
{:ok, tmp} = generate_tmp_dir()
9089
:ok = GenServer.call(server, {:give_away, to_pid, tmp, path})
9190
:ets.delete_object(@path_table, {from_pid, path})
@@ -105,7 +104,7 @@ defmodule Plug.Upload do
105104
{:ok, tmp}
106105

107106
[] ->
108-
server = plug_server()
107+
server = plug_server(pid)
109108
GenServer.cast(server, {:monitor, pid})
110109

111110
with {:ok, tmp} <- generate_tmp_dir() do
@@ -188,30 +187,23 @@ defmodule Plug.Upload do
188187
end
189188
end
190189

191-
defp plug_server do
192-
Process.whereis(__MODULE__) ||
190+
defp plug_server(pid) do
191+
PartitionSupervisor.whereis_name({__MODULE__, pid})
192+
rescue
193+
ArgumentError ->
193194
raise Plug.UploadError,
194195
"could not find process Plug.Upload. Have you started the :plug application?"
195196
end
196197

197198
@doc false
198199
def start_link(_) do
199-
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
200+
GenServer.start_link(__MODULE__, :ok)
200201
end
201202

202203
## Callbacks
203204

204205
@impl true
205206
def init(:ok) do
206-
Process.flag(:trap_exit, true)
207-
tmp = Enum.find_value(@temp_env_vars, "/tmp", &System.get_env/1) |> Path.expand()
208-
cwd = Path.join(File.cwd!(), "tmp")
209-
# Add a tiny random component to avoid clashes between nodes
210-
suffix = :crypto.strong_rand_bytes(3) |> Base.url_encode64()
211-
:persistent_term.put(__MODULE__, {[tmp, cwd], suffix})
212-
213-
:ets.new(@dir_table, [:named_table, :public, :set])
214-
:ets.new(@path_table, [:named_table, :public, :duplicate_bag])
215207
{:ok, %{}}
216208
end
217209

@@ -255,12 +247,6 @@ defmodule Plug.Upload do
255247
{:noreply, state}
256248
end
257249

258-
@impl true
259-
def terminate(_reason, _state) do
260-
folder = fn entry, :ok -> delete_path(entry) end
261-
:ets.foldl(folder, :ok, @path_table)
262-
end
263-
264250
defp delete_path({_pid, path}) do
265251
:file.delete(path)
266252
:ok

lib/plug/upload/supervisor.ex

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
defmodule Plug.Upload.Supervisor do
2+
@moduledoc false
3+
use Supervisor
4+
5+
@temp_env_vars ~w(PLUG_TMPDIR TMPDIR TMP TEMP)s
6+
@dir_table Plug.Upload.Dir
7+
@path_table Plug.Upload.Path
8+
@otp_vsn System.otp_release() |> String.to_integer()
9+
@write_mode if @otp_vsn >= 25, do: :auto, else: true
10+
@ets_opts [:public, :named_table, read_concurrency: true, write_concurrency: @write_mode]
11+
12+
def start_link(args) do
13+
Supervisor.start_link(__MODULE__, args, name: __MODULE__)
14+
end
15+
16+
@impl true
17+
def init(_args) do
18+
# Initialize the upload system
19+
tmp = Enum.find_value(@temp_env_vars, "/tmp", &System.get_env/1) |> Path.expand()
20+
cwd = Path.join(File.cwd!(), "tmp")
21+
# Add a tiny random component to avoid clashes between nodes
22+
suffix = :crypto.strong_rand_bytes(3) |> Base.url_encode64()
23+
:persistent_term.put(Plug.Upload, {[tmp, cwd], suffix})
24+
:ets.new(@dir_table, [:set | @ets_opts])
25+
:ets.new(@path_table, [:duplicate_bag | @ets_opts])
26+
27+
children = [
28+
Plug.Upload.Terminator,
29+
{PartitionSupervisor, child_spec: Plug.Upload, name: Plug.Upload}
30+
]
31+
32+
Supervisor.init(children, strategy: :one_for_one)
33+
end
34+
end

lib/plug/upload/terminator.ex

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
defmodule Plug.Upload.Terminator do
2+
@moduledoc false
3+
use GenServer
4+
5+
@path_table Plug.Upload.Path
6+
7+
def start_link(_) do
8+
GenServer.start_link(__MODULE__, :ok)
9+
end
10+
11+
@impl true
12+
def init(:ok) do
13+
Process.flag(:trap_exit, true)
14+
{:ok, %{}}
15+
end
16+
17+
@impl true
18+
def terminate(_reason, _state) do
19+
folder = fn entry, :ok -> delete_path(entry) end
20+
:ets.foldl(folder, :ok, @path_table)
21+
end
22+
23+
defp delete_path({_pid, path}) do
24+
:file.delete(path)
25+
:ok
26+
end
27+
end

test/plug/upload_test.exs

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ defmodule Plug.UploadTest do
3535

3636
test "terminate removes all files" do
3737
{:ok, path} = Plug.Upload.random_file("sample")
38-
:ok = Plug.Upload.terminate(:shutdown, [])
38+
:ok = Plug.Upload.Terminator.terminate(:shutdown, [])
3939
refute File.exists?(path)
4040
end
4141

@@ -224,4 +224,54 @@ defmodule Plug.UploadTest do
224224
wait_until(fn -> not File.exists?(path1) end)
225225
end
226226
end
227+
228+
test "give_away with invalid path returns error" do
229+
result = Plug.Upload.give_away("/invalid/path", spawn(fn -> :ok end))
230+
assert result == {:error, :unknown_path}
231+
end
232+
233+
test "give_away when target process dies during transfer" do
234+
{:ok, path} = Plug.Upload.random_file("target_dies")
235+
236+
# Create a process that dies immediately
237+
pid = spawn(fn -> :ok end)
238+
239+
# This should still work but file will be cleaned up when dead process is detected
240+
result = Plug.Upload.give_away(path, pid)
241+
assert result == :ok
242+
wait_until(fn -> not File.exists?(path) end)
243+
end
244+
245+
test "routes uploads to correct partition based on process" do
246+
parent = self()
247+
num_processes = 10
248+
249+
# Create uploads from different processes and verify they get different servers
250+
tasks =
251+
Enum.map(1..num_processes, fn i ->
252+
Task.async(fn ->
253+
{:ok, path} = Plug.Upload.random_file("partition_test_#{i}")
254+
server = PartitionSupervisor.whereis_name({Plug.Upload, self()})
255+
send(parent, {:result, i, path, server})
256+
path
257+
end)
258+
end)
259+
260+
# Collect results
261+
results =
262+
Enum.map(1..num_processes, fn _ ->
263+
receive do
264+
{:result, i, path, server} -> {i, path, server}
265+
after
266+
1_000 -> flunk("didn't get result")
267+
end
268+
end)
269+
270+
# Verify different processes got different servers (partitioning working)
271+
servers = Enum.map(results, fn {_, _, server} -> server end)
272+
assert length(Enum.uniq(servers)) > 1
273+
274+
# Cleanup
275+
Enum.each(tasks, &Task.await/1)
276+
end
227277
end

0 commit comments

Comments
 (0)