Module rsyscall.sys.epoll
#include <sys/epoll.h>
Expand source code Browse git
"`#include <sys/epoll.h>`"
from __future__ import annotations
from rsyscall._raw import lib, ffi # type: ignore
from rsyscall.struct import Struct, Serializable
import enum
import typing as t
from dataclasses import dataclass
class EpollFlag(enum.IntFlag):
NONE = 0
CLOEXEC = lib.EPOLL_CLOEXEC
class EPOLL_CTL(enum.IntEnum):
ADD = lib.EPOLL_CTL_ADD
MOD = lib.EPOLL_CTL_MOD
DEL = lib.EPOLL_CTL_DEL
class EPOLL(enum.IntFlag):
NONE = 0
IN = lib.EPOLLIN
OUT = lib.EPOLLOUT
RDHUP = lib.EPOLLRDHUP # type: ignore
PRI = lib.EPOLLPRI
ERR = lib.EPOLLERR
HUP = lib.EPOLLHUP
# options
ET = lib.EPOLLET
def __iter__(self) -> t.Iterator[EPOLL]:
for flag in EPOLL:
if self & flag:
yield flag
@dataclass
class EpollEvent(Struct):
data: int
events: EPOLL
def to_bytes(self) -> bytes:
return bytes(ffi.buffer(ffi.new('struct epoll_event const*', (self.events, (self.data,)))))
T = t.TypeVar('T', bound='EpollEvent')
@classmethod
def from_bytes(cls: t.Type[T], data: bytes) -> T:
struct = ffi.cast('struct epoll_event*', ffi.from_buffer(data))
return cls(struct.data.u64, EPOLL(struct.events))
@classmethod
def sizeof(cls) -> int:
return ffi.sizeof('struct epoll_event')
class EpollEventList(t.List[EpollEvent], Serializable):
def to_bytes(self) -> bytes:
ret = b""
for ent in self:
ret += ent.to_bytes()
return ret
T = t.TypeVar('T', bound='EpollEventList')
@classmethod
def from_bytes(cls: t.Type[T], data: bytes) -> T:
entries = []
while len(data) > 0:
ent = EpollEvent.from_bytes(data)
entries.append(ent)
data = data[EpollEvent.sizeof():]
return cls(entries)
#### Classes ####
from rsyscall.handle.fd import BaseFileDescriptor, FileDescriptorTask
from rsyscall.handle.pointer import Pointer, WrittenPointer, ReadablePointer
T_fd = t.TypeVar('T_fd', bound='EpollFileDescriptor')
class EpollFileDescriptor(BaseFileDescriptor):
async def epoll_wait(self, events: Pointer[EpollEventList],
timeout: int) -> t.Tuple[ReadablePointer[EpollEventList], Pointer]:
self._validate()
with events.borrow(self.task):
maxevents = events.size()//EpollEvent.sizeof()
num = await _epoll_wait(self.task.sysif, self.near, events.near, maxevents, timeout)
valid_size = num * EpollEvent.sizeof()
return events.readable_split(valid_size)
async def epoll_ctl(self, op: EPOLL_CTL, fd: BaseFileDescriptor,
event: t.Optional[WrittenPointer[EpollEvent]]=None) -> None:
self._validate()
with fd.borrow(self.task):
if event is not None:
event.check_address_space(self.task)
return (await _epoll_ctl(self.task.sysif, self.near, op, fd.near, event.near if event else None))
class EpollTask(FileDescriptorTask[T_fd]):
async def epoll_create(self, flags: EpollFlag=EpollFlag.NONE) -> T_fd:
return self.make_fd_handle(await _epoll_create(self.sysif, flags|EpollFlag.CLOEXEC))
#### Raw syscalls ####
import rsyscall.near.types as near
from rsyscall.near.sysif import SyscallInterface
from rsyscall.sys.syscall import SYS
async def _epoll_create(sysif: SyscallInterface, flags: EpollFlag) -> near.FileDescriptor:
return near.FileDescriptor(await sysif.syscall(SYS.epoll_create1, flags))
async def _epoll_ctl(sysif: SyscallInterface, epfd: near.FileDescriptor, op: EPOLL_CTL,
fd: near.FileDescriptor, event: t.Optional[near.Address]=None) -> None:
if event is None:
event = 0 # type: ignore
await sysif.syscall(SYS.epoll_ctl, epfd, op, fd, event)
async def _epoll_wait(sysif: SyscallInterface, epfd: near.FileDescriptor,
events: near.Address, maxevents: int, timeout: int) -> int:
return (await sysif.syscall(SYS.epoll_wait, epfd, events, maxevents, timeout))
#### Tests ####
from unittest import TestCase
class TestEpoll(TestCase):
def test_epoll_event_list(self) -> None:
initial = EpollEventList([EpollEvent(42, EPOLL.IN|EPOLL.PRI)])
output = EpollEventList.from_bytes(initial.to_bytes())
self.assertEqual(initial, output)
Classes
class EpollFlag (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class EpollFlag(enum.IntFlag): NONE = 0 CLOEXEC = lib.EPOLL_CLOEXEC
Ancestors
- enum.IntFlag
- builtins.int
- enum.Flag
- enum.Enum
Class variables
var NONE
var CLOEXEC
class EPOLL_CTL (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class EPOLL_CTL(enum.IntEnum): ADD = lib.EPOLL_CTL_ADD MOD = lib.EPOLL_CTL_MOD DEL = lib.EPOLL_CTL_DEL
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var ADD
var MOD
var DEL
class EPOLL (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class EPOLL(enum.IntFlag): NONE = 0 IN = lib.EPOLLIN OUT = lib.EPOLLOUT RDHUP = lib.EPOLLRDHUP # type: ignore PRI = lib.EPOLLPRI ERR = lib.EPOLLERR HUP = lib.EPOLLHUP # options ET = lib.EPOLLET def __iter__(self) -> t.Iterator[EPOLL]: for flag in EPOLL: if self & flag: yield flag
Ancestors
- enum.IntFlag
- builtins.int
- enum.Flag
- enum.Enum
Class variables
var NONE
var IN
var OUT
var RDHUP
var PRI
var ERR
var HUP
var ET
class EpollEvent (data: int, events: EPOLL)
-
EpollEvent(data: 'int', events: 'EPOLL')
Expand source code Browse git
@dataclass class EpollEvent(Struct): data: int events: EPOLL def to_bytes(self) -> bytes: return bytes(ffi.buffer(ffi.new('struct epoll_event const*', (self.events, (self.data,))))) T = t.TypeVar('T', bound='EpollEvent') @classmethod def from_bytes(cls: t.Type[T], data: bytes) -> T: struct = ffi.cast('struct epoll_event*', ffi.from_buffer(data)) return cls(struct.data.u64, EPOLL(struct.events)) @classmethod def sizeof(cls) -> int: return ffi.sizeof('struct epoll_event')
Ancestors
Class variables
var data : int
var events : EPOLL
var T
Inherited members
class EpollEventList (*args, **kwargs)
-
Built-in mutable sequence.
If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.
Expand source code Browse git
class EpollEventList(t.List[EpollEvent], Serializable): def to_bytes(self) -> bytes: ret = b"" for ent in self: ret += ent.to_bytes() return ret T = t.TypeVar('T', bound='EpollEventList') @classmethod def from_bytes(cls: t.Type[T], data: bytes) -> T: entries = [] while len(data) > 0: ent = EpollEvent.from_bytes(data) entries.append(ent) data = data[EpollEvent.sizeof():] return cls(entries)
Ancestors
- builtins.list
- typing.Generic
- Serializable
- FixedSerializer
- HasSerializer
Class variables
var T
Inherited members
class EpollFileDescriptor (task: FileDescriptorTask, near: FileDescriptor, valid: bool)
-
A file descriptor accessed through some
Task
This is an rsyscall-internal base class, which other
FileDescriptor
objects inherit from for core lifecycle methods. SeeFileDescriptor
for more information.Expand source code Browse git
class EpollFileDescriptor(BaseFileDescriptor): async def epoll_wait(self, events: Pointer[EpollEventList], timeout: int) -> t.Tuple[ReadablePointer[EpollEventList], Pointer]: self._validate() with events.borrow(self.task): maxevents = events.size()//EpollEvent.sizeof() num = await _epoll_wait(self.task.sysif, self.near, events.near, maxevents, timeout) valid_size = num * EpollEvent.sizeof() return events.readable_split(valid_size) async def epoll_ctl(self, op: EPOLL_CTL, fd: BaseFileDescriptor, event: t.Optional[WrittenPointer[EpollEvent]]=None) -> None: self._validate() with fd.borrow(self.task): if event is not None: event.check_address_space(self.task) return (await _epoll_ctl(self.task.sysif, self.near, op, fd.near, event.near if event else None))
Ancestors
Subclasses
Methods
async def epoll_wait(self, events: Pointer[EpollEventList], timeout: int) ‑> Tuple[ReadablePointer[EpollEventList], Pointer]
-
Expand source code Browse git
async def epoll_wait(self, events: Pointer[EpollEventList], timeout: int) -> t.Tuple[ReadablePointer[EpollEventList], Pointer]: self._validate() with events.borrow(self.task): maxevents = events.size()//EpollEvent.sizeof() num = await _epoll_wait(self.task.sysif, self.near, events.near, maxevents, timeout) valid_size = num * EpollEvent.sizeof() return events.readable_split(valid_size)
async def epoll_ctl(self, op: EPOLL_CTL, fd: BaseFileDescriptor, event: t.Optional[WrittenPointer[EpollEvent]] = None) ‑> NoneType
-
Expand source code Browse git
async def epoll_ctl(self, op: EPOLL_CTL, fd: BaseFileDescriptor, event: t.Optional[WrittenPointer[EpollEvent]]=None) -> None: self._validate() with fd.borrow(self.task): if event is not None: event.check_address_space(self.task) return (await _epoll_ctl(self.task.sysif, self.near, op, fd.near, event.near if event else None))
Inherited members
class EpollTask (sysif: SyscallInterface, near_process: Process, fd_table: FDTable, address_space: AddressSpace, pidns: PidNamespace)
-
A wrapper around
SyscallInterface
which tracks the namespaces of the underlying processNote that this is a base class for the more fully featured
Task
.We store namespace objects to represent the namespaces that we believe that underlying processes is in. Since we have complete control over the process, we can make sure this belief is accurate, by updating our stored namespaces when the process changes namespace. That isn't done here; it's done in handle.Task.
Currently, we store only one
PidNamespace
. But each process actually has two pid namespaces:- the process's own pid namespace, which determines the pids returned from getpid, clone, and other syscalls.
- the pid namespace that new children will be in.
The two pid namespaces only differ if we call unshare(CLONE.NEWPID). Currently we don't do that because unshare(CLONE.NEWPID) makes monitoring children more complex, since they can be deleted without leaving a zombie at any time if the pid namespace shuts down. But if we did call unshare(CLONE.NEWPID), we'd need to handle this right.
In the analogy to near and far pointers, this is like a segment register, if a segment register was write-only. Then we'd need to maintain the knowledge of what the segment register was set to, outside the segment register itself. That's what we do here.
There actually were systems where segment registers were, if not quite write-only, at least expensive to set and expensive to read. For example, x86_64 - the FS and GS segment registers can only be set via syscall. If you wanted to use segmentation on such systems, you'd probably have a structure much like this one.
Expand source code Browse git
class EpollTask(FileDescriptorTask[T_fd]): async def epoll_create(self, flags: EpollFlag=EpollFlag.NONE) -> T_fd: return self.make_fd_handle(await _epoll_create(self.sysif, flags|EpollFlag.CLOEXEC))
Ancestors
- FileDescriptorTask
- Task
- typing.Generic
Subclasses
Class variables
var sysif : SyscallInterface
var near_process : Process
var fd_table : FDTable
var address_space : AddressSpace
var pidns : PidNamespace
Methods
async def epoll_create(self, flags: EpollFlag = EpollFlag.NONE) ‑> ~T_fd
-
Expand source code Browse git
async def epoll_create(self, flags: EpollFlag=EpollFlag.NONE) -> T_fd: return self.make_fd_handle(await _epoll_create(self.sysif, flags|EpollFlag.CLOEXEC))
Inherited members
class TestEpoll (methodName='runTest')
-
A class whose instances are single test cases.
By default, the test code itself should be placed in a method named 'runTest'.
If the fixture may be used for many test cases, create as many test methods as are needed. When instantiating such a TestCase subclass, specify in the constructor arguments the name of the test method that the instance is to execute.
Test authors should subclass TestCase for their own tests. Construction and deconstruction of the test's environment ('fixture') can be implemented by overriding the 'setUp' and 'tearDown' methods respectively.
If it is necessary to override the init method, the base class init method must always be called. It is important that subclasses should not change the signature of their init method, since instances of the classes are instantiated automatically by parts of the framework in order to be run.
When subclassing TestCase, you can set these attributes: * failureException: determines which exception will be raised when the instance's assertion methods fail; test methods raising this exception will be deemed to have 'failed' rather than 'errored'. * longMessage: determines whether long messages (including repr of objects used in assert methods) will be printed on failure in addition to any explicit message passed. * maxDiff: sets the maximum length of a diff in failure messages by assert methods using difflib. It is looked up as an instance attribute so can be configured by individual tests if required.
Create an instance of the class that will use the named test method when executed. Raises a ValueError if the instance does not have a method with the specified name.
Expand source code Browse git
class TestEpoll(TestCase): def test_epoll_event_list(self) -> None: initial = EpollEventList([EpollEvent(42, EPOLL.IN|EPOLL.PRI)]) output = EpollEventList.from_bytes(initial.to_bytes()) self.assertEqual(initial, output)
Ancestors
- unittest.case.TestCase
Methods
def test_epoll_event_list(self) ‑> NoneType
-
Expand source code Browse git
def test_epoll_event_list(self) -> None: initial = EpollEventList([EpollEvent(42, EPOLL.IN|EPOLL.PRI)]) output = EpollEventList.from_bytes(initial.to_bytes()) self.assertEqual(initial, output)