import asyncio
import json
import math

import pytest
import websockets

from edge_proxy.server import EdgeProxyServer


class _FakeRosbridge:
    def __init__(self, *, pose: dict | None):
        self.pose = pose
        self.pose_sent = False
        self.published: list[dict] = []

    async def handler(self, ws):
        async for raw in ws:
            msg = json.loads(raw)
            op = msg.get("op")

            if op == "subscribe":
                topic = msg.get("topic")
                if topic == "gui/get_robot_pose" and self.pose is not None and not self.pose_sent:
                    self.pose_sent = True
                    await ws.send(
                        json.dumps(
                            {
                                "op": "publish",
                                "topic": "gui/get_robot_pose",
                                "msg": self.pose,
                            }
                        )
                    )
                continue

            if op != "publish":
                continue

            self.published.append(msg)
            if msg.get("topic") != "gui/execute_plan":
                continue

            await ws.send(
                json.dumps(
                    {
                        "op": "publish",
                        "topic": "gui/get_plan_feedback",
                        "msg": {"data": 1},
                    }
                )
            )
            await ws.send(
                json.dumps(
                    {
                        "op": "publish",
                        "topic": "gui/get_waypoints_new",
                        "msg": {
                            "waypoints": [{"status": 2}],
                            "next_waypoint": 0,
                        },
                    }
                )
            )
            await ws.send(
                json.dumps(
                    {
                        "op": "publish",
                        "topic": "gui/get_plan_feedback",
                        "msg": {"data": 0},
                    }
                )
            )


async def _drain_initial(ws):
    msg1 = json.loads(await ws.recv())
    msg2 = json.loads(await ws.recv())
    assert msg1["type"] in ("waypoint_list", "robot_state")
    assert msg2["type"] in ("waypoint_list", "robot_state")


async def _collect_nav_statuses(ws, request_id: str, timeout_sec: float = 5.0) -> list[str]:
    statuses: list[str] = []
    deadline = asyncio.get_event_loop().time() + timeout_sec
    while asyncio.get_event_loop().time() < deadline:
        msg = json.loads(await ws.recv())
        if msg.get("type") != "nav_status" or msg.get("request_id") != request_id:
            continue
        statuses.append(msg.get("status"))
        if msg.get("status") in {"arrived", "failed", "cancelled"}:
            return statuses
    raise AssertionError(f"Timed out waiting for terminal nav status for {request_id}")


@pytest.mark.asyncio
async def test_rosbridge_demo_scenario_three_scene_waypoints(tmp_path):
    waypoints = tmp_path / "wps.yaml"
    waypoints.write_text(
        """
waypoints:
  - name: office_scene
    x: 1.0
    y: 1.0
    theta: 0.0
  - name: boxes_scene
    x: 2.0
    y: 3.0
    theta: 0.0
  - name: spill_scene
    x: 5.0
    y: 2.0
    theta: 0.0
""".lstrip()
    )

    fake_rb = _FakeRosbridge(pose={"x": 0.0, "y": 0.0, "z": 0.0})
    rb_server = await websockets.serve(fake_rb.handler, "127.0.0.1", 0)
    try:
        rb_port = rb_server.sockets[0].getsockname()[1]
        server = EdgeProxyServer(
            host="127.0.0.1",
            port=0,
            ws_path="/edge",
            health_path="/health",
            backend="rosbridge",
            waypoints_path=str(waypoints),
            rosbridge_url=f"ws://127.0.0.1:{rb_port}",
            rosbridge_insecure_tls=False,
        )
        await server._backend.start()
        edge_ws_server = await websockets.serve(
            server._handler,
            server.host,
            0,
            process_request=server._process_request,
        )
        try:
            edge_port = edge_ws_server.sockets[0].getsockname()[1]
            async with websockets.connect(f"ws://127.0.0.1:{edge_port}/edge") as ws:
                await _drain_initial(ws)
                for request_id, name in (
                    ("office", "office_scene"),
                    ("boxes", "boxes_scene"),
                    ("spill", "spill_scene"),
                ):
                    await ws.send(
                        json.dumps(
                            {
                                "type": "navigate",
                                "request_id": request_id,
                                "goal": {"type": "waypoint", "name": name},
                                "speed": "normal",
                            }
                        )
                    )
                    statuses = await _collect_nav_statuses(ws, request_id=request_id)
                    assert "accepted" in statuses
                    assert "arrived" in statuses

            execute_msgs = [m for m in fake_rb.published if m.get("topic") == "gui/execute_plan"]
            assert len(execute_msgs) == 3
        finally:
            edge_ws_server.close()
            await edge_ws_server.wait_closed()
            await server._backend.stop()
    finally:
        rb_server.close()
        await rb_server.wait_closed()


@pytest.mark.asyncio
async def test_rosbridge_relative_goal_resolves_to_absolute_pose(tmp_path):
    waypoints = tmp_path / "wps.yaml"
    waypoints.write_text("waypoints: []\n")

    fake_rb = _FakeRosbridge(pose={"x": 10.0, "y": 5.0, "z": math.pi / 2})
    rb_server = await websockets.serve(fake_rb.handler, "127.0.0.1", 0)
    try:
        rb_port = rb_server.sockets[0].getsockname()[1]
        server = EdgeProxyServer(
            host="127.0.0.1",
            port=0,
            ws_path="/edge",
            health_path="/health",
            backend="rosbridge",
            waypoints_path=str(waypoints),
            rosbridge_url=f"ws://127.0.0.1:{rb_port}",
            rosbridge_insecure_tls=False,
        )
        await server._backend.start()
        edge_ws_server = await websockets.serve(
            server._handler,
            server.host,
            0,
            process_request=server._process_request,
        )
        try:
            edge_port = edge_ws_server.sockets[0].getsockname()[1]
            async with websockets.connect(f"ws://127.0.0.1:{edge_port}/edge") as ws:
                await _drain_initial(ws)
                await asyncio.sleep(0.05)
                await ws.send(
                    json.dumps(
                        {
                            "type": "navigate",
                            "request_id": "relative_forward",
                            "goal": {
                                "type": "relative",
                                "direction": "forward",
                                "distance": 2.0,
                            },
                            "speed": "slow",
                        }
                    )
                )
                statuses = await _collect_nav_statuses(ws, request_id="relative_forward")
                assert "arrived" in statuses

            add_msgs = [m for m in fake_rb.published if m.get("topic") == "gui/add_waypoint_new"]
            assert add_msgs, "Expected gui/add_waypoint_new to be published"
            waypoint_msg = add_msgs[-1]["msg"]
            assert waypoint_msg["header"]["frame_id"] == "map"

            pose = waypoint_msg["waypoints"][0]["pose"]["position"]
            assert pose["x"] == pytest.approx(10.0, abs=1e-3)
            assert pose["y"] == pytest.approx(7.0, abs=1e-3)
        finally:
            edge_ws_server.close()
            await edge_ws_server.wait_closed()
            await server._backend.stop()
    finally:
        rb_server.close()
        await rb_server.wait_closed()


@pytest.mark.asyncio
async def test_rosbridge_relative_goal_fails_when_pose_unavailable(tmp_path):
    waypoints = tmp_path / "wps.yaml"
    waypoints.write_text("waypoints: []\n")

    fake_rb = _FakeRosbridge(pose=None)
    rb_server = await websockets.serve(fake_rb.handler, "127.0.0.1", 0)
    try:
        rb_port = rb_server.sockets[0].getsockname()[1]
        server = EdgeProxyServer(
            host="127.0.0.1",
            port=0,
            ws_path="/edge",
            health_path="/health",
            backend="rosbridge",
            waypoints_path=str(waypoints),
            rosbridge_url=f"ws://127.0.0.1:{rb_port}",
            rosbridge_insecure_tls=False,
        )
        await server._backend.start()
        edge_ws_server = await websockets.serve(
            server._handler,
            server.host,
            0,
            process_request=server._process_request,
        )
        try:
            edge_port = edge_ws_server.sockets[0].getsockname()[1]
            async with websockets.connect(f"ws://127.0.0.1:{edge_port}/edge") as ws:
                await _drain_initial(ws)
                await ws.send(
                    json.dumps(
                        {
                            "type": "navigate",
                            "request_id": "relative_fail",
                            "goal": {
                                "type": "relative",
                                "direction": "left",
                                "distance": 1.0,
                            },
                            "speed": "slow",
                        }
                    )
                )

                statuses = await _collect_nav_statuses(ws, request_id="relative_fail")
                assert "failed" in statuses

            execute_msgs = [m for m in fake_rb.published if m.get("topic") == "gui/execute_plan"]
            assert not execute_msgs, "No execute_plan publish expected when pose is unavailable"
        finally:
            edge_ws_server.close()
            await edge_ws_server.wait_closed()
            await server._backend.stop()
    finally:
        rb_server.close()
        await rb_server.wait_closed()


@pytest.mark.asyncio
async def test_navigation_continues_after_orchestrator_disconnect(tmp_path):
    waypoints = tmp_path / "wps.yaml"
    waypoints.write_text(
        """
waypoints:
  - name: office_scene
    x: 1.0
    y: 1.0
    theta: 0.0
""".lstrip()
    )

    fake_rb = _FakeRosbridge(pose={"x": 0.0, "y": 0.0, "z": 0.0})
    rb_server = await websockets.serve(fake_rb.handler, "127.0.0.1", 0)
    try:
        rb_port = rb_server.sockets[0].getsockname()[1]
        server = EdgeProxyServer(
            host="127.0.0.1",
            port=0,
            ws_path="/edge",
            health_path="/health",
            backend="rosbridge",
            waypoints_path=str(waypoints),
            rosbridge_url=f"ws://127.0.0.1:{rb_port}",
            rosbridge_insecure_tls=False,
        )
        await server._backend.start()
        edge_ws_server = await websockets.serve(
            server._handler,
            server.host,
            0,
            process_request=server._process_request,
        )
        try:
            edge_port = edge_ws_server.sockets[0].getsockname()[1]

            async with websockets.connect(f"ws://127.0.0.1:{edge_port}/edge") as ws:
                await _drain_initial(ws)
                await ws.send(
                    json.dumps(
                        {
                            "type": "navigate",
                            "request_id": "disconnect_case",
                            "goal": {"type": "waypoint", "name": "office_scene"},
                            "speed": "normal",
                        }
                    )
                )
                # Simulate orchestrator disconnecting immediately after command.

            execute_deadline = asyncio.get_event_loop().time() + 2.0
            while asyncio.get_event_loop().time() < execute_deadline:
                if any(m.get("topic") == "gui/execute_plan" for m in fake_rb.published):
                    break
                await asyncio.sleep(0.05)
            else:
                raise AssertionError("Edge proxy did not execute plan after client disconnect")

            async with websockets.connect(f"ws://127.0.0.1:{edge_port}/edge") as ws2:
                await _drain_initial(ws2)
                nav_state = ""
                deadline = asyncio.get_event_loop().time() + 3.0
                while asyncio.get_event_loop().time() < deadline:
                    await ws2.send(json.dumps({"type": "get_state", "request_id": "state_probe"}))
                    msg = json.loads(await ws2.recv())
                    if msg.get("type") != "robot_state":
                        continue
                    nav_state = msg.get("nav_state", "")
                    if nav_state == "arrived":
                        break
                    await asyncio.sleep(0.1)
                assert nav_state == "arrived"
        finally:
            edge_ws_server.close()
            await edge_ws_server.wait_closed()
            await server._backend.stop()
    finally:
        rb_server.close()
        await rb_server.wait_closed()
