-
Notifications
You must be signed in to change notification settings - Fork 141
Expand file tree
/
Copy pathmain_server.py
More file actions
1937 lines (1692 loc) · 86 KB
/
main_server.py
File metadata and controls
1937 lines (1692 loc) · 86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Windows multiprocessing 支持:确保子进程不会重复执行模块级初始化
from multiprocessing import freeze_support
import multiprocessing
from utils.port_utils import set_port_probe_reuse
freeze_support()
# 设置 multiprocessing 启动方法(确保跨进程共享结构的一致性)
# 在 Linux/macOS 上使用 fork,在 Windows 上使用 spawn(默认)
if sys.platform != "win32":
try:
multiprocessing.set_start_method('fork', force=False)
except RuntimeError:
# 启动方法已经设置过,忽略
pass
# 检查是否需要执行初始化(用于防止 Windows spawn 方式创建的子进程重复初始化)
# 方案:首次导入时设置环境变量标记,子进程会继承这个标记从而跳过初始化
_INIT_MARKER = '_NEKO_MAIN_SERVER_INITIALIZED'
_IS_MAIN_PROCESS = _INIT_MARKER not in os.environ
if _IS_MAIN_PROCESS:
# 立即设置标记,这样任何从此进程 spawn 的子进程都会继承此标记
os.environ[_INIT_MARKER] = '1'
# 获取应用程序根目录(与 config_manager 保持一致)
def _get_app_root():
if getattr(sys, 'frozen', False):
if hasattr(sys, '_MEIPASS'):
return sys._MEIPASS
else:
return os.path.dirname(sys.executable)
else:
return os.path.dirname(os.path.abspath(__file__))
# 仅在 Windows 上调整 DLL 搜索路径
if sys.platform == "win32" and hasattr(os, "add_dll_directory"):
os.add_dll_directory(_get_app_root())
import mimetypes # noqa
mimetypes.add_type("application/javascript", ".js")
import asyncio # noqa
import importlib # noqa
import inspect # noqa
import logging # noqa
import atexit # noqa
import httpx # noqa
import time # noqa
from datetime import datetime, timezone # noqa
from config import MAIN_SERVER_PORT, MONITOR_SERVER_PORT # noqa
from utils.cloudsave_autocloud import get_cloudsave_manager # noqa
from utils.cloudsave_runtime import (
CloudsaveDeadlineExceeded,
MaintenanceModeError,
ROOT_MODE_NORMAL,
bootstrap_local_cloudsave_environment,
is_write_fence_active,
maintenance_error_payload,
set_root_mode,
)
from utils.config_manager import get_config_manager, get_reserved # noqa
# 将日志初始化提前,确保导入阶段异常也能落盘
from utils.logger_config import setup_logging # noqa: E402
from utils.ssl_env_diagnostics import probe_ssl_environment, write_ssl_diagnostic # noqa: E402
logger, log_config = setup_logging(service_name="Main", log_level=logging.INFO, silent=not _IS_MAIN_PROCESS)
if _IS_MAIN_PROCESS:
_ssl_precheck = probe_ssl_environment()
if not _ssl_precheck.get("ok", True):
diag_dir = os.path.join(log_config.get_log_directory_path(), "diagnostics")
diag_path = write_ssl_diagnostic(
event="main_server_ssl_precheck_failed",
output_dir=diag_dir,
extra=_ssl_precheck,
)
logger.warning(
"SSL environment precheck failed: %s%s",
_ssl_precheck.get("error_message"),
f" | diagnostic: {diag_path}" if diag_path else "",
)
try:
from fastapi import FastAPI # noqa
from fastapi.staticfiles import StaticFiles # noqa
from main_logic import core as core, cross_server as cross_server # noqa
from main_logic.agent_event_bus import MainServerAgentBridge, notify_analyze_ack, set_main_bridge # noqa
from fastapi.templating import Jinja2Templates # noqa
from threading import Thread, Event as ThreadEvent # noqa
from queue import Queue, Empty as QueueEmpty # noqa
from dataclasses import dataclass # noqa
from typing import Any, Optional # noqa
except Exception as e:
logger.exception(f"[Main] Module import failed during startup: {e}")
raise
# 导入创意工坊工具模块
from utils.workshop_utils import ( # noqa
get_workshop_root,
get_workshop_path
)
# 导入创意工坊路由中的函数
from main_routers.workshop_router import get_subscribed_workshop_items, sync_workshop_character_cards, warmup_ugc_cache # noqa
# 确定 templates 目录位置(使用 _get_app_root)
template_dir = _get_app_root()
templates = Jinja2Templates(directory=template_dir)
def initialize_steamworks():
try:
# 明确读取steam_appid.txt文件以获取应用ID
app_id = None
app_id_file = os.path.join(_get_app_root(), 'steam_appid.txt')
if os.path.exists(app_id_file):
with open(app_id_file, 'r') as f:
app_id = f.read().strip()
print(f"从steam_appid.txt读取到应用ID: {app_id}")
# 创建并初始化Steamworks实例
from steamworks import STEAMWORKS
steamworks = STEAMWORKS()
# 显示Steamworks初始化过程的详细日志
print("正在初始化Steamworks...")
steamworks.initialize()
steamworks.UserStats.RequestCurrentStats()
# 初始化后再次获取应用ID以确认
actual_app_id = steamworks.app_id
print(f"Steamworks初始化完成,实际使用的应用ID: {actual_app_id}")
# 检查全局logger是否已初始化,如果已初始化则记录成功信息
if 'logger' in globals():
logger.info(f"Steamworks初始化成功,应用ID: {actual_app_id}")
logger.info(f"Steam客户端运行状态: {steamworks.IsSteamRunning()}")
logger.info(f"Steam覆盖层启用状态: {steamworks.IsOverlayEnabled()}")
return steamworks
except Exception as e:
# 检查全局logger是否已初始化,如果已初始化则记录错误,否则使用print
error_msg = f"初始化Steamworks失败: {e}"
if 'logger' in globals():
logger.error(error_msg)
else:
print(error_msg)
return None
def get_default_steam_info():
global steamworks
# 检查steamworks是否初始化成功
if steamworks is None:
print("Steamworks not initialized. Skipping Steam functionality.")
if 'logger' in globals():
logger.info("Steamworks not initialized. Skipping Steam functionality.")
return
try:
my_steam64 = steamworks.Users.GetSteamID()
my_steam_level = steamworks.Users.GetPlayerSteamLevel()
subscribed_apps = steamworks.Workshop.GetNumSubscribedItems()
print(f'Subscribed apps: {subscribed_apps}')
print(f'Logged on as {my_steam64}, level: {my_steam_level}')
print('Is subscribed to current app?', steamworks.Apps.IsSubscribed())
except Exception as e:
print(f"Error accessing Steamworks API: {e}")
if 'logger' in globals():
logger.error(f"Error accessing Steamworks API: {e}")
# Steamworks 初始化将在 @app.on_event("startup") 中延迟执行
# 这样可以避免在模块导入时就执行 DLL 加载等操作
steamworks = None
_server_loop: asyncio.AbstractEventLoop | None = None
_config_manager = get_config_manager()
_cloudsave_manager = get_cloudsave_manager(_config_manager)
def _cloudsave_action_supports_deadline(action) -> bool:
try:
signature = inspect.signature(action)
except (TypeError, ValueError):
return False
if "deadline_monotonic" in signature.parameters:
return True
return any(
parameter.kind == inspect.Parameter.VAR_KEYWORD
for parameter in signature.parameters.values()
)
def _cloudsave_action_supports_steamworks(action) -> bool:
try:
signature = inspect.signature(action)
except (TypeError, ValueError):
return False
if "steamworks" in signature.parameters:
return True
return any(
parameter.kind == inspect.Parameter.VAR_KEYWORD
for parameter in signature.parameters.values()
)
async def _run_cloudsave_manager_action(
action_name: str,
*,
reason: str,
budget_seconds: float | None = None,
steamworks=None,
):
action = getattr(_cloudsave_manager, action_name)
kwargs = {"reason": reason}
if (
budget_seconds is not None
and budget_seconds > 0
and _cloudsave_action_supports_deadline(action)
):
kwargs["deadline_monotonic"] = time.monotonic() + float(budget_seconds)
if steamworks is not None and _cloudsave_action_supports_steamworks(action):
kwargs["steamworks"] = steamworks
return await asyncio.to_thread(action, **kwargs)
async def _request_memory_server_shutdown() -> None:
"""Request memory_server shutdown after main_server has finished its own cleanup."""
try:
from config import MEMORY_SERVER_PORT
shutdown_url = f"http://127.0.0.1:{MEMORY_SERVER_PORT}/shutdown"
async with httpx.AsyncClient(timeout=1, proxy=None, trust_env=False) as client:
response = await client.post(shutdown_url)
if response.status_code == 200:
logger.info("已向memory_server发送关闭信号")
else:
logger.warning(f"向memory_server发送关闭信号失败,状态码: {response.status_code}")
except Exception as e:
logger.warning(f"向memory_server发送关闭信号时出错: {e}")
@dataclass
class RoleState:
"""单个 catgirl 的 per-k 运行态容器。
把之前 6 张并列 module-global dict(sync_message_queue / sync_shutdown_event /
session_id / sync_process / websocket_locks / session_manager)合并成一个
record,由 role_state[k] 统一持有,避免半初始化状态 + 维护成本分散。
见 issue #857 / PR #855 review。
不变量:
- sync_message_queue / sync_shutdown_event / websocket_lock 在
_ensure_character_slots 一次性构造,之后**永不替换**。特别是
websocket_lock —— 替换会让已经 ``async with`` 进来的协程阻塞在一把
孤立的旧 Lock 上;如果任何逻辑需要整体重建 role_state[k],必须
把旧 lock 原样传过去。
- session_id / sync_process / session_manager 初始为 None,分别由
websocket_router / _init_character_resources 后续赋值。
"""
sync_message_queue: Queue
sync_shutdown_event: ThreadEvent
websocket_lock: asyncio.Lock
session_id: Optional[str] = None
sync_process: Optional[Thread] = None
# 用 Any 而非 core.LLMSessionManager:避免 dataclass 运行时求值 annotation
# 时踩到 forward-ref / 循环引用边界
session_manager: Optional[Any] = None
# 角色名 -> RoleState 的主存储;所有 per-k 同步资源都通过它访问
role_state: dict[str, RoleState] = {}
def _iter_sync_connector_threads():
"""迭代所有仍然存活的同步连接器线程(按 role_state 为准)。"""
for name, rs in role_state.items():
thread = rs.sync_process
if thread is None:
continue
yield name, thread
def _signal_sync_connectors_shutdown(*, log: bool = True) -> None:
if log:
logger.info("正在关闭同步线程...")
for rs in role_state.values():
try:
rs.sync_shutdown_event.set()
except Exception as e:
logger.debug(f"设置同步关闭事件失败: {e}", exc_info=True)
async def join_sync_connector_threads(timeout: float = 3.0) -> list[str]:
"""并行 join 所有同步连接器线程,返回在 timeout 内仍未退出的线程名。
N 个角色串行 join 会把最坏墙钟放大成 N * timeout;gather + to_thread
让每个 join 在自己的线程里跑,墙钟收敛到单个 timeout。
"""
wait_timeout = max(0.0, float(timeout))
targets = list(_iter_sync_connector_threads())
if not targets:
return []
async def _join_one(name: str, thread) -> str | None:
try:
await asyncio.to_thread(thread.join, wait_timeout)
except Exception as e:
logger.debug(f"等待同步连接器线程 {name} 退出时出错: {e}", exc_info=True)
return None
return name if thread.is_alive() else None
results = await asyncio.gather(
*(_join_one(name, thread) for name, thread in targets),
return_exceptions=False,
)
alive_threads = [name for name in results if name]
if alive_threads:
logger.warning(
"以下同步连接器线程未在 %.1fs 内退出: %s",
wait_timeout,
", ".join(alive_threads),
)
return alive_threads
def cleanup(*, log: bool = True):
"""通知所有同步线程停止。log=False 用于 atexit 二次触发时抑制重复日志。"""
_signal_sync_connectors_shutdown(log=log)
# 只在主进程中注册 cleanup 函数,防止子进程退出时执行清理
# log=False:on_shutdown 已经打印过 "正在清理资源...",atexit 补一刀时不重复 log
if _IS_MAIN_PROCESS:
atexit.register(cleanup, log=False)
# 角色数据全局变量(会在重载时更新)
master_name = None
her_name = None
master_basic_config = None
lanlan_basic_config = None
name_mapping = None
lanlan_prompt = None
time_store = None
setting_store = None
recent_log = None
catgirl_names = []
agent_event_bridge: MainServerAgentBridge | None = None
def _is_websocket_connected(ws) -> bool:
"""Check if a WebSocket is in CONNECTED state."""
if not ws:
return False
if not hasattr(ws, "client_state"):
return False
try:
return ws.client_state == ws.client_state.CONNECTED
except Exception:
return False
def _iter_session_managers():
"""Yield (name, session_manager) for every role with a live session_manager.
Replaces the old ``session_manager.items()`` pattern after the per-k dicts
were consolidated into ``role_state``.
"""
for name, rs in role_state.items():
if rs.session_manager is not None:
yield name, rs.session_manager
def _get_session_manager(name):
"""Return ``role_state[name].session_manager`` or None — dict.get() equivalent."""
if not name:
return None
rs = role_state.get(name)
return rs.session_manager if rs is not None else None
def _select_fallback_session_manager():
"""Return a single connected session manager as a safe fallback, if unambiguous."""
connected = []
for name, mgr in _iter_session_managers():
ws = getattr(mgr, "websocket", None)
if _is_websocket_connected(ws):
connected.append((name, mgr))
if len(connected) == 1:
return connected[0]
return None, None
async def _broadcast_to_all_connected(event_payload: dict) -> int:
"""Broadcast an event to all connected WebSocket sessions in parallel.
每秒可能多次(agent status),串行 await 会让一个慢的 ws 拖累其它会话。"""
# Take a snapshot to avoid RuntimeError from concurrent dict mutation
targets = [
(name, getattr(mgr, "websocket", None))
for name, mgr in list(_iter_session_managers())
if mgr
]
targets = [(n, ws) for n, ws in targets if _is_websocket_connected(ws) and hasattr(ws, "send_json")]
async def _send_one(name, ws):
try:
await ws.send_json(event_payload)
return True
except Exception as e:
logger.debug("[EventBus] broadcast to %s failed: %s", name, e)
return False
results = await asyncio.gather(*(_send_one(n, ws) for n, ws in targets), return_exceptions=False)
return sum(1 for r in results if r is True)
async def _handle_agent_event(event: dict):
"""通过 ZeroMQ 接收 agent_server 事件,并分发到 core/websocket。"""
try:
event_type = event.get("event_type")
lanlan = event.get("lanlan_name")
if event_type == "analyze_ack":
logger.info(
"[EventBus] analyze_ack received on main: event_id=%s lanlan=%s",
event.get("event_id"),
lanlan,
)
notify_analyze_ack(str(event.get("event_id") or ""))
return
# Agent status updates may be broadcast (lanlan_name omitted).
if event_type == "agent_status_update":
payload = {
"type": "agent_status_update",
"snapshot": event.get("snapshot", {}),
}
mgr_for_status = _get_session_manager(lanlan)
if lanlan and mgr_for_status is not None:
mgr = mgr_for_status
ws = getattr(mgr, "websocket", None) if mgr else None
if _is_websocket_connected(ws):
try:
await ws.send_json(payload)
except Exception as e:
logger.debug("[EventBus] agent_status_update send failed: %s", e)
else:
await _broadcast_to_all_connected(payload)
return
# Resolve target session manager; fallback to broadcast if lanlan is unknown
mgr = _get_session_manager(lanlan)
if not mgr and event_type == "task_update":
# Broadcast task_update to all connected sessions when lanlan is unresolvable
task_payload = {"type": "agent_task_update", "task": event.get("task", {})}
delivered = await _broadcast_to_all_connected(task_payload)
if delivered == 0:
logger.warning("[EventBus] task_update broadcast: no connected WebSocket sessions")
return
# --- Music Global Broadcasts (Must come before early 'if not mgr' returns) ---
elif event_type == "music_allowlist_add":
# Music allowlist is a global UI state, broadcast to all active sessions
targets = [mgr] if mgr else [m for _, m in _iter_session_managers()]
payload = {
"type": "music_allowlist_add",
"domains": event.get("domains") or event.get("metadata", {}).get("domains", [])
}
async def _send_allowlist(target_mgr):
if target_mgr and target_mgr.websocket and hasattr(target_mgr.websocket, "send_json"):
try:
await target_mgr.websocket.send_json(payload)
except Exception as e:
logger.debug("[EventBus] music_allowlist_add broadcast failed: %s", e)
await asyncio.gather(*(_send_allowlist(t) for t in targets), return_exceptions=True)
if targets:
logger.info("[EventBus] music_allowlist_add broadcasted to %d sessions", len(targets))
return
elif event_type == "music_play_url":
# Music playback is a global UI action, broadcast to all active sessions
targets = [mgr] if mgr else [m for _, m in _iter_session_managers()]
payload = {
"type": "music_play_url",
"url": event.get("url"),
"name": event.get("name") or "Plugin Music",
"artist": event.get("artist") or "External"
}
async def _send_play(target_mgr):
if target_mgr and target_mgr.websocket and hasattr(target_mgr.websocket, "send_json"):
try:
await target_mgr.websocket.send_json(payload)
except Exception as e:
logger.debug("[EventBus] music_play_url broadcast failed: %s", e)
await asyncio.gather(*(_send_play(t) for t in targets), return_exceptions=True)
if targets:
logger.info("[EventBus] music_play_url broadcasted to %d sessions", len(targets))
return
if not mgr and event_type in ("proactive_message", "task_result"):
fallback_name, fallback_mgr = _select_fallback_session_manager()
if fallback_mgr is not None:
mgr = fallback_mgr
logger.warning(
"[EventBus] %s rerouted: lanlan=%s missing, fallback_session=%s",
event_type,
lanlan,
fallback_name,
)
else:
# No target session found — drop the event entirely.
# Do NOT broadcast text to other sessions to prevent cross-session leaks.
logger.info(
"[EventBus] %s dropped: no target session for lanlan=%s, active_sessions=%s",
event_type,
lanlan,
[name for name, _ in _iter_session_managers()],
)
return
if not mgr:
logger.info("[EventBus] %s dropped: no session_manager for lanlan=%s", event_type, lanlan)
return
if event_type in ("task_result", "proactive_message"):
text = (event.get("text") or "").strip()
if text:
if event.get("direct_reply"):
detail_text = (event.get("detail") or text).strip()
delivered = False
if detail_text and hasattr(mgr, "send_lanlan_response"):
try:
delivered = bool(await mgr.send_lanlan_response(detail_text, True))
except Exception as e:
logger.warning("[EventBus] direct task_result reply failed: %s", e)
if delivered and hasattr(mgr, "handle_proactive_complete"):
try:
await mgr.handle_proactive_complete()
except Exception as e:
logger.warning("[EventBus] direct task_result turn_end failed: %s", e)
if delivered:
logger.info("[EventBus] direct task_result reply delivered: %.60s", detail_text[:60])
return
# Build structured callback and enqueue for LLM injection
cb_status = event.get("status") or ("completed" if event.get("success", True) else "failed")
callback = {
"event": "agent_task_callback",
"task_id": event.get("task_id") or "",
"channel": event.get("channel") or "unknown",
"status": cb_status,
"success": bool(event.get("success", True)),
"summary": event.get("summary") or text,
"detail": event.get("detail") or text,
"error_message": event.get("error_message") or "",
"timestamp": event.get("timestamp") or "",
}
mgr.enqueue_agent_callback(callback)
logger.info("[EventBus] %s enqueued callback, scheduling trigger_agent_callbacks", event_type)
# Create task with exception logging
async def _run_trigger_with_logging():
try:
await mgr.trigger_agent_callbacks()
except Exception as e:
logger.error("[EventBus] trigger_agent_callbacks task failed: %s", e)
mgr._pending_agent_callback_task = asyncio.create_task(_run_trigger_with_logging())
ws = getattr(mgr, "websocket", None)
if _is_websocket_connected(ws):
try:
notif = {
"type": "agent_notification",
"text": text,
"source": "brain",
"status": cb_status,
}
err_msg = event.get("error_message") or ""
if err_msg:
notif["error_message"] = err_msg[:500]
await ws.send_json(notif)
logger.info("[EventBus] agent_notification sent to frontend: %.60s", text[:60])
except Exception as e:
logger.warning("[EventBus] agent_notification WS send failed: %s", e)
else:
logger.warning("[EventBus] agent_notification: WebSocket not connected for lanlan=%s", lanlan)
elif event_type == "agent_notification":
ws = getattr(mgr, "websocket", None)
if _is_websocket_connected(ws):
try:
notif = {
"type": "agent_notification",
"text": event.get("text", ""),
"source": event.get("source", "brain"),
"status": event.get("status", "error"),
}
err_msg = event.get("error_message") or ""
if err_msg:
notif["error_message"] = err_msg[:500]
await ws.send_json(notif)
except Exception as e:
logger.debug("[EventBus] agent_notification send failed: %s", e)
else:
logger.debug("[EventBus] agent_notification: WebSocket not connected for lanlan=%s", lanlan)
elif event_type == "task_update":
task_payload = {"type": "agent_task_update", "task": event.get("task", {})}
ws = getattr(mgr, "websocket", None)
if _is_websocket_connected(ws):
try:
await ws.send_json(task_payload)
except Exception as e:
logger.warning("[EventBus] task_update send failed for lanlan=%s: %s", lanlan, e)
else:
logger.warning("[EventBus] task_update dropped: WebSocket not connected for lanlan=%s", lanlan)
except Exception as e:
logger.debug(f"handle_agent_event error: {e}")
async def _refresh_character_globals():
"""刷新角色相关 module globals(从 config 重新拉一次 aget_character_data)。
所有 fast-path 入口都必须先走这一步,确保 set_current_catgirl / update_catgirl
等操作后,后续读 her_name / lanlan_prompt / lanlan_basic_config 的代码看到最新值。
"""
global master_name, her_name, master_basic_config, lanlan_basic_config
global name_mapping, lanlan_prompt, time_store, setting_store, recent_log
global catgirl_names
master_name, her_name, master_basic_config, lanlan_basic_config, name_mapping, lanlan_prompt, time_store, setting_store, recent_log = await _config_manager.aget_character_data()
catgirl_names = list(lanlan_prompt.keys())
def _ensure_character_slots(k: str) -> bool:
"""为单个 catgirl 预备 per-k 同步资源槽位。返回是否为新建角色(决定后续要不要强制启动线程)。
纯内存的原子操作:要么 role_state[k] 已经存在(什么都不做),要么一次性
把 queue / shutdown_event / websocket_lock 三件全部填好。避免旧代码里
6 张 dict 用两种不同 sentinel(sync_message_queue vs websocket_locks)
各自判断 "角色是否已有槽位" 造成的半初始化风险。
"""
if k not in role_state:
role_state[k] = RoleState(
sync_message_queue=Queue(),
sync_shutdown_event=ThreadEvent(),
websocket_lock=asyncio.Lock(),
)
logger.info(f"为角色 {k} 初始化新资源")
return True
return False
async def _init_character_resources(k: str, is_new_character: bool):
"""为单个 catgirl 完成 session_manager 更新 + 同步连接器线程检查/重启。
依赖 module globals: master_name, lanlan_prompt, lanlan_basic_config(调用方负责先刷新)。
写入 per-k 槽位: role_state[k].session_manager / sync_process —— 不同 k 之间
不共享状态,可安全并行。
"""
rs = role_state[k] # 调用方必须先 _ensure_character_slots,保证这里可直接索引
# 更新或创建session manager(使用最新的prompt)
# 使用锁保护websocket的preserve/restore操作,防止与cleanup()竞争
async with rs.websocket_lock:
# 如果已存在且已有websocket连接,保留websocket引用
old_websocket = None
if rs.session_manager is not None and rs.session_manager.websocket:
old_websocket = rs.session_manager.websocket
logger.info(f"保留 {k} 的现有WebSocket连接")
# 注意:不在这里清理旧session,因为:
# 1. 切换当前角色音色时,已在API层面关闭了session
# 2. 切换其他角色音色时,已跳过重新加载
# 3. 其他场景不应该影响正在使用的session
# 如果旧session_manager有活跃session,保留它,只更新配置相关的字段
# 先检查会话状态(在锁内检查避免竞态条件)
# 同时覆盖 "正在启动" 窗口:_starting_session_count>0 但 is_active=False
# 的期间,start_session 协程仍持有对当前 manager 的引用;如果此时替换
# 实例,旧 manager 会在后台完成启动并挂起 OmniRealtimeClient / TTS 线程 /
# message_handler_task,永远没人调用 end_session — 造成资源泄漏。
mgr = rs.session_manager
has_active_session = mgr is not None and mgr.is_active
has_starting_session = mgr is not None and mgr.is_starting and not mgr.is_active
if has_active_session:
# 有活跃session,不重新创建session_manager,只更新配置
# 这是为了防止重新创建session_manager时破坏正在运行的session
try:
old_mgr = rs.session_manager
# 更新prompt
old_mgr.lanlan_prompt = lanlan_prompt[k].replace('{LANLAN_NAME}', k).replace('{MASTER_NAME}', master_name)
# 直接读 module global lanlan_basic_config,避免重复 load + deepcopy
old_mgr.voice_id = get_reserved(
lanlan_basic_config[k],
'voice_id',
default='',
legacy_keys=('voice_id',),
)
logger.info(f"{k} 有活跃session,只更新配置,不重新创建session_manager")
except Exception as e:
logger.error(f"更新 {k} 的活跃session配置失败: {e}", exc_info=True)
# 配置更新失败,但为了不影响正在运行的session,继续使用旧配置
# 如果确实需要更新配置,可以考虑在下次session重启时再应用
elif has_starting_session:
# start_session 正在执行中:只保留实例避免孤儿泄漏,但绝对不热改
# lanlan_prompt / voice_id — start_session 会在 core.py 内用
# self.lanlan_prompt 拼装首帧 session prompt,并基于当前 self.voice_id
# 计算音色/TTS 分支。本轮写入会让正在进行的启动拿到半旧半新配置
# (用户侧看到启动出来的会话 prompt / 音色与最新配置不一致)。
# 本轮的新 prompt / 音色由下一次 start_session 应用。
logger.info(
f"{k} session 正在启动中(is_starting),保留现有 session_manager,"
"本轮不热更新 prompt/voice_id 以免污染 in-flight 启动"
)
else:
# 没有活跃session,可以安全地重新创建session_manager
new_mgr = core.LLMSessionManager(
rs.sync_message_queue,
k,
lanlan_prompt[k].replace('{LANLAN_NAME}', k).replace('{MASTER_NAME}', master_name)
)
# 将websocket锁存储到session manager中,供cleanup()使用
new_mgr.websocket_lock = rs.websocket_lock
# 恢复websocket引用(如果存在)
if old_websocket:
new_mgr.websocket = old_websocket
logger.info(f"已恢复 {k} 的WebSocket连接")
rs.session_manager = new_mgr
# 检查并启动同步连接器线程
# 如果是新角色,或者线程不存在/已停止,需要启动线程
need_start_thread = False
if is_new_character:
need_start_thread = True
elif rs.sync_process is None:
need_start_thread = True
elif hasattr(rs.sync_process, 'is_alive') and not await asyncio.to_thread(rs.sync_process.is_alive):
need_start_thread = True
try:
await asyncio.to_thread(rs.sync_process.join, timeout=0.1)
except Exception:
# 注意不要写成 bare except:to_thread 是 cancellation point,
# 如果 catch 了 BaseException 会吞掉 asyncio.CancelledError
pass
if need_start_thread:
try:
_char_name = k
def _make_status_cb(char_name):
def _cb(msg):
mgr = _get_session_manager(char_name)
if not mgr:
return
loop = _server_loop
if loop is None or loop.is_closed():
return
ws = mgr.websocket
if ws and hasattr(ws, 'client_state') and ws.client_state == ws.client_state.CONNECTED:
import json as _json
data = _json.dumps({"type": "status", "message": msg})
asyncio.run_coroutine_threadsafe(ws.send_text(data), loop)
return _cb
_status_cb = _make_status_cb(_char_name)
new_thread = Thread(
target=cross_server.sync_connector_process,
args=(rs.sync_message_queue, rs.sync_shutdown_event, k, f"ws://127.0.0.1:{MONITOR_SERVER_PORT}", {'bullet': False, 'monitor': True}, _status_cb),
daemon=True,
name=f"SyncConnector-{k}"
)
rs.sync_process = new_thread
new_thread.start()
logger.info(f"✅ 已为角色 {k} 启动同步连接器线程 ({new_thread.name})")
await asyncio.sleep(0.1) # 线程启动更快,减少等待时间
# 与上面 is_alive 检查保持一致,走 to_thread 避免任何潜在阻塞
if not await asyncio.to_thread(new_thread.is_alive):
logger.error(f"❌ 同步连接器线程 {k} ({new_thread.name}) 启动后立即退出!")
else:
logger.info(f"✅ 同步连接器线程 {k} ({new_thread.name}) 正在运行")
except Exception as e:
logger.error(f"❌ 启动角色 {k} 的同步连接器线程失败: {e}", exc_info=True)
async def _stop_character_thread(k: str):
"""停止单个 catgirl 的同步连接器线程(最多 3s join)。dict 清理留给调用方顺序做。"""
rs = role_state.get(k)
if rs is None or rs.sync_process is None:
return
try:
logger.info(f"正在停止角色 {k} 的同步连接器线程...")
rs.sync_shutdown_event.set()
await asyncio.to_thread(rs.sync_process.join, timeout=3) # 等待线程正常结束
if await asyncio.to_thread(rs.sync_process.is_alive):
logger.warning(f"⚠️ 同步连接器线程 {k} 未能在超时内停止,将作为daemon线程自动清理")
else:
logger.info(f"✅ 已停止角色 {k} 的同步连接器线程")
except Exception as e:
logger.warning(f"停止角色 {k} 的同步连接器线程时出错: {e}")
def _cleanup_character_dicts(k: str):
"""同步清理单个 catgirl 的 per-k 槽位。调用前确保对应线程已停或超时。"""
rs = role_state.get(k)
if rs is None:
return
# 清理队列(queue.Queue 没有 close/join_thread 方法)
try:
while not rs.sync_message_queue.empty():
rs.sync_message_queue.get_nowait()
except QueueEmpty:
# while empty + get_nowait 本身是 racy idiom:另一线程可能先 drain 掉,
# 导致 get_nowait 抛 Empty。这里 role_state[k] 即将被 del 掉,忽略无害。
pass
# 一次 del 原子清掉所有 6 个字段 —— 替代旧代码里 6 张 dict 分别 del 的对称清理
del role_state[k]
async def initialize_character_data():
"""全量刷新:加载 config + 对所有 catgirl 做 per-k init + 清理已删除的。
冷路径(启动 / 主人名编辑 / 大规模批量导入)。per-catgirl 编辑请走
init_one_catgirl / remove_one_catgirl / switch_current_catgirl_fast 这些 fast path。
"""
logger.info("正在加载角色配置...")
# 清理无效的voice_id引用;如果发现旧版 CosyVoice 音色,推入通知缓冲池等前端连接后弹出
# cleanup_invalid_voice_ids 内部涉及同步 IO(load/save characters),offload 以免阻塞事件循环
_cleaned, _legacy_names = await asyncio.to_thread(_config_manager.cleanup_invalid_voice_ids)
if _legacy_names:
core.enqueue_voice_migration_notice(_legacy_names)
# 加载最新的角色数据(offload,避免同步 IO + deepcopy 阻塞事件循环)
await _refresh_character_globals()
# 为所有 catgirl 预备 per-k 同步资源槽位
is_new_map: dict[str, bool] = {k: _ensure_character_slots(k) for k in catgirl_names}
# 每个角色的初始化相互独立(只读共享 prompt / master_name,写各自的 session_manager[k] 等 per-key 槽位)。
# 用 gather 并行,消除 O(N) × (thread roundtrip + 0.1s sleep) 的串行墙钟。
# return_exceptions=True:某个角色初始化失败不应导致其它角色被取消。
_init_results = await asyncio.gather(
*[_init_character_resources(k, is_new_map[k]) for k in catgirl_names],
return_exceptions=True,
)
for k, res in zip(catgirl_names, _init_results):
if isinstance(res, BaseException):
logger.error(f"❌ 初始化角色 {k} 失败: {res}", exc_info=res)
# 清理已删除角色的资源
removed_names = [k for k in role_state.keys() if k not in catgirl_names]
# N 个 join(timeout=3) 串行最坏要 3N 秒;并行化后墙钟 ≈ 3 秒。
if removed_names:
await asyncio.gather(
*[_stop_character_thread(k) for k in removed_names],
return_exceptions=True,
)
# 线程都已停/超时,再在事件循环里顺序清理 dict —— 这些操作都是纯内存,不需要并行。
for k in removed_names:
logger.info(f"清理已删除角色 {k} 的资源")
_cleanup_character_dicts(k)
logger.info(f"角色配置加载完成,当前角色: {catgirl_names},主人: {master_name}")
# ─────────────────────────────────────────────────────────────
# Fast-path helpers — 只处理受影响的单个 catgirl,避免全量遍历
# ─────────────────────────────────────────────────────────────
async def switch_current_catgirl_fast():
"""当前猫娘切换(`当前猫娘` 字段变更)专用 fast path。
关键前提:切换只影响 `her_name` 这一个 global,per-k 的 prompt / voice_id / thread
状态完全不变。所以这里**只刷 globals**,不做任何 per-k 工作。
墙钟:一次 aget_character_data(~数 ms)即全部。
"""
await _refresh_character_globals()
logger.info(f"[fast-switch] 已刷新 globals,当前猫娘: {her_name}")
async def init_one_catgirl(name: str, *, is_new: bool = False):
"""新增 / 编辑单个 catgirl 的 fast path。
- is_new=True:新增,强制启动同步连接器线程
- is_new=False:编辑(prompt / voice_id 等)—— 只刷新 session_manager 的 prompt/voice_id,
不会重启线程
"""
await _refresh_character_globals()
if name not in lanlan_prompt:
logger.warning(f"[init-one] '{name}' 不在 config 中,跳过(可能是并发删除)")
return
slot_new = _ensure_character_slots(name)
await _init_character_resources(name, is_new_character=is_new or slot_new)
async def remove_one_catgirl(name: str):
"""删除单个 catgirl 的 fast path:停该角色的线程 + 清 dict + 刷 globals。"""
await _stop_character_thread(name)
_cleanup_character_dicts(name)
# config 文件已由调用方写入,这里刷新 globals 让 catgirl_names 等反映删除
await _refresh_character_globals()
logger.info(f"[fast-remove] 已移除角色 {name}")
# 注:不再在模块级别执行 initialize_character_data()——cloud_archive 要求先做
# bootstrap_local_cloudsave_environment + import_if_needed,才能在 startup hook 里
# 安全地初始化角色数据。见 on_startup 里的调用顺序。
lock = asyncio.Lock()
# --- FastAPI App Setup ---
app = FastAPI()
@app.exception_handler(MaintenanceModeError)
async def handle_maintenance_mode_error(_request, exc: MaintenanceModeError):
from fastapi.responses import JSONResponse
return JSONResponse(status_code=409, content=maintenance_error_payload(exc))
class CustomStaticFiles(StaticFiles):
async def get_response(self, path, scope):
response = await super().get_response(path, scope)
if path.endswith('.js'):
response.headers['Content-Type'] = 'application/javascript'
return response
# 确定 static 目录位置(使用 _get_app_root)
static_dir = os.path.join(_get_app_root(), 'static')
app.mount("/static", CustomStaticFiles(directory=static_dir), name="static")
# 挂载用户文档下的live2d目录(只在主进程中执行,子进程不提供HTTP服务)
if _IS_MAIN_PROCESS:
_config_manager.ensure_live2d_directory()
_config_manager.ensure_vrm_directory()
_config_manager.ensure_mmd_directory()
_config_manager.ensure_chara_directory()
# CFA (反勒索防护) 感知挂载:
# 优先从原始 Documents 目录(可读)提供模型文件,
# 可写回退目录(AppData)作为辅助挂载供新导入的模型使用
_readable_live2d = _config_manager.readable_live2d_dir
_serve_live2d_path = str(_readable_live2d) if _readable_live2d else str(_config_manager.live2d_dir)
if os.path.exists(_serve_live2d_path):
app.mount("/user_live2d", CustomStaticFiles(directory=_serve_live2d_path), name="user_live2d")
logger.info(f"已挂载用户Live2D目录: {_serve_live2d_path}")
# CFA 场景:可写回退目录额外挂载,供新导入的模型使用
if _readable_live2d and str(_config_manager.live2d_dir) != _serve_live2d_path:
_writable_live2d_path = str(_config_manager.live2d_dir)
if os.path.exists(_writable_live2d_path):
app.mount("/user_live2d_local", CustomStaticFiles(directory=_writable_live2d_path), name="user_live2d_local")
logger.info(f"已挂载本地Live2D目录(CFA回退): {_writable_live2d_path}")
# 挂载VRM动画目录(static/vrm/animation) 必须第一个挂载
vrm_animation_path = str(_config_manager.vrm_animation_dir)
if os.path.exists(vrm_animation_path):
app.mount("/user_vrm/animation", CustomStaticFiles(directory=vrm_animation_path), name="user_vrm_animation")
logger.info(f"已挂载VRM动画目录: {vrm_animation_path}")
# 挂载VRM模型目录(用户文档目录)
user_vrm_path = str(_config_manager.vrm_dir)
if os.path.exists(user_vrm_path):
app.mount("/user_vrm", CustomStaticFiles(directory=user_vrm_path), name="user_vrm")
logger.info(f"已挂载VRM目录: {user_vrm_path}")
# 挂载项目目录下的static/vrm(作为备用,如果文件在项目目录中)
project_vrm_path = os.path.join(static_dir, 'vrm')
if os.path.exists(project_vrm_path) and os.path.isdir(project_vrm_path):
logger.info(f"项目VRM目录存在: {project_vrm_path} (可通过 /static/vrm/ 访问)")
# 挂载MMD动画目录(必须在MMD模型目录之前挂载)
mmd_animation_path = str(_config_manager.mmd_animation_dir)
if os.path.exists(mmd_animation_path):
app.mount("/user_mmd/animation", CustomStaticFiles(directory=mmd_animation_path), name="user_mmd_animation")
logger.info(f"已挂载MMD动画目录: {mmd_animation_path}")
# 挂载MMD模型目录(用户文档目录)
user_mmd_path = str(_config_manager.mmd_dir)
if os.path.exists(user_mmd_path):
app.mount("/user_mmd", CustomStaticFiles(directory=user_mmd_path), name="user_mmd")
logger.info(f"已挂载MMD目录: {user_mmd_path}")
# 挂载项目目录下的static/mmd(作为备用)
project_mmd_path = os.path.join(static_dir, 'mmd')
if os.path.exists(project_mmd_path) and os.path.isdir(project_mmd_path):
logger.info(f"项目MMD目录存在: {project_mmd_path} (可通过 /static/mmd/ 访问)")
# 挂载用户mod路径
user_mod_path = _config_manager.get_workshop_path()
if os.path.exists(user_mod_path) and os.path.isdir(user_mod_path):
app.mount("/user_mods", CustomStaticFiles(directory=user_mod_path), name="user_mods")
logger.info(f"已挂载用户mod路径: {user_mod_path}")
# --- 初始化共享状态并挂载路由 ---
# 显式从各子模块导入 router,避免与包级模块导出产生同名遮蔽。