from __future__ import annotations
import subprocess, hashlib, tempfile, ctypes, ctypes.util, functools
from pathlib import Path
from typing import Tuple, Optional
import gpuctypes.cuda as cuda
from tinygrad.helpers import (
DEBUG,
getenv,
diskcache,
from_mv,
init_c_var,
pretty_ptx,
cpu_time_execution,
compile_cuda_style,
encode_args_cuda_style,
time_execution_cuda_style,
)
from tinygrad.device import Compiled, LRUAllocator, MallocAllocator
from tinygrad.codegen.kernel import LinearizerOptions
from tinygrad.renderer.cstyle import CUDARenderer
"""
Module for handling CUDA/CPU operations.
Attributes:
CUDACPU (bool): Flag to check if using CUDA or CPU processing.
gpuocelot_lib (ctypes.CDLL): The GPU Ocelot library, only loaded if CUDACPU is True.
"""
CUDACPU = getenv("CUDACPU") == 1
if CUDACPU:
gpuocelot_lib = ctypes.CDLL(ctypes.util.find_library("gpuocelot"))
gpuocelot_lib.ptx_run.argtypes = [
ctypes.c_char_p,
ctypes.c_int,
ctypes.POINTER(ctypes.c_void_p),
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
]
"""
Override the cuLaunchKernel function from CUDA library with a custom lambda function.
Attributes:
cuda.cuLaunchKernel (function): The overridden CUDA kernel launch function.
"""
cuda.cuLaunchKernel = lambda src, gx, gy, gz, lx, ly, lz, shared, stream, unused_extra, args: gpuocelot_lib.ptx_run(
src,
len(args),
(ctypes.c_void_p * len(args))(*[ctypes.cast(x, ctypes.c_void_p) for x in args]),
lx,
ly,
lz,
gx,
gy,
gz,
shared,
)
[docs]
def check(status):
"""
A function to check the status of an operation and raise a RuntimeError if it is not 0.
This function checks the 'status' argument and raises a RuntimeError with a descriptive error message if the status is non-zero. The error message includes the CUDA Error code and its corresponding string representation, which are retrieved by calling an external ctypes function.
Attributes:
status (int): The status code to check. If it is not 0, a RuntimeError will be raised.
"""
if status != 0:
raise RuntimeError(
f"CUDA Error {status}, {ctypes.string_at(init_c_var(ctypes.POINTER(ctypes.c_char)(), lambda x: cuda.cuGetErrorString(status, ctypes.byref(x)))).decode()}"
)
[docs]
def cu_time_execution(cb, enable=False) -> Optional[float]:
"""
This function measures the execution time of a provided callback function.
Attributes:
cb (function): The callback function to measure the execution time for.
enable (bool): An optional boolean flag that enables or disables timing. Default is False.
Returns:
Optional[float]: If CUDACPU is not set, returns a float representing the elapsed time of the callback function's execution.
Otherwise, it returns None.
"""
return (
time_execution_cuda_style(
cb,
cuda.CUevent,
cuda.cuEventCreate,
cuda.cuEventRecord,
cuda.cuEventSynchronize,
cuda.cuEventDestroy_v2,
cuda.cuEventElapsedTime,
enable=enable,
)
if not CUDACPU
else cpu_time_execution(cb, enable=enable)
)
@diskcache
def compile_cuda(prg) -> bytes:
"""
Compile a CUDA program with specified parameters.
This function compiles a given CUDA program using the default architecture name, include paths, and various NVRTC functions for program compilation.
:param prg: The CUDA program to be compiled.
:type prg: bytes
:return: Compiled PTX (Parallel Thread Execution) code as a byte object.
:rtype: bytes
:Attributes:
* **CUDADevice.default_arch_name** (*str*) -- Default GPU architecture name.
* **/usr/local/cuda/include** (*str*) -- Default path to CUDA include files.
* **/usr/include** (*str*) -- Default system include path.
:Attributes:
* **cuda.nvrtcProgram** (*function*) -- NVRTC function for creating a program object.
* **cuda.nvrtcCreateProgram** (*function*) -- NVRTC function for creating a program object.
* **cuda.nvrtcCompileProgram** (*function*) -- NVRTC function for compiling the program.
* **cuda.nvrtcGetPTX** (*function*) -- NVRTC function for obtaining the compiled PTX code.
* **cuda.nvrtcGetPTXSize** (*function*) -- NVRTC function for getting the size of the compiled PTX code.
* **cuda.nvrtcGetProgramLog** (*function*) -- NVRTC function for obtaining the program log.
* **cuda.nvrtcGetProgramLogSize** (*function*) -- NVRTC function for getting the size of the program log.
* **check** (*function*) -- Function to check for any errors during the compilation process.
"""
return compile_cuda_style(
prg,
[
f"--gpu-architecture={CUDADevice.default_arch_name}",
"-I/usr/local/cuda/include",
"-I/usr/include",
],
cuda.nvrtcProgram,
cuda.nvrtcCreateProgram,
cuda.nvrtcCompileProgram,
cuda.nvrtcGetPTX,
cuda.nvrtcGetPTXSize,
cuda.nvrtcGetProgramLog,
cuda.nvrtcGetProgramLogSize,
check,
)
[docs]
class CUDAProgram:
"""
This class represents a CUDA program. It handles the loading, execution and deletion of CUDA programs.
Attributes:
device (CUDADevice): The CUDA device to which this program is associated.
name (str): The name of the CUDA program.
lib (bytes): The compiled program data in bytes.
prg (cuda.CUfunction or bytes): The CUDA function object or the raw bytecode, depending on `CUDACPU`.
"""
def __init__(self, device: CUDADevice, name: str, lib: bytes):
"""
Initializes a new instance of the CUDAProgram class.
Args:
device (CUDADevice): The CUDA device to which this program is associated.
name (str): The name of the CUDA program.
lib (bytes): The compiled program data in bytes.
Notes:
If `DEBUG` >= 5, pretty-prints the PTX code.
If `DEBUG` >= 6, attempts to generate and print SASS code.
"""
self.device, self.name, self.lib = device, name, lib
if DEBUG >= 5:
print(pretty_ptx(lib.decode("utf-8")))
if DEBUG >= 6:
try:
fn = (
Path(tempfile.gettempdir())
/ f"tinycuda_{hashlib.md5(lib).hexdigest()}"
).as_posix()
with open(fn + ".ptx", "wb") as f:
f.write(lib)
subprocess.run(
[
"ptxas",
f"-arch={CUDADevice.default_arch_name}",
"-o",
fn,
fn + ".ptx",
],
check=True,
)
print(subprocess.check_output(["nvdisasm", fn]).decode("utf-8"))
except Exception as e:
print("failed to generate SASS", str(e))
if not CUDACPU:
check(cuda.cuCtxSetCurrent(self.device.context))
self.module = init_c_var(
cuda.CUmodule(),
lambda x: check(cuda.cuModuleLoadData(ctypes.byref(x), lib)),
)
check(
cuda.cuModuleGetFunction(
ctypes.byref(prg := cuda.CUfunction()),
self.module,
name.encode("utf-8"),
)
)
self.prg = prg if not CUDACPU else lib
def __del__(self):
"""
Deletes this CUDA program. Unloads the module from the GPU memory.
"""
if not CUDACPU:
check(cuda.cuModuleUnload(self.module))
def __call__(
self,
*bufs,
global_size: Tuple[int, int, int],
local_size: Tuple[int, int, int],
vals: Tuple[int, ...] = (),
wait=False,
):
"""
Executes this CUDA program.
Args:
*bufs: Buffers to be used as arguments for the kernel function.
global_size (Tuple[int, int, int]): The global size of the execution grid.
local_size (Tuple[int, int, int]): The local size of the execution grid.
vals (Tuple[int, ...]): Additional values to be passed as arguments to the kernel function.
wait (bool): If `True`, waits for the execution to complete before returning.
Returns:
float: The time taken to execute the kernel function, in milliseconds.
"""
if not CUDACPU:
check(cuda.cuCtxSetCurrent(self.device.context))
c_kernel_input_config = (
encode_args_cuda_style(bufs, vals, cuda.CUdeviceptr_v2, (1, 2, 0))[0]
if not CUDACPU
else (bufs + vals)
)
return cu_time_execution(
lambda: check(
cuda.cuLaunchKernel(
self.prg,
*global_size,
*local_size,
0,
None,
None,
c_kernel_input_config,
)
),
enable=wait,
)
[docs]
class CUDAAllocator(LRUAllocator):
"""
CUDA Allocator Class.
This class is a subclass of LRUAllocator and provides functionality for allocating and
deallocating memory on the CUDA device. It also handles copying data from host to device and vice versa.
Attributes:
device (CUDADevice): The CUDA device object representing the GPU on which memory is allocated and manipulated.
"""
def __init__(self, device: CUDADevice):
"""
Initializes a new instance of the CUDAAllocator class.
Args:
device (CUDADevice): The CUDA device object representing the GPU on which memory is allocated and manipulated.
"""
self.device = device
super().__init__()
def _alloc(self, size):
"""
Allocates memory on the CUDA device.
Args:
size (int): The number of bytes to allocate.
Returns:
ctypes.c_void_p: A pointer to the allocated memory.
"""
check(cuda.cuCtxSetCurrent(self.device.context))
return init_c_var(
cuda.CUdeviceptr(),
lambda x: check(cuda.cuMemAlloc_v2(ctypes.byref(x), size)),
)
def _free(self, opaque):
"""
Frees memory on the CUDA device.
Args:
opaque (ctypes.c_void_p): A pointer to the memory to be freed.
"""
check(cuda.cuMemFree_v2(opaque))
[docs]
def copyin(self, dest, src: memoryview):
"""
Copies data from host (CPU) memory to device (GPU) memory.
Args:
dest (ctypes.c_void_p): A pointer to the destination memory on the GPU.
src (memoryview): A memoryview object representing the source memory on the CPU.
"""
check(cuda.cuCtxSetCurrent(self.device.context))
check(cuda.cuMemcpyHtoD_v2(dest, from_mv(src), len(src), None))
[docs]
def copyout(self, dest: memoryview, src):
"""
Copies data from device (GPU) memory to host (CPU) memory.
Args:
dest (memoryview): A memoryview object representing the destination memory on the CPU.
src (ctypes.c_void_p): A pointer to the source memory on the GPU.
"""
check(cuda.cuCtxSetCurrent(self.device.context))
check(cuda.cuMemcpyDtoH_v2(from_mv(dest), src, len(dest)))
[docs]
class CUDADevice(Compiled):
"""
This class represents a CUDA device for computation. It initializes the device, context, and allocator.
Attributes:
default_arch_name (str): The default architecture name for the device. Defaults to "sm_35".
Methods:
__init__(self, device: str): Initializes the CUDADevice object with a specific device.
synchronize(self): Synchronizes the computation on the device and waits for it to finish.
"""
default_arch_name = "sm_35"
def __init__(self, device: str):
"""
Initializes a CUDADevice object with a specific device.
Parameters:
device (str): The device to use for computation.
Returns:
None
"""
device_id = int(device.split(":")[1]) if ":" in device else 0
if not CUDACPU:
check(cuda.cuInit(0))
check(cuda.cuDeviceGet(ctypes.byref(device := cuda.CUdevice()), device_id))
check(
cuda.cuCtxCreate_v2(
ctypes.byref(context := cuda.CUcontext()), 0, device
)
)
self.context = context
check(
cuda.cuDeviceComputeCapability(
ctypes.byref(major := ctypes.c_int()),
ctypes.byref(minor := ctypes.c_int()),
device_id,
)
)
if device_id == 0:
CUDADevice.default_arch_name = f"sm_{major.value}{minor.value}"
from tinygrad.features.graph.cuda import CUDAGraph
super().__init__(
CUDAAllocator(self) if not CUDACPU else MallocAllocator,
LinearizerOptions(
supports_float4_alu=False,
global_max=[65535, 65535, 2147483647],
local_max=[64, 1024, 1024],
),
CUDARenderer,
compile_cuda,
functools.partial(CUDAProgram, self),
graph=CUDAGraph if not CUDACPU else None,
)
[docs]
def synchronize(self):
"""
Synchronizes the computation on the device and waits for it to finish. This ensures that all queued operations have completed before continuing.
Parameters:
None
Returns:
None
"""
return check(cuda.cuCtxSynchronize()) if not CUDACPU else None