Package rsyscall
Process-independent interface to Linux system calls
rsyscall provides an interface to an ever-growing subset of Linux system calls. This interface is:
- *process-independent*: all system calls are called as methods on process objects, which can refer to the “local” process or to other processes under our control.
- *type-safe*: many Linux API constraints, which are usually left to user code to enforce, are made explicit in the type system.
- *low-level*: any action which is possible with the underlying Linux APIs is possible with rsyscall; nothing is forbidden or discouraged.
Thread
The main entry point in rsyscall is the `Thread`. One `Thread` exists for each process in which we can make system calls.
Like most other things called "threads", rsyscall `Thread`s:
- are under the complete control of a single program
- exit when that program does
- can run arbitrary system calls in parallel across multiple CPUs
Unlike other "threads":
- user code, such as arbitrary Python or C, cannot run on rsyscall threads, only system calls.
The initial `Thread` is `local_thread`, which operates in the Python interpreter process. New processes are usually created by `Thread.clone()`, which returns a `ChildThread`.
A `Thread` has a number of other conventional, helpful resources, which are almost always present: for example, `Thread.stdin`, `Thread.stdout`, and `Thread.stderr`.
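For example, here is a minimal sketch of driving `local_thread` (assuming rsyscall is installed on Linux; the `read`/`write` field names on the `Pipe` struct are an assumption, not confirmed on this page):

import trio
from rsyscall import local_thread

async def main() -> None:
    # pipe(2), called as a method on the local process's Thread
    pipe = await local_thread.pipe()
    # write into one end, read from the other
    await pipe.write.write(await local_thread.ptr(b"hello"))
    data, _ = await pipe.read.read(await local_thread.malloc(bytes, 16))
    print(await data.read())  # b'hello'
    await pipe.read.close()
    await pipe.write.close()

trio.run(main)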
System calls on Task, FileDescriptor, etc.
`Thread.task` points to the lower-level `Task` object. All system calls exist either as methods on `Task`, or as methods on objects (such as `FileDescriptor`) returned from `Task`, which hold a reference to `Task`.
Read `rsyscall.doc.syscall_api` for an overview of the layout of the system call API.
`Task` makes syscalls using an internal instance of `SyscallInterface`. The main implementations are `LocalSyscall` and `SyscallConnection`.
The distinction between `Thread` and `Task` is that `Task` provides only the bare minimum functionality guaranteed by Linux; for example, it is not guaranteed that stdin/stdout/stderr actually exist, and `Task` does not assume they do.
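As an illustrative sketch of this layout (assuming the `O` flags live in `rsyscall.fcntl`, per the sub-module list below, and that `Path` is importable from the package top level), calling open(2) as a method on `Task` means allocating the path in memory first:

from rsyscall import local_thread, Path
from rsyscall.fcntl import O

async def open_hostname() -> None:
    task = local_thread.task
    # Task methods take Pointers, not plain Python values
    path_ptr = await local_thread.ptr(Path("/etc/hostname"))
    fd = await task.open(path_ptr, O.RDONLY)
    await fd.close()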
Memory allocation and access
Because a process may be in a separate address space, memory allocation, and access by reading and writing, are explicit. This is primarily done through the methods `Thread.ptr()` and `Thread.malloc()`, which return `Pointer`s. `Pointer`s are garbage collected, so memory freeing is automatic.
`Thread` performs memory allocation and access using internal instances of `AllocatorInterface` and `MemoryTransport`. The main allocator is `UnlimitedAllocator`, and the main memory transports are `LocalMemoryTransport` and `SocketMemoryTransport`.
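A short sketch of the two entry points:

from rsyscall import local_thread

async def demo_memory() -> None:
    # ptr: allocate a buffer and write data into it, returning a WrittenPointer
    greeting = await local_thread.ptr(b"hello")
    # malloc: allocate an uninitialized buffer of an explicit size
    buf = await local_thread.malloc(bytes, 4096)
    # no explicit free: Pointers are garbage collected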
Non-blocking operations
We can make system calls in multiple processes in parallel. But in any specific process, only one system call can happen at a time. If a system call blocks, the process running it can't run other system calls.
This is undesirable, so `Thread` comes with an epoll event loop, `Epoller`, so that we can wait for an epoll event and perform non-blocking system calls.
Use `Thread.make_afd()` to register a `FileDescriptor` with epoll and get a corresponding `AsyncFileDescriptor`. `AsyncFileDescriptor` supports `AsyncFileDescriptor.read()`, `AsyncFileDescriptor.write()`, `AsyncFileDescriptor.accept()`, `AsyncFileDescriptor.connect()`, and other such system calls. The interfaces are identical to `FileDescriptor`; the async implementations simply wait for the appropriate epoll event before calling the underlying system call.
Many system calls can't meaningfully be done without blocking the thread, so use `AsyncFileDescriptor.handle` to access the underlying `FileDescriptor` to make those system calls.
For convenience, `AsyncFileDescriptor` also has `AsyncFileDescriptor.read_some_bytes()` and `AsyncFileDescriptor.write_all_bytes()`, which abstract over memory for simple use cases where efficient buffer management is not necessary.
Likewise, the `AsyncChildProcess` object wraps `ChildProcess` to perform non-blocking `AsyncChildProcess.waitpid()` operations on child processes. One will usually never deal with `ChildProcess`, because `ChildThread.exec()` returns an `AsyncChildProcess`, and that is the primary way to obtain child processes.
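A sketch of the non-blocking path, using a pipe to avoid assumptions about socket APIs (per `Thread.make_afd()` below, `set_nonblock=True` asks rsyscall to put the fd into non-blocking mode for us; the `Pipe` field names are assumed as in the earlier sketch):

from rsyscall import local_thread

async def demo_async() -> None:
    pipe = await local_thread.pipe()
    # register with the thread's Epoller and get an AsyncFileDescriptor
    async_read = await local_thread.make_afd(pipe.read, set_nonblock=True)
    await pipe.write.write(await local_thread.ptr(b"ping"))
    # waits for an epoll event, then performs read(2) without blocking the process
    data = await async_read.read_some_bytes()
    # syscalls that must block go through the underlying handle
    await async_read.handle.close()
    await pipe.write.close()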
Child processes
As mentioned above, new processes are typically created by `Thread.clone()`, which returns a `ChildThread`.
As a convenience, the `Command` object bundles up an executable path, arguments, and environment variable updates. `ChildThread.exec()` takes a `Command` and execs into it.
The `Environment.which()` method looks up an executable name in `PATH` and returns a `Command` if found; you can use this with `Thread.environ`.
We'll often want to inherit file descriptors into child processes; we can use `Task.inherit_fd()` to get a handle for an inherited file descriptor, and then `FileDescriptor.disable_cloexec()` to allow it to be further inherited over `ChildThread.exec()`.
`rsyscall.tasks` describes several other ways to get new `Thread`s, including privilege-escalated `Thread`s, `Thread`s on remote hosts, and persistent `Thread`s.
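Putting these pieces together, a sketch of spawning a command with an inherited file descriptor (again assuming the `Pipe` field names used above):

import trio
from rsyscall import local_thread

async def main() -> None:
    # look up an executable on PATH, getting a Command
    echo = await local_thread.environ.which("echo")
    pipe = await local_thread.pipe()
    # create the child process
    child = await local_thread.clone()
    # hand our fd to the child and let it survive the upcoming exec
    fd_in_child = child.inherit_fd(pipe.write)
    await fd_in_child.disable_cloexec()
    # exec into the command; returns an AsyncChildProcess
    process = await child.exec(echo.args("hello", "world"))
    await process.check()  # wait for exit, raising if unclean

trio.run(main)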
from rsyscall.thread import Thread, ChildThread
from rsyscall.command import Command
from rsyscall.path import Path
from rsyscall.handle import (
FileDescriptor, Task,
WrittenPointer, Pointer,
ReadablePointer, LinearPointer,
)
from rsyscall.epoller import AsyncFileDescriptor
from rsyscall.monitor import AsyncChildProcess
from rsyscall.struct import Int32, Int64
from rsyscall.tasks.local import local_thread
from rsyscall.sys.mman import MemoryMapping
__all__ = [
'Thread', 'ChildThread',
'Command',
'Task',
'FileDescriptor',
'AsyncFileDescriptor',
'AsyncChildProcess',
'local_thread',
'Pointer', 'WrittenPointer', 'ReadablePointer', 'LinearPointer',
]
Sub-modules
rsyscall.command
-
Provides the Command class, which is a convenient representation of the arguments to execve.
rsyscall.doc
-
Various prose documentation strings
rsyscall.environ
-
Functions and classes relating to Unix environment variables …
rsyscall.epoller
-
Non-blocking IO operations implemented using epoll …
rsyscall.far
-
Definitions of namespaces and identifiers tagged with a namespace …
rsyscall.fcntl
-
#include <fcntl.h>
rsyscall.handle
-
Classes which own resources and provide the main syscall interfaces …
rsyscall.inotify_watch
-
Filesystem-watching implemented using inotify …
rsyscall.limits
-
#include <limits.h>
rsyscall.linux
-
#include <linux/...> …
rsyscall.loader
-
Access to native code functions …
rsyscall.memory
-
Classes and functions which provide the capability to interact with memory
rsyscall.monitor
-
Monitoring child processes in a non-blocking manner …
rsyscall.near
-
Definitions of namespace-local identifiers, syscalls, and SyscallInterface …
rsyscall.net
-
#include <net/...> …
rsyscall.netinet
-
#include <netinet/...> …
rsyscall.network
-
Deals with communication between threads, even if that happens on a single host
rsyscall.nix
-
Functions and classes for working with filesystem paths deployed with Nix …
rsyscall.path
-
A slightly improved version of pathlib.PurePosixPath
rsyscall.sched
-
#include <sched.h>
rsyscall.scripts
-
Various miscellaneous command-line scripts built with rsyscall
rsyscall.signal
-
#include <signal.h>
rsyscall.stdlib
-
#include <stdlib.h> …
rsyscall.struct
-
Interfaces and basic functionality for our serialization framework.
rsyscall.sys
-
#include <sys/...> …
rsyscall.tasks
-
Various thread implementations with various different special abilities …
rsyscall.tests
-
Tests for rsyscall.
rsyscall.thread
-
Central thread class, with helper methods to ease a number of common tasks
rsyscall.time
-
#include <time.h>
rsyscall.unistd
-
#include <unistd.h>
rsyscall.wish
-
Several REPL-based `WishGranter`s for the rsyscall library …
Classes
class Thread (task: Task, ram: RAM, connection: Connection, loader: NativeLoader, epoller: Epoller, child_monitor: ChildProcessMonitor, environ: Environment, stdin: FileDescriptor, stdout: FileDescriptor, stderr: FileDescriptor)
-
A central class holding everything necessary to work with some thread, along with various helpers
class Thread:
    "A central class holding everything necessary to work with some thread, along with various helpers"
    def __init__(self,
                 task: Task,
                 ram: RAM,
                 connection: Connection,
                 loader: NativeLoader,
                 epoller: Epoller,
                 child_monitor: ChildProcessMonitor,
                 environ: Environment,
                 stdin: FileDescriptor,
                 stdout: FileDescriptor,
                 stderr: FileDescriptor,
    ) -> None:
        self.task = task
        "The `Task` associated with this process"
        self.ram = ram
        self.epoller = epoller
        self.connection = connection
        "This thread's `rsyscall.network.connection.Connection`"
        self.loader = loader
        self.monitor = child_monitor
        self.environ = environ
        "This thread's `rsyscall.environ.Environment`"
        self.stdin = stdin
        "The standard input `FileDescriptor` (FD 0)"
        self.stdout = stdout
        "The standard output `FileDescriptor` (FD 1)"
        self.stderr = stderr
        "The standard error `FileDescriptor` (FD 2)"

    def _init_from(self, thr: Thread) -> None:
        self.task = thr.task
        self.ram = thr.ram
        self.epoller = thr.epoller
        self.connection = thr.connection
        self.loader = thr.loader
        self.monitor = thr.monitor
        self.environ = thr.environ
        self.stdin = thr.stdin
        self.stdout = thr.stdout
        self.stderr = thr.stderr

    @t.overload
    async def malloc(self, cls: t.Type[T_fixed_size]) -> Pointer[T_fixed_size]:
        "malloc a fixed size type"
        pass
    @t.overload
    async def malloc(self, cls: t.Type[T_fixed_serializer], size: int) -> Pointer[T_fixed_serializer]:
        "malloc specifying a specific size"
        pass
    @t.overload
    async def malloc(self, cls: t.Type[T_pathlike], size: int) -> Pointer[T_pathlike]: ...
    @t.overload
    async def malloc(self, cls: t.Type[str], size: int) -> Pointer[str]: ...
    @t.overload
    async def malloc(self, cls: t.Type[bytes], size: int) -> Pointer[bytes]: ...
    async def malloc(self, cls: t.Type[t.Union[FixedSize, FixedSerializer, os.PathLike, str, bytes]],
                     size: int=None) -> Pointer:
        """Allocate a buffer for this type, with the specified size if required

        If `malloc` is given a `rsyscall.struct.FixedSize` type, the size must not be passed;
        if `malloc` is given any other type, the size must be passed.

        Any type which inherits from `rsyscall.struct.FixedSerializer` is supported.
        As a special case for convenience, `bytes`, `str`, and `os.PathLike` are also supported;
        `bytes` will be written out as they are, and `str` and `os.PathLike` will be null-terminated.
        """
        return await self.ram.malloc(cls, size) # type: ignore

    @t.overload
    async def ptr(self, data: T_has_serializer) -> WrittenPointer[T_has_serializer]: ...
    @t.overload
    async def ptr(self, data: T_pathlike) -> WrittenPointer[T_pathlike]: ...
    @t.overload
    async def ptr(self, data: str) -> WrittenPointer[str]: ...
    @t.overload
    async def ptr(self, data: bytes) -> WrittenPointer[bytes]: ...
    async def ptr(self, data: t.Union[HasSerializer, os.PathLike, str, bytes]) -> WrittenPointer:
        """Allocate a buffer for this data, and write the data to that buffer

        Any value which inherits from `HasSerializer` is supported.
        As a special case for convenience, `bytes`, `str`, and `os.PathLike` are also supported;
        `bytes` will be written out as they are, and `str` and `os.PathLike` will be null-terminated.
        """
        return await self.ram.ptr(data)

    async def make_afd(self, fd: FileDescriptor, set_nonblock: bool=False) -> AsyncFileDescriptor:
        """Make an AsyncFileDescriptor; make it nonblocking if `set_nonblock` is True.

        Make sure that `fd` is already in non-blocking mode,
        such as by accepting it with the `SOCK.NONBLOCK` flag;
        if it's not, you can pass set_nonblock=True to make it nonblocking.
        """
        if set_nonblock:
            await fd.fcntl(F.SETFL, O.NONBLOCK)
        return await AsyncFileDescriptor.make(self.epoller, self.ram, fd)

    async def open_async_channels(self, count: int) -> t.List[t.Tuple[AsyncFileDescriptor, FileDescriptor]]:
        "Calls self.connection.open_async_channels; see `Connection.open_async_channels`"
        return (await self.connection.open_async_channels(count))

    async def open_channels(self, count: int) -> t.List[t.Tuple[FileDescriptor, FileDescriptor]]:
        "Calls self.connection.open_channels; see `Connection.open_channels`"
        return (await self.connection.open_channels(count))

    @t.overload
    async def spit(self, path: FileDescriptor, text: t.Union[str, bytes]) -> None: pass
    @t.overload
    async def spit(self, path: Path, text: t.Union[str, bytes], mode=0o644) -> Path: pass
    async def spit(self, path: t.Union[Path, FileDescriptor], text: t.Union[str, bytes], mode=0o644) -> t.Optional[Path]:
        """Open a file, creating and truncating it, and write the passed text to it

        Probably shouldn't use this on FIFOs or anything.

        Returns the passed-in Path so this serves as a nice pseudo-constructor.
        """
        if isinstance(path, Path):
            out: t.Optional[Path] = path
            fd = await self.task.open(await self.ram.ptr(path), O.WRONLY|O.TRUNC|O.CREAT, mode=mode)
        else:
            out = None
            fd = path
        to_write: Pointer = await self.ram.ptr(os.fsencode(text))
        while to_write.size() > 0:
            _, to_write = await fd.write(to_write)
        await fd.close()
        return out

    async def bind_getsockname(self, sock: FileDescriptor, addr: T_sockaddr) -> T_sockaddr:
        """Call bind and then getsockname on `sock`.

        bind followed by getsockname is a common pattern when allocating unused
        source ports with SockaddrIn(0, ...). Unfortunately, memory allocation for
        getsockname is quite verbose, so it would be nice to have a helper to make
        that pattern easier. Since we don't want to encourage usage of getsockname
        (it should be rarely used outside of that pattern), we add a helper for
        that specific pattern, rather than getsockname on its own.
        """
        written_addr_ptr = await self.ram.ptr(addr)
        await sock.bind(written_addr_ptr)
        sockbuf_ptr = await sock.getsockname(await self.ram.ptr(Sockbuf(written_addr_ptr)))
        addr_ptr = (await sockbuf_ptr.read()).buf
        return await addr_ptr.read()

    async def mkdir(self, path: Path, mode=0o755) -> Path:
        "Make a directory at this path"
        await self.task.mkdir(await self.ram.ptr(path))
        return path

    async def read_to_eof(self, fd: FileDescriptor) -> bytes:
        "Read this file descriptor until we get EOF, then return all the bytes read"
        data = b""
        while True:
            read, rest = await fd.read(await self.ram.malloc(bytes, 4096))
            if read.size() == 0:
                return data
            # TODO this would be more efficient if we batched our memory-reads at the end
            data += await read.read()

    async def mount(self, source: t.Union[str, os.PathLike], target: t.Union[str, os.PathLike],
                    filesystemtype: str, mountflags: MS, data: str) -> None:
        "Call mount with these args"
        async def op(sem: RAM) -> t.Tuple[
                WrittenPointer[t.Union[str, os.PathLike]], WrittenPointer[t.Union[str, os.PathLike]],
                WrittenPointer[str], WrittenPointer[str]]:
            return (
                await sem.ptr(source),
                await sem.ptr(target),
                await sem.ptr(filesystemtype),
                await sem.ptr(data),
            )
        source_ptr, target_ptr, filesystemtype_ptr, data_ptr = await self.ram.perform_batch(op)
        await self.task.mount(source_ptr, target_ptr, filesystemtype_ptr, mountflags, data_ptr)

    async def socket(self, domain: AF, type: SOCK, protocol: int=0) -> FileDescriptor:
        return await self.task.socket(domain, type, protocol)

    async def pipe(self) -> Pipe:
        return await (await self.task.pipe(await self.malloc(Pipe))).read()

    async def socketpair(self, domain: AF, type: SOCK, protocol: int=0) -> Socketpair:
        return await (await self.task.socketpair(domain, type, protocol, await self.malloc(Socketpair))).read()

    async def chroot(self, path: t.Union[str, os.PathLike]) -> None:
        await self.task.chroot(await self.ptr(path))

    def inherit_fd(self, fd: FileDescriptor) -> FileDescriptor:
        return self.task.inherit_fd(fd)

    async def clone(self, flags: CLONE=CLONE.NONE,
                    automatically_write_user_mappings: bool=True) -> ChildThread:
        """Create a new child thread

        manpage: clone(2)
        """
        child_process, task = await clone_child_task(
            self.task, self.ram, self.connection, self.loader, self.monitor,
            flags, lambda sock: Trampoline(self.loader.server_func, [sock, sock]))
        ram = RAM(task,
                  # We don't inherit the transport because it leads to a deadlock:
                  # If when a child task calls transport.read, it performs a syscall in the child task,
                  # then the parent task will need to call waitid to monitor the child task during the syscall,
                  # which will in turn need to also call transport.read.
                  # But the child is already using the transport and holding the lock,
                  # so the parent will block forever on taking the lock,
                  # and child's read syscall will never complete.
                  self.ram.transport,
                  self.ram.allocator.inherit(task),
        )
        if flags & CLONE.NEWPID:
            # if the new process is pid 1, then CLONE_PARENT isn't allowed so we can't use inherit_to_child.
            # if we are a reaper, than we don't want our child CLONE_PARENTing to us, so we can't use inherit_to_child.
            # in both cases we just fall back to making a new ChildProcessMonitor for the child.
            epoller = await Epoller.make_root(ram, task)
            # this signal is already blocked, we inherited the block, um... I guess...
            # TODO handle this more formally
            signal_block = SignalBlock(task, await ram.ptr(Sigset({SIG.CHLD})))
            monitor = await ChildProcessMonitor.make(ram, task, epoller, signal_block=signal_block)
        else:
            epoller = self.epoller.inherit(ram)
            monitor = self.monitor.inherit_to_child(task)
        thread = ChildThread(Thread(
            task, ram,
            self.connection.inherit(task, ram),
            self.loader,
            epoller,
            monitor,
            self.environ.inherit(task, ram),
            stdin=self.stdin.inherit(task),
            stdout=self.stdout.inherit(task),
            stderr=self.stderr.inherit(task),
        ), child_process)
        if flags & CLONE.NEWUSER and automatically_write_user_mappings:
            # hack, we should really track the [ug]id ahead of this so we don't have to get it
            # we have to get the [ug]id from the parent because it will fail in the child
            uid = await self.task.getuid()
            gid = await self.task.getgid()
            await write_user_mappings(thread, uid, gid)
        return thread

    async def run(self, command: Command, check=True,
                  *, task_status=trio.TASK_STATUS_IGNORED) -> ChildState:
        """Run the passed command to completion and return its end state, throwing if unclean

        If check is False, we won't throw if the end state is unclean.
        """
        thread = await self.clone()
        child = await thread.exec(command)
        task_status.started(child)
        if check:
            return await child.check()
        else:
            return await child.waitpid(W.EXITED)

    async def unshare(self, flags: CLONE) -> None:
        "Call the unshare syscall, appropriately updating values on this class"
        # Note: unsharing NEWPID causes us to not get zombies for our children if init dies. That
        # means we'll get ECHILDs, and various races can happen. It's not possible to robustly
        # unshare NEWPID.
        if flags & CLONE.FILES:
            await self.unshare_files()
            flags ^= CLONE.FILES
        if flags & CLONE.NEWUSER:
            await self.unshare_user()
            flags ^= CLONE.NEWUSER
            if flags & CLONE.FS:
                flags ^= CLONE.FS
        await self.task.unshare(flags)

    async def unshare_files(self, going_to_exec=True) -> None:
        """Unshare the file descriptor table.

        Set going_to_exec to False if you are going to keep this task around long-term, and we'll
        do a manual cloexec in userspace to clear out fds held by any other non-rsyscall libraries,
        which are automatically copied by Linux into the new fd space.

        We default going_to_exec to True because there's little reason to call unshare_files other
        than to then exec; and even if you do want to call unshare_files without execing, there
        probably aren't any significant other libraries in the FD space; and even if there are such
        libraries, it usually doesn't matter to keep stray references around to their FDs.

        TODO maybe this should return an object that lets us unset CLOEXEC on things?
        """
        await self.task.unshare_files()
        if not going_to_exec:
            await do_cloexec_except(self, set([fd.near for fd in self.task.fd_handles]))

    async def unshare_user(self, in_namespace_uid: int=None, in_namespace_gid: int=None) -> None:
        """Unshare the user namespace.

        We automatically set up the user namespace in the unprivileged way using a single user
        mapping line. You can pass `in_namespace_uid` and `in_namespace_gid` to control what user
        id and group id we'll observe inside the namespace. If you want further control, call
        task.unshare(CLONE.NEWUSER) directly.

        We also automatically do unshare(CLONE.FS); that's required by CLONE.NEWUSER.
        """
        uid = await self.task.getuid()
        gid = await self.task.getgid()
        await self.task.unshare(CLONE.FS|CLONE.NEWUSER)
        await write_user_mappings(self, uid, gid,
                                  in_namespace_uid=in_namespace_uid, in_namespace_gid=in_namespace_gid)

    async def exit(self, status: int=0) -> None:
        """Exit this thread

        Currently we just forward through to exit the task.

        I feel suspicious that this method will at some point require more heavy lifting with
        namespaces and monitored children, so I'm leaving it on Thread to prepare for that
        eventuality.

        manpage: exit(2)
        """
        await self.task.exit(status)

    def __repr__(self) -> str:
        name = type(self).__name__
        return f'{name}({self.task})'
Subclasses
- ChildThread
Instance variables
var task
-
The `Task` associated with this process
var connection
-
This thread's `Connection`
var environ
-
This thread's `Environment`
var stdin
-
The standard input `FileDescriptor` (FD 0)
var stdout
-
The standard output `FileDescriptor` (FD 1)
var stderr
-
The standard error `FileDescriptor` (FD 2)
Methods
async def malloc(self, cls: t.Type[t.Union[FixedSize, FixedSerializer, os.PathLike, str, bytes]], size: int = None) ‑> Pointer
-
Allocate a buffer for this type, with the specified size if required
If `malloc` is given a `FixedSize` type, the size must not be passed; if `malloc` is given any other type, the size must be passed.
Any type which inherits from `FixedSerializer` is supported. As a special case for convenience, `bytes`, `str`, and `os.PathLike` are also supported; `bytes` will be written out as they are, and `str` and `os.PathLike` will be null-terminated.
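For example, a short sketch of the two calling conventions (assuming a `Thread` named `thr`, and that `Int32`, which the package imports from `rsyscall.struct`, is a fixed-size type):

from rsyscall.struct import Int32

async def demo_malloc(thr) -> None:
    buf = await thr.malloc(bytes, 4096)  # not fixed-size: size is required
    num = await thr.malloc(Int32)        # fixed-size: size must be omitted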
async def ptr(self, data: t.Union[HasSerializer, os.PathLike, str, bytes]) ‑> WrittenPointer
-
Allocate a buffer for this data, and write the data to that buffer
Any value which inherits from `HasSerializer` is supported. As a special case for convenience, `bytes`, `str`, and `os.PathLike` are also supported; `bytes` will be written out as they are, and `str` and `os.PathLike` will be null-terminated.
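A corresponding sketch for `ptr` (again with `thr` a `Thread`):

async def demo_ptr(thr) -> None:
    raw = await thr.ptr(b"\x00\x01")     # bytes are written as-is
    name = await thr.ptr("/etc/passwd")  # str is null-terminated in memory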
async def make_afd(self, fd: FileDescriptor, set_nonblock: bool = False) ‑> AsyncFileDescriptor
-
Make an AsyncFileDescriptor; make it nonblocking if `set_nonblock` is True.
Make sure that `fd` is already in non-blocking mode, such as by accepting it with the `SOCK.NONBLOCK` flag; if it's not, you can pass set_nonblock=True to make it nonblocking.
async def open_async_channels(self, count: int) ‑> List[Tuple[AsyncFileDescriptor, FileDescriptor]]
-
Calls self.connection.open_async_channels; see `Connection.open_async_channels`
async def open_channels(self, count: int) ‑> List[Tuple[FileDescriptor, FileDescriptor]]
-
Calls self.connection.open_channels; see `Connection.open_channels`
async def spit(self, path: t.Union[Path, FileDescriptor], text: t.Union[str, bytes], mode=0o644) ‑> Optional[Path]
-
Open a file, creating and truncating it, and write the passed text to it
Probably shouldn't use this on FIFOs or anything.
Returns the passed-in Path so this serves as a nice pseudo-constructor.
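For instance (a sketch, with `thr` a `Thread`):

from rsyscall import Path

async def demo_spit(thr) -> None:
    # creates/truncates the file, writes the text, and returns the passed-in Path
    conf_path = await thr.spit(Path("/tmp/example.conf"), "key=value\n")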
async def bind_getsockname(self, sock: FileDescriptor, addr: T_sockaddr) ‑> ~T_sockaddr
-
Call bind and then getsockname on `sock`.
bind followed by getsockname is a common pattern when allocating unused source ports with SockaddrIn(0, …). Unfortunately, memory allocation for getsockname is quite verbose, so it would be nice to have a helper to make that pattern easier. Since we don't want to encourage usage of getsockname (it should be rarely used outside of that pattern), we add a helper for that specific pattern, rather than getsockname on its own.
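A sketch of that pattern; the `AF`/`SOCK` module path is an assumption, and `SockaddrIn`'s home module isn't named on this page, so it is taken as a parameter here rather than imported:

from rsyscall.sys.socket import AF, SOCK  # assumed module path

async def demo_bind_getsockname(thr, SockaddrIn) -> None:
    sock = await thr.socket(AF.INET, SOCK.STREAM)
    # port 0 asks the kernel for an unused port; getsockname reads back
    # the concrete address the kernel chose
    addr = await thr.bind_getsockname(sock, SockaddrIn(0, "127.0.0.1"))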
async def mkdir(self, path: Path, mode=0o755) ‑> Path
-
Make a directory at this path
async def read_to_eof(self, fd: FileDescriptor) ‑> bytes
-
Read this file descriptor until we get EOF, then return all the bytes read
async def mount(self, source: t.Union[str, os.PathLike], target: t.Union[str, os.PathLike], filesystemtype: str, mountflags: MS, data: str) ‑> NoneType
-
Call mount with these args
async def socket(self, domain: AF, type: SOCK, protocol: int = 0) ‑> FileDescriptor
-
async def pipe(self) ‑> Pipe
-
async def socketpair(self, domain: AF, type: SOCK, protocol: int = 0) ‑> Socketpair
-
async def chroot(self, path: t.Union[str, os.PathLike]) ‑> NoneType
-
def inherit_fd(self, fd: FileDescriptor) ‑> FileDescriptor
-
async def clone(self, flags: CLONE = CLONE.NONE, automatically_write_user_mappings: bool = True) ‑> ChildThread
-
Create a new child thread
manpage: clone(2)
async def run(self, command: Command, check=True, *, task_status=TASK_STATUS_IGNORED) ‑> ChildState
-
Run the passed command to completion and return its end state, throwing if unclean
If check is False, we won't throw if the end state is unclean.
async def unshare(self, flags: CLONE) ‑> NoneType
-
Call the unshare syscall, appropriately updating values on this class
async def unshare_files(self, going_to_exec=True) ‑> NoneType
-
Unshare the file descriptor table.
Set going_to_exec to False if you are going to keep this task around long-term, and we'll do a manual cloexec in userspace to clear out fds held by any other non-rsyscall libraries, which are automatically copied by Linux into the new fd space.
We default going_to_exec to True because there's little reason to call unshare_files other than to then exec; and even if you do want to call unshare_files without execing, there probably aren't any significant other libraries in the FD space; and even if there are such libraries, it usually doesn't matter to keep stray references around to their FDs.
TODO maybe this should return an object that lets us unset CLOEXEC on things?
async def unshare_user(self, in_namespace_uid: int = None, in_namespace_gid: int = None) ‑> NoneType
-
Unshare the user namespace.
We automatically set up the user namespace in the unprivileged way using a single user mapping line. You can pass `in_namespace_uid` and `in_namespace_gid` to control what user id and group id we'll observe inside the namespace. If you want further control, call task.unshare(CLONE.NEWUSER) directly.
We also automatically do unshare(CLONE.FS); that's required by CLONE.NEWUSER.
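For example, a sketch of mapping ourselves to root inside a fresh user namespace (usually done on a freshly cloned `Thread`, so the parent is unaffected):

async def demo_unshare_user(thr) -> None:
    # the single mapping line maps our outside uid/gid to the requested
    # inside ids, so getuid() in this thread now reports 0
    await thr.unshare_user(in_namespace_uid=0, in_namespace_gid=0)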
async def exit(self, status: int = 0) ‑> NoneType
-
Exit this thread
Currently we just forward through to exit the task.
I feel suspicious that this method will at some point require more heavy lifting with namespaces and monitored children, so I'm leaving it on Thread to prepare for that eventuality.
manpage: exit(2)
class ChildThread (thr: Thread, process: AsyncChildProcess)
-
A thread that we know is also a direct child process of another thread
class ChildThread(Thread):
    "A thread that we know is also a direct child process of another thread"
    def __init__(self, thr: Thread, process: AsyncChildProcess) -> None:
        super()._init_from(thr)
        self.process = process

    async def _execve(self, path: t.Union[str, os.PathLike],
                      argv: t.List[str], envp: t.List[str],
                      command: Command=None,
    ) -> AsyncChildProcess:
        "Call execve, abstracting over memory; self.{exec,execve} are probably preferable"
        async def op(sem: RAM) -> t.Tuple[WrittenPointer[t.Union[str, os.PathLike]],
                                          WrittenPointer[ArgList],
                                          WrittenPointer[ArgList]]:
            argv_ptrs = ArgList([await sem.ptr(arg) for arg in argv])
            envp_ptrs = ArgList([await sem.ptr(arg) for arg in envp])
            return (await sem.ptr(path),
                    await sem.ptr(argv_ptrs),
                    await sem.ptr(envp_ptrs))
        filename, argv_ptr, envp_ptr = await self.ram.perform_batch(op)
        await self.task.execve(filename, argv_ptr, envp_ptr, command=command)
        return self.process

    async def execv(self, path: t.Union[str, os.PathLike],
                    argv: t.Sequence[t.Union[str, os.PathLike]],
                    command: Command=None,
    ) -> AsyncChildProcess:
        "Replace the running executable in this thread with another; see execve."
        async def op(sem: RAM) -> t.Tuple[WrittenPointer[t.Union[str, os.PathLike]], WrittenPointer[ArgList]]:
            argv_ptrs = ArgList([await sem.ptr(arg) for arg in argv])
            return (await sem.ptr(path), await sem.ptr(argv_ptrs))
        filename_ptr, argv_ptr = await self.ram.perform_batch(op)
        envp_ptr = await self.environ.as_arglist(self.ram)
        await self.task.execve(filename_ptr, argv_ptr, envp_ptr, command=command)
        return self.process

    async def execve(self, path: t.Union[str, os.PathLike],
                     argv: t.Sequence[t.Union[str, os.PathLike]],
                     env_updates: t.Mapping[str, t.Union[str, os.PathLike]]={},
                     inherited_signal_blocks: t.List[SignalBlock]=[],
                     command: Command=None,
    ) -> AsyncChildProcess:
        """Replace the running executable in this thread with another.

        self.exec is probably preferable; it takes a nice Command object which is easier to work with.

        We take inherited_signal_blocks as an argument so that we can default it to "inheriting" an
        empty signal mask. Most programs expect the signal mask to be cleared on startup. Since
        we're using signalfd as our signal handling method, we need to block signals with the
        signal mask; and if those blocked signals were inherited across exec, other programs would
        break (SIGCHLD is the most obvious example).

        We could depend on the user clearing the signal mask before calling exec, similar to how we
        require the user to remove CLOEXEC from inherited fds; but that is a fairly novel
        requirement to most, so for simplicity we just default to clearing the signal mask before
        exec, and allow the user to explicitly pass down additional signal blocks.
        """
        sigmask: t.Set[SIG] = set()
        for block in inherited_signal_blocks:
            sigmask = sigmask.union(block.mask)
        await self.task.sigprocmask((HowSIG.SETMASK, await self.ram.ptr(Sigset(sigmask))))
        if not env_updates:
            # use execv if we aren't updating the env, as an optimization.
            return await self.execv(path, argv, command=command)
        envp: t.Dict[str, str] = {**self.environ.data}
        for key, value in env_updates.items():
            envp[key] = os.fsdecode(value)
        raw_envp: t.List[str] = ['='.join([key, value]) for key, value in envp.items()]
        logger.debug("execveat(%s, %s, %s)", path, argv, env_updates)
        return await self._execve(path, [os.fsdecode(arg) for arg in argv], raw_envp, command=command)

    async def exec(self, command: Command,
                   inherited_signal_blocks: t.List[SignalBlock]=[],
    ) -> AsyncChildProcess:
        """Replace the running executable in this thread with what's specified in `command`

        See self.execve's docstring for an explanation of inherited_signal_blocks.

        manpage: execve(2)
        """
        return (await self.execve(command.executable_path, command.arguments,
                                  command.env_updates,
                                  inherited_signal_blocks=inherited_signal_blocks,
                                  command=command))
Ancestors
- Thread
Methods
async def execv(self, path: t.Union[str, os.PathLike], argv: t.Sequence[t.Union[str, os.PathLike]], command: Command = None) ‑> AsyncChildProcess
-
Replace the running executable in this thread with another; see execve.
async def execve(self, path: t.Union[str, os.PathLike], argv: t.Sequence[t.Union[str, os.PathLike]], env_updates: t.Mapping[str, t.Union[str, os.PathLike]] = {}, inherited_signal_blocks: t.List[SignalBlock] = [], command: Command = None) ‑> AsyncChildProcess
-
Replace the running executable in this thread with another.
self.exec is probably preferable; it takes a nice Command object which is easier to work with.
We take inherited_signal_blocks as an argument so that we can default it to "inheriting" an empty signal mask. Most programs expect the signal mask to be cleared on startup. Since we're using signalfd as our signal handling method, we need to block signals with the signal mask; and if those blocked signals were inherited across exec, other programs would break (SIGCHLD is the most obvious example).
We could depend on the user clearing the signal mask before calling exec, similar to how we require the user to remove CLOEXEC from inherited fds; but that is a fairly novel requirement to most, so for simplicity we just default to clearing the signal mask before exec, and allow the user to explicitly pass down additional signal blocks.
async def exec(self, command: Command, inherited_signal_blocks: t.List[SignalBlock] = []) ‑> AsyncChildProcess
-
Replace the running executable in this thread with what's specified in `command`
See self.execve's docstring for an explanation of inherited_signal_blocks.
manpage: execve(2)
Inherited members
class Command (executable_path: Path, arguments: List[Union[str, os.PathLike]], env_updates: Mapping[str, Union[str, os.PathLike]])
-
A convenient builder-pattern representation of the arguments to execve.
class Command:
    "A convenient builder-pattern representation of the arguments to execve."
    def __init__(self,
                 executable_path: Path,
                 arguments: t.List[t.Union[str, os.PathLike]],
                 env_updates: t.Mapping[str, t.Union[str, os.PathLike]]) -> None:
        self.executable_path = executable_path
        self.arguments = arguments
        self.env_updates = env_updates

    def args(self: T_command, *args: t.Union[str, os.PathLike]) -> T_command:
        "Add more arguments to this Command."
        return type(self)(self.executable_path,
                          [*self.arguments, *args],
                          self.env_updates)

    def env(self: T_command,
            env_updates: t.Mapping[str, t.Union[str, os.PathLike]]={},
            **updates: t.Union[str, os.PathLike]) -> T_command:
        """Add more environment variable updates to this Command.

        There are two ways to pass arguments to this method
        (which can be used simultaneously):
        - you can pass a dictionary of environment updates,
        - or you can provide your environment updates as keyword arguments.

        Both are necessary, since there are many valid environment variable
        names which are not valid Python keyword argument names.
        """
        return type(self)(self.executable_path,
                          self.arguments,
                          {**self.env_updates, **env_updates, **updates})

    def in_shell_form(self) -> str:
        "Render this Command as a string which could be passed to a shell."
        ret = ""
        for key, value in self.env_updates.items():
            ret += os.fsdecode(key) + "=" + os.fsdecode(value) + " "
        ret += os.fsdecode(self.executable_path)
        # skip first argument
        for arg in self.arguments[1:]:
            ret += " " + os.fsdecode(arg)
        return ret

    def __str__(self) -> str:
        ret = "Command("
        for key, value in self.env_updates.items():
            ret += f"{key}={value} "
        ret += f"{os.fsdecode(self.executable_path)},"
        for arg in self.arguments:
            ret += " " + os.fsdecode(arg)
        ret += ")"
        return ret

    def __repr__(self) -> str:
        return str(self)
Subclasses
Methods
def args(self: ~T_command, *args: Union[str, os.PathLike]) ‑> ~T_command
-
Add more arguments to this Command.
def env(self: ~T_command, env_updates: Mapping[str, Union[str, os.PathLike]] = {}, **updates: Union[str, os.PathLike]) ‑> ~T_command
-
Add more environment variable updates to this Command.
There are two ways to pass arguments to this method (which can be used simultaneously):
- you can pass a dictionary of environment updates,
- or you can provide your environment updates as keyword arguments.
Both are necessary, since there are many valid environment variable names which are not valid Python keyword argument names.
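For instance (a sketch; the variable names are arbitrary):

from rsyscall import Command, Path

cmd = Command(Path("/bin/env"), ["env"], {})
# keyword form for ordinary names; dict form for names that
# aren't valid Python identifiers
cmd = cmd.env({"my.weird.var": "1"}, PATH="/usr/bin")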
def in_shell_form(self) ‑> str
-
Render this Command as a string which could be passed to a shell.
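For example:

from rsyscall import Command, Path

cmd = Command(Path("/bin/echo"), ["echo", "hello"], {"FOO": "bar"})
print(cmd.in_shell_form())  # FOO=bar /bin/echo hello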
class Task (process: t.Union[rsyscall.near.Process, Process], fd_table: FDTable, address_space: AddressSpace, pidns: PidNamespace)
-
A Linux process context under our control, ready for syscalls
Since there are many different syscalls we could make, this class is built by inheriting from many other purpose-specific "Task" classes, which in turn all inherit from the base `rsyscall.far.Task`.
This is named after the kernel struct, "struct task", associated with each process.
class Task(
        EventfdTask[FileDescriptor], TimerfdTask[FileDescriptor], EpollTask[FileDescriptor],
        InotifyTask[FileDescriptor], SignalfdTask[FileDescriptor],
        MemfdTask[FileDescriptor],
        FSTask[FileDescriptor],
        SocketTask[FileDescriptor],
        PipeTask,
        MemoryMappingTask,
        CWDTask,
        FileDescriptorTask[FileDescriptor],
        CapabilityTask, PrctlTask, MountTask,
        CredentialsTask,
        ProcessTask,
        SchedTask,
        ResourceTask,
        FutexTask,
        SignalTask,
        rsyscall.far.Task,
):
    """A Linux process context under our control, ready for syscalls

    Since there are many different syscalls we could make, this class is built by inheriting
    from many other purpose specific "Task" classes, which in turn all inherit from the base
    `rsyscall.far.Task`.

    This is named after the kernel struct, "struct task", associated with each process.
    """
    def __init__(self,
                 process: t.Union[rsyscall.near.Process, Process],
                 fd_table: FDTable,
                 address_space: rsyscall.far.AddressSpace,
                 pidns: rsyscall.far.PidNamespace,
    ) -> None:
        super().__init__(
            UnusableSyscallInterface(),
            t.cast(rsyscall.near.Process, process),
            fd_table,
            address_space,
            pidns,
        )

    def _file_descriptor_constructor(self, fd: rsyscall.near.FileDescriptor) -> FileDescriptor:
        # for extensibility
        return FileDescriptor(self, fd, True)

    def _make_fresh_address_space(self) -> None:
        self.address_space = rsyscall.far.AddressSpace(self.process.near.id)

    async def unshare(self, flags: CLONE) -> None:
        if flags & CLONE.FILES:
            await self.unshare_files()
            flags ^= CLONE.FILES
        if flags:
            await _unshare(self.sysif, flags)

    async def setns_user(self, fd: FileDescriptor) -> None:
        # can't setns to a user namespace while sharing CLONE_FS
        await self.unshare(CLONE.FS)
        await self.setns(fd, CLONE.NEWUSER)

    async def execveat(self, fd: t.Optional[FileDescriptor],
                       pathname: WrittenPointer[t.Union[str, os.PathLike]],
                       argv: WrittenPointer[ArgList],
                       envp: WrittenPointer[ArgList],
                       flags: AT=AT.NONE,
                       command: Command=None,
    ) -> None:
        with contextlib.ExitStack() as stack:
            if fd:
                fd_n: t.Optional[rsyscall.near.FileDescriptor] = stack.enter_context(fd.borrow(self))
            else:
                fd_n = None
            stack.enter_context(pathname.borrow(self))
            argv.check_address_space(self)
            envp.check_address_space(self)
            for arg in [*argv.value, *envp.value]:
                stack.enter_context(arg.borrow(self))
            self.manipulating_fd_table = True
            try:
                await _execveat(self.sysif, fd_n, pathname.near, argv.near, envp.near, flags)
            except OSError as exn:
                exn.filename = (fd, pathname.value)
                raise
            finally:
                self.manipulating_fd_table = False
            self._make_fresh_fd_table()
            self._make_fresh_address_space()
            if isinstance(self.process, ChildProcess):
                self.process.did_exec(command)
        await self.sysif.close_interface()

    async def execve(self, filename: WrittenPointer[t.Union[str, os.PathLike]],
                     argv: WrittenPointer[ArgList],
                     envp: WrittenPointer[ArgList],
                     command: Command=None,
    ) -> None:
        filename.check_address_space(self)
        argv.check_address_space(self)
        envp.check_address_space(self)
        for arg in [*argv.value, *envp.value]:
            arg.check_address_space(self)
        self.manipulating_fd_table = True
        try:
            await _execve(self.sysif, filename.near, argv.near, envp.near)
        except OSError as exn:
            exn.filename = filename.value
            raise
        self.manipulating_fd_table = False
        self._make_fresh_fd_table()
        self._make_fresh_address_space()
        if isinstance(self.process, ChildProcess):
            self.process.did_exec(command)
        await self.sysif.close_interface()

    async def exit(self, status: int) -> None:
        self.manipulating_fd_table = True
        await _exit(self.sysif, status)
        self.manipulating_fd_table = False
        self._make_fresh_fd_table()
        # close the syscall interface; we don't have to do this since it'll be
        # GC'd, but maybe we want to be tidy in advance.
        await self.sysif.close_interface()
Ancestors
- EventfdTask
- rsyscall.sys.timerfd.TimerfdTask
- EpollTask
- rsyscall.sys.inotify.InotifyTask
- SignalfdTask
- MemfdTask
- rsyscall.unistd.FSTask
- SocketTask
- PipeTask
- MemoryMappingTask
- CWDTask
- FileDescriptorTask
- rsyscall.sys.capability.CapabilityTask
- PrctlTask
- MountTask
- CredentialsTask
- ProcessTask
- rsyscall.sched.SchedTask
- ResourceTask
- FutexTask
- SignalTask
- Task
- typing.Generic
Class variables
var sysif : SyscallInterface
var near_process : Process
var fd_table : FDTable
var address_space : AddressSpace
var pidns : PidNamespace
Methods
async def unshare(self, flags: CLONE) ‑> NoneType
-
async def unshare(self, flags: CLONE) -> None:
    if flags & CLONE.FILES:
        await self.unshare_files()
        flags ^= CLONE.FILES
    if flags:
        await _unshare(self.sysif, flags)
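For illustration, a minimal sketch of unsharing the file descriptor table, assuming `task` is a `Task` for a process under our control (the variable name is hypothetical):

    # CLONE.FILES is handled specially through unshare_files(); any remaining
    # flags are passed on to the raw unshare() syscall.
    await task.unshare(CLONE.FILES)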
async def setns_user(self, fd: FileDescriptor) ‑> NoneType
-
async def setns_user(self, fd: FileDescriptor) -> None:
    # can't setns to a user namespace while sharing CLONE_FS
    await self.unshare(CLONE.FS)
    await self.setns(fd, CLONE.NEWUSER)
async def execveat(self, fd: t.Optional[FileDescriptor], pathname: WrittenPointer[t.Union[str, os.PathLike]], argv: WrittenPointer[ArgList], envp: WrittenPointer[ArgList], flags: AT = AT.NONE, command: Command = None) ‑> NoneType
-
async def execveat(self, fd: t.Optional[FileDescriptor],
                   pathname: WrittenPointer[t.Union[str, os.PathLike]],
                   argv: WrittenPointer[ArgList],
                   envp: WrittenPointer[ArgList],
                   flags: AT=AT.NONE,
                   command: Command=None,
) -> None:
    with contextlib.ExitStack() as stack:
        if fd:
            fd_n: t.Optional[rsyscall.near.FileDescriptor] = stack.enter_context(fd.borrow(self))
        else:
            fd_n = None
        stack.enter_context(pathname.borrow(self))
        argv.check_address_space(self)
        envp.check_address_space(self)
        for arg in [*argv.value, *envp.value]:
            stack.enter_context(arg.borrow(self))
        self.manipulating_fd_table = True
        try:
            await _execveat(self.sysif, fd_n, pathname.near, argv.near, envp.near, flags)
        except OSError as exn:
            exn.filename = (fd, pathname.value)
            raise
        finally:
            self.manipulating_fd_table = False
        self._make_fresh_fd_table()
        self._make_fresh_address_space()
        if isinstance(self.process, ChildProcess):
            self.process.did_exec(command)
    await self.sysif.close_interface()
async def execve(self, filename: WrittenPointer[t.Union[str, os.PathLike]], argv: WrittenPointer[ArgList], envp: WrittenPointer[ArgList], command: Command = None) ‑> NoneType
-
async def execve(self, filename: WrittenPointer[t.Union[str, os.PathLike]],
                 argv: WrittenPointer[ArgList],
                 envp: WrittenPointer[ArgList],
                 command: Command=None,
) -> None:
    filename.check_address_space(self)
    argv.check_address_space(self)
    envp.check_address_space(self)
    for arg in [*argv.value, *envp.value]:
        arg.check_address_space(self)
    self.manipulating_fd_table = True
    try:
        await _execve(self.sysif, filename.near, argv.near, envp.near)
    except OSError as exn:
        exn.filename = filename.value
        raise
    self.manipulating_fd_table = False
    self._make_fresh_fd_table()
    self._make_fresh_address_space()
    if isinstance(self.process, ChildProcess):
        self.process.did_exec(command)
    await self.sysif.close_interface()
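A hedged sketch of invoking `execve`; the `filename`, `argv`, and `envp` pointers must already be written into memory reachable from this task, for example with `Thread.ptr()` (all three names are hypothetical):

    # On success the process image is replaced, so this Task's syscall
    # interface is closed and no further syscalls can be made through it.
    await task.execve(filename, argv, envp)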
async def exit(self, status: int) ‑> NoneType
-
async def exit(self, status: int) -> None:
    self.manipulating_fd_table = True
    await _exit(self.sysif, status)
    self.manipulating_fd_table = False
    self._make_fresh_fd_table()
    # close the syscall interface; we don't have to do this since it'll be
    # GC'd, but maybe we want to be tidy in advance.
    await self.sysif.close_interface()
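A minimal sketch of exiting a process under our control, assuming `child` is a `ChildThread` returned by `Thread.clone()` (the name is hypothetical):

    child = await local_thread.clone()
    # After exit(0), the process is gone and its Task can make no more syscalls.
    await child.task.exit(0)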
Inherited members
class FileDescriptor (task: Task, near: FileDescriptor, valid: bool)
-
A file descriptor accessed through some Task, with FD-based syscalls as methods.
A FileDescriptor represents the ability to use some open file through some Task. When an open file is created by a syscall in some Task, the syscall will return a FileDescriptor which allows accessing that open file through that Task.
A FileDescriptor has many methods to make syscalls; most syscalls which take a file descriptor as their first argument are present as a method on FileDescriptor. These syscalls will be made through the Task in the FileDescriptor's task field.
Since there are so many syscalls, this class is built by inheriting from many other purpose-specific FooFileDescriptor classes, which in turn all inherit from BaseFileDescriptor.
After we have opened the file and performed some operations on it, we can call close to immediately close the FileDescriptor and free its resources. The FileDescriptor will also be automatically closed in the background after the FileDescriptor has been garbage collected. Garbage collection should be relied on and preferred over context managers or explicit closing, which are both too inflexible for large scale resource management. Garbage collection is currently run when we change file descriptor tables, as well as on-demand if the user calls FileDescriptorTask.run_fd_table_gc.
We can use inherit to copy a FileDescriptor into a task which inherited file descriptors from a parent, and for_task to copy a FileDescriptor into tasks sharing the same file descriptor table. We can also use more complicated methods, such as CmsgSCMRights, to copy file descriptors without inheritance or a shared file descriptor table.
@dataclass(eq=False)
class FileDescriptor(
    EventFileDescriptor, TimerFileDescriptor, EpollFileDescriptor,
    InotifyFileDescriptor, SignalFileDescriptor,
    IoctlFileDescriptor, GetdentsFileDescriptor, UioFileDescriptor,
    SeekableFileDescriptor, IOFileDescriptor, FSFileDescriptor,
    SocketFileDescriptor, MappableFileDescriptor,
    StatFileDescriptor, FcntlFileDescriptor,
    BaseFileDescriptor,
):
    """A file descriptor accessed through some `Task`, with FD-based syscalls as methods

    A `FileDescriptor` represents the ability to use some open file through some `Task`.
    When an open file is created by a syscall in some `Task`, the syscall will return a
    `FileDescriptor` which allows accessing that open file through that `Task`.

    A `FileDescriptor` has many methods to make syscalls; most syscalls which take a
    file descriptor as their first argument are present as a method on
    `FileDescriptor`. These syscalls will be made through the `Task` in the
    FileDescriptor's `task` field.

    Since there are so many syscalls, this class is built by inheriting from many other
    purpose specific `FooFileDescriptor` classes, which in turn all inherit from
    `BaseFileDescriptor`.

    After we have opened the file and performed some operations on it, we can call
    `close` to immediately close the FileDescriptor and free its resources. The
    FileDescriptor will also be automatically closed in the background after the
    FileDescriptor has been garbage collected. Garbage collection should be relied on
    and preferred over context managers or explicit closing, which are both too
    inflexible for large scale resource management. Garbage collection is currently run
    when we change file descriptor tables, as well as on-demand if the user calls
    `FileDescriptorTask.run_fd_table_gc`.

    We can use `inherit` to copy a FileDescriptor into a task which inherited file
    descriptors from a parent, and `for_task` to copy a FileDescriptor into tasks
    sharing the same file descriptor table. We can also use more complicated methods,
    such as `rsyscall.sys.socket.CmsgSCMRights`, to copy file descriptors without
    inheritance or a shared file descriptor table.
    """
    __slots__ = ()
    task: Task

    def as_proc_path(self) -> Path:
        """Return the /proc/{pid}/fd/{num} path pointing to this FD.

        This should be used with care, but it's sometimes useful for programs
        which accept paths instead of file descriptors.
        """
        pid = self.task.process.near.id
        num = self.near.number
        return Path(f"/proc/{pid}/fd/{num}")

    async def disable_cloexec(self) -> None:
        "Unset the `FD.CLOEXEC` flag so this file descriptor can be inherited"
        # TODO this doesn't make any sense. we shouldn't allow cloexec if there are multiple people in our fd table;
        # whether or not there are multiple handles to the fd is irrelevant.
        if not self.is_only_handle():
            raise Exception("shouldn't disable cloexec when there are multiple handles to this fd")
        await self.fcntl(F.SETFD, 0)

    async def enable_cloexec(self) -> None:
        "Set the `FD.CLOEXEC` flag so this file descriptor can't be inherited"
        await self.fcntl(F.SETFD, FD.CLOEXEC)

    async def as_argument(self) -> int:
        "`disable_cloexec`, then return this `FileDescriptor` as an integer; useful when passing the FD as an argument"
        await self.disable_cloexec()
        return int(self)

    async def __aenter__(self) -> FileDescriptor:
        return self

    async def __aexit__(self, *args, **kwargs) -> None:
        await self.close()

    def __str__(self) -> str:
        return repr(self)

    def __repr__(self) -> str:
        if self.valid:
            return f"FD({self.task}, {self.near.number})"
        else:
            return f"FD({self.task}, {self.near.number}, valid=False)"
Ancestors
- EventFileDescriptor
- TimerFileDescriptor
- EpollFileDescriptor
- InotifyFileDescriptor
- SignalFileDescriptor
- IoctlFileDescriptor
- GetdentsFileDescriptor
- UioFileDescriptor
- SeekableFileDescriptor
- IOFileDescriptor
- FSFileDescriptor
- SocketFileDescriptor
- MappableFileDescriptor
- StatFileDescriptor
- FcntlFileDescriptor
- BaseFileDescriptor
Methods
def as_proc_path(self) ‑> Path
-
Return the /proc/{pid}/fd/{num} path pointing to this FD.
This should be used with care, but it's sometimes useful for programs which accept paths instead of file descriptors.
def as_proc_path(self) -> Path:
    """Return the /proc/{pid}/fd/{num} path pointing to this FD.

    This should be used with care, but it's sometimes useful for programs
    which accept paths instead of file descriptors.
    """
    pid = self.task.process.near.id
    num = self.near.number
    return Path(f"/proc/{pid}/fd/{num}")
async def disable_cloexec(self) ‑> NoneType
-
Unset the FD.CLOEXEC flag so this file descriptor can be inherited.
async def disable_cloexec(self) -> None:
    "Unset the `FD.CLOEXEC` flag so this file descriptor can be inherited"
    # TODO this doesn't make any sense. we shouldn't allow cloexec if there are multiple people in our fd table;
    # whether or not there are multiple handles to the fd is irrelevant.
    if not self.is_only_handle():
        raise Exception("shouldn't disable cloexec when there are multiple handles to this fd")
    await self.fcntl(F.SETFD, 0)
async def enable_cloexec(self) ‑> NoneType
-
Set the FD.CLOEXEC flag so this file descriptor can't be inherited.
async def enable_cloexec(self) -> None:
    "Set the `FD.CLOEXEC` flag so this file descriptor can't be inherited"
    await self.fcntl(F.SETFD, FD.CLOEXEC)
async def as_argument(self) ‑> int
-
disable_cloexec, then return this FileDescriptor as an integer; useful when passing the FD as an argument.
async def as_argument(self) -> int:
    "`disable_cloexec`, then return this `FileDescriptor` as an integer; useful when passing the FD as an argument"
    await self.disable_cloexec()
    return int(self)
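A minimal sketch of passing an FD by number to a child program, assuming `fd` is a `FileDescriptor` (hypothetical name):

    # as_argument() calls disable_cloexec() so the FD survives exec, then
    # returns the integer FD number.
    num = await fd.as_argument()
    arg = f"--fd={num}"  # e.g. embed the number in an argv string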
Inherited members
class AsyncFileDescriptor (ram: RAM, handle: FileDescriptor, epolled: EpolledFileDescriptor)
-
A file descriptor on which IO can be performed without blocking the thread.
Also comes with helpful methods AsyncFileDescriptor.write_all_bytes() and AsyncFileDescriptor.read_some_bytes() to abstract over memory allocation.
We always wait for a posedge to come back from epoll before trying to read. This is not necessarily too pessimistic, because as soon as we have a single posedge, we will keep reading in a loop, as long as data keeps coming through.
Don't construct directly; use the AsyncFileDescriptor.make constructor instead.
class AsyncFileDescriptor:
    """A file descriptor on which IO can be performed without blocking the thread.

    Also comes with helpful methods `AsyncFileDescriptor.write_all_bytes` and
    `AsyncFileDescriptor.read_some_bytes` to abstract over memory allocation.

    We always wait for a posedge to come back from epoll before trying to read. This is
    not necessarily too pessimistic, because as soon as we have a single posedge, we
    will keep reading in a loop, as long as data keeps coming through.
    """
    @staticmethod
    async def make(epoller: Epoller, ram: RAM, fd: FileDescriptor) -> AsyncFileDescriptor:
        """Make an AsyncFileDescriptor; make sure to call this with only O.NONBLOCK file descriptors.

        It won't actually break anything if this is called with file descriptors not in
        NONBLOCK mode; it just means that they'll block when we go to read, which is
        probably not what the user wants.
        """
        epolled = await epoller.register(
            fd, EPOLL.IN|EPOLL.OUT|EPOLL.RDHUP|EPOLL.PRI|EPOLL.ERR|EPOLL.HUP|EPOLL.ET,
        )
        return AsyncFileDescriptor(ram, fd, epolled)

    def __init__(self, ram: RAM, handle: FileDescriptor, epolled: EpolledFileDescriptor,
    ) -> None:
        "Don't construct directly; use the AsyncFileDescriptor.make constructor instead."
        self.ram = ram
        self.handle = handle
        "The underlying FileDescriptor for this AFD, used for all system calls"
        self.epolled = epolled

    def __str__(self) -> str:
        return f"AsyncFileDescriptor({self.epolled})"

    async def make_new_afd(self, fd: FileDescriptor) -> AsyncFileDescriptor:
        """Use the Epoller and RAM in this AsyncFD to make a new `AsyncFileDescriptor` for `fd`

        Make sure that `fd` is already in non-blocking mode; such as by accepting it
        with the `SOCK.NONBLOCK` flag.

        This doesn't steal any resources from the original AFD; it's just a convenience
        method, most useful when calling accept() and wanting to create new AFDs out of
        the resulting FDs.
        """
        return await AsyncFileDescriptor.make(self.epolled.epoller, self.ram, fd)

    async def wait_for_rdhup(self) -> None:
        "Call epoll_wait until this file descriptor has a hangup."
        await self.epolled.wait_for(EPOLL.RDHUP|EPOLL.HUP)

    async def read(self, ptr: Pointer) -> t.Tuple[Pointer, Pointer]:
        "Call `FileDescriptor.read` without blocking the thread."
        while True:
            await self.epolled.wait_for(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
            current_events = self.epolled.get_current_events(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
            try:
                return (await self.handle.read(ptr))
            except OSError as e:
                self.epolled.consume(current_events)
                if e.errno == errno.EAGAIN:
                    self.epolled.status.negedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
                else:
                    self.epolled.status.posedge(EPOLL.ERR)
                    raise
            else:
                self.epolled.consume(current_events)
                self.epolled.status.posedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP)

    async def read_some_bytes(self, count: int=4096) -> bytes:
        """Read at most count bytes; possibly less, if we have a partial read.

        This allocates on each call. For some applications, you may want to avoid the
        cost of allocation, by instead allocating a buffer with `Thread.malloc` up
        front and reusing it across multiple calls to `AsyncFileDescriptor.read`.
        """
        ptr = await self.ram.malloc(bytes, count)
        valid, _ = await self.read(ptr)
        return await valid.read()

    async def write(self, buf: Pointer) -> t.Tuple[Pointer, Pointer]:
        """Call `FileDescriptor.write` without blocking the thread.

        Note that this doesn't retry partial writes, which are always a possibility,
        so you should make sure to do that yourself, or use
        `AsyncFileDescriptor.write_all`.
        """
        while True:
            await self.epolled.wait_for(EPOLL.OUT|EPOLL.ERR)
            current_events = self.epolled.get_current_events(EPOLL.OUT|EPOLL.ERR)
            try:
                return await self.handle.write(buf)
            except OSError as e:
                self.epolled.consume(current_events)
                if e.errno == errno.EAGAIN:
                    self.epolled.status.negedge(EPOLL.OUT|EPOLL.ERR)
                else:
                    self.epolled.status.posedge(EPOLL.ERR)
                    raise
            else:
                self.epolled.consume(current_events)
                self.epolled.status.posedge(EPOLL.OUT)

    async def write_all(self, to_write: Pointer) -> None:
        """Write all of this pointer to the fd, retrying on partial writes until complete.

        You might want to not use this, if you want to react to a partial write in
        some special way. For example,
        `rsyscall.memory.socket_transport.SocketMemoryTransport` starts a `recv`
        immediately after a partial write, before retrying the write, for increased
        parallelism.
        """
        while to_write.size() > 0:
            written, to_write = await self.write(to_write)

    async def write_all_bytes(self, buf: bytes) -> None:
        """Write all these bytes to the fd, retrying on partial writes until complete.

        This allocates and performs a store to memory on each call. This is
        inefficient if you already have an initialized pointer for the value, or if
        you already have an allocated buffer that you can use to store the value,
        which you can also reuse for other values. In those cases, you might want to
        use `AsyncFileDescriptor.write_all`.
        """
        ptr = await self.ram.ptr(buf)
        await self.write_all(ptr)

    @t.overload
    async def accept(self, flags: SOCK=SOCK.NONE) -> FileDescriptor: ...
    @t.overload
    async def accept(self, flags: SOCK, addr: WrittenPointer[Sockbuf[T_sockaddr]]
    ) -> t.Tuple[FileDescriptor, WrittenPointer[Sockbuf[T_sockaddr]]]: ...

    async def accept(self, flags: SOCK=SOCK.NONE,
                     addr: t.Optional[WrittenPointer[Sockbuf[T_sockaddr]]]=None
    ) -> t.Union[FileDescriptor, t.Tuple[FileDescriptor, WrittenPointer[Sockbuf[T_sockaddr]]]]:
        "Call accept without blocking the thread."
        while True:
            await self.epolled.wait_for(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
            current_events = self.epolled.get_current_events(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
            try:
                if addr is None:
                    return (await self.handle.accept(flags))
                else:
                    return (await self.handle.accept(flags, addr))
            except OSError as e:
                self.epolled.consume(current_events)
                if e.errno == errno.EAGAIN:
                    self.epolled.status.negedge(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
                else:
                    self.epolled.status.posedge(EPOLL.HUP|EPOLL.ERR)
                    raise
            else:
                self.epolled.consume(current_events)
                self.epolled.status.posedge(EPOLL.IN)

    async def accept_addr(self, flags: SOCK=SOCK.NONE) -> t.Tuple[FileDescriptor, Sockaddr]:
        "Call accept with a buffer for the address, and return the resulting fd and address."
        written_sockbuf = await self.ram.ptr(Sockbuf(await self.ram.malloc(SockaddrStorage)))
        fd, sockbuf = await self.accept(flags, written_sockbuf)
        addr = (await (await sockbuf.read()).buf.read()).parse()
        return fd, addr

    async def bind(self, addr: WrittenPointer[Sockaddr]) -> None:
        "Call bind; bind already doesn't block the thread."
        await self.handle.bind(addr)

    async def connect(self, addr: WrittenPointer[Sockaddr]) -> None:
        "Call connect without blocking the thread."
        try:
            # Note that an unconnected socket, at least with AF.INET SOCK.STREAM,
            # will have EPOLL.OUT|EPOLL.HUP set when added to epoll, before calling connect.
            current_events = self.epolled.get_current_events(EPOLL.OUT)
            await self.handle.connect(addr)
        except OSError as e:
            self.epolled.consume(current_events)
            self.epolled.status.negedge(EPOLL.OUT)
            if e.errno == errno.EINPROGRESS:
                await self.epolled.wait_for(EPOLL.OUT)
                current_events = self.epolled.get_current_events(EPOLL.OUT)
                sockbuf = await self.ram.ptr(Sockbuf(await self.ram.malloc(Int32)))
                retbuf = await self.handle.getsockopt(SOL.SOCKET, SO.ERROR, sockbuf)
                err = await (await retbuf.read()).buf.read()
                self.epolled.consume(current_events)
                if err == 0:
                    self.epolled.status.posedge(EPOLL.OUT)
                else:
                    self.epolled.status.posedge(EPOLL.ERR)
                    try:
                        raise OSError(err, os.strerror(err))
                    except OSError as exn:
                        exn.filename = self.handle
                        if hasattr(addr, 'value'):
                            exn.filename2 = addr.value
                        raise
            else:
                raise
        else:
            self.epolled.consume(current_events)
            self.epolled.status.posedge(EPOLL.OUT)

    def with_handle(self, fd: FileDescriptor) -> AsyncFileDescriptor:
        """Return a new AFD using this new FD handle for making syscalls.

        This is useful when we want to change what task we're making syscalls in when
        using this AFD.
        """
        return AsyncFileDescriptor(self.ram, fd, self.epolled)

    async def close(self) -> None:
        "Remove this FD from Epoll and invalidate the FD handle."
        await self.epolled.delete()
        await self.handle.invalidate()

    async def __aenter__(self) -> None:
        pass

    async def __aexit__(self, *args, **kwargs) -> None:
        await self.close()
Static methods
async def make(epoller: Epoller, ram: RAM, fd: FileDescriptor) ‑> AsyncFileDescriptor
-
Make an AsyncFileDescriptor; make sure to call this with only O.NONBLOCK file descriptors.
It won't actually break anything if this is called with file descriptors not in NONBLOCK mode; it just means that they'll block when we go to read, which is probably not what the user wants.
@staticmethod
async def make(epoller: Epoller, ram: RAM, fd: FileDescriptor) -> AsyncFileDescriptor:
    """Make an AsyncFileDescriptor; make sure to call this with only O.NONBLOCK file descriptors.

    It won't actually break anything if this is called with file descriptors not in
    NONBLOCK mode; it just means that they'll block when we go to read, which is
    probably not what the user wants.
    """
    epolled = await epoller.register(
        fd, EPOLL.IN|EPOLL.OUT|EPOLL.RDHUP|EPOLL.PRI|EPOLL.ERR|EPOLL.HUP|EPOLL.ET,
    )
    return AsyncFileDescriptor(ram, fd, epolled)
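In practice, a minimal sketch of getting an `AsyncFileDescriptor`, assuming `thread` is a `Thread` and `sockfd` is a socket opened with `SOCK.NONBLOCK` (both names are hypothetical); per the package overview, `Thread.make_afd()` wraps this registration:

    # Register sockfd with the thread's epoll event loop and wrap it in an AFD.
    afd = await thread.make_afd(sockfd)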
Instance variables
var handle
-
The underlying FileDescriptor for this AFD, used for all system calls
Methods
async def make_new_afd(self, fd: FileDescriptor) ‑> AsyncFileDescriptor
-
Use the Epoller and RAM in this AsyncFD to make a new AsyncFileDescriptor for fd.
Make sure that fd is already in non-blocking mode; for example, by accepting it with the SOCK.NONBLOCK flag.
This doesn't steal any resources from the original AFD; it's just a convenience method, most useful when calling accept() and wanting to create new AFDs out of the resulting FDs.
async def make_new_afd(self, fd: FileDescriptor) -> AsyncFileDescriptor:
    """Use the Epoller and RAM in this AsyncFD to make a new `AsyncFileDescriptor` for `fd`

    Make sure that `fd` is already in non-blocking mode; such as by accepting it with
    the `SOCK.NONBLOCK` flag.

    This doesn't steal any resources from the original AFD; it's just a convenience
    method, most useful when calling accept() and wanting to create new AFDs out of
    the resulting FDs.
    """
    return await AsyncFileDescriptor.make(self.epolled.epoller, self.ram, fd)
async def wait_for_rdhup(self) ‑> NoneType
-
Call epoll_wait until this file descriptor has a hangup.
async def wait_for_rdhup(self) -> None:
    "Call epoll_wait until this file descriptor has a hangup."
    await self.epolled.wait_for(EPOLL.RDHUP|EPOLL.HUP)
async def read(self, ptr: Pointer) ‑> Tuple[Pointer, Pointer]
-
Call IOFileDescriptor.read() without blocking the thread.
async def read(self, ptr: Pointer) -> t.Tuple[Pointer, Pointer]:
    "Call `FileDescriptor.read` without blocking the thread."
    while True:
        await self.epolled.wait_for(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
        current_events = self.epolled.get_current_events(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
        try:
            return (await self.handle.read(ptr))
        except OSError as e:
            self.epolled.consume(current_events)
            if e.errno == errno.EAGAIN:
                self.epolled.status.negedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP|EPOLL.ERR)
            else:
                self.epolled.status.posedge(EPOLL.ERR)
                raise
        else:
            self.epolled.consume(current_events)
            self.epolled.status.posedge(EPOLL.IN|EPOLL.RDHUP|EPOLL.HUP)
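A minimal sketch of a non-blocking read into a preallocated buffer, assuming `thread` is a `Thread` and `afd` an `AsyncFileDescriptor` (hypothetical names):

    buf = await thread.malloc(bytes, 4096)
    # read() consumes buf and splits it into the filled part and the remainder.
    valid, rest = await afd.read(buf)
    data = await valid.read()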
async def read_some_bytes(self, count: int = 4096) ‑> bytes
-
Read at most count bytes; possibly less, if we have a partial read.
This allocates on each call. For some applications, you may want to avoid the cost of allocation by instead allocating a buffer with Thread.malloc() up front and reusing it across multiple calls to AsyncFileDescriptor.read().
async def read_some_bytes(self, count: int=4096) -> bytes:
    """Read at most count bytes; possibly less, if we have a partial read.

    This allocates on each call. For some applications, you may want to avoid the
    cost of allocation, by instead allocating a buffer with `Thread.malloc` up front
    and reusing it across multiple calls to `AsyncFileDescriptor.read`.
    """
    ptr = await self.ram.malloc(bytes, count)
    valid, _ = await self.read(ptr)
    return await valid.read()
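As the docstring suggests, a hedged sketch of avoiding per-call allocation by reusing one buffer across reads (all names hypothetical):

    buf = await thread.malloc(bytes, 4096)
    while True:
        valid, rest = await afd.read(buf)
        process(await valid.read())  # `process` is a placeholder
        buf = valid + rest           # merge the split halves back for the next read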
async def write(self, buf: Pointer) ‑> Tuple[Pointer, Pointer]
-
Call IOFileDescriptor.write() without blocking the thread.
Note that this doesn't retry partial writes, which are always a possibility, so you should make sure to do that yourself, or use AsyncFileDescriptor.write_all().
async def write(self, buf: Pointer) -> t.Tuple[Pointer, Pointer]:
    """Call `FileDescriptor.write` without blocking the thread.

    Note that this doesn't retry partial writes, which are always a possibility, so
    you should make sure to do that yourself, or use `AsyncFileDescriptor.write_all`.
    """
    while True:
        await self.epolled.wait_for(EPOLL.OUT|EPOLL.ERR)
        current_events = self.epolled.get_current_events(EPOLL.OUT|EPOLL.ERR)
        try:
            return await self.handle.write(buf)
        except OSError as e:
            self.epolled.consume(current_events)
            if e.errno == errno.EAGAIN:
                self.epolled.status.negedge(EPOLL.OUT|EPOLL.ERR)
            else:
                self.epolled.status.posedge(EPOLL.ERR)
                raise
        else:
            self.epolled.consume(current_events)
            self.epolled.status.posedge(EPOLL.OUT)
async def write_all(self, to_write: Pointer) ‑> NoneType
-
Write all of this pointer to the fd, retrying on partial writes until complete.
You might want to not use this, if you want to react to a partial write in some special way. For example, SocketMemoryTransport starts a recv immediately after a partial write, before retrying the write, for increased parallelism.
async def write_all(self, to_write: Pointer) -> None:
    """Write all of this pointer to the fd, retrying on partial writes until complete.

    You might want to not use this, if you want to react to a partial write in some
    special way. For example,
    `rsyscall.memory.socket_transport.SocketMemoryTransport` starts a `recv`
    immediately after a partial write, before retrying the write, for increased
    parallelism.
    """
    while to_write.size() > 0:
        written, to_write = await self.write(to_write)
async def write_all_bytes(self, buf: bytes) ‑> NoneType
-
Write all these bytes to the fd, retrying on partial writes until complete.
This allocates and performs a store to memory on each call. This is inefficient if you already have an initialized pointer for the value, or if you already have an allocated buffer that you can use to store the value, which you can also reuse for other values. In those cases, you might want to use AsyncFileDescriptor.write_all().
async def write_all_bytes(self, buf: bytes) -> None:
    """Write all these bytes to the fd, retrying on partial writes until complete.

    This allocates and performs a store to memory on each call. This is inefficient
    if you already have an initialized pointer for the value, or if you already have
    an allocated buffer that you can use to store the value, which you can also
    reuse for other values. In those cases, you might want to use
    `AsyncFileDescriptor.write_all`.
    """
    ptr = await self.ram.ptr(buf)
    await self.write_all(ptr)
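A minimal sketch contrasting the two write styles described above (names hypothetical):

    # Convenient, but allocates and stores to memory on every call:
    await afd.write_all_bytes(b"hello\n")
    # Pointer-based alternative: prepare an initialized pointer, then write it.
    ptr = await thread.ptr(b"hello\n")
    await afd.write_all(ptr)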
async def accept(self, flags: SOCK = SOCK.NONE, addr: t.Optional[WrittenPointer[Sockbuf[T_sockaddr]]] = None) ‑> Union[FileDescriptor, Tuple[FileDescriptor, WrittenPointer[Sockbuf[~T_sockaddr]]]]
-
Call accept without blocking the thread.
async def accept(self, flags: SOCK=SOCK.NONE,
                 addr: t.Optional[WrittenPointer[Sockbuf[T_sockaddr]]]=None
) -> t.Union[FileDescriptor, t.Tuple[FileDescriptor, WrittenPointer[Sockbuf[T_sockaddr]]]]:
    "Call accept without blocking the thread."
    while True:
        await self.epolled.wait_for(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
        current_events = self.epolled.get_current_events(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
        try:
            if addr is None:
                return (await self.handle.accept(flags))
            else:
                return (await self.handle.accept(flags, addr))
        except OSError as e:
            self.epolled.consume(current_events)
            if e.errno == errno.EAGAIN:
                self.epolled.status.negedge(EPOLL.IN|EPOLL.HUP|EPOLL.ERR)
            else:
                self.epolled.status.posedge(EPOLL.HUP|EPOLL.ERR)
                raise
        else:
            self.epolled.consume(current_events)
            self.epolled.status.posedge(EPOLL.IN)
async def accept_addr(self, flags: SOCK = SOCK.NONE) ‑> Tuple[FileDescriptor, Sockaddr]
-
Call accept with a buffer for the address, and return the resulting fd and address.
async def accept_addr(self, flags: SOCK=SOCK.NONE) -> t.Tuple[FileDescriptor, Sockaddr]:
    "Call accept with a buffer for the address, and return the resulting fd and address."
    written_sockbuf = await self.ram.ptr(Sockbuf(await self.ram.malloc(SockaddrStorage)))
    fd, sockbuf = await self.accept(flags, written_sockbuf)
    addr = (await (await sockbuf.read()).buf.read()).parse()
    return fd, addr
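A minimal sketch of an accept loop, assuming `listener` is an `AsyncFileDescriptor` for a listening socket (hypothetical name):

    while True:
        connfd, addr = await listener.accept_addr(SOCK.NONBLOCK)
        # Wrap each new connection in its own AsyncFileDescriptor.
        conn = await listener.make_new_afd(connfd)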
async def bind(self, addr: WrittenPointer[Sockaddr]) ‑> NoneType
-
Call bind; bind already doesn't block the thread.
async def bind(self, addr: WrittenPointer[Sockaddr]) -> None:
    "Call bind; bind already doesn't block the thread."
    await self.handle.bind(addr)
async def connect(self, addr: WrittenPointer[Sockaddr]) ‑> NoneType
-
Call connect without blocking the thread.
async def connect(self, addr: WrittenPointer[Sockaddr]) -> None:
    "Call connect without blocking the thread."
    try:
        # Note that an unconnected socket, at least with AF.INET SOCK.STREAM,
        # will have EPOLL.OUT|EPOLL.HUP set when added to epoll, before calling connect.
        current_events = self.epolled.get_current_events(EPOLL.OUT)
        await self.handle.connect(addr)
    except OSError as e:
        self.epolled.consume(current_events)
        self.epolled.status.negedge(EPOLL.OUT)
        if e.errno == errno.EINPROGRESS:
            await self.epolled.wait_for(EPOLL.OUT)
            current_events = self.epolled.get_current_events(EPOLL.OUT)
            sockbuf = await self.ram.ptr(Sockbuf(await self.ram.malloc(Int32)))
            retbuf = await self.handle.getsockopt(SOL.SOCKET, SO.ERROR, sockbuf)
            err = await (await retbuf.read()).buf.read()
            self.epolled.consume(current_events)
            if err == 0:
                self.epolled.status.posedge(EPOLL.OUT)
            else:
                self.epolled.status.posedge(EPOLL.ERR)
                try:
                    raise OSError(err, os.strerror(err))
                except OSError as exn:
                    exn.filename = self.handle
                    if hasattr(addr, 'value'):
                        exn.filename2 = addr.value
                    raise
        else:
            raise
    else:
        self.epolled.consume(current_events)
        self.epolled.status.posedge(EPOLL.OUT)
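A minimal sketch, assuming `addr` is a `WrittenPointer[Sockaddr]` prepared with `Thread.ptr()` (names hypothetical):

    # connect() handles the EINPROGRESS case internally: it waits for EPOLL.OUT,
    # then checks SO.ERROR via getsockopt and raises OSError on failure.
    await afd.connect(addr)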
def with_handle(self, fd: FileDescriptor) ‑> AsyncFileDescriptor
-
Return a new AFD using this new FD handle for making syscalls.
This is useful when we want to change what task we're making syscalls in when using this AFD.
def with_handle(self, fd: FileDescriptor) -> AsyncFileDescriptor:
    """Return a new AFD using this new FD handle for making syscalls.

    This is useful when we want to change what task we're making syscalls in when
    using this AFD.
    """
    return AsyncFileDescriptor(self.ram, fd, self.epolled)
async def close(self) ‑> NoneType
-
Remove this FD from Epoll and invalidate the FD handle.
async def close(self) -> None:
    "Remove this FD from Epoll and invalidate the FD handle."
    await self.epolled.delete()
    await self.handle.invalidate()
class AsyncChildProcess (process: ChildProcess, ram: RAM, sigchld_sigfd: AsyncSignalfd)
-
A child process which can be monitored without blocking the thread
class AsyncChildProcess:
    "A child process which can be monitored without blocking the thread"
    def __init__(self, process: ChildProcess, ram: RAM, sigchld_sigfd: AsyncSignalfd) -> None:
        self.process = process
        self.ram = ram
        self.sigchld_sigfd = sigchld_sigfd
        self.next_sigchld: t.Optional[Event] = None

    def __repr__(self) -> str:
        name = type(self).__name__
        return f'{name}({self.process})'

    async def _waitid_nohang(self) -> t.Optional[ChildState]:
        if self.process.unread_siginfo:
            # if we performed a waitid before, and it contains an event, we don't need to
            # waitid again.
            result = await self.process.read_siginfo()
            # but if there's no event in this previous waitid, we need to waitid now; if
            # we don't, we might erroneously block waiting for a SIGCHLD that happened
            # between the previous waitid and now, and was consumed at that time.
            if result:
                return result
        siginfo_buf = await self.ram.malloc(Siginfo)
        await self.process.waitid(W.EXITED|W.STOPPED|W.CONTINUED|W.NOHANG, siginfo_buf)
        return await self.process.read_siginfo()

    async def waitpid(self, options: W) -> ChildState:
        "Wait for a child state change in this child, like waitid(P.PID)"
        if options & W.EXITED and self.process.death_state:
            # TODO this is not really the actual behavior of waitpid...
            # if the child is already dead we'd get an ECHILD, not the death state change again.
            return self.process.death_state
        while True:
            # If a previous call has given us a next_sigchld to wait on, then wait we shall.
            if self.next_sigchld:
                await self.next_sigchld.wait()
                # we shouldn't wait for SIGCHLD the next time we're called, we should eagerly call
                # waitid, since there may still be state changes to fetch.
                self.next_sigchld = None
            # We have to save this signal event before calling waitid, otherwise we may deadlock: If
            # a SIGCHLD is delivered while we're calling waitid, then saved_sigchld will be
            # different from self.sigchld_sigfd.next_signal after the waitid; and if we use the
            # value of self.sigchld_sigfd.next_signal after the waitid, we'll be waiting for a
            # SIGCHLD that will never come.
            saved_sigchld = self.sigchld_sigfd.next_signal
            state_change = await self._waitid_nohang()
            if state_change is not None:
                if state_change.state(options):
                    return state_change
                else:
                    # TODO we shouldn't discard the state change here if we're not waiting for it;
                    # unfortunately doing it right will require a lot of refactoring of waitid
                    pass
            else:
                # we know for sure that there will only be state changes fetchable by waitid after
                # waiting for this SIGCHLD event. note that this event may have already happened, if
                # we received a SIGCHLD while calling waitid.
                self.next_sigchld = saved_sigchld

    async def wait(self, options: W=W.EXITED|W.STOPPED|W.CONTINUED) -> ChildState:
        return await self.waitpid(options)

    async def check(self) -> ChildState:
        "Wait for this child to die, and once it does, throw `rsyscall.sys.wait.UncleanExit` if it didn't exit cleanly"
        try:
            death = await self.waitpid(W.EXITED)
        except trio.Cancelled:
            await self.kill(SIG.TERM)
            raise
        if not death.clean():
            if self.process.command:
                raise UncleanExit(death, self.process.command)
            else:
                raise UncleanExit(death)
        return death

    async def kill(self, sig: SIG=SIG.KILL) -> None:
        "Send a signal to this child"
        if self.process.unread_siginfo:
            await self.process.read_siginfo()
        await self.process.kill(sig)

    async def killpg(self, sig: SIG=SIG.KILL) -> None:
        "Send a signal to the process group corresponding to this child"
        if self.process.unread_siginfo:
            await self.process.read_siginfo()
        await self.process.killpg(sig)

    async def __aenter__(self) -> None:
        pass

    async def __aexit__(self, *args, **kwargs) -> None:
        if self.process.death_state:
            pass
        else:
            await self.kill()
            await self.waitpid(W.EXITED)
Methods
async def waitpid(self, options: W) ‑> ChildState
-
Wait for a child state change in this child, like waitid(P.PID)
async def waitpid(self, options: W) -> ChildState:
    "Wait for a child state change in this child, like waitid(P.PID)"
    if options & W.EXITED and self.process.death_state:
        # TODO this is not really the actual behavior of waitpid...
        # if the child is already dead we'd get an ECHILD, not the death state change again.
        return self.process.death_state
    while True:
        # If a previous call has given us a next_sigchld to wait on, then wait we shall.
        if self.next_sigchld:
            await self.next_sigchld.wait()
            # we shouldn't wait for SIGCHLD the next time we're called, we should eagerly call
            # waitid, since there may still be state changes to fetch.
            self.next_sigchld = None
        # We have to save this signal event before calling waitid, otherwise we may deadlock: If
        # a SIGCHLD is delivered while we're calling waitid, then saved_sigchld will be
        # different from self.sigchld_sigfd.next_signal after the waitid; and if we use the
        # value of self.sigchld_sigfd.next_signal after the waitid, we'll be waiting for a
        # SIGCHLD that will never come.
        saved_sigchld = self.sigchld_sigfd.next_signal
        state_change = await self._waitid_nohang()
        if state_change is not None:
            if state_change.state(options):
                return state_change
            else:
                # TODO we shouldn't discard the state change here if we're not waiting for it;
                # unfortunately doing it right will require a lot of refactoring of waitid
                pass
        else:
            # we know for sure that there will only be state changes fetchable by waitid after
            # waiting for this SIGCHLD event. note that this event may have already happened, if
            # we received a SIGCHLD while calling waitid.
            self.next_sigchld = saved_sigchld
async def wait(self, options: W = W.EXITED|W.STOPPED|W.CONTINUED) ‑> ChildState
-
async def wait(self, options: W=W.EXITED|W.STOPPED|W.CONTINUED) -> ChildState:
    return await self.waitpid(options)
async def check(self) ‑> ChildState
-
Wait for this child to die, and once it does, throw UncleanExit if it didn't exit cleanly.
async def check(self) -> ChildState:
    "Wait for this child to die, and once it does, throw `rsyscall.sys.wait.UncleanExit` if it didn't exit cleanly"
    try:
        death = await self.waitpid(W.EXITED)
    except trio.Cancelled:
        await self.kill(SIG.TERM)
        raise
    if not death.clean():
        if self.process.command:
            raise UncleanExit(death, self.process.command)
        else:
            raise UncleanExit(death)
    return death
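A minimal sketch of supervising a child, assuming `proc` is an `AsyncChildProcess` (hypothetical name):

    # check() waits for the child to die; on trio cancellation it sends SIGTERM
    # before re-raising, and it raises UncleanExit on an unclean exit status.
    death = await proc.check()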
async def kill(self, sig: SIG = SIG.KILL) ‑> NoneType
-
Send a signal to this child
async def kill(self, sig: SIG=SIG.KILL) -> None:
    "Send a signal to this child"
    if self.process.unread_siginfo:
        await self.process.read_siginfo()
    await self.process.kill(sig)
async def killpg(self, sig: SIG = SIG.KILL) ‑> NoneType
-
Send a signal to the process group corresponding to this child
async def killpg(self, sig: SIG=SIG.KILL) -> None:
    "Send a signal to the process group corresponding to this child"
    if self.process.unread_siginfo:
        await self.process.read_siginfo()
    await self.process.killpg(sig)
class Pointer (mapping: MemoryMapping, transport: MemoryGateway, serializer: Serializer[T], allocation: AllocationInterface, typ: t.Type[T])
-
An owning handle for some piece of memory.
More precisely, this is an owning handle for an allocation in some memory mapping. We're explicitly representing memory mappings, rather than glossing over them and pretending that the address space is flat and uniform. If we have two mappings for the same file, we can translate this Pointer between them.
As an implication of owning an allocation, we also know the length of that allocation, which is the length of the range of memory that it's valid to operate on through this pointer. We retrieve this through Pointer.size and use it in many places; anywhere we take a Pointer, if there's some question about what size to operate on, we operate on the full size of the pointer. Reducing the amount of memory to operate on can be done through Pointer.split.
We also know the type of the region of memory; that is, how to interpret this region of memory. This is useful at type-checking time to check that we aren't passing pointers to memory of the wrong type. At runtime, the type is reified as a serializer, which allows us to translate a value of the type to and from bytes.
We also hold a transport which will allow us to read and write the memory we own. Combined with the serializer, this allows us to write and read values of the appropriate type to and from memory using the Pointer.write and Pointer.read methods.
Finally, pointers have a "valid" bit which says whether the Pointer can be used. We say that a method "consumes" a pointer if it will invalidate that pointer.
Most of the methods manipulating the pointer are "linear". That is, they consume the pointer object they're called on and return a new pointer object to use. This forces the user to be more careful with tracking the state of the pointer; and also allows us to represent some state changes by changing the type of the pointer, in particular Pointer.write.
See also the inheriting class WrittenPointer
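A minimal sketch of the linear write/read flow, assuming `thread` is a `Thread` (hypothetical name):

    buf = await thread.malloc(bytes, 16)
    # write() consumes buf and returns a WrittenPointer; using buf afterwards
    # would raise UseAfterFreeError.
    written = await buf.write(b"hi")
    assert await written.read() == b"hi"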
@dataclass(eq=False)
class Pointer(t.Generic[T]):
    """An owning handle for some piece of memory.

    More precisely, this is an owning handle for an allocation in some memory mapping.
    We're explicitly representing memory mappings, rather than glossing over them and
    pretending that the address space is flat and uniform. If we have two mappings for
    the same file, we can translate this Pointer between them.

    As an implication of owning an allocation, we also know the length of that
    allocation, which is the length of the range of memory that it's valid to operate
    on through this pointer. We retrieve this through Pointer.size and use it in many
    places; anywhere we take a Pointer, if there's some question about what size to
    operate on, we operate on the full size of the pointer. Reducing the amount of
    memory to operate on can be done through Pointer.split.

    We also know the type of the region of memory; that is, how to interpret this
    region of memory. This is useful at type-checking time to check that we aren't
    passing pointers to memory of the wrong type. At runtime, the type is reified as a
    serializer, which allows us to translate a value of the type to and from bytes.

    We also hold a transport which will allow us to read and write the memory we own.
    Combined with the serializer, this allows us to write and read values of the
    appropriate type to and from memory using the Pointer.write and Pointer.read
    methods.

    Finally, pointers have a "valid" bit which says whether the Pointer can be used.
    We say that a method "consumes" a pointer if it will invalidate that pointer.

    Most of the methods manipulating the pointer are "linear". That is, they consume
    the pointer object they're called on and return a new pointer object to use. This
    forces the user to be more careful with tracking the state of the pointer; and
    also allows us to represent some state changes by changing the type of the
    pointer, in particular Pointer.write.

    See also the inheriting class WrittenPointer
    """
    __slots__ = ('mapping', 'transport', 'serializer', 'allocation', 'valid', 'typ')
    mapping: MemoryMapping
    transport: MemoryGateway
    serializer: Serializer[T]
    allocation: AllocationInterface
    typ: t.Type[T]
    valid: bool

    def __init__(self,
                 mapping: MemoryMapping,
                 transport: MemoryGateway,
                 serializer: Serializer[T],
                 allocation: AllocationInterface,
                 typ: t.Type[T],
    ) -> None:
        self.mapping = mapping
        self.transport = transport
        self.serializer = serializer
        self.allocation = allocation
        self.typ = typ
        self.valid = True

    async def write(self, value: T) -> WrittenPointer[T]:
        "Write this value to this pointer, consuming it and returning a new WrittenPointer"
        self._validate()
        value_bytes = self.serializer.to_bytes(value)
        if len(value_bytes) > self.size():
            raise Exception("value_bytes is too long", len(value_bytes),
                            "for this typed pointer of size", self.size())
        await self.transport.write(self, value_bytes)
        return self._wrote(value)

    async def read(self) -> T:
        "Read the value pointed to by this pointer"
        self._validate()
        value = await self.transport.read(self)
        return self.serializer.from_bytes(value)

    def size(self) -> int:
        """Return the size of this pointer's allocation in bytes

        This is mostly used by syscalls and passed to the kernel, so that the kernel
        knows the size of the buffer that it's been passed. To reduce the size of a
        buffer passed to the kernel, use Pointer.split.
        """
        return self.allocation.size()

    def split(self, size: int) -> t.Tuple[Pointer, Pointer]:
        """Invalidate this pointer and split it into two adjacent pointers

        This is primarily used by syscalls that write to one contiguous part of a
        buffer and leave the rest unused. They split the pointer into a "used" part
        and an "unused" part, and return both parts.
        """
        self._validate()
        # TODO uhhhh if split throws an exception... don't we need to free... or something...
        self.valid = False
        # TODO we should only allow split if we are the only reference to this allocation
        alloc1, alloc2 = self.allocation.split(size)
        first = self._with_alloc(alloc1)
        # TODO should degrade this pointer to raw bytes or something, or maybe no type at all
        second = self._with_alloc(alloc2)
        return first, second

    def merge(self, ptr: Pointer) -> Pointer:
        """Merge two pointers produced by split back into a single pointer

        The two pointers passed in are invalidated.

        This is primarily used by the user to re-assemble a buffer that was split by
        a syscall.
        """
        self._validate()
        ptr._validate()
        # TODO should assert that these two pointers both serialize the same thing
        # although they could be different types of serializers...
        self.valid = False
        ptr.valid = False
        # TODO we should only allow merge if we are the only reference to this allocation
        alloc = self.allocation.merge(ptr.allocation)
        return self._with_alloc(alloc)

    def __add__(self, right: Pointer[T]) -> Pointer[T]:
        "left + right desugars to left.merge(right)"
        return self.merge(right)

    def __radd__(self, left: t.Optional[Pointer[T]]) -> Pointer[T]:
        """"left += right" desugars to "left = (left + right) if left is not None else right"

        With this, you can initialize a variable to None, then merge pointers into it
        in a loop. This is especially useful when trying to write an entire buffer,
        or fill an entire buffer by reading.
        """
        if left is None:
            return self
        else:
            return left + self

    @property
    def near(self) -> rsyscall.near.Address:
        """Return the raw memory address referred to by this Pointer

        This is mostly used by syscalls and passed to the kernel, so that the kernel
        knows the start of the buffer to read to or write from.
        """
        # TODO hmm should maybe validate that this fits in the bounds of the mapping I guess
        self._validate()
        try:
            return self.mapping.near.as_address() + self.allocation.offset()
        except UseAfterFreeError as e:
            raise UseAfterFreeError(
                "Allocation inside this Pointer", self,
                "is freed, but the pointer is still valid; someone violated some invariants",
            ) from e

    def check_address_space(self, task: rsyscall.far.Task) -> None:
        if task.address_space != self.mapping.task.address_space:
            raise rsyscall.far.AddressSpaceMismatchError(task.address_space, self.mapping.task.address_space)

    @contextlib.contextmanager
    def borrow(self, task: rsyscall.far.Task) -> t.Iterator[rsyscall.near.Address]:
        """Pin the address of this pointer, and yield the pointer's raw memory address

        We validate this pointer, and pin it in memory so that it can't be moved or
        deleted while it's being used.

        This is mostly used by syscalls and passed to the kernel, so that the kernel
        knows the start of the buffer to read to or write from.
        """
        # TODO actual tracking of pointer references is not yet implemented
        # we should have a flag or lock to indicate that this pointer shouldn't be moved or deleted,
        # while it's being borrowed.
        # TODO rename this to pinned
        # TODO make this the only way to get .near
        self._validate()
        self.check_address_space(task)
        yield self.near

    def _validate(self) -> None:
        if not self.valid:
            raise UseAfterFreeError("handle is no longer valid")

    def free(self) -> None:
        """Free this pointer, invalidating it and releasing the underlying allocation.

        It isn't necessary to explicitly call this, because the pointer will be freed
        on GC. But you can call it anyway if, for example, the pointer will be
        referenced for long after it is done being used.
        """
        if self.valid:
            self.valid = False
            self.allocation.free()

    def __del__(self) -> None:
        # This isn't strictly necessary because the allocation will free itself on __del__.
        # But, that will only happen when *all* pointers referring to the allocation are collected;
        # not just the valid one.
        # So, this ensures GC is a bit more prompt.
        # Oh, wait. The real reason we need this is because the Arena stores references to the allocation.
        # TODO We should fix that.
        self.free()

    def split_from_end(self, size: int, alignment: int) -> t.Tuple[Pointer, Pointer]:
        """Split from the end of this pointer, such that the right pointer is aligned to `alignment`

        Used by write_to_end; mostly only useful for preparing stacks.
        """
        extra_to_remove = (int(self.near) + size) % alignment
        return self.split(self.size() - size - extra_to_remove)

    async def write_to_end(self, value: T, alignment: int) -> t.Tuple[Pointer[T], WrittenPointer[T]]:
        """Write a value to the end of the range of this pointer

        Splits the pointer, and returns both parts. This function is only useful for
        preparing stacks. Would be nice to figure out either a more generic way to
        prep stacks, or to figure out more things that write_to_end could be used
        for.
        """
        value_bytes = self.serializer.to_bytes(value)
        rest, write_buf = self.split_from_end(len(value_bytes), alignment)
        written = await write_buf.write(value)
        return rest, written

    def __repr__(self) -> str:
        name = type(self).__name__
        typname = self.typ.__name__
        try:
            return f"{name}[{typname}]({self.near}, {self.size()})"
        except UseAfterFreeError:
            return f"{name}[{typname}](valid={self.valid}, {self.mapping}, {self.allocation}, {self.serializer})"

    #### Various ways to create new Pointers by changing one thing about the old pointer.
    def _with_mapping(self: T_pointer, mapping: MemoryMapping) -> T_pointer:
        if type(self) is not Pointer:
            raise Exception("subclasses of Pointer must override _with_mapping")
        if mapping.file is not self.mapping.file:
            raise Exception("can only move pointer between two mappings of the same file")
        # we don't have a clean model for referring to the same object through multiple mappings.
        # this is a major TODO.
        # at least two ways to achieve it:
        # - have Pointers become multi-mapping super-pointers, which can be valid in multiple address spaces
        # - break our linearity constraint on pointers, allowing multiple pointers for the same allocation;
        #   this is difficult because split() is only easy to implement due to linearity.
        # right here, we just linearly move the pointer to a new mapping
        self._validate()
        self.valid = False
        return type(self)(mapping, self.transport, self.serializer, self.allocation, self.typ)

    def _with_alloc(self, allocation: AllocationInterface) -> Pointer:
        return Pointer(self.mapping, self.transport, self.serializer, allocation, self.typ)

    def _reinterpret(self, serializer: Serializer[U], typ: t.Type[U]) -> Pointer[U]:
        # TODO how can we check to make sure we don't reinterpret in wacky ways?
        # maybe we should only be able to reinterpret in ways that are allowed by the serializer?
        # so maybe it's a method on the Serializer? cast_to(Type)?
        self._validate()
        self.valid = False
        return Pointer(self.mapping, self.transport, serializer, self.allocation, typ)

    def _readable(self) -> ReadablePointer[T]:
        self._validate()
        self.valid = False
        return ReadablePointer(self.mapping, self.transport, self.serializer, self.allocation, self.typ)

    def readable_split(self, size: int) -> t.Tuple[ReadablePointer[T], Pointer]:
        left, right = self.split(size)
        return left._readable(), right

    def _linearize(self) -> LinearPointer[T]:
        self._validate()
        self.valid = False
        return LinearPointer(self.mapping, self.transport, self.serializer, self.allocation, self.typ)

    def unsafe(self) -> ReadablePointer[T]:
        "Get a ReadablePointer from this pointer, even though it might not be initialized"
        return self._readable()

    def _wrote(self, value: T) -> WrittenPointer[T]:
        "Assert we wrote this value to this pointer, and return the appropriate new WrittenPointer"
        self._validate()
        self.valid = False
        return WrittenPointer(self.mapping, self.transport, value, self.serializer, self.allocation, self.typ)
Ancestors
- typing.Generic
Subclasses
Instance variables
var near : Address
-
Return the raw memory address referred to by this Pointer
This is mostly used by syscalls and passed to the kernel, so that the kernel knows the start of the buffer to read to or write from.
@property
def near(self) -> rsyscall.near.Address:
    """Return the raw memory address referred to by this Pointer

    This is mostly used by syscalls and passed to the kernel, so that the kernel
    knows the start of the buffer to read to or write from.
    """
    # TODO hmm should maybe validate that this fits in the bounds of the mapping I guess
    self._validate()
    try:
        return self.mapping.near.as_address() + self.allocation.offset()
    except UseAfterFreeError as e:
        raise UseAfterFreeError(
            "Allocation inside this Pointer", self,
            "is freed, but the pointer is still valid; someone violated some invariants",
        ) from e
var allocation : AllocationInterface
var mapping : MemoryMapping
var serializer : Serializer[~T]
var transport : MemoryGateway
var typ : Type[~T]
var valid : bool
Methods
async def write(self, value: T) ‑> WrittenPointer[~T]
-
Write this value to this pointer, consuming it and returning a new WrittenPointer
async def write(self, value: T) -> WrittenPointer[T]:
    "Write this value to this pointer, consuming it and returning a new WrittenPointer"
    self._validate()
    value_bytes = self.serializer.to_bytes(value)
    if len(value_bytes) > self.size():
        raise Exception("value_bytes is too long", len(value_bytes),
                        "for this typed pointer of size", self.size())
    await self.transport.write(self, value_bytes)
    return self._wrote(value)
async def read(self) ‑> ~T
-
Read the value pointed to by this pointer
async def read(self) -> T:
    "Read the value pointed to by this pointer"
    self._validate()
    value = await self.transport.read(self)
    return self.serializer.from_bytes(value)
def size(self) ‑> int
-
Return the size of this pointer's allocation in bytes
This is mostly used by syscalls and passed to the kernel, so that the kernel knows the size of the buffer that it's been passed. To reduce the size of a buffer passed to the kernel, use Pointer.split.
def size(self) -> int:
    """Return the size of this pointer's allocation in bytes

    This is mostly used by syscalls and passed to the kernel, so that the kernel
    knows the size of the buffer that it's been passed. To reduce the size of a
    buffer passed to the kernel, use Pointer.split.
    """
    return self.allocation.size()
def split(self, size: int) ‑> Tuple[Pointer, Pointer]
-
Invalidate this pointer and split it into two adjacent pointers
This is primarily used by syscalls that write to one contiguous part of a buffer and leave the rest unused. They split the pointer into a "used" part and an "unused" part, and return both parts.
def split(self, size: int) -> t.Tuple[Pointer, Pointer]:
    """Invalidate this pointer and split it into two adjacent pointers

    This is primarily used by syscalls that write to one contiguous part of a
    buffer and leave the rest unused. They split the pointer into a "used" part
    and an "unused" part, and return both parts.
    """
    self._validate()
    # TODO uhhhh if split throws an exception... don't we need to free... or something...
    self.valid = False
    # TODO we should only allow split if we are the only reference to this allocation
    alloc1, alloc2 = self.allocation.split(size)
    first = self._with_alloc(alloc1)
    # TODO should degrade this pointer to raw bytes or something, or maybe no type at all
    second = self._with_alloc(alloc2)
    return first, second
def merge(self, ptr: Pointer) ‑> Pointer
-
Merge two pointers produced by split back into a single pointer
The two pointers passed in are invalidated.
This is primarily used by the user to re-assemble a buffer that was split by a syscall.
def merge(self, ptr: Pointer) -> Pointer:
    """Merge two pointers produced by split back into a single pointer

    The two pointers passed in are invalidated.

    This is primarily used by the user to re-assemble a buffer that was split by a
    syscall.
    """
    self._validate()
    ptr._validate()
    # TODO should assert that these two pointers both serialize the same thing
    # although they could be different types of serializers...
    self.valid = False
    ptr.valid = False
    # TODO we should only allow merge if we are the only reference to this allocation
    alloc = self.allocation.merge(ptr.allocation)
    return self._with_alloc(alloc)
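A minimal sketch of round-tripping a buffer through split and merge, assuming `ptr` is a valid `Pointer` (hypothetical name):

    first, rest = ptr.split(4)  # consumes ptr
    whole = first.merge(rest)   # consumes both; `first + rest` is equivalent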
def check_address_space(self, task: Task) ‑> NoneType
-
def check_address_space(self, task: rsyscall.far.Task) -> None:
    if task.address_space != self.mapping.task.address_space:
        raise rsyscall.far.AddressSpaceMismatchError(task.address_space, self.mapping.task.address_space)
def borrow(self, task: Task) ‑> Iterator[Address]
-
Pin the address of this pointer, and yield the pointer's raw memory address
We validate this pointer, and pin it in memory so that it can't be moved or deleted while it's being used.
This is mostly used by syscalls and passed to the kernel, so that the kernel knows the start of the buffer to read to or write from.
@contextlib.contextmanager
def borrow(self, task: rsyscall.far.Task) -> t.Iterator[rsyscall.near.Address]:
    """Pin the address of this pointer, and yield the pointer's raw memory address

    We validate this pointer, and pin it in memory so that it can't be moved or deleted while
    it's being used.

    This is mostly used by syscalls and passed to the kernel, so that the kernel knows the
    start of the buffer to read to or write from.
    """
    # TODO actual tracking of pointer references is not yet implemented
    # we should have a flag or lock to indicate that this pointer shouldn't be moved or deleted,
    # while it's being borrowed.
    # TODO rename this to pinned
    # TODO make this the only way to get .near
    self._validate()
    self.check_address_space(task)
    yield self.near
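A sketch of how a syscall implementation might use this; do_syscall and sys_read stand in for rsyscall's internal syscall plumbing, and are passed in as parameters only to keep the sketch self-contained:

async def demo_read_syscall(task, fd, buf, do_syscall, sys_read: int) -> int:
    # while borrowed, buf can't (in principle) be moved or freed out from under the kernel
    with buf.borrow(task) as near_address:
        return await do_syscall(sys_read, fd, near_address, buf.size())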
def free(self) ‑> NoneType
-
Free this pointer, invalidating it and releasing the underlying allocation.
It isn't necessary to explicitly call this, because the pointer will be freed on GC. But you can call it anyway if, for example, the pointer will be referenced for long after it is done being used.
def free(self) -> None:
    """Free this pointer, invalidating it and releasing the underlying allocation.

    It isn't necessary to explicitly call this, because the pointer will be freed on GC.
    But you can call it anyway if, for example, the pointer will be referenced for long
    after it is done being used.
    """
    if self.valid:
        self.valid = False
        self.allocation.free()
def split_from_end(self, size: int, alignment: int) ‑> Tuple[Pointer, Pointer]
-
Split from the end of this pointer, such that the right pointer is aligned to alignment
Used by write_to_end; mostly only useful for preparing stacks.
def split_from_end(self, size: int, alignment: int) -> t.Tuple[Pointer, Pointer]:
    """Split from the end of this pointer, such that the right pointer is aligned to `alignment`

    Used by write_to_end; mostly only useful for preparing stacks.
    """
    extra_to_remove = (int(self.near) + size) % alignment
    return self.split(self.size() - size - extra_to_remove)
async def write_to_end(self, value: T, alignment: int) ‑> Tuple[Pointer[~T], WrittenPointer[~T]]
-
Write a value to the end of the range of this pointer
Splits the pointer, and returns both parts. This function is only useful for preparing stacks. Would be nice to figure out either a more generic way to prep stacks, or to figure out more things that write_to_end could be used for.
async def write_to_end(self, value: T, alignment: int) -> t.Tuple[Pointer[T], WrittenPointer[T]]:
    """Write a value to the end of the range of this pointer

    Splits the pointer, and returns both parts. This function is only useful for preparing
    stacks. Would be nice to figure out either a more generic way to prep stacks, or to
    figure out more things that write_to_end could be used for.
    """
    value_bytes = self.serializer.to_bytes(value)
    rest, write_buf = self.split_from_end(len(value_bytes), alignment)
    written = await write_buf.write(value)
    return rest, written
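A sketch of stack preparation (hedged: stack_data stands for any value with a serializer, such as the trampoline data written before cloning; 16 bytes is the alignment the x86-64 ABI requires):

async def demo_prep_stack(stack_buf, stack_data) -> None:
    rest, written = await stack_buf.write_to_end(stack_data, 16)
    # `written` is the initialized top of the stack, aligned to 16 bytes;
    # `rest` is the remaining room for the stack to grow down into.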
def readable_split(self, size: int) ‑> Tuple[ReadablePointer[~T], Pointer]
-
Split this pointer into two adjacent pointers, returning the left part as a ReadablePointer
def readable_split(self, size: int) -> t.Tuple[ReadablePointer[T], Pointer]:
    left, right = self.split(size)
    return left._readable(), right
def unsafe(self) ‑> ReadablePointer[~T]
-
Get a ReadablePointer from this pointer, even though it might not be initialized
def unsafe(self) -> ReadablePointer[T]:
    "Get a ReadablePointer from this pointer, even though it might not be initialized"
    return self._readable()
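For example (a sketch: the ioctl wrapper and the request constant are assumptions, standing in for any call that initializes memory without the type system knowing):

async def demo_unsafe(fd, buf, request: int) -> bytes:
    await fd.ioctl(request, buf)      # the kernel filled buf, but buf is still a plain Pointer
    return await buf.unsafe().read()  # we assert it's initialized, and read it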
class WrittenPointer (mapping: MemoryMapping, transport: MemoryGateway, value: T_co, serializer: Serializer[T_co], allocation: AllocationInterface, typ: t.Type[T_co])
-
A Pointer with some known value written to it
We have all the normal functionality of a Pointer (see that class for more information), but we also know that we've had some value written to us, and we know what that value is, and it's immediately accessible in Python.
We can also view this with an emphasis on the value: this is some known value, which has been written to some memory location. The value and the pointer are equally important in this class, and most uses of it rely on both.
We use inheritance so that a WrittenPointer gracefully degrades back to a Pointer, and is invalidated whenever a pointer is invalidated. Specifically, we want anything that writes to a pointer to invalidate this pointer. The invalidation lets us know that this value is no longer necessarily written to this pointer.
For example, syscalls that write to pointers will typically call split. A call to WrittenPointer.split will invalidate the WrittenPointer and return regular Pointers; that's desirable because the syscall likely overwrote whatever value was previously written here.
TODO: We should fix syscalls that write to memory but don't call split so that they invalidate the WrittenPointer. That's mostly syscalls using Sockbufs…
class WrittenPointer(Pointer[T_co]):
    """A Pointer with some known value written to it

    We have all the normal functionality of a Pointer (see that class for more information),
    but we also know that we've had some value written to us, and we know what that value is,
    and it's immediately accessible in Python.

    We can also view this with an emphasis on the value: this is some known value, which has
    been written to some memory location. The value and the pointer are equally important in
    this class, and most uses of it rely on both.

    We use inheritance so that a WrittenPointer gracefully degrades back to a Pointer, and is
    invalidated whenever a pointer is invalidated. Specifically, we want anything that writes
    to a pointer to invalidate this pointer. The invalidation lets us know that this value is
    no longer necessarily written to this pointer.

    For example, syscalls that write to pointers will typically call split. A call to
    WrittenPointer.split will invalidate the WrittenPointer and return regular Pointers;
    that's desirable because the syscall likely overwrote whatever value was previously
    written here.

    TODO: We should fix syscalls that write to memory but don't call split so that they
    invalidate the WrittenPointer. That's mostly syscalls using Sockbufs...
    """
    __slots__ = ('value')
    def __init__(self,
                 mapping: MemoryMapping,
                 transport: MemoryGateway,
                 value: T_co,
                 serializer: Serializer[T_co],
                 allocation: AllocationInterface,
                 typ: t.Type[T_co],
    ) -> None:
        super().__init__(mapping, transport, serializer, allocation, typ)
        self.value = value

    def __repr__(self) -> str:
        name = type(self).__name__
        typname = self.typ.__name__
        try:
            return f"{name}[{typname}]({self.near}, {self.value})"
        except UseAfterFreeError:
            return f"{name}[{typname}](valid={self.valid}, {self.mapping}, {self.allocation}, {self.value})"

    def _with_mapping(self, mapping: MemoryMapping) -> WrittenPointer:
        if type(self) is not WrittenPointer:
            raise Exception("subclasses of WrittenPointer must override _with_mapping")
        if mapping.file is not self.mapping.file:
            raise Exception("can only move pointer between two mappings of the same file")
        # see notes in Pointer._with_mapping
        self._validate()
        self.valid = False
        return type(self)(mapping, self.transport, self.value, self.serializer, self.allocation, self.typ)
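A sketch of why keeping the value around matters (assuming SockaddrUn from rsyscall's Unix-socket support; the exact import path is an assumption):

from rsyscall.sys.un import SockaddrUn  # assumed import path

async def demo_connect(thread, sock) -> None:
    addr = await thread.ptr(SockaddrUn("/tmp/example.sock"))  # WrittenPointer[SockaddrUn]
    # connect needs both the pointer (to pass to the kernel) and the value
    # (for example, to compute the address length); a WrittenPointer carries both
    await sock.connect(addr)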
Ancestors
- Pointer
- typing.Generic
Instance variables
var value
-
The value that was written to this pointer.
Inherited members
class ReadablePointer (mapping: MemoryMapping, transport: MemoryGateway, serializer: Serializer[T], allocation: AllocationInterface, typ: t.Type[T])
-
A Pointer that is safely readable
This is returned by functions and syscalls which write some (possibly unknown) pure data to an address in memory, which then can be read and deserialized to get a sensical pure data value rather than nonsense.
Immediately after allocation, a Pointer is returned, rather than a ReadablePointer, to indicate that the pointer is uninitialized, and therefore not safely readable.
This is also returned by Pointer.unsafe(), to support system calls where it's not statically known that a passed Pointer is written to and initialized; ioctls, for example. It would be better to have a complete description of the Linux interface, so we could get rid of this unsafety.
This is currently only a marker type, but eventually we'll move the read() method here to ReadablePointer from Pointer, so that reading Pointers is actually not allowed. For now, this is just a hint.
class ReadablePointer(Pointer[T]):
    """A Pointer that is safely readable

    This is returned by functions and syscalls which write some (possibly unknown) pure data
    to an address in memory, which then can be read and deserialized to get a sensical pure
    data value rather than nonsense.

    Immediately after allocation, a Pointer is returned, rather than a ReadablePointer, to
    indicate that the pointer is uninitialized, and therefore not safely readable.

    This is also returned by Pointer.unsafe(), to support system calls where it's not
    statically known that a passed Pointer is written to and initialized; ioctls, for
    example. It would be better to have a complete description of the Linux interface, so we
    could get rid of this unsafety.

    This is currently only a marker type, but eventually we'll move the read() method here to
    ReadablePointer from Pointer, so that reading Pointers is actually not allowed. For now,
    this is just a hint.
    """
    __slots__ = ()
    def _with_mapping(self, mapping: MemoryMapping) -> ReadablePointer:
        # see notes in Pointer._with_mapping
        if type(self) is not ReadablePointer:
            raise Exception("subclasses of ReadablePointer must override _with_mapping")
        if mapping.file is not self.mapping.file:
            raise Exception("can only move pointer between two mappings of the same file")
        self._validate()
        self.valid = False
        return type(self)(mapping, self.transport, self.serializer, self.allocation, self.typ)
Ancestors
- Pointer
- typing.Generic
Subclasses
- LinearPointer
Inherited members
class LinearPointer (mapping: MemoryMapping, transport: MemoryGateway, serializer: Serializer[T], allocation: AllocationInterface, typ: t.Type[T])
-
A Pointer that must be read, once
This is returned by functions and syscalls which write an unknown value to an address in memory, which then must be read and deserialized once to manage the resources described by that value, such as file descriptors.
The value is:
- "affine"; it must be read at least once, so that the resources inside can be returned as managed objects.
- "relevant"; it must be read at most once, so that dangling handles to the resources can't be created again after they're closed.
Since it's both affine and relevant, this is a true linear type.
Unfortunately it's going to be quite difficult to guarantee relevance. There are three issues here:
1. The pointer can simply be dropped and garbage collected.
2. System calls can write to the pointer and discard its previous results.
3. We can write to the pointer (through Pointer.write()) and discard its previous results.
We can mitigate 1 a little by warning in __del__. We could statically prevent 3 by removing the Pointer.read() and Pointer.write() methods from this class, and only allowing LinearPointer.linear_read(); or dynamically, by throwing in write if been_read is false. Any approach to 2 is going to require some tweaks to the pointer API, and probably some mass changes to syscall implementations. Although maybe we could do it off of .near accesses.
class LinearPointer(ReadablePointer[T]):
    """A Pointer that must be read, once

    This is returned by functions and syscalls which write an unknown value to an address in
    memory, which then must be read and deserialized *once* to manage the resources described
    by that value, such as file descriptors.

    The value is:

    - "affine"; it must be read at most once, so that dangling handles to the resources
      can't be created again after they're closed.
    - "relevant"; it must be read at least once, so that the resources inside can be
      returned as managed objects.

    Since it's both affine and relevant, this is a true linear type.

    Unfortunately it's going to be quite difficult to guarantee relevance. There are three
    issues here:

    1. The pointer can simply be dropped and garbage collected.
    2. System calls can write to the pointer and discard its previous results
    3. We can write to the pointer (through `Pointer.write`) and discard its previous results

    We can mitigate 1 a little by warning in `__del__`. We could statically prevent 3 by
    removing the `Pointer.read` and `Pointer.write` methods from this class, and only
    allowing `LinearPointer.linear_read`, or dynamically by throwing in `write` if
    `been_read` is false. Any approach to 2 is going to require some tweaks to the pointer
    API, and probably some mass changes to syscall implementations. Although maybe we could
    do it off of .near accesses.
    """
    __slots__ = ('been_read')
    def __init__(self,
                 mapping: MemoryMapping,
                 transport: MemoryGateway,
                 serializer: Serializer[T],
                 allocation: AllocationInterface,
                 typ: t.Type[T],
    ) -> None:
        super().__init__(mapping, transport, serializer, allocation, typ)
        self.been_read = False

    async def read(self) -> T:
        if self.been_read:
            raise Exception("This LinearPointer has already been read, it can't be read again for safety reasons.")
        ret = await super().read()
        self.been_read = True
        return ret

    async def linear_read(self) -> t.Tuple[T, Pointer[T]]:
        "Read the value, and return the now-inert buffer left over as a Pointer."
        ret = await self.read()
        self.valid = False
        new_ptr = Pointer(self.mapping, self.transport, self.serializer, self.allocation, self.typ)
        return ret, new_ptr

    def __del__(self) -> None:
        super().__del__()
        if not self.been_read:
            logger.error("Didn't read this LinearPointer before dropping it: %s", self)

    def _with_mapping(self, mapping: MemoryMapping) -> LinearPointer:
        # see notes in Pointer._with_mapping
        if type(self) is not LinearPointer:
            raise Exception("subclasses of LinearPointer must override _with_mapping")
        if mapping.file is not self.mapping.file:
            raise Exception("can only move pointer between two mappings of the same file")
        self._validate()
        self.valid = False
        return type(self)(mapping, self.transport, self.serializer, self.allocation, self.typ)
Ancestors
- ReadablePointer
- Pointer
- typing.Generic
Instance variables
var been_read
-
Whether this pointer has already been read.
Methods
async def linear_read(self) ‑> Tuple[~T, Pointer[~T]]
-
Read the value, and return the now-inert buffer left over as a Pointer.
async def linear_read(self) -> t.Tuple[T, Pointer[T]]:
    "Read the value, and return the now-inert buffer left over as a Pointer."
    ret = await self.read()
    self.valid = False
    new_ptr = Pointer(self.mapping, self.transport, self.serializer, self.allocation, self.typ)
    return ret, new_ptr
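A usage sketch (hedged: linear_ptr stands for any LinearPointer you've been handed, e.g. from a resource-receiving call):

async def demo_linear(linear_ptr) -> None:
    value, inert_buf = await linear_ptr.linear_read()  # exactly one read; linear_ptr is now invalid
    # `value` owns the resources (e.g. file descriptors); `inert_buf` is a plain
    # Pointer to the same memory, safe to reuse or free without touching those resources
    inert_buf.free()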
Inherited members