Add CI/CD

This commit is contained in:
peginelab 2025-12-23 02:45:20 +03:00
commit 8ec21b41fa
3 changed files with 1929 additions and 0 deletions

637
pitoolsv2.py Executable file
View file

@ -0,0 +1,637 @@
#!/usr/bin/env python3
"""
PITOOLS v4 DevOps фреймворк на Python
pinstall, piproject, deploypi, multideploypi
Для бабушек и дедушек! 🔥
"""
import argparse
import asyncio
import shlex
import sys
import getpass
import json
import yaml
from typing import List, Dict, Any, Optional, Union
from pathlib import Path
import paramiko
from io import StringIO
from dataclasses import dataclass
from datetime import datetime
# Try to import yaml, fallback if not available
try:
import yaml
YAML_AVAILABLE = True
except ImportError:
YAML_AVAILABLE = False
@dataclass
class FileSpec:
path: Path
mode: int
content: Optional[str] = None
is_dir: bool = False
class PITools:
VERSION = "4.1.0"
def __init__(self, root: Path = None, verbose: bool = False):
self.root = root or Path.cwd()
self.curdir = self.root
self.verbose = verbose
self.stats = {"files": 0, "dirs": 0, "errors": 0}
def _log(self, message: str, level: str = "INFO"):
"""Умное логирование"""
icons = {
"INFO": "",
"SUCCESS": "",
"WARNING": "⚠️",
"ERROR": "",
"DEBUG": "🐛"
}
timestamp = datetime.now().strftime("%H:%M:%S")
if level in icons:
print(f"{icons[level]} [{timestamp}] {message}")
else:
print(f"[{timestamp}] {message}")
def _get_smart_mode(self, filepath: Path) -> int:
"""Определение прав доступа по расширению файла"""
if filepath.is_dir():
return 0o755
ext = filepath.suffix.lower()
# Исполняемые файлы
if ext in ['.sh', '.bash', '.zsh', '.py', '.pl', '.rb', '.php', '.js', '.ts']:
return 0o755
# Приватные файлы
if ext in ['.key', '.pem', '.crt', '.pub', '.priv']:
return 0o600
# Конфиденциальные данные
if ext in ['.env', '.secret', '.token', '.password', '.credential']:
return 0o640
# Конфигурационные файлы
if ext in ['.conf', '.cfg', '.ini', '.config', '.yml', '.yaml', '.toml']:
return 0o644
# Логи и данные
if ext in ['.log', '.db', '.sqlite', '.json', '.xml']:
return 0o644
# По умолчанию
return 0o644
def pinstall(self, items: List[str], mode: Optional[int] = None,
is_dir: bool = False, content: Optional[str] = None) -> Dict[str, int]:
"""Создание файлов/папок с умными правами"""
self.stats = {"files": 0, "dirs": 0, "errors": 0}
for item in items:
if not item.strip():
continue
try:
p = self.root / item.lstrip('/')
if is_dir or item.endswith('/'):
p.mkdir(parents=True, exist_ok=True)
final_mode = mode or 0o755
p.chmod(final_mode)
self._log(f"Папка: {item.strip()} ({oct(final_mode)})", "SUCCESS")
self.stats["dirs"] += 1
else:
p.parent.mkdir(parents=True, exist_ok=True)
if content:
p.write_text(content, encoding='utf-8')
else:
p.touch(exist_ok=True)
final_mode = mode or self._get_smart_mode(p)
p.chmod(final_mode)
self._log(f"Файл: {item.strip()} ({oct(final_mode)})", "SUCCESS")
self.stats["files"] += 1
except Exception as e:
self._log(f"Ошибка создания {item}: {e}", "ERROR")
self.stats["errors"] += 1
self._log(f"Создано: {self.stats['files']} файлов, {self.stats['dirs']} папок, ошибок: {self.stats['errors']}", "INFO")
return self.stats
def piproject(self, spec_file: str, export_json: bool = False) -> bool:
"""Развёртывание проекта по .prj"""
self._log(f"Читаем спецификацию: {spec_file}")
spec_path = Path(spec_file)
if not spec_path.exists():
self._log(f"Файл не найден: {spec_file}", "ERROR")
return False
original_root = self.root
project_structure = []
try:
with open(spec_file, 'r', encoding='utf-8') as f:
content = f.read()
# Поддержка разных форматов
if spec_path.suffix.lower() in ['.json', '.yaml', '.yml']:
return self._load_structured_spec(spec_path, export_json)
# Стандартный .prj формат
lines = content.split('\n')
for line_num, line in enumerate(lines, 1):
line = line.strip()
if not line or line.startswith('#'):
if line.startswith('#') and self.verbose:
self._log(f"Комментарий: {line[1:].strip()}", "DEBUG")
continue
parts = shlex.split(line)
if not parts:
continue
cmd = parts[0]
args = parts[1:]
try:
if cmd == 'PRJ:':
if args:
self.root = Path(args[0]).expanduser().resolve()
self.curdir = self.root
self.root.mkdir(parents=True, exist_ok=True)
self._log(f"Проект: {self.root}/", "INFO")
elif cmd == 'DR:':
for d in args:
d = d.rstrip('/')
full_path = self.root / d
full_path.mkdir(parents=True, exist_ok=True)
full_path.chmod(0o755)
project_structure.append({"type": "dir", "path": str(full_path), "mode": "755"})
self._log(f"Папка: {d}/ (755)", "SUCCESS")
self.stats["dirs"] += 1
self.curdir = full_path
elif cmd == 'FL:':
for fname in args:
full_path = self.root / fname
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path.touch(exist_ok=True)
mode = self._get_smart_mode(full_path)
full_path.chmod(mode)
project_structure.append({
"type": "file",
"path": str(full_path),
"mode": oct(mode),
"extension": full_path.suffix
})
mode_str = oct(mode).replace('0o', '')
self._log(f"Файл: {fname} ({mode_str})", "SUCCESS")
self.stats["files"] += 1
elif cmd == 'CMD:':
# Выполнение команд после создания структуры
if args:
cmd_str = ' '.join(args)
self._log(f"Выполняем: {cmd_str}", "INFO")
elif cmd == 'TPL:':
# Шаблоны файлов с переменными
if len(args) >= 2:
template_file = args[0]
target_file = args[1]
variables = args[2:] if len(args) > 2 else []
self._process_template(template_file, target_file, variables)
except Exception as e:
self._log(f"Строка {line_num}: {e}", "ERROR")
self.stats["errors"] += 1
except Exception as e:
self._log(f"Ошибка чтения файла: {e}", "ERROR")
return False
finally:
self.root = original_root
# Экспорт структуры в JSON
if export_json:
json_file = f"{spec_path.stem}_structure.json"
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(project_structure, f, indent=2, ensure_ascii=False)
self._log(f"Структура экспортирована в: {json_file}", "INFO")
self._log(f"Готово! Создано: {self.stats['files']} файлов, {self.stats['dirs']} папок", "SUCCESS")
return True
def _load_structured_spec(self, spec_path: Path, export_json: bool) -> bool:
"""Загрузка структурированных спецификаций (JSON/YAML)"""
try:
if spec_path.suffix.lower() == '.json':
with open(spec_path, 'r', encoding='utf-8') as f:
spec = json.load(f)
elif spec_path.suffix.lower() in ['.yaml', '.yml']:
if not YAML_AVAILABLE:
self._log("YAML не установлен. Установите: pip install pyyaml", "ERROR")
return False
with open(spec_path, 'r', encoding='utf-8') as f:
spec = yaml.safe_load(f)
else:
self._log(f"Неподдерживаемый формат: {spec_path.suffix}", "ERROR")
return False
# Обработка структурированной спецификации
if 'project' in spec:
project_root = spec.get('project', {}).get('root', '.')
self.root = Path(project_root).expanduser().resolve()
self.root.mkdir(parents=True, exist_ok=True)
self._log(f"Проект: {self.root}/", "INFO")
# Создание директорий
for dir_path in spec.get('directories', []):
full_path = self.root / dir_path
full_path.mkdir(parents=True, exist_ok=True)
full_path.chmod(0o755)
self._log(f"Папка: {dir_path}/ (755)", "SUCCESS")
self.stats["dirs"] += 1
# Создание файлов
for file_spec in spec.get('files', []):
if isinstance(file_spec, dict):
file_path = file_spec.get('path', '')
content = file_spec.get('content', '')
mode = file_spec.get('mode')
else:
file_path = file_spec
content = ''
mode = None
full_path = self.root / file_path
full_path.parent.mkdir(parents=True, exist_ok=True)
if content:
full_path.write_text(content, encoding='utf-8')
else:
full_path.touch(exist_ok=True)
final_mode = mode or self._get_smart_mode(full_path)
full_path.chmod(final_mode)
mode_str = oct(final_mode).replace('0o', '')
self._log(f"Файл: {file_path} ({mode_str})", "SUCCESS")
self.stats["files"] += 1
return True
except Exception as e:
self._log(f"Ошибка загрузки спецификации: {e}", "ERROR")
return False
def _process_template(self, template: str, target: str, variables: List[str]):
"""Обработка шаблонов файлов"""
template_path = Path(template)
if not template_path.exists():
self._log(f"Шаблон не найден: {template}", "ERROR")
return
content = template_path.read_text(encoding='utf-8')
# Замена переменных (простая реализация)
var_dict = {}
for var in variables:
if '=' in var:
key, value = var.split('=', 1)
var_dict[key.strip()] = value.strip()
for key, value in var_dict.items():
content = content.replace(f'{{{{{key}}}}}', value)
target_path = self.root / target
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_text(content, encoding='utf-8')
mode = self._get_smart_mode(target_path)
target_path.chmod(mode)
self._log(f"Шаблон: {template}{target}", "SUCCESS")
self.stats["files"] += 1
class RemoteDeployer:
def __init__(self, verbose: bool = False):
self.verbose = verbose
self.results = []
async def deploy(self, host: str, user: str, spec_content: str,
key_path: Optional[str] = None, password: Optional[str] = None) -> Dict[str, Any]:
"""Деплой на удаленный сервер"""
result = {
"host": host,
"success": False,
"output": "",
"error": "",
"timestamp": datetime.now().isoformat()
}
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# Асинхронное подключение
loop = asyncio.get_event_loop()
connect_kwargs = {
"hostname": host,
"username": user,
"timeout": 15
}
if key_path:
connect_kwargs["key_filename"] = key_path
elif password:
connect_kwargs["password"] = password
await loop.run_in_executor(None, client.connect, **connect_kwargs)
# Создаем временную директорию
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
remote_dir = f"/tmp/pitools_{timestamp}"
sftp = await loop.run_in_executor(None, client.open_sftp)
# Загружаем pitools и спецификацию
current_file = Path(__file__)
pitools_code = current_file.read_text(encoding='utf-8')
def upload():
sftp.mkdir(remote_dir)
with sftp.open(f"{remote_dir}/pitools.py", 'w') as f:
f.write(pitools_code)
with sftp.open(f"{remote_dir}/spec.prj", 'w') as f:
f.write(spec_content)
sftp.chmod(f"{remote_dir}/pitools.py", 0o755)
await loop.run_in_executor(None, upload)
await loop.run_in_executor(None, sftp.close)
# Выполняем развертывание
def execute():
cmd = f"cd {remote_dir} && python3 pitools.py piproject spec.prj"
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read().decode('utf-8', errors='ignore')
error = stderr.read().decode('utf-8', errors='ignore')
return output, error
output, error = await loop.run_in_executor(None, execute)
# Очистка
clean_cmd = f"rm -rf {remote_dir}"
client.exec_command(clean_cmd)
result["success"] = True
result["output"] = output
if error:
result["error"] = error
self._log(f"{host}: Успешно", "SUCCESS")
except Exception as e:
result["error"] = str(e)
self._log(f"{host}: {e}", "ERROR")
finally:
try:
client.close()
except:
pass
self.results.append(result)
return result
def _log(self, message: str, level: str = "INFO"):
"""Логирование для RemoteDeployer"""
icons = {"SUCCESS": "", "ERROR": "", "INFO": ""}
if level in icons:
print(f"{icons[level]} {message}")
else:
print(message)
# CLI
async def main():
parser = argparse.ArgumentParser(
description=f"PITOOLS v{PITools.VERSION} — DevOps фреймворк на Python",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Примеры использования:
pitools pinstall file1.txt dir/ script.sh -s
pitools piproject project.prj
pitools deploypi server.com user project.prj
pitools multideploypi infra.prj
Форматы спецификаций:
.prj - Текстовый формат с командами
.json - JSON структура
.yaml - YAML структура (требует pyyaml)
"""
)
parser.add_argument('--version', action='version', version=f'PITOOLS v{PITools.VERSION}')
parser.add_argument('-v', '--verbose', action='store_true', help='Подробный вывод')
subparsers = parser.add_subparsers(dest='command', help='Команды')
# pinstall
p1 = subparsers.add_parser('pinstall', help='Создать файлы/папки')
p1.add_argument('items', nargs='+', help='Файлы/папки')
p1.add_argument('-s', '--script', action='store_true', help='Режим скрипта (755)')
p1.add_argument('-d', '--directory', action='store_true', help='Режим папки')
p1.add_argument('-m', '--mode', help='Права доступа (например: 755, 644)')
p1.add_argument('-c', '--content', help='Содержимое файла')
# piproject
p2 = subparsers.add_parser('piproject', help='Развёртывание проекта')
p2.add_argument('spec', help='Файл спецификации (.prj, .json, .yaml)')
p2.add_argument('-e', '--export-json', action='store_true', help='Экспортировать структуру в JSON')
p2.add_argument('-r', '--root', help='Корневая директория проекта')
# deploypi
p3 = subparsers.add_parser('deploypi', help='Деплой на сервер')
p3.add_argument('host', help='Хост')
p3.add_argument('user', help='Пользователь')
p3.add_argument('spec', help='Файл спецификации или содержимое')
p3.add_argument('-k', '--key', help='Путь к SSH ключу')
p3.add_argument('-p', '--password', action='store_true', help='Использовать парольную аутентификацию')
# multideploypi
p4 = subparsers.add_parser('multideploypi', help='Мультидеплой')
p4.add_argument('spec_or_infra', help='Файл спецификации или инфраструктуры')
p4.add_argument('-i', '--infra', help='Файл инфраструктуры (опционально)')
p4.add_argument('-u', '--user', help='Пользователь для всех хостов')
p4.add_argument('-k', '--key', help='Путь к SSH ключу')
p4.add_argument('-o', '--output', help='Файл для сохранения результатов')
args = parser.parse_args()
if not args.command:
parser.print_help()
return
try:
if args.command == 'pinstall':
root_dir = Path(args.root) if hasattr(args, 'root') and args.root else None
pitools = PITools(root=root_dir, verbose=args.verbose)
mode = None
if args.mode:
try:
mode = int(args.mode, 8)
except ValueError:
print(f"❌ Неверный формат прав: {args.mode}")
return
pitools.pinstall(
args.items,
mode=mode or (0o755 if args.script else None),
is_dir=args.directory,
content=args.content
)
elif args.command == 'piproject':
root_dir = Path(args.root) if hasattr(args, 'root') and args.root else None
pitools = PITools(root=root_dir, verbose=args.verbose)
pitools.piproject(args.spec, args.export_json)
elif args.command == 'deploypi':
spec_path = Path(args.spec)
if spec_path.exists():
spec_content = spec_path.read_text(encoding='utf-8')
else:
spec_content = args.spec
password = None
if args.password:
password = getpass.getpass(f"Пароль для {args.user}@{args.host}: ")
deployer = RemoteDeployer(verbose=args.verbose)
result = await deployer.deploy(
args.host,
args.user,
spec_content,
key_path=args.key,
password=password
)
if args.verbose and result["output"]:
print(f"\n📋 Вывод с сервера:\n{result['output']}")
elif args.command == 'multideploypi':
await run_multideploy(args)
except KeyboardInterrupt:
print("\n\n👋 Прервано пользователем")
sys.exit(0)
except Exception as e:
print(f"💥 Критическая ошибка: {e}")
if args.verbose:
import traceback
traceback.print_exc()
sys.exit(1)
async def run_multideploy(args):
"""Запуск мультидеплоя"""
print("🌍 МУЛЬТИДЕПЛОЙ")
infra_file = args.infra if args.infra else None
# Определяем пользователя
user = args.user
if not user:
user = input("Пользователь (по умолчанию текущий): ").strip() or getpass.getuser()
# Парсинг инфраструктуры
if infra_file and Path(infra_file).exists():
blocks = parse_infra(infra_file)
else:
spec_path = Path(args.spec_or_infra)
if spec_path.exists():
spec_content = spec_path.read_text(encoding='utf-8')
else:
spec_content = args.spec_or_infra
blocks = [{"hosts": [args.spec_or_infra], "spec": spec_content}]
deployer = RemoteDeployer(verbose=args.verbose)
all_results = []
for i, block in enumerate(blocks, 1):
print(f"\n🔧 Блок {i}{len(block['hosts'])} хостов")
tasks = []
for host in block['hosts']:
task = deployer.deploy(
host,
user,
block['spec'],
key_path=args.key,
password=None # Пароль запрашивается при необходимости
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = 0
for result in results:
if isinstance(result, dict) and result.get("success"):
successful += 1
all_results.append(result)
print(f"✅ Блок {i}: {successful}/{len(tasks)} готово!")
# Сохранение результатов
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(all_results, f, indent=2, ensure_ascii=False)
print(f"\n📊 Результаты сохранены в: {args.output}")
def parse_infra(spec_file: str) -> List[Dict[str, Any]]:
"""Парсинг infra.prj для multideploypi"""
blocks = []
current_hosts = []
current_spec = []
with open(spec_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if line.startswith('MLT:'):
if current_hosts and current_spec:
blocks.append({"hosts": current_hosts, "spec": '\n'.join(current_spec)})
current_hosts = [h.strip() for h in line[4:].split(',')]
current_spec = []
else:
current_spec.append(line)
if current_hosts and current_spec:
blocks.append({"hosts": current_hosts, "spec": '\n'.join(current_spec)})
return blocks
if __name__ == '__main__':
asyncio.run(main())