Module rsyscall.tests.utils

Expand source code Browse git
import typing as t
import trio
from rsyscall.epoller import Epoller, AsyncFileDescriptor
from rsyscall.unistd import Pipe
from rsyscall.fcntl import O
from rsyscall.thread import Thread

import logging
logger = logging.getLogger(__name__)
# logging.basicConfig(level=logging.DEBUG)

import unittest
async def do_async_things(self: unittest.TestCase, epoller: Epoller, thr: Thread, i: int=0,
                          *, task_status=trio.TASK_STATUS_IGNORED) -> None:
    logger.info("Setting up for do_async_things(%d)", i)
    pipe = await (await thr.task.pipe(await thr.ram.malloc(Pipe), O.NONBLOCK)).read()
    async_pipe_rfd = await AsyncFileDescriptor.make(epoller, thr.ram, pipe.read)
    async_pipe_wfd = await AsyncFileDescriptor.make(epoller, thr.ram, pipe.write)
    task_status.started(None)
    data = b"hello world"
    logger.info("Starting do_async_things(%d)", i)
    async def stuff():
        logger.info("do_async_things(%d): read(%s): starting", i, async_pipe_rfd.handle.near)
        result = await async_pipe_rfd.read_some_bytes()
        logger.info("do_async_things(%d): read(%s): returned", i, async_pipe_rfd.handle.near)
        self.assertEqual(result, data)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(stuff)
        await trio.sleep(0.0001)
        # hmmm MMM MMMmmmm MMM mmm MMm mm MM mmm MM mm MM
        # does this make sense?
        logger.info("do_async_things(%d): write(%s): starting", i, async_pipe_wfd.handle.near)
        await async_pipe_wfd.write_all_bytes(data)
        logger.info("do_async_things(%d): write(%s): returned", i, async_pipe_wfd.handle.near)
    await async_pipe_rfd.close()
    await async_pipe_wfd.close()
    logger.info("Done with do_async_things(%d)", i)

async def assert_thread_works(self: unittest.TestCase, thr: Thread) -> None:
    await do_async_things(self, thr.epoller, thr)

Functions

async def do_async_things(self: unittest.case.TestCase, epoller: Epoller, thr: Thread, i: int = 0, *, task_status=TASK_STATUS_IGNORED) ‑> NoneType
Expand source code Browse git
async def do_async_things(self: unittest.TestCase, epoller: Epoller, thr: Thread, i: int=0,
                          *, task_status=trio.TASK_STATUS_IGNORED) -> None:
    logger.info("Setting up for do_async_things(%d)", i)
    pipe = await (await thr.task.pipe(await thr.ram.malloc(Pipe), O.NONBLOCK)).read()
    async_pipe_rfd = await AsyncFileDescriptor.make(epoller, thr.ram, pipe.read)
    async_pipe_wfd = await AsyncFileDescriptor.make(epoller, thr.ram, pipe.write)
    task_status.started(None)
    data = b"hello world"
    logger.info("Starting do_async_things(%d)", i)
    async def stuff():
        logger.info("do_async_things(%d): read(%s): starting", i, async_pipe_rfd.handle.near)
        result = await async_pipe_rfd.read_some_bytes()
        logger.info("do_async_things(%d): read(%s): returned", i, async_pipe_rfd.handle.near)
        self.assertEqual(result, data)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(stuff)
        await trio.sleep(0.0001)
        # hmmm MMM MMMmmmm MMM mmm MMm mm MM mmm MM mm MM
        # does this make sense?
        logger.info("do_async_things(%d): write(%s): starting", i, async_pipe_wfd.handle.near)
        await async_pipe_wfd.write_all_bytes(data)
        logger.info("do_async_things(%d): write(%s): returned", i, async_pipe_wfd.handle.near)
    await async_pipe_rfd.close()
    await async_pipe_wfd.close()
    logger.info("Done with do_async_things(%d)", i)
async def assert_thread_works(self: unittest.case.TestCase, thr: Thread) ‑> NoneType
Expand source code Browse git
async def assert_thread_works(self: unittest.TestCase, thr: Thread) -> None:
    await do_async_things(self, thr.epoller, thr)