Skip to content

Commit 8d0455e

Browse files
committed
it works
1 parent 6a6d88f commit 8d0455e

File tree

5 files changed

+175
-76
lines changed

5 files changed

+175
-76
lines changed

lib/exkml.ex

+60-32
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ defmodule Exkml do
2020
defstruct geometries: []
2121
end
2222

23+
defmodule KMLParseError do
24+
defexception [:message, :event]
25+
end
26+
2327
defp do_str_to_point([x, y]) do
2428
with {x, _} <- Float.parse(x),
2529
{y, _} <- Float.parse(y) do
@@ -84,12 +88,14 @@ defmodule Exkml do
8488
defstruct [
8589
:receiver,
8690
:receiver_ref,
91+
status: :out_kml,
8792
geom_stack: [],
8893
placemark: nil,
8994
stack: [],
9095
path: [],
9196
emit: [],
92-
point_count: 0
97+
point_count: 0,
98+
batch_size: 64
9399
]
94100
end
95101

@@ -101,8 +107,8 @@ defmodule Exkml do
101107
%State{s | placemark: %Placemark{pm | attrs: Map.put(attrs, name, value)}}
102108
end
103109

104-
def put_error(state, reason) do
105-
IO.inspect {:error, reason}
110+
def put_error(state, _reason) do
111+
# TODO: partial error handling?
106112
state
107113
end
108114

@@ -147,7 +153,7 @@ defmodule Exkml do
147153
%State{state | geom_stack: [merge_up(child, parent, kind) | rest]}
148154
end
149155

150-
def pop_geom(state, kind) do
156+
def pop_geom(_state, kind) do
151157
throw "Cannot pop #{kind}"
152158
end
153159

@@ -178,7 +184,7 @@ defmodule Exkml do
178184
end
179185

180186
defp merge_up(child, parent, _) do
181-
throw "No merge_up impl #{inspect parent}"
187+
throw "No merge_up impl #{inspect child} #{inspect parent}"
182188
end
183189

184190
def put_point(%State{} = state, text) do
@@ -268,63 +274,85 @@ defmodule Exkml do
268274
%{emit(state) | stack: [], path: [], placemark: nil}
269275
end
270276

271-
def on_event(:endDocument, _, %State{receiver: r, receiver_ref: ref} = state) do
277+
on_enter 'kml', _, state, do: %State{state | status: :kml}
278+
on_exit 'kml', _, state, do: %State{state | status: :out_kml}
279+
280+
def on_event(:endDocument, _, %State{status: :out_kml, receiver: r, receiver_ref: ref} = state) do
272281
flush(state)
273282
send(r, {:done, ref})
274283
state
275284
end
276285

286+
def on_event(:endDocument, event, %State{status: :kml, receiver: r, receiver_ref: ref} = state) do
287+
send(r, {:error, ref, self(), event})
288+
state
289+
end
290+
277291
def on_event(_event, _, state), do: state
278292

279293
defp flush(%State{receiver: r, receiver_ref: ref, emit: emit} = state) do
280-
send(r, {:placemarks, ref, Enum.reverse(emit)})
294+
send(r, {:placemarks, ref, self(), Enum.reverse(emit)})
295+
receive do
296+
{:ack, ^ref} -> :ok
297+
end
281298
%State{state | emit: []}
282299
end
283300

284-
285-
def emit(state) do
301+
def emit(%State{batch_size: batch_size} = state) do
286302
case %State{state | emit: [state.placemark | state.emit]} do
287-
%State{emit: emit} when length(emit) > 10 ->
288-
flush(state)
303+
%State{emit: emit} = new_state when length(emit) > batch_size -> flush(new_state)
289304
new_state -> new_state
290305
end
291306
end
292307

308+
def setup(binstream, chunk_size, ref) do
309+
receiver = self()
293310

311+
spawn_link(fn ->
312+
continuation = &Enumerable.reduce(binstream, &1, fn
313+
x, {acc, counter} when counter <= 0 -> {:suspend, {[x | acc], 0}}
314+
x, {acc, counter} -> {:cont, {[x | acc], counter - :erlang.byte_size(x)}}
315+
end)
294316

295-
def stream!(binstream, chunk_size \\ 2048) do
296-
continuation = &Enumerable.reduce(binstream, &1, fn
297-
x, {acc, counter} when counter <= 0 -> {:suspend, {[x | acc], 0}}
298-
x, {acc, counter} -> {:cont, {[x | acc], counter - :erlang.byte_size(x)}}
317+
take = fn cont ->
318+
case cont.({:cont, {[], chunk_size}}) do
319+
{:suspended, {list, 0}, new_cont} ->
320+
{:lists.reverse(list) |> Enum.join(), new_cont}
321+
{status, {list, _}} ->
322+
{:lists.reverse(list) |> Enum.join(), status}
323+
end
324+
end
325+
326+
:xmerl_sax_parser.stream("", [
327+
continuation_fun: take,
328+
continuation_state: continuation,
329+
event_fun: &Exkml.on_event/3,
330+
event_state: %State{receiver: receiver, receiver_ref: ref},
331+
encoding: :utf8
332+
])
299333
end)
334+
end
300335

301-
take = fn cont ->
302-
case cont.({:cont, {[], chunk_size}}) do
303-
{:suspended, {list, 0}, new_cont} ->
304-
{:lists.reverse(list) |> Enum.join(), new_cont}
305-
{status, {list, _}} ->
306-
{:lists.reverse(list) |> Enum.join(), status}
307-
end
308-
end
336+
def stage(binstream, chunk_size \\ 2048) do
337+
Exkml.Stage.start_link(binstream, chunk_size)
338+
end
309339

310-
ref = make_ref()
311340

312-
:xmerl_sax_parser.stream("", [
313-
continuation_fun: take,
314-
continuation_state: continuation,
315-
event_fun: &Exkml.on_event/3,
316-
event_state: %State{receiver: self(), receiver_ref: ref},
317-
encoding: :utf8
318-
])
341+
def stream!(binstream, chunk_size \\ 2048) do
342+
ref = make_ref()
343+
pid = setup(binstream, chunk_size, ref)
319344

320345
Stream.resource(
321346
fn -> :ok end,
322347
fn state ->
323348
receive do
324-
{:placemarks, ^ref, pms} ->
349+
{:placemarks, ^ref, from, pms} ->
350+
send from, {:ack, ref}
325351
{pms, state}
326352
{:done, ^ref} ->
327353
{:halt, state}
354+
{:error, ^ref, ^pid, event} ->
355+
raise KMLParseError, message: "Document ended prematurely", event: event
328356
end
329357
end,
330358
fn _ -> :ok end

lib/exkml/stage.ex

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
defmodule Exkml.Stage do
2+
use GenStage
3+
4+
def start_link(binstream, chunk_size) do
5+
GenStage.start_link(__MODULE__, [binstream, chunk_size])
6+
end
7+
8+
def init([binstream, chunk_size]) do
9+
ref = make_ref()
10+
Exkml.setup(binstream, chunk_size, ref)
11+
buf = []
12+
demand = 0
13+
{:producer, {:started, ref, buf, demand, nil}}
14+
end
15+
16+
def handle_demand(more_demand, {status, ref, buf, demand, from}) do
17+
new_demand = demand + more_demand
18+
{emit, keep} = Enum.split(buf, new_demand)
19+
less_demand = new_demand - length(emit)
20+
21+
new_state = {status, ref, keep, less_demand, from}
22+
23+
ack(ref, from)
24+
25+
maybe_end(new_state)
26+
27+
{:noreply, emit, new_state}
28+
end
29+
30+
def handle_info({:placemarks, ref, from, pms}, {status, ref, buf, 0, _}) do
31+
{:noreply, [], {status, ref, buf ++ pms, 0, from}}
32+
end
33+
34+
def handle_info({:placemarks, ref, from, pms}, {status, ref, buf, demand, _}) do
35+
{emit, keep} = Enum.split(buf ++ pms, demand)
36+
new_demand = demand - length(emit)
37+
ack(ref, from)
38+
39+
{:noreply, emit, {status, ref, keep, new_demand, from}}
40+
end
41+
42+
def handle_info({:done, ref}, {_, ref, buf, demand, from}) do
43+
new_state = {:done, ref, buf, demand, from}
44+
maybe_end(new_state)
45+
{:noreply, [], new_state}
46+
end
47+
48+
def handle_info(:stop, state) do
49+
{:stop, :normal, state}
50+
end
51+
52+
defp ack(_, nil), do: :ok
53+
defp ack(ref, from), do: send from, {:ack, ref}
54+
55+
defp maybe_end({:done, _, [], _, _}), do: GenStage.async_info(self(), :stop)
56+
defp maybe_end(_), do: :nope
57+
end

mix.exs

+2-5
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Exkml.Mixfile do
44
def project do
55
[
66
app: :exkml,
7-
version: "0.1.2",
7+
version: "0.2.0",
88
elixir: "~> 1.5",
99
start_permanent: Mix.env == :prod,
1010
deps: deps(),
@@ -37,11 +37,8 @@ defmodule Exkml.Mixfile do
3737
# Run "mix help deps" to learn about dependencies.
3838
defp deps do
3939
[
40-
{:sweet_xml, "~> 0.6.5"},
4140
{:ex_doc, ">= 0.0.0", only: :dev},
42-
{:fast_xml, "~> 1.1"},
43-
{:gen_stage, "~> 0.12.2"},
44-
{:erlsom, "~> 1.4.1"}
41+
{:gen_stage, "~> 0.12.2"}
4542
]
4643
end
4744
end

test.sh

-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
11
#!/bin/bash
2-
3-
# Simple thing to watch your project and run tests on change
4-
# Takes one optional argument, which will grep tests and run the ones that match
5-
62
while inotifywait -r -e modify ./test ./lib; do
73
mix test test/exkml_test.exs
84
done

test/exkml_test.exs

+56-35
Original file line numberDiff line numberDiff line change
@@ -257,56 +257,77 @@ defmodule ExkmlTest do
257257
end)
258258
end
259259

260-
# test "malformed" do
261-
# Process.flag(:trap_exit, true)
262-
263-
# assert "malformed_kml"
264-
# |> kml_fixture
265-
# |> Exkml.placemarks!()
266-
# |> Enum.into([])
267-
268-
# assert_receive {:EXIT, _, {:fatal, _}}
269-
# Process.flag(:trap_exit, false)
270-
# end
260+
test "malformed" do
261+
assert_raise Exkml.KMLParseError, ~r"ended prematurely", fn ->
262+
"malformed_kml"
263+
|> kml_fixture
264+
|> Exkml.stream!()
265+
|> Enum.into([])
266+
end
267+
end
271268

272269

273270
Enum.each([
274-
{"boundaries", [Multigeometry]},
275-
{"cgis-en-6393", [Point]},
276-
{"la_bikelanes", [Multigeometry, Line]},
277-
{"noaa", [Point]},
278-
{"terrassa", [Multigeometry, Point]},
279-
{"wards", [Polygon, Multigeometry]}
280-
], fn {name, kinds} ->
271+
{"boundaries", [Multigeometry], 163},
272+
{"cgis-en-6393", [Point], 233},
273+
{"la_bikelanes", [Multigeometry, Line], 12844},
274+
{"noaa", [Point], 227},
275+
{"terrassa", [Multigeometry, Point], 73},
276+
{"wards", [Polygon, Multigeometry], 53}
277+
], fn {name, kinds, expected_length} ->
281278
test "smoke #{name}" do
279+
282280
expected_set = MapSet.new(unquote(kinds))
283281

284-
out = "smoke/#{unquote(name)}"
282+
"smoke/#{unquote(name)}"
285283
|> kml_fixture
286284
|> Exkml.stream!()
287-
|> Enum.into([])
285+
|> compare_stream(unquote(expected_length), expected_set)
286+
288287

289-
assert length(out) > 0
288+
{:ok, stage} = "smoke/#{unquote(name)}"
289+
|> kml_fixture
290+
|> Exkml.stage()
291+
292+
293+
GenStage.stream([{stage, max_demand: 2, cancel: :transient}])
294+
|> compare_stream(unquote(expected_length), expected_set)
290295

291-
out
292-
|> Enum.each(fn %Placemark{geoms: shapes} ->
293-
Enum.each(shapes, fn actual ->
294-
assert actual.__struct__ in expected_set
295-
end)
296-
end)
297296
end
298297
end)
299298

300-
# test "large" do
301-
# File.stream!("/home/chris/Downloads/large.kml", [], 2048)
302-
# |> Exkml.placemarks!
303-
# |> Enum.take(20_000)
299+
def compare_stream(stream, expected_length, expected_set) do
300+
out = Enum.into(stream, [])
301+
302+
assert length(out) == expected_length
303+
304+
actual_set = Enum.flat_map(out, fn %Placemark{geoms: shapes} ->
305+
Enum.map(shapes, fn actual -> actual.__struct__ end)
306+
end)
307+
|> MapSet.new
308+
309+
assert actual_set == expected_set
310+
end
311+
312+
313+
# test "prof" do
314+
# proc = self()
315+
# spawn(fn ->
316+
# :fprof.trace([:start, {:procs, :all}])
317+
# :timer.sleep(5_000)
318+
# :fprof.trace([:stop])
319+
# :fprof.profile
320+
# :fprof.analyse({:dest, 'outfile.analysis'})
321+
# send proc, :done
322+
# end)
323+
324+
# out = "smoke/usbr"
325+
# |> kml_fixture
326+
# |> Exkml.stream!()
327+
# |> Enum.take(1)
304328

305329
# receive do
306-
# any -> IO.inspect {:wat, any}
307-
# after
308-
# 1000 -> IO.inspect :nope
330+
# :done -> :ok
309331
# end
310332
# end
311-
312333
end

0 commit comments

Comments
 (0)