Last active 1772600491

routes.py Raw
1import asyncio
2import json
3import logging
4import os
5import re
6
7from fastapi import APIRouter, Depends, HTTPException
8from faststream import Context
9from faststream.rabbit import RabbitRouter, RabbitQueue, RabbitMessage
10
11from max_integration.integrations.lexicom.builder import (
12 build_button_attachments,
13 build_file_attachments,
14)
15from max_integration.integrations.lexicom.chats import (
16 AsyncMongo,
17 TextChannelDB,
18 AsyncRedis,
19 upsert_channel_binding_in_cache,
20 delete_channel_from_cache,
21)
22from max_integration.integrations.lexicom.schemas import CreateChannel, PatchChannel
23from max_integration.integrations.lexicom.storage import StorageClient
24from max_integration.markers import MongoMarker, RedisMarker
25from max_integration.max.client import MaxClient
26from max_integration.max.schemas import SendMessageBody
27from max_integration.locker import AsyncLocker
28from max_integration.max.storage import upload_file
29from max_integration.tools import now_timestamp
30
# Prefix put in front of the name of every channel created through this module;
# the handlers in this file use it to recognise MAX channels. It can be
# overridden with the MAX_NAME_PREFIX environment variable.
MAX_NAME_PREFIX = os.getenv("MAX_NAME_PREFIX", "(MAX)")
logger = logging.getLogger(__name__)
# Durable RabbitMQ queue named "max"; max_handler subscribes to it.
max_queue = RabbitQueue("max", durable=True)
rabbit_router = RabbitRouter()
# HTTP endpoints of this module are registered under the /lexicom prefix.
api_router = APIRouter(prefix="/lexicom", tags=["Lexicom"])
36
37
@rabbit_router.subscriber(max_queue)
async def max_handler(
    message: RabbitMessage,
    mongo_client: AsyncMongo = Context(),
    locker: AsyncLocker = Context(),
    storage_client: StorageClient = Context(),
):
    """Deliver a message taken from the "max" queue to a MAX user.

    The message is acknowledged and dropped without delivery when any of the
    following holds:

    * the ``exchange_name`` header is non-empty or ``"tg"`` is not among the
      ``routing_keys`` header values;
    * the JSON payload carries no ``channel_id``;
    * no channel document with that id exists;
    * the channel name does not start with ``MAX_NAME_PREFIX``;
    * the payload carries no ``client_id`` (there is no recipient).

    Otherwise every entry of ``files`` is read from storage, uploaded to MAX
    and sent as its own message, and a non-empty text is sent afterwards as a
    separate message together with the button attachments. The message is
    acknowledged only after all sends succeeded.

    Args:
        message: Raw RabbitMQ message; its body must be a JSON object.
        mongo_client: Database client used to look up the channel document.
        locker: Currently unused; the lock release at the end is disabled.
        storage_client: Client used to read the files referenced by the payload.

    Raises:
        json.JSONDecodeError: If the message body is not valid JSON.
        ValueError: If ``client_id`` cannot be converted to ``int``.
    """
    exchange_name = message.headers.get("exchange_name", "")
    routing_keys = message.headers.get("routing_keys", []) or []

    if exchange_name != "" or "tg" not in routing_keys:
        await message.ack()
        return

    payload = json.loads(message.body)

    channel_id = payload.get("channel_id")
    if not channel_id:
        await message.ack()
        return

    channel = await mongo_client.get_one(
        TextChannelDB, {"_id": channel_id}, show_fields={"token": 1, "name": 1}
    )
    if not channel:
        await message.ack()
        return

    name = channel.get("name", "")
    if not name.startswith(MAX_NAME_PREFIX):
        await message.ack()
        return

    logger.debug("MAX receive message with body %r", payload)

    user_id = payload.get("client_id")
    # Tolerate an explicit null "body" field as well as a missing one.
    text = (payload.get("body") or {}).get("message", "")
    files = payload.get("files", [])
    buttons = payload.get("buttons", [])

    if user_id is None:
        # Without a recipient nothing can be delivered; drop the message the
        # same way the other unusable messages above are dropped.
        logger.warning("MAX message for channel %r has no client_id", channel_id)
        await message.ack()
        return

    max_client = MaxClient(access_token=channel.get("token"))
    try:
        recipient_id = int(user_id)

        if files:
            # Each stored file is indexed below as (file_name, file_bytes).
            stored_files = await asyncio.gather(
                *(storage_client.read_file(file_name=file_name) for file_name in files)
            )
            uploaded_files = await asyncio.gather(
                *(
                    upload_file(
                        max_client=max_client,
                        file_bytes=stored[1],
                        file_name=stored[0],
                    )
                    for stored in stored_files
                )
            )

            # must be only one file attachment in message, so every file is
            # sent as a message of its own
            for uploaded in uploaded_files:
                file_message = SendMessageBody()
                file_message.text = None
                file_message.attachments = build_file_attachments(file=uploaded)
                await max_client.send_message_with_file(
                    file_message, user_id=recipient_id
                )

        if text:
            text_message = SendMessageBody(text=text)
            text_message.attachments = build_button_attachments(buttons=buttons)
            await max_client.send_message(text_message, user_id=recipient_id)

        await message.ack()
    finally:
        # Close the client even when reading, uploading or sending failed.
        await max_client.close()

    # NOTE: releasing the per-user lock is disabled for now:
    # locker.release(build_locker_key(user_id=user_id, bot_id=channel_id))
117
118
@api_router.post("/channels/")
async def create_channel(
    channel: CreateChannel,
    mongo_client: AsyncMongo = Depends(MongoMarker),
    redis_client: AsyncRedis = Depends(RedisMarker),
):
    """Connect a MAX bot to the Lexicom Omnichannel Platform.

    Inserts a new channel document whose name is ``MAX_NAME_PREFIX`` followed
    by the requested name, reads the document back, updates the channel
    binding in the cache and returns the stored document.
    """
    timestamp = now_timestamp()

    # TODO: the original author left an unspecified TODO on this document.
    # Fields that do not come from the request are stored with fixed values.
    channel_doc = {
        "__collection__": "TextChannelDB",
        "name": f"{MAX_NAME_PREFIX} {channel.name}",
        "token": channel.token,
        "ivr_id": channel.scenario_id,
        "ivr_version": channel.scenario_version,
        # max_handler only processes messages whose routing keys contain "tg".
        "channel": "tg",
        "connection": channel.connection,
        "status": "not_connected",
        "url": channel.url,
        "outbound_company": False,
        "use_button": True,
        "email": "",
        "host_email": "",
        "imap_host_email": "",
        "imap_port_email": 0,
        "notify_token": "",
        "password": "",
        "queue_id": channel.queue_id,
        "smtp_host_email": "",
        "smtp_port_email": 0,
        "created_at": timestamp,
        "updated_at": timestamp,
    }

    new_id = await mongo_client.insert(channel_doc)
    stored = await mongo_client.get_one(TextChannelDB, {"_id": new_id})

    # The cache binding is built from the document as it was actually stored.
    binding = {
        "queue_id": stored.get("queue_id", ""),
        "channel_id": stored["_id"],
        "ivr_id": stored.get("ivr_id", ""),
        "ivr_version": stored.get("ivr_version", ""),
    }
    await upsert_channel_binding_in_cache(
        redis_client=redis_client,
        db=mongo_client,
        **binding,
    )

    return stored
168
169
@api_router.get("/channels/")
async def list_channels(
    mongo_client: AsyncMongo = Depends(MongoMarker),
):
    """List the MAX bots registered in the Lexicom Omnichannel Platform.

    Returns the channel documents whose name starts with ``MAX_NAME_PREFIX``.
    """
    # The prefix is escaped so that it is matched literally at the start of
    # the name (the default "(MAX)" contains regex metacharacters).
    name_filter = {"name": {"$regex": f"^{re.escape(MAX_NAME_PREFIX)}"}}
    return await mongo_client.find_many(TextChannelDB, name_filter)
180
181
@api_router.patch("/channels/{channel_id}")
async def patch_channel(
    channel_id: str,
    data: PatchChannel,
    mongo_client: AsyncMongo = Depends(MongoMarker),
    redis_client: AsyncRedis = Depends(RedisMarker),
):
    """Update a MAX bot in the Lexicom Omnichannel Platform.

    Only fields that were explicitly set to a non-null value are written.
    When nothing is left to write, the current document is returned
    unchanged; otherwise the document is updated, the cache binding is
    refreshed and the updated document is returned.

    Raises:
        HTTPException: 404 when the channel does not exist or its name does
            not start with ``MAX_NAME_PREFIX``.
    """
    selector = {"_id": channel_id}

    existing = await mongo_client.get_one(TextChannelDB, selector)
    is_max_channel = bool(existing) and str(existing.get("name", "")).startswith(
        MAX_NAME_PREFIX
    )
    if not is_max_channel:
        raise HTTPException(status_code=404, detail="Channel not found or not editable")

    changes = data.model_dump(exclude_unset=True, exclude_none=True)

    # The API speaks of "scenario" fields, the stored document of "ivr" fields.
    for api_field, stored_field in (
        ("scenario_id", "ivr_id"),
        ("scenario_version", "ivr_version"),
    ):
        if api_field in changes:
            changes[stored_field] = changes.pop(api_field)

    if not changes:
        return existing

    changes["updated_at"] = now_timestamp()
    await mongo_client.update(TextChannelDB, selector, changes)

    refreshed = await mongo_client.get_one(TextChannelDB, selector)

    binding = {
        "queue_id": refreshed.get("queue_id", ""),
        "channel_id": refreshed["_id"],
        "ivr_id": refreshed.get("ivr_id", ""),
        "ivr_version": refreshed.get("ivr_version", ""),
    }
    await upsert_channel_binding_in_cache(
        redis_client=redis_client,
        db=mongo_client,
        **binding,
    )

    return refreshed
219
220
@api_router.delete("/channels/{channel_id}")
async def delete_channel(
    channel_id: str,
    mongo_client: AsyncMongo = Depends(MongoMarker),
    redis_client: AsyncRedis = Depends(RedisMarker),
):
    """Remove a MAX bot from the Lexicom Omnichannel Platform.

    Deletes the channel document, removes the channel from the cache and
    returns the document as it was before deletion.

    Raises:
        HTTPException: 404 when the channel does not exist or its name does
            not start with ``MAX_NAME_PREFIX``; 409 when the delete did not
            remove exactly one document.
    """
    selector = {"_id": channel_id}

    existing = await mongo_client.get_one(TextChannelDB, selector)
    is_max_channel = bool(existing) and str(existing.get("name", "")).startswith(
        MAX_NAME_PREFIX
    )
    if not is_max_channel:
        raise HTTPException(
            status_code=404, detail="Channel not found or not deletable"
        )

    removed = await mongo_client.delete_one(TextChannelDB, selector)
    if removed != 1:
        raise HTTPException(status_code=409, detail="Channel was not deleted")

    # The cache entry is dropped only after the document is really gone.
    await delete_channel_from_cache(redis_client=redis_client, channel_id=channel_id)

    return existing
242