Module rsyscall.tests.test_epoller

Expand source code Browse git
from __future__ import annotations

from rsyscall.tests.trio_test_case import TrioTestCase
from rsyscall import local_thread, FileDescriptor, Pointer
from rsyscall.epoller import *
import trio
import outcome

from rsyscall.tests.utils import do_async_things
from rsyscall.near.sysif import SyscallInterface, Syscall
from rsyscall.sys.syscall import SYS
from dneio import RequestQueue, reset, Continuation
import typing as t

class DelayResultSysif(SyscallInterface):
    def __init__(self, sysif: SyscallInterface,
                 delay_queue: RequestQueue[t.Tuple[Syscall, outcome.Outcome[int]], None]) -> None:
        self.sysif = sysif
        self.delay_queue = delay_queue

    async def syscall(self, number: SYS, arg1=0, arg2=0, arg3=0, arg4=0, arg5=0, arg6=0) -> int:
        syscall = Syscall(number, arg1, arg2, arg3, arg4, arg5, arg6)
        ret = await outcome.acapture(self.sysif.syscall, number, arg1, arg2, arg3, arg4, arg5, arg6)
        await self.delay_queue.request((syscall, ret))
        return ret.unwrap()

    async def close_interface(self) -> None:
        return await self.sysif.close_interface()

    def get_activity_fd(self) -> t.Optional[FileDescriptor]:
        return self.sysif.get_activity_fd()

class TestEpoller(TrioTestCase):
    async def asyncSetUp(self) -> None:
        self.thr = local_thread

    async def test_local(self) -> None:
        await do_async_things(self, self.thr.epoller, self.thr)

    async def test_multi(self) -> None:
        await do_async_things(self, self.thr.epoller, self.thr, 0)
        async with trio.open_nursery() as nursery:
            for i in range(1, 6):
                nursery.start_soon(do_async_things, self, self.thr.epoller, self.thr, i)

    async def test_thread_multi(self) -> None:
        thread = await self.thr.clone()
        await do_async_things(self, thread.epoller, thread, 0)
        async with trio.open_nursery() as nursery:
            for i in range(1, 6):
                nursery.start_soon(do_async_things, self, thread.epoller, thread, i)

    async def test_thread_root_epoller(self) -> None:
        thread = await self.thr.clone()
        epoller = await Epoller.make_root(thread.ram, thread.task)
        await do_async_things(self, epoller, thread)

    async def test_afd_with_handle(self):
        pipe = await self.thr.pipe()
        afd = await self.thr.make_afd(pipe.write, set_nonblock=True)
        new_afd = afd.with_handle(pipe.write)
        await new_afd.write_all_bytes(b'foo')

    async def test_delayed_eagain(self):
        pipe = await self.thr.pipe()
        thread = await self.thr.clone()
        async_pipe_rfd = await thread.make_afd(thread.inherit_fd(pipe.read), set_nonblock=True)
        # write in parent, read in child
        input_data = b'hello'
        buf_to_write: Pointer[bytes] = await self.thr.ptr(input_data)
        buf_to_write, _ = await pipe.write.write(buf_to_write)
        self.assertEqual(await async_pipe_rfd.read_some_bytes(), input_data)
        buf = await thread.malloc(bytes, 4096)
        # set up the EAGAIN to be delayed
        queue: RequestQueue[t.Tuple[Syscall, outcome.Outcome[int]], None] = RequestQueue()
        old_sysif = thread.task.sysif
        thread.task.sysif = DelayResultSysif(old_sysif, queue)
        @self.nursery.start_soon
        async def race_eagain():
            # wait for EAGAIN
            (syscall, result), cb = await queue.get_one()
            self.assertIsInstance(result.error, BlockingIOError)
            thread.task.sysif = old_sysif
            queue.close(Exception("remaining syscalls?"))
            # write data after the EAGAIN
            await pipe.write.write(buf_to_write)
            # give epoll event a chance to be read - it will be available immediately
            await trio.sleep(0)
            # resume the suspended EAGAIN coroutine, which should keep running and get data
            cb.send(None)
        valid, remaining = await async_pipe_rfd.read(buf)

    async def test_wrong_op_on_pipe(self):
        "Reading or writing to the wrong side of a pipe fails immediately with an error"
        pipe = await self.thr.pipe()
        async_pipe_wfd = await self.thr.make_afd(pipe.write, set_nonblock=True)
        async_pipe_rfd = await self.thr.make_afd(pipe.read, set_nonblock=True)
        # we actually are defined to get EBADF in this case, which is
        # a bit of a worrying error, but whatever
        with self.assertRaises(OSError) as cm:
            await async_pipe_wfd.read_some_bytes()
        self.assertEqual(cm.exception.errno, 9)
        with self.assertRaises(OSError) as cm:
            await async_pipe_rfd.write_all_bytes(b'hi')
        self.assertEqual(cm.exception.errno, 9)

Classes

class DelayResultSysif (sysif: SyscallInterface, delay_queue: RequestQueue[t.Tuple[Syscall, outcome.Outcome[int]], None])

The lowest-level interface for an object which lets us send syscalls to some process.

We send syscalls to a process, but nothing in this interface tells us anything about the process to which we're sending syscalls; that information is maintained in the Task, which contains an object matching this interface.

This is like the segment register override prefix, with no awareness of the contents of the register.

Expand source code Browse git
class DelayResultSysif(SyscallInterface):
    def __init__(self, sysif: SyscallInterface,
                 delay_queue: RequestQueue[t.Tuple[Syscall, outcome.Outcome[int]], None]) -> None:
        self.sysif = sysif
        self.delay_queue = delay_queue

    async def syscall(self, number: SYS, arg1=0, arg2=0, arg3=0, arg4=0, arg5=0, arg6=0) -> int:
        syscall = Syscall(number, arg1, arg2, arg3, arg4, arg5, arg6)
        ret = await outcome.acapture(self.sysif.syscall, number, arg1, arg2, arg3, arg4, arg5, arg6)
        await self.delay_queue.request((syscall, ret))
        return ret.unwrap()

    async def close_interface(self) -> None:
        return await self.sysif.close_interface()

    def get_activity_fd(self) -> t.Optional[FileDescriptor]:
        return self.sysif.get_activity_fd()

Ancestors

Inherited members

class TestEpoller (methodName='runTest')

A trio-enabled variant of unittest.TestCase

Create an instance of the class that will use the named test method when executed. Raises a ValueError if the instance does not have a method with the specified name.

Expand source code Browse git
class TestEpoller(TrioTestCase):
    async def asyncSetUp(self) -> None:
        self.thr = local_thread

    async def test_local(self) -> None:
        await do_async_things(self, self.thr.epoller, self.thr)

    async def test_multi(self) -> None:
        await do_async_things(self, self.thr.epoller, self.thr, 0)
        async with trio.open_nursery() as nursery:
            for i in range(1, 6):
                nursery.start_soon(do_async_things, self, self.thr.epoller, self.thr, i)

    async def test_thread_multi(self) -> None:
        thread = await self.thr.clone()
        await do_async_things(self, thread.epoller, thread, 0)
        async with trio.open_nursery() as nursery:
            for i in range(1, 6):
                nursery.start_soon(do_async_things, self, thread.epoller, thread, i)

    async def test_thread_root_epoller(self) -> None:
        thread = await self.thr.clone()
        epoller = await Epoller.make_root(thread.ram, thread.task)
        await do_async_things(self, epoller, thread)

    async def test_afd_with_handle(self):
        pipe = await self.thr.pipe()
        afd = await self.thr.make_afd(pipe.write, set_nonblock=True)
        new_afd = afd.with_handle(pipe.write)
        await new_afd.write_all_bytes(b'foo')

    async def test_delayed_eagain(self):
        pipe = await self.thr.pipe()
        thread = await self.thr.clone()
        async_pipe_rfd = await thread.make_afd(thread.inherit_fd(pipe.read), set_nonblock=True)
        # write in parent, read in child
        input_data = b'hello'
        buf_to_write: Pointer[bytes] = await self.thr.ptr(input_data)
        buf_to_write, _ = await pipe.write.write(buf_to_write)
        self.assertEqual(await async_pipe_rfd.read_some_bytes(), input_data)
        buf = await thread.malloc(bytes, 4096)
        # set up the EAGAIN to be delayed
        queue: RequestQueue[t.Tuple[Syscall, outcome.Outcome[int]], None] = RequestQueue()
        old_sysif = thread.task.sysif
        thread.task.sysif = DelayResultSysif(old_sysif, queue)
        @self.nursery.start_soon
        async def race_eagain():
            # wait for EAGAIN
            (syscall, result), cb = await queue.get_one()
            self.assertIsInstance(result.error, BlockingIOError)
            thread.task.sysif = old_sysif
            queue.close(Exception("remaining syscalls?"))
            # write data after the EAGAIN
            await pipe.write.write(buf_to_write)
            # give epoll event a chance to be read - it will be available immediately
            await trio.sleep(0)
            # resume the suspended EAGAIN coroutine, which should keep running and get data
            cb.send(None)
        valid, remaining = await async_pipe_rfd.read(buf)

    async def test_wrong_op_on_pipe(self):
        "Reading or writing to the wrong side of a pipe fails immediately with an error"
        pipe = await self.thr.pipe()
        async_pipe_wfd = await self.thr.make_afd(pipe.write, set_nonblock=True)
        async_pipe_rfd = await self.thr.make_afd(pipe.read, set_nonblock=True)
        # we actually are defined to get EBADF in this case, which is
        # a bit of a worrying error, but whatever
        with self.assertRaises(OSError) as cm:
            await async_pipe_wfd.read_some_bytes()
        self.assertEqual(cm.exception.errno, 9)
        with self.assertRaises(OSError) as cm:
            await async_pipe_rfd.write_all_bytes(b'hi')
        self.assertEqual(cm.exception.errno, 9)

Ancestors

Class variables

var nursery : trio.Nursery

Methods

async def test_local(self) ‑> NoneType
Expand source code Browse git
async def test_local(self) -> None:
    await do_async_things(self, self.thr.epoller, self.thr)
async def test_multi(self) ‑> NoneType
Expand source code Browse git
async def test_multi(self) -> None:
    await do_async_things(self, self.thr.epoller, self.thr, 0)
    async with trio.open_nursery() as nursery:
        for i in range(1, 6):
            nursery.start_soon(do_async_things, self, self.thr.epoller, self.thr, i)
async def test_thread_multi(self) ‑> NoneType
Expand source code Browse git
async def test_thread_multi(self) -> None:
    thread = await self.thr.clone()
    await do_async_things(self, thread.epoller, thread, 0)
    async with trio.open_nursery() as nursery:
        for i in range(1, 6):
            nursery.start_soon(do_async_things, self, thread.epoller, thread, i)
async def test_thread_root_epoller(self) ‑> NoneType
Expand source code Browse git
async def test_thread_root_epoller(self) -> None:
    thread = await self.thr.clone()
    epoller = await Epoller.make_root(thread.ram, thread.task)
    await do_async_things(self, epoller, thread)
async def test_afd_with_handle(self)
Expand source code Browse git
async def test_afd_with_handle(self):
    pipe = await self.thr.pipe()
    afd = await self.thr.make_afd(pipe.write, set_nonblock=True)
    new_afd = afd.with_handle(pipe.write)
    await new_afd.write_all_bytes(b'foo')
async def test_delayed_eagain(self)
Expand source code Browse git
async def test_delayed_eagain(self):
    pipe = await self.thr.pipe()
    thread = await self.thr.clone()
    async_pipe_rfd = await thread.make_afd(thread.inherit_fd(pipe.read), set_nonblock=True)
    # write in parent, read in child
    input_data = b'hello'
    buf_to_write: Pointer[bytes] = await self.thr.ptr(input_data)
    buf_to_write, _ = await pipe.write.write(buf_to_write)
    self.assertEqual(await async_pipe_rfd.read_some_bytes(), input_data)
    buf = await thread.malloc(bytes, 4096)
    # set up the EAGAIN to be delayed
    queue: RequestQueue[t.Tuple[Syscall, outcome.Outcome[int]], None] = RequestQueue()
    old_sysif = thread.task.sysif
    thread.task.sysif = DelayResultSysif(old_sysif, queue)
    @self.nursery.start_soon
    async def race_eagain():
        # wait for EAGAIN
        (syscall, result), cb = await queue.get_one()
        self.assertIsInstance(result.error, BlockingIOError)
        thread.task.sysif = old_sysif
        queue.close(Exception("remaining syscalls?"))
        # write data after the EAGAIN
        await pipe.write.write(buf_to_write)
        # give epoll event a chance to be read - it will be available immediately
        await trio.sleep(0)
        # resume the suspended EAGAIN coroutine, which should keep running and get data
        cb.send(None)
    valid, remaining = await async_pipe_rfd.read(buf)
async def test_wrong_op_on_pipe(self)

Reading or writing to the wrong side of a pipe fails immediately with an error

Expand source code Browse git
async def test_wrong_op_on_pipe(self):
    "Reading or writing to the wrong side of a pipe fails immediately with an error"
    pipe = await self.thr.pipe()
    async_pipe_wfd = await self.thr.make_afd(pipe.write, set_nonblock=True)
    async_pipe_rfd = await self.thr.make_afd(pipe.read, set_nonblock=True)
    # we actually are defined to get EBADF in this case, which is
    # a bit of a worrying error, but whatever
    with self.assertRaises(OSError) as cm:
        await async_pipe_wfd.read_some_bytes()
    self.assertEqual(cm.exception.errno, 9)
    with self.assertRaises(OSError) as cm:
        await async_pipe_rfd.write_all_bytes(b'hi')
    self.assertEqual(cm.exception.errno, 9)

Inherited members