Last active 1772600491

en2zmax's Avatar en2zmax revised this gist 1772600491. Go to revision

1 file changed, 241 insertions

routes.py(file created)

@@ -0,0 +1,241 @@
1 + import asyncio
2 + import json
3 + import logging
4 + import os
5 + import re
6 +
7 + from fastapi import APIRouter, Depends, HTTPException
8 + from faststream import Context
9 + from faststream.rabbit import RabbitRouter, RabbitQueue, RabbitMessage
10 +
11 + from max_integration.integrations.lexicom.builder import (
12 + build_button_attachments,
13 + build_file_attachments,
14 + )
15 + from max_integration.integrations.lexicom.chats import (
16 + AsyncMongo,
17 + TextChannelDB,
18 + AsyncRedis,
19 + upsert_channel_binding_in_cache,
20 + delete_channel_from_cache,
21 + )
22 + from max_integration.integrations.lexicom.schemas import CreateChannel, PatchChannel
23 + from max_integration.integrations.lexicom.storage import StorageClient
24 + from max_integration.markers import MongoMarker, RedisMarker
25 + from max_integration.max.client import MaxClient
26 + from max_integration.max.schemas import SendMessageBody
27 + from max_integration.locker import AsyncLocker
28 + from max_integration.max.storage import upload_file
29 + from max_integration.tools import now_timestamp
30 +
31 + MAX_NAME_PREFIX = os.getenv("MAX_NAME_PREFIX", "(MAX)")
32 + logger = logging.getLogger(__name__)
33 + max_queue = RabbitQueue("max", durable=True)
34 + rabbit_router = RabbitRouter()
35 + api_router = APIRouter(prefix="/lexicom", tags=["Lexicom"])
36 +
37 +
38 + @rabbit_router.subscriber(max_queue)
39 + async def max_handler(
40 + message: RabbitMessage,
41 + mongo_client: AsyncMongo = Context(),
42 + locker: AsyncLocker = Context(),
43 + storage_client: StorageClient = Context(),
44 + ):
45 + exchange_name = message.headers.get("exchange_name", "")
46 + routing_keys = message.headers.get("routing_keys", []) or []
47 +
48 + if exchange_name != "" or "tg" not in routing_keys:
49 + await message.ack()
50 + return
51 +
52 + body = json.loads(message.body)
53 +
54 + channel_id = body.get("channel_id")
55 +
56 + if not channel_id:
57 + await message.ack()
58 + return
59 +
60 + channel = await mongo_client.get_one(
61 + TextChannelDB, {"_id": channel_id}, show_fields={"token": 1, "name": 1}
62 + )
63 +
64 + if not channel:
65 + await message.ack()
66 + return
67 +
68 + name = channel.get("name", "")
69 +
70 + if not name.startswith(MAX_NAME_PREFIX):
71 + await message.ack()
72 + return
73 +
74 + max_client = MaxClient(access_token=channel.get("token"))
75 +
76 + logger.debug("MAX receive message with body %r", body)
77 +
78 + user_id = body.get("client_id")
79 + text = body.get("body", {}).get("message", "")
80 + files = body.get("files", [])
81 + buttons = body.get("buttons", [])
82 +
83 + if files:
84 + body = SendMessageBody()
85 +
86 + tasks = [
87 + asyncio.create_task(storage_client.read_file(file_name=file))
88 + for file in files
89 + ]
90 + files = await asyncio.gather(*tasks)
91 +
92 + tasks = [
93 + upload_file(max_client=max_client, file_bytes=file[1], file_name=file[0])
94 + for file in files
95 + ]
96 + files = await asyncio.gather(*tasks)
97 +
98 + body.text = None
99 +
100 + # must be only one file attachment in message
101 + for file in files:
102 + body.attachments = build_file_attachments(file=file)
103 +
104 + await max_client.send_message_with_file(body, user_id=int(user_id))
105 +
106 + if text:
107 + body = SendMessageBody(text=text)
108 +
109 + body.attachments = build_button_attachments(buttons=buttons)
110 +
111 + await max_client.send_message(body, user_id=int(user_id))
112 +
113 + await message.ack()
114 + await max_client.close()
115 +
116 + # locker.release(build_locker_key(user_id=user_id, bot_id=channel_id))
117 +
118 +
119 + @api_router.post("/channels/")
120 + async def create_channel(
121 + channel: CreateChannel,
122 + mongo_client: AsyncMongo = Depends(MongoMarker),
123 + redis_client: AsyncRedis = Depends(RedisMarker),
124 + ):
125 + """Подключение MAX бота к Lexicom Omnichannel Platform."""
126 +
127 + now_ts = now_timestamp()
128 +
129 + # TODO:
130 + doc = {
131 + "__collection__": "TextChannelDB",
132 + "name": f"{MAX_NAME_PREFIX} {channel.name}",
133 + "token": channel.token,
134 + "ivr_id": channel.scenario_id,
135 + "ivr_version": channel.scenario_version,
136 + "channel": "tg",
137 + "connection": channel.connection,
138 + "status": "not_connected",
139 + "url": channel.url,
140 + "outbound_company": False,
141 + "use_button": True,
142 + "email": "",
143 + "host_email": "",
144 + "imap_host_email": "",
145 + "imap_port_email": 0,
146 + "notify_token": "",
147 + "password": "",
148 + "queue_id": channel.queue_id,
149 + "smtp_host_email": "",
150 + "smtp_port_email": 0,
151 + "created_at": now_ts,
152 + "updated_at": now_ts,
153 + }
154 +
155 + inserted_id = await mongo_client.insert(doc)
156 + created = await mongo_client.get_one(TextChannelDB, {"_id": inserted_id})
157 +
158 + await upsert_channel_binding_in_cache(
159 + redis_client=redis_client,
160 + db=mongo_client,
161 + queue_id=created.get("queue_id", ""),
162 + channel_id=created["_id"],
163 + ivr_id=created.get("ivr_id", ""),
164 + ivr_version=created.get("ivr_version", ""),
165 + )
166 +
167 + return created
168 +
169 +
170 + @api_router.get("/channels/")
171 + async def list_channels(
172 + mongo_client: AsyncMongo = Depends(MongoMarker),
173 + ):
174 + """Получить список MAX ботов в Lexicom Omnichannel Platform."""
175 +
176 + pattern = f"^{re.escape(MAX_NAME_PREFIX)}"
177 +
178 + docs = await mongo_client.find_many(TextChannelDB, {"name": {"$regex": pattern}})
179 + return docs
180 +
181 +
182 + @api_router.patch("/channels/{channel_id}")
183 + async def patch_channel(
184 + channel_id: str,
185 + data: PatchChannel,
186 + mongo_client: AsyncMongo = Depends(MongoMarker),
187 + redis_client: AsyncRedis = Depends(RedisMarker),
188 + ):
189 + """Обновить MAX бота в Lexicom Omnichannel Platform."""
190 +
191 + current = await mongo_client.get_one(TextChannelDB, {"_id": channel_id})
192 + if not current or not str(current.get("name", "")).startswith(MAX_NAME_PREFIX):
193 + raise HTTPException(status_code=404, detail="Channel not found or not editable")
194 +
195 + update_data = data.model_dump(exclude_unset=True, exclude_none=True)
196 + if "scenario_id" in update_data:
197 + update_data["ivr_id"] = update_data.pop("scenario_id")
198 + if "scenario_version" in update_data:
199 + update_data["ivr_version"] = update_data.pop("scenario_version")
200 +
201 + if not update_data:
202 + return current
203 +
204 + update_data["updated_at"] = now_timestamp()
205 + await mongo_client.update(TextChannelDB, {"_id": channel_id}, update_data)
206 +
207 + updated = await mongo_client.get_one(TextChannelDB, {"_id": channel_id})
208 +
209 + await upsert_channel_binding_in_cache(
210 + redis_client=redis_client,
211 + db=mongo_client,
212 + queue_id=updated.get("queue_id", ""),
213 + channel_id=updated["_id"],
214 + ivr_id=updated.get("ivr_id", ""),
215 + ivr_version=updated.get("ivr_version", ""),
216 + )
217 +
218 + return updated
219 +
220 +
221 + @api_router.delete("/channels/{channel_id}")
222 + async def delete_channel(
223 + channel_id: str,
224 + mongo_client: AsyncMongo = Depends(MongoMarker),
225 + redis_client: AsyncRedis = Depends(RedisMarker),
226 + ):
227 + """Удалить MAX бота из Lexicom Omnichannel Platform."""
228 +
229 + doc = await mongo_client.get_one(TextChannelDB, {"_id": channel_id})
230 + if not doc or not str(doc.get("name", "")).startswith(MAX_NAME_PREFIX):
231 + raise HTTPException(
232 + status_code=404, detail="Channel not found or not deletable"
233 + )
234 +
235 + deleted_count = await mongo_client.delete_one(TextChannelDB, {"_id": channel_id})
236 + if deleted_count != 1:
237 + raise HTTPException(status_code=409, detail="Channel was not deleted")
238 +
239 + await delete_channel_from_cache(redis_client=redis_client, channel_id=channel_id)
240 +
241 + return doc
Newer Older