wip: feat: implement things as a textual app

This is a bit more complex, but it seems like it should work
This commit is contained in:
Asger Gitz-Johansen 2024-10-12 11:31:13 +02:00
parent 5bcbfa3b6d
commit 7f66c8e97a
7 changed files with 300 additions and 23 deletions

View File

@ -0,0 +1,21 @@
# SCI Interface
The de-facto TUI interface for [sci](https://git.gtz.dk/agj/sci).
Note that this application doesn't work unless an instance on `sci` is already running on the same machine you're using.
## Screenshots
WIP
## Install
`scii` is available from PyPi:
```
python3 -m pip install scii
```
## Development
This project is built using the usual python virtual-environment setup:
```sh
python3 -m venv venv
source venv/bin/activate
python3 -m build
```

View File

@ -10,6 +10,7 @@ readme = "README.md"
requires-python = ">=3.12" requires-python = ">=3.12"
dependencies = [ dependencies = [
"loguru", "loguru",
"textual"
] ]
[project.scripts] [project.scripts]

View File

@ -2,24 +2,14 @@ import sys
from loguru import logger from loguru import logger
from scii.mq import MessageQueue from scii.tui.tuiapp import TuiApp
logger.catch logger.catch
def main():
setup_logging("TRACE", True)
logger.info("welcome to the SCI Interface")
with MessageQueue(rx="/sci_tx", tx="/sci_rx") as queue:
while True:
r = queue.receive()
if r is None:
break
logger.info("msg: {}", r)
queue.send("list")
r = queue.receive() def main():
if r is None: logger.info("welcome to the SCI Interface")
break TuiApp().setup_logging("TRACE", True).run()
logger.info("list: {}", r)
def setup_logging(level: str, use_colors: bool) -> None: def setup_logging(level: str, use_colors: bool) -> None:

View File

@ -1,9 +1,11 @@
from ctypes import cdll from ctypes import cdll
from ctypes import create_string_buffer from ctypes import create_string_buffer
from typing import Any
from loguru import logger from loguru import logger
libc = cdll.LoadLibrary("libc.so.6") libc = cdll.LoadLibrary("libc.so.6")
MAX_QUEUE_LEN = 8192
class MessageQueue: class MessageQueue:
@ -26,7 +28,7 @@ class MessageQueue:
logger.error("bad tx open") logger.error("bad tx open")
return self return self
def __exit__(self, *_) -> None: def __exit__(self, *_: list[Any]) -> None:
if self._is_rx_valid(): if self._is_rx_valid():
libc.mq_close(self._rx_queue) libc.mq_close(self._rx_queue)
if self._is_tx_valid(): if self._is_tx_valid():
@ -43,13 +45,13 @@ class MessageQueue:
def receive(self) -> str | None: def receive(self) -> str | None:
if not self._is_rx_valid(): if not self._is_rx_valid():
raise RuntimeError(f"rx queue is invalid {self._rx_queue}") raise RuntimeError(f"rx queue is invalid {self._rx_queue}")
result = create_string_buffer(b'\0' * 64) result = create_string_buffer(b"\0" * MAX_QUEUE_LEN)
res = libc.mq_receive(self._rx_queue, result, 8192, None) res = libc.mq_receive(self._rx_queue, result, MAX_QUEUE_LEN, None)
if res == -1: if res == -1:
libc.perror(b"mq_receive") libc.perror(b"mq_receive")
logger.error("bad receive") logger.error("bad receive")
return None return None
return str(result.value) return str(result.value, encoding="utf-8")
def _is_rx_valid(self) -> bool: def _is_rx_valid(self) -> bool:
return self._rx_queue is not None and self._rx_queue != -1 return self._rx_queue is not None and self._rx_queue != -1

119
src/scii/sci_pipelines.py Normal file
View File

@ -0,0 +1,119 @@
import asyncio
from collections.abc import Callable
from typing import NamedTuple
from loguru import logger
from scii.mq import MessageQueue
class PipelineDTO(NamedTuple):
id: str
start_time: str
class _PipelinesDiff(NamedTuple):
additions: dict[str, PipelineDTO]
deletions: dict[str, PipelineDTO]
class SciPipelines:
"""SCI pipelines synchronizer class. Use this to automatically stay up to date."""
def __init__(
self,
pipeline_new_callback: Callable[[PipelineDTO], None],
pipeline_end_callback: Callable[[str, int | None], None],
) -> None:
self._on_new_pipeline = pipeline_new_callback
self._on_end_pipeline = pipeline_end_callback
self._pipelines: dict[str, PipelineDTO] = {}
self._queue = MessageQueue(rx="/sci_tx", tx="/sci_rx")
self._is_running: bool = False
async def async_run(self) -> None:
await asyncio.to_thread(self.run)
async def async_request_list(self) -> None:
await asyncio.to_thread(self.request_list)
def request_list(self) -> None:
if self._is_running:
self._queue.send("list")
def run(self) -> None:
with self._queue:
try:
self._is_running = True
while True:
msg = self._queue.receive()
if msg is None:
logger.warning("did not receive anything")
continue
self._handle_message(msg)
finally:
self._is_running = False
def _handle_message(self, msg: str) -> None:
msg_type = msg.split(" ")[0]
content = msg[len(msg_type) :]
logger.trace("handling sci message {}", msg_type)
match msg_type:
case "list":
pipelines_raw = content.split("\\n")
pipelines = self._from_raw(pipelines_raw)
diff = self._diff(self._pipelines, pipelines)
for key in diff.additions:
self._on_new_pipeline(diff.additions[key])
for key in diff.deletions:
self._on_end_pipeline(key, None)
self._pipelines = pipelines
case "pipeline_new":
pipeline = self._from_raw2(content)
if pipeline is not None:
self._on_new_pipeline(pipeline)
case "pipeline_end":
raw = content.split(" ")
id = raw[0]
return_code = None
try:
return_code = int(raw[2])
except ValueError:
logger.error("bad cast")
self._on_end_pipeline(id, return_code)
case "sci_started":
for k in self._pipelines:
self._on_end_pipeline(k, None)
self._pipelines.clear()
case _:
logger.warning("unknown sci message {}", msg)
def _from_raw(self, raw: list[str]) -> dict[str, PipelineDTO]:
result: dict[str, PipelineDTO] = {}
for x in raw:
if x == "":
continue
dto = self._from_raw2(x)
if dto is not None:
result[dto.id] = dto
return result
def _from_raw2(self, raw: str) -> PipelineDTO | None:
if raw == "":
return None
s = raw.split(" ")
return PipelineDTO(id=s[0], start_time=s[1])
def _diff(self, a: dict[str, PipelineDTO], b: dict[str, PipelineDTO]) -> _PipelinesDiff:
result = _PipelinesDiff(additions={}, deletions={})
for akey in a:
if akey not in b:
result.deletions[akey] = a[akey]
for bkey in b:
if bkey not in a:
result.additions[bkey] = b[bkey]
return result

114
src/scii/tui/tuiapp.py Normal file
View File

@ -0,0 +1,114 @@
from __future__ import annotations
import asyncio
from typing import override
import loguru
from loguru import logger
from textual.app import App
from textual.app import ComposeResult
from textual.containers import ScrollableContainer
from textual.driver import Driver
from textual.types import CSSPathType
from textual.widgets import Footer
from textual.widgets import Header
from textual.widgets import Log
from scii.sci_pipelines import PipelineDTO
from scii.sci_pipelines import SciPipelines
from scii.tui.widgets.pipeline import PipelineWidget
class _WritableLog(loguru.Writable):
def __init__(self, log: Log) -> None:
super().__init__()
self._log = log
@override
def write(self, message: loguru.Message) -> None:
_ = self._log.write(message)
class TuiApp(App[None]):
"""A Textual app to manage sci pipelines."""
BINDINGS = [
("d", "toggle_dark", "Toggle dark mode"),
("l", "toggle_log", "Toggle log"),
("q", "quit", "Quit"),
]
def __init__(
self,
driver_class: type[Driver] | None = None,
css_path: CSSPathType | None = None,
watch_css: bool = False,
ansi_color: bool = False,
):
super().__init__(driver_class, css_path, watch_css, ansi_color)
self._pipelines = SciPipelines(self._on_new_pipeline, self._on_end_pipeline)
self._my_log: Log = Log(id="log")
self._my_log_wrapper = _WritableLog(self._my_log)
def setup_logging(self, level: str, use_colors: bool) -> "TuiApp":
"""Setup logging.
Args:
level: The logging level.
use_colors: Whether to use colors in the log output.
Returns:
self
"""
logger.remove()
_ = logger.add(
self._my_log_wrapper,
colorize=use_colors,
format="{time:HH:mm:ss} <level>{level}</level> <fg #888>{file}:{line}:</fg #888> <bold>{message}</bold>",
level=level
)
return self
def on_mount(self) -> None:
_ = asyncio.create_task(self._pipelines.async_run()) # run the pipelines task in the background
_ = self.set_interval(1, self._pipelines.async_request_list)
@override
def compose(self) -> ComposeResult:
"""Create child widgets for the app."""
yield Header()
yield Footer()
yield ScrollableContainer(id="pipelines")
yield self._my_log
@override
def action_toggle_dark(self) -> None:
"""An action to toggle dark mode."""
self.dark = not self.dark
def action_toggle_log(self) -> None:
self._my_log.display = not self._my_log.display
async def action_add_pipeline(self, id: str, start_time: str | None = None) -> None:
"""An action to add a timer."""
if start_time is None:
start_time = "N/A"
new_pipeline = PipelineWidget(id, start_time)
_ = self.query_one("#pipelines").mount(new_pipeline)
async def action_remove_pipeline(self, id: str) -> None:
"""Called to remove a timer."""
containers = self.query_one("#pipelines")
for child in containers.children:
if not isinstance(child, PipelineWidget):
continue
if child.pipeline_id == id:
logger.trace("removing pipeline widget with id {}", id)
await child.remove()
def _on_new_pipeline(self, pipeline: PipelineDTO) -> None:
logger.debug("TODO: add pipeline {} to tui", pipeline)
def _on_end_pipeline(self, id: str, return_code: int | None) -> None:
logger.debug("TODO: remove pipeline {}({}) from tui", id, return_code)

View File

@ -0,0 +1,30 @@
from typing import override
from textual.app import ComposeResult
from textual.widgets import Button
from textual.widgets import Static
class PipelineWidget(Static):
def __init__(self, id: str, start_time: str):
"""Construct a new PipelineWidget.
Args:
id: The pipeline unique identifier.
start_time: The time that the pipeline started.
"""
super().__init__()
self._pipeline_id = id
self._start_time = start_time
@property
def pipeline_id(self) -> str:
"""The id property."""
return self._pipeline_id
@override
def compose(self) -> ComposeResult:
"""Create child widgets of a pipeline."""
yield Button("Stop", id="stop", variant="error")
# yield TimeDisplay("00:00:00.00")