feat: add message queue class

This commit is contained in:
Asger Gitz-Johansen 2024-10-06 19:45:18 +02:00
parent 30974f8098
commit 9070db4a14
5 changed files with 75 additions and 4 deletions

View File

@ -10,11 +10,10 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"loguru",
"posix-ipc",
]
[project.scripts]
scih = "scih.main:main"
scii = "scii.main:main"
[project.optional-dependencies]
build = [

View File

@ -1,2 +0,0 @@
def hello() -> str:
return "Hello from scii!"

4
src/scii/__main__.py Normal file
View File

@ -0,0 +1,4 @@
from scii.main import main
if __name__ == "__main__":
main()

12
src/scii/main.py Normal file
View File

@ -0,0 +1,12 @@
from loguru import logger
from scii.mq import MessageQueue
logger.catch
def main():
logger.info("welcome to the SCI Interface")
with MessageQueue(rx="/sci_tx", tx="/sci_rx") as qu:
while True:
r = qu.receive()
if r is None:
break

58
src/scii/mq.py Normal file
View File

@ -0,0 +1,58 @@
from ctypes import cdll
from ctypes import create_string_buffer
from loguru import logger
libc = cdll.LoadLibrary("libc.so.6")
class MessageQueue:
"""A simple abstraction over mq_* libc functions."""
def __init__(self, rx: str, tx: str) -> None:
self._rx = rx
self._tx = tx
self._rx_queue: int | None = None
self._tx_queue: int | None = None
def __enter__(self) -> "MessageQueue":
self._rx_queue = libc.mq_open(bytes(self._rx, "utf-8"), 0, 666, None)
if not self._is_rx_valid():
libc.perror(b"mq_open")
logger.error("bad rx open")
self._tx_queue = libc.mq_open(bytes(self._tx, "utf-8"), 1, 666, None)
if not self._is_tx_valid():
libc.perror(b"mq_open")
logger.error("bad tx open")
return self
def __exit__(self, *_) -> None:
if self._is_rx_valid():
libc.mq_close(self._rx_queue)
if self._is_tx_valid():
libc.mq_close(self._tx_queue)
def send(self, msg: str) -> None:
if not self._is_tx_valid():
raise RuntimeError(f"tx queue is invalid {self._tx_queue}")
res = libc.mq_send(self._tx_queue, bytes(msg, "utf-8"), len(msg), 1)
if res == -1:
libc.perror(b"mq_send")
logger.error("bad send")
def receive(self) -> str | None:
if not self._is_rx_valid():
raise RuntimeError(f"rx queue is invalid {self._rx_queue}")
result = create_string_buffer(b'\0' * 32)
res = libc.mq_receive(self._rx_queue, result, 8192, None)
if res == -1:
libc.perror(b"mq_receive")
logger.error("bad receive")
return None
return str(result.value)
def _is_rx_valid(self) -> bool:
return self._rx_queue is not None and self._rx_queue != -1
def _is_tx_valid(self) -> bool:
return self._tx_queue is not None and self._tx_queue != -1