from __future__ import annotations

import asyncio
import base64
import os
import uuid
from dataclasses import dataclass, field
from enum import Enum
import inspect
import logging
from typing import Any, Awaitable, Callable, Dict, Iterable, List, Optional, Union

from planner.dag_parser import DAGParser, PlanGraph, ParseResult
from executor.triggers import PeriodicTrigger, TriggerCondition
from services.tts_client import TTSClient, TTSConfig
from services.fr_client import FRClient, FRConfig
from services.vlm_client import VLMClient, VLMConfig


# ============================================================================
# Navigation State Types
# ============================================================================


class NavState(str, Enum):
    """Navigation state for Edge Proxy integration.

    Tracks the state of an active navigation command.  Members also subclass
    ``str``, so each one compares equal to its lowercase string value.
    """

    # State assigned to a freshly created NavigationContext.
    ACCEPTED = "accepted"
    # The robot is under way; progress updates may accompany this state.
    NAVIGATING = "navigating"
    # Terminal state: destination reached.
    ARRIVED = "arrived"
    # Terminal state: the command failed (also set on timeout or proxy error).
    FAILED = "failed"
    # Terminal state: the command was cancelled.
    CANCELLED = "cancelled"


@dataclass
class NavigationContext:
    """Context for an active navigation operation.

    Stores state for the current navigation command being executed.
    """

    # Identifier used to match incoming status messages to this command.
    request_id: str
    # Name of the target waypoint.
    destination: str
    # Requested navigation speed, forwarded to the Edge Proxy.
    speed: str
    # Current lifecycle state of the command.
    status: NavState
    # Last reported progress; set to 1.0 on arrival.
    progress: float = 0.0
    # Failure or cancellation reason, when one was reported.
    error: Optional[str] = None


class ActionExecutionError(RuntimeError):
    """Raised when a plan cannot be parsed or a step has no registered handler."""

    pass


# Signature of an action handler: called as ``handler(step, context, results)``
# and returns the step's output dict, either directly or as an awaitable that
# resolves to it.
ActionHandler = Callable[
    [Dict[str, Any], Optional[Any], Dict[str, "StepOutcome"]],
    Union[Dict[str, Any], Awaitable[Dict[str, Any]]],
]
# Synchronous callback invoked with a replan-request payload dict.
ReplanCallback = Callable[[Dict[str, Any]], None]


@dataclass
class StepOutcome:
    """Result record for a single plan step."""

    # Identifier of the step within the plan.
    step_id: str
    # Action name of the step (empty string when the step has none).
    action: str
    # One of "success", "failed" or "skipped" as set by the executor.
    status: str
    # Handler output for successful steps; empty otherwise.
    output: Dict[str, Any] = field(default_factory=dict)
    # Error text for failed steps, or "skipped" for steps that never ran.
    error: Optional[str] = None


@dataclass
class ExecutionReport:
    """Summary of one plan execution."""

    # Plan identifier taken from the plan ("unknown_plan" when absent).
    plan_id: str
    # True only when no errors were recorded and execution was not cancelled.
    success: bool
    # Outcome of every step in the plan, keyed by step id.
    results: Dict[str, StepOutcome]
    # Error messages collected during execution, in order of occurrence.
    errors: List[str] = field(default_factory=list)


class ActionExecutor:
    # Heuristic trigger words used to request dynamic replanning after SCAN_AREA.
    # Matching is a case-insensitive substring test against the scan analysis
    # text, so a keyword also matches when it appears inside a longer word.
    _SCAN_REPLAN_KEYWORDS = (
        "hazard",
        "spill",
        "chemical",
        "vapor",
        "smoke",
        "gas",
        "leak",
        "fire",
        "injury",
        "trapped",
        "unconscious",
        "collapsed",
        "deceased",
        "help",
    )

    def __init__(
        self,
        handlers: Optional[Dict[str, ActionHandler]] = None,
        parser: Optional[DAGParser] = None,
        logger: Optional[logging.Logger] = None,
        stop_on_error: bool = True,
    ) -> None:
        self.parser = parser or DAGParser()
        self.logger = logger or logging.getLogger(__name__)
        self.stop_on_error = stop_on_error
        self._handlers = self._default_handlers()
        if handlers:
            self._handlers.update(handlers)
        self._cancelled = False
        self._cancel_reason: Optional[str] = None

        # Edge Proxy client integration
        self._edge_client: Optional[Any] = None
        self._nav_context: Optional[NavigationContext] = None
        self._robot_location: str = ""
        self._waypoints: Dict[str, Any] = {}

        # Periodic trigger integration
        self._triggers: List[PeriodicTrigger] = []

        # TTS client — endpoint from TTS_ENDPOINT env var or default
        self._tts_client = TTSClient(
            TTSConfig(
                endpoint=os.getenv(
                    "TTS_ENDPOINT", "http://kluster.klass.dev:8200/tts"
                )
            )
        )

        # FR client — endpoint from FR_ENDPOINT env var or default
        self._fr_client = FRClient(
            FRConfig(
                endpoint=os.getenv(
                    "FR_ENDPOINT", "ws://kluster.klass.dev:42067/"
                )
            )
        )

        # VLM client — endpoint / model / key from env vars or defaults
        self._vlm_client = VLMClient(VLMConfig())

        # Dashboard broadcast callback (set by server via register_broadcast_callback)
        self._broadcast_callback: Optional[Callable[[Dict[str, Any]], Awaitable[None]]] = None

    def register_broadcast_callback(
        self, callback: Optional[Callable[[Dict[str, Any]], Awaitable[None]]]
    ) -> None:
        """Register an async callback for broadcasting messages to dashboard clients.

        Args:
            callback: Async callable that accepts a message dict and broadcasts it
                      to all connected dashboard WebSocket clients, or None to clear.
        """
        self._broadcast_callback = callback

    def register_handler(self, action: str, handler: ActionHandler) -> None:
        self._handlers[action] = handler

    def cancel(self, reason: Optional[str] = None) -> None:
        self._cancelled = True
        self._cancel_reason = reason

    def reset_cancel(self) -> None:
        self._cancelled = False
        self._cancel_reason = None

    def register_edge_proxy_client(self, client: Optional[Any]) -> None:
        """Register an Edge Proxy WebSocket client for navigation.

        Args:
            client: EdgeProxyClient instance or None to clear.
        """
        self._edge_client = client

        if client is not None:
            # Register event handlers
            if hasattr(client, "on_nav_status"):
                client.on_nav_status(self._handle_nav_status)
            if hasattr(client, "on_robot_state"):
                client.on_robot_state(self._update_robot_state)
            if hasattr(client, "on_error"):
                client.on_error(self._handle_edge_error)
            if hasattr(client, "on_waypoint_list"):
                client.on_waypoint_list(self._update_waypoint_list)

    def add_trigger(self, trigger: PeriodicTrigger) -> None:
        """Register a periodic trigger.

        Args:
            trigger: PeriodicTrigger instance to add
        """
        self._triggers.append(trigger)

    def clear_triggers(self) -> None:
        """Clear all registered triggers."""
        for trigger in self._triggers:
            trigger.stop()
        self._triggers.clear()

    async def _start_triggers(self) -> None:
        """Start all periodic triggers."""
        for trigger in self._triggers:
            trigger.start()

    async def _stop_triggers(self) -> None:
        """Stop all periodic triggers."""
        for trigger in self._triggers:
            trigger.stop()

    async def _update_trigger_progress(self, progress: float) -> None:
        """Update progress for all triggers.

        Args:
            progress: Progress value from 0.0 to 1.0
        """
        for trigger in self._triggers:
            trigger.update_progress(progress)

    async def _notify_triggers_navigation_complete(self) -> None:
        """Notify all triggers that navigation is complete."""
        for trigger in self._triggers:
            trigger.on_navigation_complete()

    # ========================================================================
    # Edge Proxy Event Handlers
    # ========================================================================

    async def _handle_nav_status(self, msg: Any) -> None:
        """Handle navigation status updates from Edge Proxy.

        Args:
            msg: NavStatusMessage from Edge Proxy.
        """
        if self._nav_context is None:
            return

        msg_request_id = getattr(msg, "request_id", "")
        if msg_request_id and msg_request_id != self._nav_context.request_id:
            return

        # Update context based on status
        status_str = msg.status if hasattr(msg, "status") else ""
        if status_str == "accepted":
            self._nav_context.status = NavState.ACCEPTED
        elif status_str == "navigating":
            self._nav_context.status = NavState.NAVIGATING
            if hasattr(msg, "progress"):
                self._nav_context.progress = msg.progress
                # Update triggers with new progress
                await self._update_trigger_progress(msg.progress)
        elif status_str == "arrived":
            self._nav_context.status = NavState.ARRIVED
            self._nav_context.progress = 1.0
            await self._update_trigger_progress(1.0)
        elif status_str == "failed":
            self._nav_context.status = NavState.FAILED
            if hasattr(msg, "reason") and msg.reason:
                self._nav_context.error = msg.reason
        elif status_str == "cancelled":
            self._nav_context.status = NavState.CANCELLED
            if hasattr(msg, "progress"):
                self._nav_context.progress = msg.progress
            if hasattr(msg, "reason") and msg.reason:
                self._nav_context.error = msg.reason

    async def _update_robot_state(self, state: Any) -> None:
        """Handle robot state updates from Edge Proxy.

        Args:
            state: RobotState from Edge Proxy.
        """
        if hasattr(state, "location"):
            self._robot_location = state.location

    async def _handle_edge_error(self, err: Any) -> None:
        """Handle error messages from Edge Proxy.

        Args:
            err: ErrorMessage from Edge Proxy.
        """
        if self._nav_context is not None:
            self._nav_context.status = NavState.FAILED
            if hasattr(err, "message"):
                self._nav_context.error = err.message

    async def _update_waypoint_list(self, msg: Any) -> None:
        """Handle waypoint list updates from Edge Proxy.

        Args:
            msg: WaypointListMessage from Edge Proxy.
        """
        # Store waypoint list for reference
        if hasattr(msg, "waypoints"):
            self._waypoints = {wp.name: wp for wp in msg.waypoints}

    async def execute_plan(
        self,
        plan: Union[PlanGraph, Dict[str, Any], str],
        context: Optional[Any] = None,
        on_step_start: Optional[Callable[[Dict[str, Any]], None]] = None,
        on_step_complete: Optional[Callable[[StepOutcome], None]] = None,
        on_replan_requested: Optional[ReplanCallback] = None,
    ) -> ExecutionReport:
        plan_graph = self._ensure_plan_graph(plan)
        plan_id = plan_graph.plan.get("plan_id", "unknown_plan")

        results: Dict[str, StepOutcome] = {}
        errors: List[str] = []

        pending = set(plan_graph.step_lookup.keys())
        ready: List[str] = sorted(plan_graph.entry_steps)

        while ready:
            step_id = ready.pop(0)
            step = plan_graph.step_lookup[step_id]

            if self._cancelled:
                errors.append(self._cancel_reason or "execution_cancelled")
                break

            if on_step_start:
                on_step_start(step)

            try:
                output = await self._execute_step(step, context, results)
                outcome = StepOutcome(
                    step_id=step_id,
                    action=step.get("action", ""),
                    status="success",
                    output=output,
                )
            except Exception as exc:
                message = str(exc)
                outcome = StepOutcome(
                    step_id=step_id,
                    action=step.get("action", ""),
                    status="failed",
                    error=message,
                )
                errors.append(message)
                results[step_id] = outcome
                pending.discard(step_id)
                if on_step_complete:
                    on_step_complete(outcome)
                if self.stop_on_error:
                    break
                continue

            results[step_id] = outcome
            pending.discard(step_id)

            if on_step_complete:
                on_step_complete(outcome)

            if (
                on_replan_requested is not None
                and outcome.status == "success"
                and outcome.action == "SCAN_AREA"
            ):
                trigger, reasons = self._should_replan_after_scan(outcome.output)
                if trigger:
                    on_replan_requested(
                        {
                            "type": "dynamic_replan_requested",
                            "step_id": outcome.step_id,
                            "action": outcome.action,
                            "reasons": reasons,
                            "scan_output": outcome.output,
                            "plan_id": plan_id,
                        }
                    )

            for child in sorted(plan_graph.children.get(step_id, set())):
                if child not in pending:
                    continue
                deps = plan_graph.dependencies.get(child, set())
                if deps.issubset(results.keys()):
                    ready.append(child)

        if pending and not errors and not self._cancelled:
            errors.append("deadlock_detected")

        for step_id in pending:
            step = plan_graph.step_lookup[step_id]
            results[step_id] = StepOutcome(
                step_id=step_id,
                action=step.get("action", ""),
                status="skipped",
                error="skipped",
            )

        success = not errors and not self._cancelled
        return ExecutionReport(plan_id=plan_id, success=success, results=results, errors=errors)

    def _should_replan_after_scan(self, output: Dict[str, Any]) -> tuple[bool, List[str]]:
        """Decide whether SCAN_AREA findings warrant a dynamic replan.

        Trigger when:
        - Structured signal says scene is interesting, OR
        - Free-form analysis text contains hazard/casualty keywords.
        """
        reasons: List[str] = []

        if output.get("interesting"):
            reasons.append("scan_marked_interesting")

        analysis = str(output.get("analysis", "")).lower()
        matched = sorted({kw for kw in self._SCAN_REPLAN_KEYWORDS if kw in analysis})
        if matched:
            reasons.append(f"analysis_keywords:{','.join(matched)}")

        return (len(reasons) > 0), reasons

    async def _execute_step(
        self,
        step: Dict[str, Any],
        context: Optional[Any],
        results: Dict[str, StepOutcome],
    ) -> Dict[str, Any]:
        action = step.get("action")
        handler = self._handlers.get(action)
        if handler is None:
            raise ActionExecutionError(f"No handler registered for action '{action}'.")

        response = handler(step, context, results)
        if inspect.isawaitable(response):
            return await response
        return response

    def _ensure_plan_graph(self, plan: Union[PlanGraph, Dict[str, Any], str]) -> PlanGraph:
        if isinstance(plan, PlanGraph):
            return plan
        parse_result: ParseResult = self.parser.parse(plan)
        if not parse_result.ok or not parse_result.plan_graph:
            message = parse_result.error.message if parse_result.error else "Unknown parse failure."
            raise ActionExecutionError(message)
        return parse_result.plan_graph

    def _default_handlers(self) -> Dict[str, ActionHandler]:
        return {
            "CHECK_BATTERY": self._handle_check_battery,
            "NAVIGATE": self._handle_navigate,
            "SCAN_AREA": self._handle_scan_area,
            "VERIFY_OBJECT": self._handle_verify_object,
            "SPEAK": self._handle_speak,
            "WAIT": self._handle_wait,
            "IDENTIFY_PERSON": self._handle_identify_person,
            "ALERT_OPERATOR": self._handle_alert_operator,
        }

    async def _handle_check_battery(
        self,
        step: Dict[str, Any],
        context: Optional[Any],
        _results: Dict[str, StepOutcome],
    ) -> Dict[str, Any]:
        params = step.get("params") or {}
        min_level = params.get("min_level", 0)

        # Prefer live battery data from edge proxy if connected and state received
        level: Optional[int] = None
        if (
            self._edge_client is not None
            and self._edge_client.is_connected
            and self._edge_client.last_robot_state is not None
        ):
            level = self._edge_client.last_robot_state.battery.level
            self.logger.debug("CHECK_BATTERY: using live battery level %d%%", level)

        # Fall back to context dict
        if level is None:
            level = _context_value(context, "battery_level", default=None)

        # Last resort: assume full
        if level is None:
            level = 100

        return {"battery_ok": level >= min_level, "current_level": level}

    async def _handle_navigate(
        self,
        step: Dict[str, Any],
        _context: Optional[Any],
        _results: Dict[str, StepOutcome],
    ) -> Dict[str, Any]:
        params = step.get("params") or {}
        destination = params.get("destination", "unknown")
        speed = params.get("speed", "normal")
        step_id = step.get("step_id", "unknown")

        # If Edge Proxy client is registered, use it
        if self._edge_client is not None:
            # Fail fast only when we already have a waypoint list.
            if self._waypoints and destination not in self._waypoints:
                return {
                    "status": "failed",
                    "destination": destination,
                    "speed": speed,
                    "error": "unknown_waypoint",
                }
            return await self._navigate_with_edge_proxy(destination, speed, step_id)

        # Fall back to mock behavior for testing
        return {"status": "arrived", "destination": destination, "speed": speed}

    async def _navigate_with_edge_proxy(
        self, destination: str, speed: str, step_id: str
    ) -> Dict[str, Any]:
        """Execute navigation using Edge Proxy client.

        Args:
            destination: Target waypoint name.
            speed: Navigation speed.
            step_id: Step ID for tracking.

        Returns:
            Result dictionary with navigation outcome.
        """
        from edge_proxy.messages import NavStatus

        # Create navigation context
        request_id = f"nav_{step_id}"
        self._nav_context = NavigationContext(
            request_id=request_id,
            destination=destination,
            speed=speed,
            status=NavState.ACCEPTED,
        )

        # Send navigation command
        try:
            await self._edge_client.send_navigate_waypoint(
                name=destination, request_id=request_id, speed=speed
            )
        except Exception as exc:
            self.logger.error("Failed to send navigate command: %s", exc)
            return {
                "status": "failed",
                "destination": destination,
                "speed": speed,
                "error": str(exc),
            }

        # Start periodic triggers before waiting for navigation
        await self._start_triggers()

        # Wait for navigation to complete
        try:
            await self._wait_for_navigation(request_id, timeout=120.0)
        except asyncio.TimeoutError:
            self.logger.error("Navigation timeout for %s", destination)
            self._nav_context.status = NavState.FAILED
            self._nav_context.error = "timeout"
            await self._notify_triggers_navigation_complete()
            return {
                "status": "failed",
                "destination": destination,
                "speed": speed,
                "error": "timeout",
            }
        except Exception as exc:
            self.logger.error("Navigation failed: %s", exc)
            await self._notify_triggers_navigation_complete()
            return {
                "status": "failed",
                "destination": destination,
                "speed": speed,
                "error": str(exc),
            }
        finally:
            # Always stop periodic triggers after navigation
            await self._stop_triggers()

        # Navigation completed successfully
        return {
            "status": "arrived",
            "destination": destination,
            "speed": speed,
        }

    async def _wait_for_navigation(
        self, request_id: str, timeout: float = 120.0
    ) -> None:
        """Wait for navigation to complete or fail.

        Args:
            request_id: Navigation request ID to track.
            timeout: Maximum time to wait in seconds.

        Raises:
            asyncio.TimeoutError: If navigation times out.
            Exception: If navigation fails.
        """
        start_time = asyncio.get_event_loop().time()
        check_interval = 0.1  # Check every 100ms

        while True:
            # Check timeout
            elapsed = asyncio.get_event_loop().time() - start_time
            if elapsed > timeout:
                raise asyncio.TimeoutError(
                    f"Navigation timeout after {timeout}s for request {request_id}"
                )

            # Check if navigation is complete
            if self._nav_context is None:
                raise RuntimeError("Navigation context lost during wait")

            status = self._nav_context.status

            if status == NavState.ARRIVED:
                # Update triggers with completion progress
                await self._update_trigger_progress(1.0)
                return  # Success
            elif status == NavState.FAILED:
                error = self._nav_context.error or "Unknown navigation failure"
                raise Exception(f"Navigation failed: {error}")
            elif status == NavState.CANCELLED:
                raise Exception("Navigation was cancelled")

            # Update progress from nav_context if available
            if self._nav_context.progress > 0:
                await self._update_trigger_progress(self._nav_context.progress)

            # Still navigating or pending, wait a bit
            await asyncio.sleep(check_interval)

    async def _handle_scan_area(
        self,
        step: Dict[str, Any],
        _context: Optional[Any],
        _results: Dict[str, StepOutcome],
    ) -> Dict[str, Any]:
        """Run VLM scene analysis on the current camera frame.

        Captures a JPEG frame, sends it to the VLM endpoint together with a
        chemical-spill-oriented prompt, broadcasts the result to the dashboard
        as a ``vlm_caption`` message, and returns a structured outcome dict.

        If no frame is available (camera not wired), the VLM call is skipped
        and the analysis field contains an empty string.
        """
        params = step.get("params") or {}
        target = params.get("target", "area")

        prompt = (
            "You are an emergency-response robot assistant analysing a scene "
            "for a chemical spill exercise.  Describe what you see concisely: "
            "number and state of any people (conscious / unconscious), visible "
            "chemical containers or hazard markings, exit routes, and any "
            "immediate dangers.  Be factual and brief (3–5 sentences)."
        )

        frame: Optional[bytes] = await self._capture_frame()

        if frame is None:
            self.logger.warning(
                "SCAN_AREA: no frame available — skipping VLM call for target %r",
                target,
            )
            analysis = ""
        else:
            loop = asyncio.get_event_loop()
            try:
                analysis = await loop.run_in_executor(
                    None, self._vlm_client.analyze_scene, frame, prompt
                )
            except Exception as exc:
                self.logger.error("SCAN_AREA: VLM call failed: %s", exc)
                analysis = ""

        analysis_lc = analysis.lower()
        signals = sorted({kw for kw in self._SCAN_REPLAN_KEYWORDS if kw in analysis_lc})
        interesting = len(signals) > 0

        # Broadcast VLM caption to dashboard
        if self._broadcast_callback is not None:
            try:
                await self._broadcast_callback({"type": "vlm_caption", "text": analysis})
            except Exception as exc:
                self.logger.warning("SCAN_AREA: failed to broadcast vlm_caption: %s", exc)

        return {
            "analysis": analysis,
            "target": target,
            "coverage_percent": 100.0,
            "interesting": interesting,
            "signals": signals,
        }

    async def _handle_verify_object(
        self,
        step: Dict[str, Any],
        _context: Optional[Any],
        _results: Dict[str, StepOutcome],
    ) -> Dict[str, Any]:
        """Verify an object in the scene using VLM.

        Captures a JPEG frame, sends it to the VLM endpoint with a prompt
        asking to identify/verify the object, and returns the verification result.
        """
        params = step.get("params") or {}
        expected = params.get("expected_class", "object")
        target = params.get("target", expected)

        prompt = (
            f"You are an emergency-response robot assistant inspecting an object. "
            f"Look at this scene and identify if there is a {target}. "
            f"If visible, describe its appearance, location, and condition. "
            f"If not visible, state that clearly. Be concise (2–3 sentences)."
        )

        frame: Optional[bytes] = await self._capture_frame()

        if frame is None:
            self.logger.warning(
                "VERIFY_OBJECT: no frame available — skipping VLM call for target %r",
                target,
            )
            analysis = ""
        else:
            loop = asyncio.get_event_loop()
            try:
                analysis = await loop.run_in_executor(
                    None, self._vlm_client.analyze_scene, frame, prompt
                )
            except Exception as exc:
                self.logger.error("VERIFY_OBJECT: VLM call failed: %s", exc)
                analysis = ""

        # Determine if object was verified (VLM mentions it)
        analysis_lc = analysis.lower()
        target_lc = target.lower()
        verified = target_lc in analysis_lc

        return {
            "verified": verified,
            "expected_class": expected,
            "actual_class": target if verified else "unknown",
            "analysis": analysis,
            "confidence": 0.9 if verified else 0.1,
        }

    async def _handle_speak(
        self,
        step: Dict[str, Any],
        _context: Optional[Any],
        _results: Dict[str, StepOutcome],
    ) -> Dict[str, Any]:
        params = step.get("params") or {}
        message = params.get("message", "")

        if not message:
            self.logger.warning("SPEAK action called with empty message — skipping TTS")
            return {"spoken": False, "message": "", "error": "empty_message"}

        wav_bytes = await self._tts_client.speak(message)

        if not wav_bytes:
            self.logger.error("TTS failed for message: %r", message[:80])
            return {"spoken": False, "message": message, "error": "tts_failed"}

        # Broadcast TTS audio to dashboard for operator-side playback
        if self._broadcast_callback is not None:
            import time

            audio_b64 = base64.b64encode(wav_bytes).decode("utf-8")
            try:
                await self._broadcast_callback({
                    "type": "tts_audio",
                    "audio_b64": audio_b64,
                    "text": message,
                    "format": "wav",
                    "timestamp": time.time(),
                })
            except Exception as exc:
                self.logger.warning("SPEAK: failed to broadcast tts_audio: %s", exc)

        return {"spoken": True, "message": message}

    async def _handle_wait(
        self,
        step: Dict[str, Any],
        _context: Optional[Any],
        _results: Dict[str, StepOutcome],
    ) -> Dict[str, Any]:
        params = step.get("params") or {}
        duration_ms = params.get("duration_ms", 0)
        await asyncio.sleep(max(duration_ms, 0) / 1000.0)
        return {"waited_ms": duration_ms}

    async def _handle_identify_person(
        self,
        step: Dict[str, Any],
        _context: Optional[Any],
        _results: Dict[str, StepOutcome],
    ) -> Dict[str, Any]:
        # Capture a frame from the live video stream
        frame_bytes = await self._capture_frame()
        if frame_bytes is None:
            self.logger.warning("IDENTIFY_PERSON: no frame available — returning unknown")
            return {"identity": "unknown", "confidence": 0.0, "error": "no_frame"}

        # Send frame to FR server
        detections = await self._fr_client.identify(frame_bytes)

        if not detections:
            self.logger.info("IDENTIFY_PERSON: FR returned no detections")
            return {"identity": "unknown", "confidence": 0.0, "detections": []}

        # Return the highest-confidence detection
        best = max(detections, key=lambda d: d.get("confidence", 0.0))
        self.logger.info(
            "IDENTIFY_PERSON: best match identity=%r confidence=%.2f",
            best.get("identity"),
            best.get("confidence", 0.0),
        )
        return {
            "identity": best.get("identity", "unknown"),
            "confidence": best.get("confidence", 0.0),
            "bbox": best.get("bbox"),
            "detections": detections,
        }

    async def _capture_frame(self, timeout: float = 8.0) -> Optional[bytes]:
        """Capture a single JPEG frame from the robot camera via the Edge Proxy.

        Sends a ``capture_frame`` command over the existing Edge Proxy WebSocket
        connection and waits for the matching ``frame_response`` message.

        The Edge Proxy handles the actual frame grab (HTTP snapshot from
        MediaMTX, with an ffmpeg-RTSP fallback) so this method only needs the
        WS connection.

        Args:
            timeout: Maximum seconds to wait for the frame response.

        Returns:
            Raw JPEG bytes decoded from the base64 response, or ``None`` if
            no Edge Proxy client is connected, capture fails, or the request
            times out.
        """
        if self._edge_client is None or not self._edge_client.is_connected:
            self.logger.debug("_capture_frame: no connected edge proxy client — returning None")
            return None

        request_id = str(uuid.uuid4())

        # Register a Future that _dispatch_message will resolve on frame_response
        loop = asyncio.get_event_loop()
        future: asyncio.Future = loop.create_future()
        self._edge_client._pending_frames[request_id] = future

        try:
            await self._edge_client.send_capture_frame(request_id=request_id)
        except Exception as exc:
            self.logger.error("_capture_frame: failed to send capture_frame: %s", exc)
            self._edge_client._pending_frames.pop(request_id, None)
            return None

        try:
            from edge_proxy.messages import FrameResponseMessage
            msg: FrameResponseMessage = await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            self.logger.warning(
                "_capture_frame: timed out after %.1fs waiting for frame_response", timeout
            )
            self._edge_client._pending_frames.pop(request_id, None)
            return None
        except Exception as exc:
            self.logger.error("_capture_frame: error waiting for frame_response: %s", exc)
            self._edge_client._pending_frames.pop(request_id, None)
            return None

        if msg.error:
            self.logger.warning("_capture_frame: edge proxy reported error: %s", msg.error)
            return None

        if not msg.jpeg_b64:
            self.logger.warning("_capture_frame: frame_response missing jpeg_b64")
            return None

        try:
            frame_bytes = base64.b64decode(msg.jpeg_b64)
        except Exception as exc:
            self.logger.error("_capture_frame: failed to decode base64: %s", exc)
            return None

        self.logger.debug("_capture_frame: received %d bytes", len(frame_bytes))
        return frame_bytes

    async def _handle_alert_operator(
        self,
        step: Dict[str, Any],
        _context: Optional[Any],
        _results: Dict[str, StepOutcome],
    ) -> Dict[str, Any]:
        params = step.get("params") or {}
        reason = params.get("reason", "unspecified")
        severity = params.get("severity", "info")

        if self._broadcast_callback is not None:
            import time

            alert_message: Dict[str, Any] = {
                "type": "alert",
                "reason": reason,
                "severity": severity,
                "timestamp": time.time(),
            }
            try:
                await self._broadcast_callback(alert_message)
                self.logger.info("Alert broadcasted to dashboard: %s (%s)", reason, severity)
            except Exception as exc:
                self.logger.warning("Failed to broadcast alert to dashboard: %s", exc)
        else:
            self.logger.warning("ALERT_OPERATOR: no broadcast callback registered, alert not sent to dashboard")

        return {
            "alerted": True,
            "reason": reason,
            "severity": severity,
        }


def _context_value(context: Optional[Any], key: str, default: Any = None) -> Any:
    if context is None:
        return default
    if isinstance(context, dict):
        return context.get(key, default)
    return getattr(context, key, default)
