#!/usr/bin/env python3 """ PITOOLS v4 — DevOps фреймворк на Python pinstall, piproject, deploypi, multideploypi Для бабушек и дедушек! 🔥 """ import argparse import asyncio import shlex import sys import getpass import json import yaml from typing import List, Dict, Any, Optional, Union from pathlib import Path import paramiko from io import StringIO from dataclasses import dataclass from datetime import datetime # Try to import yaml, fallback if not available try: import yaml YAML_AVAILABLE = True except ImportError: YAML_AVAILABLE = False @dataclass class FileSpec: path: Path mode: int content: Optional[str] = None is_dir: bool = False class PITools: VERSION = "4.1.0" def __init__(self, root: Path = None, verbose: bool = False): self.root = root or Path.cwd() self.curdir = self.root self.verbose = verbose self.stats = {"files": 0, "dirs": 0, "errors": 0} def _log(self, message: str, level: str = "INFO"): """Умное логирование""" icons = { "INFO": "ℹ️", "SUCCESS": "✅", "WARNING": "⚠️", "ERROR": "❌", "DEBUG": "🐛" } timestamp = datetime.now().strftime("%H:%M:%S") if level in icons: print(f"{icons[level]} [{timestamp}] {message}") else: print(f"[{timestamp}] {message}") def _get_smart_mode(self, filepath: Path) -> int: """Определение прав доступа по расширению файла""" if filepath.is_dir(): return 0o755 ext = filepath.suffix.lower() # Исполняемые файлы if ext in ['.sh', '.bash', '.zsh', '.py', '.pl', '.rb', '.php', '.js', '.ts']: return 0o755 # Приватные файлы if ext in ['.key', '.pem', '.crt', '.pub', '.priv']: return 0o600 # Конфиденциальные данные if ext in ['.env', '.secret', '.token', '.password', '.credential']: return 0o640 # Конфигурационные файлы if ext in ['.conf', '.cfg', '.ini', '.config', '.yml', '.yaml', '.toml']: return 0o644 # Логи и данные if ext in ['.log', '.db', '.sqlite', '.json', '.xml']: return 0o644 # По умолчанию return 0o644 def pinstall(self, items: List[str], mode: Optional[int] = None, is_dir: bool = False, content: Optional[str] = None) -> Dict[str, int]: """Создание файлов/папок с умными правами""" self.stats = {"files": 0, "dirs": 0, "errors": 0} for item in items: if not item.strip(): continue try: p = self.root / item.lstrip('/') if is_dir or item.endswith('/'): p.mkdir(parents=True, exist_ok=True) final_mode = mode or 0o755 p.chmod(final_mode) self._log(f"Папка: {item.strip()} ({oct(final_mode)})", "SUCCESS") self.stats["dirs"] += 1 else: p.parent.mkdir(parents=True, exist_ok=True) if content: p.write_text(content, encoding='utf-8') else: p.touch(exist_ok=True) final_mode = mode or self._get_smart_mode(p) p.chmod(final_mode) self._log(f"Файл: {item.strip()} ({oct(final_mode)})", "SUCCESS") self.stats["files"] += 1 except Exception as e: self._log(f"Ошибка создания {item}: {e}", "ERROR") self.stats["errors"] += 1 self._log(f"Создано: {self.stats['files']} файлов, {self.stats['dirs']} папок, ошибок: {self.stats['errors']}", "INFO") return self.stats def piproject(self, spec_file: str, export_json: bool = False) -> bool: """Развёртывание проекта по .prj""" self._log(f"Читаем спецификацию: {spec_file}") spec_path = Path(spec_file) if not spec_path.exists(): self._log(f"Файл не найден: {spec_file}", "ERROR") return False original_root = self.root project_structure = [] try: with open(spec_file, 'r', encoding='utf-8') as f: content = f.read() # Поддержка разных форматов if spec_path.suffix.lower() in ['.json', '.yaml', '.yml']: return self._load_structured_spec(spec_path, export_json) # Стандартный .prj формат lines = content.split('\n') for line_num, line in enumerate(lines, 1): line = line.strip() if not line or line.startswith('#'): if line.startswith('#') and self.verbose: self._log(f"Комментарий: {line[1:].strip()}", "DEBUG") continue parts = shlex.split(line) if not parts: continue cmd = parts[0] args = parts[1:] try: if cmd == 'PRJ:': if args: self.root = Path(args[0]).expanduser().resolve() self.curdir = self.root self.root.mkdir(parents=True, exist_ok=True) self._log(f"Проект: {self.root}/", "INFO") elif cmd == 'DR:': for d in args: d = d.rstrip('/') full_path = self.root / d full_path.mkdir(parents=True, exist_ok=True) full_path.chmod(0o755) project_structure.append({"type": "dir", "path": str(full_path), "mode": "755"}) self._log(f"Папка: {d}/ (755)", "SUCCESS") self.stats["dirs"] += 1 self.curdir = full_path elif cmd == 'FL:': for fname in args: full_path = self.root / fname full_path.parent.mkdir(parents=True, exist_ok=True) full_path.touch(exist_ok=True) mode = self._get_smart_mode(full_path) full_path.chmod(mode) project_structure.append({ "type": "file", "path": str(full_path), "mode": oct(mode), "extension": full_path.suffix }) mode_str = oct(mode).replace('0o', '') self._log(f"Файл: {fname} ({mode_str})", "SUCCESS") self.stats["files"] += 1 elif cmd == 'CMD:': # Выполнение команд после создания структуры if args: cmd_str = ' '.join(args) self._log(f"Выполняем: {cmd_str}", "INFO") elif cmd == 'TPL:': # Шаблоны файлов с переменными if len(args) >= 2: template_file = args[0] target_file = args[1] variables = args[2:] if len(args) > 2 else [] self._process_template(template_file, target_file, variables) except Exception as e: self._log(f"Строка {line_num}: {e}", "ERROR") self.stats["errors"] += 1 except Exception as e: self._log(f"Ошибка чтения файла: {e}", "ERROR") return False finally: self.root = original_root # Экспорт структуры в JSON if export_json: json_file = f"{spec_path.stem}_structure.json" with open(json_file, 'w', encoding='utf-8') as f: json.dump(project_structure, f, indent=2, ensure_ascii=False) self._log(f"Структура экспортирована в: {json_file}", "INFO") self._log(f"Готово! Создано: {self.stats['files']} файлов, {self.stats['dirs']} папок", "SUCCESS") return True def _load_structured_spec(self, spec_path: Path, export_json: bool) -> bool: """Загрузка структурированных спецификаций (JSON/YAML)""" try: if spec_path.suffix.lower() == '.json': with open(spec_path, 'r', encoding='utf-8') as f: spec = json.load(f) elif spec_path.suffix.lower() in ['.yaml', '.yml']: if not YAML_AVAILABLE: self._log("YAML не установлен. Установите: pip install pyyaml", "ERROR") return False with open(spec_path, 'r', encoding='utf-8') as f: spec = yaml.safe_load(f) else: self._log(f"Неподдерживаемый формат: {spec_path.suffix}", "ERROR") return False # Обработка структурированной спецификации if 'project' in spec: project_root = spec.get('project', {}).get('root', '.') self.root = Path(project_root).expanduser().resolve() self.root.mkdir(parents=True, exist_ok=True) self._log(f"Проект: {self.root}/", "INFO") # Создание директорий for dir_path in spec.get('directories', []): full_path = self.root / dir_path full_path.mkdir(parents=True, exist_ok=True) full_path.chmod(0o755) self._log(f"Папка: {dir_path}/ (755)", "SUCCESS") self.stats["dirs"] += 1 # Создание файлов for file_spec in spec.get('files', []): if isinstance(file_spec, dict): file_path = file_spec.get('path', '') content = file_spec.get('content', '') mode = file_spec.get('mode') else: file_path = file_spec content = '' mode = None full_path = self.root / file_path full_path.parent.mkdir(parents=True, exist_ok=True) if content: full_path.write_text(content, encoding='utf-8') else: full_path.touch(exist_ok=True) final_mode = mode or self._get_smart_mode(full_path) full_path.chmod(final_mode) mode_str = oct(final_mode).replace('0o', '') self._log(f"Файл: {file_path} ({mode_str})", "SUCCESS") self.stats["files"] += 1 return True except Exception as e: self._log(f"Ошибка загрузки спецификации: {e}", "ERROR") return False def _process_template(self, template: str, target: str, variables: List[str]): """Обработка шаблонов файлов""" template_path = Path(template) if not template_path.exists(): self._log(f"Шаблон не найден: {template}", "ERROR") return content = template_path.read_text(encoding='utf-8') # Замена переменных (простая реализация) var_dict = {} for var in variables: if '=' in var: key, value = var.split('=', 1) var_dict[key.strip()] = value.strip() for key, value in var_dict.items(): content = content.replace(f'{{{{{key}}}}}', value) target_path = self.root / target target_path.parent.mkdir(parents=True, exist_ok=True) target_path.write_text(content, encoding='utf-8') mode = self._get_smart_mode(target_path) target_path.chmod(mode) self._log(f"Шаблон: {template} → {target}", "SUCCESS") self.stats["files"] += 1 class RemoteDeployer: def __init__(self, verbose: bool = False): self.verbose = verbose self.results = [] async def deploy(self, host: str, user: str, spec_content: str, key_path: Optional[str] = None, password: Optional[str] = None) -> Dict[str, Any]: """Деплой на удаленный сервер""" result = { "host": host, "success": False, "output": "", "error": "", "timestamp": datetime.now().isoformat() } client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: # Асинхронное подключение loop = asyncio.get_event_loop() connect_kwargs = { "hostname": host, "username": user, "timeout": 15 } if key_path: connect_kwargs["key_filename"] = key_path elif password: connect_kwargs["password"] = password await loop.run_in_executor(None, client.connect, **connect_kwargs) # Создаем временную директорию timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") remote_dir = f"/tmp/pitools_{timestamp}" sftp = await loop.run_in_executor(None, client.open_sftp) # Загружаем pitools и спецификацию current_file = Path(__file__) pitools_code = current_file.read_text(encoding='utf-8') def upload(): sftp.mkdir(remote_dir) with sftp.open(f"{remote_dir}/pitools.py", 'w') as f: f.write(pitools_code) with sftp.open(f"{remote_dir}/spec.prj", 'w') as f: f.write(spec_content) sftp.chmod(f"{remote_dir}/pitools.py", 0o755) await loop.run_in_executor(None, upload) await loop.run_in_executor(None, sftp.close) # Выполняем развертывание def execute(): cmd = f"cd {remote_dir} && python3 pitools.py piproject spec.prj" stdin, stdout, stderr = client.exec_command(cmd) output = stdout.read().decode('utf-8', errors='ignore') error = stderr.read().decode('utf-8', errors='ignore') return output, error output, error = await loop.run_in_executor(None, execute) # Очистка clean_cmd = f"rm -rf {remote_dir}" client.exec_command(clean_cmd) result["success"] = True result["output"] = output if error: result["error"] = error self._log(f"✅ {host}: Успешно", "SUCCESS") except Exception as e: result["error"] = str(e) self._log(f"❌ {host}: {e}", "ERROR") finally: try: client.close() except: pass self.results.append(result) return result def _log(self, message: str, level: str = "INFO"): """Логирование для RemoteDeployer""" icons = {"SUCCESS": "✅", "ERROR": "❌", "INFO": "ℹ️"} if level in icons: print(f"{icons[level]} {message}") else: print(message) # CLI async def main(): parser = argparse.ArgumentParser( description=f"PITOOLS v{PITools.VERSION} — DevOps фреймворк на Python", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Примеры использования: pitools pinstall file1.txt dir/ script.sh -s pitools piproject project.prj pitools deploypi server.com user project.prj pitools multideploypi infra.prj Форматы спецификаций: .prj - Текстовый формат с командами .json - JSON структура .yaml - YAML структура (требует pyyaml) """ ) parser.add_argument('--version', action='version', version=f'PITOOLS v{PITools.VERSION}') parser.add_argument('-v', '--verbose', action='store_true', help='Подробный вывод') subparsers = parser.add_subparsers(dest='command', help='Команды') # pinstall p1 = subparsers.add_parser('pinstall', help='Создать файлы/папки') p1.add_argument('items', nargs='+', help='Файлы/папки') p1.add_argument('-s', '--script', action='store_true', help='Режим скрипта (755)') p1.add_argument('-d', '--directory', action='store_true', help='Режим папки') p1.add_argument('-m', '--mode', help='Права доступа (например: 755, 644)') p1.add_argument('-c', '--content', help='Содержимое файла') # piproject p2 = subparsers.add_parser('piproject', help='Развёртывание проекта') p2.add_argument('spec', help='Файл спецификации (.prj, .json, .yaml)') p2.add_argument('-e', '--export-json', action='store_true', help='Экспортировать структуру в JSON') p2.add_argument('-r', '--root', help='Корневая директория проекта') # deploypi p3 = subparsers.add_parser('deploypi', help='Деплой на сервер') p3.add_argument('host', help='Хост') p3.add_argument('user', help='Пользователь') p3.add_argument('spec', help='Файл спецификации или содержимое') p3.add_argument('-k', '--key', help='Путь к SSH ключу') p3.add_argument('-p', '--password', action='store_true', help='Использовать парольную аутентификацию') # multideploypi p4 = subparsers.add_parser('multideploypi', help='Мультидеплой') p4.add_argument('spec_or_infra', help='Файл спецификации или инфраструктуры') p4.add_argument('-i', '--infra', help='Файл инфраструктуры (опционально)') p4.add_argument('-u', '--user', help='Пользователь для всех хостов') p4.add_argument('-k', '--key', help='Путь к SSH ключу') p4.add_argument('-o', '--output', help='Файл для сохранения результатов') args = parser.parse_args() if not args.command: parser.print_help() return try: if args.command == 'pinstall': root_dir = Path(args.root) if hasattr(args, 'root') and args.root else None pitools = PITools(root=root_dir, verbose=args.verbose) mode = None if args.mode: try: mode = int(args.mode, 8) except ValueError: print(f"❌ Неверный формат прав: {args.mode}") return pitools.pinstall( args.items, mode=mode or (0o755 if args.script else None), is_dir=args.directory, content=args.content ) elif args.command == 'piproject': root_dir = Path(args.root) if hasattr(args, 'root') and args.root else None pitools = PITools(root=root_dir, verbose=args.verbose) pitools.piproject(args.spec, args.export_json) elif args.command == 'deploypi': spec_path = Path(args.spec) if spec_path.exists(): spec_content = spec_path.read_text(encoding='utf-8') else: spec_content = args.spec password = None if args.password: password = getpass.getpass(f"Пароль для {args.user}@{args.host}: ") deployer = RemoteDeployer(verbose=args.verbose) result = await deployer.deploy( args.host, args.user, spec_content, key_path=args.key, password=password ) if args.verbose and result["output"]: print(f"\n📋 Вывод с сервера:\n{result['output']}") elif args.command == 'multideploypi': await run_multideploy(args) except KeyboardInterrupt: print("\n\n👋 Прервано пользователем") sys.exit(0) except Exception as e: print(f"💥 Критическая ошибка: {e}") if args.verbose: import traceback traceback.print_exc() sys.exit(1) async def run_multideploy(args): """Запуск мультидеплоя""" print("🌍 МУЛЬТИДЕПЛОЙ") infra_file = args.infra if args.infra else None # Определяем пользователя user = args.user if not user: user = input("Пользователь (по умолчанию текущий): ").strip() or getpass.getuser() # Парсинг инфраструктуры if infra_file and Path(infra_file).exists(): blocks = parse_infra(infra_file) else: spec_path = Path(args.spec_or_infra) if spec_path.exists(): spec_content = spec_path.read_text(encoding='utf-8') else: spec_content = args.spec_or_infra blocks = [{"hosts": [args.spec_or_infra], "spec": spec_content}] deployer = RemoteDeployer(verbose=args.verbose) all_results = [] for i, block in enumerate(blocks, 1): print(f"\n🔧 Блок {i} → {len(block['hosts'])} хостов") tasks = [] for host in block['hosts']: task = deployer.deploy( host, user, block['spec'], key_path=args.key, password=None # Пароль запрашивается при необходимости ) tasks.append(task) results = await asyncio.gather(*tasks, return_exceptions=True) successful = 0 for result in results: if isinstance(result, dict) and result.get("success"): successful += 1 all_results.append(result) print(f"✅ Блок {i}: {successful}/{len(tasks)} готово!") # Сохранение результатов if args.output: with open(args.output, 'w', encoding='utf-8') as f: json.dump(all_results, f, indent=2, ensure_ascii=False) print(f"\n📊 Результаты сохранены в: {args.output}") def parse_infra(spec_file: str) -> List[Dict[str, Any]]: """Парсинг infra.prj для multideploypi""" blocks = [] current_hosts = [] current_spec = [] with open(spec_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue if line.startswith('MLT:'): if current_hosts and current_spec: blocks.append({"hosts": current_hosts, "spec": '\n'.join(current_spec)}) current_hosts = [h.strip() for h in line[4:].split(',')] current_spec = [] else: current_spec.append(line) if current_hosts and current_spec: blocks.append({"hosts": current_hosts, "spec": '\n'.join(current_spec)}) return blocks if __name__ == '__main__': asyncio.run(main())