# Предложить слоты по времени import logging from typing import Any, Dict, Optional, List from start import ( MfcApi, build_buttons_text, get_data_element, CARD, CARD_GEN, PART_OF_DAY, build_sorted_time_slots, get_total_slots_availability_bulk, dedupe_slots, setup_pagination, get_pagination_data, ) api = MfcApi() def get_nearest_times(time_list: List[str], time: str, n: int = 3) -> List[str]: def _norm(t: str) -> str: h, m = t.split(":") return f"{int(h):02d}:{int(m):02d}" def _to_minutes(t: str) -> int: h, m = map(int, t.split(":")) return h * 60 + m if not time_list or n <= 0: return [] times = [_norm(t) for t in time_list] n = min(n, len(times)) target = _norm(time) tgt = _to_minutes(target) mins = [_to_minutes(t) for t in times] i0 = min(range(len(mins)), key=lambda i: (abs(mins[i] - tgt), mins[i])) res = [times[i0]] l, r = i0 - 1, i0 + 1 while len(res) < n and (l >= 0 or r < len(times)): dl = abs(mins[l] - tgt) if l >= 0 else float("inf") dr = abs(mins[r] - tgt) if r < len(times) else float("inf") if dl <= dr: if l >= 0: res.append(times[l]) l -= 1 elif r < len(times): res.append(times[r]) r += 1 else: if r < len(times): res.append(times[r]) r += 1 elif l >= 0: res.append(times[l]) l -= 1 res_sorted = sorted(res, key=lambda t: _to_minutes(t)) return res_sorted def num_cardinal(n: int) -> str: if n in CARD: return CARD[n] tens = (n // 10) * 10 unit = n % 10 return f"{CARD[tens]} {CARD[unit]}" def num_genitive(n: int) -> str: if n in CARD_GEN: return CARD_GEN[n] tens = (n // 10) * 10 unit = n % 10 return f"{CARD_GEN[tens]} {CARD_GEN[unit]}" def parse_times(times: List[str]) -> List[int]: mins = [] for t in times: h, m = t.strip().split(":") mins.append(int(h) * 60 + int(m)) return sorted(set(mins)) def part_of_day_label(mins: int) -> str: h = (mins // 60) % 24 for start, end, label in PART_OF_DAY: if start <= h < end: return label return "дня" def time_words_point(mins: int) -> str: h = mins // 60 m = mins % 60 if m == 0: return f"{num_cardinal(h)}" return f"{num_cardinal(h)} {num_cardinal(m)}" def time_words_gen(mins: int) -> str: h = mins // 60 m = mins % 60 if m == 0: return f"{num_genitive(h)}" return f"{num_genitive(h)} {num_genitive(m)}" def make_spans(mins: List[int], max_gap: int = 60) -> List[List[int]]: if not mins: return [] spans = [[mins[0], mins[0]]] for x in mins[1:]: if x - spans[-1][1] <= max_gap: spans[-1][1] = x else: spans.append([x, x]) return spans def join_times_with_and(parts: List[str]) -> str: if not parts: return "" if len(parts) == 1: return parts[0] if len(parts) == 2: return f"{parts[0]} и {parts[1]}" return f"{', '.join(parts[:-1])} и {parts[-1]}" def group_by_part_of_day(mins: List[int]) -> Dict[str, List[int]]: buckets: Dict[str, List[int]] = {} for t in mins: label = part_of_day_label(t) buckets.setdefault(label, []).append(t) for k in buckets: buckets[k].sort() return buckets def fmt_time_numeric(mins: int) -> str: h = (mins // 60) % 24 m = mins % 60 return f"{h}:{m:02d}" def format_hour_block(times_in_hour: List[int], *, numeric: bool = False) -> str: if numeric: return join_times_with_and([fmt_time_numeric(t) for t in times_in_hour]) return join_times_with_and([time_words_point(t) for t in times_in_hour]) def format_part_of_day_group( times: List[int], label: str, *, numeric: bool = False ) -> str: by_hour: Dict[int, List[int]] = {} for t in times: h = t // 60 by_hour.setdefault(h, []).append(t) blocks = [] for h in sorted(by_hour): blocks.append(format_hour_block(by_hour[h], numeric=numeric)) core = join_times_with_and(blocks) if numeric: return f"в {core}" return f"в {core} {label}" def build_times_phrase( times: List[str], separate: bool = False, numeric: bool = False ) -> str: mins = parse_times(times) if not mins: return "" if separate: pods = group_by_part_of_day(mins) parts = [] for label, ts in sorted(pods.items(), key=lambda kv: (min(kv[1]), kv[0])): parts.append(format_part_of_day_group(ts, label, numeric=numeric)) return "; ".join(parts) spans = make_spans(mins, max_gap=60) phrases = [] for a, b in spans: if a == b: if numeric: phrases.append(f"в {fmt_time_numeric(a)}") else: label = part_of_day_label(a) phrases.append(f"в {time_words_point(a)} {label}") else: if numeric: phrases.append(f"с {fmt_time_numeric(a)} до {fmt_time_numeric(b)}") else: label_a = part_of_day_label(a) label_b = part_of_day_label(b) if label_a == label_b: phrases.append( f"с {time_words_gen(a)} до {time_words_gen(b)} {label_a}" ) else: phrases.append( f"с {time_words_gen(a)} {label_a} до {time_words_gen(b)} {label_b}" ) return "; ".join(phrases) def rebuild_time_slots( *, time_slots: List[Dict[str, Any]], channel: str ) -> List[Dict[str, Any]]: if channel not in ["max", "telegram"]: return time_slots for time_slot in time_slots: time_slot["name"] = time_slot["name"].replace(":", ".") return time_slots def handler( data: Optional[Dict[str, Any]] = None, session_id: Optional[str] = None, channel: str = "default", ): channel = data["variables"].get("channel_type", channel) mfc_id = data["variables"]["mfc_id"] service_ids = data["variables"]["service_ids"] service_count = data["variables"]["service_count"] date = get_data_element( data=data, filter={"type": "see", "model": "record_date"}, exclude_keys=["declined"], ) time = get_data_element( data=data, filter={ "type": "see", "model": "record_time", "more_index": date.get("index"), }, exclude_keys=["declined"], ) slots = get_total_slots_availability_bulk( session=data, api=api, mfc_id=mfc_id, service_ids=service_ids, service_count=service_count, ) # Убираем дубликаты. # Почему они могут возникать? # Потому что мы получаем слоты по нескольким услугам (service_ids). slots = dedupe_slots([x for slot in slots for x in slot["slots"]]) logging.info( "Suggest time slots with mfc_id %r, service_ids %r, service_count %r", mfc_id, service_ids, service_count, ) time_slots = build_sorted_time_slots(slots=slots, date=date["param"]) text, variables = "", {} if channel == "voice": # если пользователь сказал время - мы ищем БЛИЖАЙИШЕ среди альтернативных if time: alter_times = get_nearest_times(time_slots, time, n=2) alter_times = build_times_phrase(alter_times, separate=True, numeric=True) else: alter_times = build_times_phrase(time_slots, numeric=True) variables.update({"alter_times": alter_times}) else: if channel == "vkontakte": per_page = api.cfg.vk_buttons_rows else: per_page = api.cfg.max_buttons_rows if not data["variables"].get("__pagination") or data["variables"].get("__pagination_type") != "time": setup_pagination(session=data, data=time_slots, per_page=per_page) data["variables"]["__pagination_type"] = "time" previous, page_times, next_ = get_pagination_data(session=data) pagination_count = int(bool(previous)) + int(bool(next_)) max_time_buttons = per_page - pagination_count buttons = [{"name": time_slot} for time_slot in page_times[:max_time_buttons]] if previous: buttons += [{"name": api.cfg.pagination_previous_button_name}] if next_: buttons += [{"name": api.cfg.pagination_next_button_name}] text = build_buttons_text( buttons=rebuild_time_slots(time_slots=buttons, channel=channel), buttons_in_row=1, ) return {"text": text}, variables