Source code for tinygrad.runtime.ops_disk

import os, mmap

try:
    import _posixshmem
except Exception:
    pass
from typing import Callable, Dict, Tuple
from tinygrad.helpers import prod, DType, OSX, dtypes
from tinygrad.device import Interpreted, Allocator
from tinygrad.ops import Op, MovementOps, UnaryOps
from tinygrad.shape.view import strides_for_shape


[docs] class UnderlyingDiskBuffer: """ This class represents an underlying disk buffer. It is initialized with a file descriptor (fd) and memory (mem). Attributes: fd (file descriptor): The file descriptor of the disk buffer. mem (memory): The in-memory data associated with the disk buffer. """ def __init__(self, fd, mem): """ Constructs all the necessary attributes for the underlying disk buffer. Parameters: fd (file descriptor): The file descriptor of the disk buffer. mem (memory): The in-memory data associated with the disk buffer. """ self.fd, self.mem = fd, mem def __del__(self): """ Closes the file descriptor if it exists when the object is destroyed. """ if self.fd: self.fd.close()
[docs] class DiskBuffer: """ Class for handling disk buffer operations. Attributes: ud (UnderlyingDiskBuffer): Underlying disk buffer. size (int): Size of the buffer. dtype (DType): Data type of the buffer elements. Default is dtypes.uint8. offset (int): Offset in the buffer. Default is 0. """ def __init__( self, ud: UnderlyingDiskBuffer, size: int, dtype: DType = dtypes.uint8, offset=0 ): self.ud, self.size, self.dtype, self.offset = ud, size, dtype, offset def __repr__(self): """ Return a string representation of the DiskBuffer object. Returns: str: String representation of the DiskBuffer object containing its size, data type and offset. """ return f"<DiskBuffer size={self.size} dtype={self.dtype} offset={self.offset}>"
[docs] def cast(self, arg: Tuple[DType, bool]): """ Cast the DiskBuffer to a new data type. Args: arg (Tuple[DType, bool]): A tuple containing the new data type and a boolean value. Returns: DiskBuffer: A new DiskBuffer with the casted data type. """ return DiskBuffer(self.ud, self.size, arg[0], offset=self.offset)
[docs] def as_strided(self, arg): """ Create a view of the original buffer with new dimensions and strides. Args: arg: A tuple containing the new shape, calculated strides and an offset. Returns: DiskBuffer: A new DiskBuffer with the reshaped data. """ assert strides_for_shape(arg[0]) == arg[1], "disk tensors don't support strides" return DiskBuffer( self.ud, prod(arg[0]), self.dtype, offset=self.offset + arg[2] * self.dtype.itemsize, )
def _buf(self) -> memoryview: """ Return a memory view of the buffer. Returns: memoryview: A memory view of the underlying buffer. """ return memoryview(self.ud.mem)[ self.offset : self.offset + self.size * self.dtype.itemsize ]
disk_fxn_for_op: Dict[Op, Callable] = { UnaryOps.CAST: DiskBuffer.cast, MovementOps.AS_STRIDED: DiskBuffer.as_strided, } MAP_LOCKED, MAP_POPULATE = 0x2000, 0x008000
[docs] class DiskAllocator(Allocator): """ DiskAllocator class for allocating disk space. Attributes: device (str): The device to be used for allocation. """ def __init__(self, device): """ Initializes the DiskAllocator object. Parameters: device (str): The device to use for disk allocation. """ self.device = device def _alloc(self, size): """ Allocates memory from the device. Parameters: size (int): Size of the memory to allocate in bytes. Returns: DiskBuffer: A DiskBuffer object representing the allocated memory. """ if str(self.device).startswith("shm:"): if OSX: with open(f"/tmp/shm_{self.device[4:]}", "w+b") as f: f.truncate(size) shm = mmap.mmap(f.fileno(), size, flags=mmap.MAP_SHARED) else: fd = _posixshmem.shm_open(self.device[4:], os.O_RDWR, 0o600) # TODO: these flags are somewhat platform specific, but python doesn't expose the ones we need shm = mmap.mmap( fd, size, flags=mmap.MAP_SHARED | MAP_LOCKED | MAP_POPULATE ) shm.madvise(mmap.MADV_HUGEPAGE) # type: ignore # not on OSX os.close(fd) buf = UnderlyingDiskBuffer(None, shm) else: f = open(self.device, "a+b") if os.path.getsize(self.device) < size: os.ftruncate(f.fileno(), size) buf = UnderlyingDiskBuffer(f, mmap.mmap(f.fileno(), size)) return DiskBuffer(buf, size)
[docs] def as_buffer(self, src: DiskBuffer): """ Returns the underlying buffer of a DiskBuffer object. Parameters: src (DiskBuffer): The DiskBuffer object to get the buffer from. Returns: UnderlyingDiskBuffer: The underlying buffer of the DiskBuffer object. """ return src._buf()
[docs] def copyin(self, dest: DiskBuffer, src: memoryview): """ Copies data from a memoryview object to a DiskBuffer object. Parameters: dest (DiskBuffer): The destination DiskBuffer object. src (memoryview): The source memoryview object. """ dest._buf()[:] = src
[docs] def copyout(self, dest: memoryview, src: DiskBuffer): """ Copies data from a DiskBuffer object to a memoryview object. Parameters: dest (memoryview): The destination memoryview object. src (DiskBuffer): The source DiskBuffer object. """ if src.ud.fd is not None: src.ud.fd.seek(src.offset) src.ud.fd.readinto(dest) else: dest[:] = src._buf()
[docs] class DiskDevice(Interpreted): """ Initialize a new DiskDevice object. Parameters: device (str): The device identifier for the disk to be allocated. Attributes: super (Interpreted): Inherit from the Interpreted class. DiskAllocator (DiskAllocator): Initialize a new DiskAllocator object with the given device. disk_fxn_for_op (function): The function to be used for performing operations on the disk. """ def __init__(self, device): super().__init__(DiskAllocator(device[5:]), disk_fxn_for_op)