Last active 1769498000

Сервис создает асинхронно рабочие столы на UC Flora

en2zmax's Avatar en2zmax revised this gist 1769498000. Go to revision

1 file changed, 282 insertions

handler_cdr.py(file created)

@@ -0,0 +1,282 @@
1 + #!/usr/bin/env python3
2 +
3 + import pymysql
4 + import time
5 + import hashlib
6 + import random
7 + import threading
8 + import os
9 +
10 + BATCH_SIZE = 1000
11 + NUM_IVR_WORKERS = 1
12 +
13 +
14 + def get_connection():
15 + return pymysql.connect(
16 + host="localhost",
17 + user="root",
18 + password="MySecretPass",
19 + db="voicetech",
20 + charset="utf8mb4",
21 + cursorclass=pymysql.cursors.DictCursor,
22 + autocommit=False,
23 + )
24 +
25 +
26 + def generate_gid(caller, callee, uniqueid):
27 + return hashlib.md5(
28 + f"{caller or random.random()}{callee or random.random()}{time.time()}{random.random()}{uniqueid or random.random()}".encode()
29 + ).hexdigest()
30 +
31 +
32 + def close_old_interactions(conn, interval=300):
33 + try:
34 + with conn.cursor() as cursor:
35 + cursor.execute(
36 + """
37 + UPDATE interactions
38 + SET isClosed = 1, dtStop = NOW()
39 + WHERE isClosed = 0 AND (UNIX_TIMESTAMP() - UNIX_TIMESTAMP(dtUpdate)) > %s
40 + """,
41 + (interval,),
42 + )
43 + conn.commit()
44 + except Exception as e:
45 + print(f"Error closing old interactions: {str(e)}")
46 +
47 +
48 + def process_close_old_interactions(conn):
49 + while True:
50 + close_old_interactions(conn)
51 + time.sleep(30)
52 +
53 +
54 + def process_cdr_queue(conn):
55 + wait_time = 1
56 + while True:
57 + try:
58 + with conn.cursor() as cursor:
59 + cursor.execute(
60 + """
61 + SELECT * FROM cdr_processing_queue
62 + WHERE status = 'new'
63 + ORDER BY created_at
64 + LIMIT 1
65 + FOR UPDATE SKIP LOCKED
66 + """
67 + )
68 + task = cursor.fetchone()
69 + if not task:
70 + conn.rollback()
71 + time.sleep(wait_time)
72 + wait_time = min(wait_time * 2, 10)
73 + continue
74 + wait_time = 1
75 +
76 + cursor.execute(
77 + "UPDATE cdr_processing_queue SET status='processing' WHERE id=%s",
78 + (task["id"],),
79 + )
80 +
81 + cursor.execute("SELECT * FROM phone_cdr WHERE id=%s", (task["cdr_id"],))
82 + cdr = cursor.fetchone()
83 + if not cdr:
84 + raise Exception("CDR not found")
85 +
86 + caller = cdr.get("caller") or cdr.get("src")
87 + callee = cdr.get("callee") or cdr.get("dst")
88 + dataset_id = None if cdr.get("dataset_id") == 0 else cdr.get("dataset_id")
89 +
90 + cursor.execute(
91 + """
92 + SELECT interactionGID FROM phone_cdr
93 + WHERE interactionID = %s AND interactionGID IS NOT NULL AND interactionGID != ''
94 + LIMIT 1
95 + """,
96 + (cdr["interactionID"],),
97 + )
98 + result = cursor.fetchone()
99 + interactionGID = (
100 + result["interactionGID"]
101 + if result
102 + else generate_gid(caller, callee, cdr["uniqueid"])
103 + )
104 +
105 + cursor.execute(
106 + """
107 + UPDATE phone_cdr
108 + SET interactionGID = %s,
109 + caller = %s,
110 + callee = %s,
111 + dataset_id = %s
112 + WHERE id = %s
113 + """,
114 + (interactionGID, caller, callee, dataset_id, cdr["id"]),
115 + )
116 +
117 + if not result:
118 + customerPhone = caller if cdr["isManager"] != caller else callee
119 + customerPhoneID = customerPhone[-10:] if customerPhone else ""
120 + customerName = "-"
121 + customer_id = None
122 +
123 + cursor.execute(
124 + """
125 + INSERT INTO interactions (
126 + interactionID,
127 + dtStart,
128 + dtUpdate,
129 + customerPhone,
130 + customerName,
131 + customerPhoneID,
132 + dataset_id,
133 + customer_id,
134 + source_id,
135 + hasVoice
136 + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 0, 1)
137 + """,
138 + (
139 + interactionGID,
140 + cdr["start"],
141 + cdr["start"],
142 + customerPhone,
143 + customerName,
144 + customerPhoneID,
145 + dataset_id,
146 + customer_id,
147 + ),
148 + )
149 +
150 + cursor.execute(
151 + "UPDATE cdr_processing_queue SET status='done' WHERE id=%s",
152 + (task["id"],),
153 + )
154 +
155 + conn.commit()
156 + except Exception as e:
157 + conn.rollback()
158 + with conn.cursor() as cursor:
159 + cursor.execute(
160 + "UPDATE cdr_processing_queue SET status='error', error_text=%s WHERE id=%s",
161 + (str(e), task["id"] if 'task' in locals() else None),
162 + )
163 + conn.commit()
164 +
165 +
166 + def process_ivr_worker():
167 + conn = get_connection()
168 + wait_time = 1
169 + while True:
170 + try:
171 + with conn.cursor() as cursor:
172 + cursor.execute(
173 + f"""
174 + SELECT jp.*
175 + FROM journeys_processing_queue jp
176 + JOIN phone_cdr pc ON pc.interactionID = jp.interactionID
177 + JOIN cdr_processing_queue cpq ON cpq.cdr_id = pc.id
178 + WHERE jp.status = 'new' AND cpq.status = 'done'
179 + ORDER BY jp.created_at
180 + LIMIT {BATCH_SIZE};
181 + """
182 + )
183 + tasks = cursor.fetchall()
184 +
185 + if not tasks:
186 + time.sleep(wait_time)
187 + wait_time = min(wait_time * 2, 10)
188 + continue
189 + wait_time = 1
190 +
191 + print(f"Process tasks len {len(tasks)}")
192 +
193 + for task in tasks:
194 + try:
195 + with conn.cursor() as cursor:
196 + interactionID = task["interactionID"]
197 +
198 + cursor.execute(
199 + "SELECT * FROM phone_ivr_journeys WHERE id = %s",
200 + (task["ivr_id"],),
201 + )
202 + ivr = cursor.fetchone()
203 + if not ivr:
204 + raise Exception("IVR record not found")
205 +
206 + cursor.execute(
207 + "UPDATE phone_ivr_journeys SET data = %s WHERE id = %s",
208 + (ivr["recognizedText"], ivr["id"]),
209 + )
210 +
211 + cursor.execute(
212 + """
213 + SELECT interactionGID FROM phone_cdr
214 + WHERE interactionID = %s AND interactionGID IS NOT NULL AND interactionGID != ''
215 + LIMIT 1
216 + """,
217 + (interactionID,),
218 + )
219 + result = cursor.fetchone()
220 + if not result:
221 + raise Exception("No interactionGID found in phone_cdr")
222 +
223 + interactionGID = result["interactionGID"]
224 +
225 + cursor.execute(
226 + """
227 + UPDATE interactions
228 + SET operatorRequested = IF(%s=1, 1, operatorRequested),
229 + hasOperator = IF(%s=1, 1, hasOperator),
230 + lastMessage = %s,
231 + dtUpdate = NOW(),
232 + unreadByOperator = unreadByOperator + IF(%s IS NULL,1,0)
233 + WHERE interactionID = %s
234 + """,
235 + (
236 + ivr["requestOperator"],
237 + ivr["requestOperator"],
238 + ivr["recognizedText"],
239 + ivr["ivr_id"],
240 + interactionGID,
241 + ),
242 + )
243 +
244 + cursor.execute(
245 + "UPDATE journeys_processing_queue SET status='done' WHERE id=%s",
246 + (task["id"],),
247 + )
248 +
249 + conn.commit()
250 +
251 + except Exception as e:
252 + print(f">>> Task process error ({type(e)}): {e}")
253 +
254 + conn.rollback()
255 + try:
256 + with conn.cursor() as cursor:
257 + cursor.execute(
258 + "UPDATE journeys_processing_queue SET status='error', error_text=%s WHERE id=%s",
259 + (str(e), task["id"]),
260 + )
261 + conn.commit()
262 + except Exception as inner_e:
263 + print(f"[IVR worker] failed to log error: {inner_e}")
264 +
265 + except Exception as e:
266 + conn.rollback()
267 + print(f"[IVR worker] error: {e}")
268 +
269 +
270 + def main():
271 + threading.Thread(target=process_close_old_interactions, args=(get_connection(),), daemon=True).start()
272 + threading.Thread(target=process_cdr_queue, args=(get_connection(),), daemon=True).start()
273 +
274 + for _ in range(NUM_IVR_WORKERS):
275 + threading.Thread(target=process_ivr_worker, daemon=True).start()
276 +
277 + while True:
278 + time.sleep(10)
279 +
280 +
281 + if __name__ == "__main__":
282 + main()
Newer Older