Module rsyscall.memory.ram

The high-level interfaces to memory

Expand source code Browse git
"The high-level interfaces to memory"
from __future__ import annotations
from dneio import run_all
from rsyscall.handle import Task, Pointer, WrittenPointer
from rsyscall.memory.transport import MemoryTransport, MemoryGateway
from rsyscall.memory.allocation_interface import AllocationInterface
from rsyscall.memory.allocator import AllocatorInterface
from rsyscall.struct import FixedSize, T_fixed_size, HasSerializer, T_has_serializer, FixedSerializer, T_fixed_serializer, Serializer, PathLikeSerializer, T_pathlike, StrSerializer
from rsyscall.sys.mman import MemoryMapping
import functools
import os
import rsyscall.near.types as near
import rsyscall.far as far

import typing as t

# Public API: names exported by `from rsyscall.memory.ram import *`.
__all__ = [
    "RAM",
    "perform_batch",
]

class BytesSerializer(Serializer[bytes]):
    "Identity serializer: bytes are already their own wire representation."

    def to_bytes(self, val: bytes) -> bytes:
        "Return *val* unchanged; no encoding is needed."
        return val

    def from_bytes(self, data: bytes) -> bytes:
        "Return *data* unchanged; no decoding is needed."
        return data

T = t.TypeVar('T')
class RAM:
    """Central user-friendly class for accessing memory.

    Combines a Task (the address space), a MemoryTransport (the
    mechanism used to read and write that address space), and an
    AllocatorInterface (the source of allocations), and exposes typed
    `malloc` and `ptr` operations on top of them.

    Future work: An option to allocate "const" pointers, which we
    could cache and reuse each time they're requested. This would be
    useful for small pieces of memory which are very frequently used.

    """
    def __init__(self,
                 task: Task,
                 transport: MemoryTransport,
                 allocator: AllocatorInterface,
    ) -> None:
        self.task = task
        self.transport = transport
        self.allocator = allocator

    @t.overload
    async def malloc(self, cls: t.Type[T_fixed_size]) -> Pointer[T_fixed_size]: ...
    @t.overload
    async def malloc(self, cls: t.Type[T_fixed_serializer], size: int) -> Pointer[T_fixed_serializer]: ...
    @t.overload
    async def malloc(self, cls: t.Type[T_pathlike], size: int) -> Pointer[T_pathlike]: ...
    @t.overload
    async def malloc(self, cls: t.Type[str], size: int) -> Pointer[str]: ...
    @t.overload
    async def malloc(self, cls: t.Type[bytes], size: int) -> Pointer[bytes]: ...

    # have to type: ignore because of https://github.com/python/mypy/issues/9420
    async def malloc(self, cls: t.Union[  # type: ignore
            t.Type[T_fixed_size],
            t.Type[T_fixed_serializer],
            t.Type[T_pathlike],
            t.Type[str],
            t.Type[bytes],
    ], size: t.Optional[int]=None,
    ) -> t.Union[
        Pointer[T_fixed_size],
        Pointer[T_fixed_serializer],
        Pointer[T_pathlike],
        Pointer[str],
        Pointer[bytes],
    ]:
        """Allocate a typed space in memory, sized according to the size of the type or an explicit size argument.

        Raises if `size` is omitted for a class that is not FixedSize, if
        `size` is passed for a FixedSize class, or if no serializer is
        known for `cls`.

        """
        if size is None:
            # Without an explicit size, the type itself must know its own size.
            if not issubclass(cls, FixedSize):
                raise Exception("non-FixedSize cls passed to malloc without specifying size to allocate", cls)
            ptr: Pointer = await self.malloc_serializer(cls.get_serializer(self.task), cls.sizeof(), cls)
            return ptr
        else:
            if issubclass(cls, FixedSize):
                raise Exception("Can't pass a FixedSize cls to malloc and also specify the size argument", cls, size)
            if issubclass(cls, FixedSerializer):
                ptr = await self.malloc_serializer(cls.get_serializer(self.task), size, cls)
                return ptr
            # special-case Path/str/bytes so that they don't have to get wrapped just for rsyscall
            elif issubclass(cls, os.PathLike):
                pathlike_cls = t.cast(t.Type[T_pathlike], cls)
                return await self.malloc_serializer(PathLikeSerializer(pathlike_cls), size, pathlike_cls)
            elif issubclass(cls, str):
                return await self.malloc_serializer(StrSerializer(), size, str)
            elif issubclass(cls, bytes):
                return await self.malloc_serializer(BytesSerializer(), size, bytes)
            else:
                raise Exception("don't know how to find serializer for", cls)


    @t.overload
    async def ptr(self, data: T_has_serializer) -> WrittenPointer[T_has_serializer]: ...
    @t.overload
    async def ptr(self, data: T_pathlike) -> WrittenPointer[T_pathlike]: ...
    @t.overload
    async def ptr(self, data: str) -> WrittenPointer[str]: ...
    @t.overload
    async def ptr(self, data: t.Union[bytes]) -> WrittenPointer[bytes]: ...
    async def ptr(self, data: t.Union[T_has_serializer, T_pathlike, str, bytes],
    ) -> t.Union[
        WrittenPointer[T_has_serializer],
        WrittenPointer[T_pathlike],
        WrittenPointer[str], WrittenPointer[bytes],
    ]:
        """Take some serializable data and return a pointer in memory containing it.

        Serializes `data`, allocates exactly enough memory to hold the
        serialized bytes, and writes them there.

        Raises if no serializer is known for the type of `data`.

        """
        if isinstance(data, HasSerializer):
            serializer = data.get_self_serializer(self.task)
            data_bytes = serializer.to_bytes(data)
            ptr: Pointer = await self.malloc_serializer(
                serializer, len(data_bytes), type(data))
            return await self._write_to_pointer(ptr, data, data_bytes)
        elif isinstance(data, os.PathLike):
            path_serializer = PathLikeSerializer(type(data))
            data_bytes = path_serializer.to_bytes(data)
            ptr = await self.malloc_serializer(path_serializer, len(data_bytes), type(data))
            return await self._write_to_pointer(ptr, data, data_bytes)
        elif isinstance(data, str):
            str_serializer = StrSerializer()
            data_bytes = str_serializer.to_bytes(data)
            ptr = await self.malloc_serializer(str_serializer, len(data_bytes), type(data))
            return await self._write_to_pointer(ptr, data, data_bytes)
        elif isinstance(data, bytes):
            # bytes serialize to themselves, so no separate serialization step.
            ptr = await self.malloc(bytes, len(data))
            return await self._write_to_pointer(ptr, data, data)
        else:
            raise Exception("don't know how to serialize data passed to ptr", data)

    async def perform_batch(self, op: t.Callable[[RAM], t.Awaitable[T]],
                                  allocator: t.Optional[AllocatorInterface]=None,
    ) -> T:
        """Batches together memory operations performed by a callable.

        If `allocator` is None, this RAM's own allocator is used.

        See the free function of the same name for more documentation.

        """
        if allocator is None:
            allocator = self.allocator
        return await perform_batch(self.task, self.transport, allocator, op)

    async def malloc_serializer(
            self, serializer: Serializer[T], size: int, typ: t.Type[T],
    ) -> Pointer[T]:
        """Allocate a typed space in memory using an explicitly-specified Serializer.

        This is useful only in relatively niche situations.

        """
        mapping, allocation = await self.allocator.malloc(size, alignment=1)
        try:
            return Pointer(mapping, self.transport, serializer, allocation, typ)
        except:
            # Don't leak the allocation if Pointer construction fails.
            allocation.free()
            raise

    async def _write_to_pointer(self, ptr: Pointer[T], data: T, data_bytes: bytes) -> WrittenPointer[T]:
        # Write `data` through `ptr`, freeing the pointer on failure so
        # callers never leak a freshly-allocated pointer.
        try:
            return await ptr.write(data)
        except:
            ptr.free()
            raise

class NullAllocation(AllocationInterface):
    "A fake allocation for a null pointer."

    def __init__(self, n: int) -> None:
        self.n = n

    def offset(self) -> int:
        # A null allocation always sits at offset zero.
        return 0

    def size(self) -> int:
        return self.n

    def split(self, size: int) -> t.Tuple[AllocationInterface, AllocationInterface]:
        leftover = self.n - size
        return NullAllocation(leftover), NullAllocation(size)

    def merge(self, other: AllocationInterface) -> AllocationInterface:
        raise Exception("can't merge")

    def free(self) -> None:
        # Nothing was really allocated, so there is nothing to release.
        pass

class LaterAllocator(AllocatorInterface):
    "An allocator which stores allocation requests and returns null pointers."

    def __init__(self) -> None:
        # One (size, alignment) entry per malloc call, in call order.
        self.allocations: t.List[t.Tuple[int, int]] = []

    async def malloc(self, size: int, alignment: int) -> t.Tuple[MemoryMapping, AllocationInterface]:
        # Record the request so it can later be replayed against a real allocator.
        self.allocations.append((size, alignment))
        dummy_mapping = MemoryMapping(t.cast(Task, None), near.MemoryMapping(0, size, 4096), far.File())
        return dummy_mapping, NullAllocation(size)

class NoopTransport(MemoryTransport):
    "A memory transport which doesn't do anything."

    async def read(self, src: Pointer) -> bytes:
        raise Exception("shouldn't try to read")

    async def write(self, dest: Pointer, data: bytes) -> None:
        # Discard the data; batch dry-runs only need writes to succeed.
        pass

    def inherit(self, task: Task) -> NoopTransport:
        raise Exception("shouldn't try to inherit")

class PrefilledAllocator(AllocatorInterface):
    "An allocator which has been prefilled with allocations for an exact sequence of calls to malloc."
    def __init__(self, allocations: t.Sequence[t.Tuple[MemoryMapping, AllocationInterface]]) -> None:
        # Copy to a list so we can consume entries with pop().
        self.allocations = list(allocations)

    async def malloc(self, size: int, alignment: int) -> t.Tuple[MemoryMapping, AllocationInterface]:
        """Return the next prefilled allocation, which must match the requested size.

        Raises if the requested size differs from the prefilled one,
        which indicates the batch callable was non-deterministic.

        """
        mapping, allocation = self.allocations.pop(0)
        if allocation.size() != size:
            # Single message via implicit string concatenation; the original
            # stray comma made this two separate exception arguments.
            raise Exception("batch operation seems to be non-deterministic, "
                            "allocating different sizes/in different order on second run")
        return mapping, allocation

class BatchWriteSemantics(RAM):
    "A variant of RAM which stores writes to pointers so they can be performed later."
    def __init__(self,
                 task: Task,
                 transport: MemoryTransport,
                 allocator: AllocatorInterface,
    ) -> None:
        # Delegate the common fields to RAM instead of duplicating them.
        super().__init__(task, transport, allocator)
        # Deferred writes as (written pointer, serialized bytes), in issue order.
        self.writes: t.List[t.Tuple[Pointer, bytes]] = []

    async def _write_to_pointer(self, ptr: Pointer[T], data: T, data_bytes: bytes) -> WrittenPointer[T]:
        # Record the write instead of performing it; the batch driver
        # flushes self.writes through the real transport later.
        wptr = ptr._wrote(data)
        self.writes.append((wptr, data_bytes))
        return wptr

async def perform_batch(
        task: Task,
        transport: MemoryTransport,
        allocator: AllocatorInterface,
        batch: t.Callable[[RAM], t.Awaitable[T]],
) -> T:
    """Batches together memory operations performed by a callable.

    To use, first define a callable which takes a RAM, and produces
    some result using only the methods of the passed-in RAM. There are
    several requirements on this callable, described below. Pass that
    callable to this function, and we'll magically batch its
    operations together and return what it returns.

    Requirements on the callable:
    - Do not have any side-effects other than calling methods on the
      passed-in RAM.
    - Do not branch based on the numeric values of pointers; indeed,
      better that you just don't branch at all.

    Concretely, we'll call the callable twice. Once with a RAM which
    no-ops all the allocations and writes, so that we can know the
    size of all the requested allocations. Then we'll batch-allocate
    all that memory, and call the callable again with a RAM which just
    returns those batch allocations, and no-ops any writes. After that
    second call completes, we batch-perform all the writes, and return
    the result of that second call from this function.

    This is useful, among other cases, when performing a lot of memory
    allocation and writes in anticipation of one or more syscalls
    which will read values from that memory. For example, we can batch
    together writing the argv and envp arguments to execve, along with
    all the pointers referenced by them. This is much more efficient.

    This technique is inspired by tagless final style. Unfortunately
    we can't really implement that in Python, so an interface is all
    we've got.

    We had a more explicit style before, where you explicitly listed
    the sizes of the allocations you wanted, but it was far less
    ergonomic and less robust. This style is nominally less efficient
    in CPU time, but it improves robustness by making it not possible
    to mess up in calculating the size you want to allocate.

    """
    # Dry run: record every allocation request, discard every write.
    recording_allocator = LaterAllocator()
    await batch(RAM(task, NoopTransport(), recording_allocator))
    # Satisfy all recorded requests with one bulk allocation.
    allocations = await allocator.bulk_malloc(recording_allocator.allocations)
    # Real run: hand out the bulk allocations and collect the writes.
    replay_ram = BatchWriteSemantics(task, transport, PrefilledAllocator(allocations))
    result = await batch(replay_ram)
    # Flush every deferred write through the real transport, concurrently.
    pending = [functools.partial(transport.write, dest, data)
               for dest, data in replay_ram.writes]
    await run_all(pending)
    return result

Functions

async def perform_batch(task: Task, transport: MemoryTransport, allocator: AllocatorInterface, batch: t.Callable[[RAM], t.Awaitable[T]]) ‑> ~T

Batches together memory operations performed by a callable.

To use, first define a callable which takes a RAM, and produces some result using only the methods of the passed-in RAM. There are several requirements on this callable, described below. Pass that callable to this function, and we'll magically batch its operations together and return what it returns.

Requirements on the callable: - Do not have any side-effects other than calling methods on the passed-in RAM. - Do not branch based on the numeric values of pointers; indeed, better that you just don't branch at all.

Concretely, we'll call the callable twice. Once with a RAM which no-ops all the allocations and writes, so that we can know the size of all the requested allocations. Then we'll batch-allocate all that memory, and call the callable again with a RAM which just returns those batch allocations, and no-ops any writes. After that second call completes, we batch-perform all the writes, and return the result of that second call from this function.

This is useful, among other cases, when performing a lot of memory allocation and writes in anticipation of one or more syscalls which will read values from that memory. For example, we can batch together writing the argv and envp arguments to execve, along with all the pointers referenced by them. This is much more efficient.

This technique is inspired by tagless final style. Unfortunately we can't really implement that in Python, so an interface is all we've got.

We had a more explicit style before, where you explicitly listed the sizes of the allocations you wanted, but it was far less ergonomic and less robust. This style is nominally less efficient in CPU time, but it improves robustness by making it not possible to mess up in calculating the size you want to allocate.

Expand source code Browse git
async def perform_batch(
        task: Task,
        transport: MemoryTransport,
        allocator: AllocatorInterface,
        batch: t.Callable[[RAM], t.Awaitable[T]],
) -> T:
    """Batches together memory operations performed by a callable.

    To use, first define a callable which takes a RAM, and produces
    some result using only the methods of the passed-in RAM. There are
    several requirements on this callable, described below. Pass that
    callable to this function, and we'll magically batch its
    operations together and return what it returns.

    Requirements on the callable:
    - Do not have any side-effects other than calling methods on the
      passed-in RAM.
    - Do not branch based on the numeric values of pointers; indeed,
      better that you just don't branch at all.

    Concretely, we'll call the callable twice. Once with a RAM which
    no-ops all the allocations and writes, so that we can know the
    size of all the requested allocations. Then we'll batch-allocate
    all that memory, and call the callable again with a RAM which just
    returns those batch allocations, and no-ops any writes. After that
    second call completes, we batch-perform all the writes, and return
    the result of that second call from this function.
    
    This is useful, among other cases, when performing a lot of memory
    allocation and writes in anticipation of one or more syscalls
    which will read values from that memory. For example, we can batch
    together writing the argv and envp arguments to execve, along with
    all the pointers referenced by them. This is much more efficient.

    This technique is inspired by tagless final style. Unfortunately
    we can't really implement that in Python, so an interface is all
    we've got.

    We had a more explicit style before, where you explicitly listed
    the sizes of the allocations you wanted, but it was far less
    ergonomic and less robust. This style is nominally less efficient
    in CPU time, but it improves robustness by making it not possible
    to mess up in calculating the size you want to allocate.

    """
    later_allocator = LaterAllocator()
    await batch(RAM(task, NoopTransport(), later_allocator))
    allocations = await allocator.bulk_malloc(later_allocator.allocations)
    sem = BatchWriteSemantics(task, transport, PrefilledAllocator(allocations))
    ret = await batch(sem)

    await run_all([functools.partial(transport.write, dest, data)
                   for dest, data in sem.writes])
    return ret

Classes

class RAM (task: Task, transport: MemoryTransport, allocator: AllocatorInterface)

Central user-friendly class for accessing memory.

Future work: An option to allocate "const" pointers, which we could cache and reuse each time they're requested. This would be useful for small pieces of memory which are very frequently used.

Expand source code Browse git
class RAM:
    """Central user-friendly class for accessing memory.

    Future work: An option to allocate "const" pointers, which we
    could cache and reuse each time they're requested. This would be
    useful for small pieces of memory which are very frequently used.

    """
    def __init__(self, 
                 task: Task,
                 transport: MemoryTransport,
                 allocator: AllocatorInterface,
    ) -> None:
        self.task = task
        self.transport = transport
        self.allocator = allocator

    @t.overload
    async def malloc(self, cls: t.Type[T_fixed_size]) -> Pointer[T_fixed_size]: ...
    @t.overload
    async def malloc(self, cls: t.Type[T_fixed_serializer], size: int) -> Pointer[T_fixed_serializer]: ...
    @t.overload
    async def malloc(self, cls: t.Type[T_pathlike], size: int) -> Pointer[T_pathlike]: ...
    @t.overload
    async def malloc(self, cls: t.Type[str], size: int) -> Pointer[str]: ...
    @t.overload
    async def malloc(self, cls: t.Type[bytes], size: int) -> Pointer[bytes]: ...

    # have to type: ignore because of https://github.com/python/mypy/issues/9420
    async def malloc(self, cls: t.Union[  # type: ignore
            t.Type[T_fixed_size],
            t.Type[T_fixed_serializer],
            t.Type[T_pathlike],
            t.Type[str],
            t.Type[bytes],
    ], size: t.Optional[int]=None,
    ) -> t.Union[
        Pointer[T_fixed_size],
        Pointer[T_fixed_serializer],
        Pointer[T_pathlike],
        Pointer[str],
        Pointer[bytes],
    ]:
        "Allocate a typed space in memory, sized according to the size of the type or an explicit size argument"
        if size is None:
            if not issubclass(cls, FixedSize):
                raise Exception("non-FixedSize cls passed to malloc without specifying size to allocate", cls)
            ptr: Pointer = await self.malloc_serializer(cls.get_serializer(self.task), cls.sizeof(), cls)
            return ptr
        else:
            if issubclass(cls, FixedSize):
                raise Exception("Can't pass a FixedSize cls to malloc and also specify the size argument", cls, size)
            if issubclass(cls, FixedSerializer):
                ptr = await self.malloc_serializer(cls.get_serializer(self.task), size, cls)
                return ptr
            # special-case Path/str/bytes so that they don't have to get wrapped just for rsyscall
            elif issubclass(cls, os.PathLike):
                pathlike_cls = t.cast(t.Type[T_pathlike], cls)
                return await self.malloc_serializer(PathLikeSerializer(pathlike_cls), size, pathlike_cls)
            elif issubclass(cls, str):
                return await self.malloc_serializer(StrSerializer(), size, str)
            elif issubclass(cls, bytes):
                return await self.malloc_serializer(BytesSerializer(), size, bytes)
            else:
                raise Exception("don't know how to find serializer for", cls)


    @t.overload
    async def ptr(self, data: T_has_serializer) -> WrittenPointer[T_has_serializer]: ...
    @t.overload
    async def ptr(self, data: T_pathlike) -> WrittenPointer[T_pathlike]: ...
    @t.overload
    async def ptr(self, data: str) -> WrittenPointer[str]: ...
    @t.overload
    async def ptr(self, data: t.Union[bytes]) -> WrittenPointer[bytes]: ...
    async def ptr(self, data: t.Union[T_has_serializer, T_pathlike, str, bytes],
    ) -> t.Union[
        WrittenPointer[T_has_serializer],
        WrittenPointer[T_pathlike],
        WrittenPointer[str], WrittenPointer[bytes],
    ]:
        "Take some serializable data and return a pointer in memory containing it."
        if isinstance(data, HasSerializer):
            serializer = data.get_self_serializer(self.task)
            data_bytes = serializer.to_bytes(data)
            ptr: Pointer = await self.malloc_serializer(
                serializer, len(data_bytes), type(data))
            return await self._write_to_pointer(ptr, data, data_bytes)
        elif isinstance(data, os.PathLike):
            path_serializer = PathLikeSerializer(type(data))
            data_bytes = path_serializer.to_bytes(data)
            ptr = await self.malloc_serializer(path_serializer, len(data_bytes), type(data))
            return await self._write_to_pointer(ptr, data, data_bytes)
        elif isinstance(data, str):
            str_serializer = StrSerializer()
            data_bytes = str_serializer.to_bytes(data)
            ptr = await self.malloc_serializer(str_serializer, len(data_bytes), type(data))
            return await self._write_to_pointer(ptr, data, data_bytes)
        elif isinstance(data, bytes):
            ptr = await self.malloc(bytes, len(data))
            return await self._write_to_pointer(ptr, data, data)
        else:
            raise Exception("don't know how to serialize data passed to ptr", data)

    async def perform_batch(self, op: t.Callable[[RAM], t.Awaitable[T]],
                                  allocator: AllocatorInterface=None,
    ) -> T:
        """Batches together memory operations performed by a callable.
        
        See the free function of the same name for more documentation.

        """
        if allocator is None:
            allocator = self.allocator
        return await perform_batch(self.task, self.transport, allocator, op)

    async def malloc_serializer(
            self, serializer: Serializer[T], size: int, typ: t.Type[T],
    ) -> Pointer[T]:
        """Allocate a typed space in memory using an explicitly-specified Serializer.

        This is useful only in relatively niche situations.

        """
        mapping, allocation = await self.allocator.malloc(size, alignment=1)
        try:
            return Pointer(mapping, self.transport, serializer, allocation, typ)
        except:
            allocation.free()
            raise

    async def _write_to_pointer(self, ptr: Pointer[T], data: T, data_bytes: bytes) -> WrittenPointer[T]:
        try:
            return await ptr.write(data)
        except:
            ptr.free()
            raise

Subclasses

  • rsyscall.memory.ram.BatchWriteSemantics

Methods

async def malloc(self, cls: t.Union[t.Type[T_fixed_size], t.Type[T_fixed_serializer], t.Type[T_pathlike], t.Type[str], t.Type[bytes]], size: t.Optional[int] = None) ‑> Union[Pointer[~T_fixed_size], Pointer[~T_fixed_serializer], Pointer[~T_pathlike], Pointer[str], Pointer[bytes]]

Allocate a typed space in memory, sized according to the size of the type or an explicit size argument

Expand source code Browse git
async def malloc(self, cls: t.Union[  # type: ignore
        t.Type[T_fixed_size],
        t.Type[T_fixed_serializer],
        t.Type[T_pathlike],
        t.Type[str],
        t.Type[bytes],
], size: t.Optional[int]=None,
) -> t.Union[
    Pointer[T_fixed_size],
    Pointer[T_fixed_serializer],
    Pointer[T_pathlike],
    Pointer[str],
    Pointer[bytes],
]:
    "Allocate a typed space in memory, sized according to the size of the type or an explicit size argument"
    if size is None:
        if not issubclass(cls, FixedSize):
            raise Exception("non-FixedSize cls passed to malloc without specifying size to allocate", cls)
        ptr: Pointer = await self.malloc_serializer(cls.get_serializer(self.task), cls.sizeof(), cls)
        return ptr
    else:
        if issubclass(cls, FixedSize):
            raise Exception("Can't pass a FixedSize cls to malloc and also specify the size argument", cls, size)
        if issubclass(cls, FixedSerializer):
            ptr = await self.malloc_serializer(cls.get_serializer(self.task), size, cls)
            return ptr
        # special-case Path/str/bytes so that they don't have to get wrapped just for rsyscall
        elif issubclass(cls, os.PathLike):
            pathlike_cls = t.cast(t.Type[T_pathlike], cls)
            return await self.malloc_serializer(PathLikeSerializer(pathlike_cls), size, pathlike_cls)
        elif issubclass(cls, str):
            return await self.malloc_serializer(StrSerializer(), size, str)
        elif issubclass(cls, bytes):
            return await self.malloc_serializer(BytesSerializer(), size, bytes)
        else:
            raise Exception("don't know how to find serializer for", cls)
async def ptr(self, data: t.Union[T_has_serializer, T_pathlike, str, bytes]) ‑> Union[WrittenPointer[~T_has_serializer], WrittenPointer[~T_pathlike], WrittenPointer[str], WrittenPointer[bytes]]

Take some serializable data and return a pointer in memory containing it.

Expand source code Browse git
async def ptr(self, data: t.Union[T_has_serializer, T_pathlike, str, bytes],
) -> t.Union[
    WrittenPointer[T_has_serializer],
    WrittenPointer[T_pathlike],
    WrittenPointer[str], WrittenPointer[bytes],
]:
    "Take some serializable data and return a pointer in memory containing it."
    if isinstance(data, HasSerializer):
        serializer = data.get_self_serializer(self.task)
        data_bytes = serializer.to_bytes(data)
        ptr: Pointer = await self.malloc_serializer(
            serializer, len(data_bytes), type(data))
        return await self._write_to_pointer(ptr, data, data_bytes)
    elif isinstance(data, os.PathLike):
        path_serializer = PathLikeSerializer(type(data))
        data_bytes = path_serializer.to_bytes(data)
        ptr = await self.malloc_serializer(path_serializer, len(data_bytes), type(data))
        return await self._write_to_pointer(ptr, data, data_bytes)
    elif isinstance(data, str):
        str_serializer = StrSerializer()
        data_bytes = str_serializer.to_bytes(data)
        ptr = await self.malloc_serializer(str_serializer, len(data_bytes), type(data))
        return await self._write_to_pointer(ptr, data, data_bytes)
    elif isinstance(data, bytes):
        ptr = await self.malloc(bytes, len(data))
        return await self._write_to_pointer(ptr, data, data)
    else:
        raise Exception("don't know how to serialize data passed to ptr", data)
async def perform_batch(self, op: t.Callable[[RAM], t.Awaitable[T]], allocator: AllocatorInterface = None) ‑> ~T

Batches together memory operations performed by a callable.

See the free function of the same name for more documentation.

Expand source code Browse git
async def perform_batch(self, op: t.Callable[[RAM], t.Awaitable[T]],
                              allocator: AllocatorInterface=None,
) -> T:
    """Batches together memory operations performed by a callable.
    
    See the free function of the same name for more documentation.

    """
    if allocator is None:
        allocator = self.allocator
    return await perform_batch(self.task, self.transport, allocator, op)
async def malloc_serializer(self, serializer: Serializer[T], size: int, typ: t.Type[T]) ‑> Pointer[~T]

Allocate a typed space in memory using an explicitly-specified Serializer.

This is useful only in relatively niche situations.

Expand source code Browse git
async def malloc_serializer(
        self, serializer: Serializer[T], size: int, typ: t.Type[T],
) -> Pointer[T]:
    """Allocate a typed space in memory using an explicitly-specified Serializer.

    This is useful only in relatively niche situations.

    """
    mapping, allocation = await self.allocator.malloc(size, alignment=1)
    try:
        return Pointer(mapping, self.transport, serializer, allocation, typ)
    except:
        allocation.free()
        raise