Switch to version Dagger 0.18.11 #7
Conversation
… parallel is upper bounded now
Should we try to merge it now or wait for JuliaParallel/Dagger.jl#531?
It contains a modified version (with fixes) of GraphVizSimpleExt, which is used by default, so everything works as of 0.18.11.
Do we need all of the extensions?
I see. On the other hand, I don't think extensions should be used to get patches from an upstream project.
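For context, a rough sketch of how such a vendored, patched extension might be wired in, inferred only from the module name and file paths in the stack trace below; the helper `write_dag_file` and the `:graphviz_simple` vizmode are assumptions, not the project's actual code:

```julia
# Sketch only, inferred from the stack trace below; not the actual project code.
using TimespanLogging

include("dagger_exts/GraphVizSimpleExt.jl")   # vendored, patched copy of the extension
using .ModGraphVizSimpleExt: show_logs        # module name as it appears in the stack trace

# Hypothetical helper: dump the collected scheduler logs as a GraphViz .dot file.
function write_dag_file(logs::Vector{TimespanLogging.Timespan}, path::String)
    open(path, "w") do io
        show_logs(io, logs, :graphviz_simple)  # vizmode symbol is a guess
    end
end
```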
Running it, I get:

```
ERROR: LoadError: KeyError: key 68 not found
Stacktrace:
  [1] getindex
    @ ./dict.jl:498 [inlined]
  [2] _proc_color(ctx::@NamedTuple{proc_to_color::Dict{Dagger.Processor, String}, proc_colors::Vector{RGB{FixedPointNumbers.N0f8}}, proc_color_idx::Base.RefValue{Int64}, proc_to_shape::Dict{Type, String}, proc_shapes::Tuple{String, String, String}, proc_shape_idx::Base.RefValue{Int64}, id_to_proc::Dict{Int64, Dagger.Processor}}, id::Int64)
    @ Main.ModGraphVizSimpleExt ~/fwk2/dagger_exts/GraphVizSimpleExt.jl:176
  [3] write_edge(io::IOStream, ts_move::TimespanLogging.Timespan, logs::Vector{TimespanLogging.Timespan}, ctx::@NamedTuple{proc_to_color::Dict{Dagger.Processor, String}, proc_colors::Vector{RGB{FixedPointNumbers.N0f8}}, proc_color_idx::Base.RefValue{Int64}, proc_to_shape::Dict{Type, String}, proc_shapes::Tuple{String, String, String}, proc_shape_idx::Base.RefValue{Int64}, id_to_proc::Dict{Int64, Dagger.Processor}}, inputname::String, inputarg::RemoteChannel{Channel{Int64}})
    @ Main.ModGraphVizSimpleExt ~/fwk2/dagger_exts/GraphVizSimpleExt.jl:235
  [4] write_dag(io::IOStream, logs::Vector{TimespanLogging.Timespan}, t::Nothing)
    @ Main.ModGraphVizSimpleExt ~/fwk2/dagger_exts/GraphVizSimpleExt.jl:323
  [5] write_dag
    @ ~/fwk2/dagger_exts/GraphVizSimpleExt.jl:260 [inlined]
  [6] _show_plan
    @ ~/fwk2/dagger_exts/GraphVizSimpleExt.jl:353 [inlined]
  [7] show_logs
    @ ~/fwk2/dagger_exts/GraphVizSimpleExt.jl:369 [inlined]
  [8] show_logs(io::IOStream, logs::Vector{TimespanLogging.Timespan}, vizmode::Symbol; options::@Kwargs{})
    @ Main.ModGraphVizSimpleExt ~/fwk2/dagger_exts/GraphVizSimpleExt.jl:363
  [9] show_logs
    @ ~/fwk2/dagger_exts/GraphVizSimpleExt.jl:363 [inlined]
 [10] #17
    @ ~/fwk2/graphs_scheduling/src/main.jl:92 [inlined]
 [11] open(::var"#17#18"{Vector{TimespanLogging.Timespan}}, ::String, ::Vararg{String}; kwargs::@Kwargs{})
    @ Base ./io.jl:396
 [12] open(::Function, ::String, ::String)
    @ Base ./io.jl:393
 [13] main(graphs_map::Dict{String, String})
    @ Main ~/fwk2/graphs_scheduling/src/main.jl:91
 [14] top-level scope
    @ ~/fwk2/graphs_scheduling/src/main.jl:106
in expression starting at /home/mafila/fwk2/graphs_scheduling/src/main.jl:106
```

After that it reports a series of warnings about workers dying and being rescheduled, but that is a known problem.
There are a few issues in the related functionality, so they could be fixed in a similar way.
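For illustration only, a defensive lookup in that spirit might look like the sketch below. It is based on the `ctx` fields visible in the stack trace above; `_proc_color_fallback` and the `"gray"` default are invented names, not the fix applied in this PR:

```julia
# Sketch only: a KeyError-safe variant of the processor-color lookup that failed above.
# `_proc_color_fallback` and the "gray" default are hypothetical.
function _proc_color_fallback(ctx, id::Int)
    proc = get(ctx.id_to_proc, id, nothing)       # id can be missing if the logs are incomplete
    proc === nothing && return "gray"             # neutral color instead of throwing a KeyError
    return get(ctx.proc_to_color, proc, "gray")   # same guard for the per-processor color table
end
```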
That is a bug I mentioned recently. The error is thrown when the logs are plotted before all the nodes finish computations, and it happens most of the time. I am currently working on fixing this problem.
I think I found the culprit.
The problem appears because the logs are fetched before all the tasks have finished. For regular tasks, their results are fetched when plotting the individual graphs, but we never wait for the completion of the extra tasks that send the notifications.
We could get the tasks for that extra node, which also happens to be the last node in a graph, and wait for their completion.
```julia
function execution(graphs_map)
    graphs_being_run = Set{Int}()
    graphs_dict = Dict{Int, String}()

    graphs = parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH)

    notifications = RemoteChannel(()->Channel{Int}(32))
    # notifications = Channel{Int}(32)

    for (i, (g_name, g)) in enumerate(graphs)
        graphs_dict[i] = g_name
        while !(length(graphs_being_run) < MAX_GRAPHS_RUN)
            finished_graph_id = take!(notifications)
            delete!(graphs_being_run, finished_graph_id)
            println("Dispatcher: graph finished - $finished_graph_id: $(graphs_dict[finished_graph_id])")
        end

        schedule_graph_with_notify(g, notifications, g_name, i)
        push!(graphs_being_run, i)
        println("Dispatcher: scheduled graph $i: $g_name")
    end

    results = []
    for (g_name, g) in graphs
        g_map = Dict{Int, Any}()
        for vertex_id in Graphs.vertices(g)
            future = get_prop(g, vertex_id, :res_data)
            g_map[vertex_id] = fetch(future)
        end
        push!(results, (g_name, g_map))
    end

    for (g_name, res) in results
        for (id, value) in res
            println("Graph: $g_name, Final result for vertex $id: $value")
        end
    end
end
```
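A minimal sketch of the idea mentioned above (waiting on the task of each graph's last, notifying node before plotting logs); the assumption that the last vertex's `:res_data` property holds that task is unverified:

```julia
# Sketch only: block until each graph's final (notification) task has completed.
# Assumes the last vertex's :res_data property holds that task.
for (g_name, g) in graphs
    last_vertex = last(Graphs.vertices(g))
    notify_task = get_prop(g, last_vertex, :res_data)
    wait(notify_task)
end
```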
Store the whole-graph tasks and wait for their completion. The tasks that finish before the final graph is scheduled can be safely removed, so we only wait for the completion of the last few tasks:
```julia
function execution(graphs_map)
    graphs_being_run = Set{Int}()
    graphs_dict = Dict{Int, String}()
    graphs_tasks = Dict{Int, Dagger.DTask}()
    graphs = parse_graphs(graphs_map, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH)

    notifications = RemoteChannel(()->Channel{Int}(32))
    # notifications = Channel{Int}(32)

    for (i, (g_name, g)) in enumerate(graphs)
        graphs_dict[i] = g_name
        while !(length(graphs_being_run) < MAX_GRAPHS_RUN)
            finished_graph_id = take!(notifications)
            delete!(graphs_being_run, finished_graph_id)
            delete!(graphs_tasks, finished_graph_id)   # drop tasks of graphs that already finished
            println("Dispatcher: graph finished - $finished_graph_id: $(graphs_dict[finished_graph_id])")
        end
        graphs_tasks[i] = schedule_graph_with_notify(g, notifications, g_name, i)
        push!(graphs_being_run, i)
        println("Dispatcher: scheduled graph $i: $g_name")
    end

    results = []
    for (g_name, g) in graphs
        g_map = Dict{Int, Any}()
        for vertex_id in Graphs.vertices(g)
            future = get_prop(g, vertex_id, :res_data)
            g_map[vertex_id] = fetch(future)
        end
        push!(results, (g_name, g_map))
    end

    for (g_name, res) in results
        for (id, value) in res
            println("Graph: $g_name, Final result for vertex $id: $value")
        end
    end

    # Wait for the remaining notification tasks before returning.
    for (_, task) in graphs_tasks
        wait(task)
    end
end
```
Oh, sorry, I did know the exact source of the problem; I just did not describe it in detail because I was already working on the fix. The code started to look too cluttered and hard to read to me, so I added some wrappers, etc.; consequently, it took a bit more time than expected. Besides, it should now be easier to track which tasks are being run and which are already done. Moreover, graphs are now parsed one at a time.
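If it helps to picture the "one graph at a time" flow, a very rough sketch is below; `parse_graph` (singular) is a hypothetical counterpart of the existing `parse_graphs`, not code from this PR:

```julia
# Rough sketch only: parse and schedule each graph lazily, one at a time.
# `parse_graph` is a hypothetical single-graph counterpart of `parse_graphs`.
for (i, (g_name, g_path)) in enumerate(graphs_map)
    g = parse_graph(g_path, OUTPUT_GRAPH_PATH, OUTPUT_GRAPH_IMAGE_PATH)
    graphs_tasks[i] = schedule_graph_with_notify(g, notifications, g_name, i)
    push!(graphs_being_run, i)
end
```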
I think I'd rather take the version from before adding AbstractMetaTask and the others.
Refactoring of the graph-scheduling pipeline should go into a separate PR where we can focus solely on it.
OK, I will try to revert these changes.
This reverts commit c038887.
I have a different proposal: let's freeze this as is, and I'll cherry-pick the relevant commits into a new PR with a clean history and no conflicts.
Oh, sorry, I saw this too late.
Now I can apply your suggestions on silencing the workers and the bug fix, but I will wait for a response this time :)
No problem. Please take a look at #11 and comment if anything is missing or I messed something up 😅
Looks like everything is fine there.
So, should we close this PR?
BEGINRELEASENOTES
ENDRELEASENOTES