637 lines
No EOL
26 KiB
Python
Executable file
637 lines
No EOL
26 KiB
Python
Executable file
#!/usr/bin/env python3
|
||
"""
|
||
PITOOLS v4 — DevOps фреймворк на Python
|
||
pinstall, piproject, deploypi, multideploypi
|
||
Для бабушек и дедушек! 🔥
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import shlex
|
||
import sys
|
||
import getpass
|
||
import json
|
||
import yaml
|
||
from typing import List, Dict, Any, Optional, Union
|
||
from pathlib import Path
|
||
import paramiko
|
||
from io import StringIO
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
|
||
# Try to import yaml, fallback if not available
|
||
try:
|
||
import yaml
|
||
YAML_AVAILABLE = True
|
||
except ImportError:
|
||
YAML_AVAILABLE = False
|
||
|
||
@dataclass
|
||
class FileSpec:
|
||
path: Path
|
||
mode: int
|
||
content: Optional[str] = None
|
||
is_dir: bool = False
|
||
|
||
class PITools:
|
||
VERSION = "4.1.0"
|
||
|
||
def __init__(self, root: Path = None, verbose: bool = False):
|
||
self.root = root or Path.cwd()
|
||
self.curdir = self.root
|
||
self.verbose = verbose
|
||
self.stats = {"files": 0, "dirs": 0, "errors": 0}
|
||
|
||
def _log(self, message: str, level: str = "INFO"):
|
||
"""Умное логирование"""
|
||
icons = {
|
||
"INFO": "ℹ️",
|
||
"SUCCESS": "✅",
|
||
"WARNING": "⚠️",
|
||
"ERROR": "❌",
|
||
"DEBUG": "🐛"
|
||
}
|
||
timestamp = datetime.now().strftime("%H:%M:%S")
|
||
if level in icons:
|
||
print(f"{icons[level]} [{timestamp}] {message}")
|
||
else:
|
||
print(f"[{timestamp}] {message}")
|
||
|
||
def _get_smart_mode(self, filepath: Path) -> int:
|
||
"""Определение прав доступа по расширению файла"""
|
||
if filepath.is_dir():
|
||
return 0o755
|
||
|
||
ext = filepath.suffix.lower()
|
||
|
||
# Исполняемые файлы
|
||
if ext in ['.sh', '.bash', '.zsh', '.py', '.pl', '.rb', '.php', '.js', '.ts']:
|
||
return 0o755
|
||
|
||
# Приватные файлы
|
||
if ext in ['.key', '.pem', '.crt', '.pub', '.priv']:
|
||
return 0o600
|
||
|
||
# Конфиденциальные данные
|
||
if ext in ['.env', '.secret', '.token', '.password', '.credential']:
|
||
return 0o640
|
||
|
||
# Конфигурационные файлы
|
||
if ext in ['.conf', '.cfg', '.ini', '.config', '.yml', '.yaml', '.toml']:
|
||
return 0o644
|
||
|
||
# Логи и данные
|
||
if ext in ['.log', '.db', '.sqlite', '.json', '.xml']:
|
||
return 0o644
|
||
|
||
# По умолчанию
|
||
return 0o644
|
||
|
||
def pinstall(self, items: List[str], mode: Optional[int] = None,
|
||
is_dir: bool = False, content: Optional[str] = None) -> Dict[str, int]:
|
||
"""Создание файлов/папок с умными правами"""
|
||
self.stats = {"files": 0, "dirs": 0, "errors": 0}
|
||
|
||
for item in items:
|
||
if not item.strip():
|
||
continue
|
||
|
||
try:
|
||
p = self.root / item.lstrip('/')
|
||
|
||
if is_dir or item.endswith('/'):
|
||
p.mkdir(parents=True, exist_ok=True)
|
||
final_mode = mode or 0o755
|
||
p.chmod(final_mode)
|
||
self._log(f"Папка: {item.strip()} ({oct(final_mode)})", "SUCCESS")
|
||
self.stats["dirs"] += 1
|
||
|
||
else:
|
||
p.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
if content:
|
||
p.write_text(content, encoding='utf-8')
|
||
else:
|
||
p.touch(exist_ok=True)
|
||
|
||
final_mode = mode or self._get_smart_mode(p)
|
||
p.chmod(final_mode)
|
||
|
||
self._log(f"Файл: {item.strip()} ({oct(final_mode)})", "SUCCESS")
|
||
self.stats["files"] += 1
|
||
|
||
except Exception as e:
|
||
self._log(f"Ошибка создания {item}: {e}", "ERROR")
|
||
self.stats["errors"] += 1
|
||
|
||
self._log(f"Создано: {self.stats['files']} файлов, {self.stats['dirs']} папок, ошибок: {self.stats['errors']}", "INFO")
|
||
return self.stats
|
||
|
||
def piproject(self, spec_file: str, export_json: bool = False) -> bool:
|
||
"""Развёртывание проекта по .prj"""
|
||
self._log(f"Читаем спецификацию: {spec_file}")
|
||
|
||
spec_path = Path(spec_file)
|
||
if not spec_path.exists():
|
||
self._log(f"Файл не найден: {spec_file}", "ERROR")
|
||
return False
|
||
|
||
original_root = self.root
|
||
project_structure = []
|
||
|
||
try:
|
||
with open(spec_file, 'r', encoding='utf-8') as f:
|
||
content = f.read()
|
||
|
||
# Поддержка разных форматов
|
||
if spec_path.suffix.lower() in ['.json', '.yaml', '.yml']:
|
||
return self._load_structured_spec(spec_path, export_json)
|
||
|
||
# Стандартный .prj формат
|
||
lines = content.split('\n')
|
||
|
||
for line_num, line in enumerate(lines, 1):
|
||
line = line.strip()
|
||
if not line or line.startswith('#'):
|
||
if line.startswith('#') and self.verbose:
|
||
self._log(f"Комментарий: {line[1:].strip()}", "DEBUG")
|
||
continue
|
||
|
||
parts = shlex.split(line)
|
||
if not parts:
|
||
continue
|
||
|
||
cmd = parts[0]
|
||
args = parts[1:]
|
||
|
||
try:
|
||
if cmd == 'PRJ:':
|
||
if args:
|
||
self.root = Path(args[0]).expanduser().resolve()
|
||
self.curdir = self.root
|
||
self.root.mkdir(parents=True, exist_ok=True)
|
||
self._log(f"Проект: {self.root}/", "INFO")
|
||
|
||
elif cmd == 'DR:':
|
||
for d in args:
|
||
d = d.rstrip('/')
|
||
full_path = self.root / d
|
||
full_path.mkdir(parents=True, exist_ok=True)
|
||
full_path.chmod(0o755)
|
||
project_structure.append({"type": "dir", "path": str(full_path), "mode": "755"})
|
||
self._log(f"Папка: {d}/ (755)", "SUCCESS")
|
||
self.stats["dirs"] += 1
|
||
self.curdir = full_path
|
||
|
||
elif cmd == 'FL:':
|
||
for fname in args:
|
||
full_path = self.root / fname
|
||
full_path.parent.mkdir(parents=True, exist_ok=True)
|
||
full_path.touch(exist_ok=True)
|
||
|
||
mode = self._get_smart_mode(full_path)
|
||
full_path.chmod(mode)
|
||
|
||
project_structure.append({
|
||
"type": "file",
|
||
"path": str(full_path),
|
||
"mode": oct(mode),
|
||
"extension": full_path.suffix
|
||
})
|
||
|
||
mode_str = oct(mode).replace('0o', '')
|
||
self._log(f"Файл: {fname} ({mode_str})", "SUCCESS")
|
||
self.stats["files"] += 1
|
||
|
||
elif cmd == 'CMD:':
|
||
# Выполнение команд после создания структуры
|
||
if args:
|
||
cmd_str = ' '.join(args)
|
||
self._log(f"Выполняем: {cmd_str}", "INFO")
|
||
|
||
elif cmd == 'TPL:':
|
||
# Шаблоны файлов с переменными
|
||
if len(args) >= 2:
|
||
template_file = args[0]
|
||
target_file = args[1]
|
||
variables = args[2:] if len(args) > 2 else []
|
||
|
||
self._process_template(template_file, target_file, variables)
|
||
|
||
except Exception as e:
|
||
self._log(f"Строка {line_num}: {e}", "ERROR")
|
||
self.stats["errors"] += 1
|
||
|
||
except Exception as e:
|
||
self._log(f"Ошибка чтения файла: {e}", "ERROR")
|
||
return False
|
||
|
||
finally:
|
||
self.root = original_root
|
||
|
||
# Экспорт структуры в JSON
|
||
if export_json:
|
||
json_file = f"{spec_path.stem}_structure.json"
|
||
with open(json_file, 'w', encoding='utf-8') as f:
|
||
json.dump(project_structure, f, indent=2, ensure_ascii=False)
|
||
self._log(f"Структура экспортирована в: {json_file}", "INFO")
|
||
|
||
self._log(f"Готово! Создано: {self.stats['files']} файлов, {self.stats['dirs']} папок", "SUCCESS")
|
||
return True
|
||
|
||
def _load_structured_spec(self, spec_path: Path, export_json: bool) -> bool:
|
||
"""Загрузка структурированных спецификаций (JSON/YAML)"""
|
||
try:
|
||
if spec_path.suffix.lower() == '.json':
|
||
with open(spec_path, 'r', encoding='utf-8') as f:
|
||
spec = json.load(f)
|
||
elif spec_path.suffix.lower() in ['.yaml', '.yml']:
|
||
if not YAML_AVAILABLE:
|
||
self._log("YAML не установлен. Установите: pip install pyyaml", "ERROR")
|
||
return False
|
||
with open(spec_path, 'r', encoding='utf-8') as f:
|
||
spec = yaml.safe_load(f)
|
||
else:
|
||
self._log(f"Неподдерживаемый формат: {spec_path.suffix}", "ERROR")
|
||
return False
|
||
|
||
# Обработка структурированной спецификации
|
||
if 'project' in spec:
|
||
project_root = spec.get('project', {}).get('root', '.')
|
||
self.root = Path(project_root).expanduser().resolve()
|
||
self.root.mkdir(parents=True, exist_ok=True)
|
||
self._log(f"Проект: {self.root}/", "INFO")
|
||
|
||
# Создание директорий
|
||
for dir_path in spec.get('directories', []):
|
||
full_path = self.root / dir_path
|
||
full_path.mkdir(parents=True, exist_ok=True)
|
||
full_path.chmod(0o755)
|
||
self._log(f"Папка: {dir_path}/ (755)", "SUCCESS")
|
||
self.stats["dirs"] += 1
|
||
|
||
# Создание файлов
|
||
for file_spec in spec.get('files', []):
|
||
if isinstance(file_spec, dict):
|
||
file_path = file_spec.get('path', '')
|
||
content = file_spec.get('content', '')
|
||
mode = file_spec.get('mode')
|
||
else:
|
||
file_path = file_spec
|
||
content = ''
|
||
mode = None
|
||
|
||
full_path = self.root / file_path
|
||
full_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
if content:
|
||
full_path.write_text(content, encoding='utf-8')
|
||
else:
|
||
full_path.touch(exist_ok=True)
|
||
|
||
final_mode = mode or self._get_smart_mode(full_path)
|
||
full_path.chmod(final_mode)
|
||
|
||
mode_str = oct(final_mode).replace('0o', '')
|
||
self._log(f"Файл: {file_path} ({mode_str})", "SUCCESS")
|
||
self.stats["files"] += 1
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
self._log(f"Ошибка загрузки спецификации: {e}", "ERROR")
|
||
return False
|
||
|
||
def _process_template(self, template: str, target: str, variables: List[str]):
|
||
"""Обработка шаблонов файлов"""
|
||
template_path = Path(template)
|
||
if not template_path.exists():
|
||
self._log(f"Шаблон не найден: {template}", "ERROR")
|
||
return
|
||
|
||
content = template_path.read_text(encoding='utf-8')
|
||
|
||
# Замена переменных (простая реализация)
|
||
var_dict = {}
|
||
for var in variables:
|
||
if '=' in var:
|
||
key, value = var.split('=', 1)
|
||
var_dict[key.strip()] = value.strip()
|
||
|
||
for key, value in var_dict.items():
|
||
content = content.replace(f'{{{{{key}}}}}', value)
|
||
|
||
target_path = self.root / target
|
||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||
target_path.write_text(content, encoding='utf-8')
|
||
|
||
mode = self._get_smart_mode(target_path)
|
||
target_path.chmod(mode)
|
||
|
||
self._log(f"Шаблон: {template} → {target}", "SUCCESS")
|
||
self.stats["files"] += 1
|
||
|
||
class RemoteDeployer:
|
||
def __init__(self, verbose: bool = False):
|
||
self.verbose = verbose
|
||
self.results = []
|
||
|
||
async def deploy(self, host: str, user: str, spec_content: str,
|
||
key_path: Optional[str] = None, password: Optional[str] = None) -> Dict[str, Any]:
|
||
"""Деплой на удаленный сервер"""
|
||
result = {
|
||
"host": host,
|
||
"success": False,
|
||
"output": "",
|
||
"error": "",
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
client = paramiko.SSHClient()
|
||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
|
||
try:
|
||
# Асинхронное подключение
|
||
loop = asyncio.get_event_loop()
|
||
connect_kwargs = {
|
||
"hostname": host,
|
||
"username": user,
|
||
"timeout": 15
|
||
}
|
||
|
||
if key_path:
|
||
connect_kwargs["key_filename"] = key_path
|
||
elif password:
|
||
connect_kwargs["password"] = password
|
||
|
||
await loop.run_in_executor(None, client.connect, **connect_kwargs)
|
||
|
||
# Создаем временную директорию
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
remote_dir = f"/tmp/pitools_{timestamp}"
|
||
|
||
sftp = await loop.run_in_executor(None, client.open_sftp)
|
||
|
||
# Загружаем pitools и спецификацию
|
||
current_file = Path(__file__)
|
||
pitools_code = current_file.read_text(encoding='utf-8')
|
||
|
||
def upload():
|
||
sftp.mkdir(remote_dir)
|
||
|
||
with sftp.open(f"{remote_dir}/pitools.py", 'w') as f:
|
||
f.write(pitools_code)
|
||
|
||
with sftp.open(f"{remote_dir}/spec.prj", 'w') as f:
|
||
f.write(spec_content)
|
||
|
||
sftp.chmod(f"{remote_dir}/pitools.py", 0o755)
|
||
|
||
await loop.run_in_executor(None, upload)
|
||
await loop.run_in_executor(None, sftp.close)
|
||
|
||
# Выполняем развертывание
|
||
def execute():
|
||
cmd = f"cd {remote_dir} && python3 pitools.py piproject spec.prj"
|
||
stdin, stdout, stderr = client.exec_command(cmd)
|
||
output = stdout.read().decode('utf-8', errors='ignore')
|
||
error = stderr.read().decode('utf-8', errors='ignore')
|
||
return output, error
|
||
|
||
output, error = await loop.run_in_executor(None, execute)
|
||
|
||
# Очистка
|
||
clean_cmd = f"rm -rf {remote_dir}"
|
||
client.exec_command(clean_cmd)
|
||
|
||
result["success"] = True
|
||
result["output"] = output
|
||
if error:
|
||
result["error"] = error
|
||
|
||
self._log(f"✅ {host}: Успешно", "SUCCESS")
|
||
|
||
except Exception as e:
|
||
result["error"] = str(e)
|
||
self._log(f"❌ {host}: {e}", "ERROR")
|
||
|
||
finally:
|
||
try:
|
||
client.close()
|
||
except:
|
||
pass
|
||
|
||
self.results.append(result)
|
||
return result
|
||
|
||
def _log(self, message: str, level: str = "INFO"):
|
||
"""Логирование для RemoteDeployer"""
|
||
icons = {"SUCCESS": "✅", "ERROR": "❌", "INFO": "ℹ️"}
|
||
if level in icons:
|
||
print(f"{icons[level]} {message}")
|
||
else:
|
||
print(message)
|
||
|
||
# CLI
|
||
async def main():
|
||
parser = argparse.ArgumentParser(
|
||
description=f"PITOOLS v{PITools.VERSION} — DevOps фреймворк на Python",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
Примеры использования:
|
||
pitools pinstall file1.txt dir/ script.sh -s
|
||
pitools piproject project.prj
|
||
pitools deploypi server.com user project.prj
|
||
pitools multideploypi infra.prj
|
||
|
||
Форматы спецификаций:
|
||
.prj - Текстовый формат с командами
|
||
.json - JSON структура
|
||
.yaml - YAML структура (требует pyyaml)
|
||
"""
|
||
)
|
||
|
||
parser.add_argument('--version', action='version', version=f'PITOOLS v{PITools.VERSION}')
|
||
parser.add_argument('-v', '--verbose', action='store_true', help='Подробный вывод')
|
||
|
||
subparsers = parser.add_subparsers(dest='command', help='Команды')
|
||
|
||
# pinstall
|
||
p1 = subparsers.add_parser('pinstall', help='Создать файлы/папки')
|
||
p1.add_argument('items', nargs='+', help='Файлы/папки')
|
||
p1.add_argument('-s', '--script', action='store_true', help='Режим скрипта (755)')
|
||
p1.add_argument('-d', '--directory', action='store_true', help='Режим папки')
|
||
p1.add_argument('-m', '--mode', help='Права доступа (например: 755, 644)')
|
||
p1.add_argument('-c', '--content', help='Содержимое файла')
|
||
|
||
# piproject
|
||
p2 = subparsers.add_parser('piproject', help='Развёртывание проекта')
|
||
p2.add_argument('spec', help='Файл спецификации (.prj, .json, .yaml)')
|
||
p2.add_argument('-e', '--export-json', action='store_true', help='Экспортировать структуру в JSON')
|
||
p2.add_argument('-r', '--root', help='Корневая директория проекта')
|
||
|
||
# deploypi
|
||
p3 = subparsers.add_parser('deploypi', help='Деплой на сервер')
|
||
p3.add_argument('host', help='Хост')
|
||
p3.add_argument('user', help='Пользователь')
|
||
p3.add_argument('spec', help='Файл спецификации или содержимое')
|
||
p3.add_argument('-k', '--key', help='Путь к SSH ключу')
|
||
p3.add_argument('-p', '--password', action='store_true', help='Использовать парольную аутентификацию')
|
||
|
||
# multideploypi
|
||
p4 = subparsers.add_parser('multideploypi', help='Мультидеплой')
|
||
p4.add_argument('spec_or_infra', help='Файл спецификации или инфраструктуры')
|
||
p4.add_argument('-i', '--infra', help='Файл инфраструктуры (опционально)')
|
||
p4.add_argument('-u', '--user', help='Пользователь для всех хостов')
|
||
p4.add_argument('-k', '--key', help='Путь к SSH ключу')
|
||
p4.add_argument('-o', '--output', help='Файл для сохранения результатов')
|
||
|
||
args = parser.parse_args()
|
||
|
||
if not args.command:
|
||
parser.print_help()
|
||
return
|
||
|
||
try:
|
||
if args.command == 'pinstall':
|
||
root_dir = Path(args.root) if hasattr(args, 'root') and args.root else None
|
||
pitools = PITools(root=root_dir, verbose=args.verbose)
|
||
|
||
mode = None
|
||
if args.mode:
|
||
try:
|
||
mode = int(args.mode, 8)
|
||
except ValueError:
|
||
print(f"❌ Неверный формат прав: {args.mode}")
|
||
return
|
||
|
||
pitools.pinstall(
|
||
args.items,
|
||
mode=mode or (0o755 if args.script else None),
|
||
is_dir=args.directory,
|
||
content=args.content
|
||
)
|
||
|
||
elif args.command == 'piproject':
|
||
root_dir = Path(args.root) if hasattr(args, 'root') and args.root else None
|
||
pitools = PITools(root=root_dir, verbose=args.verbose)
|
||
pitools.piproject(args.spec, args.export_json)
|
||
|
||
elif args.command == 'deploypi':
|
||
spec_path = Path(args.spec)
|
||
if spec_path.exists():
|
||
spec_content = spec_path.read_text(encoding='utf-8')
|
||
else:
|
||
spec_content = args.spec
|
||
|
||
password = None
|
||
if args.password:
|
||
password = getpass.getpass(f"Пароль для {args.user}@{args.host}: ")
|
||
|
||
deployer = RemoteDeployer(verbose=args.verbose)
|
||
result = await deployer.deploy(
|
||
args.host,
|
||
args.user,
|
||
spec_content,
|
||
key_path=args.key,
|
||
password=password
|
||
)
|
||
|
||
if args.verbose and result["output"]:
|
||
print(f"\n📋 Вывод с сервера:\n{result['output']}")
|
||
|
||
elif args.command == 'multideploypi':
|
||
await run_multideploy(args)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n\n👋 Прервано пользователем")
|
||
sys.exit(0)
|
||
except Exception as e:
|
||
print(f"💥 Критическая ошибка: {e}")
|
||
if args.verbose:
|
||
import traceback
|
||
traceback.print_exc()
|
||
sys.exit(1)
|
||
|
||
async def run_multideploy(args):
|
||
"""Запуск мультидеплоя"""
|
||
print("🌍 МУЛЬТИДЕПЛОЙ")
|
||
|
||
infra_file = args.infra if args.infra else None
|
||
|
||
# Определяем пользователя
|
||
user = args.user
|
||
if not user:
|
||
user = input("Пользователь (по умолчанию текущий): ").strip() or getpass.getuser()
|
||
|
||
# Парсинг инфраструктуры
|
||
if infra_file and Path(infra_file).exists():
|
||
blocks = parse_infra(infra_file)
|
||
else:
|
||
spec_path = Path(args.spec_or_infra)
|
||
if spec_path.exists():
|
||
spec_content = spec_path.read_text(encoding='utf-8')
|
||
else:
|
||
spec_content = args.spec_or_infra
|
||
blocks = [{"hosts": [args.spec_or_infra], "spec": spec_content}]
|
||
|
||
deployer = RemoteDeployer(verbose=args.verbose)
|
||
all_results = []
|
||
|
||
for i, block in enumerate(blocks, 1):
|
||
print(f"\n🔧 Блок {i} → {len(block['hosts'])} хостов")
|
||
|
||
tasks = []
|
||
for host in block['hosts']:
|
||
task = deployer.deploy(
|
||
host,
|
||
user,
|
||
block['spec'],
|
||
key_path=args.key,
|
||
password=None # Пароль запрашивается при необходимости
|
||
)
|
||
tasks.append(task)
|
||
|
||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
successful = 0
|
||
for result in results:
|
||
if isinstance(result, dict) and result.get("success"):
|
||
successful += 1
|
||
all_results.append(result)
|
||
|
||
print(f"✅ Блок {i}: {successful}/{len(tasks)} готово!")
|
||
|
||
# Сохранение результатов
|
||
if args.output:
|
||
with open(args.output, 'w', encoding='utf-8') as f:
|
||
json.dump(all_results, f, indent=2, ensure_ascii=False)
|
||
print(f"\n📊 Результаты сохранены в: {args.output}")
|
||
|
||
def parse_infra(spec_file: str) -> List[Dict[str, Any]]:
|
||
"""Парсинг infra.prj для multideploypi"""
|
||
blocks = []
|
||
current_hosts = []
|
||
current_spec = []
|
||
|
||
with open(spec_file, 'r', encoding='utf-8') as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line or line.startswith('#'):
|
||
continue
|
||
|
||
if line.startswith('MLT:'):
|
||
if current_hosts and current_spec:
|
||
blocks.append({"hosts": current_hosts, "spec": '\n'.join(current_spec)})
|
||
current_hosts = [h.strip() for h in line[4:].split(',')]
|
||
current_spec = []
|
||
else:
|
||
current_spec.append(line)
|
||
|
||
if current_hosts and current_spec:
|
||
blocks.append({"hosts": current_hosts, "spec": '\n'.join(current_spec)})
|
||
|
||
return blocks
|
||
|
||
if __name__ == '__main__':
|
||
asyncio.run(main()) |