Skip to content

Commit f862163

Browse files
Retain enumeration properties when using to_rows on columnar data (#14)
1 parent 4bd7948 commit f862163

4 files changed

Lines changed: 270 additions & 139 deletions

File tree

lib/table.ex

Lines changed: 1 addition & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ defmodule Table do
105105
end
106106

107107
# Produces rows from data tagged as `{:columns, meta, enum}`.
#
# `enum` is handed to the zip function, and every list of zipped `values`
# is turned into a row by build_row/3 together with `meta.columns` and the
# `only` column filter.
#
# This hunk records the call switching from the module-local
# stream_zip_with/2 backport to Table.Zipper.zip_with/2, which only accepts
# a list of enumerables (presumably one per column - confirm against the
# callers that build the `{:columns, ...}` tuple).
defp read_rows({:columns, meta, enum}, only) do
108-
stream_zip_with(enum, fn values ->
108+
Table.Zipper.zip_with(enum, fn values ->
109109
build_row(meta.columns, values, only)
110110
end)
111111
end
@@ -189,142 +189,4 @@ defmodule Table do
189189

190190
# Tells whether `column` passes the `only` filter.
#
# A `nil` filter places no restriction, so every column is included;
# otherwise `only` is queried with MapSet.member?/2.
defp include_column?(only, column) do
  is_nil(only) or MapSet.member?(only, column)
end
192-
193-
# --- Backports ---
194-
195-
# TODO: remove once we require Elixir v1.12
196-
# Source https://github.com/elixir-lang/elixir/blob/b63f8f541e9d8951dbbcb39a8551bd74a3fe9a59/lib/elixir/lib/stream.ex#L1210-L1342
197-
defp stream_zip_with(enumerables, zip_fun) when is_function(zip_fun, 1) do
198-
if is_list(enumerables) and :lists.all(&is_list/1, enumerables) do
199-
&zip_list(enumerables, &1, &2, zip_fun)
200-
else
201-
&zip_enum(enumerables, &1, &2, zip_fun)
202-
end
203-
end
204-
205-
defp zip_list(_enumerables, {:halt, acc}, _fun, _zip_fun) do
206-
{:halted, acc}
207-
end
208-
209-
defp zip_list(enumerables, {:suspend, acc}, fun, zip_fun) do
210-
{:suspended, acc, &zip_list(enumerables, &1, fun, zip_fun)}
211-
end
212-
213-
defp zip_list(enumerables, {:cont, acc}, fun, zip_fun) do
214-
case zip_list_heads_tails(enumerables, [], []) do
215-
{heads, tails} -> zip_list(tails, fun.(zip_fun.(heads), acc), fun, zip_fun)
216-
:error -> {:done, acc}
217-
end
218-
end
219-
220-
defp zip_list_heads_tails([[head | tail] | rest], heads, tails) do
221-
zip_list_heads_tails(rest, [head | heads], [tail | tails])
222-
end
223-
224-
defp zip_list_heads_tails([[] | _rest], _heads, _tails) do
225-
:error
226-
end
227-
228-
defp zip_list_heads_tails([], heads, tails) do
229-
{:lists.reverse(heads), :lists.reverse(tails)}
230-
end
231-
232-
defp zip_enum(enumerables, acc, fun, zip_fun) do
233-
step = fn x, acc ->
234-
{:suspend, :lists.reverse([x | acc])}
235-
end
236-
237-
enum_funs =
238-
Enum.map(enumerables, fn enum ->
239-
{&Enumerable.reduce(enum, &1, step), [], :cont}
240-
end)
241-
242-
do_zip_enum(enum_funs, acc, fun, zip_fun)
243-
end
244-
245-
# This implementation of do_zip_enum/4 works for any number of streams to zip
246-
defp do_zip_enum(zips, {:halt, acc}, _fun, _zip_fun) do
247-
do_zip_close(zips)
248-
{:halted, acc}
249-
end
250-
251-
defp do_zip_enum(zips, {:suspend, acc}, fun, zip_fun) do
252-
{:suspended, acc, &do_zip_enum(zips, &1, fun, zip_fun)}
253-
end
254-
255-
defp do_zip_enum([], {:cont, acc}, _callback, _zip_fun) do
256-
{:done, acc}
257-
end
258-
259-
defp do_zip_enum(zips, {:cont, acc}, callback, zip_fun) do
260-
try do
261-
do_zip_next(zips, acc, callback, [], [], zip_fun)
262-
catch
263-
kind, reason ->
264-
do_zip_close(zips)
265-
:erlang.raise(kind, reason, __STACKTRACE__)
266-
else
267-
{:next, buffer, acc} ->
268-
do_zip_enum(buffer, acc, callback, zip_fun)
269-
270-
{:done, _acc} = other ->
271-
other
272-
end
273-
end
274-
275-
# do_zip_next/6 computes the next tuple formed by
276-
# the next element of each zipped stream.
277-
defp do_zip_next(
278-
[{_, [], :halt} | zips],
279-
acc,
280-
_callback,
281-
_yielded_elems,
282-
buffer,
283-
_zip_fun
284-
) do
285-
do_zip_close(:lists.reverse(buffer, zips))
286-
{:done, acc}
287-
end
288-
289-
defp do_zip_next([{fun, [], :cont} | zips], acc, callback, yielded_elems, buffer, zip_fun) do
290-
case fun.({:cont, []}) do
291-
{:suspended, [elem | next_acc], fun} ->
292-
next_buffer = [{fun, next_acc, :cont} | buffer]
293-
do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun)
294-
295-
{_, [elem | next_acc]} ->
296-
next_buffer = [{fun, next_acc, :halt} | buffer]
297-
do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun)
298-
299-
{_, []} ->
300-
# The current zipped stream terminated, so we close all the streams
301-
# and return {:done, acc} (which is returned as is by do_zip_enum/4).
302-
do_zip_close(:lists.reverse(buffer, zips))
303-
{:done, acc}
304-
end
305-
end
306-
307-
defp do_zip_next(
308-
[{fun, zip_acc, zip_op} | zips],
309-
acc,
310-
callback,
311-
yielded_elems,
312-
buffer,
313-
zip_fun
314-
) do
315-
[elem | rest] = zip_acc
316-
next_buffer = [{fun, rest, zip_op} | buffer]
317-
do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun)
318-
end
319-
320-
defp do_zip_next([] = _zips, acc, callback, yielded_elems, buffer, zip_fun) do
321-
# "yielded_elems" is a reversed list of results for the current iteration of
322-
# zipping. That is to say, the nth element from each of the enums being zipped.
323-
# It needs to be reversed and passed to the zipping function so it can do its thing.
324-
{:next, :lists.reverse(buffer), callback.(zip_fun.(:lists.reverse(yielded_elems)), acc)}
325-
end
326-
327-
defp do_zip_close(zips) do
328-
:lists.foreach(fn {fun, _, _} -> fun.({:halt, []}) end, zips)
329-
end
330192
end

lib/table/zipper.ex

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
defmodule Table.Zipper do
  @moduledoc false

  # An enumerable that zips several enumerables.
  #
  # This enumerable proxies traversal to the underlying enumerables,
  # so it keeps the same properties, such as optimized slicing.
  #
  # Zipping stops as soon as any of the enumerables is exhausted. Zipping
  # an empty list of enumerables yields an empty enumerable, consistently
  # across count, slice and reduce.

  defstruct [:enumerables, :fun]

  @doc """
  Returns an enumerable that zips corresponding elements from a
  collection of enumerables into a tuple.
  """
  @spec zip(list(Enumerable.t())) :: Enumerable.t()
  def zip(enumerables) when is_list(enumerables) do
    zip_with(enumerables, &List.to_tuple/1)
  end

  @doc """
  Returns an enumerable that zips corresponding elements from a
  collection using the `zip_fun` function.

  `zip_fun` receives a list holding one element from each enumerable,
  in the order the enumerables were given.
  """
  @spec zip_with(list(Enumerable.t()), (list() -> term())) :: Enumerable.t()
  def zip_with(enumerables, zip_fun) when is_list(enumerables) do
    %__MODULE__{enumerables: enumerables, fun: zip_fun}
  end

  defimpl Enumerable do
    # The zipped count is the smallest count among the enumerables. If any
    # of them cannot report its count, neither can the zipper, and the
    # caller falls back to reduce/3.
    def count(%{enumerables: []}), do: {:ok, 0}

    def count(zipper) do
      zipper.enumerables
      # Numbers sort before atoms in the term ordering, so :infinity works
      # as the initial upper bound for min/2.
      |> Enum.reduce_while(:infinity, fn enumerable, min_count ->
        case Enumerable.count(enumerable) do
          {:ok, count} -> {:cont, min(count, min_count)}
          _ -> {:halt, nil}
        end
      end)
      |> case do
        nil -> {:error, __MODULE__}
        count -> {:ok, count}
      end
    end

    # Membership is always answered through reduce/3.
    def member?(_zipper, _element), do: {:error, __MODULE__}

    def reduce(zipper, acc, fun) do
      zipper.enumerables
      |> stream_zip_with(zipper.fun)
      |> Enumerable.reduce(acc, fun)
    end

    # Slicing is delegated to the enumerables: each one is sliced on its
    # own and the resulting lists are zipped. This is only possible when
    # every enumerable returns a 2-arity or 3-arity slicing function;
    # anything else makes the caller fall back to reduce/3.
    def slice(%{enumerables: []}), do: {:ok, 0, fn _start, _length -> [] end}

    def slice(zipper) do
      zipper.enumerables
      |> Enum.reduce_while({[], [], []}, fn enumerable, {sizes, fun2s, fun3s} ->
        case Enumerable.slice(enumerable) do
          {:ok, size, fun} when is_function(fun, 2) ->
            # A single 2-arity function rules out a 3-arity result, which
            # is signalled by setting the third element to nil.
            {:cont, {[size | sizes], [fun | fun2s], nil}}

          {:ok, size, fun} when is_function(fun, 3) ->
            # Keep a step-1 wrapper as well, in case another enumerable
            # only offers a 2-arity function.
            {:cont, {[size | sizes], [(&fun.(&1, &2, 1)) | fun2s], fun3s && [fun | fun3s]}}

          _ ->
            {:halt, nil}
        end
      end)
      |> case do
        # TODO: rely only on 3-arity functions on Elixir v1.18
        {sizes, fun2s, nil} ->
          fun = fn start, length ->
            zip_slices(fun2s, zipper.fun, fn slice_fun -> slice_fun.(start, length) end)
          end

          {:ok, Enum.min(sizes), fun}

        {sizes, _fun2s, fun3s} ->
          fun = fn start, length, step ->
            zip_slices(fun3s, zipper.fun, fn slice_fun -> slice_fun.(start, length, step) end)
          end

          {:ok, Enum.min(sizes), fun}

        nil ->
          {:error, __MODULE__}
      end
    end

    # Applies `take` to every slicing function and zips the resulting
    # slices. `slice_funs` was accumulated in reverse order, so prepending
    # while reducing restores the original order of the enumerables.
    defp zip_slices(slice_funs, zip_fun, take) do
      slice_funs
      |> Enum.reduce([], fn slice_fun, slices -> [take.(slice_fun) | slices] end)
      |> stream_zip_with(zip_fun)
      |> Enum.to_list()
    end

    # --- Backports ---

    # TODO: remove once we require Elixir v1.12
    # Source https://github.com/elixir-lang/elixir/blob/b63f8f541e9d8951dbbcb39a8551bd74a3fe9a59/lib/elixir/lib/stream.ex#L1210-L1342
    defp stream_zip_with(enumerables, zip_fun) when is_function(zip_fun, 1) do
      # Plain lists are zipped by direct head/tail traversal; anything else
      # goes through the suspendable reducer of each enumerable.
      if is_list(enumerables) and :lists.all(&is_list/1, enumerables) do
        &zip_list(enumerables, &1, &2, zip_fun)
      else
        &zip_enum(enumerables, &1, &2, zip_fun)
      end
    end

    defp zip_list(_enumerables, {:halt, acc}, _fun, _zip_fun) do
      {:halted, acc}
    end

    defp zip_list(enumerables, {:suspend, acc}, fun, zip_fun) do
      {:suspended, acc, &zip_list(enumerables, &1, fun, zip_fun)}
    end

    # With nothing to zip there are no elements. Without this clause
    # zip_list_heads_tails/3 returns {[], []} for an empty list and the
    # clause below would emit zip_fun.([]) forever.
    defp zip_list([], {:cont, acc}, _fun, _zip_fun) do
      {:done, acc}
    end

    defp zip_list(enumerables, {:cont, acc}, fun, zip_fun) do
      case zip_list_heads_tails(enumerables, [], []) do
        {heads, tails} -> zip_list(tails, fun.(zip_fun.(heads), acc), fun, zip_fun)
        :error -> {:done, acc}
      end
    end

    # Splits every list into its head and tail, returning :error as soon
    # as one of the lists is exhausted.
    defp zip_list_heads_tails([[head | tail] | rest], heads, tails) do
      zip_list_heads_tails(rest, [head | heads], [tail | tails])
    end

    defp zip_list_heads_tails([[] | _rest], _heads, _tails) do
      :error
    end

    defp zip_list_heads_tails([], heads, tails) do
      {:lists.reverse(heads), :lists.reverse(tails)}
    end

    defp zip_enum(enumerables, acc, fun, zip_fun) do
      # Suspends after every element, so each enumerable can be advanced
      # one element at a time.
      step = fn x, acc ->
        {:suspend, :lists.reverse([x | acc])}
      end

      # Each entry is {continuation, buffered elements, :cont | :halt}.
      enum_funs =
        Enum.map(enumerables, fn enum ->
          {&Enumerable.reduce(enum, &1, step), [], :cont}
        end)

      do_zip_enum(enum_funs, acc, fun, zip_fun)
    end

    # This implementation of do_zip_enum/4 works for any number of streams to zip
    defp do_zip_enum(zips, {:halt, acc}, _fun, _zip_fun) do
      do_zip_close(zips)
      {:halted, acc}
    end

    defp do_zip_enum(zips, {:suspend, acc}, fun, zip_fun) do
      {:suspended, acc, &do_zip_enum(zips, &1, fun, zip_fun)}
    end

    defp do_zip_enum([], {:cont, acc}, _callback, _zip_fun) do
      {:done, acc}
    end

    defp do_zip_enum(zips, {:cont, acc}, callback, zip_fun) do
      try do
        do_zip_next(zips, acc, callback, [], [], zip_fun)
      catch
        kind, reason ->
          # Halt every underlying enumerable before re-raising.
          do_zip_close(zips)
          :erlang.raise(kind, reason, __STACKTRACE__)
      else
        {:next, buffer, acc} ->
          do_zip_enum(buffer, acc, callback, zip_fun)

        {:done, _acc} = other ->
          other
      end
    end

    # do_zip_next/6 computes the next tuple formed by
    # the next element of each zipped stream.
    defp do_zip_next(
           [{_, [], :halt} | zips],
           acc,
           _callback,
           _yielded_elems,
           buffer,
           _zip_fun
         ) do
      do_zip_close(:lists.reverse(buffer, zips))
      {:done, acc}
    end

    defp do_zip_next([{fun, [], :cont} | zips], acc, callback, yielded_elems, buffer, zip_fun) do
      case fun.({:cont, []}) do
        {:suspended, [elem | next_acc], fun} ->
          next_buffer = [{fun, next_acc, :cont} | buffer]
          do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun)

        {_, [elem | next_acc]} ->
          next_buffer = [{fun, next_acc, :halt} | buffer]
          do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun)

        {_, []} ->
          # The current zipped stream terminated, so we close all the streams
          # and return {:done, acc} (which is returned as is by do_zip_enum/4).
          do_zip_close(:lists.reverse(buffer, zips))
          {:done, acc}
      end
    end

    defp do_zip_next(
           [{fun, zip_acc, zip_op} | zips],
           acc,
           callback,
           yielded_elems,
           buffer,
           zip_fun
         ) do
      [elem | rest] = zip_acc
      next_buffer = [{fun, rest, zip_op} | buffer]
      do_zip_next(zips, acc, callback, [elem | yielded_elems], next_buffer, zip_fun)
    end

    defp do_zip_next([] = _zips, acc, callback, yielded_elems, buffer, zip_fun) do
      # "yielded_elems" is a reversed list of results for the current iteration of
      # zipping. That is to say, the nth element from each of the enums being zipped.
      # It needs to be reversed and passed to the zipping function so it can do its thing.
      {:next, :lists.reverse(buffer), callback.(zip_fun.(:lists.reverse(yielded_elems)), acc)}
    end

    defp do_zip_close(zips) do
      :lists.foreach(fn {fun, _, _} -> fun.({:halt, []}) end, zips)
    end
  end
end

test/mapper_test.exs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ defmodule Table.MapperTest do
33

44
alias Table.Mapper
55

6+
test "count" do
  # Mapping must not change how many elements the enumerable reports.
  squares = Mapper.map(1..3, fn x -> x * x end)
  assert Enum.count(squares) == 3
end
10+
611
test "reduce" do
712
enumerable = 1..3 |> Mapper.map(fn x -> x * x end)
813
assert Enum.reduce(enumerable, &(&1 + &2)) == 14

0 commit comments

Comments
 (0)