Last active 1769498000

Сервис создает асинхронно рабочие столы на UC Flora

Revision d95a172c6082eb6a14280907fec73a8c3c1ef310

handler_cdr.py Raw
1#!/usr/bin/env python3
2
3import pymysql
4import time
5import hashlib
6import random
7import threading
8import os
9
10BATCH_SIZE = 1000
11NUM_IVR_WORKERS = 1
12
13
14def get_connection():
15 return pymysql.connect(
16 host="localhost",
17 user="root",
18 password="MySecretPass",
19 db="voicetech",
20 charset="utf8mb4",
21 cursorclass=pymysql.cursors.DictCursor,
22 autocommit=False,
23 )
24
25
26def generate_gid(caller, callee, uniqueid):
27 return hashlib.md5(
28 f"{caller or random.random()}{callee or random.random()}{time.time()}{random.random()}{uniqueid or random.random()}".encode()
29 ).hexdigest()
30
31
32def close_old_interactions(conn, interval=300):
33 try:
34 with conn.cursor() as cursor:
35 cursor.execute(
36 """
37 UPDATE interactions
38 SET isClosed = 1, dtStop = NOW()
39 WHERE isClosed = 0 AND (UNIX_TIMESTAMP() - UNIX_TIMESTAMP(dtUpdate)) > %s
40 """,
41 (interval,),
42 )
43 conn.commit()
44 except Exception as e:
45 print(f"Error closing old interactions: {str(e)}")
46
47
48def process_close_old_interactions(conn):
49 while True:
50 close_old_interactions(conn)
51 time.sleep(30)
52
53
54def process_cdr_queue(conn):
55 wait_time = 1
56 while True:
57 try:
58 with conn.cursor() as cursor:
59 cursor.execute(
60 """
61 SELECT * FROM cdr_processing_queue
62 WHERE status = 'new'
63 ORDER BY created_at
64 LIMIT 1
65 FOR UPDATE SKIP LOCKED
66 """
67 )
68 task = cursor.fetchone()
69 if not task:
70 conn.rollback()
71 time.sleep(wait_time)
72 wait_time = min(wait_time * 2, 10)
73 continue
74 wait_time = 1
75
76 cursor.execute(
77 "UPDATE cdr_processing_queue SET status='processing' WHERE id=%s",
78 (task["id"],),
79 )
80
81 cursor.execute("SELECT * FROM phone_cdr WHERE id=%s", (task["cdr_id"],))
82 cdr = cursor.fetchone()
83 if not cdr:
84 raise Exception("CDR not found")
85
86 caller = cdr.get("caller") or cdr.get("src")
87 callee = cdr.get("callee") or cdr.get("dst")
88 dataset_id = None if cdr.get("dataset_id") == 0 else cdr.get("dataset_id")
89
90 cursor.execute(
91 """
92 SELECT interactionGID FROM phone_cdr
93 WHERE interactionID = %s AND interactionGID IS NOT NULL AND interactionGID != ''
94 LIMIT 1
95 """,
96 (cdr["interactionID"],),
97 )
98 result = cursor.fetchone()
99 interactionGID = (
100 result["interactionGID"]
101 if result
102 else generate_gid(caller, callee, cdr["uniqueid"])
103 )
104
105 cursor.execute(
106 """
107 UPDATE phone_cdr
108 SET interactionGID = %s,
109 caller = %s,
110 callee = %s,
111 dataset_id = %s
112 WHERE id = %s
113 """,
114 (interactionGID, caller, callee, dataset_id, cdr["id"]),
115 )
116
117 if not result:
118 customerPhone = caller if cdr["isManager"] != caller else callee
119 customerPhoneID = customerPhone[-10:] if customerPhone else ""
120 customerName = "-"
121 customer_id = None
122
123 cursor.execute(
124 """
125 INSERT INTO interactions (
126 interactionID,
127 dtStart,
128 dtUpdate,
129 customerPhone,
130 customerName,
131 customerPhoneID,
132 dataset_id,
133 customer_id,
134 source_id,
135 hasVoice
136 ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 0, 1)
137 """,
138 (
139 interactionGID,
140 cdr["start"],
141 cdr["start"],
142 customerPhone,
143 customerName,
144 customerPhoneID,
145 dataset_id,
146 customer_id,
147 ),
148 )
149
150 cursor.execute(
151 "UPDATE cdr_processing_queue SET status='done' WHERE id=%s",
152 (task["id"],),
153 )
154
155 conn.commit()
156 except Exception as e:
157 conn.rollback()
158 with conn.cursor() as cursor:
159 cursor.execute(
160 "UPDATE cdr_processing_queue SET status='error', error_text=%s WHERE id=%s",
161 (str(e), task["id"] if 'task' in locals() else None),
162 )
163 conn.commit()
164
165
166def process_ivr_worker():
167 conn = get_connection()
168 wait_time = 1
169 while True:
170 try:
171 with conn.cursor() as cursor:
172 cursor.execute(
173 f"""
174 SELECT jp.*
175 FROM journeys_processing_queue jp
176 JOIN phone_cdr pc ON pc.interactionID = jp.interactionID
177 JOIN cdr_processing_queue cpq ON cpq.cdr_id = pc.id
178 WHERE jp.status = 'new' AND cpq.status = 'done'
179 ORDER BY jp.created_at
180 LIMIT {BATCH_SIZE};
181 """
182 )
183 tasks = cursor.fetchall()
184
185 if not tasks:
186 time.sleep(wait_time)
187 wait_time = min(wait_time * 2, 10)
188 continue
189 wait_time = 1
190
191 print(f"Process tasks len {len(tasks)}")
192
193 for task in tasks:
194 try:
195 with conn.cursor() as cursor:
196 interactionID = task["interactionID"]
197
198 cursor.execute(
199 "SELECT * FROM phone_ivr_journeys WHERE id = %s",
200 (task["ivr_id"],),
201 )
202 ivr = cursor.fetchone()
203 if not ivr:
204 raise Exception("IVR record not found")
205
206 cursor.execute(
207 "UPDATE phone_ivr_journeys SET data = %s WHERE id = %s",
208 (ivr["recognizedText"], ivr["id"]),
209 )
210
211 cursor.execute(
212 """
213 SELECT interactionGID FROM phone_cdr
214 WHERE interactionID = %s AND interactionGID IS NOT NULL AND interactionGID != ''
215 LIMIT 1
216 """,
217 (interactionID,),
218 )
219 result = cursor.fetchone()
220 if not result:
221 raise Exception("No interactionGID found in phone_cdr")
222
223 interactionGID = result["interactionGID"]
224
225 cursor.execute(
226 """
227 UPDATE interactions
228 SET operatorRequested = IF(%s=1, 1, operatorRequested),
229 hasOperator = IF(%s=1, 1, hasOperator),
230 lastMessage = %s,
231 dtUpdate = NOW(),
232 unreadByOperator = unreadByOperator + IF(%s IS NULL,1,0)
233 WHERE interactionID = %s
234 """,
235 (
236 ivr["requestOperator"],
237 ivr["requestOperator"],
238 ivr["recognizedText"],
239 ivr["ivr_id"],
240 interactionGID,
241 ),
242 )
243
244 cursor.execute(
245 "UPDATE journeys_processing_queue SET status='done' WHERE id=%s",
246 (task["id"],),
247 )
248
249 conn.commit()
250
251 except Exception as e:
252 print(f">>> Task process error ({type(e)}): {e}")
253
254 conn.rollback()
255 try:
256 with conn.cursor() as cursor:
257 cursor.execute(
258 "UPDATE journeys_processing_queue SET status='error', error_text=%s WHERE id=%s",
259 (str(e), task["id"]),
260 )
261 conn.commit()
262 except Exception as inner_e:
263 print(f"[IVR worker] failed to log error: {inner_e}")
264
265 except Exception as e:
266 conn.rollback()
267 print(f"[IVR worker] error: {e}")
268
269
270def main():
271 threading.Thread(target=process_close_old_interactions, args=(get_connection(),), daemon=True).start()
272 threading.Thread(target=process_cdr_queue, args=(get_connection(),), daemon=True).start()
273
274 for _ in range(NUM_IVR_WORKERS):
275 threading.Thread(target=process_ivr_worker, daemon=True).start()
276
277 while True:
278 time.sleep(10)
279
280
281if __name__ == "__main__":
282 main()