Make Dagger.finish_stream() propagate downstream #579

Open · wants to merge 56 commits into base: jps/stream2

Commits
90a974f
Add metadata to EagerThunk
jpsamaroo Dec 1, 2023
cbac605
Sch: Allow occupancy key to be Any
jpsamaroo Dec 1, 2023
e441bd0
Add streaming API
jpsamaroo Sep 12, 2023
17096fa
Reference Dagger.EAGER_THUNK_STREAMS explicitly
JamesWrigley Mar 13, 2024
563f646
Use Base.promote_op() instead of Base._return_type()
JamesWrigley Mar 14, 2024
2f29be7
Special-case StreamingFunction in EagerThunkMetadata() constructor
JamesWrigley Mar 17, 2024
d25d6c1
Fix reference to task-queues.md in the docs
JamesWrigley Apr 1, 2024
aa07cc9
Delete Dagger.cleanup()
JamesWrigley Apr 1, 2024
f8d0b8b
streaming: Show thunk ID in logs
jpsamaroo Apr 10, 2024
1be1a41
streaming: Add tests
jpsamaroo Apr 10, 2024
f58404a
Use procs() when initializing EAGER_CONTEXT
JamesWrigley Apr 9, 2024
ee11f3f
streaming: Fix concurrency issues
jpsamaroo May 2, 2024
0d70835
Add a --verbose option to runtests.jl
JamesWrigley May 24, 2024
27392f0
Ensure that stream_fetch_values!() yields in its loop
JamesWrigley May 16, 2024
717ceb7
Add support for limiting the evaluations of a streaming DAG
JamesWrigley May 24, 2024
a0c0805
Dev the migration-helper branch of MemPool.jl
JamesWrigley May 24, 2024
f4d709c
Minor style cleanup
JamesWrigley Jun 25, 2024
a7bdfdb
Use `DTaskFailedException` and increase the default timeout
JamesWrigley Aug 18, 2024
770a241
Initial support for robustly migrating streaming tasks
JamesWrigley Aug 18, 2024
0b968d6
Inherit the top-level testsets in the streaming tests
JamesWrigley Aug 20, 2024
0268b7e
Replace `rand_finite()` with a deterministic `Producer` functor
JamesWrigley Aug 20, 2024
0dbdab3
fixup! Initial support for robustly migrating streaming tasks
JamesWrigley Sep 6, 2024
1cf99b8
fixup! fixup! Initial support for robustly migrating streaming tasks
jpsamaroo Sep 13, 2024
71ee854
task-tls: Refactor into DTaskTLS struct
jpsamaroo May 22, 2024
79ee021
fixup! task-tls: Refactor into DTaskTLS struct
jpsamaroo Sep 13, 2024
09e5826
cancellation: Add cancel token support
jpsamaroo Sep 13, 2024
3911a73
streaming: Handle cancellation
jpsamaroo Sep 13, 2024
f71f604
fixup! cancellation: Add cancel token support
jpsamaroo Sep 13, 2024
b930a42
fixup! fixup! fixup! Initial support for robustly migrating streaming…
jpsamaroo Sep 13, 2024
16d73c9
Sch: Add unwrap_nested_exception for DTaskFailedException
jpsamaroo Sep 14, 2024
2b2da8e
ProcessRingBuffer: Add length method
jpsamaroo Sep 14, 2024
5be724f
fixup! fixup! cancellation: Add cancel token support
jpsamaroo Sep 14, 2024
d545637
streaming: Buffers and tasks per input/output
jpsamaroo Sep 14, 2024
61ab9c1
fixup! fixup! fixup! cancellation: Add cancel token support
jpsamaroo Sep 24, 2024
a51cbf9
Sch: Trigger cancel token on task exit
jpsamaroo Sep 24, 2024
31944af
Add task_id for DTask
jpsamaroo Sep 24, 2024
d5c27ab
ProcessRingBuffer: Allow closure
jpsamaroo Sep 24, 2024
fbae73f
RemoteFetcher: Only collect values up to free buffer space
jpsamaroo Sep 24, 2024
bf53117
streaming: Close buffers on closing StreamStore
jpsamaroo Sep 24, 2024
b9e3c70
task-tls: Tweaks and fixes, task_id helper
jpsamaroo Sep 24, 2024
8908478
task-tls: Add task_cancel!
jpsamaroo Sep 24, 2024
1f21693
streaming: max_evals cannot be specified as 0
jpsamaroo Sep 24, 2024
c4bc7b2
streaming: Small tweaks to migration and cancellation
jpsamaroo Sep 24, 2024
51e1606
dagdebug: Always yield to avoid heisenbugs
jpsamaroo Sep 24, 2024
4ea09c4
tests: Revamp streaming tests
jpsamaroo Sep 24, 2024
8bf5fbf
tests: Add offline mode
jpsamaroo Sep 24, 2024
07ba8b1
dagdebug: Add JULIA_DAGGER_DEBUG config variable
jpsamaroo Oct 2, 2024
3aba122
cancellation: Add graceful vs. forced
jpsamaroo Oct 3, 2024
6ac140c
cancellation: Wrap InterruptException in DTaskFailedException
jpsamaroo Oct 3, 2024
f60cb77
options: Add internal helper to strip all options
jpsamaroo Oct 3, 2024
b3b70e1
streaming: Get tests passing
jpsamaroo Oct 3, 2024
efc80be
Bump MemPool compat
JamesWrigley Nov 16, 2024
e6a504d
Fully lock StreamStore in close(::StreamStore)
JamesWrigley Nov 16, 2024
149adb5
Streaming tests cleanup and fixes
JamesWrigley Nov 16, 2024
079e9fa
Make Dagger.finish_stream() propagate downstream
JamesWrigley Nov 16, 2024
999bdd7
Fix @test_throws_unwrap tests
JamesWrigley Nov 16, 2024
6 changes: 6 additions & 0 deletions .buildkite/pipeline.yml
@@ -6,6 +6,7 @@
os: linux
arch: x86_64
command: "julia --project -e 'using Pkg; Pkg.develop(;path=\"lib/TimespanLogging\")'"

.bench: &bench
if: build.message =~ /\[run benchmarks\]/
agents:
@@ -14,6 +15,7 @@
os: linux
arch: x86_64
num_cpus: 16

steps:
- label: Julia 1.9
timeout_in_minutes: 90
@@ -25,6 +27,7 @@ steps:
julia_args: "--threads=1"
- JuliaCI/julia-coverage#v1:
codecov: true

- label: Julia 1.10
timeout_in_minutes: 90
<<: *test
@@ -35,6 +38,7 @@ steps:
julia_args: "--threads=1"
- JuliaCI/julia-coverage#v1:
codecov: true

- label: Julia nightly
timeout_in_minutes: 90
<<: *test
@@ -77,6 +81,7 @@ steps:
- JuliaCI/julia-coverage#v1:
codecov: true
command: "julia -e 'using Pkg; Pkg.develop(;path=pwd()); Pkg.develop(;path=\"lib/TimespanLogging\"); Pkg.develop(;path=\"lib/DaggerWebDash\"); include(\"lib/DaggerWebDash/test/runtests.jl\")'"

- label: Benchmarks
timeout_in_minutes: 120
<<: *bench
@@ -93,6 +98,7 @@ steps:
BENCHMARK_SCALE: "5:5:50"
artifacts:
- benchmarks/result*

- label: DTables.jl stability test
timeout_in_minutes: 20
plugins:
2 changes: 1 addition & 1 deletion Project.toml
@@ -52,7 +52,7 @@ GraphViz = "0.2"
Graphs = "1"
JSON3 = "1"
MacroTools = "0.5"
MemPool = "0.4.6"
MemPool = "0.4.10"
OnlineStats = "1"
Plots = "1"
PrecompileTools = "1.2"
1 change: 1 addition & 0 deletions docs/make.jl
@@ -22,6 +22,7 @@ makedocs(;
"Task Spawning" => "task-spawning.md",
"Data Management" => "data-management.md",
"Distributed Arrays" => "darray.md",
"Streaming Tasks" => "streaming.md",
"Scopes" => "scopes.md",
"Processors" => "processors.md",
"Task Queues" => "task-queues.md",
105 changes: 105 additions & 0 deletions docs/src/streaming.md
@@ -0,0 +1,105 @@
# Streaming Tasks

Dagger tasks have a limited lifetime - they are created, execute, finish, and
are eventually destroyed when they're no longer needed. Thus, if one wants
to run the same kind of computations over and over, one might re-create a
similar set of tasks for each unit of data that needs processing.

This is fine for computations which take a long time to run (dwarfing the
cost of task creation, which is quite small), or when working with a bounded
set of data, but it is a poor fit for running many small computations on a
large (or endless) amount of data. For example: processing image frames from a
webcam, reacting to messages from a message bus, reading samples from a
software radio, and so on. Such workloads are better suited to a "streaming"
model of data processing, where data is simply piped into a
continuously-running task (or DAG of tasks) forever, or until the data runs
out.
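
To make this concrete, the non-streaming approach sketched above looks
something like the following (`get_next_frame` and `process` are hypothetical
placeholders for user code):

```julia
# Without streaming: one short-lived Dagger task per unit of data.
# Task creation and scheduling overhead is paid on every iteration.
while true
    frame = get_next_frame()                      # hypothetical data source
    result = fetch(Dagger.@spawn process(frame))  # hypothetical processing step
end
```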

Thankfully, if you have a problem which is best modeled as a streaming system
of tasks, Dagger has you covered! Building on its support for
[Task Queues](@ref), Dagger provides a means to convert an entire DAG of
tasks into a streaming DAG, where data flows into and out of each task
asynchronously, using the `spawn_streaming` function:

```julia
Dagger.spawn_streaming() do # enters a streaming region
vals = Dagger.@spawn rand()
print_vals = Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running
```

In the above example, `vals` is a Dagger task which has been transformed to run
in a streaming manner - instead of just calling `rand()` once and returning its
result, it will re-run `rand()` endlessly, continuously producing new random
values. In typical Dagger style, `print_vals` is a Dagger task which depends on
`vals`, but in streaming form - it will continuously `println` the random
values produced from `vals`. Both tasks will run forever, and will run
efficiently, only doing the work necessary to generate, transfer, and consume
values.

As the comments point out, `spawn_streaming` creates a streaming region, during
which `vals` and `print_vals` are created and configured. Both tasks are halted
until `spawn_streaming` returns, allowing large DAGs to be built all at once,
without any task losing a single value. If desired, streaming regions can be
connected, although some values might be lost while tasks are being connected:

```julia
vals = Dagger.spawn_streaming() do
Dagger.@spawn rand()
end

# Some values might be generated by `vals` but thrown away
# before `print_vals` is fully set up and connected to it

print_vals = Dagger.spawn_streaming() do
Dagger.@spawn println(vals)
end
```

More complicated streaming DAGs can be constructed just as easily. For
example, we can generate multiple streams of random numbers, write each
stream to its own file, and print the sum of the values:

```julia
Dagger.spawn_streaming() do
all_vals = [Dagger.spawn(rand) for i in 1:4]
all_vals_written = map(1:4) do i
Dagger.spawn(all_vals[i]) do val
open("results_$i.txt"; write=true, create=true, append=true) do io
println(io, repr(val))
end
return val
end
end
Dagger.spawn(all_vals_written...) do all_vals_written...
vals_sum = sum(all_vals_written)
println(vals_sum)
end
end
```

To stop the streaming DAG and tear it all down, call
`Dagger.kill!(all_vals[1])` (or `Dagger.kill!(all_vals_written[2])`, etc.);
killing any one task propagates throughout the DAG.
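
For example, a minimal sketch (assuming the `all_vals` tasks from the example
above are still running):

```julia
# Killing any single task tears down the entire connected streaming DAG.
Dagger.kill!(all_vals[1])
```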

Alternatively, tasks can stop themselves from the inside with
`finish_streaming`, optionally returning a value that can be `fetch`'d. Let's
do this when our randomly-drawn number falls within some arbitrary range:

```julia
vals = Dagger.spawn_streaming() do
Dagger.spawn() do
x = rand()
if x < 0.001
# That's good enough, let's be done
return Dagger.finish_streaming("Finished!")
end
return x
end
end
fetch(vals)
```

In this example, the call to `fetch` will block (while random numbers continue
to be drawn) until a drawn number is less than 0.001; at that point, `fetch`
will return "Finished!", and the task `vals` will have terminated.
26 changes: 24 additions & 2 deletions src/Dagger.jl
@@ -23,6 +23,9 @@ if !isdefined(Base, :ScopedValues)
else
import Base.ScopedValues: ScopedValue, with
end
import TaskLocalValues: TaskLocalValue

if !isdefined(Base, :get_extension)
import Requires: @require
@@ -46,16 +49,16 @@ include("processor.jl")
include("threadproc.jl")
include("context.jl")
include("utils/processors.jl")
include("dtask.jl")
include("cancellation.jl")
include("task-tls.jl")
include("scopes.jl")
include("utils/scopes.jl")
include("dtask.jl")
include("queue.jl")
include("thunk.jl")
include("submission.jl")
include("chunks.jl")
include("memory-spaces.jl")
include("cancellation.jl")

# Task scheduling
include("compute.jl")
@@ -67,6 +70,11 @@ include("sch/Sch.jl"); using .Sch
# Data dependency task queue
include("datadeps.jl")

# Streaming
include("stream.jl")
include("stream-buffers.jl")
include("stream-transfer.jl")

# Array computations
include("array/darray.jl")
include("array/alloc.jl")
@@ -145,6 +153,20 @@ function __init__()
ThreadProc(myid(), tid)
end
end

# Set up @dagdebug categories, if specified
try
if haskey(ENV, "JULIA_DAGGER_DEBUG")
empty!(DAGDEBUG_CATEGORIES)
for category in split(ENV["JULIA_DAGGER_DEBUG"], ",")
if category != ""
push!(DAGDEBUG_CATEGORIES, Symbol(category))
end
end
end
catch err
@warn "Error parsing JULIA_DAGGER_DEBUG" exception=err
end
end

end # module
2 changes: 0 additions & 2 deletions src/array/indexing.jl
@@ -1,5 +1,3 @@
import TaskLocalValues: TaskLocalValue

### getindex

struct GetIndex{T,N} <: ArrayOp{T,N}
55 changes: 52 additions & 3 deletions src/cancellation.jl
@@ -1,3 +1,51 @@
# DTask-level cancellation

mutable struct CancelToken
@atomic cancelled::Bool
@atomic graceful::Bool
event::Base.Event
end
CancelToken() = CancelToken(false, false, Base.Event())
function cancel!(token::CancelToken; graceful::Bool=true)
if !graceful
@atomic token.graceful = false
end
@atomic token.cancelled = true
notify(token.event)
return
end
function is_cancelled(token::CancelToken; must_force::Bool=false)
if token.cancelled[]
if must_force && token.graceful[]
# If we're only responding to forced cancellation, ignore graceful cancellations
return false
end
return true
end
return false
end
Base.wait(token::CancelToken) = wait(token.event)
# TODO: Enable this for safety
#Serialization.serialize(io::AbstractSerializer, ::CancelToken) =
# throw(ConcurrencyViolationError("Cannot serialize a CancelToken"))

const DTASK_CANCEL_TOKEN = TaskLocalValue{Union{CancelToken,Nothing}}(()->nothing)

function clone_cancel_token_remote(orig_token::CancelToken, wid::Integer)
remote_token = remotecall_fetch(wid) do
return poolset(CancelToken())
end
errormonitor_tracked("remote cancel_token communicator", Threads.@spawn begin
wait(orig_token)
@dagdebug nothing :cancel "Cancelling remote token on worker $wid"
MemPool.access_ref(remote_token) do remote_token
cancel!(remote_token)
end
end)
end

# Global-level cancellation

"""
cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)

@@ -48,7 +96,7 @@ function _cancel!(state, tid, force, halt_sch)
for task in state.ready
tid !== nothing && task.id != tid && continue
@dagdebug tid :cancel "Cancelling ready task"
state.cache[task] = InterruptException()
state.cache[task] = DTaskFailedException(task, task, InterruptException())
state.errored[task] = true
Sch.set_failed!(state, task)
end
@@ -58,7 +106,7 @@ function _cancel!(state, tid, force, halt_sch)
for task in keys(state.waiting)
tid !== nothing && task.id != tid && continue
@dagdebug tid :cancel "Cancelling waiting task"
state.cache[task] = InterruptException()
state.cache[task] = DTaskFailedException(task, task, InterruptException())
state.errored[task] = true
Sch.set_failed!(state, task)
end
@@ -80,11 +128,11 @@ function _cancel!(state, tid, force, halt_sch)
Tf === typeof(Sch.eager_thunk) && continue
istaskdone(task) && continue
any_cancelled = true
@dagdebug tid :cancel "Cancelling running task ($Tf)"
if force
@dagdebug tid :cancel "Interrupting running task ($Tf)"
Threads.@spawn Base.throwto(task, InterruptException())
else
@dagdebug tid :cancel "Cancelling running task ($Tf)"
# Tell the processor to just drop this task
task_occupancy = task_spec[4]
time_util = task_spec[2]
@@ -93,6 +141,7 @@ function _cancel!(state, tid, force, halt_sch)
push!(istate.cancelled, tid)
to_proc = istate.proc
put!(istate.return_queue, (myid(), to_proc, tid, (InterruptException(), nothing)))
cancel!(istate.cancel_tokens[tid]; graceful=false)
end
end
end
6 changes: 0 additions & 6 deletions src/compute.jl
@@ -36,12 +36,6 @@ end
Base.@deprecate gather(ctx, x) collect(ctx, x)
Base.@deprecate gather(x) collect(x)

cleanup() = cleanup(Context(global_context()))
function cleanup(ctx::Context)
Sch.cleanup(ctx)
nothing
end

function get_type(s::String)
local T
for t in split(s, ".")
14 changes: 13 additions & 1 deletion src/dtask.jl
@@ -39,6 +39,16 @@ end
Options(;options...) = Options((;options...))
Options(options...) = Options((;options...))

"""
DTaskMetadata

Represents some useful metadata pertaining to a `DTask`:
- `return_type::Type` - The inferred return type of the task
"""
mutable struct DTaskMetadata
return_type::Type
end

"""
DTask

@@ -50,9 +60,11 @@ more details.
mutable struct DTask
uid::UInt
future::ThunkFuture
metadata::DTaskMetadata
finalizer_ref::DRef
thunk_ref::DRef
DTask(uid, future, finalizer_ref) = new(uid, future, finalizer_ref)

DTask(uid, future, metadata, finalizer_ref) = new(uid, future, metadata, finalizer_ref)
end

const EagerThunk = DTask
6 changes: 6 additions & 0 deletions src/options.jl
@@ -20,6 +20,12 @@ function with_options(f, options::NamedTuple)
end
with_options(f; options...) = with_options(f, NamedTuple(options))

function _without_options(f)
with(options_context => NamedTuple()) do
f()
end
end

"""
get_options(key::Symbol, default) -> Any
get_options(key::Symbol) -> Any
Expand Down