#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
moodle.py — автономное прохождение Moodle-тестов.
Поддержка: Moodle 3.x / 4.x quiz module.
Generic-fallback: HTML-тесты с radio/checkbox/text-input.
Структура Moodle quiz:
Навигация:
"""
import asyncio
import logging
import random
import re
from typing import Any, Callable, Dict, List, Optional
logger = logging.getLogger(__name__)
MAX_QUESTIONS = 200 # safety cap
# ---------------------------------------------------------------------------
# Moodle quiz page signatures
# ---------------------------------------------------------------------------
# Ordered list — first match wins.
# detect conditions use AND-logic:
# html_cls — all listed substrings must appear in page text
# el_type — at least one interactive element must have this type
#
# interaction values:
# "click_radio_input" — click input[type=radio] by #id selector
# "click_checkbox_input" — click input[type=checkbox] by #id selector
# "type_text" — type text into input/textarea
# "click_by_text" — click element by visible text (generic fallback)
# ---------------------------------------------------------------------------
MOODLE_SIGNATURES: List[Dict] = [
# ── Moodle: True/False ────────────────────────────────────────────────
{
"name": "moodle_truefalse",
"detect": {"html_cls": ["que", "truefalse"], "el_type": ["radio"]},
"interaction": "click_radio_input",
"question_class": "qtext",
"multiple": False,
},
# ── Moodle: Matching (select dropdowns) ───────────────────────────────
{
"name": "moodle_matching",
"detect": {"html_cls": ["que", "match"]},
"interaction": "click_by_text",
"question_class": "qtext",
"multiple": False,
},
# ── Moodle: Multiple-select (checkboxes) ──────────────────────────────
{
"name": "moodle_multianswer",
"detect": {"html_cls": ["que"], "el_type": ["checkbox"]},
"interaction": "click_checkbox_input",
"question_class": "qtext",
"multiple": True,
},
# ── Moodle: Single-choice (radio) ─────────────────────────────────────
{
"name": "moodle_multichoice",
"detect": {"html_cls": ["que"], "el_type": ["radio"]},
"interaction": "click_radio_input",
"question_class": "qtext",
"multiple": False,
},
# ── Moodle: Short-answer / Essay / Numerical (text input) ─────────────
{
"name": "moodle_shortanswer",
"detect": {"html_cls": ["que"]},
"interaction": "type_text",
"question_class": "qtext",
"multiple": False,
},
# ── Generic fallback: radio buttons ───────────────────────────────────
{
"name": "generic_radio",
"detect": {"el_type": ["radio"]},
"interaction": "click_radio_input",
"question_class": None,
"multiple": False,
},
# ── Generic fallback: checkboxes ──────────────────────────────────────
{
"name": "generic_checkbox",
"detect": {"el_type": ["checkbox"]},
"interaction": "click_checkbox_input",
"question_class": None,
"multiple": True,
},
# ── Generic fallback: clickable div/span options ──────────────────────
{
"name": "generic_div_options",
"detect": {"el_type": ["clickable"]},
"interaction": "click_by_text",
"question_class": None,
"multiple": False,
},
]
# Moodle-specific submit/navigate button selectors (tried in order)
MOODLE_NAV_SELECTORS = [
"input[name='next']",
"input[name='finishattempt']",
"input[name='submit']",
".submitbtns input[type='submit']",
"button[type='submit']",
]
# Markers that indicate the quiz is complete (Moodle + generic)
RESULT_MARKERS = [
# Moodle-specific
"quizsummaryofattempt",
"gradingdetails",
"gradingsummary",
"reviewsummary",
"mod-quiz-attempt-summary",
# Generic
"результаты теста",
"ваш результат",
"your score",
"тест завершён",
"тест завершен",
"test complete",
]
# ---------------------------------------------------------------------------
# Signature detection
# ---------------------------------------------------------------------------
def _detect_signature(page_text: str, elems: list) -> Dict:
"""Return the first matching signature from MOODLE_SIGNATURES."""
text_lower = page_text.lower()
el_types = {(e.get("type") or "").lower() for e in elems}
for sig in MOODLE_SIGNATURES:
d = sig.get("detect", {})
# Check html_cls — all substrings must be present in page text
if "html_cls" in d:
if not all(cls in text_lower for cls in d["html_cls"]):
continue
# Check el_type — at least one must match
if "el_type" in d:
if not any(t in el_types for t in d["el_type"]):
continue
logger.info(f"moodle_solver: matched signature '{sig['name']}'")
return sig
return {
"name": "unknown",
"interaction": "click_by_text",
"question_class": None,
"multiple": False,
}
# ---------------------------------------------------------------------------
# Question text extraction
# ---------------------------------------------------------------------------
# Navigation / UI phrases that are never the question text
_NAV_WORDS = frozenset([
"назад", "далее", "next", "back", "finish", "submit", "отправить",
"завершить", "следующая", "предыдущая", "previous",
])
# Lines that look like Moodle metadata / UI noise — skip when extracting question
_NOISE_RE = re.compile(
r'(?:'
r'перейти к основному содержан|skip to main content|skip navigation'
r'|ещё не отвечен|not yet answered|не отвечен|пока нет ответа'
r'|^балл\s*:|^mark[s]?\s*:|^grade\s*:'
r'|отметить вопрос|flag question'
r'|^навигация по тесту|^quiz navigation'
r'|^информация о вопросе|^question information'
r'|^выберите один|^select one|^choose'
r')',
re.IGNORECASE,
)
def _is_noise_line(s: str) -> bool:
"""True if line is Moodle UI noise / skip-nav / metadata."""
return bool(_NOISE_RE.search(s))
def _extract_question_text(
page_text: str,
option_hints: List[str],
question_class: Optional[str] = None,
) -> str:
"""Extract the question text from plain page content.
Strategies (in order):
1. Line just before the first answer option (most reliable for Moodle)
2. Line after "Question N" / "Вопрос N" heading
3. Line after progress indicator "N / M" or "N из M"
4. First line containing "?"
5. First sufficiently long line
"""
lines = [ln.strip() for ln in page_text.splitlines() if ln.strip()]
def _is_bad(s: str) -> bool:
if not s.split():
return True
if s.lower().split()[0] in _NAV_WORDS:
return True
if _is_noise_line(s):
return True
return False
# Strategy 1: find the line immediately before the first option hint
if option_hints:
best_idx = None
for i, line in enumerate(lines):
for hint in option_hints:
hint_key = hint[:25].lower().strip()
if hint_key and len(hint_key) > 3 and hint_key in line.lower():
if best_idx is None or i < best_idx:
best_idx = i
break
if best_idx and best_idx > 0:
for i in range(best_idx - 1, max(-1, best_idx - 6), -1):
candidate = lines[i]
if len(candidate) > 10 and not _is_bad(candidate):
if not re.match(r'^\d+\s*(?:[/из])\s*\d+$', candidate):
return candidate[:120]
# Strategy 2: line after "Question N" / "Вопрос N" heading
for i, line in enumerate(lines):
if re.match(r'(?:question|вопрос|задание)\s+\d+', line, re.IGNORECASE):
for j in range(i + 1, min(i + 8, len(lines))):
candidate = lines[j]
if len(candidate) > 10 and not _is_bad(candidate):
return candidate[:120]
# Strategy 3: line after progress "N / M" or "N из M"
for i, line in enumerate(lines):
if re.match(r'^\s*\d+\s*(?:/|из|of)\s*\d+\s*$', line) and i + 1 < len(lines):
candidate = lines[i + 1]
if len(candidate) > 10 and not _is_bad(candidate):
return candidate[:120]
# Strategy 4: first line with "?" of sufficient length
for line in lines:
if "?" in line and len(line) >= 15 and not _is_bad(line):
return line[:120]
# Strategy 5: first sufficiently long non-noise line
for line in lines:
if len(line) > 15 and not _is_bad(line):
return line[:120]
return ""
# ---------------------------------------------------------------------------
# Main solver
# ---------------------------------------------------------------------------
async def solve_moodle_test(
send_fn: Callable,
tab_id: int,
student_name: str = "",
timing_hint: str = "",
mode: str = "",
finish_test: bool = False,
db: Optional[Any] = None,
user_id: Optional[Any] = None,
progress_queue: Optional[Any] = None,
trace_id: Optional[Any] = None,
parent_span_id: Optional[Any] = None,
) -> str:
"""Autonomously solve a Moodle (or generic HTML) web quiz.
mode — operating mode:
"" (default) — normal: answer questions automatically
"trace" — tracing/learning mode: collect HTML of each question without answering.
Use when asked to run in "режим трассировки" or "режим обучения".
finish_test — if True, after answering all questions submit the test
(click "Отправить всё и завершить тест") after verifying all answers are saved.
Use when user explicitly asks to finish/submit the test after solving.
timing_hint — optional user timing instruction:
Examples: "5 секунд на вопрос", "3 минуты на весь тест",
"от 10 до 20 секунд", "быстро", "медленно".
Leave empty for automatic adaptive timing based on question complexity.
"""
from uuid import UUID as _UUID
from core.tracing.tracing_service import get_tracing_service, OperationType
# ── Tracing helper ────────────────────────────────────────────────────
_tracing = get_tracing_service()
async def _start_span(op_type, metadata=None):
if not _tracing:
return None
return await _tracing.start_span(
operation_type=op_type,
trace_id=trace_id,
parent_span_id=parent_span_id,
metadata=metadata or {},
)
async def _end_span(span_id, success=True, error_message=None, metadata_updates=None):
if span_id and _tracing:
await _tracing.end_span(span_id, success=success,
error_message=error_message,
metadata_updates=metadata_updates)
# Start MOODLE_SOLVE span
_solve_span = await _start_span(OperationType.MOODLE_SOLVE, {
"mode": mode or "normal",
"tab_id": tab_id,
"finish_test": finish_test,
})
_qa_records: List[dict] = []
_results_text: str = ""
_moodle_results_data: Optional[dict] = None
answered = 0
_total_simulated_sec = 0.0
_test_submitted: bool = False
_submit_problem: str = ""
# ── Parse timing_hint ─────────────────────────────────────────────────
_override_fixed: Optional[float] = None
_override_total: Optional[float] = None
_override_min: Optional[float] = None
_override_max: Optional[float] = None
_override_scale: float = 1.0
if timing_hint:
_th = timing_hint.lower().strip()
_rm = re.search(
r'(?:от\s*)?(\d+(?:[.,]\d+)?)\s*(?:до|to|-)\s*(\d+(?:[.,]\d+)?)', _th
)
if _rm:
_override_min = float(_rm.group(1).replace(',', '.'))
_override_max = float(_rm.group(2).replace(',', '.'))
elif re.search(r'(\d+(?:[.,]\d+)?)\s*(?:мин|min)', _th) and \
re.search(r'тест|test|весь|all|total', _th):
_m = re.search(r'(\d+(?:[.,]\d+)?)\s*(?:мин|min)', _th)
_override_total = float(_m.group(1).replace(',', '.')) * 60
elif re.search(r'(\d+(?:[.,]\d+)?)\s*(?:секунд|сек|с\b|sec|s\b)', _th):
_m = re.search(r'(\d+(?:[.,]\d+)?)\s*(?:секунд|сек|с\b|sec|s\b)', _th)
_override_fixed = float(_m.group(1).replace(',', '.'))
elif re.search(r'очень быстр|very fast|максимально быстр', _th):
_override_scale = 0.2
elif re.search(r'быстр|fast|quick|speed', _th):
_override_scale = 0.4
elif re.search(r'медленн|slow', _th):
_override_scale = 2.0
def _human_delay_sec(q_text: str, options: Optional[List[str]] = None) -> float:
if _override_fixed is not None:
return random.uniform(max(0.5, _override_fixed * 0.85), _override_fixed * 1.15)
if _override_min is not None and _override_max is not None:
return random.uniform(_override_min, _override_max)
total_chars = len(q_text or "") + sum(len(o) for o in (options or []))
if total_chars < 60:
base = random.uniform(2.0, 4.5)
elif total_chars < 200:
base = random.uniform(4.0, 7.5)
elif total_chars < 400:
base = random.uniform(7.0, 12.0)
else:
base = random.uniform(10.0, 18.0)
if re.search(r'\d+\s*[\+\-\×\÷\*/]\s*\d+|\b\d{3,}\b', q_text or ""):
base += random.uniform(1.5, 4.0)
return base * _override_scale
# ── Auto-resolve tab_id ───────────────────────────────────────────────
try:
await send_fn("browser.getTabContent", {"tabId": tab_id, "offset": 0})
except Exception:
logger.warning(f"moodle_solver: tab_id={tab_id} invalid, auto-detecting active tab")
try:
tabs_result = await send_fn("browser.listTabs", {"offset": 0, "limit": 50})
tabs = tabs_result if isinstance(tabs_result, list) else \
(tabs_result or {}).get("tabs", [])
if tabs:
active = next((t for t in tabs if t.get("active")), None)
tab_id = (active or tabs[-1]).get("id", tab_id)
logger.info(f"moodle_solver: auto-resolved tab_id={tab_id}")
else:
return "Error: No open tabs found in browser"
except Exception as tab_err:
return f"Error: Could not detect active tab: {tab_err}"
def _emit(event: dict):
if progress_queue is not None:
try:
progress_queue.put_nowait(event)
except Exception:
pass
# ── LLM helper ────────────────────────────────────────────────────────
async def _get_llm():
if db is None or user_id is None:
return None
try:
from sqlalchemy import select as sa_select
from core.models.settings import ConnectorConfig
stmt = sa_select(ConnectorConfig).where(
ConnectorConfig.user_id == (
user_id if isinstance(user_id, _UUID) else _UUID(str(user_id))
),
ConnectorConfig.is_enabled == True,
)
rows = (await db.execute(stmt)).scalars().all()
if not rows:
return None
connector = next((c for c in rows if c.is_default), rows[0])
models = (connector.config or {}).get("models", [])
model = models[0] if models else "deepseek-chat"
if connector.connector_name == "ollama":
from langchain_ollama import ChatOllama
kw: Dict[str, Any] = {}
if model.lower().startswith("qwen3"):
kw["reasoning"] = False
return ChatOllama(
base_url=connector.endpoint or "", model=model,
temperature=0.3, **kw
)
from langchain_openai import ChatOpenAI
from pydantic import SecretStr
return ChatOpenAI(
base_url=connector.endpoint or "", model=model,
api_key=SecretStr(connector.api_key) if connector.api_key else None,
temperature=0.3, timeout=30, max_retries=1,
)
except Exception as exc:
logger.warning(f"moodle_solver: failed to create LLM: {exc}")
return None
llm = await _get_llm()
async def _ask_llm(prompt: str) -> str:
if llm is None:
return ""
try:
from langchain_core.messages import HumanMessage as HM
resp = await llm.ainvoke([HM(content=prompt)])
return (resp.content or "").strip()
except Exception as exc:
logger.warning(f"moodle_solver LLM call failed: {exc}")
return ""
# ── Page reader ───────────────────────────────────────────────────────
async def _read_page():
content, elems = await asyncio.gather(
send_fn("browser.getTabContent", {"tabId": tab_id, "offset": 0}),
send_fn("browser.getInteractiveElements", {"tabId": tab_id, "maxElements": 50}),
)
text = content.get("content", "") if isinstance(content, dict) else str(content)
elem_list = elems.get("elements", []) if isinstance(elems, dict) else []
return text, elem_list
async def _read_selector(selector: str, max_len: int = 500) -> str:
"""Extract text content of a DOM element via CSS selector.
Returns empty string if extension doesn't support selector parameter
(detected by result being suspiciously long = full page text).
"""
try:
result = await send_fn("browser.getTabContent", {
"tabId": tab_id, "offset": 0, "selector": selector,
})
text = (result.get("content", "") if isinstance(result, dict) else "").strip()
# If very long, extension probably ignores selector → full page text
if len(text) > max_len:
return ""
return text
except Exception:
return ""
async def _read_qtext() -> str:
"""Try to extract Moodle .qtext content directly."""
return await _read_selector(".qtext", max_len=500)
async def _read_moodle_answers() -> List[str]:
"""Try to extract Moodle answer texts via .answer selector."""
raw = await _read_selector(".answer", max_len=2000)
if not raw:
return []
# Each line in .answer innerText is typically one option
return [ln.strip() for ln in raw.splitlines()
if ln.strip() and len(ln.strip()) > 1]
async def _read_question_html() -> str:
"""Read outerHTML of the current Moodle question block (.que).
Falls back to .formulation, then full page HTML (truncated)."""
for sel in [".que", ".formulation", "#responseform"]:
try:
result = await send_fn("browser.getTabContent", {
"tabId": tab_id, "offset": 0,
"selector": sel, "html": True,
})
html = (result.get("content", "") if isinstance(result, dict) else "").strip()
if html and len(html) > 20:
return html
except Exception:
pass
return ""
# ── Click helper ──────────────────────────────────────────────────────
async def _click_option(
opt_text: str,
el: Optional[dict],
interaction: str,
idx: int,
) -> bool:
"""Click an answer option. Returns True on success."""
# Radio / checkbox: prefer #id selector (most reliable for Moodle)
if interaction in ("click_radio_input", "click_checkbox_input"):
el_id = (el or {}).get("id") or ""
el_sel = (el or {}).get("selector") or (f"#{el_id}" if el_id else "")
if el_sel:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id, "selector": el_sel
})
logger.debug(f"moodle_solver: clicked {interaction} '{el_sel}'")
return True
except Exception:
pass # fall through to text-based click
# Fallback: click by visible text
if opt_text:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "text": opt_text})
logger.debug(f"moodle_solver: clicked by text '{opt_text[:50]}'")
return True
except Exception as e:
logger.warning(
f"moodle_solver: all click strategies failed "
f"for '{opt_text[:50]}': {e}"
)
return False
# ── Step 1: detect platform and navigate to attempt page ─────────────
_emit({"status": "test_reading", "question": 0, "total": "?"})
# Detect quiz platform via extension command (may not be available in older extension)
_platform_info = {}
_detect_available = False
try:
_platform_info = await send_fn("browser.detectQuizPlatform", {"tabId": tab_id}) or {}
_detect_available = bool(_platform_info)
except Exception as e:
logger.warning(f"moodle_solver: detectQuizPlatform not available: {e}")
_platform = _platform_info.get("platform", "unknown")
_page_type = _platform_info.get("pageType", "unknown")
# ── Fallback detection if detectQuizPlatform unavailable ──
if not _detect_available:
logger.info("moodle_solver: using fallback platform detection (URL + DOM probe)")
# Get tab URL to check for Moodle patterns
_tab_url = ""
try:
tabs_result = await send_fn("browser.listTabs", {})
if isinstance(tabs_result, dict):
for t in tabs_result.get("tabs", []):
if t.get("id") == tab_id:
_tab_url = t.get("url", "")
break
except Exception:
pass
# Check URL for Moodle quiz patterns
if "/mod/quiz/" in _tab_url:
_platform = "moodle"
if "attempt.php" in _tab_url:
_page_type = "attempt"
elif "summary.php" in _tab_url:
_page_type = "summary"
elif "review.php" in _tab_url:
_page_type = "review"
elif "view.php" in _tab_url:
_page_type = "view"
elif "startattempt.php" in _tab_url:
_page_type = "startattempt"
else:
_page_type = "unknown"
_platform_info = {"platform": _platform, "pageType": _page_type, "url": _tab_url}
else:
# Probe for .que elements as last resort
_moodle_probe = await _read_selector(".que .info", max_len=2000)
if not _moodle_probe:
_moodle_probe = await _read_selector(".que", max_len=10000)
if _moodle_probe:
_platform = "moodle"
_page_type = "attempt"
_platform_info = {"platform": "moodle", "pageType": "attempt", "url": _tab_url}
else:
_platform_info = {"platform": "unknown", "pageType": "unknown", "url": _tab_url}
logger.info(f"moodle_solver: platform={_platform}, pageType={_page_type}, url={_platform_info.get('url', '?')}")
# ── Reject unsupported platforms ──
if _platform != "moodle":
_emit({"status": "test_error", "error": "unsupported_platform"})
await _end_span(_solve_span, success=False, error_message="unsupported_platform")
return (
"Не удалось определить платформу тестирования как Moodle.\n"
"На данный момент автоматическое прохождение тестов поддерживается "
"только для **Moodle** (mod/quiz).\n\n"
f"Обнаружено: platform={_platform}, URL={_platform_info.get('url', '?')}"
)
# ── Navigate from intro/confirmation to attempt page ──
_max_nav_steps = 3 # view → startattempt → attempt (max 3 transitions)
for _nav_step in range(_max_nav_steps):
if _page_type == "attempt":
logger.info("moodle_solver: on attempt page, starting quiz")
break
if _page_type == "review":
_emit({"status": "test_error", "error": "review_page"})
return "Тест уже завершён — открыта страница просмотра результатов (review)."
if _page_type == "summary":
logger.info("moodle_solver: on summary page, will proceed to submit")
break
# ── Click start/confirm button to navigate forward ──
_btn_text = None
if _page_type == "view":
_btn_text = _platform_info.get("startBtn")
elif _page_type == "startattempt":
_btn_text = _platform_info.get("confirmBtn")
if not _btn_text:
# Fallback: search interactive elements for start/confirm keywords
text, elems = await _read_page()
_start_keywords = [
"начать попытку", "пройти тест", "start attempt",
"attempt quiz", "начать тест", "приступить",
"продолжить попытку", "continue attempt",
]
for el in elems:
el_text = (el.get("text") or "").lower()
if any(kw in el_text for kw in _start_keywords):
_btn_text = (el.get("text") or "").strip()
break
if _btn_text:
logger.info(f"moodle_solver: clicking '{_btn_text}' (page={_page_type})")
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "text": _btn_text})
except Exception as e:
logger.warning(f"moodle_solver: click failed: {e}")
# Try generic submit button as last resort
try:
await send_fn("browser.clickElement", {
"tabId": tab_id,
"selector": "input[type='submit'], button[type='submit']",
})
except Exception:
break
await asyncio.sleep(1.5)
elif _page_type in ("view", "startattempt"):
logger.warning(f"moodle_solver: no button found on {_page_type} page")
break
else:
logger.warning(f"moodle_solver: unexpected Moodle page type: {_page_type}")
break
# Re-detect after navigation
if _detect_available:
try:
_platform_info = await send_fn("browser.detectQuizPlatform", {"tabId": tab_id}) or {}
_page_type = _platform_info.get("pageType", "unknown")
except Exception:
_page_type = "unknown"
else:
# Fallback: re-check URL
try:
tabs_result = await send_fn("browser.listTabs", {})
_tab_url = ""
if isinstance(tabs_result, dict):
for t in tabs_result.get("tabs", []):
if t.get("id") == tab_id:
_tab_url = t.get("url", "")
break
if "attempt.php" in _tab_url:
_page_type = "attempt"
elif "summary.php" in _tab_url:
_page_type = "summary"
elif "startattempt.php" in _tab_url:
_page_type = "startattempt"
elif "view.php" in _tab_url:
_page_type = "view"
else:
# Probe for .que
_probe = await _read_selector(".que .info", max_len=2000)
_page_type = "attempt" if _probe else "unknown"
except Exception:
_page_type = "unknown"
logger.info(f"moodle_solver: after nav step {_nav_step + 1}: pageType={_page_type}")
# ── Handle student name input (if on attempt page with name field) ──
if student_name and _page_type == "attempt":
text, elems = await _read_page()
for el in elems:
el_id = (el.get("id") or "").lower()
el_tag = (el.get("tag") or "").lower()
if el_tag == "input" and any(
k in el_id for k in ["name", "student", "username", "firstname"]
):
sel = el.get("selector") or f"#{el.get('id', '')}"
await send_fn("browser.typeText", {
"tabId": tab_id, "selector": sel,
"text": student_name, "clearFirst": True,
})
break
# ── Step 2: verify we're on a quiz attempt page ──────────────────────
_is_moodle = _page_type in ("attempt", "summary")
if not _is_moodle:
# Last resort: probe for .que elements directly
_moodle_probe = await _read_selector(".que .info", max_len=2000)
if not _moodle_probe:
_moodle_probe = await _read_selector(".que", max_len=10000)
_is_moodle = bool(_moodle_probe)
if _is_moodle:
_emit({"status": "test_strategy", "framework": "moodle", "answer_type": "structured"})
logger.info("moodle_solver: Moodle quiz confirmed, starting solver")
else:
_emit({"status": "test_error", "error": "not_on_attempt"})
await _end_span(_solve_span, success=False, error_message="not_on_attempt")
return (
"Не удалось перейти на страницу с вопросами теста.\n"
f"Текущая страница: {_page_type} ({_platform_info.get('url', '?')})\n\n"
"Попробуйте открыть тест вручную и запустить решение снова."
)
# ── Helper: click Next page button ────────────────────────────────────
async def _click_next_page() -> bool:
for _moodle_sel in MOODLE_NAV_SELECTORS:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id, "selector": _moodle_sel
})
return True
except Exception:
pass
for kw in [
"Следующая страница", "Next page", "Далее", "Next",
"Завершить попытку", "Finish attempt",
]:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "text": kw})
return True
except Exception:
pass
return False
# ── TRACE MODE: collect HTML of each question without answering ──────
_mode_lo = (mode or "").strip().lower()
_is_trace = _mode_lo in (
"trace", "tracing", "learn", "learning",
"трассировка", "трассировки", "обучение", "обучения",
) or "трассир" in _mode_lo or "обучен" in _mode_lo
if _is_trace:
_trace_records: List[dict] = []
prev_trace_text = ""
stuck_trace = 0
for _step in range(MAX_QUESTIONS):
text, elems = await _read_page()
text_lower = text.lower()
# Detect results / finish page
if _trace_records:
if any(m in text_lower for m in RESULT_MARKERS):
if not re.search(r'(?:вопрос|question)\s*\d+', text_lower):
break
# Stuck detection
if text.strip() == prev_trace_text:
stuck_trace += 1
if stuck_trace >= 3:
break
else:
stuck_trace = 0
prev_trace_text = text.strip()
# Use getMoodleQuestions if Moodle detected
if _is_moodle:
try:
mq_result = await send_fn("browser.getMoodleQuestions", {"tabId": tab_id})
mq_list = (mq_result.get("questions", [])
if isinstance(mq_result, dict) else [])
except Exception:
mq_list = []
if mq_list:
for q in mq_list:
q_html = await _read_question_html()
_trace_records.append({
"num": str(len(_trace_records) + 1),
"total": "?",
"text": q.get("qtext", "")[:200],
"qtype": q.get("qtype", "unknown"),
"options": [o.get("text", "") for o in q.get("options", [])],
"selects": len(q.get("selects", [])),
"text_inputs": len(q.get("textInputs", [])),
"html": q_html,
})
_emit({"status": "test_tracing", "question": len(_trace_records)})
# Click Next
await _click_next_page()
await asyncio.sleep(0.5)
continue
# Fallback: generic trace
q_html = await _read_question_html()
q_text_val = (await _read_qtext())[:200] or \
_extract_question_text(text, [], None)[:200]
_trace_records.append({
"num": str(len(_trace_records) + 1),
"total": "?",
"text": q_text_val,
"qtype": _detect_signature(text, elems)["name"],
"options": [],
"selects": 0,
"text_inputs": 0,
"html": q_html,
})
_emit({"status": "test_tracing", "question": len(_trace_records)})
# Click Next
await _click_next_page()
await asyncio.sleep(0.5)
# Build trace report
lines_out: List[str] = [
f"## Трассировка теста — {len(_trace_records)} вопросов",
"",
]
for rec in _trace_records:
lines_out.append(f"### Вопрос {rec['num']}")
lines_out.append(f"**Тип:** `{rec.get('qtype', '?')}`")
if rec.get("text"):
lines_out.append(f"**Текст:** {rec['text']}")
if rec.get("options"):
lines_out.append(f"**Варианты:** {', '.join(rec['options'][:10])}")
if rec.get("selects"):
lines_out.append(f"**Выпадающих списков:** {rec['selects']}")
if rec.get("text_inputs"):
lines_out.append(f"**Текстовых полей:** {rec['text_inputs']}")
if rec.get("html"):
lines_out.append(f"\n```html\n{rec['html']}\n```")
else:
lines_out.append("*HTML не загружен (обновите расширение)*")
lines_out.append("")
_emit({"status": "test_complete", "answered": 0, "mode": "trace"})
await _end_span(_solve_span, success=True, metadata_updates={
"mode": "trace", "questions_total": len(_trace_records),
})
return "\n".join(lines_out)
# ── Helper: answer one question (Moodle structured) ──────────────────
async def _answer_moodle_question(q: dict, q_idx: int, q_total_str: str):
"""Answer a single Moodle question using structured data from getMoodleQuestions."""
nonlocal answered, _total_simulated_sec
qtext = q.get("qtext", "")
qtype = q.get("qtype", "unknown")
options = q.get("options", [])
selects = q.get("selects", [])
# Start MOODLE_QUESTION span
q_num_val = str(q.get("qno", q_idx + 1))
_q_span = await _start_span(OperationType.MOODLE_QUESTION, {
"question_num": q_num_val,
"question_type": qtype,
"question_text": qtext[:200],
})
t_inputs = q.get("textInputs", [])
ddwtos_places = q.get("ddwtosPlaces", [])
is_mult = q.get("multiple", False)
# numerical: radios with _unit in name are unit selectors, not answer options
unit_options = []
if qtype == "numerical" and options:
unit_options = [o for o in options if "_unit" in o.get("selector", "")]
options = [o for o in options if "_unit" not in o.get("selector", "")]
# Use real question number from Moodle (qno), fallback to sequential index
q_num = str(q.get("qno", q_idx + 1))
# Use total from navigation block if available
_moodle_total = q.get("totalQuestions", 0)
if _moodle_total and _moodle_total > 0:
q_total_str = str(_moodle_total)
q_short = qtext[:80] if qtext else f"Вопрос {q_num}"
_emit({
"status": "test_thinking",
"question": int(q_num),
"total": int(q_total_str) if q_total_str != "?" else 0,
"question_text": q_short,
"human_delay_sec": 0,
})
# Build LLM prompt
opt_texts = [o.get("text", "") for o in options]
prompt = f"Вопрос {q_num} из {q_total_str}:\n{qtext}\n"
if opt_texts:
label = "Чекбоксы (несколько верных)" if is_mult else "Варианты ответа"
opts_str = "\n".join(f" {i+1}. {t}" for i, t in enumerate(opt_texts))
prompt += f"\n{label}:\n{opts_str}\n"
if selects:
prompt += "\nВыпадающие списки:\n"
for si, sel_data in enumerate(selects):
stem = sel_data.get("stem", "")
sel_opts = [o["text"] for o in sel_data.get("options", []) if o.get("text")]
prompt += f" Список {si+1}{' ('+stem+')' if stem else ''}: {', '.join(sel_opts)}\n"
if t_inputs and not opt_texts and not selects:
prompt += "\nЕсть поле для ввода текстового ответа.\n"
prompt += (
"\nИнструкция:\n"
"- Варианты → верни ТОЛЬКО номер(а) через запятую (например: 2 или 1,3)\n"
"- Текстовый ответ → верни ТЕКСТ: ответ\n"
"- Без пояснений."
)
# Human delay
if _override_total is not None:
_est = int(q_total_str) if q_total_str != "?" else MAX_QUESTIONS
_per = _override_total / max(_est, 1)
delay = random.uniform(max(0.5, _per * 0.85), _per * 1.15)
else:
delay = _human_delay_sec(qtext, opt_texts or None)
_total_simulated_sec += delay
_emit({
"status": "test_thinking",
"question": int(q_num),
"total": int(q_total_str) if q_total_str != "?" else 0,
"question_text": q_short,
"human_delay_sec": round(delay, 1),
})
await asyncio.sleep(delay)
# ── ddwtos: Drag and Drop into Text ──────────────────────────────
if ddwtos_places:
# Group places by group — each group has its own choice set
groups: Dict[str, list] = {}
for p in ddwtos_places:
g = p.get("group", "1")
if g not in groups:
groups[g] = []
groups[g].append(p)
# Build prompt showing blanks and choices per group
prompt_ddwtos = f"Вопрос {q_num} из {q_total_str}:\n{qtext}\n\n"
prompt_ddwtos += f"В тексте {len(ddwtos_places)} пропусков. Нужно заполнить каждый пропуск словом из списка.\n\n"
for g_id, places in sorted(groups.items()):
choices = places[0].get("choices", [])
choice_texts = [c["text"] for c in sorted(choices, key=lambda c: c["value"])]
place_nums = [str(p["place"]) for p in places]
prompt_ddwtos += f"Пропуски {', '.join(place_nums)} — варианты: {', '.join(f'{i+1}. {t}' for i, t in enumerate(choice_texts))}\n"
prompt_ddwtos += (
f"\nВерни {len(ddwtos_places)} чисел через запятую — номер варианта для каждого пропуска "
f"по порядку (пропуск 1, пропуск 2, ...).\n"
f"Без пояснений. Пример: 1,2,1,2,2,1,1"
)
ddwtos_answer = await _ask_llm(prompt_ddwtos)
logger.info(f"moodle_solver Q{q_num} (ddwtos): LLM → {ddwtos_answer!r}")
nums = re.findall(r'\d+', ddwtos_answer)
ddwtos_results = []
for pi, (place, num_str) in enumerate(zip(ddwtos_places, nums)):
try:
choice_idx = int(num_str) - 1
choices = sorted(place.get("choices", []), key=lambda c: c["value"])
if 0 <= choice_idx < len(choices):
choice_val = str(choices[choice_idx]["value"])
choice_text = choices[choice_idx]["text"]
sel = place.get("selector", "")
if sel:
await send_fn("browser.setInputValue", {
"tabId": tab_id, "selector": sel, "value": choice_val,
})
ddwtos_results.append(choice_text)
logger.debug(f"moodle_solver Q{q_num}: place {place['place']} → '{choice_text}' (value={choice_val})")
except Exception as e:
logger.warning(f"moodle_solver Q{q_num}: setInputValue failed for place {pi+1}: {e}")
chosen_answer = "; ".join(ddwtos_results)
answer_raw = chosen_answer
answered += 1
_final = chosen_answer.strip() or ddwtos_answer.strip()
_emit({
"status": "test_answered",
"question": int(q_num),
"total": int(q_total_str) if q_total_str != "?" else 0,
"answer": _final[:60],
"question_text": q_short,
})
_qa_records.append({
"num": q_num, "total": q_total_str,
"text": q_short,
"answer": _final,
"delay_sec": round(delay, 1),
})
await _end_span(_q_span, success=True, metadata_updates={
"chosen_answer": _final[:100], "delay_sec": round(delay, 1),
})
return
# Ask LLM (skip generic call for select-only questions — they use own prompt)
if opt_texts or t_inputs or not selects:
answer_raw = await _ask_llm(prompt)
logger.info(f"moodle_solver Q{q_num} ({qtype}): LLM → {answer_raw!r}")
else:
answer_raw = ""
# Apply answer
chosen_answer = ""
# Determine if this is a text-input question:
# - has text inputs AND no option radios (after filtering units for numerical)
# - OR LLM returned "ТЕКСТ:" AND question actually has text inputs
_is_text_answer = (
(t_inputs and not opt_texts and not selects)
or (answer_raw.upper().startswith("ТЕКСТ:") and t_inputs and not opt_texts)
)
if _is_text_answer:
# Текстовый ввод (shortanswer, numerical, essay)
text_val = (
answer_raw[6:].strip()
if answer_raw.upper().startswith("ТЕКСТ:")
else answer_raw.strip()
)
# numerical: extract only the number (strip units, text)
if qtype == "numerical" and text_val:
_num_match = re.search(r'-?\d+(?:[.,]\d+)?', text_val)
if _num_match:
text_val = _num_match.group(0).replace(',', '.')
logger.debug(f"moodle_solver Q{q_num}: numerical cleaned → '{text_val}'")
if text_val and t_inputs:
try:
await send_fn("browser.typeText", {
"tabId": tab_id, "selector": t_inputs[0]["selector"],
"text": text_val, "clearFirst": True,
})
chosen_answer = text_val
except Exception as e:
logger.warning(f"moodle_solver Q{q_num}: typeText failed: {e}")
chosen_answer = f"[ошибка ввода] {text_val}"
# numerical: also select first unit radio if available
if unit_options:
try:
u_sel = unit_options[0].get("selector", "")
if u_sel:
await send_fn("browser.clickElement", {
"tabId": tab_id, "selector": u_sel,
})
logger.debug(f"moodle_solver Q{q_num}: clicked unit option '{unit_options[0].get('text', '')}'")
except Exception as e:
logger.warning(f"moodle_solver Q{q_num}: unit click failed: {e}")
else:
# Strip "ТЕКСТ:" prefix if LLM mistakenly used it for a radio question
if answer_raw.upper().startswith("ТЕКСТ:"):
answer_raw = answer_raw[6:].strip()
# Radio/checkbox (multichoice, truefalse, multianswer)
if opt_texts:
nums = re.findall(r'\d+', answer_raw)
if not is_mult and nums:
nums = nums[:1]
# Fallback: letters
if not nums:
_letter_map = {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5,
'а': 1, 'б': 2, 'в': 3, 'г': 4, 'д': 5}
_letters = re.findall(r'\b([a-eа-д])\b', answer_raw, re.IGNORECASE)
for _let in _letters:
n = _letter_map.get(_let.lower(), 0)
if n and str(n) not in nums:
nums.append(str(n))
if not is_mult and nums:
nums = nums[:1]
# Fallback: text match
if not nums and answer_raw.strip():
_ans_lo = answer_raw.strip().lower()
for _oi, _ot in enumerate(opt_texts):
if _ot.lower().strip() in _ans_lo or _ans_lo in _ot.lower().strip():
nums = [str(_oi + 1)]
break
for num_str in nums:
idx = int(num_str) - 1
if 0 <= idx < len(options):
opt = options[idx]
sel = opt.get("selector", "")
opt_text = opt.get("text", "")
_clicked = False
# Try selector + text (clickElement uses text as fallback)
if sel:
try:
res = await send_fn("browser.clickElement", {
"tabId": tab_id, "selector": sel, "text": opt_text
})
_clicked = True
logger.debug(f"moodle_solver Q{q_num}: click OK sel='{sel}' → {res}")
except Exception as e:
logger.warning(f"moodle_solver Q{q_num}: click by selector failed sel='{sel}': {e}")
# Fallback: click by text only
if not _clicked and opt_text:
try:
res = await send_fn("browser.clickElement", {
"tabId": tab_id, "text": opt_text
})
_clicked = True
logger.debug(f"moodle_solver Q{q_num}: click OK text='{opt_text[:50]}' → {res}")
except Exception as e:
logger.warning(f"moodle_solver Q{q_num}: click failed for '{opt_text[:50]}': {e}")
if _clicked:
chosen_answer += opt_text + "; "
# Select dropdowns (match, ddmatch, gapselect, multianswer inline)
# Независимо от options — multianswer может иметь и то, и другое
if selects:
if qtype in ("matching", "ddmatch"):
common_opts = [o["text"] for o in selects[0].get("options", []) if o.get("text") and o.get("value", "0") != "0"]
n_sels = len(selects)
prompt_sel = (
f"Задание на сопоставление:\n{qtext}\n\n"
f"Нужно сопоставить {n_sels} строк с вариантами из списка.\n\n"
"СТРОКИ:\n"
)
for si, sel_data in enumerate(selects):
stem = sel_data.get("stem", f"Строка {si+1}")
prompt_sel += f" [{si+1}] {stem}\n"
prompt_sel += "\nВАРИАНТЫ (номер → текст):\n"
for oi, ot in enumerate(common_opts):
prompt_sel += f" {oi+1}. {ot}\n"
prompt_sel += (
f"\nОТВЕТ: выведи ТОЛЬКО {n_sels} чисел через запятую — "
f"номер варианта для каждой строки по порядку.\n"
f"Формат: число,число,число (без пробелов, без пояснений).\n"
f"Пример для 3 строк: 2,1,3"
)
else: # gapselect / multianswer inline
prompt_sel = f"Текст: {qtext}\n\nЗаполните пропуски:\n"
for si, sel_data in enumerate(selects):
stem = sel_data.get("stem", "")
sel_opts = [o["text"] for o in sel_data.get("options", []) if o.get("text") and o.get("value", "") != ""]
prompt_sel += f"\nПропуск {si+1}{' ('+stem+')' if stem else ''}:\n"
for oi, ot in enumerate(sel_opts):
prompt_sel += f" {oi+1}. {ot}\n"
prompt_sel += "\nИнструкция: верни номера через запятую для каждого пропуска. Пример: 1,3"
logger.info(f"moodle_solver Q{q_num} ({qtype}): selects prompt:\n{prompt_sel}")
sel_answer_raw = await _ask_llm(prompt_sel)
logger.info(f"moodle_solver Q{q_num} ({qtype}): LLM selects → {sel_answer_raw!r}")
# Extract answer numbers robustly:
# Prefer comma-separated sequence (e.g. "3,5,2,4,7,1,8,9,6")
# Fallback: all numbers in text
_csv_match = re.search(r'(\d+(?:\s*,\s*\d+)+)', sel_answer_raw)
if _csv_match:
nums = re.findall(r'\d+', _csv_match.group(1))
else:
nums = re.findall(r'\d+', sel_answer_raw)
sel_results = []
for si, (sel_data, num_str) in enumerate(zip(selects, nums)):
try:
idx = int(num_str) - 1
if qtype in ("matching", "ddmatch"):
valid_opts = [o for o in sel_data.get("options", []) if o.get("value", "0") != "0"]
else:
valid_opts = [o for o in sel_data.get("options", []) if o.get("value", "") != ""]
if 0 <= idx < len(valid_opts):
opt_value = valid_opts[idx]["value"]
opt_text = valid_opts[idx]["text"]
sel_sel = sel_data.get("selector", "")
if sel_sel:
res = await send_fn("browser.selectOption", {
"tabId": tab_id, "selector": sel_sel, "value": opt_value
})
sel_results.append(opt_text)
logger.info(f"moodle_solver Q{q_num}: select {si+1} OK → '{opt_text}' (value={opt_value}, result={res})")
except Exception as e:
logger.warning(f"moodle_solver Q{q_num}: selectOption failed for select {si+1}: {e}")
if sel_results:
chosen_answer += "; ".join(sel_results)
answered += 1
_final = (chosen_answer.rstrip("; ") if chosen_answer else answer_raw).strip()
_emit({
"status": "test_answered",
"question": int(q_num),
"total": int(q_total_str) if q_total_str != "?" else 0,
"answer": _final[:60],
"question_text": q_short,
})
_qa_records.append({
"num": q_num, "total": q_total_str,
"text": q_short,
"answer": _final,
"delay_sec": round(delay, 1),
})
await _end_span(_q_span, success=True, metadata_updates={
"chosen_answer": _final[:100], "delay_sec": round(delay, 1),
})
# ── Step 3: question loop ─────────────────────────────────────────────
prev_text = ""
stuck_count = 0
_summary_fix_attempts = 0
_MAX_SUMMARY_FIX_ATTEMPTS = 2
for _page in range(MAX_QUESTIONS):
text, elems = await _read_page()
text_lower = text.lower()
# ── Detect results / finish page ──────────────────────────────
_strong_score = bool(re.search(
r'(?:ваш результат|your (?:score|grade)|правильно на|итог)\s*:?\s*\d+',
text_lower
))
if answered > 0 or _strong_score:
if any(m in text_lower for m in RESULT_MARKERS):
if not re.search(r'(?:вопрос|question)\s*\d+', text_lower):
_results_text = text
break
# ── Stuck detection ───────────────────────────────────────────
if text.strip() == prev_text:
stuck_count += 1
if stuck_count >= 3:
_qa_records.append({
"num": str(answered + 1), "total": "?",
"text": "[застрял — страница не меняется, прерываю]",
"answer": "", "delay_sec": 0.0,
})
break
else:
stuck_count = 0
prev_text = text.strip()
# ── MOODLE PATH: use getMoodleQuestions ───────────────────────
if _is_moodle:
try:
mq_result = await send_fn("browser.getMoodleQuestions", {"tabId": tab_id})
mq_list = (mq_result.get("questions", [])
if isinstance(mq_result, dict) else [])
except Exception as e:
logger.warning(f"moodle_solver: getMoodleQuestions failed: {e}")
mq_list = []
if mq_list:
q_total_str = "?"
# Try to determine total from page title
_title_m = re.search(
r'страница\s+\d+\s+из\s+(\d+)', text_lower
)
# Detect adaptive mode: questions have per-question Check buttons
_is_adaptive = any(
q.get("checkButtonSelector") for q in mq_list
)
if _is_adaptive:
# ── ADAPTIVE MODE: answer + check each question individually ──
logger.info(f"moodle_solver: adaptive mode detected, {len(mq_list)} questions")
for q in mq_list:
q_state = q.get("state", "notyetanswered")
check_sel = q.get("checkButtonSelector", "")
q_num_adp = q.get("qno", 0)
# Skip already correctly answered questions
if q_state == "correct":
logger.debug(f"moodle_solver Q{q_num_adp}: already correct, skipping")
continue
# Answer the question
await _answer_moodle_question(q, answered, q_total_str)
# Click Check button to register the answer
if check_sel:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id, "selector": check_sel,
})
logger.info(f"moodle_solver Q{q_num_adp}: clicked Check button")
except Exception as _chk_err:
logger.warning(f"moodle_solver Q{q_num_adp}: Check click failed: {_chk_err}")
continue
# Wait for page reload after Check
await asyncio.sleep(2.0)
# Re-read questions to get feedback
try:
_upd_mq = await send_fn("browser.getMoodleQuestions", {"tabId": tab_id})
_upd_list = (_upd_mq.get("questions", [])
if isinstance(_upd_mq, dict) else [])
except Exception:
_upd_list = []
# Find this question in updated list
_upd_q = next(
(uq for uq in _upd_list if uq.get("qno") == q_num_adp),
None,
)
if _upd_q:
_new_state = _upd_q.get("state", "")
_fb = _upd_q.get("feedbackText", "")
_corr = _upd_q.get("correctnessText", "")
logger.info(
f"moodle_solver Q{q_num_adp}: after Check — "
f"state={_new_state}, correctness='{_corr}', "
f"feedback='{_fb[:100]}'"
)
# Determine if retry needed using correctnessText
# (state from .que class is unreliable in adaptive mode)
_corr_lo = _corr.lower()
_needs_retry = (
"неверн" in _corr_lo
or "incorrect" in _corr_lo
or "частичн" in _corr_lo
or "partial" in _corr_lo
or _new_state in ("incorrect", "partiallycorrect")
)
if _needs_retry:
_retry_qtype = _upd_q.get("qtype", q.get("qtype", "unknown"))
logger.info(f"moodle_solver Q{q_num_adp}: retrying (correctness='{_corr}', qtype={_retry_qtype})")
_retry_qtext = _upd_q.get("qtext", q.get("qtext", ""))
_retry_opts = _upd_q.get("options", q.get("options", []))
_retry_ddwtos = _upd_q.get("ddwtosPlaces", q.get("ddwtosPlaces", []))
_retry_selects = _upd_q.get("selects", [])
# ── SMART FLIP: 2-option questions ──
if _retry_opts and len(_retry_opts) == 2 and not _upd_q.get("multiple") and not _retry_ddwtos:
_unchecked = [o for o in _retry_opts if not o.get("checked")]
if _unchecked:
_flip_opt = _unchecked[0]
_flip_sel = _flip_opt.get("selector", "")
if _flip_sel:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id, "selector": _flip_sel,
})
logger.info(f"moodle_solver Q{q_num_adp}: smart flip → '{_flip_opt.get('text', '')}'")
except Exception as _re:
logger.warning(f"moodle_solver Q{q_num_adp}: smart flip click failed: {_re}")
# Click Check after flip
_rc_sel = _upd_q.get("checkButtonSelector", check_sel)
if _rc_sel:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "selector": _rc_sel})
logger.info(f"moodle_solver Q{q_num_adp}: clicked Check after smart flip")
await asyncio.sleep(2.0)
except Exception as _re:
logger.warning(f"moodle_solver Q{q_num_adp}: Check after flip failed: {_re}")
# ── DDWTOS RETRY (single attempt) ──
elif _retry_ddwtos:
_wrong_places = [p for p in _retry_ddwtos if p.get("isCorrect") is not True]
_correct_places = [p for p in _retry_ddwtos if p.get("isCorrect") is True]
if _wrong_places and _correct_places:
# Per-blank feedback available — retry only wrong blanks
logger.info(
f"moodle_solver Q{q_num_adp}: ddwtos {len(_correct_places)} correct, "
f"{len(_wrong_places)} wrong"
)
_d_prompt = (
f"Вопрос {q_num_adp}:\n{_retry_qtext}\n\n"
f"Некоторые пропуски заполнены правильно, а некоторые — нет.\n"
)
for p in _correct_places:
choices = sorted(p.get("choices", []), key=lambda c: c["value"])
cur_val = p.get("currentValue", "")
cur_text = next((c["text"] for c in choices if str(c["value"]) == str(cur_val)), "?")
_d_prompt += f"Пропуск {p['place']}: ✓ {cur_text} (верно)\n"
_d_prompt += "\nИсправь только НЕВЕРНЫЕ пропуски:\n"
for p in _wrong_places:
choices = sorted(p.get("choices", []), key=lambda c: c["value"])
choice_texts = [c["text"] for c in choices]
cur_val = p.get("currentValue", "")
cur_text = next((c["text"] for c in choices if str(c["value"]) == str(cur_val)), "?")
_d_prompt += f"Пропуск {p['place']}: ✗ было '{cur_text}' — варианты: {', '.join(f'{i+1}. {t}' for i, t in enumerate(choice_texts))}\n"
_d_prompt += (
f"\nВерни {len(_wrong_places)} чисел через запятую — номер варианта для каждого неверного пропуска.\n"
f"Без пояснений."
)
_target_places = _wrong_places
else:
# No per-blank feedback — retry all blanks
_d_groups: Dict[str, list] = {}
for p in _retry_ddwtos:
g = p.get("group", "1")
if g not in _d_groups:
_d_groups[g] = []
_d_groups[g].append(p)
_d_prompt = (
f"Вопрос {q_num_adp}:\n{_retry_qtext}\n\n"
f"Предыдущий ответ был НЕВЕРНЫМ.\n"
)
if _fb:
_d_prompt += f"Обратная связь: {_fb}\n"
_d_prompt += f"\nВ тексте {len(_retry_ddwtos)} пропусков.\n\n"
for g_id, places in sorted(_d_groups.items()):
choices = places[0].get("choices", [])
choice_texts = [c["text"] for c in sorted(choices, key=lambda c: c["value"])]
place_nums = [str(p["place"]) for p in places]
_d_prompt += f"Пропуски {', '.join(place_nums)} — варианты: {', '.join(f'{i+1}. {t}' for i, t in enumerate(choice_texts))}\n"
_d_prompt += f"\nВерни {len(_retry_ddwtos)} чисел через запятую.\nБез пояснений."
_target_places = _retry_ddwtos
_d_raw = await _ask_llm(_d_prompt)
logger.info(f"moodle_solver Q{q_num_adp}: ddwtos retry LLM → {_d_raw!r}")
_d_nums = re.findall(r'\d+', _d_raw)
for pi, (place, num_str) in enumerate(zip(_target_places, _d_nums)):
try:
choice_idx = int(num_str) - 1
choices = sorted(place.get("choices", []), key=lambda c: c["value"])
if 0 <= choice_idx < len(choices):
sel = place.get("selector", "")
if sel:
await send_fn("browser.setInputValue", {
"tabId": tab_id, "selector": sel,
"value": str(choices[choice_idx]["value"]),
})
except Exception as _re:
logger.warning(f"moodle_solver Q{q_num_adp}: ddwtos retry set failed: {_re}")
# Click Check after ddwtos retry
_rc_sel = _upd_q.get("checkButtonSelector", check_sel)
if _rc_sel:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "selector": _rc_sel})
logger.info(f"moodle_solver Q{q_num_adp}: clicked Check after ddwtos retry")
await asyncio.sleep(2.0)
except Exception as _re:
logger.warning(f"moodle_solver Q{q_num_adp}: ddwtos retry Check failed: {_re}")
# ── RADIO/CHECKBOX RETRY via LLM ──
elif _retry_opts:
_retry_prompt = (
f"Вопрос {q_num_adp}:\n{_retry_qtext}\n\n"
f"Предыдущий ответ был НЕВЕРНЫМ.\n"
)
if _fb:
_retry_prompt += f"Обратная связь от системы: {_fb}\n"
if _corr:
_retry_prompt += f"Статус: {_corr}\n"
_retry_opt_texts = [o.get("text", "") for o in _retry_opts]
_is_m = _upd_q.get("multiple", False)
_lbl = "Чекбоксы (несколько верных)" if _is_m else "Варианты ответа"
_opts_s = "\n".join(
f" {i+1}. {t}" for i, t in enumerate(_retry_opt_texts)
)
_retry_prompt += f"\n{_lbl}:\n{_opts_s}\n"
_retry_prompt += (
"\nИсправь ответ. Верни ТОЛЬКО номер(а) через запятую.\n"
"Без пояснений."
)
_retry_raw = await _ask_llm(_retry_prompt)
logger.info(f"moodle_solver Q{q_num_adp}: retry LLM → {_retry_raw!r}")
_retry_nums = re.findall(r'\d+', _retry_raw)
if not _upd_q.get("multiple") and _retry_nums:
_retry_nums = _retry_nums[:1]
for _rn in _retry_nums:
_ri = int(_rn) - 1
if 0 <= _ri < len(_retry_opts):
_r_sel = _retry_opts[_ri].get("selector", "")
if _r_sel:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id, "selector": _r_sel,
})
except Exception as _re:
logger.warning(f"moodle_solver Q{q_num_adp}: retry click failed: {_re}")
# Click Check after radio/checkbox retry
_rc_sel = _upd_q.get("checkButtonSelector", check_sel)
if _rc_sel:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "selector": _rc_sel})
logger.info(f"moodle_solver Q{q_num_adp}: clicked Check after retry")
await asyncio.sleep(2.0)
except Exception as _re:
logger.warning(f"moodle_solver Q{q_num_adp}: retry Check failed: {_re}")
# ── SELECT RETRY via LLM ──
elif _retry_selects:
_retry_prompt = (
f"Вопрос {q_num_adp}:\n{_retry_qtext}\n\n"
f"Предыдущий ответ был НЕВЕРНЫМ.\n"
)
if _fb:
_retry_prompt += f"Обратная связь от системы: {_fb}\n"
_retry_prompt += "\nВыпадающие списки:\n"
for si, sd in enumerate(_retry_selects):
_s_opts = [o["text"] for o in sd.get("options", []) if o.get("text")]
_retry_prompt += f" Список {si+1}: {', '.join(_s_opts)}\n"
_retry_prompt += (
"\nИсправь ответ. Верни ТОЛЬКО номер(а) через запятую.\n"
"Без пояснений."
)
_retry_raw = await _ask_llm(_retry_prompt)
logger.info(f"moodle_solver Q{q_num_adp}: retry LLM → {_retry_raw!r}")
_retry_nums = re.findall(r'\d+', _retry_raw)
for si, (sd, ns) in enumerate(zip(_retry_selects, _retry_nums)):
try:
_ri = int(ns) - 1
_vopts = [o for o in sd.get("options", []) if o.get("value", "0") != "0" and o.get("text")]
if 0 <= _ri < len(_vopts):
_ssel = sd.get("selector", "")
if _ssel:
await send_fn("browser.selectOption", {
"tabId": tab_id, "selector": _ssel,
"value": _vopts[_ri]["value"],
})
except Exception as _re:
logger.warning(f"moodle_solver Q{q_num_adp}: retry select failed: {_re}")
# Click Check after select retry
_rc_sel = _upd_q.get("checkButtonSelector", check_sel)
if _rc_sel:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "selector": _rc_sel})
logger.info(f"moodle_solver Q{q_num_adp}: clicked Check after retry")
await asyncio.sleep(2.0)
except Exception as _re:
logger.warning(f"moodle_solver Q{q_num_adp}: retry Check failed: {_re}")
# After all adaptive questions — navigate to next page / summary
try:
_sf_result = await send_fn("browser.submitMoodleForm", {"tabId": tab_id})
logger.info(f"moodle_solver: submitMoodleForm after adaptive: {_sf_result}")
except Exception as _sf_err:
logger.debug(f"moodle_solver: submitMoodleForm failed ({_sf_err}), falling back to clickNext")
await _click_next_page()
await asyncio.sleep(2.0)
continue
else:
# ── NORMAL MODE: answer all questions, then navigate ──
for q in mq_list:
await _answer_moodle_question(q, answered, q_total_str)
# Remember current question IDs before navigation
_old_q_ids = {q.get("id", "") for q in mq_list if q.get("id")}
# Submit form explicitly (ensures answers are POSTed)
try:
_sf_result = await send_fn("browser.submitMoodleForm", {"tabId": tab_id})
logger.info(f"moodle_solver: submitMoodleForm result: {_sf_result}")
except Exception as _sf_err:
logger.debug(f"moodle_solver: submitMoodleForm failed ({_sf_err}), falling back to clickNext")
await _click_next_page()
# Wait for page to actually change
for _wait_i in range(15): # max ~7.5s
await asyncio.sleep(0.5)
try:
_new_mq = await send_fn("browser.getMoodleQuestions", {"tabId": tab_id})
_new_list = (_new_mq.get("questions", [])
if isinstance(_new_mq, dict) else [])
_new_ids = {q.get("id", "") for q in _new_list if q.get("id")}
if _new_ids != _old_q_ids:
break # page changed
except Exception:
pass # page may be mid-navigation
else:
logger.warning("moodle_solver: page didn't change after 7.5s")
continue
# No questions on page — check if summary page (all questions answered)
if answered > 0:
try:
_summary = await send_fn("browser.getMoodleQuizSummary", {"tabId": tab_id})
except Exception:
_summary = {}
if isinstance(_summary, dict) and _summary.get("isSummaryPage"):
_s_total = _summary.get("totalQuestions", 0)
_s_all_ok = _summary.get("allAnswered", False)
_s_unanswered = _summary.get("unanswered", [])
_s_submit_sel = _summary.get("submitSelector", "")
logger.info(
f"moodle_solver: summary page — "
f"total={_s_total}, allAnswered={_s_all_ok}, "
f"unanswered={_s_unanswered}"
)
_s_invalid = _summary.get("invalid", [])
_s_problem_nums = list(_s_unanswered) + [q["num"] for q in _s_invalid]
if finish_test:
if _s_all_ok and not _s_invalid:
# All answers saved — submit
_emit({"status": "test_submitting"})
_submitted = False
if _s_submit_sel:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id, "selector": _s_submit_sel
})
logger.info("moodle_solver: clicked submit button")
_submitted = True
except Exception:
pass
if not _submitted:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id,
"text": "Отправить всё и завершить тест"
})
_submitted = True
except Exception as e:
logger.warning(f"moodle_solver: submit click failed: {e}")
if _submitted:
# Wait for Moodle confirmation modal to appear and confirm
_confirmed = False
for _modal_wait in range(10): # up to 5s
await asyncio.sleep(0.5)
# Try CSS selector first (Moodle modal save button)
for _sel in [
".modal.show [data-action='save']",
".moodle-dialogue [data-action='save']",
".confirmation-dialogue [data-action='save']",
"[data-action='save'].btn-primary",
]:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id, "selector": _sel
})
logger.info(f"moodle_solver: confirmed via selector {_sel}")
_confirmed = True
break
except Exception:
pass
if _confirmed:
break
# Fallback: try text match
for _confirm_text in [
"Отправить всё и завершить тест",
"Submit all and finish",
]:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id, "text": _confirm_text
})
logger.info(f"moodle_solver: confirmed via text '{_confirm_text}'")
_confirmed = True
break
except Exception:
pass
if _confirmed:
break
if not _confirmed:
logger.warning("moodle_solver: could not confirm submission modal")
# Wait for page to load after submission
await asyncio.sleep(2.0)
_test_submitted = _confirmed
# ── Handle review page → results page flow ──
_moodle_results_data = None
try:
_quiz_result = await send_fn("browser.getMoodleQuizResults", {"tabId": tab_id})
except Exception:
_quiz_result = {}
if _quiz_result.get("isQuizResultPage") and _quiz_result.get("pageType") == "review":
# We're on the review page — extract summary, then click "Закончить обзор"
_review_summary = _quiz_result.get("reviewSummary", {})
logger.info(f"moodle_solver: review page detected, summary={_review_summary}")
_emit({
"status": "test_review",
"message": "Обзор результатов...",
})
_finish_href = _quiz_result.get("finishReviewHref", "")
_navigated_to_results = False
if _finish_href:
try:
await send_fn("browser.navigateTab", {"tabId": tab_id, "url": _finish_href})
_navigated_to_results = True
except Exception:
pass
if not _navigated_to_results:
for _finish_text in ["Закончить обзор", "Finish review"]:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "text": _finish_text})
_navigated_to_results = True
break
except Exception:
pass
if _navigated_to_results:
await asyncio.sleep(2.0)
try:
_quiz_result = await send_fn("browser.getMoodleQuizResults", {"tabId": tab_id})
except Exception:
_quiz_result = {}
if _quiz_result.get("isQuizResultPage") and _quiz_result.get("pageType") == "results":
_moodle_results_data = _quiz_result
logger.info(f"moodle_solver: results page — attempts={_quiz_result.get('attempts')}, feedback={_quiz_result.get('feedback')}")
# Read page text as fallback for regex parsing
_results_text, _ = await _read_page()
break # Exit loop — submitted
elif _s_problem_nums and _summary_fix_attempts < _MAX_SUMMARY_FIX_ATTEMPTS:
# ── Go back and fix invalid/unanswered questions ──
_summary_fix_attempts += 1
_problem_str = ", ".join(str(n) for n in _s_problem_nums)
logger.info(
f"moodle_solver: summary fix attempt {_summary_fix_attempts}/{_MAX_SUMMARY_FIX_ATTEMPTS} "
f"— problems: {_problem_str}"
)
_emit({
"status": "test_fixing",
"message": f"Исправляю вопросы: {_problem_str} (попытка {_summary_fix_attempts})",
})
# Navigate back: prefer direct link to problem question page
_went_back = False
# Try using href from the first invalid question (navigates to correct page)
_first_invalid_href = ""
for _inv in _s_invalid:
if _inv.get("href"):
_first_invalid_href = _inv["href"]
break
if _first_invalid_href:
try:
await send_fn("browser.navigateTab", {"tabId": tab_id, "url": _first_invalid_href})
_went_back = True
except Exception:
pass
# Fallback: returnHref
if not _went_back:
_return_href = _summary.get("returnHref", "")
if _return_href:
try:
await send_fn("browser.navigateTab", {"tabId": tab_id, "url": _return_href})
_went_back = True
except Exception:
pass
if not _went_back:
for _back_text in ["Вернуться к попытке", "Return to attempt"]:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "text": _back_text})
_went_back = True
break
except Exception:
pass
if not _went_back:
_return_sel = _summary.get("returnSelector", "")
if _return_sel:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "selector": _return_sel})
_went_back = True
except Exception:
pass
if not _went_back:
logger.warning("moodle_solver: could not navigate back from summary")
_submit_problem = f"Не удалось вернуться к попытке для исправления вопросов: {_problem_str}"
break
await asyncio.sleep(1.5)
# Now we need to find and fix the problem questions
# getMoodleQuestions on current page, fix only problem ones
try:
_fix_data = await send_fn("browser.getMoodleQuestions", {"tabId": tab_id})
except Exception:
_fix_data = {}
_fix_questions = _fix_data.get("questions", [])
_fixed_any = False
for _fq in _fix_questions:
_fq_num = str(_fq.get("qno", ""))
if _fq_num not in [str(n) for n in _s_problem_nums]:
continue
_val_err = _fq.get("validationError", "")
logger.info(f"moodle_solver: fixing Q{_fq_num} on return"
f"{' (validation: '+_val_err+')' if _val_err else ''}")
# If validation error mentions "число" — clear text input value first
if _val_err and "число" in _val_err.lower():
# Clear any existing bad value in text inputs
for _ti in _fq.get("textInputs", []):
if _ti.get("value"):
_ti["value"] = "" # reset so solver re-types
# Re-answer this question
await _answer_moodle_question(_fq, 0, str(_s_total))
_fixed_any = True
if not _fixed_any:
logger.warning("moodle_solver: problem questions not found on this page")
# Try navigating to next page if multi-page quiz
break
# Submit the page to save answers
await asyncio.sleep(0.5)
try:
await send_fn("browser.submitMoodleForm", {"tabId": tab_id})
except Exception:
try:
await send_fn("browser.clickElement", {
"tabId": tab_id, "text": "Закончить попытку"
})
except Exception:
pass
await asyncio.sleep(1.5)
# Loop will continue — next iteration should land on summary again
continue
else:
# Max fix attempts reached or no problems detected
_problem_str = ", ".join(str(n) for n in _s_problem_nums)
_submit_problem = f"Не все ответы сохранены (вопросы: {_problem_str})"
break
break # Exit loop — we're on summary page (no finish_test)
# ── GENERIC PATH (non-Moodle) ────────────────────────────────
q_match = re.search(
r'(?:вопрос|question|задание)\s*(\d+)\s*(?:из|of|/)?\s*(\d+)?',
text_lower
)
if not q_match:
q_match = re.search(r'(\d+)\s*(?:/|из|of)\s*(\d+)', text_lower)
q_num = q_match.group(1) if q_match else str(answered + 1)
q_total = (
q_match.group(2)
if q_match and q_match.lastindex and q_match.lastindex >= 2
else None
) or "?"
_sig = _detect_signature(text, elems)
interaction = _sig["interaction"]
is_multiple = _sig.get("multiple", False)
# Classify interactive elements
radio_buttons: List[dict] = []
checkboxes: List[dict] = []
text_inputs: List[dict] = []
option_elems: List[dict] = []
next_btn: Optional[dict] = None
for el in elems:
el_tag = (el.get("tag") or "").lower()
el_type = (el.get("type") or "").lower()
el_text_raw = (el.get("text") or "").strip()
el_text_lo = el_text_raw.lower()
el_id = (el.get("id") or "").lower()
if any(kw in el_text_lo for kw in [
"далее", "next", "следующ", "завершить", "finish", "submit",
"отправить", "проверить",
]):
if not next_btn:
next_btn = el
continue
if any(kw in el_text_lo for kw in ["назад", "prev", "back", "предыдущ"]):
continue
if el_type == "radio":
radio_buttons.append(el)
elif el_type == "checkbox":
checkboxes.append(el)
elif el_tag in ("input", "textarea") and el_type in ("text", "number", "textarea", ""):
text_inputs.append(el)
elif el_tag in ("div", "span", "label", "li", "a", "button", "p"):
if el_text_raw and len(el_text_raw) < 500:
option_elems.append(el)
primary_elems: List[dict] = []
all_options: List[str] = []
if radio_buttons:
primary_elems = radio_buttons
all_options = [el.get("text", "").strip() for el in radio_buttons]
is_multiple = False
elif checkboxes:
primary_elems = checkboxes
all_options = [el.get("text", "").strip() for el in checkboxes]
is_multiple = True
elif option_elems:
primary_elems = option_elems
all_options = [el.get("text", "").strip() for el in option_elems]
q_short = _extract_question_text(text, all_options, None)[:80]
# Build LLM prompt
prompt = f"Вопрос {q_num} из {q_total}:\n{q_short}\n"
if all_options:
label = "Чекбоксы (несколько верных)" if is_multiple else "Варианты ответа"
opts_str = "\n".join(f" {i+1}. {t}" for i, t in enumerate(all_options))
prompt += f"\n{label}:\n{opts_str}\n"
prompt += (
"\nИнструкция:\n"
"- Варианты → верни ТОЛЬКО номер(а) через запятую\n"
"- Текстовый ответ → верни ТЕКСТ: ответ\n"
"- Без пояснений."
)
delay = _human_delay_sec(q_short, all_options or None)
_total_simulated_sec += delay
await asyncio.sleep(delay)
answer_raw = await _ask_llm(prompt)
chosen_answer = ""
if all_options:
nums = re.findall(r'\d+', answer_raw)
if not is_multiple and nums:
nums = nums[:1]
for num_str in nums:
idx = int(num_str) - 1
if 0 <= idx < len(primary_elems):
ok = await _click_option(
all_options[idx], primary_elems[idx], interaction, idx
)
if ok:
chosen_answer += all_options[idx] + "; "
answered += 1
_final = (chosen_answer.rstrip("; ") if chosen_answer else answer_raw).strip()
_qa_records.append({
"num": q_num, "total": q_total,
"text": q_short, "answer": _final, "delay_sec": round(delay, 1),
})
await _click_next_page()
await asyncio.sleep(0.5)
# ── Parse score from results page ─────────────────────────────────────
_correct_total = 0
_incorrect_total = 0
_score_pct = ""
_moodle_points: Optional[str] = None # e.g. "18,80 / 20,00"
_moodle_grade: Optional[str] = None # e.g. "94,00 / 100,00"
_moodle_feedback: str = ""
_moodle_attempts_table: List[dict] = []
# ── Try to detect and navigate review → results page ────────────
if not _moodle_results_data:
try:
_quiz_result = await send_fn("browser.getMoodleQuizResults", {"tabId": tab_id})
except Exception:
_quiz_result = {}
if _quiz_result.get("isQuizResultPage") and _quiz_result.get("pageType") == "review":
_review_summary = _quiz_result.get("reviewSummary", {})
logger.info(f"moodle_solver: on review page after loop, summary={_review_summary}")
_emit({"status": "test_review", "message": "Обзор результатов..."})
_finish_href = _quiz_result.get("finishReviewHref", "")
_navigated = False
if _finish_href:
try:
await send_fn("browser.navigateTab", {"tabId": tab_id, "url": _finish_href})
_navigated = True
except Exception:
pass
if not _navigated:
for _ft in ["Закончить обзор", "Finish review"]:
try:
await send_fn("browser.clickElement", {"tabId": tab_id, "text": _ft})
_navigated = True
break
except Exception:
pass
if _navigated:
await asyncio.sleep(2.0)
try:
_quiz_result = await send_fn("browser.getMoodleQuizResults", {"tabId": tab_id})
except Exception:
_quiz_result = {}
# Use whatever we got (review or results)
if _quiz_result.get("isQuizResultPage"):
_moodle_results_data = _quiz_result
_results_text, _ = await _read_page()
logger.info(f"moodle_solver: got results data from {_quiz_result.get('pageType')} page")
elif _quiz_result.get("isQuizResultPage") and _quiz_result.get("pageType") == "results":
_moodle_results_data = _quiz_result
logger.info("moodle_solver: already on results page")
# ── Structured results from extension (higher priority) ───────────
if _moodle_results_data:
# Results page — attempts table
_moodle_attempts_table = _moodle_results_data.get("attempts", [])
_moodle_feedback = _moodle_results_data.get("feedback", "")
# Extract points/grade from attempts table (last attempt row)
if _moodle_attempts_table:
_last_attempt = _moodle_attempts_table[-1]
for _col_key, _col_val in _last_attempt.items():
_ck = _col_key.lower()
_slash_parts = _col_key.split("/")
if len(_slash_parts) != 2:
continue
_max_val = _slash_parts[1].strip().replace(",", ".").strip()
_got_val = _col_val.strip().replace(",", ".")
_got_match = re.search(r'[\d]+[.]?[\d]*', _got_val)
_max_match = re.search(r'[\d]+[.]?[\d]*', _max_val)
if not _got_match or not _max_match:
continue
_got_num = _got_match.group(0)
_max_num = _max_match.group(0)
# "Баллы / 10,00" → points; "Оценка / 5,00" → grade
if "балл" in _ck or "marks" in _ck or "mark" in _ck:
_moodle_points = f"{_got_num} / {_max_num}"
try:
_correct_total = round(float(_got_num))
_incorrect_total = round(float(_max_num)) - _correct_total
except ValueError:
pass
elif "оценка" in _ck or "grade" in _ck:
_moodle_grade = f"{_got_num} / {_max_num}"
# Review summary (fallback if no attempts table)
_review_summary = _moodle_results_data.get("reviewSummary", {})
if _review_summary and not _moodle_points:
for _rk, _rv in _review_summary.items():
_rkl = _rk.lower()
if "балл" in _rkl:
_pts_m = re.search(r'([\d]+[,.][\d]+)\s*/\s*([\d]+[,.][\d]+)', _rv)
if _pts_m:
_moodle_points = f"{_pts_m.group(1).replace(',','.')} / {_pts_m.group(2).replace(',','.')}"
elif "оценка" in _rkl:
_grd_m = re.search(r'([\d]+[,.][\d]+)\s*(?:из|/)\s*([\d]+[,.][\d]+)', _rv)
if _grd_m:
_moodle_grade = f"{_grd_m.group(1).replace(',','.')} / {_grd_m.group(2).replace(',','.')}"
_pct_m = re.search(r'\((\d+)%?\)', _rv)
if _pct_m:
_score_pct = _pct_m.group(1)
elif "отзыв" in _rkl or "feedback" in _rkl:
_moodle_feedback = _rv
if _results_text:
_rt = _results_text
# Moodle results page: "Баллы / 20,00" header + value "18,80"
# Text from _read_page flattens the table, so look for patterns like:
# "Баллы / 20,00 ... 18,80" or "Marks ... 18.80 / 20.00"
# Also: "Средняя оценка: 94,00 / 100,00."
# Pattern: "Баллы / MAX" ... number (with comma or dot as decimal)
_p_marks = re.search(
r'[Бб]алл\w*\s*/\s*([\d]+[,.][\d]+|[\d]+)',
_rt
)
_p_grade = re.search(
r'[Оо]ценка\s*/\s*([\d]+[,.][\d]+|[\d]+)',
_rt
)
if _p_marks and not _moodle_points:
_marks_max = _p_marks.group(1).replace(',', '.')
_after_marks = _rt[_p_marks.end():]
_score_nums = re.findall(r'([\d]+[,.][\d]+)', _after_marks[:200])
for _sn in _score_nums:
_marks_got = _sn.replace(',', '.')
try:
if float(_marks_got) <= float(_marks_max):
_moodle_points = f"{_marks_got} / {_marks_max}"
_correct_total = round(float(_marks_got))
_incorrect_total = round(float(_marks_max)) - _correct_total
break
except ValueError:
pass
if _p_grade and not _moodle_grade:
_grade_max = _p_grade.group(1).replace(',', '.')
_after_grade = _rt[_p_grade.end():]
_grade_nums = re.findall(r'([\d]+[,.][\d]+)', _after_grade[:200])
for _gn in _grade_nums:
_grade_got = _gn.replace(',', '.')
try:
if float(_grade_got) <= float(_grade_max):
_moodle_grade = f"{_grade_got} / {_grade_max}"
_score_pct = str(round(float(_grade_got)))
break
except ValueError:
pass
# Fallback: "Средняя оценка: X / Y"
if not _moodle_grade:
_avg_m = re.search(
r'[Сс]редняя\s+оценка[:\s]*([\d]+[,.][\d]+)\s*/\s*([\d]+[,.][\d]+)',
_rt
)
if _avg_m:
_g = _avg_m.group(1).replace(',', '.')
_gm = _avg_m.group(2).replace(',', '.')
_moodle_grade = f"{_g} / {_gm}"
if not _score_pct:
try:
_score_pct = str(round(float(_g)))
except ValueError:
pass
# Fallback: generic patterns
if not _moodle_points and not _moodle_grade:
_moodle_marks_fb = re.search(
r'(?:marks|оценка|grade)\s*[:\s]*(\d+\.?\d*)\s*/\s*(\d+\.?\d*)',
_rt, re.IGNORECASE
)
if _moodle_marks_fb:
try:
_correct_total = round(float(_moodle_marks_fb.group(1)))
_incorrect_total = round(float(_moodle_marks_fb.group(2))) - _correct_total
except ValueError:
pass
else:
_sc = re.search(r'правильно\s+на\s+(\d+)\s+из\s+(\d+)', _rt, re.IGNORECASE)
if _sc:
_correct_total = int(_sc.group(1))
_incorrect_total = int(_sc.group(2)) - _correct_total
if not _score_pct:
_pct = re.search(r'(\d+)\s*%', _rt)
if _pct:
_score_pct = _pct.group(1)
# ── Build markdown report ──────────────────────────────────────────────
_emit({"status": "test_complete", "answered": answered})
_avg = _total_simulated_sec / answered if answered else 0.0
_m, _s = divmod(int(_total_simulated_sec), 60)
_time_str = f"{_m} мин {_s} с" if _m else f"{_s} с"
# Header with submission status
if _test_submitted:
_header = f"## Тест завершён — {answered} вопросов"
elif _submit_problem:
_header = f"## Тест пройден (не отправлен) — {answered} вопросов"
elif finish_test:
_header = f"## Тест пройден (не удалось отправить) — {answered} вопросов"
else:
_header = f"## Тест пройден — {answered} вопросов"
lines_out: List[str] = [
_header,
f"Время: {_time_str} (среднее {_avg:.1f} с/вопрос)",
"",
]
# Submission status line
if _test_submitted:
lines_out.append("Тест отправлен и завершён.")
lines_out.append("")
elif _submit_problem:
lines_out.append(f"⚠ {_submit_problem}")
lines_out.append("")
elif finish_test:
lines_out.append("⚠ Не удалось найти страницу подтверждения для отправки теста.")
lines_out.append("")
# Per-question answers
for rec in _qa_records:
lines_out.append(f"**Вопрос {rec['num']}.** {rec['text']}")
lines_out.append(f"- {rec['answer']}")
lines_out.append("")
if _moodle_points or _moodle_grade:
# Moodle-style results table
lines_out.append("---")
lines_out.append("### Результаты")
lines_out.append("")
lines_out.append("| Показатель | Получено | Максимум |")
lines_out.append("|---|---|---|")
if _moodle_points:
_pts = _moodle_points.split(" / ")
lines_out.append(f"| Баллы | {_pts[0]} | {_pts[1]} |")
if _moodle_grade:
_grd = _moodle_grade.split(" / ")
lines_out.append(f"| Оценка | {_grd[0]} | {_grd[1]} |")
elif _correct_total or _incorrect_total or _score_pct:
lines_out.append("---")
lines_out.append("### Итог")
if _correct_total or _incorrect_total:
lines_out.append(f"- Правильно: **{_correct_total}** из **{_correct_total + _incorrect_total}**")
if _score_pct:
lines_out.append(f"- Результат: **{_score_pct}%**")
# Feedback line
if _moodle_feedback:
lines_out.append("")
lines_out.append(f"**Отзыв:** {_moodle_feedback}")
# End MOODLE_SOLVE span
await _end_span(_solve_span, success=True, metadata_updates={
"questions_total": answered,
"score": _score_pct or "",
"grade": _moodle_grade or "",
"test_submitted": _test_submitted,
})
return "\n".join(lines_out)