mig_s.py
· 8.3 KiB · Python
Raw
#!/usr/bin/env python3
"""
Скрипт миграции tmux сессий в новую структуру инвентарей
"""
import os
import re
import shutil
from pathlib import Path
class SessionMigrator:
def __init__(self):
self.source_dir = Path.home() / '.config' / 'tmux' / 'sessions'
self.target_dir = Path.home() / '.config' / 'tmux-sessions' / 'inventory'
def parse_session_file(self, file_path):
"""Парсит файл сессии и извлекает данные"""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Базовые данные
data = {
'filename': file_path.name,
'session_name': None,
'sshpass_path': None,
'servers': []
}
# Парсим имя сессии
session_match = re.search(r'session_name=(["\']?)([^"\'\n]+)\1', content)
if session_match:
data['session_name'] = session_match.group(2).strip()
# Парсим путь к паролю
sshpass_match = re.search(r'sshpass=\$\(pass ([^)\n]+)\)', content)
if sshpass_match:
data['sshpass_path'] = sshpass_match.group(1).strip()
# Парсим серверы из массива servers
servers_match = re.search(r'servers=\(([^)]+)\)', content, re.DOTALL)
if servers_match:
servers_content = servers_match.group(1)
# Извлекаем строки серверов
server_lines = re.findall(r'"([^"]+)"', servers_content)
for line in server_lines:
parts = line.strip().split(' ', 1)
if len(parts) == 2:
data['servers'].append(f'"{parts[0]} {parts[1]}"')
# Обработка специального формата для Приморье
if 'Приморье' in file_path.name:
data = self._parse_primorye_format(content, data)
return data
def _parse_primorye_format(self, content, data):
"""Специальный парсинг для файла Приморье с другим форматом"""
# Имя сессии
session_match = re.search(r'SESSION=([^\n]+)', content)
if session_match:
data['session_name'] = session_match.group(1).strip()
# Путь к паролю
sshpass_match = re.search(r'SSHPASS=\$\(pass ([^)\n]+)\)', content)
if sshpass_match:
data['sshpass_path'] = sshpass_match.group(1).strip()
# Извлекаем серверы из tmux new-window команд
servers = []
window_matches = re.findall(r'tmux new-window[^>]+-n ([^\s]+)[^>]+"sshpass[^>]+ssh ([^"\s]+)"', content)
for server_name, server_connect in window_matches:
servers.append(f'"{server_name} {server_connect}"')
data['servers'] = servers
return data
def generate_config_content(self, data):
"""Генерирует содержимое нового конфигурационного файла"""
lines = [
f"# Конфигурация для проекта {data['filename'].replace('.session', '')}",
f"SESSION_NAME=\"{data['session_name']}\"",
f"SSHPASS_PATH=\"{data['sshpass_path']}\"",
"",
"# Список серверов в формате \"имя_вкладки подключение\"",
"SERVERS=("
]
# Добавляем серверы
for server in data['servers']:
lines.append(f" {server}")
lines.extend([
")",
""
])
return '\n'.join(lines)
def sanitize_filename(self, filename):
"""Очищает имя файла от недопустимых символов"""
# Убираем расширение .session
clean_name = filename.replace('.session', '')
# Заменяем пробелы и другие проблемные символы
clean_name = re.sub(r'[^\w\-_.]', '_', clean_name)
return clean_name + '.conf'
def migrate(self):
"""Основная функция миграции"""
print("🚀 Начинаем миграцию tmux сессий...")
print(f"📁 Исходный каталог: {self.source_dir}")
print(f"📁 Целевой каталог: {self.target_dir}")
print()
# Создаем целевой каталог если не существует
self.target_dir.mkdir(parents=True, exist_ok=True)
# Получаем список файлов сессий
session_files = list(self.source_dir.glob('*.session'))
if not session_files:
print("❌ Не найдено файлов сессий для миграции")
return
migrated_count = 0
errors = []
for session_file in session_files:
try:
print(f"📖 Обрабатываем: {session_file.name}")
# Парсим файл
data = self.parse_session_file(session_file)
# Проверяем что все необходимые данные есть
if not data['session_name']:
print(f" ⚠️ Предупреждение: не найдено имя сессии в {session_file.name}")
if not data['sshpass_path']:
print(f" ⚠️ Предупреждение: не найден путь к паролю в {session_file.name}")
if not data['servers']:
print(f" ⚠️ Предупреждение: не найдены серверы в {session_file.name}")
# Генерируем новое имя файла
new_filename = self.sanitize_filename(session_file.name)
new_file_path = self.target_dir / new_filename
# Генерируем содержимое
config_content = self.generate_config_content(data)
# Сохраняем файл
with open(new_file_path, 'w', encoding='utf-8') as f:
f.write(config_content)
print(f" ✅ Создан: {new_filename}")
print(f" Session: {data['session_name']}")
print(f" Servers: {len(data['servers'])}")
print(f" Pass path: {data['sshpass_path']}")
print()
migrated_count += 1
except Exception as e:
error_msg = f"Ошибка обработки {session_file.name}: {str(e)}"
print(f" ❌ {error_msg}")
errors.append(error_msg)
# Выводим итоги
print("=" * 50)
print(f"📊 Миграция завершена!")
print(f"✅ Успешно мигрировано: {migrated_count}/{len(session_files)}")
if errors:
print(f"❌ Ошибки ({len(errors)}):")
for error in errors:
print(f" - {error}")
# Показываем созданные файлы
print(f"\n📁 Созданные файлы в {self.target_dir}:")
for conf_file in sorted(self.target_dir.glob('*.conf')):
print(f" 📄 {conf_file.name}")
print(f"\n🎯 Теперь вы можете использовать новый лаунчер:")
print(f" tmux-sessions # Меню выбора")
print(f" tmux-sessions --list # Список проектов")
def main():
migrator = SessionMigrator()
# Проверяем существует ли исходный каталог
if not migrator.source_dir.exists():
print(f"❌ Исходный каталог не существует: {migrator.source_dir}")
return
# Запускаем миграцию
migrator.migrate()
if __name__ == '__main__':
main()
| 1 | #!/usr/bin/env python3 |
| 2 | """ |
| 3 | Скрипт миграции tmux сессий в новую структуру инвентарей |
| 4 | """ |
| 5 | |
| 6 | import os |
| 7 | import re |
| 8 | import shutil |
| 9 | from pathlib import Path |
| 10 | |
| 11 | class SessionMigrator: |
| 12 | def __init__(self): |
| 13 | self.source_dir = Path.home() / '.config' / 'tmux' / 'sessions' |
| 14 | self.target_dir = Path.home() / '.config' / 'tmux-sessions' / 'inventory' |
| 15 | |
| 16 | def parse_session_file(self, file_path): |
| 17 | """Парсит файл сессии и извлекает данные""" |
| 18 | with open(file_path, 'r', encoding='utf-8') as f: |
| 19 | content = f.read() |
| 20 | |
| 21 | # Базовые данные |
| 22 | data = { |
| 23 | 'filename': file_path.name, |
| 24 | 'session_name': None, |
| 25 | 'sshpass_path': None, |
| 26 | 'servers': [] |
| 27 | } |
| 28 | |
| 29 | # Парсим имя сессии |
| 30 | session_match = re.search(r'session_name=(["\']?)([^"\'\n]+)\1', content) |
| 31 | if session_match: |
| 32 | data['session_name'] = session_match.group(2).strip() |
| 33 | |
| 34 | # Парсим путь к паролю |
| 35 | sshpass_match = re.search(r'sshpass=\$\(pass ([^)\n]+)\)', content) |
| 36 | if sshpass_match: |
| 37 | data['sshpass_path'] = sshpass_match.group(1).strip() |
| 38 | |
| 39 | # Парсим серверы из массива servers |
| 40 | servers_match = re.search(r'servers=\(([^)]+)\)', content, re.DOTALL) |
| 41 | if servers_match: |
| 42 | servers_content = servers_match.group(1) |
| 43 | # Извлекаем строки серверов |
| 44 | server_lines = re.findall(r'"([^"]+)"', servers_content) |
| 45 | for line in server_lines: |
| 46 | parts = line.strip().split(' ', 1) |
| 47 | if len(parts) == 2: |
| 48 | data['servers'].append(f'"{parts[0]} {parts[1]}"') |
| 49 | |
| 50 | # Обработка специального формата для Приморье |
| 51 | if 'Приморье' in file_path.name: |
| 52 | data = self._parse_primorye_format(content, data) |
| 53 | |
| 54 | return data |
| 55 | |
| 56 | def _parse_primorye_format(self, content, data): |
| 57 | """Специальный парсинг для файла Приморье с другим форматом""" |
| 58 | # Имя сессии |
| 59 | session_match = re.search(r'SESSION=([^\n]+)', content) |
| 60 | if session_match: |
| 61 | data['session_name'] = session_match.group(1).strip() |
| 62 | |
| 63 | # Путь к паролю |
| 64 | sshpass_match = re.search(r'SSHPASS=\$\(pass ([^)\n]+)\)', content) |
| 65 | if sshpass_match: |
| 66 | data['sshpass_path'] = sshpass_match.group(1).strip() |
| 67 | |
| 68 | # Извлекаем серверы из tmux new-window команд |
| 69 | servers = [] |
| 70 | window_matches = re.findall(r'tmux new-window[^>]+-n ([^\s]+)[^>]+"sshpass[^>]+ssh ([^"\s]+)"', content) |
| 71 | for server_name, server_connect in window_matches: |
| 72 | servers.append(f'"{server_name} {server_connect}"') |
| 73 | |
| 74 | data['servers'] = servers |
| 75 | return data |
| 76 | |
| 77 | def generate_config_content(self, data): |
| 78 | """Генерирует содержимое нового конфигурационного файла""" |
| 79 | lines = [ |
| 80 | f"# Конфигурация для проекта {data['filename'].replace('.session', '')}", |
| 81 | f"SESSION_NAME=\"{data['session_name']}\"", |
| 82 | f"SSHPASS_PATH=\"{data['sshpass_path']}\"", |
| 83 | "", |
| 84 | "# Список серверов в формате \"имя_вкладки подключение\"", |
| 85 | "SERVERS=(" |
| 86 | ] |
| 87 | |
| 88 | # Добавляем серверы |
| 89 | for server in data['servers']: |
| 90 | lines.append(f" {server}") |
| 91 | |
| 92 | lines.extend([ |
| 93 | ")", |
| 94 | "" |
| 95 | ]) |
| 96 | |
| 97 | return '\n'.join(lines) |
| 98 | |
| 99 | def sanitize_filename(self, filename): |
| 100 | """Очищает имя файла от недопустимых символов""" |
| 101 | # Убираем расширение .session |
| 102 | clean_name = filename.replace('.session', '') |
| 103 | # Заменяем пробелы и другие проблемные символы |
| 104 | clean_name = re.sub(r'[^\w\-_.]', '_', clean_name) |
| 105 | return clean_name + '.conf' |
| 106 | |
| 107 | def migrate(self): |
| 108 | """Основная функция миграции""" |
| 109 | print("🚀 Начинаем миграцию tmux сессий...") |
| 110 | print(f"📁 Исходный каталог: {self.source_dir}") |
| 111 | print(f"📁 Целевой каталог: {self.target_dir}") |
| 112 | print() |
| 113 | |
| 114 | # Создаем целевой каталог если не существует |
| 115 | self.target_dir.mkdir(parents=True, exist_ok=True) |
| 116 | |
| 117 | # Получаем список файлов сессий |
| 118 | session_files = list(self.source_dir.glob('*.session')) |
| 119 | |
| 120 | if not session_files: |
| 121 | print("❌ Не найдено файлов сессий для миграции") |
| 122 | return |
| 123 | |
| 124 | migrated_count = 0 |
| 125 | errors = [] |
| 126 | |
| 127 | for session_file in session_files: |
| 128 | try: |
| 129 | print(f"📖 Обрабатываем: {session_file.name}") |
| 130 | |
| 131 | # Парсим файл |
| 132 | data = self.parse_session_file(session_file) |
| 133 | |
| 134 | # Проверяем что все необходимые данные есть |
| 135 | if not data['session_name']: |
| 136 | print(f" ⚠️ Предупреждение: не найдено имя сессии в {session_file.name}") |
| 137 | |
| 138 | if not data['sshpass_path']: |
| 139 | print(f" ⚠️ Предупреждение: не найден путь к паролю в {session_file.name}") |
| 140 | |
| 141 | if not data['servers']: |
| 142 | print(f" ⚠️ Предупреждение: не найдены серверы в {session_file.name}") |
| 143 | |
| 144 | # Генерируем новое имя файла |
| 145 | new_filename = self.sanitize_filename(session_file.name) |
| 146 | new_file_path = self.target_dir / new_filename |
| 147 | |
| 148 | # Генерируем содержимое |
| 149 | config_content = self.generate_config_content(data) |
| 150 | |
| 151 | # Сохраняем файл |
| 152 | with open(new_file_path, 'w', encoding='utf-8') as f: |
| 153 | f.write(config_content) |
| 154 | |
| 155 | print(f" ✅ Создан: {new_filename}") |
| 156 | print(f" Session: {data['session_name']}") |
| 157 | print(f" Servers: {len(data['servers'])}") |
| 158 | print(f" Pass path: {data['sshpass_path']}") |
| 159 | print() |
| 160 | |
| 161 | migrated_count += 1 |
| 162 | |
| 163 | except Exception as e: |
| 164 | error_msg = f"Ошибка обработки {session_file.name}: {str(e)}" |
| 165 | print(f" ❌ {error_msg}") |
| 166 | errors.append(error_msg) |
| 167 | |
| 168 | # Выводим итоги |
| 169 | print("=" * 50) |
| 170 | print(f"📊 Миграция завершена!") |
| 171 | print(f"✅ Успешно мигрировано: {migrated_count}/{len(session_files)}") |
| 172 | |
| 173 | if errors: |
| 174 | print(f"❌ Ошибки ({len(errors)}):") |
| 175 | for error in errors: |
| 176 | print(f" - {error}") |
| 177 | |
| 178 | # Показываем созданные файлы |
| 179 | print(f"\n📁 Созданные файлы в {self.target_dir}:") |
| 180 | for conf_file in sorted(self.target_dir.glob('*.conf')): |
| 181 | print(f" 📄 {conf_file.name}") |
| 182 | |
| 183 | print(f"\n🎯 Теперь вы можете использовать новый лаунчер:") |
| 184 | print(f" tmux-sessions # Меню выбора") |
| 185 | print(f" tmux-sessions --list # Список проектов") |
| 186 | |
| 187 | def main(): |
| 188 | migrator = SessionMigrator() |
| 189 | |
| 190 | # Проверяем существует ли исходный каталог |
| 191 | if not migrator.source_dir.exists(): |
| 192 | print(f"❌ Исходный каталог не существует: {migrator.source_dir}") |
| 193 | return |
| 194 | |
| 195 | # Запускаем миграцию |
| 196 | migrator.migrate() |
| 197 | |
| 198 | if __name__ == '__main__': |
| 199 | main() |
| 200 |