"""Lexicom integration for MAX bots.

Contains the RabbitMQ subscriber that forwards outgoing Lexicom messages to
MAX users, and the ``/lexicom/channels`` HTTP endpoints that manage MAX bot
channels stored in the ``TextChannelDB`` collection.
"""

import asyncio
import json
import logging
import os
import re

from fastapi import APIRouter, Depends, HTTPException
from faststream import Context
from faststream.rabbit import RabbitRouter, RabbitQueue, RabbitMessage

from max_integration.integrations.lexicom.builder import (
    build_button_attachments,
    build_file_attachments,
)
from max_integration.integrations.lexicom.chats import (
    AsyncMongo,
    TextChannelDB,
    AsyncRedis,
    upsert_channel_binding_in_cache,
    delete_channel_from_cache,
)
from max_integration.integrations.lexicom.schemas import CreateChannel, PatchChannel
from max_integration.integrations.lexicom.storage import StorageClient
from max_integration.markers import MongoMarker, RedisMarker
from max_integration.max.client import MaxClient
from max_integration.max.schemas import SendMessageBody
from max_integration.locker import AsyncLocker
from max_integration.max.storage import upload_file
from max_integration.tools import now_timestamp

# Prefix that marks a channel name as belonging to a MAX bot. Channels whose
# name does not start with it are ignored by the subscriber and are neither
# editable nor deletable through this API.
MAX_NAME_PREFIX = os.getenv("MAX_NAME_PREFIX", "(MAX)")

logger = logging.getLogger(__name__)

max_queue = RabbitQueue("max", durable=True)
rabbit_router = RabbitRouter()
api_router = APIRouter(prefix="/lexicom", tags=["Lexicom"])


@rabbit_router.subscriber(max_queue)
async def max_handler(
    message: RabbitMessage,
    mongo_client: AsyncMongo = Context(),
    locker: AsyncLocker = Context(),
    storage_client: StorageClient = Context(),
):
    """Forward a message from the ``max`` queue to a MAX user.

    The message is acknowledged and dropped without sending anything when:

    * the ``exchange_name`` header is non-empty or the ``routing_keys``
      header does not contain ``"tg"``;
    * the JSON payload has no ``channel_id``;
    * no channel with that id exists;
    * the channel name does not start with ``MAX_NAME_PREFIX``.

    Otherwise every entry of ``files`` is read from storage, uploaded and
    sent as its own message, then the text (if any) is sent together with
    the button attachments, and finally the message is acknowledged.

    Args:
        message: Incoming RabbitMQ message; its body must be a JSON object.
        mongo_client: Database client used to look up the channel.
        locker: Injected but currently unused (see the note at the end of
            the function body).
        storage_client: Client used to read the files named in the payload.
    """
    exchange_name = message.headers.get("exchange_name", "")
    routing_keys = message.headers.get("routing_keys", []) or []
    if exchange_name != "" or "tg" not in routing_keys:
        await message.ack()
        return

    payload = json.loads(message.body)
    channel_id = payload.get("channel_id")
    if not channel_id:
        await message.ack()
        return

    channel = await mongo_client.get_one(
        TextChannelDB, {"_id": channel_id}, show_fields={"token": 1, "name": 1}
    )
    if not channel:
        await message.ack()
        return

    name = channel.get("name", "")
    if not name.startswith(MAX_NAME_PREFIX):
        await message.ack()
        return

    max_client = MaxClient(access_token=channel.get("token"))
    try:
        logger.debug("MAX receive message with body %r", payload)
        user_id = payload.get("client_id")
        # "body" may be present but null; treat that the same as missing.
        text = (payload.get("body") or {}).get("message", "")
        files = payload.get("files", [])
        buttons = payload.get("buttons", [])

        if files:
            file_message = SendMessageBody()
            # Fetch all files concurrently, then upload them concurrently.
            stored_files = await asyncio.gather(
                *(storage_client.read_file(file_name=file) for file in files)
            )
            uploaded_files = await asyncio.gather(
                *(
                    upload_file(
                        max_client=max_client,
                        file_bytes=stored[1],
                        file_name=stored[0],
                    )
                    for stored in stored_files
                )
            )
            file_message.text = None
            # must be only one file attachment in message
            for uploaded in uploaded_files:
                file_message.attachments = build_file_attachments(file=uploaded)
                await max_client.send_message_with_file(
                    file_message, user_id=int(user_id)
                )

        if text:
            text_message = SendMessageBody(text=text)
            text_message.attachments = build_button_attachments(buttons=buttons)
            await max_client.send_message(text_message, user_id=int(user_id))

        await message.ack()
    finally:
        # Close the client even when reading, uploading or sending fails, so
        # a failed delivery does not leak the underlying connection.
        await max_client.close()
    # NOTE: releasing the per-user lock is currently disabled:
    # locker.release(build_locker_key(user_id=user_id, bot_id=channel_id))


@api_router.post("/channels/")
async def create_channel(
    channel: CreateChannel,
    mongo_client: AsyncMongo = Depends(MongoMarker),
    redis_client: AsyncRedis = Depends(RedisMarker),
):
    """Connect a MAX bot to the Lexicom Omnichannel Platform.

    Inserts a new channel document whose name is prefixed with
    ``MAX_NAME_PREFIX``, refreshes the channel binding in the cache and
    returns the stored document.
    """
    now_ts = now_timestamp()
    # TODO(review): the original TODO here had no description; clarify what
    # remains to be done for this document.
    doc = {
        "__collection__": "TextChannelDB",
        "name": f"{MAX_NAME_PREFIX} {channel.name}",
        "token": channel.token,
        "ivr_id": channel.scenario_id,
        "ivr_version": channel.scenario_version,
        "channel": "tg",
        "connection": channel.connection,
        "status": "not_connected",
        "url": channel.url,
        "outbound_company": False,
        "use_button": True,
        "email": "",
        "host_email": "",
        "imap_host_email": "",
        "imap_port_email": 0,
        "notify_token": "",
        "password": "",
        "queue_id": channel.queue_id,
        "smtp_host_email": "",
        "smtp_port_email": 0,
        "created_at": now_ts,
        "updated_at": now_ts,
    }
    inserted_id = await mongo_client.insert(doc)
    # Re-read the document so the response reflects what is actually stored.
    created = await mongo_client.get_one(TextChannelDB, {"_id": inserted_id})
    await upsert_channel_binding_in_cache(
        redis_client=redis_client,
        db=mongo_client,
        queue_id=created.get("queue_id", ""),
        channel_id=created["_id"],
        ivr_id=created.get("ivr_id", ""),
        ivr_version=created.get("ivr_version", ""),
    )
    return created


@api_router.get("/channels/")
async def list_channels(
    mongo_client: AsyncMongo = Depends(MongoMarker),
):
    """List the MAX bots registered in the Lexicom Omnichannel Platform.

    A channel is considered a MAX bot when its name starts with
    ``MAX_NAME_PREFIX``.
    """
    # Escape the prefix: the default "(MAX)" contains regex metacharacters.
    pattern = f"^{re.escape(MAX_NAME_PREFIX)}"
    docs = await mongo_client.find_many(TextChannelDB, {"name": {"$regex": pattern}})
    return docs


@api_router.patch("/channels/{channel_id}")
async def patch_channel(
    channel_id: str,
    data: PatchChannel,
    mongo_client: AsyncMongo = Depends(MongoMarker),
    redis_client: AsyncRedis = Depends(RedisMarker),
):
    """Update a MAX bot in the Lexicom Omnichannel Platform.

    Only fields that were explicitly set and are not ``None`` are applied.
    ``scenario_id`` and ``scenario_version`` are stored as ``ivr_id`` and
    ``ivr_version``. When nothing is left to update, the current document is
    returned unchanged; otherwise the cache binding is refreshed and the
    updated document is returned.

    Raises:
        HTTPException: 404 when the channel does not exist or its name does
            not start with ``MAX_NAME_PREFIX``.
    """
    current = await mongo_client.get_one(TextChannelDB, {"_id": channel_id})
    if not current or not str(current.get("name", "")).startswith(MAX_NAME_PREFIX):
        raise HTTPException(status_code=404, detail="Channel not found or not editable")

    update_data = data.model_dump(exclude_unset=True, exclude_none=True)
    # Translate API field names to the names used in the stored document.
    if "scenario_id" in update_data:
        update_data["ivr_id"] = update_data.pop("scenario_id")
    if "scenario_version" in update_data:
        update_data["ivr_version"] = update_data.pop("scenario_version")
    if not update_data:
        return current

    update_data["updated_at"] = now_timestamp()
    await mongo_client.update(TextChannelDB, {"_id": channel_id}, update_data)
    updated = await mongo_client.get_one(TextChannelDB, {"_id": channel_id})
    await upsert_channel_binding_in_cache(
        redis_client=redis_client,
        db=mongo_client,
        queue_id=updated.get("queue_id", ""),
        channel_id=updated["_id"],
        ivr_id=updated.get("ivr_id", ""),
        ivr_version=updated.get("ivr_version", ""),
    )
    return updated


@api_router.delete("/channels/{channel_id}")
async def delete_channel(
    channel_id: str,
    mongo_client: AsyncMongo = Depends(MongoMarker),
    redis_client: AsyncRedis = Depends(RedisMarker),
):
    """Remove a MAX bot from the Lexicom Omnichannel Platform.

    Deletes the channel document, removes the channel from the cache and
    returns the document as it was before deletion.

    Raises:
        HTTPException: 404 when the channel does not exist or its name does
            not start with ``MAX_NAME_PREFIX``; 409 when the delete call
            does not report exactly one deleted document.
    """
    doc = await mongo_client.get_one(TextChannelDB, {"_id": channel_id})
    if not doc or not str(doc.get("name", "")).startswith(MAX_NAME_PREFIX):
        raise HTTPException(
            status_code=404, detail="Channel not found or not deletable"
        )

    deleted_count = await mongo_client.delete_one(TextChannelDB, {"_id": channel_id})
    if deleted_count != 1:
        raise HTTPException(status_code=409, detail="Channel was not deleted")

    await delete_channel_from_cache(redis_client=redis_client, channel_id=channel_id)
    return doc