Module rsyscall.sys.epoll

#include <sys/epoll.h>

Expand source code Browse git
"`#include <sys/epoll.h>`"
from __future__ import annotations
from rsyscall._raw import lib, ffi # type: ignore
from rsyscall.struct import Struct, Serializable
import enum
import typing as t
from dataclasses import dataclass

class EpollFlag(enum.IntFlag):
    """Flag bits passed to the `epoll_create1` syscall by `_epoll_create`."""
    # No flags set.
    NONE = 0
    # Value comes from the C constant EPOLL_CLOEXEC exposed through the FFI `lib`.
    CLOEXEC = lib.EPOLL_CLOEXEC

class EPOLL_CTL(enum.IntEnum):
    """Operation codes passed as the `op` argument of the `epoll_ctl` syscall."""
    # Each value comes from the same-named C constant exposed through the FFI `lib`.
    ADD = lib.EPOLL_CTL_ADD
    MOD = lib.EPOLL_CTL_MOD
    DEL = lib.EPOLL_CTL_DEL

class EPOLL(enum.IntFlag):
    """Event bits stored in the `events` field of an `EpollEvent`.

    Every non-zero value comes from the same-named `EPOLL*` C constant
    exposed through the FFI `lib`.
    """
    NONE = 0
    IN = lib.EPOLLIN
    OUT = lib.EPOLLOUT
    RDHUP = lib.EPOLLRDHUP # type: ignore
    PRI = lib.EPOLLPRI
    ERR = lib.EPOLLERR
    HUP = lib.EPOLLHUP
    # options
    ET = lib.EPOLLET

    def __iter__(self) -> t.Iterator[EPOLL]:
        """Yield each member of `EPOLL` that shares at least one bit with `self`."""
        # NONE is zero, so `self & NONE` is always falsy and it is never yielded.
        yield from (member for member in EPOLL if self & member)

@dataclass
class EpollEvent(Struct):
    """Python-side view of a C `struct epoll_event`.

    `data` is the integer stored in the struct's data union (read back through
    its `u64` member); `events` is the `EPOLL` bitmask.
    """
    data: int
    events: EPOLL

    def to_bytes(self) -> bytes:
        """Return the raw bytes of a `struct epoll_event` built from this object."""
        # cffi initializer: (events, (first member of the data union,))
        initializer = (self.events, (self.data,))
        c_event = ffi.new('struct epoll_event const*', initializer)
        return bytes(ffi.buffer(c_event))

    T = t.TypeVar('T', bound='EpollEvent')
    @classmethod
    def from_bytes(cls: t.Type[T], data: bytes) -> T:
        """Build an instance from the `struct epoll_event` at the start of `data`.

        No length check is performed here; `data` is expected to hold at
        least `sizeof()` bytes.
        """
        raw = ffi.from_buffer(data)
        c_event = ffi.cast('struct epoll_event*', raw)
        return cls(c_event.data.u64, EPOLL(c_event.events))

    @classmethod
    def sizeof(cls) -> int:
        """Return the size in bytes of `struct epoll_event` as reported by cffi."""
        size = ffi.sizeof('struct epoll_event')
        return size

class EpollEventList(t.List[EpollEvent], Serializable):
    """A list of `EpollEvent`s serialized as back-to-back `struct epoll_event` records."""
    def to_bytes(self) -> bytes:
        """Return the serialized entries concatenated in list order."""
        # join builds the result in one pass instead of re-copying on every `+=`.
        return b"".join(ent.to_bytes() for ent in self)

    T = t.TypeVar('T', bound='EpollEventList')
    @classmethod
    def from_bytes(cls: t.Type[T], data: bytes) -> T:
        """Parse every complete `struct epoll_event` record contained in `data`.

        Records are read at consecutive `EpollEvent.sizeof()`-byte offsets.
        Trailing bytes that do not make up a whole record are ignored: passing
        them to `EpollEvent.from_bytes` would make it read past the end of the
        buffer.
        """
        record_size = EpollEvent.sizeof()
        # Only offsets that start a complete record are visited.
        usable = len(data) - (len(data) % record_size)
        return cls([
            EpollEvent.from_bytes(data[offset:offset + record_size])
            for offset in range(0, usable, record_size)
        ])

#### Classes ####
from rsyscall.handle.fd import BaseFileDescriptor, FileDescriptorTask
from rsyscall.handle.pointer import Pointer, WrittenPointer, ReadablePointer

T_fd = t.TypeVar('T_fd', bound='EpollFileDescriptor')
class EpollFileDescriptor(BaseFileDescriptor):
    """File descriptor handle that adds the `epoll_wait` and `epoll_ctl` operations."""
    async def epoll_wait(self, events: Pointer[EpollEventList],
                         timeout: int) -> t.Tuple[ReadablePointer[EpollEventList], Pointer]:
        """Call `epoll_wait` on this fd, using `events` as the output buffer.

        `maxevents` is the number of whole `EpollEvent` records that fit in
        `events.size()`. `timeout` is handed to the syscall unchanged.

        Returns the result of `events.readable_split(n)`, where `n` is the
        byte length of the records the syscall reported.
        """
        self._validate()
        event_size = EpollEvent.sizeof()
        with events.borrow(self.task):
            maxevents = events.size() // event_size
            num = await _epoll_wait(self.task.sysif, self.near, events.near, maxevents, timeout)
            return events.readable_split(num * event_size)

    async def epoll_ctl(self, op: EPOLL_CTL, fd: BaseFileDescriptor,
                        event: t.Optional[WrittenPointer[EpollEvent]]=None) -> None:
        """Call `epoll_ctl` on this fd for the target `fd` with operation `op`.

        When `event` is given, it is checked against this task's address space
        and its near address is passed to the syscall; when it is `None`,
        `None` is passed on to `_epoll_ctl`.
        """
        self._validate()
        with fd.borrow(self.task):
            # Decide with `is None` only, so a pointer object that happens to be
            # falsy is still passed on rather than silently replaced by None.
            if event is None:
                event_addr = None
            else:
                event.check_address_space(self.task)
                event_addr = event.near
            await _epoll_ctl(self.task.sysif, self.near, op, fd.near, event_addr)

class EpollTask(FileDescriptorTask[T_fd]):
    """Task class that can create epoll file descriptors."""
    async def epoll_create(self, flags: EpollFlag=EpollFlag.NONE) -> T_fd:
        """Create an epoll fd and wrap it with `make_fd_handle`.

        `EpollFlag.CLOEXEC` is always OR-ed into `flags` before the syscall.
        """
        effective_flags = flags|EpollFlag.CLOEXEC
        near_fd = await _epoll_create(self.sysif, effective_flags)
        return self.make_fd_handle(near_fd)

#### Raw syscalls ####
import rsyscall.near.types as near
from rsyscall.near.sysif import SyscallInterface
from rsyscall.sys.syscall import SYS

async def _epoll_create(sysif: SyscallInterface, flags: EpollFlag) -> near.FileDescriptor:
    """Issue `SYS.epoll_create1` with `flags`; wrap the result in `near.FileDescriptor`."""
    raw_fd = await sysif.syscall(SYS.epoll_create1, flags)
    return near.FileDescriptor(raw_fd)

async def _epoll_ctl(sysif: SyscallInterface, epfd: near.FileDescriptor, op: EPOLL_CTL,
                     fd: near.FileDescriptor, event: t.Optional[near.Address]=None) -> None:
    """Issue `SYS.epoll_ctl` on `epfd` for `fd` with operation `op`.

    A missing `event` is sent to the syscall as the integer 0.
    """
    event_arg = 0 if event is None else event
    await sysif.syscall(SYS.epoll_ctl, epfd, op, fd, event_arg)

async def _epoll_wait(sysif: SyscallInterface, epfd: near.FileDescriptor,
                      events: near.Address, maxevents: int, timeout: int) -> int:
    """Issue `SYS.epoll_wait` on `epfd` and return the syscall's result unchanged."""
    num_ready = await sysif.syscall(SYS.epoll_wait, epfd, events, maxevents, timeout)
    return num_ready


#### Tests ####
from unittest import TestCase
class TestEpoll(TestCase):
    # Serialization round-trip checks for the epoll structures.
    def test_epoll_event_list(self) -> None:
        # A one-element list must survive to_bytes followed by from_bytes unchanged.
        event = EpollEvent(42, EPOLL.IN|EPOLL.PRI)
        initial = EpollEventList([event])
        serialized = initial.to_bytes()
        output = EpollEventList.from_bytes(serialized)
        self.assertEqual(initial, output)

Classes

class EpollFlag (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class EpollFlag(enum.IntFlag):
    """Flag bits passed to the `epoll_create1` syscall by `_epoll_create`."""
    # No flags set.
    NONE = 0
    # Value comes from the C constant EPOLL_CLOEXEC exposed through the FFI `lib`.
    CLOEXEC = lib.EPOLL_CLOEXEC

Ancestors

  • enum.IntFlag
  • builtins.int
  • enum.Flag
  • enum.Enum

Class variables

var NONE
var CLOEXEC
class EPOLL_CTL (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class EPOLL_CTL(enum.IntEnum):
    """Operation codes passed as the `op` argument of the `epoll_ctl` syscall."""
    # Each value comes from the same-named C constant exposed through the FFI `lib`.
    ADD = lib.EPOLL_CTL_ADD
    MOD = lib.EPOLL_CTL_MOD
    DEL = lib.EPOLL_CTL_DEL

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var ADD
var MOD
var DEL
class EPOLL (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class EPOLL(enum.IntFlag):
    """Event bits stored in the `events` field of an `EpollEvent`.

    Every non-zero value comes from the same-named `EPOLL*` C constant
    exposed through the FFI `lib`.
    """
    NONE = 0
    IN = lib.EPOLLIN
    OUT = lib.EPOLLOUT
    RDHUP = lib.EPOLLRDHUP # type: ignore
    PRI = lib.EPOLLPRI
    ERR = lib.EPOLLERR
    HUP = lib.EPOLLHUP
    # options
    ET = lib.EPOLLET

    def __iter__(self) -> t.Iterator[EPOLL]:
        """Yield each member of `EPOLL` that shares at least one bit with `self`."""
        # NONE is zero, so `self & NONE` is always falsy and it is never yielded.
        yield from (member for member in EPOLL if self & member)

Ancestors

  • enum.IntFlag
  • builtins.int
  • enum.Flag
  • enum.Enum

Class variables

var NONE
var IN
var OUT
var RDHUP
var PRI
var ERR
var HUP
var ET
class EpollEvent (data: int, events: EPOLL)

EpollEvent(data: 'int', events: 'EPOLL')

Expand source code Browse git
@dataclass
class EpollEvent(Struct):
    """Python-side view of a C `struct epoll_event`.

    `data` is the integer stored in the struct's data union (read back through
    its `u64` member); `events` is the `EPOLL` bitmask.
    """
    data: int
    events: EPOLL

    def to_bytes(self) -> bytes:
        """Return the raw bytes of a `struct epoll_event` built from this object."""
        # cffi initializer: (events, (first member of the data union,))
        initializer = (self.events, (self.data,))
        c_event = ffi.new('struct epoll_event const*', initializer)
        return bytes(ffi.buffer(c_event))

    T = t.TypeVar('T', bound='EpollEvent')
    @classmethod
    def from_bytes(cls: t.Type[T], data: bytes) -> T:
        """Build an instance from the `struct epoll_event` at the start of `data`.

        No length check is performed here; `data` is expected to hold at
        least `sizeof()` bytes.
        """
        raw = ffi.from_buffer(data)
        c_event = ffi.cast('struct epoll_event*', raw)
        return cls(c_event.data.u64, EPOLL(c_event.events))

    @classmethod
    def sizeof(cls) -> int:
        """Return the size in bytes of `struct epoll_event` as reported by cffi."""
        size = ffi.sizeof('struct epoll_event')
        return size

Ancestors

Class variables

var data : int
var events : EPOLL
var T

Inherited members

class EpollEventList (*args, **kwargs)

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

Expand source code Browse git
class EpollEventList(t.List[EpollEvent], Serializable):
    """A list of `EpollEvent`s serialized as back-to-back `struct epoll_event` records."""
    def to_bytes(self) -> bytes:
        """Return the serialized entries concatenated in list order."""
        # join builds the result in one pass instead of re-copying on every `+=`.
        return b"".join(ent.to_bytes() for ent in self)

    T = t.TypeVar('T', bound='EpollEventList')
    @classmethod
    def from_bytes(cls: t.Type[T], data: bytes) -> T:
        """Parse every complete `struct epoll_event` record contained in `data`.

        Records are read at consecutive `EpollEvent.sizeof()`-byte offsets.
        Trailing bytes that do not make up a whole record are ignored: passing
        them to `EpollEvent.from_bytes` would make it read past the end of the
        buffer.
        """
        record_size = EpollEvent.sizeof()
        # Only offsets that start a complete record are visited.
        usable = len(data) - (len(data) % record_size)
        return cls([
            EpollEvent.from_bytes(data[offset:offset + record_size])
            for offset in range(0, usable, record_size)
        ])

Ancestors

Class variables

var T

Inherited members

class EpollFileDescriptor (task: FileDescriptorTask, near: FileDescriptor, valid: bool)

A file descriptor accessed through some Task

This is an rsyscall-internal base class, which other FileDescriptor objects inherit from for core lifecycle methods. See FileDescriptor for more information.

Expand source code Browse git
class EpollFileDescriptor(BaseFileDescriptor):
    """File descriptor handle that adds the `epoll_wait` and `epoll_ctl` operations."""
    async def epoll_wait(self, events: Pointer[EpollEventList],
                         timeout: int) -> t.Tuple[ReadablePointer[EpollEventList], Pointer]:
        """Call `epoll_wait` on this fd, using `events` as the output buffer.

        `maxevents` is the number of whole `EpollEvent` records that fit in
        `events.size()`. `timeout` is handed to the syscall unchanged.

        Returns the result of `events.readable_split(n)`, where `n` is the
        byte length of the records the syscall reported.
        """
        self._validate()
        event_size = EpollEvent.sizeof()
        with events.borrow(self.task):
            maxevents = events.size() // event_size
            num = await _epoll_wait(self.task.sysif, self.near, events.near, maxevents, timeout)
            return events.readable_split(num * event_size)

    async def epoll_ctl(self, op: EPOLL_CTL, fd: BaseFileDescriptor,
                        event: t.Optional[WrittenPointer[EpollEvent]]=None) -> None:
        """Call `epoll_ctl` on this fd for the target `fd` with operation `op`.

        When `event` is given, it is checked against this task's address space
        and its near address is passed to the syscall; when it is `None`,
        `None` is passed on to `_epoll_ctl`.
        """
        self._validate()
        with fd.borrow(self.task):
            # Decide with `is None` only, so a pointer object that happens to be
            # falsy is still passed on rather than silently replaced by None.
            if event is None:
                event_addr = None
            else:
                event.check_address_space(self.task)
                event_addr = event.near
            await _epoll_ctl(self.task.sysif, self.near, op, fd.near, event_addr)

Ancestors

Subclasses

Methods

async def epoll_wait(self, events: Pointer[EpollEventList], timeout: int) ‑> Tuple[ReadablePointer[EpollEventList], Pointer]
Expand source code Browse git
async def epoll_wait(self, events: Pointer[EpollEventList],
                     timeout: int) -> t.Tuple[ReadablePointer[EpollEventList], Pointer]:
    """Call `epoll_wait` on this fd, using `events` as the output buffer.

    The buffer capacity is the number of whole `EpollEvent` records that fit
    in `events.size()`. `timeout` is handed to the syscall unchanged.

    Returns the result of `events.readable_split(n)`, where `n` is the byte
    length of the records the syscall reported.
    """
    self._validate()
    event_size = EpollEvent.sizeof()
    with events.borrow(self.task):
        capacity = events.size() // event_size
        ready = await _epoll_wait(self.task.sysif, self.near, events.near, capacity, timeout)
        return events.readable_split(ready * event_size)
async def epoll_ctl(self, op: EPOLL_CTL, fd: BaseFileDescriptor, event: t.Optional[WrittenPointer[EpollEvent]] = None) ‑> NoneType
Expand source code Browse git
async def epoll_ctl(self, op: EPOLL_CTL, fd: BaseFileDescriptor,
                    event: t.Optional[WrittenPointer[EpollEvent]]=None) -> None:
    """Call `epoll_ctl` on this fd for the target `fd` with operation `op`.

    When `event` is given, it is checked against this task's address space
    and its near address is passed to the syscall; when it is `None`,
    `None` is passed on to `_epoll_ctl`.
    """
    self._validate()
    with fd.borrow(self.task):
        # Decide with `is None` only, so a pointer object that happens to be
        # falsy is still passed on rather than silently replaced by None.
        if event is None:
            event_addr = None
        else:
            event.check_address_space(self.task)
            event_addr = event.near
        await _epoll_ctl(self.task.sysif, self.near, op, fd.near, event_addr)

Inherited members

class EpollTask (sysif: SyscallInterface, near_process: Process, fd_table: FDTable, address_space: AddressSpace, pidns: PidNamespace)

A wrapper around SyscallInterface which tracks the namespaces of the underlying process

Note that this is a base class for the more fully featured Task.

We store namespace objects to represent the namespaces that we believe the underlying process is in. Since we have complete control over the process, we can make sure this belief is accurate by updating our stored namespaces when the process changes namespace. That isn't done here; it's done in handle.Task.

Currently, we store only one PidNamespace. But each process actually has two pid namespaces:

  • the process's own pid namespace, which determines the pids returned from getpid, clone, and other syscalls.
  • the pid namespace that new children will be in.

The two pid namespaces only differ if we call unshare(CLONE.NEWPID). Currently we don't do that because unshare(CLONE.NEWPID) makes monitoring children more complex, since they can be deleted without leaving a zombie at any time if the pid namespace shuts down. But if we did call unshare(CLONE.NEWPID), we'd need to handle this right.

In the analogy to near and far pointers, this is like a segment register, if a segment register was write-only. Then we'd need to maintain the knowledge of what the segment register was set to, outside the segment register itself. That's what we do here.

There actually were systems where segment registers were, if not quite write-only, at least expensive to set and expensive to read. For example, x86_64 - the FS and GS segment registers can only be set via syscall. If you wanted to use segmentation on such systems, you'd probably have a structure much like this one.

Expand source code Browse git
class EpollTask(FileDescriptorTask[T_fd]):
    """Task class that can create epoll file descriptors."""
    async def epoll_create(self, flags: EpollFlag=EpollFlag.NONE) -> T_fd:
        """Create an epoll fd and wrap it with `make_fd_handle`.

        `EpollFlag.CLOEXEC` is always OR-ed into `flags` before the syscall.
        """
        effective_flags = flags|EpollFlag.CLOEXEC
        near_fd = await _epoll_create(self.sysif, effective_flags)
        return self.make_fd_handle(near_fd)

Ancestors

Subclasses

Class variables

var sysif : SyscallInterface
var near_process : Process
var fd_table : FDTable
var address_space : AddressSpace
var pidns : PidNamespace

Methods

async def epoll_create(self, flags: EpollFlag = EpollFlag.NONE) ‑> ~T_fd
Expand source code Browse git
async def epoll_create(self, flags: EpollFlag=EpollFlag.NONE) -> T_fd:
    """Create an epoll fd and wrap it with `make_fd_handle`.

    `EpollFlag.CLOEXEC` is always OR-ed into `flags` before the syscall.
    """
    effective_flags = flags|EpollFlag.CLOEXEC
    near_fd = await _epoll_create(self.sysif, effective_flags)
    return self.make_fd_handle(near_fd)

Inherited members

class TestEpoll (methodName='runTest')

A class whose instances are single test cases.

By default, the test code itself should be placed in a method named 'runTest'.

If the fixture may be used for many test cases, create as many test methods as are needed. When instantiating such a TestCase subclass, specify in the constructor arguments the name of the test method that the instance is to execute.

Test authors should subclass TestCase for their own tests. Construction and deconstruction of the test's environment ('fixture') can be implemented by overriding the 'setUp' and 'tearDown' methods respectively.

If it is necessary to override the init method, the base class init method must always be called. It is important that subclasses should not change the signature of their init method, since instances of the classes are instantiated automatically by parts of the framework in order to be run.

When subclassing TestCase, you can set these attributes:

  • failureException: determines which exception will be raised when the instance's assertion methods fail; test methods raising this exception will be deemed to have 'failed' rather than 'errored'.
  • longMessage: determines whether long messages (including repr of objects used in assert methods) will be printed on failure in addition to any explicit message passed.
  • maxDiff: sets the maximum length of a diff in failure messages by assert methods using difflib. It is looked up as an instance attribute so can be configured by individual tests if required.

Create an instance of the class that will use the named test method when executed. Raises a ValueError if the instance does not have a method with the specified name.

Expand source code Browse git
class TestEpoll(TestCase):
    # Serialization round-trip checks for the epoll structures.
    def test_epoll_event_list(self) -> None:
        # A one-element list must survive to_bytes followed by from_bytes unchanged.
        event = EpollEvent(42, EPOLL.IN|EPOLL.PRI)
        initial = EpollEventList([event])
        serialized = initial.to_bytes()
        output = EpollEventList.from_bytes(serialized)
        self.assertEqual(initial, output)

Ancestors

  • unittest.case.TestCase

Methods

def test_epoll_event_list(self) ‑> NoneType
Expand source code Browse git
def test_epoll_event_list(self) -> None:
    # A one-element list must survive to_bytes followed by from_bytes unchanged.
    event = EpollEvent(42, EPOLL.IN|EPOLL.PRI)
    initial = EpollEventList([event])
    serialized = initial.to_bytes()
    output = EpollEventList.from_bytes(serialized)
    self.assertEqual(initial, output)