from fastapi import FastAPI, Form, Request, HTTPException
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import os, json, secrets, uuid, time, shlex, subprocess, signal, asyncio
from typing import Dict, Any, List, Optional
# Application object; the route decorators further down register handlers on it.
app = FastAPI()
# Filesystem layout: every persistent JSON file lives in ./data next to this module.
BASE_DIR = os.path.dirname(__file__)
DATA_DIR = os.path.join(BASE_DIR, "data")
DATA_FILE = os.path.join(DATA_DIR, "bots.json")  # bot definitions
SESS_FILE = os.path.join(DATA_DIR, "sessions.json")  # login sessions
RUNTIME_FILE = os.path.join(DATA_DIR, "runtime.json")  # pid bookkeeping of started bots
# Admin credentials. Environment variables take precedence so a deployment does
# not have to rely on the values committed to source control; the literals are
# kept only as backward-compatible defaults.
ADMIN_USER = os.environ.get("BOT_CENTER_ADMIN_USER", "superadmin")
ADMIN_PASS = os.environ.get("BOT_CENTER_ADMIN_PASS", "a9D2!3kP#Q111x@!")
VERSION = "V8-FIXED"  # fixes un-killable processes; adds session persistence and start/stop buttons
# -------------------- sessions (persist) --------------------
def load_sessions() -> Dict[str, int]:
    """Load the persisted login sessions from ``SESS_FILE``.

    Creates the data directory and seeds an empty sessions file when they are
    missing.

    Returns:
        Mapping of session token -> expiry timestamp. An empty dict is
        returned when the file cannot be read or parsed, or when its top-level
        JSON value is not an object.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    if not os.path.exists(SESS_FILE):
        with open(SESS_FILE, "w", encoding="utf-8") as f:
            json.dump({}, f)
    try:
        with open(SESS_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)  # token -> expire_ts
    except Exception:
        # Best effort: an unreadable/corrupt file just means "no sessions".
        return {}
    # Callers use dict methods on the result; a list or scalar payload would
    # crash them later, so treat it like a corrupt file.
    return data if isinstance(data, dict) else {}
def save_sessions(s: Dict[str, int]) -> None:
    """Persist the session mapping *s* to ``SESS_FILE`` as indented UTF-8 JSON.

    The data is written to a temporary sibling file which is then moved over
    the target with ``os.replace``, so a crash or serialization error in the
    middle of the write cannot leave a truncated sessions file behind.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    tmp_path = f"{SESS_FILE}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(s, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, SESS_FILE)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up after a failed write.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
# In-memory session table, populated from the sessions file when the module is imported.
SESSIONS: Dict[str, int] = load_sessions() # token -> expire_ts
# -------------------- runtime (persist) --------------------
def load_runtime() -> Dict[str, Any]:
    """Load the persisted process bookkeeping from ``RUNTIME_FILE``.

    Creates the data directory and seeds an empty runtime file when they are
    missing.

    Returns:
        Mapping of bot_id -> ``{"pid": ..., "started_at": ...}``. An empty
        dict is returned when the file cannot be read or parsed, or when its
        top-level JSON value is not an object.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    if not os.path.exists(RUNTIME_FILE):
        with open(RUNTIME_FILE, "w", encoding="utf-8") as f:
            json.dump({}, f)
    try:
        with open(RUNTIME_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)  # bot_id -> {pid, started_at}
    except Exception:
        # Best effort: an unreadable/corrupt file just means "nothing running".
        return {}
    # Callers use dict methods on the result; a list or scalar payload would
    # crash them later, so treat it like a corrupt file.
    return data if isinstance(data, dict) else {}
def save_runtime(rt: Dict[str, Any]) -> None:
    """Persist the runtime mapping *rt* to ``RUNTIME_FILE`` as indented UTF-8 JSON.

    The data is written to a temporary sibling file which is then moved over
    the target with ``os.replace``, so a crash or serialization error in the
    middle of the write cannot leave a truncated runtime file behind.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    tmp_path = f"{RUNTIME_FILE}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(rt, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, RUNTIME_FILE)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up after a failed write.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
# In-memory copy of the runtime file (bot_id -> {pid, started_at}), loaded at import time.
RUNTIME: Dict[str, Any] = load_runtime()
def _pid_alive(pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except Exception:
return False
def get_status(bot_id: str) -> Dict[str, Any]:
    """Report whether the process recorded for *bot_id* is currently alive.

    The runtime file is re-read on every call (to avoid drifting out of sync
    across processes or restarts). A recorded entry whose pid is no longer
    alive is removed from the runtime file.

    Returns:
        ``{"running": bool, "pid": int | None, "started_at": int}``.
    """
    global RUNTIME
    RUNTIME = load_runtime()
    entry = RUNTIME.get(bot_id) or {}
    recorded_pid = entry.get("pid")
    alive = (
        isinstance(recorded_pid, int)
        and recorded_pid > 0
        and _pid_alive(recorded_pid)
    )
    if not alive:
        # Drop the stale record so the file only lists live processes.
        if bot_id in RUNTIME:
            RUNTIME.pop(bot_id, None)
            save_runtime(RUNTIME)
        return {"running": False, "pid": None, "started_at": 0}
    return {
        "running": True,
        "pid": recorded_pid,
        "started_at": entry.get("started_at", 0),
    }
def safe_parse_cmd(cmd: str) -> List[str]:
    """Split a start command into an argument list for ``subprocess``.

    Only plain command arguments are accepted: the shell composition
    characters ``; | & > <`` and the backtick are rejected. Parsing is done
    with ``shlex.split`` and the result is meant to be run without
    ``shell=True``.

    Raises:
        ValueError: if the command is empty, contains a forbidden character,
            or yields no arguments after splitting.
    """
    text = (cmd or "").strip()
    if text == "":
        raise ValueError("启动命令为空")
    for symbol in (";", "|", "&", ">", "<", "`"):
        if symbol in text:
            raise ValueError("启动命令包含危险符号(禁止 ; | & > < `)")
    tokens = shlex.split(text)
    if len(tokens) == 0:
        raise ValueError("启动命令解析失败")
    return tokens
# -------------------- storage --------------------
def _ensure_data():
    """Create the data directory and, if absent, a seed ``DATA_FILE``.

    The seed contains a single example bot ("bot1") with a three-item menu.
    An existing data file is left untouched.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    if os.path.exists(DATA_FILE):
        return
    example_menu = [
        {"id": "m1", "parent_id": None, "title": "开始", "type": "submenu", "value": ""},
        {"id": "m2", "parent_id": "m1", "title": "发送文案", "type": "text", "value": "你好,这里是二级菜单文案"},
        {"id": "m3", "parent_id": "m1", "title": "打开链接", "type": "url", "value": "https://t.me/"},
    ]
    example_bot = {
        "name": "Bot 1",
        "port": 8001,
        "cmd": "python3 bot1.py",  # example start command
        "welcome_text": "欢迎使用机器人",
        "menu_items": example_menu,
    }
    with open(DATA_FILE, "w", encoding="utf-8") as fh:
        json.dump({"bots": {"bot1": example_bot}}, fh, ensure_ascii=False, indent=2)
def load_db() -> Dict[str, Any]:
    """Return the parsed contents of ``DATA_FILE``, seeding it first if missing."""
    _ensure_data()
    with open(DATA_FILE, "r", encoding="utf-8") as fh:
        raw_text = fh.read()
    return json.loads(raw_text)
def save_db(db: Dict[str, Any]) -> None:
    """Persist *db* to ``DATA_FILE`` as indented UTF-8 JSON.

    The data is written to a temporary sibling file which is then moved over
    the target with ``os.replace``, so a crash or serialization error in the
    middle of the write cannot leave a truncated bots file behind.
    """
    _ensure_data()
    tmp_path = f"{DATA_FILE}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(db, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, DATA_FILE)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up after a failed write.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
# -------------------- auth --------------------
def is_logged_in(request: Request) -> bool:
    """Check the ``bc_token`` cookie against the session table.

    Returns True only for a known token whose expiry has not passed. An
    expired token is removed from ``SESSIONS`` and the table is saved.
    """
    token = request.cookies.get("bc_token")
    expires_at = SESSIONS.get(token) if token else None
    if not expires_at:
        return False
    if int(time.time()) <= int(expires_at):
        return True
    # Expired: forget the session so the file does not keep it around.
    SESSIONS.pop(token, None)
    save_sessions(SESSIONS)
    return False
# -------------------- UI shell --------------------
# Stylesheet text for the admin UI (dark theme: layout, cards, buttons, tables, pills).
# NOTE(review): nothing in the visible templates references CSS; presumably it is
# embedded by page markup that is not visible here - confirm.
CSS = """
:root{
--bg:#0b1220; --card:#0f1a2e; --muted:#9aa7bd; --text:#e8eefc;
--line:rgba(255,255,255,0.10); --brand:#4f8cff; --danger:#ff5b5b; --ok:#35d07f;
}
*{box-sizing:border-box;}
body{margin:0;font-family:ui-sans-serif,system-ui,-apple-system,"Segoe UI",Roboto,Arial;background:linear-gradient(180deg,#0b1220,#050913);color:var(--text);}
a{color:var(--brand);text-decoration:none;}
.wrap{display:flex;min-height:100vh;}
.side{width:240px;padding:18px;border-right:1px solid var(--line);background:rgba(255,255,255,0.02);}
.brand{font-size:18px;font-weight:800;letter-spacing:.2px;}
.small{color:var(--muted);font-size:12px;margin-top:6px;}
.nav{margin-top:16px;display:flex;flex-direction:column;gap:8px;}
.nav a{padding:10px 12px;border:1px solid var(--line);border-radius:10px;color:var(--text);background:rgba(255,255,255,0.02);}
.nav a:hover{border-color:rgba(79,140,255,.6);}
.main{flex:1;padding:22px;}
.top{display:flex;justify-content:space-between;align-items:center;gap:12px;margin-bottom:14px;}
.h1{font-size:20px;font-weight:800;}
.badge{font-size:12px;color:var(--muted);}
.card{background:rgba(255,255,255,0.03);border:1px solid var(--line);border-radius:16px;padding:16px;margin-top:12px;}
.row{display:flex;gap:12px;flex-wrap:wrap;align-items:center;}
.btn{display:inline-flex;align-items:center;justify-content:center;gap:8px;padding:9px 12px;border-radius:12px;border:1px solid var(--line);background:rgba(255,255,255,0.03);color:var(--text);cursor:pointer;}
.btn:hover{border-color:rgba(79,140,255,.6);}
.btn.primary{background:rgba(79,140,255,.16);border-color:rgba(79,140,255,.5);}
.btn.danger{background:rgba(255,91,91,.12);border-color:rgba(255,91,91,.45);}
.btn.ok{background:rgba(53,208,127,.12);border-color:rgba(53,208,127,.45);}
.table{width:100%;border-collapse:collapse;margin-top:10px;}
.table th,.table td{border-bottom:1px solid var(--line);padding:12px;text-align:left;vertical-align:top;}
.table th{color:var(--muted);font-weight:700;font-size:12px;}
.input, textarea, select{width:100%;max-width:720px;padding:10px 12px;border-radius:12px;border:1px solid var(--line);background:rgba(255,255,255,0.02);color:var(--text);outline:none;}
textarea{min-height:160px;font-family:ui-monospace,SFMono-Regular,Menlo,Consolas,monospace;}
.grid2{display:grid;grid-template-columns:1fr 1fr;gap:12px;align-items:start;}
.muted{color:var(--muted);font-size:12px;}
.pill{display:inline-block;padding:3px 8px;border:1px solid var(--line);border-radius:999px;font-size:12px;color:var(--muted);}
.pill.ok{border-color:rgba(53,208,127,.55); color:rgba(53,208,127,.95);}
.pill.bad{border-color:rgba(255,91,91,.55); color:rgba(255,91,91,.95);}
.hr{height:1px;background:var(--line);margin:14px 0;}
.kbd{font-family:ui-monospace,Consolas; padding:2px 6px;border:1px solid var(--line);border-radius:8px;color:var(--muted);}
"""
def shell(title: str, body: str) -> str:
    """Build the page text that the admin handlers wrap in ``HTMLResponse``.

    NOTE(review): as visible here the template interpolates only *title*;
    *body* is accepted but not used. The surrounding page markup appears to be
    missing from this view - confirm against the deployed file.
    """
    return f"""
{title}
"""
# -------------------- routes basic --------------------
@app.get("/health", response_class=HTMLResponse)
async def health():
    """Liveness endpoint: replies with a short banner that includes ``VERSION``."""
    banner = f"OK - bot-center {VERSION}"
    return HTMLResponse(banner)
@app.get("/login", response_class=HTMLResponse)
async def login_page():
    """Serve the login page text; no authentication is required to view it."""
    return f"""
Login
Bot Center Login ({VERSION})
"""
@app.post("/login")
async def login_action(username: str = Form(...), password: str = Form(...)):
    """Validate the submitted credentials and start a session.

    On success a random token is stored in ``SESSIONS`` with a 7-day expiry,
    persisted, and sent back in the ``bc_token`` cookie together with a
    redirect to ``/admin``. On failure a 401 response is returned.
    """
    # Compare in constant time, and always evaluate both fields, so response
    # timing does not reveal how much of a guess was correct. Encoding to
    # bytes is required because compare_digest rejects non-ASCII str input.
    user_ok = secrets.compare_digest(username.encode("utf-8"), ADMIN_USER.encode("utf-8"))
    pass_ok = secrets.compare_digest(password.encode("utf-8"), ADMIN_PASS.encode("utf-8"))
    if not (user_ok and pass_ok):
        return HTMLResponse("账号或密码错误返回 ", status_code=401)
    token = secrets.token_urlsafe(32)
    expire = int(time.time()) + 60 * 60 * 24 * 7  # valid for 7 days
    SESSIONS[token] = expire
    save_sessions(SESSIONS)
    resp = RedirectResponse("/admin", status_code=302)
    resp.set_cookie("bc_token", token, httponly=True, samesite="lax", max_age=60 * 60 * 24 * 7)
    return resp
@app.get("/logout")
async def logout(request: Request):
    """End the current session (if any), clear the cookie and redirect to /login."""
    token = request.cookies.get("bc_token")
    session_known = bool(token) and token in SESSIONS
    if session_known:
        SESSIONS.pop(token)
        save_sessions(SESSIONS)
    response = RedirectResponse("/login", status_code=302)
    response.delete_cookie("bc_token")
    return response
# -------------------- helpers for menu --------------------
def menu_depth(items: List[Dict[str, Any]], item_id: str) -> int:
    """Return the 1-based nesting level of *item_id* within *items*.

    A top-level (or unknown) item has depth 1; each ``parent_id`` link that
    can be followed adds one. The walk is capped so that a cyclic parent
    chain terminates with a result of 11.
    """
    by_id = {entry["id"]: entry for entry in items}
    node = by_id.get(item_id)
    # ``level`` is the depth the item would have if one more parent link exists.
    for level in range(2, 12):
        if not node or not node.get("parent_id"):
            return level - 1
        node = by_id.get(node["parent_id"])
    return 11
def children_of(items: List[Dict[str, Any]], parent_id: Optional[str]) -> List[Dict[str, Any]]:
    """Return, in original order, the items whose ``parent_id`` equals *parent_id*.

    Pass ``None`` to get the top-level items.
    """
    matches = []
    for entry in items:
        if entry.get("parent_id") == parent_id:
            matches.append(entry)
    return matches
def render_menu_tree(items: List[Dict[str, Any]], parent_id: Optional[str], level: int) -> str:
    """Render the menu items below *parent_id* as one concatenated text fragment.

    Each direct child produces one row (title, action label, value), followed
    by the rows of its own children. Recursion stops once *level* reaches 3.

    Args:
        items: Flat list of menu item dicts.
        parent_id: Id whose children are rendered; ``None`` for the top level.
        level: 1-based depth of the rows being rendered; controls indentation.
    """
    rows = []
    for x in children_of(items, parent_id):
        t = x.get("type", "submenu")
        # Map the stored type to its display label; unknown types show as-is.
        typemap = {"submenu":"打开子菜单","text":"发送文本","url":"打开链接"}
        badge = typemap.get(t, t)
        # Six filler characters per nesting level below the first.
        indent = " " * (level - 1) * 6
        rows.append(f"""
{indent}{x.get("title","")} {badge}
{(x.get("value","") or "- ")}
编辑
""")
        if level < 3:
            rows.append(render_menu_tree(items, x["id"], level + 1))
    return "".join(rows)
# -------------------- admin pages --------------------
@app.get("/admin", response_class=HTMLResponse)
async def admin_home(request: Request):
    """Bot list page: one row per configured bot with its live status.

    Redirects to /login when the request has no valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    bots = db.get("bots", {})
    trs = []
    for bot_id, b in bots.items():
        st = get_status(bot_id)
        pill = f"运行中 PID {st['pid']} " if st["running"] else "已停止 "
        # The list page also offers start/stop controls: start is rendered
        # only for stopped bots, stop only for running ones.
        start_btn = f"""
""" if not st["running"] else ""
        stop_btn = f"""
""" if st["running"] else ""
        trs.append(f"""
{bot_id} {b.get("name","")}
{b.get("port","")}
{pill}
进入设置
{start_btn}
{stop_btn}
""")
    body = f"""
机器人列表
新增 / 删除 / 进入设置(文案 + 多级菜单 + 启停)
ID / 名称 端口 状态 操作
{''.join(trs) if trs else "暂无机器人 "}
配置API:/api/bots/<bot_id>/config
"""
    return HTMLResponse(shell("Admin", body))
@app.get("/admin/new", response_class=HTMLResponse)
async def admin_new(request: Request):
    """Page for creating a bot; redirects to /login without a valid session.

    NOTE(review): the body template is empty in this view; presumably the
    creation form markup is missing - confirm.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    body = """
"""
    return HTMLResponse(shell("New", body))
@app.post("/admin/new")
async def admin_new_post(
    request: Request,
    bot_id: str = Form(...),
    name: str = Form(""),
    port: int = Form(0),
    cmd: str = Form(""),
    welcome_text: str = Form(""),
):
    """Create a new bot record and redirect to its settings page.

    Responds with 400 when the id is empty, contains characters that cannot
    be used in a URL path segment / file name, or is already taken.
    Redirects to /login without a valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    bot_id = bot_id.strip()
    if not bot_id:
        return HTMLResponse("bot_id 不能为空", status_code=400)
    # The id is used as a URL path segment (/admin/bot/{bot_id}) and as the
    # stem of the log file name ({bot_id}.log). A separator would make the bot
    # unreachable through the routes and let the log path leave the data dir.
    if bot_id in (".", "..") or any(ch in bot_id for ch in "/\\?#"):
        return HTMLResponse("bot_id 不能包含 / \\ ? # 字符", status_code=400)
    db = load_db()
    bots = db.setdefault("bots", {})
    if bot_id in bots:
        return HTMLResponse("bot_id 已存在", status_code=400)
    bots[bot_id] = {
        "name": name.strip(),
        "port": int(port),
        "cmd": cmd.strip(),
        "welcome_text": welcome_text,
        "menu_items": [],
    }
    save_db(db)
    return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
@app.post("/admin/delete/{bot_id}")
async def admin_delete(request: Request, bot_id: str):
    """Stop the bot's process (if running), then delete its runtime and db records.

    Always ends with a redirect to /admin; redirects to /login without a
    valid session.
    """
    global RUNTIME
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    # Stop the process before deleting the record.
    st = get_status(bot_id)
    if st["running"]:
        pid = int(st["pid"])
        try:
            os.killpg(pid, signal.SIGTERM)
            await asyncio.sleep(0.5)
            if _pid_alive(pid):
                os.killpg(pid, signal.SIGKILL)
        except OSError:
            # Best effort: the group may already be gone or not signalable;
            # deletion of the record proceeds regardless.
            pass
    RUNTIME = load_runtime()
    RUNTIME.pop(bot_id, None)
    save_runtime(RUNTIME)
    db = load_db()
    bots = db.setdefault("bots", {})
    if bot_id in bots:
        del bots[bot_id]
        save_db(db)
    return RedirectResponse("/admin", status_code=302)
@app.get("/admin/bot/{bot_id}", response_class=HTMLResponse)
async def admin_bot(request: Request, bot_id: str):
    """Settings page for one bot: status, welcome text, menu tree, advanced settings.

    Returns 404 when the bot does not exist; redirects to /login without a
    valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    b = db.get("bots", {}).get(bot_id)
    if not b:
        return HTMLResponse("not found", status_code=404)
    st = get_status(bot_id)
    status_badge = f"运行中 PID {st['pid']} " if st["running"] else "已停止 "
    items = b.get("menu_items", [])
    # The rendered tree may carry a literal "{BOT}" placeholder; substitute the id.
    table = render_menu_tree(items, None, 1).replace("{BOT}", bot_id)
    # NOTE(review): start_btn / stop_btn are built but not interpolated into
    # the body as visible here - confirm against the deployed template.
    start_btn = f"""
▶ 启动
""" if not st["running"] else ""
    stop_btn = f"""
⏹ 停止
""" if st["running"] else ""
    body = f"""
机器人设置:{bot_id}
{b.get("name","")} · 端口 {b.get("port","")} · {status_badge}
欢迎文案
机器人启动后 /start 给用户发的内容
{b.get("welcome_text","")}
保存文案
菜单设置(支持 1~3 级)
不写 JSON:新增按钮时选择“父级”,自动形成二级/三级菜单。
按钮 内容/链接 操作
{table if table else "还没有菜单,点右上角“新增按钮”。 "}
高级设置(必填:启停靠这个)
启动命令 cmd:你平时在终端怎么启动机器人,就把那一行粘贴进来(必须是完整路径)。
保存高级设置
"""
    return HTMLResponse(shell("Bot", body))
@app.post("/admin/bot/{bot_id}/welcome")
async def admin_save_welcome(request: Request, bot_id: str, welcome_text: str = Form("")):
    """Store the submitted welcome text for *bot_id* and return to its settings page.

    Returns 404 for an unknown bot; redirects to /login without a valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    bots = db.get("bots", {})
    if bot_id in bots:
        bots[bot_id]["welcome_text"] = welcome_text
        save_db(db)
        return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
    return HTMLResponse("not found", status_code=404)
@app.post("/admin/bot/{bot_id}/advanced")
async def admin_save_advanced(
    request: Request,
    bot_id: str,
    port: int = Form(0),
    cmd: str = Form("")
):
    """Store the port and start command for *bot_id* and return to its settings page.

    Returns 404 for an unknown bot; redirects to /login without a valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    bots = db.get("bots", {})
    if bot_id in bots:
        record = bots[bot_id]
        record["port"] = int(port)
        record["cmd"] = cmd.strip()
        save_db(db)
        return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
    return HTMLResponse("not found", status_code=404)
# -------------------- START/STOP (core - fixed version) --------------------
@app.post("/admin/bot/{bot_id}/start")
async def admin_bot_start(request: Request, bot_id: str):
    """Launch the bot's configured command as a detached process.

    The command is parsed with ``safe_parse_cmd`` and run without a shell,
    with stdout/stderr appended to ``<DATA_DIR>/<bot_id>.log``. The new pid is
    recorded in the runtime file. Returns 404 for an unknown bot, 400 when the
    command is empty or the launch fails, and otherwise redirects back to the
    bot's settings page (also when the bot is already running).
    """
    global RUNTIME
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    b = db.get("bots", {}).get(bot_id)
    if not b:
        return HTMLResponse("not found", status_code=404)
    st = get_status(bot_id)
    if st["running"]:
        return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
    cmd = (b.get("cmd") or "").strip()
    if not cmd:
        return HTMLResponse("启动失败:cmd 为空(去高级设置里填)", status_code=400)
    try:
        args = safe_parse_cmd(cmd)
        log_path = os.path.join(DATA_DIR, f"{bot_id}.log")
        # The child receives its own copies of the log descriptor, so the
        # parent's handle is closed right after the spawn instead of leaking
        # one open file per start.
        with open(log_path, "a", encoding="utf-8") as lf:
            p = subprocess.Popen(
                args,
                stdout=lf,
                stderr=lf,
                cwd=BASE_DIR,  # launch from the bot-center directory
                start_new_session=True,  # make the pid the leader of a new process group
            )
        RUNTIME = load_runtime()
        RUNTIME[bot_id] = {"pid": p.pid, "started_at": int(time.time())}
        save_runtime(RUNTIME)
    except Exception as e:
        # Any parse/spawn/bookkeeping failure is reported to the admin as text.
        return HTMLResponse(f"启动失败:{str(e)}", status_code=400)
    return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
@app.post("/admin/bot/{bot_id}/stop")
async def admin_bot_stop(request: Request, bot_id: str):
    """Stop the bot's process group, escalating from SIGTERM to SIGKILL.

    If signalling the group raises, it falls back to signalling only the pid
    and returns 500 when that fails too. Otherwise the runtime record is
    removed and the client is redirected back to the bot's settings page.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    st = get_status(bot_id)
    if not st["running"]:
        return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
    pid = int(st["pid"])
    try:
        # Step 1: send the graceful stop signal to the whole process group
        os.killpg(pid, signal.SIGTERM)
        # Wait 2 seconds to give the process time to exit gracefully
        await asyncio.sleep(2)
        # Step 2: if the process is still alive, kill it forcibly
        if _pid_alive(pid):
            print(f"⚠️ PID {pid} 未优雅退出,强制杀死")
            os.killpg(pid, signal.SIGKILL)
            await asyncio.sleep(1)
        # Step 3: last resort (requires psutil: pip install psutil)
        try:
            import psutil
            if _pid_alive(pid):
                parent = psutil.Process(pid)
                children = parent.children(recursive=True)
                for child in children:
                    child.kill()
                parent.kill()
        except ImportError:
            pass # psutil is not installed: skip this step
        except Exception:
            pass
    except Exception as e:
        # Fallback: signal only the pid itself instead of the group
        try:
            os.kill(pid, signal.SIGTERM)
            await asyncio.sleep(2)
            if _pid_alive(pid):
                os.kill(pid, signal.SIGKILL)
        except Exception as e2:
            return HTMLResponse(f"停止失败:{str(e2)}", status_code=500)
    global RUNTIME
    RUNTIME = load_runtime()
    RUNTIME.pop(bot_id, None)
    save_runtime(RUNTIME)
    # Finally refresh the status, which also clears a dead pid record
    get_status(bot_id)
    return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
# -------------------- menu editor --------------------
@app.get("/admin/menu/new", response_class=HTMLResponse)
async def menu_new(request: Request, bot_id: str):
    """Page for adding a menu button to *bot_id* (taken from the query string).

    Returns 404 for an unknown bot; redirects to /login without a valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    b = db.get("bots", {}).get(bot_id)
    if not b:
        return HTMLResponse("not found", status_code=404)
    items = b.get("menu_items", [])
    # First choice is the top level; then every submenu shallower than level 3.
    parent_options = ['(一级菜单) ']
    for it in items:
        if it.get("type") == "submenu" and menu_depth(items, it["id"]) < 3:
            parent_options.append(f'{it["title"]}(可挂子菜单) ')
    body = f"""
新增按钮
机器人:{bot_id} · 选择父级即可生成二级/三级菜单
父级位置
{''.join(parent_options)}
点击动作
打开子菜单(进入下一层)
发送文本
打开链接
保存按钮
"""
    return HTMLResponse(shell("Menu New", body))
@app.post("/admin/menu/new")
async def menu_new_post(
    request: Request,
    bot_id: str = Form(...),
    title: str = Form(...),
    parent_id: str = Form(""),
    type: str = Form("submenu"),
    value: str = Form(""),
):
    """Append a new menu item to *bot_id* and return to its settings page.

    A non-empty parent must be an existing item of type "submenu" that sits
    above level 3; otherwise 400 is returned. Returns 404 for an unknown bot
    and redirects to /login without a valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    bot = db.get("bots", {}).get(bot_id)
    if not bot:
        return HTMLResponse("not found", status_code=404)
    entries = bot.get("menu_items", [])
    parent = parent_id.strip() or None
    if parent is not None:
        by_id = {entry["id"]: entry for entry in entries}
        parent_entry = by_id.get(parent)
        if parent_entry is None or parent_entry.get("type") != "submenu":
            return HTMLResponse("父级不合法", status_code=400)
        if menu_depth(entries, parent) >= 3:
            return HTMLResponse("最多支持三级菜单", status_code=400)
    entries.append(
        {
            "id": uuid.uuid4().hex[:8],
            "parent_id": parent,
            "title": title.strip(),
            "type": type,
            "value": value.strip(),
        }
    )
    bot["menu_items"] = entries
    save_db(db)
    return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
@app.get("/admin/menu/edit", response_class=HTMLResponse)
async def menu_edit(request: Request, bot_id: str, item_id: str):
    """Page for editing one menu item of *bot_id* (both ids come from the query string).

    Returns 404 when the bot or the item does not exist; redirects to /login
    without a valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    b = db.get("bots", {}).get(bot_id)
    if not b:
        return HTMLResponse("not found", status_code=404)
    items = b.get("menu_items", [])
    mp = {x["id"]: x for x in items}
    if item_id not in mp:
        return HTMLResponse("not found", status_code=404)
    cur = mp[item_id]
    def is_descendant(child_id: str, parent_id: str) -> bool:
        # True when parent_id appears anywhere on child_id's chain of parents.
        m = {x["id"]: x for x in items}
        curx = m.get(child_id)
        while curx and curx.get("parent_id"):
            if curx["parent_id"] == parent_id:
                return True
            curx = m.get(curx["parent_id"])
        return False
    parent_options = [f'(一级菜单) ']
    for it in items:
        # Eligible parents: other submenu items above level 3 that are not
        # descendants of the item being edited (which would create a cycle).
        if it["id"] == item_id:
            continue
        if it.get("type") != "submenu":
            continue
        if menu_depth(items, it["id"]) >= 3:
            continue
        if is_descendant(it["id"], item_id):
            continue
        # NOTE(review): sel (and opt below) are computed but not interpolated
        # in the templates as visible here - confirm against the deployed file.
        sel = "selected" if cur.get("parent_id") == it["id"] else ""
        parent_options.append(f'{it["title"]} ')
    def opt(v: str) -> str:
        # "selected" when v is the item's current type, else an empty string.
        return "selected" if cur.get("type") == v else ""
    body = f"""
父级位置
{''.join(parent_options)}
保存
"""
    return HTMLResponse(shell("Menu Edit", body))
@app.post("/admin/menu/edit")
async def menu_edit_post(
    request: Request,
    bot_id: str = Form(...),
    item_id: str = Form(...),
    title: str = Form(...),
    parent_id: str = Form(""),
    type: str = Form("submenu"),
    value: str = Form(""),
):
    """Update title, parent, type and value of one menu item.

    A non-empty parent must be an existing "submenu" item above level 3 and
    must be neither the edited item itself nor one of its descendants;
    otherwise 400 is returned. Returns 404 when the bot or item does not
    exist and redirects to /login without a valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    b = db.get("bots", {}).get(bot_id)
    if not b:
        return HTMLResponse("not found", status_code=404)
    items = b.get("menu_items", [])
    mp = {x["id"]: x for x in items}
    if item_id not in mp:
        return HTMLResponse("not found", status_code=404)
    pid = parent_id.strip() or None
    if pid:
        if pid not in mp or mp[pid].get("type") != "submenu":
            return HTMLResponse("父级不合法", status_code=400)
        # Walk from the proposed parent up towards the root. Meeting the
        # edited item on the way means the new parent is the item itself or
        # one of its descendants, which would create a cycle and detach the
        # whole subtree from the rendered menu.
        visited = set()
        ancestor = pid
        while ancestor and ancestor in mp and ancestor not in visited:
            if ancestor == item_id:
                return HTMLResponse("父级不合法", status_code=400)
            visited.add(ancestor)
            ancestor = mp[ancestor].get("parent_id")
        if menu_depth(items, pid) >= 3:
            return HTMLResponse("最多支持三级菜单", status_code=400)
    for it in items:
        if it["id"] == item_id:
            it["title"] = title.strip()
            it["parent_id"] = pid
            it["type"] = type
            it["value"] = value.strip()
            break
    b["menu_items"] = items
    save_db(db)
    return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
@app.post("/admin/menu/delete")
async def menu_delete(
    request: Request,
    bot_id: str = Form(...),
    item_id: str = Form(...),
):
    """Delete a menu item together with all of its descendants.

    Returns 404 for an unknown bot; redirects to /login without a valid
    session, and otherwise back to the bot's settings page.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    db = load_db()
    bot = db.get("bots", {}).get(bot_id)
    if not bot:
        return HTMLResponse("not found", status_code=404)
    entries = bot.get("menu_items", [])
    doomed = {item_id}
    # Grow the set one generation at a time until no new children turn up.
    while True:
        next_generation = {
            entry["id"]
            for entry in entries
            if entry.get("parent_id") in doomed and entry["id"] not in doomed
        }
        if not next_generation:
            break
        doomed |= next_generation
    bot["menu_items"] = [entry for entry in entries if entry["id"] not in doomed]
    save_db(db)
    return RedirectResponse(f"/admin/bot/{bot_id}", status_code=302)
@app.get("/admin/api-doc", response_class=HTMLResponse)
async def admin_api_doc(request: Request):
    """Static help page listing the two JSON API endpoints and the config fields.

    Redirects to /login without a valid session.
    """
    if not is_logged_in(request):
        return RedirectResponse("/login", status_code=302)
    # The template has no placeholders; "<bot_id>" is literal text.
    body = f"""
配置:
GET /api/bots/<bot_id>/config
机器人列表:
GET /api/bots
返回结构:
welcome_text: 欢迎文案(/start 发)
menu_items: 菜单树(最多三级)
- id / parent_id / title / type / value
"""
    return HTMLResponse(shell("API", body))
# -------------------- API --------------------
@app.get("/api/bots")
async def api_list_bots():
    """Return every stored bot record as JSON, keyed by bot id (no login check)."""
    all_bots = load_db().get("bots", {})
    return JSONResponse(all_bots)
@app.get("/api/bots/{bot_id}/config")
async def api_bot_config(bot_id: str):
    """Return the welcome text and menu items of one bot (no login check).

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    bots = load_db().get("bots", {})
    record = bots.get(bot_id)
    if record:
        return {
            "bot_id": bot_id,
            "welcome_text": record.get("welcome_text", ""),
            "menu_items": record.get("menu_items", []),
        }
    raise HTTPException(404, "not found")
# Script entry point: serve the app with uvicorn on all interfaces, port 8810.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8810)