Skip to content

[DONT MERGE] Make stream creation faster #677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

from __future__ import annotations

from cython import NULL
from libcpp cimport bool
from libc.stdint cimport intptr_t


import os
import warnings
import weakref
Expand All @@ -13,6 +18,7 @@
if TYPE_CHECKING:
import cuda.bindings
from cuda.core.experimental._device import Device

from cuda.core.experimental._context import Context
from cuda.core.experimental._event import Event, EventOptions
from cuda.core.experimental._utils.clear_error_support import assert_type
Expand All @@ -24,6 +30,10 @@
runtime,
)

from cuda.bindings cimport cydriver as cdriver
from cuda.bindings cimport cyruntime as cruntime
from cuda.core.experimental._utils cimport _error_utils


@dataclass
class StreamOptions:
Expand All @@ -43,7 +53,7 @@ class StreamOptions:
priority: Optional[int] = None


class Stream:
cdef class Stream:
"""Represent a queue of GPU operations that are executed in a specific order.

Applications use streams to control the order of execution for
Expand All @@ -61,55 +71,42 @@ class Stream:

"""

class _MembersNeededForFinalize:
__slots__ = ("handle", "owner", "builtin")

def __init__(self, stream_obj, handle, owner, builtin):
self.handle = handle
self.owner = owner
self.builtin = builtin
weakref.finalize(stream_obj, self.close)

def close(self):
if self.owner is None:
if self.handle and not self.builtin:
handle_return(driver.cuStreamDestroy(self.handle))
else:
self.owner = None
self.handle = None

def __new__(self, *args, **kwargs):
def __init__(self, *args, **kwargs):
raise RuntimeError(
"Stream objects cannot be instantiated directly. "
"Please use Device APIs (create_stream) or other Stream APIs (from_handle)."
)

__slots__ = ("__weakref__", "_mnff", "_nonblocking", "_priority", "_device_id", "_ctx_handle")
__slots__ = ("_handle", "_nonblocking", "_priority", "_device_id", "_ctx_handle")

cdef cdriver.CUstream _handle
cdef bool _nonblocking
cdef int _priority
cdef int _device_id
cdef object _ctx_handle # Python object


@classmethod
def _legacy_default(cls):
self = super().__new__(cls)
self._mnff = Stream._MembersNeededForFinalize(self, driver.CUstream(driver.CU_STREAM_LEGACY), None, True)
self._nonblocking = None # delayed
self._priority = None # delayed
self._device_id = None # delayed
cdef Stream self = cls.__new__(cls) # safe allocator
self._nonblocking = True # delayed
self._priority = 0 # delayed
self._device_id = -1 # delayed
self._ctx_handle = None # delayed
return self

@classmethod
def _per_thread_default(cls):
self = super().__new__(cls)
self._mnff = Stream._MembersNeededForFinalize(self, driver.CUstream(driver.CU_STREAM_PER_THREAD), None, True)
self._nonblocking = None # delayed
self._priority = None # delayed
self._device_id = None # delayed
cdef Stream self = cls.__new__(cls) # safe allocator
self._nonblocking = True # delayed
self._priority = 0 # delayed
self._device_id = -1 # delayed
self._ctx_handle = None # delayed
return self

@classmethod
def _init(cls, obj=None, *, options: Optional[StreamOptions] = None):
self = super().__new__(cls)
self._mnff = Stream._MembersNeededForFinalize(self, None, None, False)
cdef Stream self = cls.__new__(cls)

if obj is not None and options is not None:
raise ValueError("obj and options cannot be both specified")
Expand Down Expand Up @@ -142,46 +139,66 @@ def _init(cls, obj=None, *, options: Optional[StreamOptions] = None):
f"The first element of the sequence returned by obj.__cuda_stream__ must be 0, got {repr(info[0])}"
)

self._mnff.handle = driver.CUstream(info[1])
# TODO: check if obj is created under the current context/device
self._mnff.owner = obj
self._nonblocking = None # delayed
self._priority = None # delayed
self._device_id = None # delayed
self._nonblocking = True # delayed
self._priority = 0 # delayed
self._device_id = -1 # delayed
self._ctx_handle = None # delayed
return self

options = check_or_create_options(StreamOptions, options, "Stream options")
nonblocking = options.nonblocking
priority = options.priority
# options = check_or_create_options(StreamOptions, options, "Stream options")
cdef int high, low = 0
cdef cruntime.cudaError_t r_err = cruntime.cudaDeviceGetStreamPriorityRange(&high, &low)
_error_utils._check_runtime_error(r_err)
#high, low = result[1:]
#high, low = handle_return(runtime.cudaDeviceGetStreamPriorityRange())

cdef bool nonblocking = False
cdef int priority = high
if options is not None:
nonblocking = options.nonblocking
priority = options.priority if options.priority is not None else priority

cdef flags = cdriver.CUstream_flags.CU_STREAM_NON_BLOCKING if nonblocking else cdriver.CUstream_flags.CU_STREAM_DEFAULT

flags = driver.CUstream_flags.CU_STREAM_NON_BLOCKING if nonblocking else driver.CUstream_flags.CU_STREAM_DEFAULT

high, low = handle_return(runtime.cudaDeviceGetStreamPriorityRange())
if priority is not None:
if not (low <= priority <= high):
raise ValueError(f"{priority=} is out of range {[low, high]}")
else:
priority = high

self._mnff.handle = handle_return(driver.cuStreamCreateWithPriority(flags, priority))
self._mnff.owner = None
#cdef cdriver.CUstream handler = cdriver.CUstream()
cdef cdriver.CUresult c_err = cdriver.cuStreamCreateWithPriority(&self._handle, flags, priority)
_error_utils._check_driver_error(c_err)

#self._handle = handler

self._nonblocking = nonblocking
self._priority = priority
# don't defer this because we will have to pay a cost for context
# switch later
self._device_id = int(handle_return(driver.cuCtxGetDevice()))
cdef int device_id = -1
err = cdriver.cuCtxGetDevice(&device_id)
_error_utils._check_driver_error(err)
self._device_id = device_id
#self._device_id = int(handle_return(driver.cuCtxGetDevice()))
self._ctx_handle = None # delayed
return self

def __dealloc__(self):
if self._handle:
c_err = cdriver.cuStreamDestroy(self._handle)
_error_utils._check_driver_error(c_err)

def close(self):
"""Destroy the stream.

Destroy the stream if we own it. Borrowed foreign stream
object will instead have their references released.

"""
self._mnff.close()
if self.handle:
handle_return(driver.cuStreamDestroy(self.handle))
self._handle = NULL

def __cuda_stream__(self) -> Tuple[int, int]:
"""Return an instance of a __cuda_stream__ protocol."""
Expand All @@ -193,16 +210,16 @@ def handle(self) -> cuda.bindings.driver.CUstream:

.. caution::

This handle is a Python object. To get the memory address of the underlying C
handle, call ``int(Stream.handle)``.
This handle is a Python integer representing the memory
address of the underlying C stream object.
"""
return self._mnff.handle
return <intptr_t>self._handle

@property
def is_nonblocking(self) -> bool:
"""Return True if this is a nonblocking stream, otherwise False."""
if self._nonblocking is None:
flag = handle_return(driver.cuStreamGetFlags(self._mnff.handle))
flag = handle_return(driver.cuStreamGetFlags(self.handle))
if flag == driver.CUstream_flags.CU_STREAM_NON_BLOCKING:
self._nonblocking = True
else:
Expand All @@ -213,13 +230,13 @@ def is_nonblocking(self) -> bool:
def priority(self) -> int:
"""Return the stream priority."""
if self._priority is None:
prio = handle_return(driver.cuStreamGetPriority(self._mnff.handle))
prio = handle_return(driver.cuStreamGetPriority(self.handle))
self._priority = prio
return self._priority

def sync(self):
"""Synchronize the stream."""
handle_return(driver.cuStreamSynchronize(self._mnff.handle))
handle_return(driver.cuStreamSynchronize(self.handle))

def record(self, event: Event = None, options: EventOptions = None) -> Event:
"""Record an event onto the stream.
Expand All @@ -246,7 +263,7 @@ def record(self, event: Event = None, options: EventOptions = None) -> Event:
if event is None:
event = Event._init(self._device_id, self._ctx_handle, options)
assert_type(event, Event)
handle_return(driver.cuEventRecord(event.handle, self._mnff.handle))
handle_return(driver.cuEventRecord(event.handle, self.handle))
return event

def wait(self, event_or_stream: Union[Event, Stream]):
Expand Down Expand Up @@ -278,7 +295,7 @@ def wait(self, event_or_stream: Union[Event, Stream]):
discard_event = True

# TODO: support flags other than 0?
handle_return(driver.cuStreamWaitEvent(self._mnff.handle, event, 0))
handle_return(driver.cuStreamWaitEvent(self.handle, event, 0))
if discard_event:
handle_return(driver.cuEventDestroy(event))

Expand All @@ -298,15 +315,15 @@ def device(self) -> Device:
if self._device_id is None:
# Get the stream context first
if self._ctx_handle is None:
self._ctx_handle = handle_return(driver.cuStreamGetCtx(self._mnff.handle))
self._ctx_handle = handle_return(driver.cuStreamGetCtx(self.handle))
self._device_id = get_device_from_ctx(self._ctx_handle)
return Device(self._device_id)

@property
def context(self) -> Context:
"""Return the :obj:`~_context.Context` associated with this stream."""
if self._ctx_handle is None:
self._ctx_handle = handle_return(driver.cuStreamGetCtx(self._mnff.handle))
self._ctx_handle = handle_return(driver.cuStreamGetCtx(self.handle))
if self._device_id is None:
self._device_id = get_device_from_ctx(self._ctx_handle)
return Context._from_ctx(self._ctx_handle, self._device_id)
Expand Down
5 changes: 5 additions & 0 deletions cuda_core/cuda/core/experimental/_utils/_error_utils.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from cuda.bindings cimport cydriver as driver
from cuda.bindings cimport cyruntime as runtime

cpdef _check_driver_error(driver.CUresult result)
cpdef _check_runtime_error(runtime.cudaError_t error)
56 changes: 56 additions & 0 deletions cuda_core/cuda/core/experimental/_utils/_error_utils.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from cython cimport NULL
from libc.string cimport strlen

import cuda.bindings
from cuda.bindings import driver, runtime

from cuda.bindings cimport cydriver as cdriver
from cuda.bindings cimport cyruntime as cruntime

from cuda.core.experimental._utils.driver_cu_result_explanations import DRIVER_CU_RESULT_EXPLANATIONS
from cuda.core.experimental._utils.runtime_cuda_error_explanations import RUNTIME_CUDA_ERROR_EXPLANATIONS


class CUDAError(Exception):
    """Exception raised when a CUDA driver or runtime call reports a failure."""



cpdef _check_driver_error(cdriver.CUresult error):
    """Raise :class:`CUDAError` if a CUDA driver call did not succeed.

    Parameters
    ----------
    error : cdriver.CUresult
        Status code returned by a CUDA driver API call.

    Raises
    ------
    CUDAError
        If ``error`` is not ``CUDA_SUCCESS``. The message is the error name
        followed by the entry from ``DRIVER_CU_RESULT_EXPLANATIONS`` when one
        exists for this code, otherwise by the driver's own description
        string. If the name itself cannot be resolved, the message is
        ``"UNEXPECTED ERROR CODE: <code>"``.
    """
    # C locals are declared up front, before any executable statement.
    cdef const char *c_name = NULL
    cdef const char *c_desc = NULL
    cdef cdriver.CUresult status

    if error == cdriver.CUresult.CUDA_SUCCESS:
        return

    # The out-pointer call form belongs to the C-level bindings (``cdriver``);
    # the Python-level ``driver`` module cannot receive ``&c_name``.
    status = cdriver.cuGetErrorName(error, &c_name)
    if status != cdriver.CUresult.CUDA_SUCCESS:
        raise CUDAError(f"UNEXPECTED ERROR CODE: {error}")
    name = (<char *>c_name)[:strlen(c_name)].decode()

    # Prefer the curated explanation table over the driver's short string.
    expl = DRIVER_CU_RESULT_EXPLANATIONS.get(int(error))
    if expl is not None:
        raise CUDAError(f"{name}: {expl}")

    status = cdriver.cuGetErrorString(error, &c_desc)
    if status != cdriver.CUresult.CUDA_SUCCESS:
        raise CUDAError(f"{name}")
    desc = (<char *>c_desc)[:strlen(c_desc)].decode()
    raise CUDAError(f"{name}: {desc}")


cpdef _check_runtime_error(cruntime.cudaError_t error):
    """Raise :class:`CUDAError` if a CUDA runtime call did not succeed.

    Parameters
    ----------
    error : cruntime.cudaError_t
        Status code returned by a CUDA runtime API call.

    Raises
    ------
    CUDAError
        If ``error`` is not ``cudaSuccess``. The message is the error name
        followed by the entry from ``RUNTIME_CUDA_ERROR_EXPLANATIONS`` when
        one exists for this code, otherwise by the runtime's own description
        string. If the code or its name cannot be resolved, the message is
        ``"UNEXPECTED ERROR CODE: <code>"``.
    """
    if error == cruntime.cudaError_t.cudaSuccess:
        return

    # The lookups below go through the Python-level ``runtime`` bindings,
    # which were previously called with a ``runtime.cudaError_t`` member.
    # ``error`` is now a C enum value, so convert it back first.
    # NOTE(review): assumes ``runtime.cudaError_t`` is a Python enum that can
    # be constructed from the integer code -- confirm against cuda.bindings.
    try:
        py_error = runtime.cudaError_t(int(error))
    except ValueError:
        raise CUDAError(f"UNEXPECTED ERROR CODE: {error}") from None

    name_err, name = runtime.cudaGetErrorName(py_error)
    if name_err != runtime.cudaError_t.cudaSuccess:
        raise CUDAError(f"UNEXPECTED ERROR CODE: {error}")
    name = name.decode()

    # Prefer the curated explanation table over the runtime's short string.
    expl = RUNTIME_CUDA_ERROR_EXPLANATIONS.get(int(error))
    if expl is not None:
        raise CUDAError(f"{name}: {expl}")

    desc_err, desc = runtime.cudaGetErrorString(py_error)
    if desc_err != runtime.cudaError_t.cudaSuccess:
        raise CUDAError(f"{name}")
    desc = desc.decode()
    raise CUDAError(f"{name}: {desc}")


39 changes: 1 addition & 38 deletions cuda_core/cuda/core/experimental/_utils/cuda_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
from cuda import cudart as runtime
from cuda import nvrtc

from cuda.core.experimental._utils._error_utils import CUDAError, _check_driver_error, _check_runtime_error
from cuda.core.experimental._utils.driver_cu_result_explanations import DRIVER_CU_RESULT_EXPLANATIONS
from cuda.core.experimental._utils.runtime_cuda_error_explanations import RUNTIME_CUDA_ERROR_EXPLANATIONS


class CUDAError(Exception):
pass


class NVRTCError(CUDAError):
pass

Expand All @@ -48,40 +45,6 @@ def cast_to_3_tuple(label, cfg):
return cfg + (1,) * (3 - len(cfg))


def _check_driver_error(error):
if error == driver.CUresult.CUDA_SUCCESS:
return
name_err, name = driver.cuGetErrorName(error)
if name_err != driver.CUresult.CUDA_SUCCESS:
raise CUDAError(f"UNEXPECTED ERROR CODE: {error}")
name = name.decode()
expl = DRIVER_CU_RESULT_EXPLANATIONS.get(int(error))
if expl is not None:
raise CUDAError(f"{name}: {expl}")
desc_err, desc = driver.cuGetErrorString(error)
if desc_err != driver.CUresult.CUDA_SUCCESS:
raise CUDAError(f"{name}")
desc = desc.decode()
raise CUDAError(f"{name}: {desc}")


def _check_runtime_error(error):
if error == runtime.cudaError_t.cudaSuccess:
return
name_err, name = runtime.cudaGetErrorName(error)
if name_err != runtime.cudaError_t.cudaSuccess:
raise CUDAError(f"UNEXPECTED ERROR CODE: {error}")
name = name.decode()
expl = RUNTIME_CUDA_ERROR_EXPLANATIONS.get(int(error))
if expl is not None:
raise CUDAError(f"{name}: {expl}")
desc_err, desc = runtime.cudaGetErrorString(error)
if desc_err != runtime.cudaError_t.cudaSuccess:
raise CUDAError(f"{name}")
desc = desc.decode()
raise CUDAError(f"{name}: {desc}")


def _check_error(error, handle=None):
if isinstance(error, driver.CUresult):
_check_driver_error(error)
Expand Down
Loading