42.py
· 8.8 KiB · Python
Raw
# Предложить слоты по времени
import logging
from typing import Any, Dict, Optional, List
from start import (
MfcApi,
build_buttons_text,
get_data_element,
CARD,
CARD_GEN,
PART_OF_DAY,
build_sorted_time_slots,
get_total_slots_availability_bulk,
dedupe_slots,
setup_pagination,
get_pagination_data,
)
api = MfcApi()
def get_nearest_times(time_list: List[str], time: str, n: int = 3) -> List[str]:
def _norm(t: str) -> str:
h, m = t.split(":")
return f"{int(h):02d}:{int(m):02d}"
def _to_minutes(t: str) -> int:
h, m = map(int, t.split(":"))
return h * 60 + m
if not time_list or n <= 0:
return []
times = [_norm(t) for t in time_list]
n = min(n, len(times))
target = _norm(time)
tgt = _to_minutes(target)
mins = [_to_minutes(t) for t in times]
i0 = min(range(len(mins)), key=lambda i: (abs(mins[i] - tgt), mins[i]))
res = [times[i0]]
l, r = i0 - 1, i0 + 1
while len(res) < n and (l >= 0 or r < len(times)):
dl = abs(mins[l] - tgt) if l >= 0 else float("inf")
dr = abs(mins[r] - tgt) if r < len(times) else float("inf")
if dl <= dr:
if l >= 0:
res.append(times[l])
l -= 1
elif r < len(times):
res.append(times[r])
r += 1
else:
if r < len(times):
res.append(times[r])
r += 1
elif l >= 0:
res.append(times[l])
l -= 1
res_sorted = sorted(res, key=lambda t: _to_minutes(t))
return res_sorted
def num_cardinal(n: int) -> str:
if n in CARD:
return CARD[n]
tens = (n // 10) * 10
unit = n % 10
return f"{CARD[tens]} {CARD[unit]}"
def num_genitive(n: int) -> str:
if n in CARD_GEN:
return CARD_GEN[n]
tens = (n // 10) * 10
unit = n % 10
return f"{CARD_GEN[tens]} {CARD_GEN[unit]}"
def parse_times(times: List[str]) -> List[int]:
mins = []
for t in times:
h, m = t.strip().split(":")
mins.append(int(h) * 60 + int(m))
return sorted(set(mins))
def part_of_day_label(mins: int) -> str:
h = (mins // 60) % 24
for start, end, label in PART_OF_DAY:
if start <= h < end:
return label
return "дня"
def time_words_point(mins: int) -> str:
h = mins // 60
m = mins % 60
if m == 0:
return f"{num_cardinal(h)}"
return f"{num_cardinal(h)} {num_cardinal(m)}"
def time_words_gen(mins: int) -> str:
h = mins // 60
m = mins % 60
if m == 0:
return f"{num_genitive(h)}"
return f"{num_genitive(h)} {num_genitive(m)}"
def make_spans(mins: List[int], max_gap: int = 60) -> List[List[int]]:
if not mins:
return []
spans = [[mins[0], mins[0]]]
for x in mins[1:]:
if x - spans[-1][1] <= max_gap:
spans[-1][1] = x
else:
spans.append([x, x])
return spans
def join_times_with_and(parts: List[str]) -> str:
if not parts:
return ""
if len(parts) == 1:
return parts[0]
if len(parts) == 2:
return f"{parts[0]} и {parts[1]}"
return f"{', '.join(parts[:-1])} и {parts[-1]}"
def group_by_part_of_day(mins: List[int]) -> Dict[str, List[int]]:
buckets: Dict[str, List[int]] = {}
for t in mins:
label = part_of_day_label(t)
buckets.setdefault(label, []).append(t)
for k in buckets:
buckets[k].sort()
return buckets
def fmt_time_numeric(mins: int) -> str:
h = (mins // 60) % 24
m = mins % 60
return f"{h}:{m:02d}"
def format_hour_block(times_in_hour: List[int], *, numeric: bool = False) -> str:
if numeric:
return join_times_with_and([fmt_time_numeric(t) for t in times_in_hour])
return join_times_with_and([time_words_point(t) for t in times_in_hour])
def format_part_of_day_group(
times: List[int], label: str, *, numeric: bool = False
) -> str:
by_hour: Dict[int, List[int]] = {}
for t in times:
h = t // 60
by_hour.setdefault(h, []).append(t)
blocks = []
for h in sorted(by_hour):
blocks.append(format_hour_block(by_hour[h], numeric=numeric))
core = join_times_with_and(blocks)
if numeric:
return f"в {core}"
return f"в {core} {label}"
def build_times_phrase(
times: List[str], separate: bool = False, numeric: bool = False
) -> str:
mins = parse_times(times)
if not mins:
return ""
if separate:
pods = group_by_part_of_day(mins)
parts = []
for label, ts in sorted(pods.items(), key=lambda kv: (min(kv[1]), kv[0])):
parts.append(format_part_of_day_group(ts, label, numeric=numeric))
return "; ".join(parts)
spans = make_spans(mins, max_gap=60)
phrases = []
for a, b in spans:
if a == b:
if numeric:
phrases.append(f"в {fmt_time_numeric(a)}")
else:
label = part_of_day_label(a)
phrases.append(f"в {time_words_point(a)} {label}")
else:
if numeric:
phrases.append(f"с {fmt_time_numeric(a)} до {fmt_time_numeric(b)}")
else:
label_a = part_of_day_label(a)
label_b = part_of_day_label(b)
if label_a == label_b:
phrases.append(
f"с {time_words_gen(a)} до {time_words_gen(b)} {label_a}"
)
else:
phrases.append(
f"с {time_words_gen(a)} {label_a} до {time_words_gen(b)} {label_b}"
)
return "; ".join(phrases)
def rebuild_time_slots(
*, time_slots: List[Dict[str, Any]], channel: str
) -> List[Dict[str, Any]]:
if channel not in ["max", "telegram"]:
return time_slots
for time_slot in time_slots:
time_slot["name"] = time_slot["name"].replace(":", ".")
return time_slots
def handler(
data: Optional[Dict[str, Any]] = None,
session_id: Optional[str] = None,
channel: str = "default",
):
channel = data["variables"].get("channel_type", channel)
mfc_id = data["variables"]["mfc_id"]
service_ids = data["variables"]["service_ids"]
service_count = data["variables"]["service_count"]
date = get_data_element(
data=data,
filter={"type": "see", "model": "record_date"},
exclude_keys=["declined"],
)
time = get_data_element(
data=data,
filter={
"type": "see",
"model": "record_time",
"more_index": date.get("index"),
},
exclude_keys=["declined"],
)
slots = get_total_slots_availability_bulk(
session=data,
api=api,
mfc_id=mfc_id,
service_ids=service_ids,
service_count=service_count,
)
# Убираем дубликаты.
# Почему они могут возникать?
# Потому что мы получаем слоты по нескольким услугам (service_ids).
slots = dedupe_slots([x for slot in slots for x in slot["slots"]])
logging.info(
"Suggest time slots with mfc_id %r, service_ids %r, service_count %r",
mfc_id,
service_ids,
service_count,
)
time_slots = build_sorted_time_slots(slots=slots, date=date["param"])
text, variables = "", {}
if channel == "voice":
# если пользователь сказал время - мы ищем БЛИЖАЙИШЕ среди альтернативных
if time:
alter_times = get_nearest_times(time_slots, time, n=2)
alter_times = build_times_phrase(alter_times, separate=True, numeric=True)
else:
alter_times = build_times_phrase(time_slots, numeric=True)
variables.update({"alter_times": alter_times})
else:
if channel == "vkontakte":
per_page = api.cfg.vk_buttons_rows
else:
per_page = api.cfg.max_buttons_rows
if not data["variables"].get("__pagination") or data["variables"].get("__pagination_type") != "time":
setup_pagination(session=data, data=time_slots, per_page=per_page)
data["variables"]["__pagination_type"] = "time"
previous, page_times, next_ = get_pagination_data(session=data)
pagination_count = int(bool(previous)) + int(bool(next_))
max_time_buttons = per_page - pagination_count
buttons = [{"name": time_slot} for time_slot in page_times[:max_time_buttons]]
if previous:
buttons += [{"name": api.cfg.pagination_previous_button_name}]
if next_:
buttons += [{"name": api.cfg.pagination_next_button_name}]
text = build_buttons_text(
buttons=rebuild_time_slots(time_slots=buttons, channel=channel),
buttons_in_row=1,
)
| 1 | # Предложить слоты по времени |
| 2 | |
| 3 | import logging |
| 4 | from typing import Any, Dict, Optional, List |
| 5 | |
| 6 | from start import ( |
| 7 | MfcApi, |
| 8 | build_buttons_text, |
| 9 | get_data_element, |
| 10 | CARD, |
| 11 | CARD_GEN, |
| 12 | PART_OF_DAY, |
| 13 | build_sorted_time_slots, |
| 14 | get_total_slots_availability_bulk, |
| 15 | dedupe_slots, |
| 16 | setup_pagination, |
| 17 | get_pagination_data, |
| 18 | ) |
| 19 | |
| 20 | api = MfcApi() |
| 21 | |
| 22 | |
| 23 | def get_nearest_times(time_list: List[str], time: str, n: int = 3) -> List[str]: |
| 24 | |
| 25 | def _norm(t: str) -> str: |
| 26 | h, m = t.split(":") |
| 27 | return f"{int(h):02d}:{int(m):02d}" |
| 28 | |
| 29 | def _to_minutes(t: str) -> int: |
| 30 | h, m = map(int, t.split(":")) |
| 31 | return h * 60 + m |
| 32 | |
| 33 | if not time_list or n <= 0: |
| 34 | return [] |
| 35 | |
| 36 | times = [_norm(t) for t in time_list] |
| 37 | n = min(n, len(times)) |
| 38 | target = _norm(time) |
| 39 | tgt = _to_minutes(target) |
| 40 | mins = [_to_minutes(t) for t in times] |
| 41 | |
| 42 | i0 = min(range(len(mins)), key=lambda i: (abs(mins[i] - tgt), mins[i])) |
| 43 | |
| 44 | res = [times[i0]] |
| 45 | l, r = i0 - 1, i0 + 1 |
| 46 | |
| 47 | while len(res) < n and (l >= 0 or r < len(times)): |
| 48 | dl = abs(mins[l] - tgt) if l >= 0 else float("inf") |
| 49 | dr = abs(mins[r] - tgt) if r < len(times) else float("inf") |
| 50 | |
| 51 | if dl <= dr: |
| 52 | if l >= 0: |
| 53 | res.append(times[l]) |
| 54 | l -= 1 |
| 55 | elif r < len(times): |
| 56 | res.append(times[r]) |
| 57 | r += 1 |
| 58 | else: |
| 59 | if r < len(times): |
| 60 | res.append(times[r]) |
| 61 | r += 1 |
| 62 | elif l >= 0: |
| 63 | res.append(times[l]) |
| 64 | l -= 1 |
| 65 | |
| 66 | res_sorted = sorted(res, key=lambda t: _to_minutes(t)) |
| 67 | return res_sorted |
| 68 | |
| 69 | |
| 70 | def num_cardinal(n: int) -> str: |
| 71 | if n in CARD: |
| 72 | return CARD[n] |
| 73 | tens = (n // 10) * 10 |
| 74 | unit = n % 10 |
| 75 | return f"{CARD[tens]} {CARD[unit]}" |
| 76 | |
| 77 | |
| 78 | def num_genitive(n: int) -> str: |
| 79 | if n in CARD_GEN: |
| 80 | return CARD_GEN[n] |
| 81 | tens = (n // 10) * 10 |
| 82 | unit = n % 10 |
| 83 | return f"{CARD_GEN[tens]} {CARD_GEN[unit]}" |
| 84 | |
| 85 | |
| 86 | def parse_times(times: List[str]) -> List[int]: |
| 87 | mins = [] |
| 88 | for t in times: |
| 89 | h, m = t.strip().split(":") |
| 90 | mins.append(int(h) * 60 + int(m)) |
| 91 | return sorted(set(mins)) |
| 92 | |
| 93 | |
| 94 | def part_of_day_label(mins: int) -> str: |
| 95 | h = (mins // 60) % 24 |
| 96 | for start, end, label in PART_OF_DAY: |
| 97 | if start <= h < end: |
| 98 | return label |
| 99 | return "дня" |
| 100 | |
| 101 | |
| 102 | def time_words_point(mins: int) -> str: |
| 103 | h = mins // 60 |
| 104 | m = mins % 60 |
| 105 | if m == 0: |
| 106 | return f"{num_cardinal(h)}" |
| 107 | return f"{num_cardinal(h)} {num_cardinal(m)}" |
| 108 | |
| 109 | |
| 110 | def time_words_gen(mins: int) -> str: |
| 111 | h = mins // 60 |
| 112 | m = mins % 60 |
| 113 | if m == 0: |
| 114 | return f"{num_genitive(h)}" |
| 115 | return f"{num_genitive(h)} {num_genitive(m)}" |
| 116 | |
| 117 | |
| 118 | def make_spans(mins: List[int], max_gap: int = 60) -> List[List[int]]: |
| 119 | if not mins: |
| 120 | return [] |
| 121 | spans = [[mins[0], mins[0]]] |
| 122 | for x in mins[1:]: |
| 123 | if x - spans[-1][1] <= max_gap: |
| 124 | spans[-1][1] = x |
| 125 | else: |
| 126 | spans.append([x, x]) |
| 127 | return spans |
| 128 | |
| 129 | |
| 130 | def join_times_with_and(parts: List[str]) -> str: |
| 131 | if not parts: |
| 132 | return "" |
| 133 | if len(parts) == 1: |
| 134 | return parts[0] |
| 135 | if len(parts) == 2: |
| 136 | return f"{parts[0]} и {parts[1]}" |
| 137 | return f"{', '.join(parts[:-1])} и {parts[-1]}" |
| 138 | |
| 139 | |
| 140 | def group_by_part_of_day(mins: List[int]) -> Dict[str, List[int]]: |
| 141 | buckets: Dict[str, List[int]] = {} |
| 142 | for t in mins: |
| 143 | label = part_of_day_label(t) |
| 144 | buckets.setdefault(label, []).append(t) |
| 145 | for k in buckets: |
| 146 | buckets[k].sort() |
| 147 | return buckets |
| 148 | |
| 149 | |
| 150 | def fmt_time_numeric(mins: int) -> str: |
| 151 | h = (mins // 60) % 24 |
| 152 | m = mins % 60 |
| 153 | return f"{h}:{m:02d}" |
| 154 | |
| 155 | |
| 156 | def format_hour_block(times_in_hour: List[int], *, numeric: bool = False) -> str: |
| 157 | if numeric: |
| 158 | return join_times_with_and([fmt_time_numeric(t) for t in times_in_hour]) |
| 159 | return join_times_with_and([time_words_point(t) for t in times_in_hour]) |
| 160 | |
| 161 | |
| 162 | def format_part_of_day_group( |
| 163 | times: List[int], label: str, *, numeric: bool = False |
| 164 | ) -> str: |
| 165 | by_hour: Dict[int, List[int]] = {} |
| 166 | for t in times: |
| 167 | h = t // 60 |
| 168 | by_hour.setdefault(h, []).append(t) |
| 169 | blocks = [] |
| 170 | for h in sorted(by_hour): |
| 171 | blocks.append(format_hour_block(by_hour[h], numeric=numeric)) |
| 172 | core = join_times_with_and(blocks) |
| 173 | if numeric: |
| 174 | return f"в {core}" |
| 175 | return f"в {core} {label}" |
| 176 | |
| 177 | |
| 178 | def build_times_phrase( |
| 179 | times: List[str], separate: bool = False, numeric: bool = False |
| 180 | ) -> str: |
| 181 | mins = parse_times(times) |
| 182 | if not mins: |
| 183 | return "" |
| 184 | |
| 185 | if separate: |
| 186 | pods = group_by_part_of_day(mins) |
| 187 | parts = [] |
| 188 | for label, ts in sorted(pods.items(), key=lambda kv: (min(kv[1]), kv[0])): |
| 189 | parts.append(format_part_of_day_group(ts, label, numeric=numeric)) |
| 190 | return "; ".join(parts) |
| 191 | |
| 192 | spans = make_spans(mins, max_gap=60) |
| 193 | phrases = [] |
| 194 | for a, b in spans: |
| 195 | if a == b: |
| 196 | if numeric: |
| 197 | phrases.append(f"в {fmt_time_numeric(a)}") |
| 198 | else: |
| 199 | label = part_of_day_label(a) |
| 200 | phrases.append(f"в {time_words_point(a)} {label}") |
| 201 | else: |
| 202 | if numeric: |
| 203 | phrases.append(f"с {fmt_time_numeric(a)} до {fmt_time_numeric(b)}") |
| 204 | else: |
| 205 | label_a = part_of_day_label(a) |
| 206 | label_b = part_of_day_label(b) |
| 207 | if label_a == label_b: |
| 208 | phrases.append( |
| 209 | f"с {time_words_gen(a)} до {time_words_gen(b)} {label_a}" |
| 210 | ) |
| 211 | else: |
| 212 | phrases.append( |
| 213 | f"с {time_words_gen(a)} {label_a} до {time_words_gen(b)} {label_b}" |
| 214 | ) |
| 215 | return "; ".join(phrases) |
| 216 | |
| 217 | |
| 218 | def rebuild_time_slots( |
| 219 | *, time_slots: List[Dict[str, Any]], channel: str |
| 220 | ) -> List[Dict[str, Any]]: |
| 221 | if channel not in ["max", "telegram"]: |
| 222 | return time_slots |
| 223 | |
| 224 | for time_slot in time_slots: |
| 225 | time_slot["name"] = time_slot["name"].replace(":", ".") |
| 226 | |
| 227 | return time_slots |
| 228 | |
| 229 | |
| 230 | def handler( |
| 231 | data: Optional[Dict[str, Any]] = None, |
| 232 | session_id: Optional[str] = None, |
| 233 | channel: str = "default", |
| 234 | ): |
| 235 | channel = data["variables"].get("channel_type", channel) |
| 236 | mfc_id = data["variables"]["mfc_id"] |
| 237 | service_ids = data["variables"]["service_ids"] |
| 238 | service_count = data["variables"]["service_count"] |
| 239 | |
| 240 | date = get_data_element( |
| 241 | data=data, |
| 242 | filter={"type": "see", "model": "record_date"}, |
| 243 | exclude_keys=["declined"], |
| 244 | ) |
| 245 | time = get_data_element( |
| 246 | data=data, |
| 247 | filter={ |
| 248 | "type": "see", |
| 249 | "model": "record_time", |
| 250 | "more_index": date.get("index"), |
| 251 | }, |
| 252 | exclude_keys=["declined"], |
| 253 | ) |
| 254 | |
| 255 | slots = get_total_slots_availability_bulk( |
| 256 | session=data, |
| 257 | api=api, |
| 258 | mfc_id=mfc_id, |
| 259 | service_ids=service_ids, |
| 260 | service_count=service_count, |
| 261 | ) |
| 262 | |
| 263 | # Убираем дубликаты. |
| 264 | # Почему они могут возникать? |
| 265 | # Потому что мы получаем слоты по нескольким услугам (service_ids). |
| 266 | slots = dedupe_slots([x for slot in slots for x in slot["slots"]]) |
| 267 | |
| 268 | logging.info( |
| 269 | "Suggest time slots with mfc_id %r, service_ids %r, service_count %r", |
| 270 | mfc_id, |
| 271 | service_ids, |
| 272 | service_count, |
| 273 | ) |
| 274 | |
| 275 | time_slots = build_sorted_time_slots(slots=slots, date=date["param"]) |
| 276 | |
| 277 | text, variables = "", {} |
| 278 | |
| 279 | if channel == "voice": |
| 280 | # если пользователь сказал время - мы ищем БЛИЖАЙИШЕ среди альтернативных |
| 281 | if time: |
| 282 | alter_times = get_nearest_times(time_slots, time, n=2) |
| 283 | |
| 284 | alter_times = build_times_phrase(alter_times, separate=True, numeric=True) |
| 285 | else: |
| 286 | alter_times = build_times_phrase(time_slots, numeric=True) |
| 287 | |
| 288 | variables.update({"alter_times": alter_times}) |
| 289 | else: |
| 290 | if channel == "vkontakte": |
| 291 | per_page = api.cfg.vk_buttons_rows |
| 292 | else: |
| 293 | per_page = api.cfg.max_buttons_rows |
| 294 | |
| 295 | if not data["variables"].get("__pagination") or data["variables"].get("__pagination_type") != "time": |
| 296 | setup_pagination(session=data, data=time_slots, per_page=per_page) |
| 297 | data["variables"]["__pagination_type"] = "time" |
| 298 | |
| 299 | previous, page_times, next_ = get_pagination_data(session=data) |
| 300 | |
| 301 | pagination_count = int(bool(previous)) + int(bool(next_)) |
| 302 | max_time_buttons = per_page - pagination_count |
| 303 | |
| 304 | buttons = [{"name": time_slot} for time_slot in page_times[:max_time_buttons]] |
| 305 | |
| 306 | if previous: |
| 307 | buttons += [{"name": api.cfg.pagination_previous_button_name}] |
| 308 | if next_: |
| 309 | buttons += [{"name": api.cfg.pagination_next_button_name}] |
| 310 | |
| 311 | text = build_buttons_text( |
| 312 | buttons=rebuild_time_slots(time_slots=buttons, channel=channel), |
| 313 | buttons_in_row=1, |
| 314 | ) |
| 315 | |
| 316 |