DArray: MPI interface #405

Closed · wants to merge 57 commits

Commits (57)
ac95f8a
Add missing check to walk_data
jpsamaroo Sep 26, 2022
b94306f
Merge pull request #359 from JuliaParallel/jps/walk-data-topmost-bug
jpsamaroo Sep 26, 2022
142eae3
Update Project.toml
krynju Nov 5, 2022
39200fa
signature: Don't capture input arguments
jpsamaroo Nov 10, 2022
d270587
Merge pull request #363 from JuliaParallel/jps/signature-no-capture
jpsamaroo Nov 10, 2022
66bf970
chunks: Allow weak Chunk references in Thunk args
jpsamaroo Nov 10, 2022
2b71950
Merge pull request #364 from JuliaParallel/jps/weak-chunk
jpsamaroo Nov 10, 2022
6f606ce
DaggerWebDash: Add Mux 1.x to compat
jpsamaroo Nov 13, 2022
1d66df7
Merge pull request #367 from JuliaParallel/jps/webdash-mux-1.0
jpsamaroo Nov 13, 2022
188de76
Add DTables.jl CI run (#366)
krynju Nov 14, 2022
66d5147
Update Project.toml
krynju Dec 5, 2022
4e8b209
thunk: Improve ThunkFailedException printing
jpsamaroo Feb 3, 2023
2ebb51b
Sch: Fix task error-related assertion
jpsamaroo Feb 3, 2023
0107b31
Merge pull request #370 from JuliaParallel/jps/task-in-cache-bug
jpsamaroo Feb 3, 2023
42bd36f
checkpoint: Use at-invokelatest
jpsamaroo Feb 24, 2023
54629f9
Merge pull request #371 from JuliaParallel/jps/checkpoint-invokelatest
jpsamaroo Feb 24, 2023
f3986fc
at-spawn: Support broadcasting
jpsamaroo Feb 24, 2023
be403f3
Merge pull request #372 from JuliaParallel/jps/at-spawn-broadcast
jpsamaroo Feb 24, 2023
22378a1
DaggerWebDash: Updates for recent HTTP.jl
jpsamaroo Mar 2, 2023
345b70b
docs: Update scheduler viz docs
jpsamaroo Mar 2, 2023
675e6a4
Merge pull request #377 from JuliaParallel/jps/update-viz-docs
jpsamaroo Mar 3, 2023
8d6b234
Updating README.md (#383)
Madhupatel08 Mar 16, 2023
96aa4dc
Add scope for union of processor type
jpsamaroo Feb 25, 2023
0b108ee
Deprecate proclist and single in favor of scope
jpsamaroo Feb 26, 2023
43178ee
tests: Robustify mutation test
jpsamaroo Feb 26, 2023
5cd574c
scopes: Add show methods
jpsamaroo Mar 3, 2023
49eea98
scopes: Add Dagger.scope helper
jpsamaroo Mar 3, 2023
d264e16
scopes: Unique subscopes in UnionScope
jpsamaroo Mar 10, 2023
fa7f828
docs: Update scope docs
jpsamaroo Apr 5, 2023
85637c0
Merge pull request #374 from JuliaParallel/jps/processor-type-scope
jpsamaroo Apr 5, 2023
023a4cd
Changing the receive and yield function to accomodate new MPI impleme…
fda-tome Apr 18, 2023
284a374
ThreadProc: Mark task sticky
jpsamaroo Feb 25, 2023
44913e8
ThreadProc: Use at-invokelatest
jpsamaroo Mar 1, 2023
96b2c6b
thunk: Dont move (error,value) tuple
jpsamaroo Apr 18, 2023
bcf3f32
Sch: Generate guaranteed unique uid
jpsamaroo Apr 22, 2023
57cbaf0
Sch: Add at-dagdebug helper macro
jpsamaroo Apr 22, 2023
9dd7a89
Add worker-local task stealing and occupancy limiting
jpsamaroo Feb 25, 2023
14dc2b5
Sch: Use at-invokelatest for move
jpsamaroo Apr 28, 2023
4b83c4b
Merge pull request #373 from JuliaParallel/jps/task-balance
jpsamaroo Apr 28, 2023
b9ce129
Add keyword argument support
jpsamaroo May 15, 2023
1c8878d
CI: Add Julia 1.9
jpsamaroo May 18, 2023
5836c4d
Merge pull request #394 from JuliaParallel/jps/kwargs
jpsamaroo May 18, 2023
0ca1703
Included the array on mock hashing scheme and changed deprecated sing…
fda-tome May 25, 2023
3426eac
[Temporary] Changes to the nonblocking broadcast impletations and wra…
fda-tome Jun 9, 2023
6dc6975
Changes regarding allocation, mapping, reducing and indexing for the …
fda-tome Jun 9, 2023
86eb14c
Changes regarding allocation, mapping, reducing and indexing for the …
fda-tome Jun 9, 2023
dd1a595
Finished array implementation, having problems with the darray distri…
fda-tome Jun 17, 2023
1268410
Finished array implementation, having problems with the darray distri…
fda-tome Jun 17, 2023
4acb50c
DArray: Fix adj/transpose and matmul
jpsamaroo Jun 19, 2023
fffcc01
Tests passing
fda-tome Jun 19, 2023
08d2309
Apply suggestions from code review
fda-tome Jun 19, 2023
2a797b1
tests/logging: Remove reliance on DArray
jpsamaroo Jun 19, 2023
b763d58
Comments resolved
fda-tome Jun 19, 2023
afa38a1
Conflict resolution
fda-tome Jun 30, 2023
846ff35
Started MPI style implementation
fda-tome Jun 30, 2023
1ce3b7f
Rebasing MPI branch and DArray interface
fda-tome Jul 19, 2023
4877b6e
Merge branch 'jps/dagger-mpi' into fdat/dagger-mpi
fda-tome Jul 19, 2023
25 changes: 24 additions & 1 deletion .buildkite/pipeline.yml
@@ -35,12 +35,22 @@ steps:
julia_args: "--threads=1"
- JuliaCI/julia-coverage#v1:
codecov: true
- label: Julia 1.9
timeout_in_minutes: 60
<<: *test
plugins:
- JuliaCI/julia#v1:
version: "1.9"
- JuliaCI/julia-test#v1:
julia_args: "--threads=1"
- JuliaCI/julia-coverage#v1:
codecov: true
- label: Julia nightly
timeout_in_minutes: 60
<<: *test
plugins:
- JuliaCI/julia#v1:
version: "1.9-nightly"
version: "1.10-nightly"
- JuliaCI/julia-test#v1:
julia_args: "--threads=1"
- JuliaCI/julia-coverage#v1:
@@ -93,3 +103,16 @@ steps:
BENCHMARK_SCALE: "5:5:50"
artifacts:
- benchmarks/result*
- label: DTables.jl stability test
timeout_in_minutes: 20
plugins:
- JuliaCI/julia#v1:
version: "1.8"
env:
JULIA_NUM_THREADS: "4"
agents:
queue: "juliaecosystem"
sandbox.jl: "true"
os: linux
arch: x86_64
command: "git clone https://github.com/JuliaParallel/DTables.jl.git ; julia -t4 -e 'using Pkg; Pkg.activate(\"DTables.jl\"); Pkg.develop(;path=\".\"); Pkg.instantiate(); Pkg.test()'"
3 changes: 2 additions & 1 deletion Project.toml
@@ -1,10 +1,11 @@
name = "Dagger"
uuid = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
version = "0.16.1"
version = "0.16.3"

[deps]
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
ContextVariablesX = "6add18c4-b38d-439d-96f6-d6bc489c04c5"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
36 changes: 34 additions & 2 deletions README.md
@@ -17,10 +17,14 @@ At the core of Dagger.jl is a scheduler heavily inspired by [Dask](https://docs.

## Installation

You can install Dagger by typing
Dagger.jl can be installed using the Julia package manager. Enter the Pkg REPL mode by typing `]` in the Julia REPL and then run:

```julia
julia> ] add Dagger
pkg> add Dagger
```
Or, equivalently, via the Pkg API:
```julia
julia> import Pkg; Pkg.add("Dagger")
```

## Usage
Expand All @@ -37,6 +41,34 @@ b = Dagger.@spawn rand(a, 4)
c = Dagger.@spawn sum(b)
fetch(c) # some number!
```
## Contributing Guide
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg?style=flat-square)](http://makeapullrequest.com)
[![GitHub issues](https://img.shields.io/github/issues/JuliaParallel/Dagger.jl)](https://github.com/JuliaParallel/Dagger.jl/issues)
[![GitHub contributors](https://img.shields.io/github/contributors/JuliaParallel/Dagger.jl)](https://github.com/JuliaParallel/Dagger.jl/graphs/contributors)

Contributions are encouraged.

There are several ways to contribute to our project:

**Reporting Bugs**: If you find a bug, please open an issue describing the problem. Make sure to include steps to reproduce it and any error messages you receive.

**Fixing Bugs**: If you'd like to fix a bug, please create a pull request with your changes. Make sure to include a description of the problem and how your changes will address it.

Additional examples and documentation improvements are also very welcome.

## Resources
List of recommended Dagger.jl resources:
- Docs [![][docs-master-img]][docs-master-url]
- Videos
- [Distributed Computing with Dagger.jl](https://youtu.be/capjmjVHfMU)
- [Easy, Featureful Parallelism with Dagger.jl](https://youtu.be/t3S8W6A4Ago)
- [Easier parallel Julia workflow with Dagger.jl](https://youtu.be/VrqzOsav61w)
- [Dagger.jl Development and Roadmap](https://youtu.be/G0Y62ysFbDk)

## Help and Discussion
For help and discussion, we suggest asking in the following places:

[Julia Discourse](https://discourse.julialang.org/c/domain/parallel/34) and on the [Julia Slack](https://julialang.org/slack/) in the `#distributed` channel.

## Acknowledgements

6 changes: 3 additions & 3 deletions docs/src/checkpointing.md
@@ -54,17 +54,17 @@ Let's see how we'd modify the above example to use checkpointing:

```julia
using Serialization

X = compute(randn(Blocks(128,128), 1024, 1024))
Y = [delayed(sum; options=Dagger.Sch.ThunkOptions(;
checkpoint=(thunk,result)->begin
Y = [delayed(sum; checkpoint=(thunk,result)->begin
open("checkpoint-$idx.bin", "w") do io
serialize(io, collect(result))
end
end, restore=(thunk)->begin
open("checkpoint-$idx.bin", "r") do io
Dagger.tochunk(deserialize(io))
end
end))(chunk) for (idx,chunk) in enumerate(X.chunks)]
end)(chunk) for (idx,chunk) in enumerate(X.chunks)]
inner(x...) = sqrt(sum(x))
Z = delayed(inner)(Y...)
z = collect(Z)
94 changes: 66 additions & 28 deletions docs/src/index.md
@@ -2,32 +2,34 @@

## Usage

The main function for using Dagger is `spawn`:
The main entrypoint to Dagger is `@spawn`:

`Dagger.spawn(f, args...; options...)`
`Dagger.@spawn [option=value]... f(args...; kwargs...)`

or `@spawn` for the more convenient macro form:
or `spawn` if it's more convenient:

`Dagger.@spawn [option=value]... f(args...)`
`Dagger.spawn(f, Dagger.Options(options), args...; kwargs...)`

When called, it creates an `EagerThunk` (also known as a "thunk" or "task")
object representing a call to function `f` with the arguments `args`. If it is
called with other thunks as inputs, such as in `Dagger.@spawn f(Dagger.@spawn
g())`, then the function `f` gets passed the results of those input thunks. If
those thunks aren't yet finished executing, then the execution of `f` waits on
all of its input thunks to complete before executing.
object representing a call to function `f` with the arguments `args` and
keyword arguments `kwargs`. If it is called with other thunks as args/kwargs,
such as in `Dagger.@spawn f(Dagger.@spawn g())`, then the function `f` gets
passed the results of those input thunks, once they're available. If those
thunks aren't yet finished executing, then the execution of `f` waits on all of
its input thunks to complete before executing.

The key point is that, for each argument to a thunk, if the argument is an
`EagerThunk`, it'll be executed before this node and its result will be passed
into the function `f`. If the argument is *not* an `EagerThunk` (instead, some
other type of Julia object), it'll be passed as-is to the function `f`.
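
For example, a minimal sketch of this behavior (using only `rand` and `sum` from Julia's Base):

```julia
using Dagger

a = Dagger.@spawn rand(4)           # an EagerThunk
b = Dagger.@spawn sum(a; init=0.0)  # `sum` receives the Vector computed by `a`
n = 4
c = Dagger.@spawn rand(n)           # `n` is a plain Int, so it is passed as-is

fetch(b)  # waits for `a`, then `b`, and returns the sum
```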

Thunks don't accept regular keyword arguments for the function `f`. Instead,
the `options` kwargs are passed to the scheduler to control its behavior:
The `Options` struct in the second argument position is optional; if provided,
it is passed to the scheduler to control its behavior. `Options` contains a
`NamedTuple` of option key-value pairs, which can be any of:
- Any field in `Dagger.Sch.ThunkOptions` (see [Scheduler and Thunk options](@ref))
- `meta::Bool` -- Pass the input `Chunk` objects themselves to `f` and not the value contained in them

There are also some extra kwargs that can be passed, although they're considered advanced options to be used only by developers or library authors:
There are also some extra options that can be passed, although they're considered advanced and intended only for developers or library authors:
- `get_result::Bool` -- return the actual result to the scheduler instead of `Chunk` objects. Used when `f` explicitly constructs a Chunk or when return value is small (e.g. in case of reduce)
- `persist::Bool` -- the result of this Thunk should not be released after it becomes unused in the DAG
- `cache::Bool` -- cache the result of this Thunk such that if the thunk is evaluated again, one can just reuse the cached value. If it’s been removed from cache, recompute the value.
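
As a brief illustration of option passing in both forms (a sketch; `single=1` pins execution to worker 1, as in the options examples later in this document):

```julia
X = rand(100)

# Option passed in the macro form
t1 = Dagger.@spawn single=1 sum(X)

# The same option passed via Dagger.Options in the function form
t2 = Dagger.spawn(sum, Dagger.Options(;single=1), X)

fetch(t1) == fetch(t2)  # true
```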
@@ -133,18 +135,18 @@ via `@par` or `delayed`. The above computation can be executed with the lazy
API by substituting `@spawn` with `@par` and `fetch` with `collect`:

```julia
p = @par add1(4)
q = @par add2(p)
r = @par add1(3)
s = @par combine(p, q, r)
p = Dagger.@par add1(4)
q = Dagger.@par add2(p)
r = Dagger.@par add1(3)
s = Dagger.@par combine(p, q, r)

@assert collect(s) == 16
```

or similarly, in block form:

```julia
s = @par begin
s = Dagger.@par begin
p = add1(4)
q = add2(p)
r = add1(3)
@@ -159,7 +161,7 @@ operation, you can call `compute` on the thunk. This will return a `Chunk`
object which references the result (see [Chunks](@ref) for more details):

```julia
x = @par 1+2
x = Dagger.@par 1+2
cx = compute(x)
cx::Chunk
@assert collect(cx) == 3
@@ -198,15 +200,17 @@ While Dagger generally "just works", sometimes one needs to exert some more
fine-grained control over how the scheduler allocates work. There are two
parallel mechanisms to achieve this: Scheduler options (from
`Dagger.Sch.SchedulerOptions`) and Thunk options (from
`Dagger.Sch.ThunkOptions`). These two options structs generally contain the
same options, with the difference being that Scheduler options operate
`Dagger.Sch.ThunkOptions`). These two options structs contain many shared
options, with the difference being that Scheduler options operate
globally across an entire DAG, and Thunk options operate on a thunk-by-thunk
basis. Scheduler options can be constructed and passed to `collect()` or
`compute()` as the keyword argument `options` for lazy API usage:
basis.

Scheduler options can be constructed and passed to `collect()` or `compute()`
as the keyword argument `options` for lazy API usage:

```julia
t = @par 1+2
opts = Dagger.Sch.ThunkOptions(;single=1) # Execute on worker 1
t = Dagger.@par 1+2
opts = Dagger.Sch.SchedulerOptions(;single=1) # Execute on worker 1

compute(t; options=opts)

@@ -219,12 +223,46 @@ Thunk options can be passed to `@spawn/spawn`, `@par`, and `delayed` similarly:
# Execute on worker 1

Dagger.@spawn single=1 1+2
Dagger.spawn(+, Dagger.Options(;single=1), 1, 2)

Dagger.spawn(+, 1, 2; single=1)

opts = Dagger.Sch.ThunkOptions(;single=1)
delayed(+)(1, 2; options=opts)
delayed(+; single=1)(1, 2)
```

### Core vs. Worker Schedulers

Dagger's scheduler is really composed of two kinds of entities: the "core"
scheduler and the "worker" schedulers:

The core scheduler runs on worker 1, thread 1, and is the entrypoint to tasks
which have been submitted. The core scheduler manages all task dependencies,
notifies calls to `wait` and `fetch` of task completion, and generally performs
initial task placement. The core scheduler has cached information about each
worker and their processors, and uses that information (together with metrics
about previous tasks and other aspects of the Dagger runtime) to generate a
near-optimal just-in-time task schedule.

The worker schedulers each run as a set of tasks across all workers and all
processors, and handle data movement and task execution. Once the core
scheduler has scheduled and launched a task, it arrives at the worker scheduler
for handling. The worker scheduler will pass the task to a queue for the
assigned processor, where it will wait until the processor has a sufficient
amount of "occupancy" for the task. Once the processor is ready for the task,
it will first fetch all arguments to the task from other workers, and then it
will execute the task, package the result into a `Chunk`, and pass that back to
the core scheduler.

### Workload Balancing

In general, Dagger's core scheduler tries to balance workloads as much as
possible across all the available processors, but it can fail to do so
effectively when either the cached per-processor information is outdated, or
when the estimates about the task's behavior are inaccurate. To minimize the
impact of this potential workload imbalance, the worker schedulers' processors
will attempt to steal tasks from each other when they are under-occupied. Tasks
will only be stolen if their scope (see [Scopes](@ref)) matches the processor attempting
the steal, so tasks with wider scopes have better balancing potential.
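
For instance, a hedged sketch using the `scope` option and the `Dagger.scope` helper introduced in this changeset (the exact keyword names are illustrative):

```julia
X = rand(100)

# No scope: runnable anywhere in the cluster, so it has the widest
# balancing (and stealing) potential
t1 = Dagger.@spawn sum(X)

# Pinned to worker 1: cannot be stolen by other workers' processors
t2 = Dagger.@spawn scope=Dagger.scope(worker=1) sum(X)
```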

### Scheduler/Thunk Options

[`Dagger.Sch.SchedulerOptions`](@ref)
[`Dagger.Sch.ThunkOptions`](@ref)
2 changes: 1 addition & 1 deletion docs/src/logging.md
@@ -32,7 +32,7 @@ called. Let's construct one:

```julia
ctx = Context()
ml = TimspanLogging.MultiEventLog()
ml = TimespanLogging.MultiEventLog()

# Add the BytesAllocd consumer to the log as `:bytes`
ml[:bytes] = Dagger.Events.BytesAllocd()
33 changes: 2 additions & 31 deletions docs/src/processors.md
@@ -76,42 +76,13 @@ processor B. This mechanism uses Julia's Serialization library to serialize and
deserialize data, so data must be serializable for this mechanism to work
properly.

### Future: Hierarchy Generic Path Move

NOTE: This used to be the default move behavior, but was removed because it
wasn't considered helpful, and there were not any processor implementations
that made use of it.

Movement of data between any two processors is decomposable into a sequence of
"moves" between a child and its parent, termed a "generic path move". Movement
of data may also take "shortcuts" between nodes in the tree which are not
directly connected if enabled by libraries or the user, which may make use of
IPC mechanisms to transfer data more directly and efficiently (such as
Infiniband, GPU RDMA, NVLINK, etc.). All data is considered local to some
processor, and may only be operated on by another processor by first doing an
explicit move operation to that processor.

## Processor Selection

By default, Dagger uses the CPU to process work, typically single-threaded per
cluster node. However, Dagger allows access to a wider range of hardware and
software acceleration techniques, such as multithreading and GPUs. These more
advanced (but performant) accelerators are disabled by default, but can easily
be enabled by using Scheduler/Thunk options in the `proclist` field. If
`nothing`, all default processors will be used. If a vector of types, only the
processor types contained in `options.proclist` will be used to compute all or
a given thunk. If a function, it will be called for each processor (with the
processor as the argument) until it returns `true`.

```julia
opts = Dagger.Sch.ThunkOptions(;proclist=nothing) # default behavior
# OR
opts = Dagger.Sch.ThunkOptions(;proclist=[DaggerGPU.CuArrayProc]) # only execute on CuArrayProc
# OR
opts = Dagger.Sch.ThunkOptions(;proclist=(proc)->(proc isa Dagger.ThreadProc && proc.tid == 3)) # only run on ThreadProc with thread ID 3

t = Dagger.@par options=opts sum(X) # do sum(X) on the specified processor
```
be enabled by using scopes (see [Scopes](@ref) for details).
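
A scope-based analogue of the removed `proclist` examples might look like the following sketch (`ProcessorTypeScope` comes from the scope changes merged in this PR; treat the exact call as illustrative):

```julia
X = rand(100)

# Only execute on thread processors, replacing the old
# proclist=(proc)->(proc isa Dagger.ThreadProc) form
t = Dagger.@spawn scope=Dagger.ProcessorTypeScope(Dagger.ThreadProc) sum(X)
```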

## Resource Control

Expand All @@ -137,7 +108,7 @@ sufficient resources become available by thunks completing execution.
The [DaggerGPU.jl](https://github.com/JuliaGPU/DaggerGPU.jl) package can be
imported to enable GPU acceleration for NVIDIA and AMD GPUs, when available.
The processors provided by that package are not enabled by default, but may be
enabled via `options.proclist` as usual.
enabled via custom scopes ([Scopes](@ref)).

### Future: Network Devices and Topology

2 changes: 1 addition & 1 deletion docs/src/propagation.md
@@ -1,6 +1,6 @@
# Option Propagation

Most options passed to Dagger are passed via `delayed` or `Dagger.@spawn`
Most options passed to Dagger are passed via `@spawn/spawn` or `delayed`
directly. This works well when an option only needs to be set for a single
thunk, but is cumbersome when the same option needs to be set on multiple
thunks, or set recursively on thunks spawned within other thunks. Thankfully,