from __future__ import annotations
import os
import re
import requests as _requests
from .exceptions import (
AuthenticationError,
ConnectionError,
ScriptRuntimeError,
ScriptSyntaxError,
TimeoutError,
)
from ._result import ExecutionResult
_TOKEN_FILE = os.path.expanduser("~/.daz3d/dazscriptserver_token.txt")
_DEFAULT_HOST = "127.0.0.1"
_DEFAULT_PORT = 18811
def _load_token() -> str:
if os.path.exists(_TOKEN_FILE):
with open(_TOKEN_FILE) as f:
return f.read().strip()
return ""
def _map_response(resp: _requests.Response, script: str = "") -> ExecutionResult:
status = resp.status_code
if status == 401 or status == 403:
raise AuthenticationError(f"HTTP {status}: {resp.text[:200]}")
data = resp.json()
request_id = data.get("request_id", "")
if not data.get("success", True):
error_msg = data.get("error", "Script failed")
# SyntaxError comes from the parser; runtime errors (TypeError, ReferenceError,
# Error, etc.) also include "Line N:" but never say "SyntaxError" explicitly.
if "SyntaxError" in error_msg:
raise ScriptSyntaxError(error_msg, script=script, request_id=request_id)
raise ScriptRuntimeError(error_msg, script=script, request_id=request_id)
return ExecutionResult(
value=data.get("result"),
output=data.get("output", []),
request_id=request_id,
success=True,
error="",
duration_ms=data.get("duration_ms", 0.0),
)
[docs]
class DazClient:
"""HTTP client for the DAZ Studio Script Server.
Handles authentication, request serialisation, and response mapping for
all server endpoints. The token is loaded automatically from
``~/.daz3d/dazscriptserver_token.txt`` when *token* is ``None``.
Args:
host: Hostname or IP address of the Script Server.
port: Listening port of the Script Server.
token: API token. Pass an empty string to disable authentication or
``None`` to auto-load from the default token file.
timeout: Per-request HTTP timeout in seconds.
Example::
client = DazClient() # default 127.0.0.1:18811
client = DazClient(token="my-secret-token") # explicit token
"""
def __init__(
self,
host: str = _DEFAULT_HOST,
port: int = _DEFAULT_PORT,
token: str | None = None,
timeout: float = 30.0,
):
self._base = f"http://{host}:{port}"
self._token = token if token is not None else _load_token()
self._timeout = timeout
@property
def _headers(self) -> dict:
h = {}
if self._token:
h["X-API-Token"] = self._token
return h
def _post(self, path: str, payload: dict) -> _requests.Response:
try:
return _requests.post(
f"{self._base}{path}",
json=payload,
headers=self._headers,
timeout=self._timeout,
)
except _requests.exceptions.ConnectionError as e:
raise ConnectionError(f"Cannot reach DAZ Studio at {self._base}: {e}") from e
except _requests.exceptions.Timeout as e:
raise TimeoutError(f"Request timed out after {self._timeout}s") from e
def _get(self, path: str, params: dict | None = None) -> _requests.Response:
try:
return _requests.get(
f"{self._base}{path}",
headers=self._headers,
params=params,
timeout=self._timeout,
)
except _requests.exceptions.ConnectionError as e:
raise ConnectionError(f"Cannot reach DAZ Studio at {self._base}: {e}") from e
except _requests.exceptions.Timeout as e:
raise TimeoutError(f"Request timed out after {self._timeout}s") from e
[docs]
def execute(self, script: str, args: object = None) -> ExecutionResult:
"""Execute a DazScript string synchronously.
Args:
script: DazScript source code to execute.
args: Optional value passed into the script as ``getArguments()[0]``.
Must be JSON-serialisable.
Returns:
The execution result containing the script return value and any
console output.
Raises:
ConnectionError: If the server cannot be reached.
AuthenticationError: If the token is invalid or the IP is blocked.
ScriptSyntaxError: If the script contains a parse error.
ScriptRuntimeError: If the script raises a runtime exception.
TimeoutError: If the request exceeds *timeout* seconds.
"""
payload: dict = {"script": script}
if args is not None:
payload["args"] = args
resp = self._post("/execute", payload)
return _map_response(resp, script=script)
[docs]
def execute_file(self, script_file: str, args: object = None) -> ExecutionResult:
"""Execute a ``.dsa`` script file that resides on the DAZ Studio host.
Args:
script_file: Absolute path to the ``.dsa`` file on the server host.
args: Optional argument passed to the script.
Returns:
The execution result.
Raises:
ConnectionError: If the server cannot be reached.
AuthenticationError: On auth failure.
ScriptSyntaxError: On parse error.
ScriptRuntimeError: On runtime error.
TimeoutError: On HTTP timeout.
"""
payload: dict = {"scriptFile": script_file}
if args is not None:
payload["args"] = args
resp = self._post("/execute", payload)
return _map_response(resp)
[docs]
def execute_async_submit(self, script: str, args: object = None) -> str:
"""Submit a script for asynchronous execution and return immediately.
Args:
script: DazScript source code.
args: Optional argument for the script.
Returns:
The server-assigned ``request_id`` string. Use it with
:meth:`get_request_status` or :meth:`get_request_result` to poll
for the outcome.
Raises:
ConnectionError: If the server cannot be reached.
AuthenticationError: On auth failure.
"""
payload: dict = {"script": script}
if args is not None:
payload["args"] = args
resp = self._post("/execute/async", payload)
if resp.status_code in (401, 403):
raise AuthenticationError(f"HTTP {resp.status_code}")
return resp.json().get("request_id", "")
[docs]
def get_request_status(self, request_id: str) -> dict:
"""Return the current status of an async request.
Args:
request_id: The ID returned by :meth:`execute_async_submit`.
Returns:
A dict with at least a ``"status"`` key. Possible values:
``"queued"``, ``"running"``, ``"completed"``, ``"failed"``,
``"cancelled"``, or ``"not_found"``.
"""
resp = self._get(f"/requests/{request_id}/status")
if resp.status_code == 404:
return {"status": "not_found"}
return resp.json()
[docs]
def get_request_result(self, request_id: str, wait: bool = False, wait_timeout: int = 30) -> dict:
"""Fetch the result of a completed async request.
Args:
request_id: The ID returned by :meth:`execute_async_submit`.
wait: If ``True``, the server will long-poll until the request
completes or *wait_timeout* is reached.
wait_timeout: Maximum number of seconds the server should wait
before returning (only relevant when *wait* is ``True``).
Returns:
A dict containing ``success``, ``result``, ``output``, ``error``,
``duration_ms``, and ``status`` keys.
"""
params = {}
if wait:
params["wait"] = "true"
params["timeout"] = str(wait_timeout)
resp = self._get(f"/requests/{request_id}/result", params=params or None)
if resp.status_code == 404:
return {"status": "not_found"}
return resp.json()
[docs]
def cancel_request(self, request_id: str) -> bool:
"""Cancel a queued or running async request.
Args:
request_id: The ID returned by :meth:`execute_async_submit`.
Returns:
``True`` if the server confirmed cancellation, ``False`` otherwise.
"""
try:
resp = _requests.delete(
f"{self._base}/requests/{request_id}",
headers=self._headers,
timeout=self._timeout,
)
return resp.status_code == 200
except _requests.exceptions.RequestException:
return False
[docs]
def status(self) -> dict:
"""Return the server status dict from ``GET /status``."""
return self._get("/status").json()
[docs]
def health(self) -> dict:
"""Return the health check dict from ``GET /health``."""
return self._get("/health").json()
[docs]
def metrics(self) -> dict:
"""Return the metrics dict from ``GET /metrics``."""
return self._get("/metrics").json()