Module rsyscall.epoller

"""Non-blocking IO operations implemented using epoll.

We use epoll in edge-triggered mode. Our usage is relatively
straightforward, but there are a few quirks.



To avoid blocking a thread in a call to epoll_wait when there is more
work to be done, there are two options:

1. We can register the epollfd on some higher event loop, and instead
of blocking in epoll_wait, just wait until that event loop tells us
there are events to be read. This is the typical way to use most file
descriptors in a non-blocking way, just applied to an epollfd.

2. Or, we can ensure that all possible pending work is registered on
our epollfd; anything that might want to do some work needs to
register a file descriptor that is readable/writable/etc on our
epollfd.  This is the typical requirement that most event loops place
on the rest of the program.
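
An epollfd is itself a pollable file descriptor, which is what makes approach 1
possible: when any fd registered on it becomes ready, the epollfd itself reads
as readable on a higher event loop. A minimal standalone sketch of this using
the stdlib `select` module (Linux only; the pipe here is just an illustrative
event source, not part of this module):

```python
import os
import select

# Inner event loop: an epoll instance watching the read end of a pipe.
inner = select.epoll()
r, w = os.pipe()
inner.register(r, select.EPOLLIN)

# Outer event loop: the inner epollfd is itself a pollable fd, so it can
# be registered on a higher event loop like any other file descriptor.
outer = select.epoll()
outer.register(inner.fileno(), select.EPOLLIN)

# Nothing is pending yet, so the inner epollfd is not readable.
assert outer.poll(timeout=0) == []

# Making the pipe readable propagates readiness up to the outer loop.
os.write(w, b"x")
events = dict(outer.poll(timeout=1))
assert events[inner.fileno()] & select.EPOLLIN
```

Instead of blocking in `inner.poll()`, the inner loop can wait for the outer
loop to report its epollfd readable, then pull events with a zero timeout.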

In an ideal world, we would do both with every epoll instance.
Whenever a new epoll E is created, we would register E on all the
other event loops; and we would register all the other event loops on
E. Then we could freely block in either E or any other event loop, and
not miss any events. We could have multiple event loops in our process
without pain.

Unfortunately, this would create a cycle in epoll registrations
(i.e. an epoll A registered on an epoll B which is itself registered
on epoll A). To deliver events correctly, such a cycle would have to
be broken on every call to epoll_wait, which could be non-trivially
expensive; so Linux simply rejects registrations that would form a
cycle (epoll_ctl fails with ELOOP).

So instead, to prevent cycles, we need to use only one of these two
approaches at any one time; at any one time, we'll have a single
"root" event loop which uses approach 2 and can make blocking calls to
epoll_wait; and the other event loops will be "subsidiary", using
approach 1, and delegating their blocking to the root event loop.
Then we can still have multiple event loops in our process.

These two approaches to making an Epoller are implemented in the
"make_subsidiary" (1) and "make_root" (2) classmethods of Epoller.

We would prefer approach 2: the only thing that should be allowed to
block in our threads is us. Unfortunately, most event loop libraries
don't expose a way to function as a subsidiary event loop. So for
threads which need to interoperate with another event loop, we are
forced into approach 1.

In other cases, there is no other event loop, so we go with approach 2.
We must then take care to satisfy its requirement: anything that wants
to perform work must cause a wakeup on our epollfd.

The only case where we use approach 1 is when running on the local
Python thread. There, we use the trio async library; and trio does
not expose its epollfd, so we can't use approach 2. So instead of
blocking in epoll_wait ourselves, we call trio's wait_readable method
on our epollfd, and then pull events with a zero-timeout epoll_wait.

On every thread other than the local Python thread, we use approach
2. At the moment, the only other source of work that is present in
every thread is the file descriptor from which the rsyscall server
reads incoming syscalls. So that other syscalls will not be blocked by
our calls to epoll_wait, we register that file descriptor (exposed in
SyscallInterface as the "activity fd") on our epollfd. In this way, if
a new syscall is received in the thread while already in a call to
epoll_wait, the epoll_wait will wake up so that the rsyscall server
can read and perform the unrelated syscalls.

We could straightforwardly support a mode for an Epoller to be
registered on another of our Epollers; but we haven't done this
because it's not really useful.
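
Stripped of rsyscall specifics, the activity-fd mechanism is just: register a
"there is work to do" fd on the epollfd, so that a blocked epoll_wait wakes up
as soon as work arrives. A standalone sketch of that wakeup pattern (stdlib
only, Linux only; the pipe and thread are illustrative stand-ins for the
rsyscall server's fd):

```python
import os
import select
import threading
import time

ep = select.epoll()

# The "activity fd": whoever wants the waiter to wake up writes a byte here.
# Registered level-triggered, like the activity fd in this module, so we
# never block while there is anything left to read.
activity_r, activity_w = os.pipe()
ep.register(activity_r, select.EPOLLIN)

def submit_work():
    time.sleep(0.1)             # some other thread produces work later...
    os.write(activity_w, b"x")  # ...and pokes the waiter

t = threading.Thread(target=submit_work)
t.start()

start = time.monotonic()
events = dict(ep.poll(timeout=5))  # would otherwise block the full 5 seconds
elapsed = time.monotonic() - start
t.join()

assert events.get(activity_r, 0) & select.EPOLLIN
assert elapsed < 4  # woken by the activity fd, not the timeout
```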



"""
from __future__ import annotations
from dneio import RequestQueue, Continuation, reset
from rsyscall._raw import ffi # type: ignore
import collections
import errno
import os
import math
import typing as t
from rsyscall.near.sysif import SyscallError
from rsyscall.memory.ram import RAM
from rsyscall.handle import FileDescriptor, Pointer, WrittenPointer, Task
from dataclasses import dataclass
from rsyscall.struct import Int32, T_fixed_size
from rsyscall.sys.syscall import SYS
from rsyscall.sys.socket import SOCK, SOL, SO, Sockaddr, SockaddrStorage, T_sockaddr, Sockbuf
from rsyscall.sys.epoll import EpollEvent, EpollEventList, EPOLL, EPOLL_CTL, EpollFlag
from rsyscall.fcntl import O, F

import logging
logger = logging.getLogger(__name__)

__all__ = [
    "Epoller",
    "EpolledFileDescriptor",
    "AsyncFileDescriptor",
    "AsyncReadBuffer",
]

class RemovedFromEpollError(Exception):
    "Raised for a pending wait when its fd is deleted from the epollfd."

class EpollWaiter:
    """The core class which reads events from the epollfd and dispatches them.

    Many threads may have a copy of our epollfd, and register fds on it. To actually wait
    on this epollfd and pull events from it is the responsibility of this class. By
    inheriting the epollfd to many threads for registration purposes, but centralizing the
    actual epoll_wait calls in this shared class, our usage of epoll becomes more
    efficient.

    """
    def __init__(self, ram: RAM, epfd: FileDescriptor,
                 wait_readable: t.Optional[t.Callable[[], t.Awaitable[None]]],
                 timeout: int,
    ) -> None:
        "To make this, use one of the constructor methods of Epoller: make_subsidiary or make_root"
        self.ram = ram
        self.epfd = epfd
        self.wait_readable = wait_readable
        self.timeout = timeout

        self.used_numbers: t.Set[int] = set()
        self.pending_remove: t.Set[int] = set()
        self.queue = RequestQueue[int, EPOLL]()
        reset(self._run())

    def allocate_number(self, request: int) -> int:
        """Add a callback which will be called on EpollEvents with data == returned number.

        We can then add the returned number to the epollfd with some file descriptor and
        mask.

        This is not the user-interface for epoll. That's the register method on Epoller.

        """
        number = request
        # this is amusingly horrible, but it provides a useful feature:
        # the number in epoll usually matches the fd number.
        while number in self.used_numbers:
            number += 1000
        self.used_numbers.add(number)
        return number

    def remove_number_after_next_epoll_wait(self, number: int) -> None:
        """Remove the callback for this number after the next time we call epoll_wait.

        We don't want to remove it immediately, since even if this
        number was just removed, we might still get an event for it if
        an epoll_wait call was in progress.
        """
        self.pending_remove.add(number)

    async def _run(self) -> None:
        input_buf: Pointer = await self.ram.malloc(EpollEventList, 32 * EpollEvent.sizeof())
        number_to_cb: t.Dict[int, Continuation[EPOLL]] = {}
        registered_activity_fd: t.Optional[FileDescriptor] = None
        while True:
            if self.wait_readable:
                await self.wait_readable()
            activity_fd = self.epfd.task.sysif.get_activity_fd()
            if activity_fd and (registered_activity_fd is not activity_fd):
                # the activity fd changed, we need to register the new one
                if registered_activity_fd:
                    # delete the old registered activity fd
                    await self.epfd.epoll_ctl(EPOLL_CTL.DEL, registered_activity_fd)
                activity_fd_number = self.allocate_number(int(activity_fd))
                # start up a coroutine to consume events from the activity_fd
                async def devnull(activity_fd_number=activity_fd_number):
                    while True:
                        await self.queue.request(activity_fd_number)
                reset(devnull())
                await self.epfd.epoll_ctl(EPOLL_CTL.ADD, activity_fd, await self.ram.ptr(
                    EpollEvent(activity_fd_number,
                               # not edge triggered; we don't want to block if there's
                               # anything that can be read.
                               EPOLL.IN|EPOLL.RDHUP|EPOLL.PRI|EPOLL.ERR|EPOLL.HUP)))
                registered_activity_fd = activity_fd
            try:
                valid_events_buf, rest = await self.epfd.epoll_wait(input_buf, self.timeout)
                received_events = await valid_events_buf.read()
            except Exception as wait_error:
                final_exn = wait_error
                break
            input_buf = valid_events_buf + rest
            for num, cb in self.queue.fetch_any():
                number_to_cb[num] = cb
            for event in received_events:
                number_to_cb[event.data].send(event.events)
                del number_to_cb[event.data]
            for num, cb in self.queue.fetch_any():
                number_to_cb[num] = cb
            for number in list(self.pending_remove):
                number_to_cb[number].throw(RemovedFromEpollError())
                del number_to_cb[number]
                self.pending_remove.remove(number)
        self.queue.close(final_exn)

class Epoller:
    "Terribly named class that allows registering fds on epoll, and waiting on them."
    @staticmethod
    def make_subsidiary(ram: RAM, epfd: FileDescriptor, wait_readable: t.Callable[[], t.Awaitable[None]]) -> Epoller:
        """Make a subsidiary epoller, as described in the module docstring.

        We delegate responsibility for blocking to wait for new events to some other
        component. We call the passed-in wait_readable function to block for new events.

        """
        center = Epoller(EpollWaiter(ram, epfd, wait_readable, 0), ram, epfd)
        return center

    @staticmethod
    async def make_root(ram: RAM, task: Task) -> Epoller:
        """Make a root epoller, as described in the module docstring.

        We take responsibility for blocking to wait for new events for every other
        component in this thread. We pull the activity_fd from the SyscallInterface and
        register it on our epollfd. The activity_fd is readable whenever some other
        component in the thread wants to work on the thread.

        """
        epfd = await task.epoll_create()
        center = Epoller(EpollWaiter(ram, epfd, None, -1), ram, epfd)
        return center

    def __init__(self, epoll_waiter: EpollWaiter, ram: RAM, epfd: FileDescriptor) -> None:
        "Don't construct directly; use one of the constructor methods, make_subsidiary or make_root."
        self.epoll_waiter = epoll_waiter
        self.ram = ram
        self.epfd = epfd

    def inherit(self, ram: RAM) -> Epoller:
        """Make a new Epoller which shares the same EpollWaiter class.

        We inherit the epollfd to a new task for the purpose of registering new fds on it;
        but we share the class and task which actually calls epoll_wait.

        """
        return Epoller(self.epoll_waiter, ram, self.epfd.inherit(ram.task))

    async def register(self, fd: FileDescriptor, events: EPOLL) -> EpolledFileDescriptor:
        """Register a file descriptor on this epollfd, for the given events, calling the passed callback.

        The return value can be used to wait for callback calls, modify the events
        registered for this file descriptor, and delete the file descriptor from this
        epollfd.

        """
        number = self.epoll_waiter.allocate_number(int(fd.near))
        efd = EpolledFileDescriptor(self, fd, number)
        await self.epfd.epoll_ctl(EPOLL_CTL.ADD, fd, await self.ram.ptr(EpollEvent(number, events)))
        return efd

class EpolledFileDescriptor:
    """Representation of a file descriptor registered on an epollfd.

    We have to keep around a reference to the file descriptor to perform
    EPOLL_CTL.DEL. This is a bit annoying, but whatever.

    """
    def __init__(self,
                 epoller: Epoller,
                 fd: FileDescriptor,
                 number: int,
    ) -> None:
        self.epoller = epoller
        self.fd = fd
        # TODO, we should copy this so it can't be closed out from under us
        # self.fd = fd.copy()
        self.number = number
        # We optimistically assume that the FD is ready for reading/writing immediately,
        # by setting our initial status to EPOLL.OUT|EPOLL.IN.
        # This has two benefits:
        # 1. Performance improves on our test suite and in many real-world cases, and
        # 2. More critically, if a user erroneously tries to read or write an FD which
        #    will never receive an EPOLL.OUT or EPOLL.IN because it doesn't support
        #    reading/writing, we'll fail immediately instead of blocking forever.
        self.status = FDStatus(EPOLL.OUT|EPOLL.IN)
        self.in_epollfd = True
        self.queue = RequestQueue[EPOLL, None]()
        self.total_events: t.Counter[EPOLL] = collections.Counter()
        self.consumed_events: t.Dict[EPOLL, int] = {flag: 0 for flag in EPOLL}
        reset(self._run())

    def __str__(self) -> str:
        return f"EpolledFileDescriptor({self.fd.near}, {self.number}, {self.status})"

    async def modify(self, events: EPOLL) -> None:
        "Change the EPOLL flags that this fd is registered with."
        await self.epoller.epfd.epoll_ctl(
            EPOLL_CTL.MOD, self.fd, await self.epoller.ram.ptr(EpollEvent(self.number, events)))

    async def delete(self) -> None:
        "Delete this fd from the epollfd."
        if not self.in_epollfd:
            raise Exception("already deleted from epollfd")
        await self.epoller.epfd.epoll_ctl(EPOLL_CTL.DEL, self.fd)
        self.epoller.epoll_waiter.remove_number_after_next_epoll_wait(self.number)
        self.in_epollfd = False

    async def _run(self) -> None:
        waiters: t.Dict[EPOLL, t.List[t.Tuple[EPOLL, Continuation[None]]]] = {flag: [] for flag in EPOLL}
        while True:
            try:
                ev = await self.epoller.epoll_waiter.queue.request(self.number)
            except Exception as e:
                final_exn = e
                break
            self.total_events.update(ev)
            self.status.posedge(ev)
            for val, coro in self.queue.fetch_any():
                for flag in val:
                    waiters[flag].append((val, coro))
            for flag in ev:
                to_resume = waiters[flag]
                for val, cb in list(to_resume):
                    # remove the cb from other lists it's on; cool scope there, Python
                    for waiting_flag in val:
                        waiters[waiting_flag].remove((val, cb))
                    cb.send(None)
        self.queue.close(final_exn)

    async def wait_for(self, flags: EPOLL) -> None:
        "Call epoll_wait until at least one of the passed flags is set in our status."
        if (not (self.status.mask & flags)
            and not any(self.total_events[flag] - self.consumed_events[flag] > 0
                        for flag in flags)):
            return await self.queue.request(flags)

    def consume(self, events: t.Dict[EPOLL, int]) -> None:
        """The information from these events is outdated; discard them

        Each syscall gives us information about some kinds of events, and information from
        events from epoll_wait calls before that syscall are obsoleted by this more
        up-to-date information.

        So, we call `get_current_events` before each syscall, for the set of flags that
        that syscall will inform us about. Then, after the syscall, we call `consume`, to
        indicate that the information from those events is now obsolete.

        We can't consume the events until after the syscall is done, because we want to
        allow others to optimistically perform syscalls based on these epoll events; not
        consuming the events immediately also makes us tolerate being canceled without
        performing the syscall.

        Only the events received before the syscall are known to be obsolete; events
        received during or after this syscall need to be preserved until more syscalls are
        made, to prevent deadlocks.

        The reason we need this somewhat complicated edifice is because our calls to
        epoll_wait are not synchronized with our system calls on individual file
        descriptors; calls to epoll_wait and fd system calls can happen in different
        threads and can be arbitrarily reordered relative to each other.

        Note, however, that system calls made from an individual `AsyncFileDescriptor`
        instance *are* synchronized relative to each other; those system calls are made in
        a single thread, so the system calls return in order thanks to `dneio`. This
        allows us to safely immediately call `FDStatus.negedge`/`FDStatus.posedge` after a
        system call.
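
The counting scheme itself can be modeled standalone. In the sketch below,
`total`, `consumed` and the helper functions are hypothetical stand-ins for
`total_events`, `consumed_events`, `get_current_events` and `consume`,
simplified to a single flag:

```python
import collections

total = collections.Counter()      # events received from epoll_wait, per flag
consumed = collections.Counter()   # high-water mark of events already acted on

IN = "EPOLLIN"  # stand-in for an EPOLL flag

def get_current_events():
    # snapshot taken *before* a syscall
    return {IN: total[IN]}

def consume(snapshot):
    # after the syscall: events up to the snapshot are now obsolete,
    # but anything received since the snapshot must be preserved
    for flag, count in snapshot.items():
        consumed[flag] = max(consumed[flag], count)

def has_unconsumed():
    return total[IN] - consumed[IN] > 0

total[IN] += 1               # epoll_wait delivers a readability event
snap = get_current_events()  # about to make a read() syscall
total[IN] += 1               # another event arrives *during* the syscall
consume(snap)                # the pre-syscall event is obsolete...
assert has_unconsumed()      # ...but the in-flight one still counts
```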

        TODO if we have multiple concurrently-used `AsyncFileDescriptor` instances,
        though, they aren't synchronized, so we need to be a little more careful...

        """
        self.consumed_events.update(((flag, max(self.consumed_events[flag], count))
                                     for flag, count in events.items()))

    def get_current_events(self, flags: EPOLL) -> t.Dict[EPOLL, int]:
        "Get the current set of events from epoll_wait for these flags, for use with consume."
        return {flag: self.total_events[flag] for flag in flags}

@dataclass
class FDStatus:
    """Tracks the IO status of a file as an EPOLL mask.

    With edge-triggered epoll, we can track whether a file is ready for a given IO in
    userspace.

    Edge-triggered epoll sends us a stream of posedges for each tracked file:
    each event we receive from epoll_wait means that the file is ready for the
    IO operations corresponding to the bits set in the event. We track this by
    or-ing the event's mask into FDStatus's mask.

    We receive negedges by getting EAGAINs (or similar equivalents) when performing
    operations on file descriptors. Each one tells us that the file is not ready for some
    specific IO operation. We track this by translating it into the corresponding EPOLL
    flag and unsetting it in FDStatus's mask.

    By following this discipline and getting every event, we know for sure at any time
    whether a file *might* be ready for IO, or if instead it's safe to block in a call to
    epoll_wait to wait for the file to be ready.

    Dropping a negedge decreases our efficiency, but is harmless, since we'll just get an
    EAGAIN the next time. Dropping a posedge is very bad and means we will deadlock.

    epoll seems to not send us a posedge if we could already know that the file descriptor
    is readable. So, for example, if we directly read an fd and we haven't already gotten
    a posedge for it, and we get a full buffer of data, then we won't get a posedge for it
    from epoll. This would cause various problems, including deadlocks if multiple threads
    were trying to read from the fd and one was blocked in epoll_wait, waiting for the
    posedge.

    Therefore, if you optimistically read an fd before getting a posedge from epoll, and
    you successfully read some data, make sure to treat that as a posedge for EPOLL.IN.
    Likewise with write and EPOLL.OUT, and any other event and the appropriate EPOLL bit.
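
As a standalone illustration of this bookkeeping, here is a minimal model
mirroring FDStatus (the EV flags and Status class are stand-ins defined just
for this sketch, so it can run on its own):

```python
from enum import IntFlag

class EV(IntFlag):  # stand-in for this module's EPOLL flags
    NONE = 0
    IN = 1
    OUT = 4

class Status:  # mirrors FDStatus: an or-able readiness mask
    def __init__(self, mask: EV) -> None:
        self.mask = mask
    def posedge(self, event: EV) -> None:  # an edge-triggered event arrived
        self.mask |= event
    def negedge(self, event: EV) -> None:  # an EAGAIN told us "not ready"
        self.mask &= ~event

s = Status(EV.NONE)
s.posedge(EV.IN | EV.OUT)  # epoll_wait said: readable and writable
assert s.mask & EV.IN and s.mask & EV.OUT
s.negedge(EV.IN)           # read() returned EAGAIN: not readable after all
assert not (s.mask & EV.IN)
assert s.mask & EV.OUT     # writability is unaffected
```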

    """
    mask: EPOLL

    def posedge(self, event: EPOLL) -> None:
        self.mask |= event

    def negedge(self, event: EPOLL) -> None:
        self.mask &= ~event

class AsyncFileDescriptor:
    """A file descriptor on which IO can be performed without blocking the thread.

    Also comes with helpful methods `AsyncFileDescriptor.write_all_bytes` and
    `AsyncFileDescriptor.read_some_bytes` to abstract over memory allocation.

    We always wait for a posedge to come back from epoll before trying to read. This is
    not necessarily too pessimistic, because as soon as we have a single posedge, we will
    keep reading in a loop, as long as data keeps coming through.
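
The underlying discipline, read until EAGAIN and treat the EAGAIN as a
negedge, can be sketched standalone with a nonblocking pipe (stdlib only; no
epoll needed to show the draining loop):

```python
import os
import errno

r, w = os.pipe()
os.set_blocking(r, False)  # same effect as opening with O_NONBLOCK

os.write(w, b"hello")

# Drain the fd completely: with edge-triggered epoll we must keep reading
# until EAGAIN, since no further posedge will arrive for already-buffered data.
chunks = []
while True:
    try:
        chunks.append(os.read(r, 2))
    except BlockingIOError as e:
        assert e.errno == errno.EAGAIN  # the "negedge": not readable now
        break

assert b"".join(chunks) == b"hello"
```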

    """
    @staticmethod
    async def make(epoller: Epoller, ram: RAM, fd: FileDescriptor) -> AsyncFileDescriptor:
        """Make an AsyncFileDescriptor; make sure to call this with only O.NONBLOCK file descriptors.

        It won't actually break anything if this is called with file descriptors not in
        NONBLOCK mode; it just means that they'll block when we go to read, which is
        probably not what the user wants.

        """
        epolled = await epoller.register(
            fd, EPOLL.IN|EPOLL.OUT|EPOLL.RDHUP|EPOLL.PRI|EPOLL.ERR|EPOLL.HUP|EPOLL.ET,
        )
        return AsyncFileDescriptor(ram, fd, epolled)

    def __init__(self, ram: RAM, handle: FileDescriptor,
                 epolled: EpolledFileDescriptor,
    ) -> None:
        "Don't construct directly; use the AsyncFileDescriptor.make constructor instead."
        self.ram = ram
        self.handle = handle
        "The underlying FileDescriptor for this AFD, used for all system calls"
        self.epolled = epolled

    def __str__(self) -> str:
        return f"AsyncFileDescriptor({self.epolled})"

    async def make_new_afd(self, fd: FileDescriptor) -> AsyncFileDescriptor:
        """Use the Epoller and RAM in this AsyncFD to make a new `AsyncFileDescriptor` for `fd`

        Make sure that `fd` is already in non-blocking mode;
        such as by accepting it with the `SOCK.NONBLOCK` flag.

        This doesn't steal any resources from the original AFD; it's just a convenience method,
        most useful when calling accept() and wanting to create new AFDs out of the resulting FDs.

        """
        return await AsyncFileDescriptor.make(self.epolled.epoller, self.ram, fd)

    async def wait_for_rdhup(self) -> None:
        "Call epoll_wait until this file descriptor has a hangup."
        await self.epolled.wait_for(EPOLL.RDHUP|EPOLL.HUP)

    async def read(self, ptr: Pointer) -> t.Tuple[Pointer, Pointer]:
        "Call `FileDescriptor.read` without blocking the thread."
        while True:
            await self.epolled.wait_for(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
            current_events = self.epolled.get_current_events(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
            try:
                return (await self.handle.read(ptr))
            except OSError as e:
                self.epolled.consume(current_events)
                if e.errno == errno.EAGAIN:
                    self.epolled.status.negedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
                else:
                    self.epolled.status.posedge(EPOLL.ERR)
                    raise
            else:
                self.epolled.consume(current_events)
                self.epolled.status.posedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP)

    async def read_some_bytes(self, count: int=4096) -> bytes:
        """Read at most count bytes; possibly less, if we have a partial read.

        This allocates on each call. For some applications, you may want to avoid the cost of
        allocation, by instead allocating a buffer with `Thread.malloc` up front and reusing it
        across multiple calls to `AsyncFileDescriptor.read`.

        """
        ptr = await self.ram.malloc(bytes, count)
        valid, _ = await self.read(ptr)
        return await valid.read()

    async def write(self, buf: Pointer) -> t.Tuple[Pointer, Pointer]:
        """Call `FileDescriptor.write` without blocking the thread.

        Note that this doesn't retry partial writes, which are always a possibility, so you should
        make sure to do that yourself, or use `AsyncFileDescriptor.write_all`.

        """
        while True:
            await self.epolled.wait_for(EPOLL.OUT|EPOLL.ERR)
            current_events = self.epolled.get_current_events(EPOLL.OUT|EPOLL.ERR)
            try:
                return await self.handle.write(buf)
            except OSError as e:
                self.epolled.consume(current_events)
                if e.errno == errno.EAGAIN:
                    self.epolled.status.negedge(EPOLL.OUT|EPOLL.ERR)
                else:
                    self.epolled.status.posedge(EPOLL.ERR)
                    raise
            else:
                self.epolled.consume(current_events)
                self.epolled.status.posedge(EPOLL.OUT)

    async def write_all(self, to_write: Pointer) -> None:
        """Write all of this pointer to the fd, retrying on partial writes until complete.

        You might want to not use this, if you want to react to a partial write in some special way.
        For example, `rsyscall.memory.socket_transport.SocketMemoryTransport` starts a `recv`
        immediately after a partial write, before retrying the write, for increased parallelism.

        """
        while to_write.size() > 0:
            written, to_write = await self.write(to_write)

    async def write_all_bytes(self, buf: bytes) -> None:
        """Write all these bytes to the fd, retrying on partial writes until complete.

        This allocates and performs a store to memory on each call. This is inefficient if you
        already have an initialized pointer for the value, or if you already have an allocated
        buffer that you can use to store the value, which you can also reuse for other values.
        In those cases, you might want to use `AsyncFileDescriptor.write_all`.

        """
        ptr = await self.ram.ptr(buf)
        await self.write_all(ptr)

    @t.overload
    async def accept(self, flags: SOCK=SOCK.NONE) -> FileDescriptor: ...
    @t.overload
    async def accept(self, flags: SOCK, addr: WrittenPointer[Sockbuf[T_sockaddr]]
    ) -> t.Tuple[FileDescriptor, WrittenPointer[Sockbuf[T_sockaddr]]]: ...

    async def accept(self, flags: SOCK=SOCK.NONE, addr: t.Optional[WrittenPointer[Sockbuf[T_sockaddr]]]=None
    ) -> t.Union[FileDescriptor, t.Tuple[FileDescriptor, WrittenPointer[Sockbuf[T_sockaddr]]]]:
        "Call accept without blocking the thread."
        while True:
            await self.epolled.wait_for(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
            current_events = self.epolled.get_current_events(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
            try:
                if addr is None:
                    return (await self.handle.accept(flags))
                else:
                    return (await self.handle.accept(flags, addr))
            except OSError as e:
                self.epolled.consume(current_events)
                if e.errno == errno.EAGAIN:
                    self.epolled.status.negedge(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
                else:
                    self.epolled.status.posedge(EPOLL.HUP|EPOLL.ERR)
                    raise
            else:
                self.epolled.consume(current_events)
                self.epolled.status.posedge(EPOLL.IN)

    async def accept_addr(self, flags: SOCK=SOCK.NONE) -> t.Tuple[FileDescriptor, Sockaddr]:
        "Call accept with a buffer for the address, and return the resulting fd and address."
        written_sockbuf = await self.ram.ptr(Sockbuf(await self.ram.malloc(SockaddrStorage)))
        fd, sockbuf = await self.accept(flags, written_sockbuf)
        addr = (await (await sockbuf.read()).buf.read()).parse()
        return fd, addr

    async def bind(self, addr: WrittenPointer[Sockaddr]) -> None:
        "Call bind; bind already doesn't block the thread."
        await self.handle.bind(addr)

    async def connect(self, addr: WrittenPointer[Sockaddr]) -> None:
        "Call connect without blocking the thread."
        try:
            # Note that an unconnected socket, at least with AF.INET SOCK.STREAM,
            # will have EPOLL.OUT|EPOLL.HUP set when added to epoll, before calling connect.
            current_events = self.epolled.get_current_events(EPOLL.OUT)
            await self.handle.connect(addr)
        except OSError as e:
            self.epolled.consume(current_events)
            self.epolled.status.negedge(EPOLL.OUT)
            if e.errno == errno.EINPROGRESS:
                await self.epolled.wait_for(EPOLL.OUT)
                current_events = self.epolled.get_current_events(EPOLL.OUT)
                sockbuf = await self.ram.ptr(Sockbuf(await self.ram.malloc(Int32)))
                retbuf = await self.handle.getsockopt(SOL.SOCKET, SO.ERROR, sockbuf)
                err = await (await retbuf.read()).buf.read()
                self.epolled.consume(current_events)
                if err == 0:
                    self.epolled.status.posedge(EPOLL.OUT)
                else:
                    self.epolled.status.posedge(EPOLL.ERR)
                    try:
                        raise OSError(err, os.strerror(err))
                    except OSError as exn:
                        exn.filename = self.handle
                        if hasattr(addr, 'value'):
                            exn.filename2 = addr.value
                        raise
            else:
                raise
        else:
            self.epolled.consume(current_events)
            self.epolled.status.posedge(EPOLL.OUT)

    def with_handle(self, fd: FileDescriptor) -> AsyncFileDescriptor:
        """Return a new AFD using this new FD handle for making syscalls.

        This is useful when we want to change what task we're making syscalls in when
        using this AFD.

        """
        return AsyncFileDescriptor(self.ram, fd, self.epolled)

    async def close(self) -> None:
        "Remove this FD from Epoll and invalidate the FD handle."
        await self.epolled.delete()
        await self.handle.invalidate()

    async def __aenter__(self) -> None:
        pass

    async def __aexit__(self, *args, **kwargs) -> None:
        await self.close()


################################################################################
# Miscellaneous helpers

class AsyncReadBuffer:
    """A buffer for parsing variable-length streaming data.

    When reading data from a stream such as a pipe or TCP connection, data is not
    delivered to us from the kernel in nicely-separated records. We need to rebuffer the
    data so that it can be parsed. That's what this class does; and it provides a few
    helper methods to make it easier to read and parse such streams.

    """
    def __init__(self, fd: AsyncFileDescriptor) -> None:
        self.fd = fd
        self.buf = b""
        self.unread_ptr: t.Optional[Pointer] = None

    async def _read(self) -> bytes:
        "Read some bytes; return None on EOF."
        if self.unread_ptr is None:
            ptr = await self.fd.ram.malloc(bytes, 4096)
            self.unread_ptr, _ = await self.fd.read(ptr)
        if self.unread_ptr.size():
            data = await self.unread_ptr.read()
        else:
            data = b''
        self.unread_ptr = None
        if len(data) == 0:
            raise EOFError
        else:
            return data

    async def read_length(self, length: int) -> bytes:
        "Read exactly this many bytes; raises on EOF."
        while len(self.buf) < length:
            data = await self._read()
            self.buf += data
        section = self.buf[:length]
        self.buf = self.buf[length:]
        return section

    async def read_cffi(self, name: str) -> t.Any:
        "Read, parse, and return this fixed-size cffi type."
        size = ffi.sizeof(name)
        try:
            data = await self.read_length(size)
        except EOFError as e:
            e.args = ("got EOF while expecting to read a", name, "of size", size)
            raise
        nameptr = name + '*'
        dest = ffi.new(nameptr)
        # ffi.cast drops the reference to the backing buffer, so we have to copy it
        src = ffi.cast(nameptr, ffi.from_buffer(data))
        ffi.memmove(dest, src, size)
        return dest[0]

    async def read_struct(self, cls: t.Type[T_fixed_size]) -> T_fixed_size:
        "Read one fixed-size struct from the buffer, or return None if that's not possible"
        size = cls.sizeof()
        try:
            data = await self.read_length(size)
        except EOFError as e:
            e.args = ("got EOF while expecting to read a", cls, "of size", size)
            raise
        return cls.get_serializer(self.fd.handle.task).from_bytes(data)

    async def read_length_prefixed_string(self) -> bytes:
        "Read a bytestring which is prefixed with a 64-bit native-byte-order size."
        elem_size = await self.read_cffi('size_t')
        try:
            elem = await self.read_length(elem_size)
        except EOFError as e:
            e.args = ("got EOF while expecting to read environment element of length", elem_size)
        return elem

    async def read_length_prefixed_array(self, length: int) -> t.List[bytes]:
        "Read an array, prefixed with its size, of bytestrings, each prefixed with their size."
        ret: t.List[bytes] = []
        for _ in range(length):
            ret.append(await self.read_length_prefixed_string())
        return ret

    async def read_envp(self, length: int) -> t.Dict[str, str]:
        """Read a size-prefixed array of size-prefixed bytestrings, with each bytestring containing '=', into a dict.

        This is the format we expect for envp, which is written to us on startup by
        non-child threads.

        We assume that the environment is correctly formed; that is, each element contains
        '='.  If that's not true, we'll throw an exception.  But it would be quite unusual
        to have an incorrectly formed environment variable, so it's not too concerning.

        """
        raw = await self.read_length_prefixed_array(length)
        environ: t.Dict[str, str] = {}
        for elem in raw:
            key, val = elem.split(b"=", 1)
            environ[os.fsdecode(key)] = os.fsdecode(val)
        return environ

    async def read_until_delimiter(self, delim: bytes) -> t.Optional[bytes]:
        "Read and return all bytes until the specified delimiter, stripping the delimiter; on EOF, return None."
        while True:
            try:
                i = self.buf.index(delim)
            except ValueError:
                pass
            else:
                section = self.buf[:i]
                # skip the delimiter
                self.buf = self.buf[i+len(delim):]
                return section
            # buf contains no copies of "delim", gotta read some more data
            try:
                data = await self._read()
            except EOFError:
                return None
            self.buf += data

    async def read_line(self) -> bytes:
        "Read and return a line, stripping the newline character."
        ret = await self.read_until_delimiter(b"\n")
        if ret is None:
            raise EOFError("hangup before reading full line")
        return ret

    async def read_netstring(self) -> bytes:
        "Read a netstring, as defined by DJB."
        length_bytes = await self.read_until_delimiter(b':')
        if length_bytes is None:
            raise EOFError("hangup before reaching colon at end of netstring size")
        length = int(length_bytes)
        try:
            data = await self.read_length(length)
        except EOFError as e:
            e.args = ("hangup before we read netstring data of size", length)
            raise
        try:
            comma = await self.read_length(1)
        except EOFError as e:
            e.args = ("hangup before comma at end of netstring",)
            raise
        if comma != b",":
            raise Exception("bad netstring delimiter", comma)
        return data
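For reference, `AsyncReadBuffer.read_netstring` above parses DJB's netstring framing: the payload's decimal length, a colon, the payload, and a trailing comma. A minimal synchronous sketch of the same format (the helper names here are illustrative, not part of this module):

```python
# DJB's netstring format: b"5:hello," encodes b"hello".
# encode_netstring/decode_netstring are illustrative names, not module API.
def encode_netstring(data: bytes) -> bytes:
    return b"%d:%s," % (len(data), data)

def decode_netstring(buf: bytes) -> tuple:
    "Return (payload, remaining bytes); raise ValueError on malformed input."
    length_bytes, sep, rest = buf.partition(b":")
    if sep != b":":
        raise ValueError("no colon at end of netstring size")
    length = int(length_bytes)
    data, comma = rest[:length], rest[length:length + 1]
    if comma != b",":
        raise ValueError("bad netstring delimiter", comma)
    return data, rest[length + 1:]

wire = encode_netstring(b"hello") + encode_netstring(b"")
first, rest = decode_netstring(wire)
second, rest = decode_netstring(rest)
assert (first, second, rest) == (b"hello", b"", b"")
```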

Classes

class Epoller (epoll_waiter: EpollWaiter, ram: RAM, epfd: FileDescriptor)

Terribly named class that allows registering fds on epoll, and waiting on them.

Don't construct directly; use one of the constructor methods, make_subsidiary or make_root.

class Epoller:
    "Terribly named class that allows registering fds on epoll, and waiting on them."
    @staticmethod
    def make_subsidiary(ram: RAM, epfd: FileDescriptor, wait_readable: t.Callable[[], t.Awaitable[None]]) -> Epoller:
        """Make a subsidiary epoller, as described in the module docstring.

        We delegate responsibility for blocking to wait for new events to some other
        component. We call the passed-in wait_readable function to block for new events.

        """
        center = Epoller(EpollWaiter(ram, epfd, wait_readable, 0), ram, epfd)
        return center

    @staticmethod
    async def make_root(ram: RAM, task: Task) -> Epoller:
        """Make a root epoller, as described in the module docstring.

        We take responsibility for blocking to wait for new events for every other
        component in this thread. We pull the activity_fd from the SyscallInterface and
        register it on our epollfd. The activity_fd is readable whenever some other
        component in the thread wants to work on the thread.

        """
        epfd = await task.epoll_create()
        center = Epoller(EpollWaiter(ram, epfd, None, -1), ram, epfd)
        return center

    def __init__(self, epoll_waiter: EpollWaiter, ram: RAM, epfd: FileDescriptor) -> None:
        "Don't construct directly; use one of the constructor methods, make_subsidiary or make_root."
        self.epoll_waiter = epoll_waiter
        self.ram = ram
        self.epfd = epfd

    def inherit(self, ram: RAM) -> Epoller:
        """Make a new Epoller which shares the same EpollWaiter class.

        We inherit the epollfd to a new task for the purpose of registering new fds on it;
        but we share the class and task which actually calls epoll_wait.

        """
        return Epoller(self.epoll_waiter, ram, self.epfd.inherit(ram.task))

    async def register(self, fd: FileDescriptor, events: EPOLL) -> EpolledFileDescriptor:
        """Register a file descriptor on this epollfd, for the given events, calling the passed callback.

        The return value can be used to wait for callback calls, modify the events
        registered for this file descriptor, and delete the file descriptor from this
        epollfd.

        """
        number = self.epoll_waiter.allocate_number(int(fd.near))
        efd = EpolledFileDescriptor(self, fd, number)
        await self.epfd.epoll_ctl(EPOLL_CTL.ADD, fd, await self.ram.ptr(EpollEvent(number, events)))
        return efd

Static methods

def make_subsidiary(ram: RAM, epfd: FileDescriptor, wait_readable: t.Callable[[], t.Awaitable[None]]) ‑> Epoller

Make a subsidiary epoller, as described in the module docstring.

We delegate responsibility for blocking to wait for new events to some other component. We call the passed-in wait_readable function to block for new events.

async def make_root(ram: RAM, task: Task) ‑> Epoller

Make a root epoller, as described in the module docstring.

We take responsibility for blocking to wait for new events for every other component in this thread. We pull the activity_fd from the SyscallInterface and register it on our epollfd. The activity_fd is readable whenever some other component in the thread wants to work on the thread.


Methods

def inherit(self, ram: RAM) ‑> Epoller

Make a new Epoller which shares the same EpollWaiter class.

We inherit the epollfd to a new task for the purpose of registering new fds on it; but we share the class and task which actually calls epoll_wait.

async def register(self, fd: FileDescriptor, events: EPOLL) ‑> EpolledFileDescriptor

Register a file descriptor on this epollfd for the given events.

The return value can be used to wait for events, modify the events registered for this file descriptor, and delete the file descriptor from this epollfd.

class EpolledFileDescriptor (epoller: Epoller, fd: FileDescriptor, number: int)

Representation of a file descriptor registered on an epollfd.

We have to keep around a reference to the file descriptor to perform EPOLL_CTL.DEL. This is a bit annoying, but whatever.

class EpolledFileDescriptor:
    """Representation of a file descriptor registered on an epollfd.

    We have to keep around a reference to the file descriptor to perform
    EPOLL_CTL.DEL. This is a bit annoying, but whatever.

    """
    def __init__(self,
                 epoller: Epoller,
                 fd: FileDescriptor,
                 number: int,
    ) -> None:
        self.epoller = epoller
        self.fd = fd
        # TODO, we should copy this so it can't be closed out from under us
        # self.fd = fd.copy()
        self.number = number
        # We optimistically assume that the FD is ready for reading/writing immediately,
        # by setting our initial status to EPOLL.OUT|EPOLL.IN.
        # This has two benefits:
        # 1. Performance improves on our test suite and in many real-world cases, and
        # 2. More critically, if a user erroneously tries to read or write an FD which
        #    will never receive an EPOLL.OUT or EPOLL.IN because it doesn't support
        #    reading/writing, we'll fail immediately instead of blocking forever.
        self.status = FDStatus(EPOLL.OUT|EPOLL.IN)
        self.in_epollfd = True
        self.queue = RequestQueue[EPOLL, None]()
        self.total_events: t.Counter[EPOLL] = collections.Counter()
        self.consumed_events: t.Dict[EPOLL, int] = {flag: 0 for flag in EPOLL}
        reset(self._run())

    def __str__(self) -> str:
        return f"EpolledFileDescriptor({self.fd.near}, {self.number}, {self.status})"

    async def modify(self, events: EPOLL) -> None:
        "Change the EPOLL flags that this fd is registered with."
        await self.epoller.epfd.epoll_ctl(
            EPOLL_CTL.MOD, self.fd, await self.epoller.ram.ptr(EpollEvent(self.number, events)))

    async def delete(self) -> None:
        "Delete this fd from the epollfd."
        if not self.in_epollfd:
            raise Exception("already deleted from epollfd")
        await self.epoller.epfd.epoll_ctl(EPOLL_CTL.DEL, self.fd)
        self.epoller.epoll_waiter.remove_number_after_next_epoll_wait(self.number)
        self.in_epollfd = False

    async def _run(self) -> None:
        waiters: t.Dict[EPOLL, t.List[t.Tuple[EPOLL, Continuation[None]]]] = {flag: [] for flag in EPOLL}
        while True:
            try:
                ev = await self.epoller.epoll_waiter.queue.request(self.number)
            except Exception as e:
                final_exn = e
                break
            self.total_events.update(ev)
            self.status.posedge(ev)
            for val, coro in self.queue.fetch_any():
                for flag in val:
                    waiters[flag].append((val, coro))
            for flag in ev:
                to_resume = waiters[flag]
                for val, cb in list(to_resume):
                    # remove the cb from other lists it's on; cool scope there, Python
                    for waiting_flag in val:
                        waiters[waiting_flag].remove((val, cb))
                    cb.send(None)
        self.queue.close(final_exn)

    async def wait_for(self, flags: EPOLL) -> None:
        "Call epoll_wait until at least one of the passed flags is set in our status."
        if (not (self.status.mask & flags)
            and not any(self.total_events[flag] - self.consumed_events[flag] > 0
                        for flag in flags)):
            return await self.queue.request(flags)

    def consume(self, events: t.Dict[EPOLL, int]) -> None:
        """The information from these events is outdated; discard them

        Each syscall gives us information about some kinds of events, and information from
        events from epoll_wait calls before that syscall are obsoleted by this more
        up-to-date information.

        So, we call `get_current_events` before each syscall, for the set of flags that
        that syscall will inform us about. Then, after the syscall, we call `consume`, to
        indicate that the information from those events is now obsolete.

        We can't consume the events until after the syscall is done, because we want to
        allow others to optimistically perform syscalls based on these epoll events; not
        consuming the events immediately also makes us tolerate being canceled without
        performing the syscall.

        Only the events received before the syscall are known to be obsolete; events
        received during or after this syscall need to be preserved until more syscalls are
        made, to prevent deadlocks.

        The reason we need this somewhat complicated edifice is because our calls to
        epoll_wait are not synchronized with our system calls on individual file
        descriptors; calls to epoll_wait and fd system calls can happen in different
        threads and can be arbitrarily reordered relative to each other.

        Note, however, that system calls made from an individual `AsyncFileDescriptor`
        instance *are* synchronized relative to each other; those system calls are made in
        a single thread, so the system calls return in order thanks to `dneio`. This
        allows us to safely immediately call `FDStatus.negedge`/`FDStatus.posedge` after a
        system call.

        TODO if we have multiple concurrently-used `AsyncFileDescriptor` instances,
        though, they aren't synchronized, so we need to be a little more careful...

        """
        self.consumed_events.update(((flag, max(self.consumed_events[flag], count))
                                     for flag, count in events.items()))

    def get_current_events(self, flags: EPOLL) -> t.Dict[EPOLL, int]:
        "Get the current set of events from epoll_wait for these flags, for use with consume."
        return {flag: self.total_events[flag] for flag in flags}

Methods

async def modify(self, events: EPOLL) ‑> NoneType

Change the EPOLL flags that this fd is registered with.

async def delete(self) ‑> NoneType

Delete this fd from the epollfd.

async def wait_for(self, flags: EPOLL) ‑> NoneType

Call epoll_wait until at least one of the passed flags is set in our status.

def consume(self, events: t.Dict[EPOLL, int]) ‑> NoneType

The information from these events is outdated; discard them.

Each syscall gives us information about some kinds of events, and information from events from epoll_wait calls before that syscall are obsoleted by this more up-to-date information.

So, we call get_current_events before each syscall, for the set of flags that that syscall will inform us about. Then, after the syscall, we call consume, to indicate that the information from those events is now obsolete.

We can't consume the events until after the syscall is done, because we want to allow others to optimistically perform syscalls based on these epoll events; not consuming the events immediately also makes us tolerate being canceled without performing the syscall.

Only the events received before the syscall are known to be obsolete; events received during or after this syscall need to be preserved until more syscalls are made, to prevent deadlocks.

The reason we need this somewhat complicated edifice is because our calls to epoll_wait are not synchronized with our system calls on individual file descriptors; calls to epoll_wait and fd system calls can happen in different threads and can be arbitrarily reordered relative to each other.

Note, however, that system calls made from an individual AsyncFileDescriptor instance are synchronized relative to each other; those system calls are made in a single thread, so the system calls return in order thanks to dneio. This allows us to safely immediately call FDStatus.negedge/FDStatus.posedge after a system call.

TODO if we have multiple concurrently-used AsyncFileDescriptor instances, though, they aren't synchronized, so we need to be a little more careful…
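The bookkeeping described above can be modeled with plain counters. This is an illustrative toy, not the rsyscall API: `total_events` counts events reported by epoll_wait, `consume` marks a snapshot of them as obsolete, and a syscall should only block in epoll_wait when no unconsumed events remain:

```python
import collections

# A toy model of EpolledFileDescriptor's event bookkeeping; EPOLLIN here is
# just an example flag, and these are plain functions, not the rsyscall API.
EPOLLIN = 1
total_events = collections.Counter()   # every event epoll_wait has reported
consumed_events = {EPOLLIN: 0}         # how many of them later syscalls obsoleted

def get_current_events(flag):
    "Snapshot taken just before a syscall that will report on `flag`."
    return {flag: total_events[flag]}

def consume(events):
    "After the syscall, mark the snapshotted events as obsolete."
    for flag, count in events.items():
        consumed_events[flag] = max(consumed_events[flag], count)

def must_wait(flag):
    "True when no unconsumed events remain, i.e. we'd have to call epoll_wait."
    return total_events[flag] - consumed_events[flag] <= 0

total_events.update([EPOLLIN])         # epoll_wait reported one EPOLLIN
snapshot = get_current_events(EPOLLIN)
assert not must_wait(EPOLLIN)          # an unconsumed event: safe to try reading
consume(snapshot)                      # the read's result obsoletes that event
assert must_wait(EPOLLIN)              # now only a fresh event can unblock us
```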

def get_current_events(self, flags: EPOLL) ‑> Dict[EPOLL, int]

Get the current set of events from epoll_wait for these flags, for use with consume.

class AsyncFileDescriptor (ram: RAM, handle: FileDescriptor, epolled: EpolledFileDescriptor)

A file descriptor on which IO can be performed without blocking the thread.

Also comes with helpful methods AsyncFileDescriptor.write_all_bytes() and AsyncFileDescriptor.read_some_bytes() to abstract over memory allocation.

We always wait for a posedge to come back from epoll before trying to read. This is not necessarily too pessimistic, because as soon as we have a single posedge, we will keep reading in a loop, as long as data keeps coming through.

Don't construct directly; use the AsyncFileDescriptor.make constructor instead.
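The edge-triggered discipline that `AsyncFileDescriptor.read` implements is: wait for a posedge, then retry the syscall until EAGAIN signals the negedge. With the stdlib epoll on a plain pipe, the same loop looks roughly like this (a sketch, not the rsyscall API):

```python
import errno
import os
import select

# Edge-triggered read loop: wait for EPOLLIN, then drain until EAGAIN.
# Stdlib epoll on a pipe, not the rsyscall API.
r, w = os.pipe()
os.set_blocking(r, False)

ep = select.epoll()
ep.register(r, select.EPOLLIN | select.EPOLLET)

os.write(w, b"hello")
assert ep.poll()  # the posedge: the fd became readable

chunks = []
while True:
    try:
        data = os.read(r, 4096)
    except OSError as e:
        if e.errno == errno.EAGAIN:
            # The negedge: we've drained the fd. Under EPOLLET we must now
            # wait for another epoll event before reading again.
            break
        raise
    if not data:
        break  # EOF
    chunks.append(data)
assert b"".join(chunks) == b"hello"
```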

class AsyncFileDescriptor:
    """A file descriptor on which IO can be performed without blocking the thread.

    Also comes with helpful methods `AsyncFileDescriptor.write_all_bytes` and
    `AsyncFileDescriptor.read_some_bytes` to abstract over memory allocation.

    We always wait for a posedge to come back from epoll before trying to read. This is
    not necessarily too pessimistic, because as soon as we have a single posedge, we will
    keep reading in a loop, as long as data keeps coming through.

    """
    @staticmethod
    async def make(epoller: Epoller, ram: RAM, fd: FileDescriptor) -> AsyncFileDescriptor:
        """Make an AsyncFileDescriptor; make sure to call this with only O.NONBLOCK file descriptors.

        It won't actually break anything if this is called with file descriptors not in
        NONBLOCK mode; it just means that they'll block when we go to read, which is
        probably not what the user wants.

        """
        epolled = await epoller.register(
            fd, EPOLL.IN|EPOLL.OUT|EPOLL.RDHUP|EPOLL.PRI|EPOLL.ERR|EPOLL.HUP|EPOLL.ET,
        )
        return AsyncFileDescriptor(ram, fd, epolled)

    def __init__(self, ram: RAM, handle: FileDescriptor,
                 epolled: EpolledFileDescriptor,
    ) -> None:
        "Don't construct directly; use the AsyncFileDescriptor.make constructor instead."
        self.ram = ram
        self.handle = handle
        "The underlying FileDescriptor for this AFD, used for all system calls"
        self.epolled = epolled

    def __str__(self) -> str:
        return f"AsyncFileDescriptor({self.epolled})"

    async def make_new_afd(self, fd: FileDescriptor) -> AsyncFileDescriptor:
        """Use the Epoller and RAM in this AsyncFD to make a new `AsyncFileDescriptor` for `fd`

        Make sure that `fd` is already in non-blocking mode;
        such as by accepting it with the `SOCK.NONBLOCK` flag.

        This doesn't steal any resources from the original AFD; it's just a convenience method,
        most useful when calling accept() and wanting to create new AFDs out of the resulting FDs.

        """
        return await AsyncFileDescriptor.make(self.epolled.epoller, self.ram, fd)

    async def wait_for_rdhup(self) -> None:
        "Call epoll_wait until this file descriptor has a hangup."
        await self.epolled.wait_for(EPOLL.RDHUP|EPOLL.HUP)

    async def read(self, ptr: Pointer) -> t.Tuple[Pointer, Pointer]:
        "Call `FileDescriptor.read` without blocking the thread."
        while True:
            await self.epolled.wait_for(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
            current_events = self.epolled.get_current_events(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
            try:
                return (await self.handle.read(ptr))
            except OSError as e:
                self.epolled.consume(current_events)
                if e.errno == errno.EAGAIN:
                    self.epolled.status.negedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
                else:
                    self.epolled.status.posedge(EPOLL.ERR)
                    raise
            else:
                self.epolled.consume(current_events)
                self.epolled.status.posedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP)

    async def read_some_bytes(self, count: int=4096) -> bytes:
        """Read at most count bytes; possibly less, if we have a partial read.

        This allocates on each call. For some applications, you may want to avoid the cost of
        allocation, by instead allocating a buffer with `Thread.malloc` up front and reusing it
        across multiple calls to `AsyncFileDescriptor.read`.

        """
        ptr = await self.ram.malloc(bytes, count)
        valid, _ = await self.read(ptr)
        return await valid.read()

    async def write(self, buf: Pointer) -> t.Tuple[Pointer, Pointer]:
        """Call `FileDescriptor.write` without blocking the thread.

        Note that this doesn't retry partial writes, which are always a possibility, so you should
        make sure to do that yourself, or use `AsyncFileDescriptor.write_all`.

        """
        while True:
            await self.epolled.wait_for(EPOLL.OUT|EPOLL.ERR)
            current_events = self.epolled.get_current_events(EPOLL.OUT|EPOLL.ERR)
            try:
                return await self.handle.write(buf)
            except OSError as e:
                self.epolled.consume(current_events)
                if e.errno == errno.EAGAIN:
                    self.epolled.status.negedge(EPOLL.OUT|EPOLL.ERR)
                else:
                    self.epolled.status.posedge(EPOLL.ERR)
                    raise
            else:
                self.epolled.consume(current_events)
                self.epolled.status.posedge(EPOLL.OUT)

    async def write_all(self, to_write: Pointer) -> None:
        """Write all of this pointer to the fd, retrying on partial writes until complete.

        You might want to not use this, if you want to react to a partial write in some special way.
        For example, `rsyscall.memory.socket_transport.SocketMemoryTransport` starts a `recv`
        immediately after a partial write, before retrying the write, for increased parallelism.

        """
        while to_write.size() > 0:
            written, to_write = await self.write(to_write)

    async def write_all_bytes(self, buf: bytes) -> None:
        """Write all these bytes to the fd, retrying on partial writes until complete.

        This allocates and performs a store to memory on each call. This is inefficient if you
        already have an initialized pointer for the value, or if you already have an allocated
        buffer that you can use to store the value, which you can also reuse for other values.
        In those cases, you might want to use `AsyncFileDescriptor.write_all`.

        """
        ptr = await self.ram.ptr(buf)
        await self.write_all(ptr)

    @t.overload
    async def accept(self, flags: SOCK=SOCK.NONE) -> FileDescriptor: ...
    @t.overload
    async def accept(self, flags: SOCK, addr: WrittenPointer[Sockbuf[T_sockaddr]]
    ) -> t.Tuple[FileDescriptor, WrittenPointer[Sockbuf[T_sockaddr]]]: ...

    async def accept(self, flags: SOCK=SOCK.NONE, addr: t.Optional[WrittenPointer[Sockbuf[T_sockaddr]]]=None
    ) -> t.Union[FileDescriptor, t.Tuple[FileDescriptor, WrittenPointer[Sockbuf[T_sockaddr]]]]:
        "Call accept without blocking the thread."
        while True:
            await self.epolled.wait_for(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
            current_events = self.epolled.get_current_events(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
            try:
                if addr is None:
                    return (await self.handle.accept(flags))
                else:
                    return (await self.handle.accept(flags, addr))
            except OSError as e:
                self.epolled.consume(current_events)
                if e.errno == errno.EAGAIN:
                    self.epolled.status.negedge(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
                else:
                    self.epolled.status.posedge(EPOLL.HUP|EPOLL.ERR)
                    raise
            else:
                self.epolled.consume(current_events)
                self.epolled.status.posedge(EPOLL.IN)

    async def accept_addr(self, flags: SOCK=SOCK.NONE) -> t.Tuple[FileDescriptor, Sockaddr]:
        "Call accept with a buffer for the address, and return the resulting fd and address."
        written_sockbuf = await self.ram.ptr(Sockbuf(await self.ram.malloc(SockaddrStorage)))
        fd, sockbuf = await self.accept(flags, written_sockbuf)
        addr = (await (await sockbuf.read()).buf.read()).parse()
        return fd, addr

    async def bind(self, addr: WrittenPointer[Sockaddr]) -> None:
        "Call bind; bind already doesn't block the thread."
        await self.handle.bind(addr)

    async def connect(self, addr: WrittenPointer[Sockaddr]) -> None:
        "Call connect without blocking the thread."
        try:
            # Note that an unconnected socket, at least with AF.INET SOCK.STREAM,
            # will have EPOLL.OUT|EPOLL.HUP set when added to epoll, before calling connect.
            current_events = self.epolled.get_current_events(EPOLL.OUT)
            await self.handle.connect(addr)
        except OSError as e:
            self.epolled.consume(current_events)
            self.epolled.status.negedge(EPOLL.OUT)
            if e.errno == errno.EINPROGRESS:
                await self.epolled.wait_for(EPOLL.OUT)
                current_events = self.epolled.get_current_events(EPOLL.OUT)
                sockbuf = await self.ram.ptr(Sockbuf(await self.ram.malloc(Int32)))
                retbuf = await self.handle.getsockopt(SOL.SOCKET, SO.ERROR, sockbuf)
                err = await (await retbuf.read()).buf.read()
                self.epolled.consume(current_events)
                if err == 0:
                    self.epolled.status.posedge(EPOLL.OUT)
                else:
                    self.epolled.status.posedge(EPOLL.ERR)
                    try:
                        raise OSError(err, os.strerror(err))
                    except OSError as exn:
                        exn.filename = self.handle
                        if hasattr(addr, 'value'):
                            exn.filename2 = addr.value
                        raise
            else:
                raise
        else:
            self.epolled.consume(current_events)
            self.epolled.status.posedge(EPOLL.OUT)

    def with_handle(self, fd: FileDescriptor) -> AsyncFileDescriptor:
        """Return a new AFD using this new FD handle for making syscalls.

        This is useful when we want to change what task we're making syscalls in when
        using this AFD.

        """
        return AsyncFileDescriptor(self.ram, fd, self.epolled)

    async def close(self) -> None:
        "Remove this FD from Epoll and invalidate the FD handle."
        await self.epolled.delete()
        await self.handle.invalidate()

    async def __aenter__(self) -> None:
        pass

    async def __aexit__(self, *args, **kwargs) -> None:
        await self.close()

Static methods

async def make(epoller: Epoller, ram: RAM, fd: FileDescriptor) ‑> AsyncFileDescriptor

Make an AsyncFileDescriptor; make sure to call this with only O.NONBLOCK file descriptors.

It won't actually break anything if this is called with file descriptors not in NONBLOCK mode; it just means that they'll block when we go to read, which is probably not what the user wants.

@staticmethod
async def make(epoller: Epoller, ram: RAM, fd: FileDescriptor) -> AsyncFileDescriptor:
    """Make an AsyncFileDescriptor; make sure to call this with only O.NONBLOCK file descriptors.

    It won't actually break anything if this is called with file descriptors not in
    NONBLOCK mode; it just means that they'll block when we go to read, which is
    probably not what the user wants.

    """
    epolled = await epoller.register(
        fd, EPOLL.IN|EPOLL.OUT|EPOLL.RDHUP|EPOLL.PRI|EPOLL.ERR|EPOLL.HUP|EPOLL.ET,
    )
    return AsyncFileDescriptor(ram, fd, epolled)
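The O.NONBLOCK requirement above corresponds to the standard fcntl dance. A hedged sketch using plain os/fcntl file descriptors (not rsyscall handles):

```python
import fcntl
import os

def set_nonblock(fd: int) -> None:
    # Add O_NONBLOCK to the fd's status flags, preserving the others.
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

r, w = os.pipe()
set_nonblock(r)
try:
    os.read(r, 1)  # empty pipe: instead of blocking, raises BlockingIOError (EAGAIN)
except BlockingIOError:
    pass
```

A blocking read on such an fd fails fast with EAGAIN, which is exactly the behavior the epoll-driven retry loops in this class rely on.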

Instance variables

var handle

The underlying FileDescriptor for this AFD, used for all system calls

Methods

async def make_new_afd(self, fd: FileDescriptor) ‑> AsyncFileDescriptor

Use the Epoller and RAM in this AsyncFD to make a new AsyncFileDescriptor for fd

Make sure that fd is already in non-blocking mode, such as by accepting it with the SOCK.NONBLOCK flag.

This doesn't steal any resources from the original AFD; it's just a convenience method, most useful when calling accept() and wanting to create new AFDs out of the resulting FDs.

async def make_new_afd(self, fd: FileDescriptor) -> AsyncFileDescriptor:
    """Use the Epoller and RAM in this AsyncFD to make a new `AsyncFileDescriptor` for `fd`

    Make sure that `fd` is already in non-blocking mode,
    such as by accepting it with the `SOCK.NONBLOCK` flag.

    This doesn't steal any resources from the original AFD; it's just a convenience method,
    most useful when calling accept() and wanting to create new AFDs out of the resulting FDs.

    """
    return await AsyncFileDescriptor.make(self.epolled.epoller, self.ram, fd)
async def wait_for_rdhup(self) ‑> NoneType

Call epoll_wait until this file descriptor has a hangup.

async def wait_for_rdhup(self) -> None:
    "Call epoll_wait until this file descriptor has a hangup."
    await self.epolled.wait_for(EPOLL.RDHUP|EPOLL.HUP)
async def read(self, ptr: Pointer) ‑> Tuple[Pointer, Pointer]

Call FileDescriptor.read without blocking the thread.

async def read(self, ptr: Pointer) -> t.Tuple[Pointer, Pointer]:
    "Call `FileDescriptor.read` without blocking the thread."
    while True:
        await self.epolled.wait_for(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
        current_events = self.epolled.get_current_events(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
        try:
            return (await self.handle.read(ptr))
        except OSError as e:
            self.epolled.consume(current_events)
            if e.errno == errno.EAGAIN:
                self.epolled.status.negedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
            else:
                self.epolled.status.posedge(EPOLL.ERR)
                raise
        else:
            self.epolled.consume(current_events)
            self.epolled.status.posedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP)
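The edge-triggered discipline behind this read loop, where after EAGAIN nothing more can arrive until epoll reports another edge, can be sketched with the stdlib `select.epoll` (Linux-only; this is not rsyscall's Epoller):

```python
import fcntl
import os
import select

r, w = os.pipe()
fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)

ep = select.epoll()
ep.register(r, select.EPOLLIN | select.EPOLLET)  # edge-triggered

os.write(w, b"hello")
ready = ep.poll()  # one edge: the pipe went from empty to readable

# Under EPOLLET we must drain until EAGAIN (BlockingIOError in Python),
# or epoll will never report this fd as readable again.
chunks = []
try:
    while True:
        chunks.append(os.read(r, 2))
except BlockingIOError:
    pass
data = b"".join(chunks)
```

Consuming events and marking negedge on EAGAIN, as the code above does, is the bookkeeping equivalent of this drain-until-EAGAIN rule.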
async def read_some_bytes(self, count: int = 4096) ‑> bytes

Read at most count bytes; possibly less, if we have a partial read.

This allocates on each call. For some applications, you may want to avoid the cost of allocation, by instead allocating a buffer with Thread.malloc up front and reusing it across multiple calls to AsyncFileDescriptor.read().

async def read_some_bytes(self, count: int=4096) -> bytes:
    """Read at most count bytes; possibly less, if we have a partial read.

    This allocates on each call. For some applications, you may want to avoid the cost of
    allocation, by instead allocating a buffer with `Thread.malloc` up front and reusing it
    across multiple calls to `AsyncFileDescriptor.read`.

    """
    ptr = await self.ram.malloc(bytes, count)
    valid, _ = await self.read(ptr)
    return await valid.read()
async def write(self, buf: Pointer) ‑> Tuple[Pointer, Pointer]

Call FileDescriptor.write without blocking the thread.

Note that this doesn't retry partial writes, which are always a possibility, so you should make sure to do that yourself, or use AsyncFileDescriptor.write_all().

async def write(self, buf: Pointer) -> t.Tuple[Pointer, Pointer]:
    """Call `FileDescriptor.write` without blocking the thread.

    Note that this doesn't retry partial writes, which are always a possibility, so you should
    make sure to do that yourself, or use `AsyncFileDescriptor.write_all`.

    """
    while True:
        await self.epolled.wait_for(EPOLL.OUT|EPOLL.ERR)
        current_events = self.epolled.get_current_events(EPOLL.OUT|EPOLL.ERR)
        try:
            return await self.handle.write(buf)
        except OSError as e:
            self.epolled.consume(current_events)
            if e.errno == errno.EAGAIN:
                self.epolled.status.negedge(EPOLL.OUT|EPOLL.ERR)
            else:
                self.epolled.status.posedge(EPOLL.ERR)
                raise
        else:
            self.epolled.consume(current_events)
            self.epolled.status.posedge(EPOLL.OUT)
async def write_all(self, to_write: Pointer) ‑> NoneType

Write all of this pointer to the fd, retrying on partial writes until complete.

You might not want to use this if you want to react to a partial write in some special way. For example, SocketMemoryTransport starts a recv immediately after a partial write, before retrying the write, for increased parallelism.

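The retry loop in write_all is the classic partial-write pattern. A synchronous stdlib sketch (plain os.write on a pipe, not the rsyscall API):

```python
import os
import select

def write_all(fd: int, buf: bytes) -> None:
    # os.write may accept only a prefix of the buffer; retry until done.
    view = memoryview(buf)
    while view:
        try:
            n = os.write(fd, view)
        except BlockingIOError:
            # fd not writable right now: wait, then retry (stand-in for EPOLL.OUT)
            select.select([], [fd], [])
            continue
        view = view[n:]

r, w = os.pipe()
write_all(w, b"x" * 1000)
data = os.read(r, 2000)
```

Slicing a memoryview avoids copying the unwritten tail on each retry.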
async def write_all_bytes(self, buf: bytes) ‑> NoneType

Write all these bytes to the fd, retrying on partial writes until complete.

This allocates and performs a store to memory on each call. This is inefficient if you already have an initialized pointer for the value, or if you already have an allocated buffer that you can use to store the value, which you can also reuse for other values. In those cases, you might want to use AsyncFileDescriptor.write_all().

async def accept(self, flags: SOCK = SOCK.NONE, addr: t.Optional[WrittenPointer[Sockbuf[T_sockaddr]]] = None) ‑> Union[FileDescriptor, Tuple[FileDescriptor, WrittenPointer[Sockbuf[~T_sockaddr]]]]

Call accept without blocking the thread.

async def accept_addr(self, flags: SOCK = SOCK.NONE) ‑> Tuple[FileDescriptor, Sockaddr]

Call accept with a buffer for the address, and return the resulting fd and address.

async def bind(self, addr: WrittenPointer[Sockaddr]) ‑> NoneType

Call bind; bind already doesn't block the thread.

async def connect(self, addr: WrittenPointer[Sockaddr]) ‑> NoneType

Call connect without blocking the thread.

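The EINPROGRESS-then-SO_ERROR dance that connect performs has a direct stdlib analogue. A hedged sketch with plain socket and select (not the rsyscall API; the listener here is just something local to connect to):

```python
import errno
import select
import socket

# Something to connect to: any listening socket on localhost.
listener = socket.socket()
listener.bind(("127.0.0.1", 0))
listener.listen(1)

s = socket.socket()
s.setblocking(False)
# A non-blocking connect typically fails immediately with EINPROGRESS...
err = s.connect_ex(listener.getsockname())
if err == errno.EINPROGRESS:
    # ...so wait for writability (the stdlib stand-in for EPOLL.OUT),
    # then fetch the real outcome with getsockopt(SOL_SOCKET, SO_ERROR).
    select.select([], [s], [])
    err = s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
```

A nonzero SO_ERROR value is the errno the connect would have failed with, which is why the method above wraps it in an OSError.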
def with_handle(self, fd: FileDescriptor) ‑> AsyncFileDescriptor

Return a new AFD using this new FD handle for making syscalls.

This is useful when we want to change what task we're making syscalls in when using this AFD.

async def close(self) ‑> NoneType

Remove this FD from Epoll and invalidate the FD handle.

class AsyncReadBuffer (fd: AsyncFileDescriptor)

A buffer for parsing variable-length streaming data.

When reading data from a stream such as a pipe or TCP connection, data is not delivered to us from the kernel in nicely-separated records. We need to rebuffer the data so that it can be parsed. That's what this class does; and it provides a few helper methods to make it easier to read and parse such streams.

class AsyncReadBuffer:
    """A buffer for parsing variable-length streaming data.

    When reading data from a stream such as a pipe or TCP connection, data is not
    delivered to us from the kernel in nicely-separated records. We need to rebuffer the
    data so that it can be parsed. That's what this class does; and it provides a few
    helper methods to make it easier to read and parse such streams.

    """
    def __init__(self, fd: AsyncFileDescriptor) -> None:
        self.fd = fd
        self.buf = b""
        self.unread_ptr: t.Optional[Pointer] = None

    async def _read(self) -> bytes:
        "Read some bytes; return None on EOF."
        if self.unread_ptr is None:
            ptr = await self.fd.ram.malloc(bytes, 4096)
            self.unread_ptr, _ = await self.fd.read(ptr)
        if self.unread_ptr.size():
            data = await self.unread_ptr.read()
        else:
            data = b''
        self.unread_ptr = None
        if len(data) == 0:
            raise EOFError
        else:
            return data

    async def read_length(self, length: int) -> bytes:
        "Read exactly this many bytes; raises on EOF."
        while len(self.buf) < length:
            data = await self._read()
            self.buf += data
        section = self.buf[:length]
        self.buf = self.buf[length:]
        return section

    async def read_cffi(self, name: str) -> t.Any:
        "Read, parse, and return this fixed-size cffi type."
        size = ffi.sizeof(name)
        try:
            data = await self.read_length(size)
        except EOFError as e:
            e.args = ("got EOF while expecting to read a", name, "of size", size)
            raise
        nameptr = name + '*'
        dest = ffi.new(nameptr)
        # ffi.cast drops the reference to the backing buffer, so we have to copy it
        src = ffi.cast(nameptr, ffi.from_buffer(data))
        ffi.memmove(dest, src, size)
        return dest[0]

    async def read_struct(self, cls: t.Type[T_fixed_size]) -> T_fixed_size:
        "Read one fixed-size struct from the buffer, or return None if that's not possible"
        size = cls.sizeof()
        try:
            data = await self.read_length(size)
        except EOFError as e:
            e.args = ("got EOF while expecting to read a", cls, "of size", size)
            raise
        return cls.get_serializer(self.fd.handle.task).from_bytes(data)

    async def read_length_prefixed_string(self) -> bytes:
        "Read a bytestring which is prefixed with a 64-bit native-byte-order size."
        elem_size = await self.read_cffi('size_t')
        try:
            elem = await self.read_length(elem_size)
        except EOFError as e:
            e.args = ("got EOF while expecting to read environment element of length", elem_size)
            raise
        return elem

    async def read_length_prefixed_array(self, length: int) -> t.List[bytes]:
        "Read an array, prefixed with its size, of bytestrings, each prefixed with their size."
        ret: t.List[bytes] = []
        for _ in range(length):
            ret.append(await self.read_length_prefixed_string())
        return ret

    async def read_envp(self, length: int) -> t.Dict[str, str]:
        """Read a size-prefixed array of size-prefixed bytestrings, with each bytestring containing '=', into a dict.

        This is the format we expect for envp, which is written to us on startup by
        non-child threads.

        We assume that the environment is correctly formed; that is, each element contains
        '='.  If that's not true, we'll throw an exception.  But it would be quite unusual
        to have an incorrectly formed environment variable, so it's not too concerning.

        """
        raw = await self.read_length_prefixed_array(length)
        environ: t.Dict[str, str] = {}
        for elem in raw:
            key, val = elem.split(b"=", 1)
            environ[os.fsdecode(key)] = os.fsdecode(val)
        return environ

    async def read_until_delimiter(self, delim: bytes) -> t.Optional[bytes]:
        "Read and return all bytes until the specified delimiter, stripping the delimiter; on EOF, return None."
        while True:
            try:
                i = self.buf.index(delim)
            except ValueError:
                pass
            else:
                section = self.buf[:i]
                # skip the delimiter
                self.buf = self.buf[i+len(delim):]
                return section
            # buf contains no copies of delim; read more data, returning None on EOF
            try:
                data = await self._read()
            except EOFError:
                return None
            self.buf += data

    async def read_line(self) -> bytes:
        "Read and return a line, stripping the newline character."
        ret = await self.read_until_delimiter(b"\n")
        if ret is None:
            raise EOFError("hangup before reading full line")
        return ret

    async def read_netstring(self) -> bytes:
        "Read a netstring, as defined by DJB."
        length_bytes = await self.read_until_delimiter(b':')
        if length_bytes is None:
            raise EOFError("hangup before reaching colon at end of netstring size")
        length = int(length_bytes)
        try:
            data = await self.read_length(length)
        except EOFError as e:
            e.args = ("hangup before we read netstring data of size", length)
            raise
        try:
            comma = await self.read_length(1)
        except EOFError as e:
            e.args = ("hangup before comma at end of netstring",)
            raise
        if comma != b",":
            raise Exception("bad netstring delimiter", comma)
        return data
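The rebuffering idea can be shown without any of the rsyscall machinery. A minimal synchronous sketch, feeding chunks from a list where AsyncReadBuffer reads from an fd:

```python
class ReadBuffer:
    """Minimal synchronous analogue: rebuffer a chunked stream for parsing."""
    def __init__(self, chunks):
        self.chunks = iter(chunks)  # stands in for reads from an fd
        self.buf = b""

    def read_until_delimiter(self, delim: bytes):
        # Accumulate chunks until the delimiter appears; None on EOF.
        while True:
            i = self.buf.find(delim)
            if i != -1:
                section, self.buf = self.buf[:i], self.buf[i+len(delim):]
                return section
            chunk = next(self.chunks, None)
            if chunk is None:
                return None
            self.buf += chunk

buf = ReadBuffer([b"he", b"llo\nwor", b"ld\n"])
first = buf.read_until_delimiter(b"\n")
second = buf.read_until_delimiter(b"\n")
```

Record boundaries fall wherever the kernel happened to split the stream, so each helper parses out of `self.buf` and leaves the unconsumed tail for the next call.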

Methods

async def read_length(self, length: int) ‑> bytes

Read exactly this many bytes; raises on EOF.

async def read_cffi(self, name: str) ‑> Any

Read, parse, and return this fixed-size cffi type.

async def read_struct(self, cls: t.Type[T_fixed_size]) ‑> ~T_fixed_size

Read one fixed-size struct from the stream; raises on EOF.

async def read_length_prefixed_string(self) ‑> bytes

Read a bytestring which is prefixed with a 64-bit native-byte-order size.

async def read_length_prefixed_array(self, length: int) ‑> List[bytes]

Read an array, prefixed with its size, of bytestrings, each prefixed with their size.

async def read_envp(self, length: int) ‑> Dict[str, str]

Read a size-prefixed array of size-prefixed bytestrings, with each bytestring containing '=', into a dict.

This is the format we expect for envp, which is written to us on startup by non-child threads.

We assume that the environment is correctly formed; that is, each element contains '='. If that's not true, we'll throw an exception. But it would be quite unusual to have an incorrectly formed environment variable, so it's not too concerning.

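The wire format read_envp consumes — a run of bytestrings, each prefixed with a native size_t length, each containing '=' — can be sketched with the stdlib struct module. The encoder here is a hypothetical helper for illustration, not part of rsyscall:

```python
import struct

def encode_env(env: dict) -> bytes:
    # Each "KEY=VAL" bytestring is prefixed with a native size_t length ("@N").
    out = b""
    for key, val in env.items():
        elem = key.encode() + b"=" + val.encode()
        out += struct.pack("@N", len(elem)) + elem
    return out

def decode_env(data: bytes, count: int) -> dict:
    # Mirror of read_envp: read `count` length-prefixed elements, split on '='.
    env, off, n = {}, 0, struct.calcsize("@N")
    for _ in range(count):
        (size,) = struct.unpack_from("@N", data, off)
        elem = data[off + n : off + n + size]
        off += n + size
        key, val = elem.split(b"=", 1)
        env[key.decode()] = val.decode()
    return env

wire = encode_env({"PATH": "/bin", "HOME": "/root"})
env = decode_env(wire, 2)
```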
async def read_until_delimiter(self, delim: bytes) ‑> Optional[bytes]

Read and return all bytes until the specified delimiter, stripping the delimiter; on EOF, return None.

async def read_line(self) ‑> bytes

Read and return a line, stripping the newline character.

async def read_netstring(self) ‑> bytes

Read a netstring, as defined by DJB.

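The netstring framing itself ("&lt;length&gt;:&lt;data&gt;,", as defined by DJB) is easy to show standalone. A synchronous sketch over an in-memory bytestring (not the rsyscall API):

```python
def encode_netstring(data: bytes) -> bytes:
    # DJB netstring: ASCII decimal length, colon, payload, trailing comma.
    return str(len(data)).encode() + b":" + data + b","

def read_netstring(stream: bytes):
    # Parse one netstring from the front; return (payload, rest).
    length_bytes, rest = stream.split(b":", 1)
    length = int(length_bytes)
    payload, comma, rest = rest[:length], rest[length:length+1], rest[length+1:]
    if comma != b",":
        raise ValueError("bad netstring delimiter", comma)
    return payload, rest

payload, rest = read_netstring(encode_netstring(b"hello") + b"extra")
```

The length prefix makes the format self-delimiting, which is why read_netstring above can pull the size with read_until_delimiter and then read exactly that many bytes.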