核心机制
CoPaw 最值得逆向研究的5个核心设计模式,每一个都对灵工 SaaS 场景有直接的工程价值。
身份记忆 PROFILE
优先级:★★★
源码位置:src/copaw/agents/prompt.py、src/copaw/agents/memory/agent_md_manager.py、src/copaw/agents/hooks/bootstrap.py
关键机制:PromptBuilder 每次对话前重建 system prompt,修改文件立即生效,无缓存
设计哲学:「越用越懂你」
不同于传统 AI 每次对话都重置,CoPaw 通过三层 Markdown 文件构建持久化的 Agent 身份记忆,且全部在对话开始前动态读取,无任何缓存。
| 文件 | 职责 | 维护方式 | 对 Prompt 的影响 |
AGENTS.md | 工作流规则、业务指引、能力边界定义 含 <!-- heartbeat:start/end --> 段 | 人工配置,热更新 | 注入规则约束,决定 AI 如何行动 |
SOUL.md | 核心身份、行为原则、价值观、语气风格 | 人工配置,通常固定 | 注入「人格」,决定 AI 怎么说话 |
PROFILE.md | 用户画像、偏好、历史交互学习到的信息 | AI 自动维护,每轮对话后判断是否更新 | 注入用户上下文,决定 AI 怎么理解用户 |
PromptBuilder 完整实现(prompt.py 源码精读)
class PromptBuilder:
HEARTBEAT_PATTERN = re.compile(
r"<!-- heartbeat:start -->.*?<!-- heartbeat:end -->",
re.DOTALL,
)
def _load_file(self, filename: str) -> None:
file_path = self.working_dir / filename
if not file_path.exists():
return # ★ 所有文件都是可选的,不存在就跳过,不报错
content = file_path.read_text(encoding="utf-8").strip()
# ① 自动剥离 YAML frontmatter(--- 开头的元数据块)
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
content = parts[2].strip()
# ② AGENTS.md 特殊处理:heartbeat 段根据配置开关
if filename == "AGENTS.md":
if not self.heartbeat_enabled:
content = self.HEARTBEAT_PATTERN.sub("", content) # 移除整段
else:
content = content.replace("<!-- heartbeat:start -->", "")
content = content.replace("<!-- heartbeat:end -->", "")
if content:
self.prompt_parts.append(f"# {filename}") # 添加节标题
self.prompt_parts.append(content)
def build(self) -> str:
files_to_load = self.enabled_files or PromptConfig.DEFAULT_FILES
# DEFAULT_FILES = ["AGENTS.md", "SOUL.md", "PROFILE.md"]
for filename in files_to_load:
self._load_file(filename) # 按序加载,全部可选
if not self.prompt_parts:
return DEFAULT_SYS_PROMPT # 降级兜底
return "
".join(self.prompt_parts)
def build_system_prompt_from_working_dir(
working_dir, enabled_files=None,
agent_id=None, heartbeat_enabled=False
) -> str:
# 按 agent_id 加载独立配置(v0.1.0 多 Agent 支持)
if agent_id:
agent_config = load_agent_config(agent_id)
enabled_files = agent_config.system_prompt_files
builder = PromptBuilder(working_dir, enabled_files, heartbeat_enabled)
prompt = builder.build()
# ③ v0.1.0 新增:Agent Identity 前置注入
if agent_id:
identity = (
f"# Agent Identity
"
f"Your agent id is `{agent_id}`. "
f"This is your unique identifier in the multi-agent system.
"
)
prompt = identity + prompt
return prompt
关键设计洞察:_load_file() 每次调用都从磁盘重新读取(无缓存)。这意味着修改 AGENTS.md 后,下一条对话消息就立即生效,不需要重启任何服务——这是整个系统中对业务人员最友好的设计。
系统提示词构建顺序(完整时序)
rebuild_sys_prompt() 执行时序:
① Agent Identity(agent_id 唯一标识,v0.1.0 新增)
"Your agent id is `enterprise_A`. This is your unique identifier..."
② # AGENTS.md
[文件内容,去除 frontmatter]
[heartbeat 段根据 config.heartbeat.enabled 保留或删除]
③ # SOUL.md
[文件内容,去除 frontmatter]
④ # PROFILE.md
[文件内容,去除 frontmatter]
(不存在时跳过,不影响其他文件)
⑤ env_context(可选,通过 CoPawAgent(env_context=...) 传入)
↑ 这里注入租户权限上下文!
"你只能查询 tenant_id=enterprise_A 的数据。"
最终合并:"
".join([① ② ③ ④ ⑤])
灵工场景映射:在 env_context 中注入 "你只能查询 tenant_id={enterprise_id} 的数据,不得跨租户访问。" 即可实现 Prompt 层的软约束——这是最低侵入、最快落地的隔离手段,与 MCP 层的硬隔离互为补充。
BootstrapHook — 首次初始化引导
当工作区存在 BOOTSTRAP.md 时,BootstrapHook 在首次用户交互时触发,引导用户完成 Agent 初始设置:
class BootstrapHook:
async def __call__(self, agent, kwargs):
bootstrap_path = self.working_dir / "BOOTSTRAP.md"
completed_flag = self.working_dir / ".bootstrap_completed"
if completed_flag.exists(): # 防止重复触发
return None
if not bootstrap_path.exists():
return None
messages = await agent.memory.get_memory()
if not is_first_user_interaction(messages): # 只对第一条消息触发
return None
# 在第一条 user 消息前注入引导文本(中/英/俄 三语)
guidance = build_bootstrap_guidance(self.language)
for msg in messages[system_count:]:
if msg.role == "user":
prepend_to_message_content(msg, guidance)
break
# 创建完成标记文件,防止下次再触发
completed_flag.touch()
return None
引导文本(中文版)包含:1. 阅读 BOOTSTRAP.md → 2. 按指示帮用户定义 AI 身份 → 3. 创建/更新 PROFILE.md/AGENTS.md 等文件 → 4. 完成后自动删除 BOOTSTRAP.md。
灵工场景映射:企业首次接入时,BOOTSTRAP.md 可引导管理员完成 AGENTS.md(业务权限声明)、SOUL.md(服务风格)配置,零开发介入,运营人员自助完成初始化。
PROFILE.md 自动更新机制
# post_reasoning hook — 对话结束后判断是否更新 PROFILE.md
async def post_reasoning_hook(agent, response):
# 1. 追加写入 Session History(每轮对话必做)
await memory_manager.append_session(response)
# 2. Agent 主动写入 PROFILE.md(通过 write_file 工具)
# 当 AI 在对话中发现用户新偏好时,可使用 write_file 工具更新 PROFILE.md
# 下次对话 rebuild_sys_prompt() 时自动读取最新内容
PROFILE.md 是「越用越懂你」的实现载体。AI 可以通过 write_file 工具主动更新用户偏好,形成学习闭环。对于多用户场景,需要按 user_id 隔离:profiles/{user_id}/PROFILE.md。
灵工场景改造建议
| 改造点 | 现状 | 建议做法 |
| 租户权限注入 | 无租户概念 | 通过 env_context 参数在每次对话时动态注入:"你只能查询 tenant_id={id} 的数据" |
| PROFILE.md 多用户隔离 | 单文件,全局共享 | 按 user_id 独立路径存储:profiles/{user_id}_PROFILE.md |
| system_prompt_files 角色化 | 固定三文件 | 按角色配置不同文件列表(HR vs 财务 vs 管理员 加载不同 AGENTS.md) |
| BOOTSTRAP.md 企业定制 | 通用模板 | 为每个企业定制 BOOTSTRAP.md,包含其业务系统的权限说明和操作规范 |
Skill 触发机制
优先级:★★★
源码位置:src/copaw/agents/skills_manager.py、src/copaw/agents/skills_hub.py、src/copaw/security/skill_scanner/
关键机制:三层目录 + SKILL.md frontmatter description 字段是 LLM 触发决策的唯一依据
| 对比维度 | Skill(软能力) | MCP Tool(硬工具) |
| 定义方式 | SKILL.md Markdown 文件 | MCP Server 函数(Python/Node/任意语言) |
| 注入方式 | 注入 system prompt(作为 <skill> 标签) | 注册为 LLM 的 tools 参数(显式工具列表) |
| 触发方式 | LLM 读到 description 后自主决策调用 | LLM 返回 tool_use block 显式调用 |
| 扩展成本 | 写 Markdown 文件,零代码 | 需要实现 MCP Server 接口 |
| 适合场景 | 复杂工作流、操作规范、知识参考 | 数据查询、API 调用、确定性执行 |
| 执行主体 | LLM 根据描述自主编排 | 框架调用后返回结果给 LLM |
| 灵工场景 | 数据查询规范、诊断流程、报表格式 | 查订单 API、查结算 API、文件生成 |
三层目录架构(v0.1.0 重大升级)
# 目录优先级:customized > builtin(同名时 customized 覆盖 builtin)
# Agent 运行时只读取:active_skills/(builtin + customized 的合并结果)
builtin_skills/ ← 代码包内,只读,12种内置技能
├── cron/
│ └── SKILL.md ← metadata.builtin_skill_version = "1.0"
├── docx/
│ ├── SKILL.md
│ └── scripts/ ← Python/JS 执行脚本
└── ... (共 12 种)
workspace_dir/
├── customized_skills/ ← 用户自定义(永不被 builtin 覆盖)
│ └── linggong_query/
│ └── SKILL.md
└── active_skills/ ← 运行时激活(Agent 实际加载此目录)
├── cron/SKILL.md ← 从 builtin 同步(enable_skill())
└── linggong_query/SKILL.md ← 从 customized 同步
# 生命周期(SkillService 接口):
create_skill() → 写入 customized_skills/
enable_skill() → 同步到 active_skills/(含 SkillScanner 安全扫描)
disable_skill() → 从 active_skills/ 移除
list_available_skills() → 读取 active_skills/ 目录列表
SKILL.md 完整格式(触发机制核心)
---
name: linggong_data_query # ★ 技能标识(唯一,同目录名)
description: | # ★★★ 关键字段:LLM 根据此决定何时调用
当用户询问灵工订单、结算数据、用工记录时使用本技能。
触发词:查订单、查结算、查工人信息、数据分析、统计报表
metadata:
builtin_skill_version: "1.0" # 版本号(内置技能升级比对用)
copaw:
emoji: "📊"
requires:
bins: [] # 依赖的外部二进制(无则为空)
install: [] # 安装方式声明(可选)
---
# 灵工数据查询规范
当用户发起数据查询时,按以下步骤执行:
## 第一步:确认查询范围
- 确认时间范围(如:本周/本月/自定义日期)
- 确认查询类型(订单/结算/用工记录/统计报表)
## 第二步:调用数据查询工具
使用 `query_worker_orders` MCP 工具,参数格式:
- date_from: "YYYY-MM-DD"
- date_to: "YYYY-MM-DD"
- status: "all" | "pending" | "completed"
## 第三步:结果展示
- 查询结果用 Markdown 表格展示
- 金额字段保留两位小数,加"元"单位
- 超过 20 条时自动分页,告知用户总数
Skill 注册与触发完整流程
# ── 1. 初始化时注册(CoPawAgent 构建时)────────────────────────────
for skill_name in list_available_skills(workspace_dir):
skill_dir = active_skills / skill_name
toolkit.register_agent_skill(str(skill_dir))
# → 读取 SKILL.md,提取 description
# → 将 Skill 内容注入 system prompt 的 <skill> 标签
# ── 2. system prompt 中的 Skill 格式 ──────────────────────────────
# 注入后的 system prompt 含有如下段落:
<skill name="linggong_data_query">
当用户询问灵工订单、结算数据... (SKILL.md 正文内容)
</skill>
# ── 3. LLM 推理时(_reasoning 阶段)─────────────────────────────
# LLM 读取 system prompt 中所有 <skill> 标签
# 根据用户意图匹配 description 最合适的 Skill
# 按 Skill 中定义的步骤自主编排工具调用序列
# ── 4. namesake_strategy(同名 Skill 处理策略)────────────────────
toolkit.register_agent_skill(
str(skill_dir),
namesake_strategy="skip" # skip/override/raise/rename
)
SkillScanner 安全扫描(激活前自动触发)
# enable_skill() 内部流程:
def enable_skill(self, name: str, force: bool = False) -> bool:
skill_dir = customized_skills / name
# ① SkillScanner 扫描(mode="block" 时扫描失败则拒绝激活)
scanner = SkillScanner()
result = scanner.scan(skill_dir)
if not result.is_safe and not force:
raise SkillScannerError(f"Skill scan failed: {result.max_severity}")
# ② 安全通过 → 同步到 active_skills/
shutil.copytree(skill_dir, active_skills / name)
return True
# SkillScanner 检测 17 种威胁(PatternAnalyzer):
# PROMPT_INJECTION / COMMAND_INJECTION / HARDCODED_SECRETS
# DATA_EXFILTRATION / SUPPLY_CHAIN_ATTACK / UNICODE_STEGANOGRAPHY
# ...(见数据结构章节完整列表)
Skills Hub 在线安装(v0.1.0 新增)
# install_skill_from_hub(url) 完整流程:
URL
→ _resolve_bundle_from_url() # 解析来源(clawhub/github/lobehub/modelscope...)
→ bundle(SKILL.md + files)
→ _normalize_bundle() # 校验结构、symlink 防护、路径遍历防护
→ SkillService.create_skill() # 写入 customized_skills/
→ SkillScanner.scan() # PatternAnalyzer 17 种威胁检测
→ enable_skill() # 同步到 active_skills/
→ HubInstallResult(name, enabled, source_url)
# 支持 7 种来源:
clawhub.ai / github.com / skills.sh / lobehub.com
modelscope.cn / skillsmp.com / 本地 ZIP(最大 200MB)
灵工场景映射:将灵工业务查询规范、诊断流程、报表规范封装为私有 Skill,通过 ZIP 上传方式部署到各企业工作区;按企业差异化配置 active_skills/,实现能力定制化。
优先级:★★★
源码位置:src/copaw/agents/tool_guard_mixin.py、src/copaw/security/tool_guard/engine.py、src/copaw/agents/react_agent.py
关键机制:ToolGuardMixin 通过 MRO 透明注入,5步决策链(denied → guarded → always_run → approval → execute)
工具注册全流程(Agent 初始化时)
# CoPawAgent._create_toolkit() — 按序注册三类工具
# ① 内置工具(13种,按 Agent config 独立开关)
for tool_name, tool_func in BUILTIN_TOOL_FUNCTIONS.items():
if agent_config.tools.builtin_tools[tool_name].enabled:
toolkit.register_tool_function(
tool_func,
namesake_strategy="skip"
)
# ② Skill 工具(从 active_skills/ 动态加载)
for skill_name in list_available_skills(workspace_dir):
toolkit.register_agent_skill(str(active_skills / skill_name))
# ③ MCP 工具(异步注册,对话开始前)
await agent.register_mcp_clients(namesake_strategy="skip")
for client in mcp_clients:
await toolkit.register_mcp_client(client)
# ④ memory_search 工具(可选,仅启用 MemoryManager 时)
if memory_manager:
toolkit.register_tool_function(
create_memory_search_tool(memory_manager)
)
MRO 链:CoPawAgent → ToolGuardMixin → ReActAgent
_acting(tool_call) 覆写拦截点,每次工具调用都经过以下决策:
1
denied_tools 检查(硬拒绝)
engine.is_denied(tool_name) → True
→ 立即返回 ⛔ 拒绝消息,不进入任何审批流
→ 附加 tool_guard_denied mark 到 memory,用于后续 cleanup 精确回滚
2
guarded_tools 范围检查
engine.is_guarded(tool_name) → True(guarded_tools=null 时所有工具都在范围)
→ 检查 one-shot pre-approval(已审批则跳过 Guardian 直接执行)
→ 否则运行所有 Guardian(FilePathGuardian + RuleBasedGuardian)
3
非守卫工具:always_run Guardian
工具不在 guarded 范围时,仍运行标记了 always_run=True 的 Guardian
→ FilePathGuardian 始终运行(路径遍历防护,防止 AI 访问其他企业目录)
4
Findings 处理 → 进入审批流
Guardian 返回 severity=CRITICAL/HIGH 的 GuardFinding → _GuardAction("approval_needed")
→ 有 session_id:创建 pending approval,Agent 挂起等待用户 /approve
→ 无 session_id(CLI/Cron):直接 DENY
→ 超时(默认 600s)自动拒绝
5
APPROVE → 执行工具
无 findings 或 pre-approved → 调用 super()._acting(tool_call)
→ 执行实际工具函数(Built-in / Skill / MCP)
→ 结果追加到 memory,LLM 继续推理
# engine.py — guard() 核心方法
def guard(self, tool_name, params, *, only_always_run=False):
if not self._enabled:
return None
result = ToolGuardResult(tool_name=tool_name, params=params)
# 按 only_always_run 过滤 Guardian 列表
guardians = (
[g for g in self._guardians if g.always_run]
if only_always_run else self._guardians
)
for guardian in guardians:
try:
findings = guardian.guard(tool_name, params)
result.findings.extend(findings)
result.guardians_used.append(guardian.name)
except Exception as exc:
result.guardians_failed.append({"name": guardian.name, "error": str(exc)})
result.guard_duration_seconds = time.monotonic() - t0
return result
# tool_guard_mixin.py — _acting() 决策(简化)
async def _acting(self, tool_call):
self._ensure_tool_guard()
async with self._tool_guard_lock: # ← 串行化,防并发竞态
action = await self._decide_guard_action(tool_call)
if action:
return await self._execute_guard_action(action, tool_call)
return await super()._acting(tool_call) # ← 无拦截,正常执行
斜杠命令系统(/compact /new /clear 等)
# CommandHandler.SYSTEM_COMMANDS(v0.1.0 完整命令集)
{
"compact", # 手动触发记忆压缩(同等于 MemoryCompactionHook 自动触发)
"new", # 新建会话(保留记忆摘要,清空对话历史)
"clear", # 清空当前会话(含压缩摘要一起清除)
"history", # 查看对话历史摘要(ReMeLight 生成)
"compact_str", # 显示当前压缩摘要内容(调试用)
"await_summary", # 等待异步摘要任务完成
"message", # 内部发送消息
"dump_history", # 导出历史到 debug_history.jsonl(调试用)
"load_history", # 从文件加载历史(调试/还原用)
}
# 命令路由(CoPawAgent.reply() 短路处理)
if self.command_handler.is_command(query):
msg = await self.command_handler.handle_command(query)
await self.print(msg)
return msg # ← 短路!不进入 ReAct 循环,零 Token 消耗
设计精髓:系统命令通过短路机制绕过 ReAct 循环,不消耗任何 LLM Token,响应速度极快。灵工平台可以利用这个机制实现「零成本快速指令」,如 /report 直接触发 Cron 任务生成报告,完全不走 LLM 推理。
灵工场景改造建议
| 场景 | 工具类型 | ToolGuard 配置 |
| 数据查询工具(只读) | MCP Tool | 加入 guarded_tools,配置只读规则,自动 APPROVE |
| 写操作工具(修改合同/发起付款) | MCP Tool | 加入 guarded_tools,配置 TIMEOUT 审批,需用户 /approve |
| 危险工具(shell/browser) | Built-in | 加入 denied_tools,永远拒绝,或 enabled: false |
| 跨企业目录访问 | any | FilePathGuardian always_run=True 自动拦截 |
MCP 协议规范
优先级:★★★
源码位置:src/copaw/app/mcp/manager.py、src/copaw/agents/react_agent.py、src/copaw/config/config.py
关键机制:四种 transport + 热重载 + v0.1.0 自动重连 + rebuild_info 重建 + 异步锁原子替换
MCP 在 CoPaw 中的定位
MCP(Model Context Protocol)是 CoPaw 与外部系统交互的标准化接口协议。所有业务逻辑(查订单、查结算、调用业务 API)都应通过 MCP Server 暴露,Agent 通过 JSON-RPC 调用。
# 核心理念:LLM 不直接访问数据库
用户问:「查一下本月结算总额」
↓
LLM 决策:调用 tool "get_settlement_summary"
↓
MCP Client → JSON-RPC → MCP Server(灵工业务侧)
↓
MCP Server 从 Header 获取 tenant_id,强制注入查询条件
↓
SELECT SUM(amount) FROM settlements WHERE tenant_id=? AND month=?
↓
返回结果 → tool_result → LLM 生成自然语言回复
四种 Transport 对比
| Transport | 适用场景 | 连接方式 | 优缺点 |
stdio | 本地进程 MCP Server | 子进程 stdin/stdout JSON-RPC | ✅ 低延迟,无网络开销 ❌ 只能本地,不适合云部署 |
sse | 远程 HTTP 长连接 | Server-Sent Events 流式 | ✅ 支持远程,可流式推送 ✅ 推荐用于灵工 SaaS 云服务 |
streamable_http | 云端无状态 MCP 服务 | HTTP 请求 + 流式响应 | ✅ 无状态,易 K8s 扩展 ✅ 最适合高并发场景 |
(MCPConfigWatcher) | 配置热重载 | 监听 mcp.json 文件变更 | ✅ 改配置无需重启,即时生效 |
MCP 热重载(MCPClientManager)—— 原子替换
# MCPClientManager.replace_client() — 零停机替换(connect new (outside lock) → swap inside lock)
async def replace_client(self, key, client_config, timeout=60.0):
# ① 锁外创建并连接新 client(可能耗时,不阻塞其他请求)
new_client = self._build_client(client_config)
await asyncio.wait_for(new_client.connect(), timeout=timeout)
# ② 锁内原子替换(最小化锁持有时间)
async with self._lock:
old_client = self._clients.get(key)
self._clients[key] = new_client
if old_client:
await old_client.close() # 优雅关闭旧连接
# get_clients() 每次调用都加锁刷新(保证拿到最新 client 列表)
async def get_clients(self) -> list:
async with self._lock:
return [c for c in self._clients.values() if c is not None]
v0.1.0 MCP 自动重连机制
# react_agent.py — _recover_mcp_client() 两阶段恢复
async def _recover_mcp_client(self, client):
# 阶段1:尝试直接重连
if await self._reconnect_mcp_client(client):
return client
# 阶段2:重连失败 → 从 _copaw_rebuild_info 重建 client
rebuilt = self._rebuild_mcp_client(client)
if rebuilt and await self._reconnect_mcp_client(rebuilt):
# 复用原有 client 引用(保持 manager 内部状态稳定)
return self._reuse_shared_client_reference(client, rebuilt)
return None # 恢复失败,跳过此 client(不影响其他 client)
# _rebuild_mcp_client() — 从 rebuild_info 完整重建
def _rebuild_mcp_client(self, client):
info = getattr(client, "_copaw_rebuild_info", None)
if not info: return None
if info["transport"] == "stdio":
return StdIOStatefulClient(
name=info["name"], command=info["command"],
args=info["args"], env=info["env"],
)
else: # sse / streamable_http
# ★ 环境变量展开(${ENTERPRISE_ID} 在重建时重新展开)
headers = {k: os.path.expandvars(v) for k, v in info["headers"].items()}
return HttpStatefulClient(
name=info["name"], transport=info["transport"],
url=info["url"], headers=headers,
)
区分 MCP 内部 Cancel 和 Task 真实 Cancel
@staticmethod
def _should_propagate_cancelled_error(error) -> bool:
"""精确判断 CancelledError 是否需要传播
MCP 内部超时产生的 CancelledError → 应吞掉(不传播)
用户主动取消任务产生的 CancelledError → 必须传播
"""
if not isinstance(error, asyncio.CancelledError):
return False
task = asyncio.current_task()
if task is None: return False
# Python 3.11+:cancelling() > 0 表示任务被外部真正取消
cancelling = getattr(task, "cancelling", None)
if callable(cancelling):
return cancelling() > 0
# Python < 3.11:保守策略,总是传播
return True
设计精髓:MCP 内部的 CancelledError(如工具调用超时)应被吞掉继续运行,但用户主动取消任务的 CancelledError 必须传播,否则会造成任务假死。这个细节在生产环境中非常重要。
灵工 MCP Server 设计规范
# 推荐架构:灵工业务 MCP Server(Python FastMCP)
from mcp.server.fastmcp import FastMCP
app = FastMCP("linggong-business")
# ★ 安全原则:tenant_id 从 Header 获取,不允许 LLM 传参
def get_tenant_id() -> str:
return current_request_context.headers["X-Enterprise-Id"]
@app.tool()
async def query_worker_orders(
date_from: str, # ✅ LLM 传参:查询条件
date_to: str,
status: str = "all",
# ❌ 绝不允许:tenant_id: str ← 不能作为参数!
) -> str:
"""查询本企业用工订单列表"""
tenant_id = get_tenant_id() # ← 服务端强制获取,不可被 LLM 覆盖
rows = await db.execute(
"SELECT * FROM orders WHERE tenant_id=? AND date BETWEEN ? AND ? AND status=?",
[tenant_id, date_from, date_to, status]
)
return format_markdown_table(rows)
# mcp.json 配置:
# headers: {"X-Enterprise-Id": "${ENTERPRISE_ID}"} ← 环境变量展开
⚠️ 安全红线:MCP Tool 的 tenant_id(或 enterprise_id)参数必须从服务端 Header/Context 获取,绝不能作为 LLM 可传入的函数参数暴露。否则 LLM 可能被 Prompt Injection 攻击诱导查询其他企业数据。
会话上下文管理
优先级:★★★
源码位置:src/copaw/agents/react_agent.py、src/copaw/agents/memory/memory_manager.py、src/copaw/agents/hooks/memory_compaction.py
关键数字:memory_compact_ratio=0.75(默认,可配置 0.30–0.90)、max_input_length=131072(128K,按模型设置)
完整 15 步处理时序
四阶段:① 消息路由 → ② 记忆准备 → ③ LLM推理+工具调用 → ④ 写回+推送
① 消息路由
1外部渠道(钉钉/飞书/API)→ ChannelMessageConverter.build_agent_request_from_native() → AgentRequest
2AgentRunner.query_handler() — approval 检查:有 pending approval → 进审批恢复流,其余消息继续
3命令检测:/compact /new /clear /history … → 短路返回(不消耗 LLM Token)
② 记忆准备
4SafeJSONSession.load_session_state() — 从 sessions/{uid}_{sid}.json 恢复 InMemoryMemory
5CoPawAgent.rebuild_sys_prompt() — 读 AGENTS.md / SOUL.md / PROFILE.md + env_context,重建 system prompt
6BootstrapHook(pre_reasoning)— 首次对话时注入引导文本,后续自动跳过
7MemoryCompactionHook(pre_reasoning)— Token 计数,超阈值触发记忆压缩(见下方详解)
③ LLM推理 + 工具调用(ReAct Loop,最多 max_iters=50 轮)
8CoPawAgent._reasoning() — 调用 LLM,返回 text + tool_use blocks(_in_summarizing 期间过滤幽灵 tool_use)
9ToolGuardMixin._acting() — 安全拦截(denied → 拒绝 / guarded → Guardian → 审批 / 放行 → 执行)
10工具结果追加 memory(tool_result block);媒体块降级:_strip_media_blocks_from_memory() 自动重试
11LLM 返回纯文本(任务完成)→ 退出循环;或达到 max_iters=50 → 强制退出
④ 写回 + 推送
12post Hook:判断是否更新 PROFILE.md(AI 从对话中学习用户偏好,异步写入)
13SafeJSONSession.save_session_state()(finally 块,任何情况下都落盘)
14流式推送(SSE)→ Channel.send_response() → 用户端收到消息
15approval 被拒绝时:_cleanup_tool_guard_denied_messages() 回滚 memory,重新 save
Memory 内容结构与生命周期
# InMemoryMemory.content 内部结构
# 每条记录是 (Msg, marks[]) 二元组
memory.content = [
# 步骤5 写入,每次对话重建
(Msg(role="system", content="# AGENTS.md
..."), marks=[]),
# 步骤4 load,或新一轮对话追加
(Msg(role="user", content="查一下本月结算总额"), marks=[]),
# 步骤8 _reasoning 写入(LLM 决定调用工具)
(Msg(role="assistant", content=[{"type":"tool_use","name":"query_settlement",...}]), marks=[]),
# 步骤10 工具结果追加
(Msg(role="user", content=[{"type":"tool_result","output":"合计 ¥ 128,000"}]), marks=[]),
# 步骤8 最终回复
(Msg(role="assistant", content="本月结算总额为 128,000 元..."), marks=[]),
# 步骤9 ToolGuard 拦截(如果发生)
(Msg(role="system", content="⛔ 工具 execute_shell_command 已被拦截"),
marks=["tool_guard_denied"]),
]
# marks 当前值:
# [] = 正常消息
# ["tool_guard_denied"] = 被 ToolGuard 拦截的消息(cleanup 时会移除)
# 压缩后的消息:被 mark_messages_compressed() 标记,不再传给 LLM
# 持久化生命周期:
# LOAD: query 开始时从 sessions/{uid}_{sid}.json 完整恢复
# SAVE: query 结束时 finally 块完整落盘(覆盖写入)
# ROLLBACK: approval 拒绝时 _cleanup_denied → 重新 save
# compressed_summary 独立存储(叠加在 memory 之上)
compressed_summary = memory.get_compressed_summary() # → str 或 ""
# 完整上下文 = system_prompt + compressed_summary + memory.content(未被压缩的部分)
# LLM 实际接收:
messages_to_llm = [
{"role": "system", "content": sys_prompt + "
" + compressed_summary},
# ... 未压缩的历史消息 ...
{"role": "user", "content": "当前用户输入"},
]
Token 计数与压缩参数(AgentsRunningConfig)
| 参数 | 默认值 | 约束 | 说明 |
max_input_length | 131072(128K) | ≥1000 | 模型上下文窗口上限(按实际模型调整) |
memory_compact_ratio | 0.75 | 0.30–0.90 | Token 用量达到此比例时触发压缩(0.75 = 75%) |
memory_compact_reserve | 0.10 | 0.05–0.30 | 压缩后保留的上下文比例(留给新对话的空间) |
tool_result_compact_recent_n | 1 | 1–10 | 最近 N 条 tool_result 用 recent_threshold 判断 |
tool_result_compact_old_threshold | 1000 chars | ≥100 | 旧 tool_result 超过此长度时截断 |
tool_result_compact_recent_threshold | 30000 chars | ≥1000 | 近期 tool_result 超过此长度时截断 |
tool_result_compact_retention_days | 3天 | 1–10 | 超过保留天数的 tool_result 强制截断 |
token_count_estimate_divisor | 3.75 | >1 | 字符估算 Token 的除数:token ≈ len(text) / 3.75 |
compact_with_thinking_block | False | bool | 压缩摘要时是否启用 LLM thinking 块(Claude 3.7+) |
max_iters | 50 | ≥1 | 单次对话 ReAct 最大迭代轮次(防死循环) |
# 压缩触发计算(MemoryCompactionHook)
memory_compact_threshold = max_input_length * memory_compact_ratio
# 例:128K × 0.75 = 98,304 tokens → 超过此值触发压缩
# 实际触发逻辑:
str_tokens = count(sys_prompt + compressed_summary) # 固定占用
left_threshold = memory_compact_threshold - str_tokens
# left_threshold = 剩余给历史消息的 token 预算
# 若历史消息超过 left_threshold → 触发 compact_memory()
# 压缩后保留:
# - 最新 MEMORY_COMPACT_KEEP_RECENT=3 条消息(不压缩)
# - 更早的消息 → 用 LLM 生成摘要,存入 compressed_summary
# - 摘要前置于下一次对话的 system_prompt 之后
v0.1.0 关键变化对比(v0.0.6 → v0.1.0)
| 参数/机制 | v0.0.6 | v0.1.0 | 影响 |
| 压缩触发阈值 | hardcoded 80% | memory_compact_ratio=0.75(可配置,环境变量 COPAW_MEMORY_COMPACT_RATIO) | 可调,触发更早 |
| 保留最近条数 | keep_recent=5 | MEMORY_COMPACT_KEEP_RECENT=3(环境变量 COPAW_MEMORY_COMPACT_KEEP_RECENT) | 压缩更激进 |
| tool_result 压缩 | 简单保留尾部 | 新增三维控制(recent_n / old_threshold / retention_days) | 精细化 Token 控制 |
| 压缩状态提示 | 无 | 流式输出 "🔄 Context compaction started..." / "✅ completed" | 用户可感知 |
| 异步压缩 | 无 | add_async_summary_task(),压缩失败时异步重试 | 可靠性提升 |
| 媒体块降级 | 报错 | _strip_media_blocks_from_memory(),模型不支持时自动剥离重试 | 稳定性提升 |
| 幽灵 tool_use | 无处理 | _in_summarizing 状态过滤,压缩期间不触发假工具调用 | 消除误操作 |
| 记忆语义搜索 | 无 | memory_search 工具(Chroma 向量 + FTS 全文,min_score 过滤) | 跨会话检索 |
MemoryCompactionHook 源码关键逻辑
class MemoryCompactionHook:
"""pre_reasoning hook:每轮对话调用,检查并在需要时触发压缩"""
async def __call__(self, agent, kwargs):
# 1. 热读 Agent Config(无缓存,支持配置热更新)
agent_config = load_agent_config(self.memory_manager.agent_id)
running_config = agent_config.running
token_counter = get_copaw_token_counter(agent_config)
# 2. 计算固定占用(system_prompt + compressed_summary)
str_token_count = await token_counter.count(
text=(agent.sys_prompt or "") + (memory.get_compressed_summary() or "")
)
left_threshold = running_config.memory_compact_threshold - str_token_count
# 3. tool_result 三维压缩(先做,减少主压缩触发频率)
await self.memory_manager.compact_tool_result(
messages=messages,
recent_n=running_config.tool_result_compact_recent_n, # 默认 1
old_threshold=running_config.tool_result_compact_old_threshold, # 默认 1000 chars
recent_threshold=running_config.tool_result_compact_recent_threshold, # 默认 30000
retention_days=running_config.tool_result_compact_retention_days, # 默认 3天
)
# 4. 检查是否触发主压缩
(messages_to_compact, _, is_valid) = await self.memory_manager.check_context(
messages=messages,
memory_compact_threshold=left_threshold,
memory_compact_reserve=running_config.memory_compact_reserve, # 默认 0.10
as_token_counter=token_counter,
)
if not messages_to_compact:
return None # 未触发
# 5. 触发主压缩
self.memory_manager.add_async_summary_task(messages=messages_to_compact)
await self._print_status_message(agent, "🔄 Context compaction started...")
compact_content = await self.memory_manager.compact_memory(
messages=messages_to_compact,
previous_summary=memory.get_compressed_summary(), # 累积摘要传入
)
if compact_content:
await memory.mark_messages_compressed(messages_to_compact) # 标记已压缩
await memory.update_compressed_summary(compact_content) # 更新摘要
await self._print_status_message(agent, "✅ Context compaction completed")
RoutingChatModel — LLM 智能路由(v0.1.0 新增)
# src/copaw/agents/routing_chat_model.py
class RoutingChatModel(ChatModelBase):
"""在 local / cloud 两个 endpoint 之间按策略路由的 ChatModel 代理"""
def __init__(self, *, local_endpoint, cloud_endpoint, routing_cfg):
self.local_endpoint = local_endpoint # Ollama / llama.cpp
self.cloud_endpoint = cloud_endpoint # OpenAI / Anthropic / etc.
self.policy = RoutingPolicy(routing_cfg)
async def __call__(self, messages, tools=None, tool_choice=None, **kwargs):
# 提取 user 文本用于决策
text = " ".join(m["content"] for m in messages
if m["role"] == "user" and isinstance(m.get("content"), str))
decision = self.policy.decide(text=text, tools_available=tools is not None)
endpoint = (self.local_endpoint if decision.route == "local"
else self.cloud_endpoint)
logger.debug("LLM routing: route=%s provider=%s model=%s reasons=%s",
decision.route, endpoint.provider_id, endpoint.model_name,
",".join(decision.reasons))
return await endpoint.model(messages, tools, tool_choice, **kwargs)
# RoutingPolicy.decide()(v0.1.0 当前实现)
class RoutingPolicy:
def decide(self, *, text="", channel="", tools_available=True):
if getattr(self.cfg, "mode", "local_first") == "cloud_first":
return RoutingDecision(route="cloud", reasons=["mode:cloud_first"])
return RoutingDecision(route="local", reasons=["mode:local_first"])
# ↑ 当前按 mode 静态决策;未来可扩展为按文本长度/工具数量动态路由
灵工场景映射:日常查询("查上周订单")走 local_first → 本地 Ollama 小模型(成本近零);月度分析报告走 cloud_first → claude-sonnet(高质量)。仅需改 config.json 的 llm_routing.mode,无需重启(ConfigWatcher 热更新)。
Zero-downtime 热重载(MultiAgentManager.reload_agent)
# src/copaw/app/multi_agent_manager.py
async def reload_agent(self, agent_id: str) -> bool:
"""零停机热重载:新实例完全启动后再原子替换,正在进行的 SSE 流不中断"""
# Step 1:检查是否在运行(加锁,快速)
async with self._lock:
old_instance = self.agents[agent_id]
# Step 2:读取最新配置(锁外,慢操作)
config = load_config()
agent_ref = config.agents.profiles[agent_id]
# Step 3:创建并启动新 Workspace 实例(锁外,慢操作,不阻塞其他 Agent)
new_instance = Workspace(agent_id, workspace_dir=agent_ref.workspace_dir)
# Step 3.5:复用旧实例的可重用组件(关键设计!)
async with self._lock:
old_instance = self.agents.get(agent_id)
if old_instance:
reusable = old_instance._service_manager.get_reusable_services()
# reusable 包含:memory_manager(记忆不丢失)、chat_manager(索引不重建)
# 以及已建立的 MCP client 连接(不重连)
if reusable:
await new_instance.set_reusable_components(reusable)
await new_instance.start() # 使用复用组件启动,比全新启动快很多
# Step 4:原子替换(加锁,极短,最小化阻塞时间)
async with self._lock:
old_instance = self.agents[agent_id]
self.agents[agent_id] = new_instance # 新请求立即路由到新实例
# Step 5:优雅停止旧实例(锁外,不影响其他操作)
await self._graceful_stop_old_instance(old_instance, agent_id)
# 若有活跃 SSE 流 → 延迟到流结束后再 stop(用户无感知)
# 若无活跃任务 → 立即 stop
return True
核心设计:锁只在「原子替换」瞬间持有。新 Workspace 的创建(慢)和旧 Workspace 的关闭(慢)都在锁外完成。可重用组件复用让重载更快——记忆、索引、MCP 连接全部继承,不丢失。
⚠️ 单用户设计限制 → 多租户改造建议
| 改造点 | CoPaw 现状 | 多租户改造建议 | 难度 |
| session_id 隔离 |
channel:user_id(无企业维度) |
扩展为 tenant_id:channel:user_id,防止 A 企业用户加载到 B 企业记忆 |
低 |
| Memory 存储后端 |
单机 JSON 文件(sessions/*.json) |
迁移至 PostgreSQL,按 tenant_id 分区;小规模保留文件作回退 |
高 |
| PROFILE.md 多用户 |
单文件,全工作区共享 |
按 user_id 独立存储:profiles/{user_id}_PROFILE.md,由 AgentMdManager 路由 |
中 |
| 记忆语义搜索隔离 |
单一 Chroma collection |
每 tenant_id 独立 Chroma collection,防向量检索跨企业召回 |
中 |
| 压缩 LLM 费用 |
用配置的主模型压缩 |
compact_memory 单独配置小模型(如 gpt-4o-mini),降低压缩成本 |
低 |
| 热重载触发方式 |
手动 API 触发 |
配置中心 Webhook 自动触发 reload_agent(enterprise_id),实现 GitOps 式管理 |
中 |
⚠️ 安全红线:session_id 中的 tenant_id 必须从服务端可信来源(JWT Token / 请求 Header)提取,绝不能从用户输入中读取。否则攻击者可构造 session_id 绕过隔离,读取其他企业记忆。