Module rsyscall.unistd
#include <unistd.h>
Expand source code Browse git
"`#include <unistd.h>`"
from __future__ import annotations
from rsyscall._raw import ffi, lib # type: ignore
import enum
from rsyscall.struct import Serializer, FixedSerializer, Serializable
import struct
import typing as t
import rsyscall.near.types as near
import os
from rsyscall.handle.pointer import Pointer, WrittenPointer, ReadablePointer
if t.TYPE_CHECKING:
from rsyscall.handle import Task, FileDescriptor
# Names exported by `from rsyscall.unistd import *`.
# NOTE(review): RENAME and FSTask are defined in this module but are not
# listed here - confirm that leaving them out is intentional.
__all__ = [
"SEEK",
"OK",
"ArgList",
"Pipe",
"FSFileDescriptor",
]
# re-exported
from rsyscall.unistd.io import SEEK
from rsyscall.unistd.pipe import Pipe
class OK(enum.IntFlag):
    "The mode argument to access, faccessat."
    # Each member mirrors the C constant of the same name exposed by `lib`;
    # see access(2) for what each check means.
    R = lib.R_OK  # check for read permission
    W = lib.W_OK  # check for write permission
    X = lib.X_OK  # check for execute (or directory search) permission
    F = lib.F_OK  # check only that the file exists
class RENAME(enum.IntFlag):
    "The flags argument to renameat2"
    # NONE is the default used by FSFileDescriptor.renameat: no flags set.
    NONE = 0
    # The remaining members mirror the C constants of the same name exposed by
    # `lib`; see renameat2(2) for their exact semantics.
    EXCHANGE = lib.RENAME_EXCHANGE    # atomically exchange oldpath and newpath
    NOREPLACE = lib.RENAME_NOREPLACE  # fail instead of overwriting newpath
    WHITEOUT = lib.RENAME_WHITEOUT    # leave a whiteout object at the source
T_arglist = t.TypeVar('T_arglist', bound='ArgList')
class ArgList(t.List[WrittenPointer[t.Union[str, os.PathLike]]], FixedSerializer):
    "A null-terminated list of null-terminated strings, as passed to execve."
    # The list elements are pointers to already-written strings; only the
    # pointers' addresses are serialized (see ArgListSerializer.to_bytes).
    @classmethod
    def get_serializer(cls, task: Task) -> Serializer[T_arglist]:
        # `task` is accepted to satisfy the get_serializer signature but is
        # not used: the serializer is constructed without any arguments.
        return ArgListSerializer()
import struct
class ArgListSerializer(Serializer[T_arglist]):
    """Serializer turning an `ArgList` into the pointer array passed to execve.

    The output is one packed pointer per list element followed by a single
    null pointer that terminates the array.
    """

    # struct format "Q": unsigned long long in native byte order. Compiled once
    # here instead of once per packed pointer.
    _POINTER = struct.Struct("Q")

    def to_bytes(self, arglist: T_arglist) -> bytes:
        """Pack the address of every pointer in `arglist`, then a terminating 0.

        An empty `arglist` serializes to just the terminating null pointer.
        """
        pack = self._POINTER.pack
        packed = [pack(int(ptr.near)) for ptr in arglist]
        # The trailing null pointer marks the end of the array.
        packed.append(pack(0))
        # Join once rather than growing a bytes object with repeated `+=`.
        return b"".join(packed)

    def from_bytes(self, data: bytes) -> T_arglist:
        """Always raises: raw addresses cannot be turned back into pointer handles."""
        raise Exception("can't get pointer handles from raw bytes")
def _get_near(fd: t.Optional[BaseFileDescriptor]) -> t.Optional[near.FileDescriptor]:
    """Return `fd.near` after calling `fd._validate()`, or None when `fd` is None.

    Lets callers forward an optional dirfd handle straight to the raw syscall
    helpers, which accept None in place of a near file descriptor.
    """
    if fd is not None:
        # Let the handle check itself before its near descriptor is handed out.
        fd._validate()
        return fd.near
    return None
#### Classes ####
from rsyscall.handle.fd import BaseFileDescriptor, FileDescriptorTask
from rsyscall.fcntl import AT, O
T_fd = t.TypeVar('T_fd', bound='FSFileDescriptor')
class FSFileDescriptor(BaseFileDescriptor):
    """File descriptor methods for filesystem syscalls.

    The `*at` methods pass this file descriptor as the dirfd argument of the
    corresponding syscall. Every method calls `self._validate()` before issuing
    its syscall. Path arguments are pointers that are borrowed in `self.task`
    for the duration of the call.
    """

    async def readlinkat(self, path: WrittenPointer[t.Union[str, os.PathLike]],
                         buf: Pointer) -> t.Tuple[ReadablePointer, Pointer]:
        """read value of a symbolic link

        Returns `buf.readable_split(n)`, where `n` is the syscall's return value.

        manpage: readlinkat(2)
        """
        self._validate()
        # Use the borrowed addresses, as every other method in this class does.
        with path.borrow(self.task) as path_n:
            with buf.borrow(self.task) as buf_n:
                ret = await _readlinkat(self.task.sysif, self.near, path_n, buf_n, buf.size())
                return buf.readable_split(ret)

    async def faccessat(self, ptr: WrittenPointer[t.Union[str, os.PathLike]], mode: OK, flags: AT=AT.NONE) -> None:
        """check user's permissions for a file

        manpage: faccessat(2)
        """
        self._validate()
        with ptr.borrow(self.task) as ptr_n:
            await _faccessat(self.task.sysif, self.near, ptr_n, mode, flags)

    async def openat(self: T_fd, path: WrittenPointer[t.Union[str, os.PathLike]], flags: O, mode: int=0o644) -> T_fd:
        """open and possibly create a file

        O.CLOEXEC is always added to `flags`. On OSError, `exn.filename` is set
        to `path.value` before the exception is re-raised.

        manpage: openat(2)
        """
        self._validate()
        with path.borrow(self.task) as path_n:
            try:
                fd = await _openat(self.task.sysif, self.near, path_n, flags|O.CLOEXEC, mode)
            except OSError as exn:
                exn.filename = path.value
                raise
            return self.task.make_fd_handle(fd)

    async def mkdirat(self, path: WrittenPointer[t.Union[str, os.PathLike]], mode: int=0o755) -> None:
        """create a directory

        On OSError, `exn.filename` is set to `path.value` before re-raising.

        manpage: mkdirat(2)
        """
        self._validate()
        with path.borrow(self.task) as path_n:
            try:
                await _mkdirat(self.task.sysif, self.near, path_n, mode)
            except OSError as exn:
                exn.filename = path.value
                raise

    async def unlinkat(self, path: WrittenPointer[t.Union[str, os.PathLike]], flags: AT=AT.NONE) -> None:
        """delete a name and possibly the file it refers to

        On OSError, `exn.filename` is set to `path.value` before re-raising.

        manpage: unlinkat(2)
        """
        # Validate like every other method here before using self.near.
        self._validate()
        with path.borrow(self.task) as path_n:
            try:
                await _unlinkat(self.task.sysif, self.near, path_n, flags)
            except OSError as exn:
                exn.filename = path.value
                raise

    async def rmdirat(self, path: WrittenPointer[t.Union[str, os.PathLike]]) -> None:
        """delete a directory

        Implemented as `unlinkat` with AT.REMOVEDIR.

        manpage: unlinkat(2)
        """
        await self.unlinkat(path, AT.REMOVEDIR)

    async def linkat(self, oldpath: WrittenPointer[t.Union[str, os.PathLike]],
                     newdirfd: t.Optional[FSFileDescriptor],
                     newpath: WrittenPointer[t.Union[str, os.PathLike]],
                     flags: AT=AT.NONE) -> None:
        """make a new name for a file

        `self` is the olddirfd argument; a `newdirfd` of None is passed to the
        raw syscall helper as None. On OSError, `exn.filename` and
        `exn.filename2` are set to the old and new paths before re-raising.

        manpage: linkat(2)
        """
        self._validate()
        with oldpath.borrow(self.task) as oldpath_n:
            with newpath.borrow(self.task) as newpath_n:
                try:
                    await _linkat(self.task.sysif, self.near, oldpath_n, _get_near(newdirfd), newpath_n, flags)
                except OSError as exn:
                    exn.filename = oldpath.value
                    # Test for None explicitly rather than relying on the handle's truthiness.
                    exn.filename2 = (newdirfd, newpath.value) if newdirfd is not None else newpath.value
                    raise

    async def renameat(self, oldpath: WrittenPointer[t.Union[str, os.PathLike]],
                       newdirfd: t.Optional[FSFileDescriptor],
                       newpath: WrittenPointer[t.Union[str, os.PathLike]],
                       flags: RENAME=RENAME.NONE) -> None:
        """change the name or location of a file

        `self` is the olddirfd argument; a `newdirfd` of None is passed to the
        raw syscall helper as None. On OSError, `exn.filename` and
        `exn.filename2` are set to the old and new paths before re-raising.

        manpage: renameat2(2)
        """
        self._validate()
        with oldpath.borrow(self.task) as oldpath_n:
            with newpath.borrow(self.task) as newpath_n:
                try:
                    await _renameat2(self.task.sysif, self.near, oldpath_n, _get_near(newdirfd), newpath_n, flags)
                except OSError as exn:
                    exn.filename = oldpath.value
                    # Test for None explicitly rather than relying on the handle's truthiness.
                    exn.filename2 = (newdirfd, newpath.value) if newdirfd is not None else newpath.value
                    raise

    async def symlinkat(self, target: WrittenPointer[t.Union[str, os.PathLike]],
                        linkpath: WrittenPointer[t.Union[str, os.PathLike]]) -> None:
        """make a new name for a file

        Note that `self` controls where the link is created, not the target of the link; `self` is
        the `newdirfd` argument to symlinkat.

        On OSError, `exn.filename` is set to `target.value` and `exn.filename2`
        to `(self, linkpath.value)` before re-raising.

        manpage: symlinkat(2)
        """
        self._validate()
        with target.borrow(self.task) as target_n:
            with linkpath.borrow(self.task) as linkpath_n:
                try:
                    await _symlinkat(self.task.sysif, target_n, self.near, linkpath_n)
                except OSError as exn:
                    exn.filename = target.value
                    exn.filename2 = (self, linkpath.value)
                    raise

    async def fchmod(self, mode: int) -> None:
        """change permissions of a file

        manpage: fchmod(2)
        """
        self._validate()
        await _fchmod(self.task.sysif, self.near, mode)

    async def ftruncate(self, length: int) -> None:
        """truncate a file to a specified length

        manpage: ftruncate(2)
        """
        self._validate()
        await _ftruncate(self.task.sysif, self.near, length)

    # oldfd has to be a valid file descriptor. newfd is not, technically, required to be
    # open, but that's the best practice for avoiding races, so we require it anyway here.
    async def dup3(self, newfd: T_fd, flags: int) -> T_fd:
        """duplicate this file descriptor onto `newfd`

        `newfd` must be the only handle to its file descriptor, otherwise an
        Exception is raised. If `newfd` already has the same near file
        descriptor as `self`, no syscall is made. Returns `newfd`.

        manpage: dup3(2)
        """
        self._validate()
        if not newfd.is_only_handle():
            raise Exception("can't dup over newfd", newfd, "there are more handles to it than just ours")
        if self.near == newfd.near:
            # dup3 fails if newfd == oldfd. I guess I'll just work around that.
            return newfd
        await _dup3(self.task.sysif, self.near, newfd.near, flags)
        # newfd is left as a valid pointer to the new file descriptor
        return newfd

    async def dup2(self, newfd: T_fd) -> T_fd:
        """duplicate a file descriptor

        Implemented as `dup3` with flags 0.

        manpage: dup(2)
        """
        return await self.dup3(newfd, 0)
class FSTask(FileDescriptorTask[T_fd]):
    """Task methods for path-based filesystem syscalls.

    Except for `linkat`, every method passes None as the dirfd to the raw
    syscall helpers, which substitute AT.FDCWD. Path arguments are pointers
    that are borrowed in this task for the duration of the call. When a
    syscall that takes a path raises OSError, the path value(s) are attached
    as `exn.filename` (and `exn.filename2`) before re-raising, matching the
    `FSFileDescriptor` methods.
    """

    async def readlink(self, path: WrittenPointer[t.Union[str, os.PathLike]],
                       buf: Pointer) -> t.Tuple[ReadablePointer, Pointer]:
        """read value of a symbolic link

        Returns `buf.readable_split(n)`, where `n` is the syscall's return value.

        manpage: readlinkat(2)
        """
        with path.borrow(self) as path_n:
            with buf.borrow(self) as buf_n:
                ret = await _readlinkat(self.sysif, None, path_n, buf_n, buf.size())
                return buf.readable_split(ret)

    async def access(self, path: WrittenPointer[t.Union[str, os.PathLike]], mode: int, flags: int=0) -> None:
        """check user's permissions for a file

        manpage: faccessat(2)
        """
        with path.borrow(self) as path_n:
            try:
                await _faccessat(self.sysif, None, path_n, mode, flags)
            except OSError as exn:
                # Annotate every OSError, not only FileNotFoundError.
                exn.filename = path.value
                raise

    async def open(self, path: WrittenPointer[t.Union[str, os.PathLike]], flags: O, mode: int=0o644) -> T_fd:
        """open and possibly create a file

        O.CLOEXEC is always added to `flags`.

        manpage: openat(2)
        """
        with path.borrow(self) as path_n:
            try:
                fd = await _openat(self.sysif, None, path_n, flags|O.CLOEXEC, mode)
            except OSError as exn:
                # Annotate every OSError, not only FileNotFoundError.
                exn.filename = path.value
                raise
            return self.make_fd_handle(fd)

    async def mkdir(self, path: WrittenPointer[t.Union[str, os.PathLike]], mode: int=0o755) -> None:
        """create a directory

        manpage: mkdirat(2)
        """
        with path.borrow(self) as path_n:
            try:
                await _mkdirat(self.sysif, None, path_n, mode)
            except OSError as exn:
                exn.filename = path.value
                raise

    async def unlink(self, path: WrittenPointer[t.Union[str, os.PathLike]]) -> None:
        """delete a name and possibly the file it refers to

        manpage: unlinkat(2)
        """
        with path.borrow(self) as path_n:
            try:
                await _unlinkat(self.sysif, None, path_n, 0)
            except OSError as exn:
                exn.filename = path.value
                raise

    async def rmdir(self, path: WrittenPointer[t.Union[str, os.PathLike]]) -> None:
        """delete a directory

        Implemented as unlinkat with AT.REMOVEDIR.

        manpage: unlinkat(2)
        """
        with path.borrow(self) as path_n:
            try:
                await _unlinkat(self.sysif, None, path_n, AT.REMOVEDIR)
            except OSError as exn:
                exn.filename = path.value
                raise

    async def link(self, oldpath: WrittenPointer[t.Union[str, os.PathLike]],
                   newpath: WrittenPointer[t.Union[str, os.PathLike]]) -> None:
        """make a new name for a file

        manpage: linkat(2)
        """
        with oldpath.borrow(self) as oldpath_n:
            with newpath.borrow(self) as newpath_n:
                try:
                    await _linkat(self.sysif, None, oldpath_n, None, newpath_n, 0)
                except OSError as exn:
                    exn.filename = oldpath.value
                    exn.filename2 = newpath.value
                    raise

    async def linkat(self,
                     olddirfd: t.Optional[FSFileDescriptor],
                     oldpath: WrittenPointer[t.Union[str, os.PathLike]],
                     newdirfd: t.Optional[FSFileDescriptor],
                     newpath: WrittenPointer[t.Union[str, os.PathLike]],
                     flags: AT=AT.NONE) -> None:
        """make a new name for a file

        See also `FSFileDescriptor.linkat`.

        manpage: linkat(2)
        """
        with oldpath.borrow(self) as oldpath_n:
            with newpath.borrow(self) as newpath_n:
                try:
                    await _linkat(self.sysif, _get_near(olddirfd), oldpath_n, _get_near(newdirfd), newpath_n, flags)
                except OSError as exn:
                    # Test for None explicitly rather than relying on the handles' truthiness.
                    exn.filename = (olddirfd, oldpath.value) if olddirfd is not None else oldpath.value
                    exn.filename2 = (newdirfd, newpath.value) if newdirfd is not None else newpath.value
                    raise

    async def rename(self, oldpath: WrittenPointer[t.Union[str, os.PathLike]],
                     newpath: WrittenPointer[t.Union[str, os.PathLike]]) -> None:
        """change the name or location of a file

        manpage: renameat2(2)
        """
        with oldpath.borrow(self) as oldpath_n:
            with newpath.borrow(self) as newpath_n:
                try:
                    await _renameat2(self.sysif, None, oldpath_n, None, newpath_n, 0)
                except OSError as exn:
                    exn.filename = oldpath.value
                    exn.filename2 = newpath.value
                    raise

    async def symlink(self, target: WrittenPointer[t.Union[str, os.PathLike]],
                      linkpath: WrittenPointer[t.Union[str, os.PathLike]]) -> None:
        """make a new name for a file

        manpage: symlinkat(2)
        """
        with target.borrow(self) as target_n:
            with linkpath.borrow(self) as linkpath_n:
                try:
                    await _symlinkat(self.sysif, target_n, None, linkpath_n)
                except OSError as exn:
                    exn.filename = target.value
                    exn.filename2 = linkpath.value
                    raise
#### Raw syscalls ####
import rsyscall.near.types as near
from rsyscall.near.sysif import SyscallInterface
from rsyscall.sys.syscall import SYS
async def _readlinkat(sysif: SyscallInterface, dirfd: t.Optional[near.FileDescriptor],
                      path: near.Address, buf: near.Address, bufsiz: int) -> int:
    "Issue the raw readlinkat syscall and return its result; a `dirfd` of None is sent as AT.FDCWD."
    base_dir = AT.FDCWD if dirfd is None else dirfd
    result = await sysif.syscall(SYS.readlinkat, base_dir, path, buf, bufsiz)
    return result
async def _faccessat(sysif: SyscallInterface, dirfd: t.Optional[near.FileDescriptor],
                     path: near.Address, mode: int, flags: int) -> None:
    """Issue the raw faccessat syscall; a `dirfd` of None is sent as AT.FDCWD.

    The positional order is (sysif, dirfd, path, mode, flags), which is how the
    callers in this module invoke it and the argument order given in
    faccessat(2). The last two parameters were previously named the other way
    round, so the values reaching the syscall are unchanged for positional
    callers and become correct for keyword callers.
    """
    if dirfd is None:
        dirfd = AT.FDCWD # type: ignore
    await sysif.syscall(SYS.faccessat, dirfd, path, mode, flags)
async def _openat(sysif: SyscallInterface, dirfd: t.Optional[near.FileDescriptor],
                  path: near.Address, flags: int, mode: int) -> near.FileDescriptor:
    "Issue the raw openat syscall and wrap its result in a near.FileDescriptor; a `dirfd` of None is sent as AT.FDCWD."
    base_dir = AT.FDCWD if dirfd is None else dirfd
    fd_number = await sysif.syscall(SYS.openat, base_dir, path, flags, mode)
    return near.FileDescriptor(fd_number)
async def _fchmod(sysif: SyscallInterface, fd: near.FileDescriptor, mode: int) -> None:
    "Issue the raw fchmod syscall on `fd` with `mode`; the syscall's result is discarded."
    await sysif.syscall(SYS.fchmod, fd, mode)
async def _ftruncate(sysif: SyscallInterface, fd: near.FileDescriptor, length: int) -> None:
    "Issue the raw ftruncate syscall on `fd` with `length`; the syscall's result is discarded."
    await sysif.syscall(SYS.ftruncate, fd, length)
async def _mkdirat(sysif: SyscallInterface,
                   dirfd: t.Optional[near.FileDescriptor], path: near.Address, mode: int) -> None:
    "Issue the raw mkdirat syscall; a `dirfd` of None is sent as AT.FDCWD."
    base_dir = AT.FDCWD if dirfd is None else dirfd
    await sysif.syscall(SYS.mkdirat, base_dir, path, mode)
async def _unlinkat(sysif: SyscallInterface,
                    dirfd: t.Optional[near.FileDescriptor], path: near.Address, flags: int) -> None:
    "Issue the raw unlinkat syscall; a `dirfd` of None is sent as AT.FDCWD."
    base_dir = AT.FDCWD if dirfd is None else dirfd
    await sysif.syscall(SYS.unlinkat, base_dir, path, flags)
async def _linkat(sysif: SyscallInterface,
                  olddirfd: t.Optional[near.FileDescriptor], oldpath: near.Address,
                  newdirfd: t.Optional[near.FileDescriptor], newpath: near.Address,
                  flags: int) -> None:
    "Issue the raw linkat syscall; either dirfd given as None is sent as AT.FDCWD."
    old_base = AT.FDCWD if olddirfd is None else olddirfd
    new_base = AT.FDCWD if newdirfd is None else newdirfd
    await sysif.syscall(SYS.linkat, old_base, oldpath, new_base, newpath, flags)
async def _renameat2(sysif: SyscallInterface,
                     olddirfd: t.Optional[near.FileDescriptor], oldpath: near.Address,
                     newdirfd: t.Optional[near.FileDescriptor], newpath: near.Address,
                     flags: int) -> None:
    "Issue the raw renameat2 syscall; either dirfd given as None is sent as AT.FDCWD."
    old_base = AT.FDCWD if olddirfd is None else olddirfd
    new_base = AT.FDCWD if newdirfd is None else newdirfd
    await sysif.syscall(SYS.renameat2, old_base, oldpath, new_base, newpath, flags)
async def _symlinkat(sysif: SyscallInterface,
                     target: near.Address, newdirfd: t.Optional[near.FileDescriptor], linkpath: near.Address) -> None:
    "Issue the raw symlinkat syscall; a `newdirfd` of None is sent as AT.FDCWD."
    link_base = AT.FDCWD if newdirfd is None else newdirfd
    await sysif.syscall(SYS.symlinkat, target, link_base, linkpath)
async def _dup3(sysif: SyscallInterface,
                oldfd: near.FileDescriptor, newfd: near.FileDescriptor, flags: int) -> near.FileDescriptor:
    "Issue the raw dup3 syscall and wrap its result in a near.FileDescriptor."
    return near.FileDescriptor(await sysif.syscall(SYS.dup3, oldfd, newfd, flags))
Sub-modules
rsyscall.unistd.credentials
-
Named for the excellent manpage about these process attributes, credentials(7)
rsyscall.unistd.cwd
rsyscall.unistd.exec
rsyscall.unistd.io
-
The subset of functionality in unistd.h which relates to IO
rsyscall.unistd.pipe
Classes
class SEEK (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
The whence argument to lseek.
Expand source code Browse git
class SEEK(enum.IntEnum): "The whence argument to lseek." SET = lib.SEEK_SET CUR = lib.SEEK_CUR END = lib.SEEK_END DATA = lib.SEEK_DATA HOLE = lib.SEEK_HOLE
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var SET
var CUR
var END
var DATA
var HOLE
class OK (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
The mode argument to access, faccessat.
Expand source code Browse git
class OK(enum.IntFlag): "The mode argument to access, faccessat." R = lib.R_OK W = lib.W_OK X = lib.X_OK F = lib.F_OK
Ancestors
- enum.IntFlag
- builtins.int
- enum.Flag
- enum.Enum
Class variables
var R
var W
var X
var F
class ArgList (*args, **kwargs)
-
A null-terminated list of null-terminated strings, as passed to execve.
Expand source code Browse git
class ArgList(t.List[WrittenPointer[t.Union[str, os.PathLike]]], FixedSerializer): "A null-terminated list of null-terminated strings, as passed to execve." @classmethod def get_serializer(cls, task: Task) -> Serializer[T_arglist]: return ArgListSerializer()
Ancestors
- builtins.list
- typing.Generic
- FixedSerializer
- HasSerializer
Inherited members
class Pipe (read: FileDescriptor, write: FileDescriptor)
-
A pair of file descriptors, as written by pipe.
Expand source code Browse git
@dataclass class Pipe(FixedSize): "A pair of file descriptors, as written by pipe." read: FileDescriptor write: FileDescriptor def __getitem__(self, idx: int) -> FileDescriptor: if idx == 0: return self.read elif idx == 1: return self.write else: raise IndexError("only index 0 or 1 are valid for Pipe:", idx) def __iter__(self) -> t.Iterable[FileDescriptor]: return iter([self.read, self.write]) @classmethod def sizeof(cls) -> int: return ffi.sizeof('struct fdpair') @classmethod def get_serializer(cls: t.Type[T_pipe], task: FileDescriptorTask[FileDescriptor]) -> Serializer[T_pipe]: return PipeSerializer(cls, task)
Ancestors
Class variables
var read : FileDescriptor
var write : FileDescriptor
Inherited members
class FSFileDescriptor (task: FileDescriptorTask, near: FileDescriptor, valid: bool)
-
A file descriptor accessed through some `Task`.
This is an rsyscall-internal base class, which other `FileDescriptor` objects inherit from for core lifecycle methods. See `FileDescriptor` for more information.
Expand source code Browse git
class FSFileDescriptor(BaseFileDescriptor): async def readlinkat(self, path: WrittenPointer[t.Union[str, os.PathLike]], buf: Pointer) -> t.Tuple[ReadablePointer, Pointer]: """read value of a symbolic link manpage: readlinkat(2) """ self._validate() with path.borrow(self.task): with buf.borrow(self.task): ret = await _readlinkat(self.task.sysif, self.near, path.near, buf.near, buf.size()) return buf.readable_split(ret) async def faccessat(self, ptr: WrittenPointer[t.Union[str, os.PathLike]], mode: OK, flags: AT=AT.NONE) -> None: """check user's permissions for a file manpage: faccessat(2) """ self._validate() with ptr.borrow(self.task): await _faccessat(self.task.sysif, self.near, ptr.near, mode, flags) async def openat(self: T_fd, path: WrittenPointer[t.Union[str, os.PathLike]], flags: O, mode=0o644) -> T_fd: """open and possibly create a file manpage: openat(2) """ self._validate() with path.borrow(self.task) as path_n: try: fd = await _openat(self.task.sysif, self.near, path_n, flags|O.CLOEXEC, mode) except OSError as exn: exn.filename = path.value raise return self.task.make_fd_handle(fd) async def mkdirat(self, path: WrittenPointer[t.Union[str, os.PathLike]], mode=0o755) -> None: """create a directory manpage: mkdirat(2) """ self._validate() with path.borrow(self.task) as path_n: try: await _mkdirat(self.task.sysif, self.near, path_n, mode) except OSError as exn: exn.filename = path.value raise async def unlinkat(self, path: WrittenPointer[t.Union[str, os.PathLike]], flags: AT=AT.NONE) -> None: """delete a name and possibly the file it refers to manpage: unlinkat(2) """ with path.borrow(self.task) as path_n: try: await _unlinkat(self.task.sysif, self.near, path_n, flags) except OSError as exn: exn.filename = path.value raise async def rmdirat(self, path: WrittenPointer[t.Union[str, os.PathLike]]) -> None: """delete a directory manpage: unlinkat(2) """ await self.unlinkat(path, AT.REMOVEDIR) async def linkat(self, oldpath: WrittenPointer[t.Union[str, 
os.PathLike]], newdirfd: t.Optional[FSFileDescriptor], newpath: WrittenPointer[t.Union[str, os.PathLike]], flags: AT=AT.NONE) -> None: """make a new name for a file manpage: linkat(2) """ self._validate() with oldpath.borrow(self.task) as oldpath_n: with newpath.borrow(self.task) as newpath_n: try: await _linkat(self.task.sysif, self.near, oldpath_n, _get_near(newdirfd), newpath_n, flags) except OSError as exn: exn.filename = oldpath.value exn.filename2 = (newdirfd, newpath.value) if newdirfd else newpath.value raise async def renameat(self, oldpath: WrittenPointer[t.Union[str, os.PathLike]], newdirfd: t.Optional[FSFileDescriptor], newpath: WrittenPointer[t.Union[str, os.PathLike]], flags: RENAME=RENAME.NONE) -> None: """change the name or location of a file manpage: renameat2(2) """ self._validate() with oldpath.borrow(self.task) as oldpath_n: with newpath.borrow(self.task) as newpath_n: try: await _renameat2(self.task.sysif, self.near, oldpath_n, _get_near(newdirfd), newpath_n, flags) except OSError as exn: exn.filename = oldpath.value exn.filename2 = (newdirfd, newpath.value) if newdirfd else newpath.value raise async def symlinkat(self, target: WrittenPointer[t.Union[str, os.PathLike]], linkpath: WrittenPointer[t.Union[str, os.PathLike]]) -> None: """make a new name for a file Note that `self` controls where the link is created, not the target of the link; `self` is the `newdirfd` argument to symlinkat. 
manpage: symlinkat(2) """ self._validate() with target.borrow(self.task) as target_n: with linkpath.borrow(self.task) as linkpath_n: try: await _symlinkat(self.task.sysif, target_n, self.near, linkpath_n) except OSError as exn: exn.filename = target.value exn.filename2 = (self, linkpath.value) raise async def fchmod(self, mode: int) -> None: """change permissions of a file manpage: fchmod(2) """ self._validate() await _fchmod(self.task.sysif, self.near, mode) async def ftruncate(self, length: int) -> None: """truncate a file to a specified length manpage: ftruncate(2) """ self._validate() await _ftruncate(self.task.sysif, self.near, length) # oldfd has to be a valid file descriptor. newfd is not, technically, required to be # open, but that's the best practice for avoiding races, so we require it anyway here. async def dup3(self, newfd: T_fd, flags: int) -> T_fd: self._validate() if not newfd.is_only_handle(): raise Exception("can't dup over newfd", newfd, "there are more handles to it than just ours") if self.near == newfd.near: # dup3 fails if newfd == oldfd. I guess I'll just work around that. return newfd await _dup3(self.task.sysif, self.near, newfd.near, flags) # newfd is left as a valid pointer to the new file descriptor return newfd async def dup2(self, newfd: T_fd) -> T_fd: """duplicate a file descriptor manpage: dup(2) """ return await self.dup3(newfd, 0)
Ancestors
Subclasses
Methods
async def readlinkat(self, path: WrittenPointer[t.Union[str, os.PathLike]], buf: Pointer) ‑> Tuple[ReadablePointer, Pointer]
-
read value of a symbolic link
manpage: readlinkat(2)
Expand source code Browse git
async def readlinkat(self, path: WrittenPointer[t.Union[str, os.PathLike]], buf: Pointer) -> t.Tuple[ReadablePointer, Pointer]: """read value of a symbolic link manpage: readlinkat(2) """ self._validate() with path.borrow(self.task): with buf.borrow(self.task): ret = await _readlinkat(self.task.sysif, self.near, path.near, buf.near, buf.size()) return buf.readable_split(ret)
async def faccessat(self, ptr: WrittenPointer[t.Union[str, os.PathLike]], mode: OK, flags: AT = AT.NONE) ‑> NoneType
-
check user's permissions for a file
manpage: faccessat(2)
Expand source code Browse git
async def faccessat(self, ptr: WrittenPointer[t.Union[str, os.PathLike]], mode: OK, flags: AT=AT.NONE) -> None: """check user's permissions for a file manpage: faccessat(2) """ self._validate() with ptr.borrow(self.task): await _faccessat(self.task.sysif, self.near, ptr.near, mode, flags)
async def openat(self: T_fd, path: WrittenPointer[t.Union[str, os.PathLike]], flags: O, mode=420) ‑> ~T_fd
-
open and possibly create a file
manpage: openat(2)
Expand source code Browse git
async def openat(self: T_fd, path: WrittenPointer[t.Union[str, os.PathLike]], flags: O, mode=0o644) -> T_fd: """open and possibly create a file manpage: openat(2) """ self._validate() with path.borrow(self.task) as path_n: try: fd = await _openat(self.task.sysif, self.near, path_n, flags|O.CLOEXEC, mode) except OSError as exn: exn.filename = path.value raise return self.task.make_fd_handle(fd)
async def mkdirat(self, path: WrittenPointer[t.Union[str, os.PathLike]], mode=493) ‑> NoneType
-
create a directory
manpage: mkdirat(2)
Expand source code Browse git
async def mkdirat(self, path: WrittenPointer[t.Union[str, os.PathLike]], mode=0o755) -> None: """create a directory manpage: mkdirat(2) """ self._validate() with path.borrow(self.task) as path_n: try: await _mkdirat(self.task.sysif, self.near, path_n, mode) except OSError as exn: exn.filename = path.value raise
async def unlinkat(self, path: WrittenPointer[t.Union[str, os.PathLike]], flags: AT = AT.NONE) ‑> NoneType
-
delete a name and possibly the file it refers to
manpage: unlinkat(2)
Expand source code Browse git
async def unlinkat(self, path: WrittenPointer[t.Union[str, os.PathLike]], flags: AT=AT.NONE) -> None: """delete a name and possibly the file it refers to manpage: unlinkat(2) """ with path.borrow(self.task) as path_n: try: await _unlinkat(self.task.sysif, self.near, path_n, flags) except OSError as exn: exn.filename = path.value raise
async def rmdirat(self, path: WrittenPointer[t.Union[str, os.PathLike]]) ‑> NoneType
-
delete a directory
manpage: unlinkat(2)
Expand source code Browse git
async def rmdirat(self, path: WrittenPointer[t.Union[str, os.PathLike]]) -> None: """delete a directory manpage: unlinkat(2) """ await self.unlinkat(path, AT.REMOVEDIR)
async def linkat(self, oldpath: WrittenPointer[t.Union[str, os.PathLike]], newdirfd: t.Optional[FSFileDescriptor], newpath: WrittenPointer[t.Union[str, os.PathLike]], flags: AT = AT.NONE) ‑> NoneType
-
make a new name for a file
manpage: linkat(2)
Expand source code Browse git
async def linkat(self, oldpath: WrittenPointer[t.Union[str, os.PathLike]], newdirfd: t.Optional[FSFileDescriptor], newpath: WrittenPointer[t.Union[str, os.PathLike]], flags: AT=AT.NONE) -> None: """make a new name for a file manpage: linkat(2) """ self._validate() with oldpath.borrow(self.task) as oldpath_n: with newpath.borrow(self.task) as newpath_n: try: await _linkat(self.task.sysif, self.near, oldpath_n, _get_near(newdirfd), newpath_n, flags) except OSError as exn: exn.filename = oldpath.value exn.filename2 = (newdirfd, newpath.value) if newdirfd else newpath.value raise
async def renameat(self, oldpath: WrittenPointer[t.Union[str, os.PathLike]], newdirfd: t.Optional[FSFileDescriptor], newpath: WrittenPointer[t.Union[str, os.PathLike]], flags: RENAME = RENAME.NONE) ‑> NoneType
-
change the name or location of a file
manpage: renameat2(2)
Expand source code Browse git
async def renameat(self, oldpath: WrittenPointer[t.Union[str, os.PathLike]], newdirfd: t.Optional[FSFileDescriptor], newpath: WrittenPointer[t.Union[str, os.PathLike]], flags: RENAME=RENAME.NONE) -> None: """change the name or location of a file manpage: renameat2(2) """ self._validate() with oldpath.borrow(self.task) as oldpath_n: with newpath.borrow(self.task) as newpath_n: try: await _renameat2(self.task.sysif, self.near, oldpath_n, _get_near(newdirfd), newpath_n, flags) except OSError as exn: exn.filename = oldpath.value exn.filename2 = (newdirfd, newpath.value) if newdirfd else newpath.value raise
async def symlinkat(self, target: WrittenPointer[t.Union[str, os.PathLike]], linkpath: WrittenPointer[t.Union[str, os.PathLike]]) ‑> NoneType
-
make a new name for a file
Note that `self` controls where the link is created, not the target of the link; `self` is the `newdirfd` argument to symlinkat.
manpage: symlinkat(2)
Expand source code Browse git
async def symlinkat(self, target: WrittenPointer[t.Union[str, os.PathLike]], linkpath: WrittenPointer[t.Union[str, os.PathLike]]) -> None: """make a new name for a file Note that `self` controls where the link is created, not the target of the link; `self` is the `newdirfd` argument to symlinkat. manpage: symlinkat(2) """ self._validate() with target.borrow(self.task) as target_n: with linkpath.borrow(self.task) as linkpath_n: try: await _symlinkat(self.task.sysif, target_n, self.near, linkpath_n) except OSError as exn: exn.filename = target.value exn.filename2 = (self, linkpath.value) raise
async def fchmod(self, mode: int) ‑> NoneType
-
change permissions of a file
manpage: fchmod(2)
Expand source code Browse git
async def fchmod(self, mode: int) -> None: """change permissions of a file manpage: fchmod(2) """ self._validate() await _fchmod(self.task.sysif, self.near, mode)
async def ftruncate(self, length: int) ‑> NoneType
-
truncate a file to a specified length
manpage: ftruncate(2)
Expand source code Browse git
async def ftruncate(self, length: int) -> None: """truncate a file to a specified length manpage: ftruncate(2) """ self._validate() await _ftruncate(self.task.sysif, self.near, length)
async def dup3(self, newfd: T_fd, flags: int) ‑> ~T_fd
-
Expand source code Browse git
async def dup3(self, newfd: T_fd, flags: int) -> T_fd: self._validate() if not newfd.is_only_handle(): raise Exception("can't dup over newfd", newfd, "there are more handles to it than just ours") if self.near == newfd.near: # dup3 fails if newfd == oldfd. I guess I'll just work around that. return newfd await _dup3(self.task.sysif, self.near, newfd.near, flags) # newfd is left as a valid pointer to the new file descriptor return newfd
async def dup2(self, newfd: T_fd) ‑> ~T_fd
-
duplicate a file descriptor
manpage: dup(2)
Expand source code Browse git
async def dup2(self, newfd: T_fd) -> T_fd: """duplicate a file descriptor manpage: dup(2) """ return await self.dup3(newfd, 0)
Inherited members