from fastapi import FastAPI, Form, Request, HTTPException from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse import os, json, secrets, uuid, time, shlex, subprocess, signal, asyncio from typing import Dict, Any, List, Optional app = FastAPI() BASE_DIR = os.path.dirname(__file__) DATA_DIR = os.path.join(BASE_DIR, "data") DATA_FILE = os.path.join(DATA_DIR, "bots.json") SESS_FILE = os.path.join(DATA_DIR, "sessions.json") RUNTIME_FILE = os.path.join(DATA_DIR, "runtime.json") ADMIN_USER = "superadmin" ADMIN_PASS = "a9D2!3kP#Q111x@!" VERSION = "V8-FIXED" # ✅ 修复进程杀不掉问题 + ✅ session 持久化 + ✅ 启停按钮 # -------------------- sessions (persist) -------------------- def load_sessions() -> Dict[str, int]: os.makedirs(DATA_DIR, exist_ok=True) if not os.path.exists(SESS_FILE): with open(SESS_FILE, "w", encoding="utf-8") as f: json.dump({}, f) try: with open(SESS_FILE, "r", encoding="utf-8") as f: return json.load(f) # token -> expire_ts except Exception: return {} def save_sessions(s: Dict[str, int]) -> None: os.makedirs(DATA_DIR, exist_ok=True) with open(SESS_FILE, "w", encoding="utf-8") as f: json.dump(s, f, ensure_ascii=False, indent=2) SESSIONS: Dict[str, int] = load_sessions() # token -> expire_ts # -------------------- runtime (persist) -------------------- def load_runtime() -> Dict[str, Any]: os.makedirs(DATA_DIR, exist_ok=True) if not os.path.exists(RUNTIME_FILE): with open(RUNTIME_FILE, "w", encoding="utf-8") as f: json.dump({}, f) try: with open(RUNTIME_FILE, "r", encoding="utf-8") as f: return json.load(f) # bot_id -> {pid, started_at} except Exception: return {} def save_runtime(rt: Dict[str, Any]) -> None: os.makedirs(DATA_DIR, exist_ok=True) with open(RUNTIME_FILE, "w", encoding="utf-8") as f: json.dump(rt, f, ensure_ascii=False, indent=2) RUNTIME: Dict[str, Any] = load_runtime() def _pid_alive(pid: int) -> bool: try: os.kill(pid, 0) return True except Exception: return False def get_status(bot_id: str) -> Dict[str, Any]: # 每次读最新 runtime(避免多进程/重启不同步) global RUNTIME RUNTIME = load_runtime() info = RUNTIME.get(bot_id) or {} pid = info.get("pid") if isinstance(pid, int) and pid > 0 and _pid_alive(pid): return {"running": True, "pid": pid, "started_at": info.get("started_at", 0)} # 清理死 pid if bot_id in RUNTIME: RUNTIME.pop(bot_id, None) save_runtime(RUNTIME) return {"running": False, "pid": None, "started_at": 0} def safe_parse_cmd(cmd: str) -> List[str]: """ 只允许纯命令参数,不允许 ; | & > < 这些 shell 拼接符。 用 shlex.split 解析,且 subprocess 不使用 shell=True。 """ cmd = (cmd or "").strip() if not cmd: raise ValueError("启动命令为空") bad = [";", "|", "&", ">", "<", "`"] if any(x in cmd for x in bad): raise ValueError("启动命令包含危险符号(禁止 ; | & > < `)") args = shlex.split(cmd) if not args: raise ValueError("启动命令解析失败") return args # -------------------- storage -------------------- def _ensure_data(): os.makedirs(DATA_DIR, exist_ok=True) if not os.path.exists(DATA_FILE): seed = { "bots": { "bot1": { "name": "Bot 1", "port": 8001, "cmd": "python3 bot1.py", # 示例启动命令 "welcome_text": "欢迎使用机器人", "menu_items": [ {"id": "m1", "parent_id": None, "title": "开始", "type": "submenu", "value": ""}, {"id": "m2", "parent_id": "m1", "title": "发送文案", "type": "text", "value": "你好,这里是二级菜单文案"}, {"id": "m3", "parent_id": "m1", "title": "打开链接", "type": "url", "value": "https://t.me/"}, ], } } } with open(DATA_FILE, "w", encoding="utf-8") as f: json.dump(seed, f, ensure_ascii=False, indent=2) def load_db() -> Dict[str, Any]: _ensure_data() with open(DATA_FILE, "r", encoding="utf-8") as f: return json.load(f) def save_db(db: Dict[str, Any]) -> None: _ensure_data() with open(DATA_FILE, "w", encoding="utf-8") as f: json.dump(db, f, ensure_ascii=False, indent=2) # -------------------- auth -------------------- def is_logged_in(request: Request) -> bool: token = request.cookies.get("bc_token") if not token: return False exp = SESSIONS.get(token) if not exp: return False if int(time.time()) > int(exp): SESSIONS.pop(token, None) save_sessions(SESSIONS) return False return True # -------------------- UI shell -------------------- CSS = """ :root{ --bg:#0b1220; --card:#0f1a2e; --muted:#9aa7bd; --text:#e8eefc; --line:rgba(255,255,255,0.10); --brand:#4f8cff; --danger:#ff5b5b; --ok:#35d07f; } *{box-sizing:border-box;} body{margin:0;font-family:ui-sans-serif,system-ui,-apple-system,"Segoe UI",Roboto,Arial;background:linear-gradient(180deg,#0b1220,#050913);color:var(--text);} a{color:var(--brand);text-decoration:none;} .wrap{display:flex;min-height:100vh;} .side{width:240px;padding:18px;border-right:1px solid var(--line);background:rgba(255,255,255,0.02);} .brand{font-size:18px;font-weight:800;letter-spacing:.2px;} .small{color:var(--muted);font-size:12px;margin-top:6px;} .nav{margin-top:16px;display:flex;flex-direction:column;gap:8px;} .nav a{padding:10px 12px;border:1px solid var(--line);border-radius:10px;color:var(--text);background:rgba(255,255,255,0.02);} .nav a:hover{border-color:rgba(79,140,255,.6);} .main{flex:1;padding:22px;} .top{display:flex;justify-content:space-between;align-items:center;gap:12px;margin-bottom:14px;} .h1{font-size:20px;font-weight:800;} .badge{font-size:12px;color:var(--muted);} .card{background:rgba(255,255,255,0.03);border:1px solid var(--line);border-radius:16px;padding:16px;margin-top:12px;} .row{display:flex;gap:12px;flex-wrap:wrap;align-items:center;} .btn{display:inline-flex;align-items:center;justify-content:center;gap:8px;padding:9px 12px;border-radius:12px;border:1px solid var(--line);background:rgba(255,255,255,0.03);color:var(--text);cursor:pointer;} .btn:hover{border-color:rgba(79,140,255,.6);} .btn.primary{background:rgba(79,140,255,.16);border-color:rgba(79,140,255,.5);} .btn.danger{background:rgba(255,91,91,.12);border-color:rgba(255,91,91,.45);} .btn.ok{background:rgba(53,208,127,.12);border-color:rgba(53,208,127,.45);} .table{width:100%;border-collapse:collapse;margin-top:10px;} .table th,.table td{border-bottom:1px solid var(--line);padding:12px;text-align:left;vertical-align:top;} .table th{color:var(--muted);font-weight:700;font-size:12px;} .input, textarea, select{width:100%;max-width:720px;padding:10px 12px;border-radius:12px;border:1px solid var(--line);background:rgba(255,255,255,0.02);color:var(--text);outline:none;} textarea{min-height:160px;font-family:ui-monospace,SFMono-Regular,Menlo,Consolas,monospace;} .grid2{display:grid;grid-template-columns:1fr 1fr;gap:12px;align-items:start;} .muted{color:var(--muted);font-size:12px;} .pill{display:inline-block;padding:3px 8px;border:1px solid var(--line);border-radius:999px;font-size:12px;color:var(--muted);} .pill.ok{border-color:rgba(53,208,127,.55); color:rgba(53,208,127,.95);} .pill.bad{border-color:rgba(255,91,91,.55); color:rgba(255,91,91,.95);} .hr{height:1px;background:var(--line);margin:14px 0;} .kbd{font-family:ui-monospace,Consolas; padding:2px 6px;border:1px solid var(--line);border-radius:8px;color:var(--muted);} """ def shell(title: str, body: str) -> str: return f""" {title}
{body}
""" # -------------------- routes basic -------------------- @app.get("/health", response_class=HTMLResponse) async def health(): return HTMLResponse(f"OK - bot-center {VERSION}") @app.get("/login", response_class=HTMLResponse) async def login_page(): return f""" Login

Bot Center Login ({VERSION})

用户名
密码
""" @app.post("/login") async def login_action(username: str = Form(...), password: str = Form(...)): if username != ADMIN_USER or password != ADMIN_PASS: return HTMLResponse("账号或密码错误
返回", status_code=401) token = secrets.token_urlsafe(32) expire = int(time.time()) + 60 * 60 * 24 * 7 # 7天有效 SESSIONS[token] = expire save_sessions(SESSIONS) resp = RedirectResponse("/admin", status_code=302) resp.set_cookie("bc_token", token, httponly=True, samesite="lax", max_age=60 * 60 * 24 * 7) return resp @app.get("/logout") async def logout(request: Request): token = request.cookies.get("bc_token") if token and token in SESSIONS: del SESSIONS[token] save_sessions(SESSIONS) resp = RedirectResponse("/login", status_code=302) resp.delete_cookie("bc_token") return resp # -------------------- helpers for menu -------------------- def menu_depth(items: List[Dict[str, Any]], item_id: str) -> int: mp = {x["id"]: x for x in items} d = 1 cur = mp.get(item_id) while cur and cur.get("parent_id"): d += 1 cur = mp.get(cur["parent_id"]) if d > 10: break return d def children_of(items: List[Dict[str, Any]], parent_id: Optional[str]) -> List[Dict[str, Any]]: return [x for x in items if x.get("parent_id") == parent_id] def render_menu_tree(items: List[Dict[str, Any]], parent_id: Optional[str], level: int) -> str: rows = [] for x in children_of(items, parent_id): t = x.get("type", "submenu") typemap = {"submenu":"打开子菜单","text":"发送文本","url":"打开链接"} badge = typemap.get(t, t) indent = " " * (level - 1) * 6 rows.append(f""" {indent}{x.get("title","")} {badge} {(x.get("value","") or "-")} 编辑
""") if level < 3: rows.append(render_menu_tree(items, x["id"], level + 1)) return "".join(rows) # -------------------- admin pages -------------------- @app.get("/admin", response_class=HTMLResponse) async def admin_home(request: Request): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() bots = db.get("bots", {}) trs = [] for bot_id, b in bots.items(): st = get_status(bot_id) pill = f"运行中 PID {st['pid']}" if st["running"] else "已停止" # 列表页也加启停 start_btn = f"""
""" if not st["running"] else "" stop_btn = f"""
""" if st["running"] else "" trs.append(f""" {bot_id}
{b.get("name","")}
{b.get("port","")} {pill} 进入设置 {start_btn} {stop_btn}
""") body = f"""
机器人列表
新增 / 删除 / 进入设置(文案 + 多级菜单 + 启停)
+ 新增机器人 API 用法
{''.join(trs) if trs else ""}
ID / 名称端口状态操作
暂无机器人
配置API:/api/bots/<bot_id>/config
""" return HTMLResponse(shell("Admin", body)) @app.get("/admin/new", response_class=HTMLResponse) async def admin_new(request: Request): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) body = """
新增机器人
只填字段,不写代码
返回列表
机器人 ID(唯一,建议 bot1/bot2)
名称(显示用)
端口(展示用)
启动命令 cmd(点“启动”会执行)
建议用绝对路径。若用 venv,就写 venv 的 python 全路径。
欢迎文案
取消
""" return HTMLResponse(shell("New", body)) @app.post("/admin/new") async def admin_new_post( request: Request, bot_id: str = Form(...), name: str = Form(""), port: int = Form(0), cmd: str = Form(""), welcome_text: str = Form(""), ): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) bot_id = bot_id.strip() if not bot_id: return HTMLResponse("bot_id 不能为空", status_code=400) db = load_db() bots = db.setdefault("bots", {}) if bot_id in bots: return HTMLResponse("bot_id 已存在", status_code=400) bots[bot_id] = { "name": name.strip(), "port": int(port), "cmd": cmd.strip(), "welcome_text": welcome_text, "menu_items": [], } save_db(db) return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) @app.post("/admin/delete/{bot_id}") async def admin_delete(request: Request, bot_id: str): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) # 删除前先停 st = get_status(bot_id) if st["running"]: try: os.killpg(int(st["pid"]), signal.SIGTERM) await asyncio.sleep(0.5) if _pid_alive(int(st["pid"])): os.killpg(int(st["pid"]), signal.SIGKILL) except Exception: pass global RUNTIME RUNTIME = load_runtime() RUNTIME.pop(bot_id, None) save_runtime(RUNTIME) db = load_db() bots = db.setdefault("bots", {}) if bot_id in bots: del bots[bot_id] save_db(db) return RedirectResponse("/admin", status_code=302) @app.get("/admin/bot/{bot_id}", response_class=HTMLResponse) async def admin_bot(request: Request, bot_id: str): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() b = db.get("bots", {}).get(bot_id) if not b: return HTMLResponse("not found", status_code=404) st = get_status(bot_id) status_badge = f"运行中 PID {st['pid']}" if st["running"] else "已停止" items = b.get("menu_items", []) table = render_menu_tree(items, None, 1).replace("{BOT}", bot_id) start_btn = f"""
""" if not st["running"] else "" stop_btn = f"""
""" if st["running"] else "" body = f"""
机器人设置:{bot_id}
{b.get("name","")} · 端口 {b.get("port","")} · {status_badge}
返回列表 {start_btn} {stop_btn} + 新增按钮
欢迎文案
机器人启动后 /start 给用户发的内容
菜单设置(支持 1~3 级)
不写 JSON:新增按钮时选择“父级”,自动形成二级/三级菜单。
{table if table else ""}
按钮内容/链接操作
还没有菜单,点右上角“新增按钮”。
高级设置(必填:启停靠这个)
启动命令 cmd:你平时在终端怎么启动机器人,就把那一行粘贴进来(必须是完整路径)。
端口(展示用)
启动命令 cmd
例:python3 /www/wwwroot/bot-center/bot1.py
""" return HTMLResponse(shell("Bot", body)) @app.post("/admin/bot/{bot_id}/welcome") async def admin_save_welcome(request: Request, bot_id: str, welcome_text: str = Form("")): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() if bot_id not in db.get("bots", {}): return HTMLResponse("not found", status_code=404) db["bots"][bot_id]["welcome_text"] = welcome_text save_db(db) return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) @app.post("/admin/bot/{bot_id}/advanced") async def admin_save_advanced( request: Request, bot_id: str, port: int = Form(0), cmd: str = Form("") ): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() if bot_id not in db.get("bots", {}): return HTMLResponse("not found", status_code=404) db["bots"][bot_id]["port"] = int(port) db["bots"][bot_id]["cmd"] = cmd.strip() save_db(db) return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) # -------------------- START/STOP (核心 - 修复版) -------------------- @app.post("/admin/bot/{bot_id}/start") async def admin_bot_start(request: Request, bot_id: str): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() b = db.get("bots", {}).get(bot_id) if not b: return HTMLResponse("not found", status_code=404) st = get_status(bot_id) if st["running"]: return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) cmd = (b.get("cmd") or "").strip() if not cmd: return HTMLResponse("启动失败:cmd 为空(去高级设置里填)", status_code=400) try: args = safe_parse_cmd(cmd) log_path = os.path.join(DATA_DIR, f"{bot_id}.log") lf = open(log_path, "a", encoding="utf-8") p = subprocess.Popen( args, stdout=lf, stderr=lf, cwd=BASE_DIR, # 在 bot-center 目录启动 start_new_session=True # 让 pid 成为新的进程组 leader ) global RUNTIME RUNTIME = load_runtime() RUNTIME[bot_id] = {"pid": p.pid, "started_at": int(time.time())} save_runtime(RUNTIME) except Exception as e: return HTMLResponse(f"启动失败:{str(e)}", status_code=400) return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) @app.post("/admin/bot/{bot_id}/stop") async def admin_bot_stop(request: Request, bot_id: str): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) st = get_status(bot_id) if not st["running"]: return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) pid = int(st["pid"]) try: # 步骤1:发送优雅停止信号 os.killpg(pid, signal.SIGTERM) # 等待2秒,给进程优雅退出的时间(关键!) await asyncio.sleep(2) # 步骤2:检查进程是否还活着,活着就强制杀死 if _pid_alive(pid): print(f"⚠️ PID {pid} 未优雅退出,强制杀死") os.killpg(pid, signal.SIGKILL) await asyncio.sleep(1) # 步骤3:最后的兜底(需要安装 psutil:pip install psutil) try: import psutil if _pid_alive(pid): parent = psutil.Process(pid) children = parent.children(recursive=True) for child in children: child.kill() parent.kill() except ImportError: pass # 没装psutil就跳过 except Exception: pass except Exception as e: # 退化:只杀 pid try: os.kill(pid, signal.SIGTERM) await asyncio.sleep(2) if _pid_alive(pid): os.kill(pid, signal.SIGKILL) except Exception as e2: return HTMLResponse(f"停止失败:{str(e2)}", status_code=500) global RUNTIME RUNTIME = load_runtime() RUNTIME.pop(bot_id, None) save_runtime(RUNTIME) # 最后更新状态,清理死进程 get_status(bot_id) return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) # -------------------- menu editor -------------------- @app.get("/admin/menu/new", response_class=HTMLResponse) async def menu_new(request: Request, bot_id: str): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() b = db.get("bots", {}).get(bot_id) if not b: return HTMLResponse("not found", status_code=404) items = b.get("menu_items", []) parent_options = [''] for it in items: if it.get("type") == "submenu" and menu_depth(items, it["id"]) < 3: parent_options.append(f'') body = f"""
新增按钮
机器人:{bot_id} · 选择父级即可生成二级/三级菜单
返回
按钮名称
父级位置
点击动作
内容(文本/链接;如果是“子菜单”可留空)
""" return HTMLResponse(shell("Menu New", body)) @app.post("/admin/menu/new") async def menu_new_post( request: Request, bot_id: str = Form(...), title: str = Form(...), parent_id: str = Form(""), type: str = Form("submenu"), value: str = Form(""), ): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() b = db.get("bots", {}).get(bot_id) if not b: return HTMLResponse("not found", status_code=404) items = b.get("menu_items", []) pid = parent_id.strip() or None if pid: mp = {x["id"]: x for x in items} if pid not in mp or mp[pid].get("type") != "submenu": return HTMLResponse("父级不合法", status_code=400) if menu_depth(items, pid) >= 3: return HTMLResponse("最多支持三级菜单", status_code=400) it = { "id": uuid.uuid4().hex[:8], "parent_id": pid, "title": title.strip(), "type": type, "value": value.strip(), } items.append(it) b["menu_items"] = items save_db(db) return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) @app.get("/admin/menu/edit", response_class=HTMLResponse) async def menu_edit(request: Request, bot_id: str, item_id: str): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() b = db.get("bots", {}).get(bot_id) if not b: return HTMLResponse("not found", status_code=404) items = b.get("menu_items", []) mp = {x["id"]: x for x in items} if item_id not in mp: return HTMLResponse("not found", status_code=404) cur = mp[item_id] def is_descendant(child_id: str, parent_id: str) -> bool: m = {x["id"]: x for x in items} curx = m.get(child_id) while curx and curx.get("parent_id"): if curx["parent_id"] == parent_id: return True curx = m.get(curx["parent_id"]) return False parent_options = [f''] for it in items: if it["id"] == item_id: continue if it.get("type") != "submenu": continue if menu_depth(items, it["id"]) >= 3: continue if is_descendant(it["id"], item_id): continue sel = "selected" if cur.get("parent_id") == it["id"] else "" parent_options.append(f'') def opt(v: str) -> str: return "selected" if cur.get("type") == v else "" body = f"""
编辑按钮
机器人:{bot_id}
返回
按钮名称
父级位置
点击动作
内容(文本/链接;子菜单可留空)
""" return HTMLResponse(shell("Menu Edit", body)) @app.post("/admin/menu/edit") async def menu_edit_post( request: Request, bot_id: str = Form(...), item_id: str = Form(...), title: str = Form(...), parent_id: str = Form(""), type: str = Form("submenu"), value: str = Form(""), ): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() b = db.get("bots", {}).get(bot_id) if not b: return HTMLResponse("not found", status_code=404) items = b.get("menu_items", []) mp = {x["id"]: x for x in items} if item_id not in mp: return HTMLResponse("not found", status_code=404) pid = parent_id.strip() or None if pid: if pid not in mp or mp[pid].get("type") != "submenu": return HTMLResponse("父级不合法", status_code=400) if menu_depth(items, pid) >= 3: return HTMLResponse("最多支持三级菜单", status_code=400) for it in items: if it["id"] == item_id: it["title"] = title.strip() it["parent_id"] = pid it["type"] = type it["value"] = value.strip() break b["menu_items"] = items save_db(db) return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) @app.post("/admin/menu/delete") async def menu_delete( request: Request, bot_id: str = Form(...), item_id: str = Form(...), ): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) db = load_db() b = db.get("bots", {}).get(bot_id) if not b: return HTMLResponse("not found", status_code=404) items = b.get("menu_items", []) to_del = {item_id} changed = True while changed: changed = False for it in items: if it.get("parent_id") in to_del and it["id"] not in to_del: to_del.add(it["id"]) changed = True items = [x for x in items if x["id"] not in to_del] b["menu_items"] = items save_db(db) return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302) @app.get("/admin/api-doc", response_class=HTMLResponse) async def admin_api_doc(request: Request): if not is_logged_in(request): return RedirectResponse("/login", status_code=302) body = f"""
API 用法
机器人端用 /api 拉配置
返回
配置:
GET /api/bots/<bot_id>/config
机器人列表:
GET /api/bots
返回结构:
welcome_text: 欢迎文案(/start 发)
menu_items: 菜单树(最多三级)
- id / parent_id / title / type / value
      
""" return HTMLResponse(shell("API", body)) # -------------------- API -------------------- @app.get("/api/bots") async def api_list_bots(): db = load_db() return JSONResponse(db.get("bots", {})) @app.get("/api/bots/{bot_id}/config") async def api_bot_config(bot_id: str): db = load_db() b = db.get("bots", {}).get(bot_id) if not b: raise HTTPException(404, "not found") return { "bot_id": bot_id, "welcome_text": b.get("welcome_text", ""), "menu_items": b.get("menu_items", []), } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8810)