#!/usr/bin/env python3
"""Background workers for the ``voicetech`` database.

Three kinds of daemon threads are started by :func:`main`:

* a janitor that closes interactions which have not been updated recently,
* a worker that drains ``cdr_processing_queue``,
* one or more workers that drain ``journeys_processing_queue``.

Every thread uses its own database connection.
"""
import hashlib
import os
import random
import threading
import time

import pymysql

BATCH_SIZE = 1000
NUM_IVR_WORKERS = 1

# Upper bound, in seconds, for the exponential polling/error back-off.
_MAX_WAIT_SECONDS = 10


def get_connection():
    """Open a new MySQL connection with autocommit disabled.

    Connection settings default to the values that used to be hard-coded and
    can be overridden with the ``VOICETECH_DB_HOST``, ``VOICETECH_DB_USER``,
    ``VOICETECH_DB_PASSWORD`` and ``VOICETECH_DB_NAME`` environment variables,
    so that credentials do not have to live in the source file.

    Returns:
        A ``pymysql`` connection whose cursors return rows as dictionaries.
    """
    return pymysql.connect(
        host=os.environ.get("VOICETECH_DB_HOST", "localhost"),
        user=os.environ.get("VOICETECH_DB_USER", "root"),
        password=os.environ.get("VOICETECH_DB_PASSWORD", "MySecretPass"),
        db=os.environ.get("VOICETECH_DB_NAME", "voicetech"),
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=False,
    )


def generate_gid(caller, callee, uniqueid):
    """Return a fresh 32-character hexadecimal interaction identifier.

    The identifier is the MD5 digest of the caller, callee and unique id
    (each replaced by a random number when falsy) mixed with the current time
    and a random number, so two calls never need to agree on a value.
    MD5 is used here only to produce an identifier, not for security.
    """
    seed = (
        f"{caller or random.random()}"
        f"{callee or random.random()}"
        f"{time.time()}"
        f"{random.random()}"
        f"{uniqueid or random.random()}"
    )
    return hashlib.md5(seed.encode()).hexdigest()


def _safe_rollback(conn):
    """Roll back *conn*, reporting instead of raising if the rollback fails."""
    try:
        conn.rollback()
    except Exception as e:
        print(f"Rollback failed: {e}")


def close_old_interactions(conn, interval=300):
    """Close every open interaction not updated for more than *interval* seconds.

    Sets ``isClosed = 1`` and ``dtStop = NOW()`` on the matching rows and
    commits. Errors are printed rather than raised; the failed transaction is
    rolled back so the connection is clean for the next call.
    """
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                UPDATE interactions
                SET isClosed = 1, dtStop = NOW()
                WHERE isClosed = 0
                  AND (UNIX_TIMESTAMP() - UNIX_TIMESTAMP(dtUpdate)) > %s
                """,
                (interval,),
            )
        conn.commit()
    except Exception as e:
        print(f"Error closing old interactions: {str(e)}")
        _safe_rollback(conn)


def process_close_old_interactions(conn):
    """Run :func:`close_old_interactions` on *conn* every 30 seconds, forever."""
    while True:
        close_old_interactions(conn)
        time.sleep(30)


def _process_cdr_task(cursor, task):
    """Handle one ``cdr_processing_queue`` row inside the caller's transaction.

    Normalises the referenced ``phone_cdr`` row, assigns it an
    ``interactionGID`` (reusing one already stored for the same
    ``interactionID`` or generating a new one), inserts an ``interactions``
    row when the GID is new, and marks the queue row ``'done'``.
    The caller is responsible for commit/rollback.

    Raises:
        Exception: if the referenced ``phone_cdr`` row does not exist.
    """
    cursor.execute(
        "UPDATE cdr_processing_queue SET status='processing' WHERE id=%s",
        (task["id"],),
    )
    cursor.execute("SELECT * FROM phone_cdr WHERE id=%s", (task["cdr_id"],))
    cdr = cursor.fetchone()
    if not cdr:
        raise Exception("CDR not found")

    caller = cdr.get("caller") or cdr.get("src")
    callee = cdr.get("callee") or cdr.get("dst")
    # A dataset_id of 0 is stored as NULL.
    dataset_id = None if cdr.get("dataset_id") == 0 else cdr.get("dataset_id")

    cursor.execute(
        """
        SELECT interactionGID
        FROM phone_cdr
        WHERE interactionID = %s
          AND interactionGID IS NOT NULL
          AND interactionGID != ''
        LIMIT 1
        """,
        (cdr["interactionID"],),
    )
    existing = cursor.fetchone()
    if existing:
        interaction_gid = existing["interactionGID"]
    else:
        interaction_gid = generate_gid(caller, callee, cdr["uniqueid"])

    cursor.execute(
        """
        UPDATE phone_cdr
        SET interactionGID = %s, caller = %s, callee = %s, dataset_id = %s
        WHERE id = %s
        """,
        (interaction_gid, caller, callee, dataset_id, cdr["id"]),
    )

    if not existing:
        # NOTE(review): isManager is compared with the caller number as in the
        # original code; confirm that column really holds a phone number.
        customer_phone = caller if cdr["isManager"] != caller else callee
        customer_phone_id = customer_phone[-10:] if customer_phone else ""
        customer_name = "-"
        customer_id = None
        cursor.execute(
            """
            INSERT INTO interactions (
                interactionID, dtStart, dtUpdate, customerPhone, customerName,
                customerPhoneID, dataset_id, customer_id, source_id, hasVoice
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 0, 1)
            """,
            (
                interaction_gid,
                cdr["start"],
                cdr["start"],
                customer_phone,
                customer_name,
                customer_phone_id,
                dataset_id,
                customer_id,
            ),
        )

    cursor.execute(
        "UPDATE cdr_processing_queue SET status='done' WHERE id=%s",
        (task["id"],),
    )


def process_cdr_queue(conn):
    """Drain ``cdr_processing_queue`` forever, one task per transaction.

    Each iteration locks the oldest ``'new'`` row with
    ``FOR UPDATE SKIP LOCKED`` and processes it. When the queue is empty the
    loop sleeps with exponential back-off (1s doubling up to 10s). When a task
    fails, its transaction is rolled back and the queue row is marked
    ``'error'`` with the exception text.
    """
    wait_time = 1
    while True:
        # Reset per iteration so the error handler can never blame the task
        # of a previous iteration for a failure that happened before fetch.
        task = None
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    SELECT *
                    FROM cdr_processing_queue
                    WHERE status = 'new'
                    ORDER BY created_at
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                    """
                )
                task = cursor.fetchone()
                if not task:
                    # End the transaction so the next poll sees new rows.
                    conn.rollback()
                    time.sleep(wait_time)
                    wait_time = min(wait_time * 2, _MAX_WAIT_SECONDS)
                    continue
                wait_time = 1
                _process_cdr_task(cursor, task)
            conn.commit()
        except Exception as e:
            print(f"[CDR worker] error: {e}")
            _safe_rollback(conn)
            if task:
                try:
                    with conn.cursor() as cursor:
                        cursor.execute(
                            "UPDATE cdr_processing_queue SET status='error', error_text=%s WHERE id=%s",
                            (str(e), task["id"]),
                        )
                    conn.commit()
                except Exception as inner_e:
                    print(f"[CDR worker] failed to log error: {inner_e}")
                    _safe_rollback(conn)
            else:
                # The failure happened before a task was obtained (for example
                # a connection problem); back off instead of spinning.
                time.sleep(wait_time)
                wait_time = min(wait_time * 2, _MAX_WAIT_SECONDS)


def _process_ivr_task(conn, task):
    """Process one ``journeys_processing_queue`` row in its own transaction.

    Copies ``recognizedText`` into ``data`` on the referenced
    ``phone_ivr_journeys`` row, updates the matching ``interactions`` row and
    marks the queue row ``'done'``. On failure the transaction is rolled back
    and the queue row is marked ``'error'``; nothing is raised to the caller.
    """
    try:
        with conn.cursor() as cursor:
            interaction_id = task["interactionID"]
            cursor.execute(
                "SELECT * FROM phone_ivr_journeys WHERE id = %s",
                (task["ivr_id"],),
            )
            ivr = cursor.fetchone()
            if not ivr:
                raise Exception("IVR record not found")

            cursor.execute(
                "UPDATE phone_ivr_journeys SET data = %s WHERE id = %s",
                (ivr["recognizedText"], ivr["id"]),
            )
            cursor.execute(
                """
                SELECT interactionGID
                FROM phone_cdr
                WHERE interactionID = %s
                  AND interactionGID IS NOT NULL
                  AND interactionGID != ''
                LIMIT 1
                """,
                (interaction_id,),
            )
            gid_row = cursor.fetchone()
            if not gid_row:
                raise Exception("No interactionGID found in phone_cdr")
            interaction_gid = gid_row["interactionGID"]

            # NOTE(review): ivr["ivr_id"] is read from the phone_ivr_journeys
            # row (not from the queue task); confirm that column exists.
            cursor.execute(
                """
                UPDATE interactions
                SET operatorRequested = IF(%s=1, 1, operatorRequested),
                    hasOperator = IF(%s=1, 1, hasOperator),
                    lastMessage = %s,
                    dtUpdate = NOW(),
                    unreadByOperator = unreadByOperator + IF(%s IS NULL,1,0)
                WHERE interactionID = %s
                """,
                (
                    ivr["requestOperator"],
                    ivr["requestOperator"],
                    ivr["recognizedText"],
                    ivr["ivr_id"],
                    interaction_gid,
                ),
            )
            cursor.execute(
                "UPDATE journeys_processing_queue SET status='done' WHERE id=%s",
                (task["id"],),
            )
        conn.commit()
    except Exception as e:
        print(f">>> Task process error ({type(e)}): {e}")
        _safe_rollback(conn)
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "UPDATE journeys_processing_queue SET status='error', error_text=%s WHERE id=%s",
                    (str(e), task["id"]),
                )
            conn.commit()
        except Exception as inner_e:
            print(f"[IVR worker] failed to log error: {inner_e}")
            _safe_rollback(conn)


def process_ivr_worker():
    """Drain ``journeys_processing_queue`` forever on a dedicated connection.

    Polls for up to ``BATCH_SIZE`` ``'new'`` queue rows whose interaction has
    at least one ``phone_cdr`` row with a ``'done'`` CDR queue entry, then
    processes each task in its own transaction. Sleeps with exponential
    back-off (1s doubling up to 10s) when there is nothing to do or when
    polling fails.
    """
    conn = get_connection()
    wait_time = 1
    while True:
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    """
                    SELECT jp.*
                    FROM journeys_processing_queue jp
                    JOIN phone_cdr pc ON pc.interactionID = jp.interactionID
                    JOIN cdr_processing_queue cpq ON cpq.cdr_id = pc.id
                    WHERE jp.status = 'new' AND cpq.status = 'done'
                    ORDER BY jp.created_at
                    LIMIT %s
                    """,
                    (BATCH_SIZE,),
                )
                rows = cursor.fetchall()
            # The poll opened a read transaction. End it so that, under
            # InnoDB's default REPEATABLE READ isolation, the next poll reads
            # a fresh snapshot instead of the one taken by this SELECT.
            conn.rollback()

            if not rows:
                time.sleep(wait_time)
                wait_time = min(wait_time * 2, _MAX_WAIT_SECONDS)
                continue
            wait_time = 1

            # The JOIN yields one row per matching phone_cdr row, so the same
            # queue task can appear several times; process each id only once.
            tasks = list({row["id"]: row for row in rows}.values())
            print(f"Process tasks len {len(tasks)}")
            for task in tasks:
                _process_ivr_task(conn, task)
        except Exception as e:
            _safe_rollback(conn)
            print(f"[IVR worker] error: {e}")
            # Back off so a persistent failure does not become a busy loop.
            time.sleep(wait_time)
            wait_time = min(wait_time * 2, _MAX_WAIT_SECONDS)


def main():
    """Start all worker threads as daemons and keep the main thread alive."""
    threading.Thread(
        target=process_close_old_interactions,
        args=(get_connection(),),
        daemon=True,
    ).start()
    threading.Thread(
        target=process_cdr_queue,
        args=(get_connection(),),
        daemon=True,
    ).start()
    for _ in range(NUM_IVR_WORKERS):
        threading.Thread(target=process_ivr_worker, daemon=True).start()
    # Daemon threads stop when the main thread exits, so block here forever.
    while True:
        time.sleep(10)


if __name__ == "__main__":
    main()