routes.py
· 7.3 KiB · Python
Raw
import asyncio
import json
import logging
import os
import re
from fastapi import APIRouter, Depends, HTTPException
from faststream import Context
from faststream.rabbit import RabbitRouter, RabbitQueue, RabbitMessage
from max_integration.integrations.lexicom.builder import (
build_button_attachments,
build_file_attachments,
)
from max_integration.integrations.lexicom.chats import (
AsyncMongo,
TextChannelDB,
AsyncRedis,
upsert_channel_binding_in_cache,
delete_channel_from_cache,
)
from max_integration.integrations.lexicom.schemas import CreateChannel, PatchChannel
from max_integration.integrations.lexicom.storage import StorageClient
from max_integration.markers import MongoMarker, RedisMarker
from max_integration.max.client import MaxClient
from max_integration.max.schemas import SendMessageBody
from max_integration.locker import AsyncLocker
from max_integration.max.storage import upload_file
from max_integration.tools import now_timestamp
MAX_NAME_PREFIX = os.getenv("MAX_NAME_PREFIX", "(MAX)")
logger = logging.getLogger(__name__)
max_queue = RabbitQueue("max", durable=True)
rabbit_router = RabbitRouter()
api_router = APIRouter(prefix="/lexicom", tags=["Lexicom"])
@rabbit_router.subscriber(max_queue)
async def max_handler(
message: RabbitMessage,
mongo_client: AsyncMongo = Context(),
locker: AsyncLocker = Context(),
storage_client: StorageClient = Context(),
):
exchange_name = message.headers.get("exchange_name", "")
routing_keys = message.headers.get("routing_keys", []) or []
if exchange_name != "" or "tg" not in routing_keys:
await message.ack()
return
body = json.loads(message.body)
channel_id = body.get("channel_id")
if not channel_id:
await message.ack()
return
channel = await mongo_client.get_one(
TextChannelDB, {"_id": channel_id}, show_fields={"token": 1, "name": 1}
)
if not channel:
await message.ack()
return
name = channel.get("name", "")
if not name.startswith(MAX_NAME_PREFIX):
await message.ack()
return
max_client = MaxClient(access_token=channel.get("token"))
logger.debug("MAX receive message with body %r", body)
user_id = body.get("client_id")
text = body.get("body", {}).get("message", "")
files = body.get("files", [])
buttons = body.get("buttons", [])
if files:
body = SendMessageBody()
tasks = [
asyncio.create_task(storage_client.read_file(file_name=file))
for file in files
]
files = await asyncio.gather(*tasks)
tasks = [
upload_file(max_client=max_client, file_bytes=file[1], file_name=file[0])
for file in files
]
files = await asyncio.gather(*tasks)
body.text = None
# must be only one file attachment in message
for file in files:
body.attachments = build_file_attachments(file=file)
await max_client.send_message_with_file(body, user_id=int(user_id))
if text:
body = SendMessageBody(text=text)
body.attachments = build_button_attachments(buttons=buttons)
await max_client.send_message(body, user_id=int(user_id))
await message.ack()
await max_client.close()
# locker.release(build_locker_key(user_id=user_id, bot_id=channel_id))
@api_router.post("/channels/")
async def create_channel(
channel: CreateChannel,
mongo_client: AsyncMongo = Depends(MongoMarker),
redis_client: AsyncRedis = Depends(RedisMarker),
):
"""Подключение MAX бота к Lexicom Omnichannel Platform."""
now_ts = now_timestamp()
# TODO:
doc = {
"__collection__": "TextChannelDB",
"name": f"{MAX_NAME_PREFIX} {channel.name}",
"token": channel.token,
"ivr_id": channel.scenario_id,
"ivr_version": channel.scenario_version,
"channel": "tg",
"connection": channel.connection,
"status": "not_connected",
"url": channel.url,
"outbound_company": False,
"use_button": True,
"email": "",
"host_email": "",
"imap_host_email": "",
"imap_port_email": 0,
"notify_token": "",
"password": "",
"queue_id": channel.queue_id,
"smtp_host_email": "",
"smtp_port_email": 0,
"created_at": now_ts,
"updated_at": now_ts,
}
inserted_id = await mongo_client.insert(doc)
created = await mongo_client.get_one(TextChannelDB, {"_id": inserted_id})
await upsert_channel_binding_in_cache(
redis_client=redis_client,
db=mongo_client,
queue_id=created.get("queue_id", ""),
channel_id=created["_id"],
ivr_id=created.get("ivr_id", ""),
ivr_version=created.get("ivr_version", ""),
)
return created
@api_router.get("/channels/")
async def list_channels(
mongo_client: AsyncMongo = Depends(MongoMarker),
):
"""Получить список MAX ботов в Lexicom Omnichannel Platform."""
pattern = f"^{re.escape(MAX_NAME_PREFIX)}"
docs = await mongo_client.find_many(TextChannelDB, {"name": {"$regex": pattern}})
return docs
@api_router.patch("/channels/{channel_id}")
async def patch_channel(
channel_id: str,
data: PatchChannel,
mongo_client: AsyncMongo = Depends(MongoMarker),
redis_client: AsyncRedis = Depends(RedisMarker),
):
"""Обновить MAX бота в Lexicom Omnichannel Platform."""
current = await mongo_client.get_one(TextChannelDB, {"_id": channel_id})
if not current or not str(current.get("name", "")).startswith(MAX_NAME_PREFIX):
raise HTTPException(status_code=404, detail="Channel not found or not editable")
update_data = data.model_dump(exclude_unset=True, exclude_none=True)
if "scenario_id" in update_data:
update_data["ivr_id"] = update_data.pop("scenario_id")
if "scenario_version" in update_data:
update_data["ivr_version"] = update_data.pop("scenario_version")
if not update_data:
return current
update_data["updated_at"] = now_timestamp()
await mongo_client.update(TextChannelDB, {"_id": channel_id}, update_data)
updated = await mongo_client.get_one(TextChannelDB, {"_id": channel_id})
await upsert_channel_binding_in_cache(
redis_client=redis_client,
db=mongo_client,
queue_id=updated.get("queue_id", ""),
channel_id=updated["_id"],
ivr_id=updated.get("ivr_id", ""),
ivr_version=updated.get("ivr_version", ""),
)
return updated
@api_router.delete("/channels/{channel_id}")
async def delete_channel(
channel_id: str,
mongo_client: AsyncMongo = Depends(MongoMarker),
redis_client: AsyncRedis = Depends(RedisMarker),
):
"""Удалить MAX бота из Lexicom Omnichannel Platform."""
doc = await mongo_client.get_one(TextChannelDB, {"_id": channel_id})
if not doc or not str(doc.get("name", "")).startswith(MAX_NAME_PREFIX):
raise HTTPException(
status_code=404, detail="Channel not found or not deletable"
)
deleted_count = await mongo_client.delete_one(TextChannelDB, {"_id": channel_id})
if deleted_count != 1:
raise HTTPException(status_code=409, detail="Channel was not deleted")
await delete_channel_from_cache(redis_client=redis_client, channel_id=channel_id)
return doc
| 1 | import asyncio |
| 2 | import json |
| 3 | import logging |
| 4 | import os |
| 5 | import re |
| 6 | |
| 7 | from fastapi import APIRouter, Depends, HTTPException |
| 8 | from faststream import Context |
| 9 | from faststream.rabbit import RabbitRouter, RabbitQueue, RabbitMessage |
| 10 | |
| 11 | from max_integration.integrations.lexicom.builder import ( |
| 12 | build_button_attachments, |
| 13 | build_file_attachments, |
| 14 | ) |
| 15 | from max_integration.integrations.lexicom.chats import ( |
| 16 | AsyncMongo, |
| 17 | TextChannelDB, |
| 18 | AsyncRedis, |
| 19 | upsert_channel_binding_in_cache, |
| 20 | delete_channel_from_cache, |
| 21 | ) |
| 22 | from max_integration.integrations.lexicom.schemas import CreateChannel, PatchChannel |
| 23 | from max_integration.integrations.lexicom.storage import StorageClient |
| 24 | from max_integration.markers import MongoMarker, RedisMarker |
| 25 | from max_integration.max.client import MaxClient |
| 26 | from max_integration.max.schemas import SendMessageBody |
| 27 | from max_integration.locker import AsyncLocker |
| 28 | from max_integration.max.storage import upload_file |
| 29 | from max_integration.tools import now_timestamp |
| 30 | |
| 31 | MAX_NAME_PREFIX = os.getenv("MAX_NAME_PREFIX", "(MAX)") |
| 32 | logger = logging.getLogger(__name__) |
| 33 | max_queue = RabbitQueue("max", durable=True) |
| 34 | rabbit_router = RabbitRouter() |
| 35 | api_router = APIRouter(prefix="/lexicom", tags=["Lexicom"]) |
| 36 | |
| 37 | |
| 38 | @rabbit_router.subscriber(max_queue) |
| 39 | async def max_handler( |
| 40 | message: RabbitMessage, |
| 41 | mongo_client: AsyncMongo = Context(), |
| 42 | locker: AsyncLocker = Context(), |
| 43 | storage_client: StorageClient = Context(), |
| 44 | ): |
| 45 | exchange_name = message.headers.get("exchange_name", "") |
| 46 | routing_keys = message.headers.get("routing_keys", []) or [] |
| 47 | |
| 48 | if exchange_name != "" or "tg" not in routing_keys: |
| 49 | await message.ack() |
| 50 | return |
| 51 | |
| 52 | body = json.loads(message.body) |
| 53 | |
| 54 | channel_id = body.get("channel_id") |
| 55 | |
| 56 | if not channel_id: |
| 57 | await message.ack() |
| 58 | return |
| 59 | |
| 60 | channel = await mongo_client.get_one( |
| 61 | TextChannelDB, {"_id": channel_id}, show_fields={"token": 1, "name": 1} |
| 62 | ) |
| 63 | |
| 64 | if not channel: |
| 65 | await message.ack() |
| 66 | return |
| 67 | |
| 68 | name = channel.get("name", "") |
| 69 | |
| 70 | if not name.startswith(MAX_NAME_PREFIX): |
| 71 | await message.ack() |
| 72 | return |
| 73 | |
| 74 | max_client = MaxClient(access_token=channel.get("token")) |
| 75 | |
| 76 | logger.debug("MAX receive message with body %r", body) |
| 77 | |
| 78 | user_id = body.get("client_id") |
| 79 | text = body.get("body", {}).get("message", "") |
| 80 | files = body.get("files", []) |
| 81 | buttons = body.get("buttons", []) |
| 82 | |
| 83 | if files: |
| 84 | body = SendMessageBody() |
| 85 | |
| 86 | tasks = [ |
| 87 | asyncio.create_task(storage_client.read_file(file_name=file)) |
| 88 | for file in files |
| 89 | ] |
| 90 | files = await asyncio.gather(*tasks) |
| 91 | |
| 92 | tasks = [ |
| 93 | upload_file(max_client=max_client, file_bytes=file[1], file_name=file[0]) |
| 94 | for file in files |
| 95 | ] |
| 96 | files = await asyncio.gather(*tasks) |
| 97 | |
| 98 | body.text = None |
| 99 | |
| 100 | # must be only one file attachment in message |
| 101 | for file in files: |
| 102 | body.attachments = build_file_attachments(file=file) |
| 103 | |
| 104 | await max_client.send_message_with_file(body, user_id=int(user_id)) |
| 105 | |
| 106 | if text: |
| 107 | body = SendMessageBody(text=text) |
| 108 | |
| 109 | body.attachments = build_button_attachments(buttons=buttons) |
| 110 | |
| 111 | await max_client.send_message(body, user_id=int(user_id)) |
| 112 | |
| 113 | await message.ack() |
| 114 | await max_client.close() |
| 115 | |
| 116 | # locker.release(build_locker_key(user_id=user_id, bot_id=channel_id)) |
| 117 | |
| 118 | |
| 119 | @api_router.post("/channels/") |
| 120 | async def create_channel( |
| 121 | channel: CreateChannel, |
| 122 | mongo_client: AsyncMongo = Depends(MongoMarker), |
| 123 | redis_client: AsyncRedis = Depends(RedisMarker), |
| 124 | ): |
| 125 | """Подключение MAX бота к Lexicom Omnichannel Platform.""" |
| 126 | |
| 127 | now_ts = now_timestamp() |
| 128 | |
| 129 | # TODO: |
| 130 | doc = { |
| 131 | "__collection__": "TextChannelDB", |
| 132 | "name": f"{MAX_NAME_PREFIX} {channel.name}", |
| 133 | "token": channel.token, |
| 134 | "ivr_id": channel.scenario_id, |
| 135 | "ivr_version": channel.scenario_version, |
| 136 | "channel": "tg", |
| 137 | "connection": channel.connection, |
| 138 | "status": "not_connected", |
| 139 | "url": channel.url, |
| 140 | "outbound_company": False, |
| 141 | "use_button": True, |
| 142 | "email": "", |
| 143 | "host_email": "", |
| 144 | "imap_host_email": "", |
| 145 | "imap_port_email": 0, |
| 146 | "notify_token": "", |
| 147 | "password": "", |
| 148 | "queue_id": channel.queue_id, |
| 149 | "smtp_host_email": "", |
| 150 | "smtp_port_email": 0, |
| 151 | "created_at": now_ts, |
| 152 | "updated_at": now_ts, |
| 153 | } |
| 154 | |
| 155 | inserted_id = await mongo_client.insert(doc) |
| 156 | created = await mongo_client.get_one(TextChannelDB, {"_id": inserted_id}) |
| 157 | |
| 158 | await upsert_channel_binding_in_cache( |
| 159 | redis_client=redis_client, |
| 160 | db=mongo_client, |
| 161 | queue_id=created.get("queue_id", ""), |
| 162 | channel_id=created["_id"], |
| 163 | ivr_id=created.get("ivr_id", ""), |
| 164 | ivr_version=created.get("ivr_version", ""), |
| 165 | ) |
| 166 | |
| 167 | return created |
| 168 | |
| 169 | |
| 170 | @api_router.get("/channels/") |
| 171 | async def list_channels( |
| 172 | mongo_client: AsyncMongo = Depends(MongoMarker), |
| 173 | ): |
| 174 | """Получить список MAX ботов в Lexicom Omnichannel Platform.""" |
| 175 | |
| 176 | pattern = f"^{re.escape(MAX_NAME_PREFIX)}" |
| 177 | |
| 178 | docs = await mongo_client.find_many(TextChannelDB, {"name": {"$regex": pattern}}) |
| 179 | return docs |
| 180 | |
| 181 | |
| 182 | @api_router.patch("/channels/{channel_id}") |
| 183 | async def patch_channel( |
| 184 | channel_id: str, |
| 185 | data: PatchChannel, |
| 186 | mongo_client: AsyncMongo = Depends(MongoMarker), |
| 187 | redis_client: AsyncRedis = Depends(RedisMarker), |
| 188 | ): |
| 189 | """Обновить MAX бота в Lexicom Omnichannel Platform.""" |
| 190 | |
| 191 | current = await mongo_client.get_one(TextChannelDB, {"_id": channel_id}) |
| 192 | if not current or not str(current.get("name", "")).startswith(MAX_NAME_PREFIX): |
| 193 | raise HTTPException(status_code=404, detail="Channel not found or not editable") |
| 194 | |
| 195 | update_data = data.model_dump(exclude_unset=True, exclude_none=True) |
| 196 | if "scenario_id" in update_data: |
| 197 | update_data["ivr_id"] = update_data.pop("scenario_id") |
| 198 | if "scenario_version" in update_data: |
| 199 | update_data["ivr_version"] = update_data.pop("scenario_version") |
| 200 | |
| 201 | if not update_data: |
| 202 | return current |
| 203 | |
| 204 | update_data["updated_at"] = now_timestamp() |
| 205 | await mongo_client.update(TextChannelDB, {"_id": channel_id}, update_data) |
| 206 | |
| 207 | updated = await mongo_client.get_one(TextChannelDB, {"_id": channel_id}) |
| 208 | |
| 209 | await upsert_channel_binding_in_cache( |
| 210 | redis_client=redis_client, |
| 211 | db=mongo_client, |
| 212 | queue_id=updated.get("queue_id", ""), |
| 213 | channel_id=updated["_id"], |
| 214 | ivr_id=updated.get("ivr_id", ""), |
| 215 | ivr_version=updated.get("ivr_version", ""), |
| 216 | ) |
| 217 | |
| 218 | return updated |
| 219 | |
| 220 | |
| 221 | @api_router.delete("/channels/{channel_id}") |
| 222 | async def delete_channel( |
| 223 | channel_id: str, |
| 224 | mongo_client: AsyncMongo = Depends(MongoMarker), |
| 225 | redis_client: AsyncRedis = Depends(RedisMarker), |
| 226 | ): |
| 227 | """Удалить MAX бота из Lexicom Omnichannel Platform.""" |
| 228 | |
| 229 | doc = await mongo_client.get_one(TextChannelDB, {"_id": channel_id}) |
| 230 | if not doc or not str(doc.get("name", "")).startswith(MAX_NAME_PREFIX): |
| 231 | raise HTTPException( |
| 232 | status_code=404, detail="Channel not found or not deletable" |
| 233 | ) |
| 234 | |
| 235 | deleted_count = await mongo_client.delete_one(TextChannelDB, {"_id": channel_id}) |
| 236 | if deleted_count != 1: |
| 237 | raise HTTPException(status_code=409, detail="Channel was not deleted") |
| 238 | |
| 239 | await delete_channel_from_cache(redis_client=redis_client, channel_id=channel_id) |
| 240 | |
| 241 | return doc |
| 242 |