Module rsyscall.wish
Several REPL-based WishGranters for the wish library
This module provides several choices of WishGranters for the wish library. Each
WishGranter is an asynchronous REPL based on the arepl library.
In fact, this library automatically instantiates ConsoleGenie and sets
my_wish_granter to it at import time. Not very nice, but convenient.
Also, for convenience, we re-export wish() and Wish.
Expand source code Browse git
"""Several REPL-based `wish.WishGranter`s for the `wish` library
This module provides several choices of `wish.WishGranter`s for the wish library. Each
`WishGranter` is an asynchronous REPL based on the `arepl` library.
In fact, this library automatically instantiates `ConsoleGenie` and sets
`wish.my_wish_granter` to it at import time. Not very nice, but convenient.
Also, for convenience, we re-export `wish.wish` and `wish.Wish`.
"""
from __future__ import annotations
from rsyscall.command import Command
from rsyscall.epoller import AsyncFileDescriptor
from rsyscall.thread import Thread
from rsyscall.path import Path
from wish import wish, WishGranter, Wish, my_wish_granter
import logging
import os
import arepl
import sys
import traceback
import trio
import typing as t
from rsyscall.sched import CLONE
from rsyscall.sys.socket import SOCK, AF
from rsyscall.sys.un import SockaddrUn
from rsyscall.sys.wait import W
from rsyscall.unistd import Pipe
logger = logging.getLogger(__name__)
__all__ = [
'ConsoleGenie',
'ConsoleServerGenie',
'run_repl',
'serve_repls',
]
T = t.TypeVar('T')
class ConsoleGenie(WishGranter):
"A WishGranter which satisfies wishes by starting a REPL on stdin/stdout for human intervention"
@classmethod
async def make(self, thread: Thread):
"Create a ConsoleGenie that will serve using `thread`'s stdin/stdout"
cat = await thread.environ.which("cat")
return ConsoleGenie(thread, cat)
def __init__(self, thread: Thread, cat: Command) -> None:
self.thread = thread
self.cat = cat
self.lock = trio.Lock()
async def wish(self, wish: Wish[T]) -> T:
"""Serve a REPL on stdin/stdout and returns the value returned from it; throws on REPL hangup
We use cat as a hack to be async; we can't robustly turn stdin/stdout into
AsyncFDs, because setting them to NONBLOCK will be inherited by other processes
using the same terminal. Instead, we just start two cats to do blocking
reads/writes of stdin/stdout, then do async reads/writes to the cats.
"""
async with self.lock:
message = "".join(traceback.format_exception(None, wish, wish.__traceback__))
wisher_frame = [frame for (frame, lineno) in traceback.walk_tb(wish.__traceback__)][-1]
to_term_pipe = await (await self.thread.task.pipe(await self.thread.ram.malloc(Pipe))).read()
from_term_pipe = await (await self.thread.task.pipe(await self.thread.ram.malloc(Pipe))).read()
async_from_term = await self.thread.make_afd(from_term_pipe.read, set_nonblock=True)
async_to_term = await self.thread.make_afd(to_term_pipe.write, set_nonblock=True)
try:
cat_stdin_thread = await self.thread.clone()
await cat_stdin_thread.task.inherit_fd(to_term_pipe.read).dup2(cat_stdin_thread.stdin)
async with await cat_stdin_thread.exec(self.cat):
cat_stdout_thread = await self.thread.clone()
await cat_stdout_thread.task.inherit_fd(from_term_pipe.write).dup2(cat_stdout_thread.stdout)
async with await cat_stdout_thread.exec(self.cat):
ret = await run_repl(async_from_term, async_to_term, {
**wisher_frame.f_locals,
**wisher_frame.f_globals,
'__repl_stdin__': async_from_term,
'__repl_stdout__': async_to_term,
'wish': wish,
'wisher_frame': wisher_frame,
}, wish.return_type, message)
return ret
finally:
await async_from_term.close()
await async_to_term.close()
class ConsoleServerGenie(WishGranter):
"""On wish, listens for connections to a socket in `sockdir`, and serves a REPL to them
Also, immediately starts socat from `thread` connecting to the socket; that means a
REPL is served on stdin/stdout.
The socket names are generated from the function and line number at which wish was
called. The sockets are unlinked after a value is returned from some REPL.
Multiple connections to the socket are supported at a time. Once one returns a value,
all are cancelled.
"""
@classmethod
async def make(self, thread: Thread, sockdir: Path):
socat = await thread.environ.which("socat")
return ConsoleServerGenie(thread, sockdir, socat)
def __init__(self, thread: Thread, sockdir: Path, socat: Command) -> None:
self.thread = thread
self.sockdir = sockdir
self.socat = socat
self.name_counts: t.Dict[str, int] = {}
def _uniquify_name(self, name: str) -> str:
"We never reuse a name for a socket."
if name not in self.name_counts:
self.name_counts[name] = 1
return name
else:
self.name_counts[name] += 1
return name + f".{self.name_counts[name]}"
async def wish(self, wish: Wish[T]) -> T:
message = "".join(traceback.format_exception(None, wish, wish.__traceback__))
wisher_frame = [frame for (frame, lineno) in traceback.walk_tb(wish.__traceback__)][-1]
sock_name = self._uniquify_name(f'{wisher_frame.f_code.co_name}-{wisher_frame.f_lineno}')
sock_path = self.sockdir/sock_name
cmd = self.socat.args("-", "UNIX-CONNECT:" + os.fsdecode(sock_path))
sockfd = await self.thread.make_afd(await self.thread.socket(AF.UNIX, SOCK.STREAM|SOCK.NONBLOCK))
await sockfd.bind(await self.thread.ptr(await SockaddrUn.from_path(self.thread, sock_path)))
await sockfd.handle.listen(10)
async with trio.open_nursery() as nursery:
@nursery.start_soon
async def do_socat():
while True:
thread = await self.thread.clone()
try:
child = await thread.exec(cmd)
except:
await thread.exit(0)
raise
async with child:
await child.waitpid(W.EXITED)
ret = await serve_repls(sockfd, {
**wisher_frame.f_locals,
**wisher_frame.f_globals,
'wisher_frame': wisher_frame,
}, wish.return_type, message)
nursery.cancel_scope.cancel()
await self.thread.task.unlink(await self.thread.ram.ptr(sock_path))
return ret
async def run_repl(infd: AsyncFileDescriptor,
outfd: AsyncFileDescriptor,
global_vars: t.Dict[str, t.Any],
wanted_type: t.Type[T], message: str) -> T:
"""Serve a REPL on infd/outfd, guarding that it only returns `wanted_type`
We print `message` on startup. `global_vars` is mutably used as the global variables
dict for the REPL.
"""
async with trio.open_nursery() as repl_nursery:
@repl_nursery.start_soon
async def wait_for_rdhup() -> None:
await infd.wait_for_rdhup()
# when we get RDHUP on the connection, we want to cancel the REPL, even if
# some task is in progress.
raise Exception("REPL connection hangup")
await outfd.write_all_bytes((message+"\n").encode())
ret = await arepl.run_repl(infd.read_some_bytes, outfd.write_all_bytes, global_vars, wanted_type)
repl_nursery.cancel_scope.cancel()
return ret
async def serve_repls(listenfd: AsyncFileDescriptor,
initial_vars: t.Dict[str, t.Any],
wanted_type: t.Type[T], message: str) -> T:
"""Serve REPLs on a socket until someone gives us the type we want
Multiple connections to the socket can be active simultaneously, with REPLs being
served to all.
Each REPL has a reference to the REPL variables of all other REPLs, through the
__repls__ variable present in each REPL.
A good extension would be to support killing a REPL from another REPL. We didn't do
that for some reason, not sure why.
"""
repl_vars: t.Dict[str, t.Dict[str, t.Any]] = {}
retval = None
async with trio.open_nursery() as nursery:
async def do_repl(connfd: AsyncFileDescriptor,
global_vars: t.Dict[str, t.Any]) -> None:
try:
ret = await run_repl(connfd, connfd, global_vars, wanted_type, message)
except arepl.FromREPL as e:
raise e.exn from e
except Exception:
logger.exception("run_repl's internal logic raised an exception, disconnecting that REPL and continuing")
else:
nonlocal retval
retval = ret
nursery.cancel_scope.cancel()
finally:
await connfd.close()
num = 0
while True:
connfd = await listenfd.make_new_afd(await listenfd.accept(SOCK.NONBLOCK))
global_vars = {**initial_vars,
'__repls__': repl_vars,
'__repl_stdin__': connfd,
'__repl_stdout__': connfd}
repl_vars[str(num)] = global_vars
nursery.start_soon(do_repl, connfd, global_vars)
num += 1
return retval
def _initialize_module() -> None:
from rsyscall import local_thread
my_wish_granter.set(trio.run(ConsoleGenie.make, local_thread))
_initialize_module()
Functions
async def run_repl(infd: AsyncFileDescriptor, outfd: AsyncFileDescriptor, global_vars: t.Dict[str, t.Any], wanted_type: t.Type[T], message: str) ‑> ~T-
Serve a REPL on infd/outfd, guarding that it only returns
wanted_typeWe print
messageon startup.global_varsis mutably used as the global variables dict for the REPL.Expand source code Browse git
async def run_repl(infd: AsyncFileDescriptor, outfd: AsyncFileDescriptor, global_vars: t.Dict[str, t.Any], wanted_type: t.Type[T], message: str) -> T: """Serve a REPL on infd/outfd, guarding that it only returns `wanted_type` We print `message` on startup. `global_vars` is mutably used as the global variables dict for the REPL. """ async with trio.open_nursery() as repl_nursery: @repl_nursery.start_soon async def wait_for_rdhup() -> None: await infd.wait_for_rdhup() # when we get RDHUP on the connection, we want to cancel the REPL, even if # some task is in progress. raise Exception("REPL connection hangup") await outfd.write_all_bytes((message+"\n").encode()) ret = await arepl.run_repl(infd.read_some_bytes, outfd.write_all_bytes, global_vars, wanted_type) repl_nursery.cancel_scope.cancel() return ret async def serve_repls(listenfd: AsyncFileDescriptor, initial_vars: t.Dict[str, t.Any], wanted_type: t.Type[T], message: str) ‑> ~T-
Serve REPLs on a socket until someone gives us the type we want
Multiple connections to the socket can be active simultaneously, with REPLs being served to all.
Each REPL has a reference to the REPL variables of all other REPLs, through the repls variable present in each REPL.
A good extension would be to support killing a REPL from another REPL. We didn't do that for some reason, not sure why.
Expand source code Browse git
async def serve_repls(listenfd: AsyncFileDescriptor, initial_vars: t.Dict[str, t.Any], wanted_type: t.Type[T], message: str) -> T: """Serve REPLs on a socket until someone gives us the type we want Multiple connections to the socket can be active simultaneously, with REPLs being served to all. Each REPL has a reference to the REPL variables of all other REPLs, through the __repls__ variable present in each REPL. A good extension would be to support killing a REPL from another REPL. We didn't do that for some reason, not sure why. """ repl_vars: t.Dict[str, t.Dict[str, t.Any]] = {} retval = None async with trio.open_nursery() as nursery: async def do_repl(connfd: AsyncFileDescriptor, global_vars: t.Dict[str, t.Any]) -> None: try: ret = await run_repl(connfd, connfd, global_vars, wanted_type, message) except arepl.FromREPL as e: raise e.exn from e except Exception: logger.exception("run_repl's internal logic raised an exception, disconnecting that REPL and continuing") else: nonlocal retval retval = ret nursery.cancel_scope.cancel() finally: await connfd.close() num = 0 while True: connfd = await listenfd.make_new_afd(await listenfd.accept(SOCK.NONBLOCK)) global_vars = {**initial_vars, '__repls__': repl_vars, '__repl_stdin__': connfd, '__repl_stdout__': connfd} repl_vars[str(num)] = global_vars nursery.start_soon(do_repl, connfd, global_vars) num += 1 return retval
Classes
class ConsoleGenie (thread: Thread, cat: Command)-
A WishGranter which satisfies wishes by starting a REPL on stdin/stdout for human intervention
Expand source code Browse git
class ConsoleGenie(WishGranter): "A WishGranter which satisfies wishes by starting a REPL on stdin/stdout for human intervention" @classmethod async def make(self, thread: Thread): "Create a ConsoleGenie that will serve using `thread`'s stdin/stdout" cat = await thread.environ.which("cat") return ConsoleGenie(thread, cat) def __init__(self, thread: Thread, cat: Command) -> None: self.thread = thread self.cat = cat self.lock = trio.Lock() async def wish(self, wish: Wish[T]) -> T: """Serve a REPL on stdin/stdout and returns the value returned from it; throws on REPL hangup We use cat as a hack to be async; we can't robustly turn stdin/stdout into AsyncFDs, because setting them to NONBLOCK will be inherited by other processes using the same terminal. Instead, we just start two cats to do blocking reads/writes of stdin/stdout, then do async reads/writes to the cats. """ async with self.lock: message = "".join(traceback.format_exception(None, wish, wish.__traceback__)) wisher_frame = [frame for (frame, lineno) in traceback.walk_tb(wish.__traceback__)][-1] to_term_pipe = await (await self.thread.task.pipe(await self.thread.ram.malloc(Pipe))).read() from_term_pipe = await (await self.thread.task.pipe(await self.thread.ram.malloc(Pipe))).read() async_from_term = await self.thread.make_afd(from_term_pipe.read, set_nonblock=True) async_to_term = await self.thread.make_afd(to_term_pipe.write, set_nonblock=True) try: cat_stdin_thread = await self.thread.clone() await cat_stdin_thread.task.inherit_fd(to_term_pipe.read).dup2(cat_stdin_thread.stdin) async with await cat_stdin_thread.exec(self.cat): cat_stdout_thread = await self.thread.clone() await cat_stdout_thread.task.inherit_fd(from_term_pipe.write).dup2(cat_stdout_thread.stdout) async with await cat_stdout_thread.exec(self.cat): ret = await run_repl(async_from_term, async_to_term, { **wisher_frame.f_locals, **wisher_frame.f_globals, '__repl_stdin__': async_from_term, '__repl_stdout__': async_to_term, 'wish': wish, 'wisher_frame': wisher_frame, }, wish.return_type, message) return ret finally: await async_from_term.close() await async_to_term.close()Ancestors
Static methods
async def make(thread: Thread)-
Create a ConsoleGenie that will serve using
thread's stdin/stdoutExpand source code Browse git
@classmethod async def make(self, thread: Thread): "Create a ConsoleGenie that will serve using `thread`'s stdin/stdout" cat = await thread.environ.which("cat") return ConsoleGenie(thread, cat)
Methods
async def wish(self, wish: Wish[T]) ‑> ~T-
Serve a REPL on stdin/stdout and returns the value returned from it; throws on REPL hangup
We use cat as a hack to be async; we can't robustly turn stdin/stdout into AsyncFDs, because setting them to NONBLOCK will be inherited by other processes using the same terminal. Instead, we just start two cats to do blocking reads/writes of stdin/stdout, then do async reads/writes to the cats.
Expand source code Browse git
async def wish(self, wish: Wish[T]) -> T: """Serve a REPL on stdin/stdout and returns the value returned from it; throws on REPL hangup We use cat as a hack to be async; we can't robustly turn stdin/stdout into AsyncFDs, because setting them to NONBLOCK will be inherited by other processes using the same terminal. Instead, we just start two cats to do blocking reads/writes of stdin/stdout, then do async reads/writes to the cats. """ async with self.lock: message = "".join(traceback.format_exception(None, wish, wish.__traceback__)) wisher_frame = [frame for (frame, lineno) in traceback.walk_tb(wish.__traceback__)][-1] to_term_pipe = await (await self.thread.task.pipe(await self.thread.ram.malloc(Pipe))).read() from_term_pipe = await (await self.thread.task.pipe(await self.thread.ram.malloc(Pipe))).read() async_from_term = await self.thread.make_afd(from_term_pipe.read, set_nonblock=True) async_to_term = await self.thread.make_afd(to_term_pipe.write, set_nonblock=True) try: cat_stdin_thread = await self.thread.clone() await cat_stdin_thread.task.inherit_fd(to_term_pipe.read).dup2(cat_stdin_thread.stdin) async with await cat_stdin_thread.exec(self.cat): cat_stdout_thread = await self.thread.clone() await cat_stdout_thread.task.inherit_fd(from_term_pipe.write).dup2(cat_stdout_thread.stdout) async with await cat_stdout_thread.exec(self.cat): ret = await run_repl(async_from_term, async_to_term, { **wisher_frame.f_locals, **wisher_frame.f_globals, '__repl_stdin__': async_from_term, '__repl_stdout__': async_to_term, 'wish': wish, 'wisher_frame': wisher_frame, }, wish.return_type, message) return ret finally: await async_from_term.close() await async_to_term.close()
class ConsoleServerGenie (thread: Thread, sockdir: Path, socat: Command)-
On wish, listens for connections to a socket in
sockdir, and serves a REPL to themAlso, immediately starts socat from
threadconnecting to the socket; that means a REPL is served on stdin/stdout.The socket names are generated from the function and line number at which wish was called. The sockets are unlinked after a value is returned from some REPL.
Multiple connections to the socket are supported at a time. Once one returns a value, all are cancelled.
Expand source code Browse git
class ConsoleServerGenie(WishGranter): """On wish, listens for connections to a socket in `sockdir`, and serves a REPL to them Also, immediately starts socat from `thread` connecting to the socket; that means a REPL is served on stdin/stdout. The socket names are generated from the function and line number at which wish was called. The sockets are unlinked after a value is returned from some REPL. Multiple connections to the socket are supported at a time. Once one returns a value, all are cancelled. """ @classmethod async def make(self, thread: Thread, sockdir: Path): socat = await thread.environ.which("socat") return ConsoleServerGenie(thread, sockdir, socat) def __init__(self, thread: Thread, sockdir: Path, socat: Command) -> None: self.thread = thread self.sockdir = sockdir self.socat = socat self.name_counts: t.Dict[str, int] = {} def _uniquify_name(self, name: str) -> str: "We never reuse a name for a socket." if name not in self.name_counts: self.name_counts[name] = 1 return name else: self.name_counts[name] += 1 return name + f".{self.name_counts[name]}" async def wish(self, wish: Wish[T]) -> T: message = "".join(traceback.format_exception(None, wish, wish.__traceback__)) wisher_frame = [frame for (frame, lineno) in traceback.walk_tb(wish.__traceback__)][-1] sock_name = self._uniquify_name(f'{wisher_frame.f_code.co_name}-{wisher_frame.f_lineno}') sock_path = self.sockdir/sock_name cmd = self.socat.args("-", "UNIX-CONNECT:" + os.fsdecode(sock_path)) sockfd = await self.thread.make_afd(await self.thread.socket(AF.UNIX, SOCK.STREAM|SOCK.NONBLOCK)) await sockfd.bind(await self.thread.ptr(await SockaddrUn.from_path(self.thread, sock_path))) await sockfd.handle.listen(10) async with trio.open_nursery() as nursery: @nursery.start_soon async def do_socat(): while True: thread = await self.thread.clone() try: child = await thread.exec(cmd) except: await thread.exit(0) raise async with child: await child.waitpid(W.EXITED) ret = await serve_repls(sockfd, { **wisher_frame.f_locals, **wisher_frame.f_globals, 'wisher_frame': wisher_frame, }, wish.return_type, message) nursery.cancel_scope.cancel() await self.thread.task.unlink(await self.thread.ram.ptr(sock_path)) return retAncestors
Static methods
async def make(thread: Thread, sockdir: Path)-
Expand source code Browse git
@classmethod async def make(self, thread: Thread, sockdir: Path): socat = await thread.environ.which("socat") return ConsoleServerGenie(thread, sockdir, socat)
Inherited members