en2zmax revised this gist . Go to revision
1 file changed, 241 insertions
routes.py(file created)
| @@ -0,0 +1,241 @@ | |||
| 1 | + | import asyncio | |
| 2 | + | import json | |
| 3 | + | import logging | |
| 4 | + | import os | |
| 5 | + | import re | |
| 6 | + | ||
| 7 | + | from fastapi import APIRouter, Depends, HTTPException | |
| 8 | + | from faststream import Context | |
| 9 | + | from faststream.rabbit import RabbitRouter, RabbitQueue, RabbitMessage | |
| 10 | + | ||
| 11 | + | from max_integration.integrations.lexicom.builder import ( | |
| 12 | + | build_button_attachments, | |
| 13 | + | build_file_attachments, | |
| 14 | + | ) | |
| 15 | + | from max_integration.integrations.lexicom.chats import ( | |
| 16 | + | AsyncMongo, | |
| 17 | + | TextChannelDB, | |
| 18 | + | AsyncRedis, | |
| 19 | + | upsert_channel_binding_in_cache, | |
| 20 | + | delete_channel_from_cache, | |
| 21 | + | ) | |
| 22 | + | from max_integration.integrations.lexicom.schemas import CreateChannel, PatchChannel | |
| 23 | + | from max_integration.integrations.lexicom.storage import StorageClient | |
| 24 | + | from max_integration.markers import MongoMarker, RedisMarker | |
| 25 | + | from max_integration.max.client import MaxClient | |
| 26 | + | from max_integration.max.schemas import SendMessageBody | |
| 27 | + | from max_integration.locker import AsyncLocker | |
| 28 | + | from max_integration.max.storage import upload_file | |
| 29 | + | from max_integration.tools import now_timestamp | |
| 30 | + | ||
| 31 | + | MAX_NAME_PREFIX = os.getenv("MAX_NAME_PREFIX", "(MAX)") | |
| 32 | + | logger = logging.getLogger(__name__) | |
| 33 | + | max_queue = RabbitQueue("max", durable=True) | |
| 34 | + | rabbit_router = RabbitRouter() | |
| 35 | + | api_router = APIRouter(prefix="/lexicom", tags=["Lexicom"]) | |
| 36 | + | ||
| 37 | + | ||
| 38 | + | @rabbit_router.subscriber(max_queue) | |
| 39 | + | async def max_handler( | |
| 40 | + | message: RabbitMessage, | |
| 41 | + | mongo_client: AsyncMongo = Context(), | |
| 42 | + | locker: AsyncLocker = Context(), | |
| 43 | + | storage_client: StorageClient = Context(), | |
| 44 | + | ): | |
| 45 | + | exchange_name = message.headers.get("exchange_name", "") | |
| 46 | + | routing_keys = message.headers.get("routing_keys", []) or [] | |
| 47 | + | ||
| 48 | + | if exchange_name != "" or "tg" not in routing_keys: | |
| 49 | + | await message.ack() | |
| 50 | + | return | |
| 51 | + | ||
| 52 | + | body = json.loads(message.body) | |
| 53 | + | ||
| 54 | + | channel_id = body.get("channel_id") | |
| 55 | + | ||
| 56 | + | if not channel_id: | |
| 57 | + | await message.ack() | |
| 58 | + | return | |
| 59 | + | ||
| 60 | + | channel = await mongo_client.get_one( | |
| 61 | + | TextChannelDB, {"_id": channel_id}, show_fields={"token": 1, "name": 1} | |
| 62 | + | ) | |
| 63 | + | ||
| 64 | + | if not channel: | |
| 65 | + | await message.ack() | |
| 66 | + | return | |
| 67 | + | ||
| 68 | + | name = channel.get("name", "") | |
| 69 | + | ||
| 70 | + | if not name.startswith(MAX_NAME_PREFIX): | |
| 71 | + | await message.ack() | |
| 72 | + | return | |
| 73 | + | ||
| 74 | + | max_client = MaxClient(access_token=channel.get("token")) | |
| 75 | + | ||
| 76 | + | logger.debug("MAX receive message with body %r", body) | |
| 77 | + | ||
| 78 | + | user_id = body.get("client_id") | |
| 79 | + | text = body.get("body", {}).get("message", "") | |
| 80 | + | files = body.get("files", []) | |
| 81 | + | buttons = body.get("buttons", []) | |
| 82 | + | ||
| 83 | + | if files: | |
| 84 | + | body = SendMessageBody() | |
| 85 | + | ||
| 86 | + | tasks = [ | |
| 87 | + | asyncio.create_task(storage_client.read_file(file_name=file)) | |
| 88 | + | for file in files | |
| 89 | + | ] | |
| 90 | + | files = await asyncio.gather(*tasks) | |
| 91 | + | ||
| 92 | + | tasks = [ | |
| 93 | + | upload_file(max_client=max_client, file_bytes=file[1], file_name=file[0]) | |
| 94 | + | for file in files | |
| 95 | + | ] | |
| 96 | + | files = await asyncio.gather(*tasks) | |
| 97 | + | ||
| 98 | + | body.text = None | |
| 99 | + | ||
| 100 | + | # must be only one file attachment in message | |
| 101 | + | for file in files: | |
| 102 | + | body.attachments = build_file_attachments(file=file) | |
| 103 | + | ||
| 104 | + | await max_client.send_message_with_file(body, user_id=int(user_id)) | |
| 105 | + | ||
| 106 | + | if text: | |
| 107 | + | body = SendMessageBody(text=text) | |
| 108 | + | ||
| 109 | + | body.attachments = build_button_attachments(buttons=buttons) | |
| 110 | + | ||
| 111 | + | await max_client.send_message(body, user_id=int(user_id)) | |
| 112 | + | ||
| 113 | + | await message.ack() | |
| 114 | + | await max_client.close() | |
| 115 | + | ||
| 116 | + | # locker.release(build_locker_key(user_id=user_id, bot_id=channel_id)) | |
| 117 | + | ||
| 118 | + | ||
| 119 | + | @api_router.post("/channels/") | |
| 120 | + | async def create_channel( | |
| 121 | + | channel: CreateChannel, | |
| 122 | + | mongo_client: AsyncMongo = Depends(MongoMarker), | |
| 123 | + | redis_client: AsyncRedis = Depends(RedisMarker), | |
| 124 | + | ): | |
| 125 | + | """Подключение MAX бота к Lexicom Omnichannel Platform.""" | |
| 126 | + | ||
| 127 | + | now_ts = now_timestamp() | |
| 128 | + | ||
| 129 | + | # TODO: | |
| 130 | + | doc = { | |
| 131 | + | "__collection__": "TextChannelDB", | |
| 132 | + | "name": f"{MAX_NAME_PREFIX} {channel.name}", | |
| 133 | + | "token": channel.token, | |
| 134 | + | "ivr_id": channel.scenario_id, | |
| 135 | + | "ivr_version": channel.scenario_version, | |
| 136 | + | "channel": "tg", | |
| 137 | + | "connection": channel.connection, | |
| 138 | + | "status": "not_connected", | |
| 139 | + | "url": channel.url, | |
| 140 | + | "outbound_company": False, | |
| 141 | + | "use_button": True, | |
| 142 | + | "email": "", | |
| 143 | + | "host_email": "", | |
| 144 | + | "imap_host_email": "", | |
| 145 | + | "imap_port_email": 0, | |
| 146 | + | "notify_token": "", | |
| 147 | + | "password": "", | |
| 148 | + | "queue_id": channel.queue_id, | |
| 149 | + | "smtp_host_email": "", | |
| 150 | + | "smtp_port_email": 0, | |
| 151 | + | "created_at": now_ts, | |
| 152 | + | "updated_at": now_ts, | |
| 153 | + | } | |
| 154 | + | ||
| 155 | + | inserted_id = await mongo_client.insert(doc) | |
| 156 | + | created = await mongo_client.get_one(TextChannelDB, {"_id": inserted_id}) | |
| 157 | + | ||
| 158 | + | await upsert_channel_binding_in_cache( | |
| 159 | + | redis_client=redis_client, | |
| 160 | + | db=mongo_client, | |
| 161 | + | queue_id=created.get("queue_id", ""), | |
| 162 | + | channel_id=created["_id"], | |
| 163 | + | ivr_id=created.get("ivr_id", ""), | |
| 164 | + | ivr_version=created.get("ivr_version", ""), | |
| 165 | + | ) | |
| 166 | + | ||
| 167 | + | return created | |
| 168 | + | ||
| 169 | + | ||
| 170 | + | @api_router.get("/channels/") | |
| 171 | + | async def list_channels( | |
| 172 | + | mongo_client: AsyncMongo = Depends(MongoMarker), | |
| 173 | + | ): | |
| 174 | + | """Получить список MAX ботов в Lexicom Omnichannel Platform.""" | |
| 175 | + | ||
| 176 | + | pattern = f"^{re.escape(MAX_NAME_PREFIX)}" | |
| 177 | + | ||
| 178 | + | docs = await mongo_client.find_many(TextChannelDB, {"name": {"$regex": pattern}}) | |
| 179 | + | return docs | |
| 180 | + | ||
| 181 | + | ||
| 182 | + | @api_router.patch("/channels/{channel_id}") | |
| 183 | + | async def patch_channel( | |
| 184 | + | channel_id: str, | |
| 185 | + | data: PatchChannel, | |
| 186 | + | mongo_client: AsyncMongo = Depends(MongoMarker), | |
| 187 | + | redis_client: AsyncRedis = Depends(RedisMarker), | |
| 188 | + | ): | |
| 189 | + | """Обновить MAX бота в Lexicom Omnichannel Platform.""" | |
| 190 | + | ||
| 191 | + | current = await mongo_client.get_one(TextChannelDB, {"_id": channel_id}) | |
| 192 | + | if not current or not str(current.get("name", "")).startswith(MAX_NAME_PREFIX): | |
| 193 | + | raise HTTPException(status_code=404, detail="Channel not found or not editable") | |
| 194 | + | ||
| 195 | + | update_data = data.model_dump(exclude_unset=True, exclude_none=True) | |
| 196 | + | if "scenario_id" in update_data: | |
| 197 | + | update_data["ivr_id"] = update_data.pop("scenario_id") | |
| 198 | + | if "scenario_version" in update_data: | |
| 199 | + | update_data["ivr_version"] = update_data.pop("scenario_version") | |
| 200 | + | ||
| 201 | + | if not update_data: | |
| 202 | + | return current | |
| 203 | + | ||
| 204 | + | update_data["updated_at"] = now_timestamp() | |
| 205 | + | await mongo_client.update(TextChannelDB, {"_id": channel_id}, update_data) | |
| 206 | + | ||
| 207 | + | updated = await mongo_client.get_one(TextChannelDB, {"_id": channel_id}) | |
| 208 | + | ||
| 209 | + | await upsert_channel_binding_in_cache( | |
| 210 | + | redis_client=redis_client, | |
| 211 | + | db=mongo_client, | |
| 212 | + | queue_id=updated.get("queue_id", ""), | |
| 213 | + | channel_id=updated["_id"], | |
| 214 | + | ivr_id=updated.get("ivr_id", ""), | |
| 215 | + | ivr_version=updated.get("ivr_version", ""), | |
| 216 | + | ) | |
| 217 | + | ||
| 218 | + | return updated | |
| 219 | + | ||
| 220 | + | ||
| 221 | + | @api_router.delete("/channels/{channel_id}") | |
| 222 | + | async def delete_channel( | |
| 223 | + | channel_id: str, | |
| 224 | + | mongo_client: AsyncMongo = Depends(MongoMarker), | |
| 225 | + | redis_client: AsyncRedis = Depends(RedisMarker), | |
| 226 | + | ): | |
| 227 | + | """Удалить MAX бота из Lexicom Omnichannel Platform.""" | |
| 228 | + | ||
| 229 | + | doc = await mongo_client.get_one(TextChannelDB, {"_id": channel_id}) | |
| 230 | + | if not doc or not str(doc.get("name", "")).startswith(MAX_NAME_PREFIX): | |
| 231 | + | raise HTTPException( | |
| 232 | + | status_code=404, detail="Channel not found or not deletable" | |
| 233 | + | ) | |
| 234 | + | ||
| 235 | + | deleted_count = await mongo_client.delete_one(TextChannelDB, {"_id": channel_id}) | |
| 236 | + | if deleted_count != 1: | |
| 237 | + | raise HTTPException(status_code=409, detail="Channel was not deleted") | |
| 238 | + | ||
| 239 | + | await delete_channel_from_cache(redis_client=redis_client, channel_id=channel_id) | |
| 240 | + | ||
| 241 | + | return doc | |
Newer
Older