OpenManus
Installing OpenManus
Clone the repository:

```bash
git clone https://github.com/FoundationAgents/OpenManus.git
```

Create and activate a virtual environment:

```bash
uv venv --python 3.12
source .venv/bin/activate  # on Windows: .venv\Scripts\activate
```

Install the dependencies:

```bash
uv pip install -r requirements.txt
```

Browser automation tool (optional):

```bash
playwright install
```
Configuration
OpenManus needs the LLM API it will use to be configured. Set it up as follows:

Create a config.toml file in the config directory (you can copy it from the example):

```bash
cp config/config.example.toml config/config.toml
```

Edit config/config.toml to add your API key and custom settings. Here we use the OpenRouter API:

```toml
# Global LLM configuration
```
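The block above is truncated in the original. As a minimal sketch, an OpenRouter setup using the fields defined by the LLMSettings model shown later might look like this (the model id is a placeholder; OpenRouter exposes an OpenAI-compatible endpoint):

```toml
# Global LLM configuration
[llm]
model = "openai/gpt-4o"                    # placeholder OpenRouter model id
base_url = "https://openrouter.ai/api/v1"  # OpenRouter's OpenAI-compatible endpoint
api_key = "sk-or-..."                      # your OpenRouter API key
max_tokens = 4096
temperature = 0.0
```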
Quick Start
Run OpenManus with a single command:

```bash
python main.py
```

```
(OpenManus) yang@Yangless:~/OpenManus$ python main.py
```

Result:

```bash
yang@Yangless:~/OpenManus/workspace$ cat trending_open_source_agents.txt
```

To use the MCP tool version, run:

```bash
python run_mcp.py
```

To try the (still unstable) multi-agent version, run:

```bash
python run_flow.py
```
Adding Custom Multi-Agents
Besides the general-purpose OpenManus agent, a DataAnalysis agent is also built in; it is suited to data analysis and data visualization tasks. You can add this agent to run_flow via config.toml:

```toml
# Optional run-flow configuration
```
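The block above is truncated in the original; judging from the RunflowSettings model shown later in this post, enabling the agent should look roughly like this:

```toml
# Optional run-flow configuration
[runflow]
use_data_analysis_agent = true
```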
Chart Visualization Tool
The chart visualization tool generates data-processing code via Python, then calls @visactor/vmind to obtain the chart spec; the chart itself is rendered with @visactor/vchart.

Install Node.js >= 18:

```bash
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash
```

Install the dependencies:

```bash
cd app/tool/chart_visualization
npm install  # assumed: the original snippet stops at the cd command
```
Simple Chart Generation Task
Give it data and a concrete chart-generation request, then run the test:

```bash
python -m app.tool.chart_visualization.test.chart_demo
```

```
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: To generate the chart showing the popularity of search keywords, we will follow a structured approach. Here's how we can break down the task:
```

The results should be under workspace/visualization, covering 9 different chart types:

```bash
yang@Yangless:~/OpenManus/app/tool/chart_visualization$ cat /home/yang/OpenManus/workspace/sales_data_export.csv
```
Simple Data Report Task
Give it simple raw data plus an analysis request that requires some light processing of the data, then run:

```bash
python -m app.tool.chart_visualization.test.report_demo
```
Code Analysis
Overview
1. Application Entry Point
main.py: the application entry point, responsible for initializing and running the Manus agent:

```python
async def main():
```
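The block above is truncated; a hedged sketch of what such an entry point does, using the Manus.create() and run() APIs discussed later in this post (the prompt handling is illustrative):

```python
import asyncio

from app.agent.manus import Manus
from app.logger import logger


async def main():
    # Build the main agent (Manus.create() is discussed in the agent section below)
    agent = await Manus.create()
    try:
        prompt = input("Enter your prompt: ")
        logger.info("Processing your request...")
        await agent.run(prompt)
    finally:
        await agent.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
```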
2. Configuration System
app/config.py: manages the global configuration (LLM / browser / sandbox / MCP, etc.) with Pydantic models, loading the various settings from TOML/JSON:
```python
class LLMSettings(BaseModel):
    model: str = Field(..., description="Model name")
    base_url: str = Field(..., description="API base URL")
    api_key: str = Field(..., description="API key")
    max_tokens: int = Field(4096, description="Maximum number of tokens per request")
    max_input_tokens: Optional[int] = Field(
        None,
        description="Maximum input tokens to use across all requests (None for unlimited)",
    )
    temperature: float = Field(1.0, description="Sampling temperature")
    api_type: str = Field(..., description="Azure, Openai, or Ollama")
    api_version: str = Field(..., description="Azure Openai version if AzureOpenai")


class ProxySettings(BaseModel):
    server: str = Field(None, description="Proxy server address")
    username: Optional[str] = Field(None, description="Proxy username")
    password: Optional[str] = Field(None, description="Proxy password")


class SearchSettings(BaseModel):
    engine: str = Field(default="Google", description="Search engine the llm to use")
    fallback_engines: List[str] = Field(
        default_factory=lambda: ["DuckDuckGo", "Baidu", "Bing"],
        description="Fallback search engines to try if the primary engine fails",
    )
    retry_delay: int = Field(
        default=60,
        description="Seconds to wait before retrying all engines again after they all fail",
    )
    max_retries: int = Field(
        default=3,
        description="Maximum number of times to retry all engines when all fail",
    )
    lang: str = Field(
        default="en",
        description="Language code for search results (e.g., en, zh, fr)",
    )
    country: str = Field(
        default="us",
        description="Country code for search results (e.g., us, cn, uk)",
    )


class RunflowSettings(BaseModel):
    use_data_analysis_agent: bool = Field(
        default=False, description="Enable data analysis agent in run flow"
    )


class BrowserSettings(BaseModel):
    headless: bool = Field(False, description="Whether to run browser in headless mode")
    disable_security: bool = Field(
        True, description="Disable browser security features"
    )
    extra_chromium_args: List[str] = Field(
        default_factory=list, description="Extra arguments to pass to the browser"
    )
    chrome_instance_path: Optional[str] = Field(
        None, description="Path to a Chrome instance to use"
    )
    wss_url: Optional[str] = Field(
        None, description="Connect to a browser instance via WebSocket"
    )
    cdp_url: Optional[str] = Field(
        None, description="Connect to a browser instance via CDP"
    )
    proxy: Optional[ProxySettings] = Field(
        None, description="Proxy settings for the browser"
    )
    max_content_length: int = Field(
        2000, description="Maximum length for content retrieval operations"
    )
```

Loading the config:
```python
class Config:
    _instance = None
    _lock = threading.Lock()
    _initialized = False

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if not self._initialized:
            with self._lock:
                if not self._initialized:
                    self._config = None
                    self._load_initial_config()
                    self._initialized = True

    @staticmethod
    def _get_config_path() -> Path:
        root = PROJECT_ROOT
        config_path = root / "config" / "config.toml"
        if config_path.exists():
            return config_path
        example_path = root / "config" / "config.example.toml"
        if example_path.exists():
            return example_path
        raise FileNotFoundError("No configuration file found in config directory")

    def _load_config(self) -> dict:
        config_path = self._get_config_path()
        with config_path.open("rb") as f:
            return tomllib.load(f)

    def _load_initial_config(self):
        raw_config = self._load_config()
        base_llm = raw_config.get("llm", {})
        llm_overrides = {
            k: v for k, v in raw_config.get("llm", {}).items() if isinstance(v, dict)
        }

        default_settings = {
            "model": base_llm.get("model"),
            "base_url": base_llm.get("base_url"),
            "api_key": base_llm.get("api_key"),
            "max_tokens": base_llm.get("max_tokens", 4096),
            "max_input_tokens": base_llm.get("max_input_tokens"),
            "temperature": base_llm.get("temperature", 1.0),
            "api_type": base_llm.get("api_type", ""),
            "api_version": base_llm.get("api_version", ""),
        }

        # handle browser config.
        browser_config = raw_config.get("browser", {})
        browser_settings = None
        if browser_config:
            # handle proxy settings.
            proxy_config = browser_config.get("proxy", {})
            proxy_settings = None
            if proxy_config and proxy_config.get("server"):
                proxy_settings = ProxySettings(
                    **{
                        k: v
                        for k, v in proxy_config.items()
                        if k in ["server", "username", "password"] and v
                    }
                )

            # filter valid browser config parameters.
            valid_browser_params = {
                k: v
                for k, v in browser_config.items()
                if k in BrowserSettings.__annotations__ and v is not None
            }

            # if there is proxy settings, add it to the parameters.
            if proxy_settings:
                valid_browser_params["proxy"] = proxy_settings

            # only create BrowserSettings when there are valid parameters.
            if valid_browser_params:
                browser_settings = BrowserSettings(**valid_browser_params)

        search_config = raw_config.get("search", {})
        search_settings = None
        if search_config:
            search_settings = SearchSettings(**search_config)

        sandbox_config = raw_config.get("sandbox", {})
        if sandbox_config:
            sandbox_settings = SandboxSettings(**sandbox_config)
        else:
            sandbox_settings = SandboxSettings()

        mcp_config = raw_config.get("mcp", {})
        mcp_settings = None
        if mcp_config:
            # Load server configurations from JSON
            mcp_config["servers"] = MCPSettings.load_server_config()
            mcp_settings = MCPSettings(**mcp_config)
        else:
            mcp_settings = MCPSettings(servers=MCPSettings.load_server_config())

        run_flow_config = raw_config.get("runflow")
        if run_flow_config:
            run_flow_settings = RunflowSettings(**run_flow_config)
        else:
            run_flow_settings = RunflowSettings()

        config_dict = {
            "llm": {
                "default": default_settings,
                **{
                    name: {**default_settings, **override_config}
                    for name, override_config in llm_overrides.items()
                },
            },
            "sandbox": sandbox_settings,
            "browser_config": browser_settings,
            "search_config": search_settings,
            "mcp_config": mcp_settings,
            "run_flow_config": run_flow_settings,
        }

        self._config = AppConfig(**config_dict)
```
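Config implements a thread-safe singleton with double-checked locking. A quick hedged usage sketch of what that guarantees in practice:

```python
# Hypothetical usage sketch: both names refer to the same instance,
# and the TOML file is parsed only once, on first construction.
from app.config import Config

c1 = Config()
c2 = Config()
assert c1 is c2  # __new__ returns the cached instance on every later call
```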
3. Logging System
app/logger.py: centralized log management built on Loguru:

```python
def define_log_level(print_level="INFO", logfile_level="DEBUG", name: str = None):
    """Adjust the log level to above level"""
    global _print_level
    _print_level = print_level

    current_date = datetime.now()
    formatted_date = current_date.strftime("%Y%m%d%H%M%S")
    log_name = (
        f"{name}_{formatted_date}" if name else formatted_date
    )  # name a log with prefix name

    _logger.remove()
    _logger.add(sys.stderr, level=print_level)
    _logger.add(PROJECT_ROOT / f"logs/{log_name}.log", level=logfile_level)
    return _logger


logger = define_log_level()
```
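A hedged usage sketch: every module imports this shared logger, and each record goes both to stderr (at print_level and above) and to a timestamped file under logs/ (at logfile_level and above):

```python
from app.logger import logger

logger.debug("goes to logs/<timestamp>.log only (stderr shows INFO and above)")
logger.info("goes to both stderr and the log file")
```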
4. Agent Class Hierarchy

```
BaseAgent → ReActAgent → ToolCallAgent → Manus
```
app/agent/base.py: the abstract base class for all agents (lifecycle and state management):

```python
from abc import ABC, abstractmethod
# Abstract interfaces keep the code robust
from contextlib import asynccontextmanager
# Async context management for safe state release
from typing import List, Optional
# Type hints make parameter and return types explicit and the code easier to read
from pydantic import BaseModel, Field, model_validator
# Pydantic core components for validated, declarative models

from app.llm import LLM
from app.logger import logger
from app.sandbox.client import SANDBOX_CLIENT
from app.schema import ROLE_TYPE, AgentState, Memory, Message
```
```python
# This part defines the skeleton of BaseAgent and the attributes every agent shares.
class BaseAgent(BaseModel, ABC):
    """Abstract base class for managing agent state and execution.

    Provides foundational functionality for state transitions, memory management,
    and a step-based execution loop. Subclasses must implement the `step` method.
    """

    # Core attributes
    name: str = Field(..., description="Unique name of the agent")
    description: Optional[str] = Field(None, description="Optional agent description")

    # Prompts
    system_prompt: Optional[str] = Field(
        None, description="System-level instruction prompt"
    )
    next_step_prompt: Optional[str] = Field(
        None, description="Prompt for determining next action"
    )

    # Dependencies
    llm: LLM = Field(default_factory=LLM, description="Language model instance")
    memory: Memory = Field(default_factory=Memory, description="Agent's memory store")
    state: AgentState = Field(
        default=AgentState.IDLE, description="Current agent state"
    )

    # Execution control
    max_steps: int = Field(default=10, description="Maximum steps before termination")
    current_step: int = Field(default=0, description="Current step in execution")
    duplicate_threshold: int = 2

    class Config:
        arbitrary_types_allowed = True
        extra = "allow"  # Allow extra fields for flexibility in subclasses
```

`extra = "allow"` lets the model tolerate and store extra fields that are not explicitly declared on it. This is very useful when designing an extensible base class like BaseAgent: subclasses or external systems can pass additional custom parameters without triggering validation errors. `arbitrary_types_allowed = True` allows fields to hold custom object instances that Pydantic cannot validate directly.
```python
# Initialization: runs after Pydantic has validated the declared fields
@model_validator(mode="after")
def initialize_agent(self) -> "BaseAgent":
    """Initialize agent with default settings if not provided."""
    if self.llm is None or not isinstance(self.llm, LLM):
        self.llm = LLM(config_name=self.name.lower())
    if not isinstance(self.memory, Memory):
        self.memory = Memory()
    return self
```
```python
# Safely manage the agent's state
@asynccontextmanager
async def state_context(self, new_state: AgentState):
    """Context manager for safe agent state transitions."""
    # ...
    previous_state = self.state
    self.state = new_state
    try:
        yield
    except Exception as e:
        self.state = AgentState.ERROR
        raise e
    finally:
        self.state = previous_state
```

In Python, `yield` is the keyword that defines a generator (or async generator). Its essence: pause the function's execution and hand control (and the current state) back to the caller, so that execution can resume from the same point later.
For example:

```python
async def something():
    async with agent.state_context(AgentState.RUNNING):
        # This is the code inside the async with block
        await agent.do_something()
        print("do some work while in this state")
```

1. Python runs `async with agent.state_context(...)`: it enters the state_context function and executes everything before the `yield` (such as the state switch).
2. At the `yield`: state_context is suspended, control jumps out to the body of the `async with` block, `await agent.do_something()` runs, then the `print(...)`.
3. When the `async with` block finishes (whether or not it raised an exception): execution returns to state_context right after the `yield` and enters the `except`/`finally` clauses to clean up and restore the previous state.
```python
# Update memory
def update_memory(
    self,
    role: ROLE_TYPE,  # type: ignore
    content: str,
    base64_image: Optional[str] = None,
    **kwargs,
) -> None:
    """Add a message to the agent's memory.

    Args:
        role: The role of the message sender (user, system, assistant, tool).
        content: The message content.
        base64_image: Optional base64 encoded image.
        **kwargs: Additional arguments (e.g., tool_call_id for tool messages).

    Raises:
        ValueError: If the role is unsupported.
    """
    message_map = {
        "user": Message.user_message,
        "system": Message.system_message,
        "assistant": Message.assistant_message,
        "tool": lambda content, **kw: Message.tool_message(content, **kw),
    }

    if role not in message_map:
        raise ValueError(f"Unsupported message role: {role}")

    # Create message with appropriate parameters based on role
    kwargs = {"base64_image": base64_image, **(kwargs if role == "tool" else {})}
    self.memory.add_message(message_map[role](content, **kwargs))
```
```python
# Asynchronous execution
async def run(self, request: Optional[str] = None) -> str:
    """Execute the agent's main loop asynchronously.

    Args:
        request: Optional initial user request to process.

    Returns:
        A string summarizing the execution results.

    Raises:
        RuntimeError: If the agent is not in IDLE state at start.
    """
    # The agent must be idle to start
    if self.state != AgentState.IDLE:
        raise RuntimeError(f"Cannot run agent from state: {self.state}")

    if request:
        self.update_memory("user", request)

    results: List[str] = []
    async with self.state_context(AgentState.RUNNING):
        while (
            self.current_step < self.max_steps and self.state != AgentState.FINISHED
        ):
            self.current_step += 1
            logger.info(f"Executing step {self.current_step}/{self.max_steps}")
            step_result = await self.step()

            # Check for stuck state
            if self.is_stuck():
                self.handle_stuck_state()

            results.append(f"Step {self.current_step}: {step_result}")

        if self.current_step >= self.max_steps:
            self.current_step = 0
            self.state = AgentState.IDLE
            results.append(f"Terminated: Reached max steps ({self.max_steps})")

    await SANDBOX_CLIENT.cleanup()
    return "\n".join(results) if results else "No steps executed"
```

run is the engine of the whole framework. It coordinates the control attributes state, max_steps, and current_step, calls the core step method, relies on helpers such as state_context and update_memory, and finally interacts with the external SANDBOX_CLIENT. SANDBOX_CLIENT acts as a Docker sandbox: commands execute inside it, so they cannot damage the host machine.
```python
@abstractmethod
async def step(self) -> str:
    """Execute a single step in the agent's workflow.

    Must be implemented by subclasses to define specific behavior.
    """

def handle_stuck_state(self):
    """Handle stuck state by adding a prompt to change strategy"""
    stuck_prompt = "\
    Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
    self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
    logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")

def is_stuck(self) -> bool:
    """Check if the agent is stuck in a loop by detecting duplicate content"""
    if len(self.memory.messages) < 2:
        return False

    last_message = self.memory.messages[-1]
    if not last_message.content:
        return False

    # Count identical content occurrences
    duplicate_count = sum(
        1
        for msg in reversed(self.memory.messages[:-1])
        if msg.role == "assistant" and msg.content == last_message.content
    )
    return duplicate_count >= self.duplicate_threshold
```
Together, these three methods define the agent's single-step behavior and its self-correction mechanism, but there are problems:
1. Duplicate detection only compares raw content, ignoring semantics and similarity.
2. If duplicate_threshold is too low it raises false alarms; if too high, it misses real loops (a similarity-based alternative is sketched below).
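A hedged sketch of one alternative: compare messages by a fuzzy similarity ratio rather than exact equality. difflib stands in here for a proper embedding-based semantic comparison, and the 0.9 threshold is an assumption:

```python
from difflib import SequenceMatcher


def is_stuck_fuzzy(self, threshold: float = 0.9) -> bool:
    """Drop-in BaseAgent method: treat near-duplicate assistant replies as stuck."""
    if len(self.memory.messages) < 2 or not self.memory.messages[-1].content:
        return False
    last = self.memory.messages[-1].content
    near_duplicates = sum(
        1
        for msg in self.memory.messages[:-1]
        if msg.role == "assistant"
        and msg.content
        # ratio() returns 1.0 for identical strings, so this generalizes ==
        and SequenceMatcher(None, msg.content, last).ratio() >= threshold
    )
    return near_duplicates >= self.duplicate_threshold
```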
```python
@property
def messages(self) -> List[Message]:
    """Retrieve a list of messages from the agent's memory."""
    return self.memory.messages

@messages.setter
def messages(self, value: List[Message]):
    """Set the list of messages in the agent's memory."""
    self.memory.messages = value
```

This property/setter pair reads and writes the agent's memory message list, simplifying access.
app/agent/react.py: implements the ReAct (reason + act) paradigm:

```python
@abstractmethod
async def think(self) -> bool:
    """Process current state and decide next action"""
    # Decides whether another step is needed

@abstractmethod
async def act(self) -> str:
    """Execute decided actions"""

async def step(self) -> str:
    """Execute a single step: think and act."""
    should_act = await self.think()
    if not should_act:
        return "Thinking complete - no action needed"
    return await self.act()
```

app/agent/toolcall.py: extends the agent with tool-calling capability:
```python
class ToolCallAgent(ReActAgent):
    """Base agent class for handling tool/function calls with enhanced abstraction"""

    name: str = "toolcall"
    description: str = "an agent that can execute tool calls."

    system_prompt: str = SYSTEM_PROMPT
    next_step_prompt: str = NEXT_STEP_PROMPT

    # Tool attributes, used when talking to the LLM
    available_tools: ToolCollection = ToolCollection(
        CreateChatCompletion(), Terminate()
    )
    # available_tools defines which tools this agent may use. By default there are
    # two: CreateChatCompletion (for generating longer text) and Terminate (for
    # ending the task). ToolCollection is a helper class that manages a set of tools.
    tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO  # type: ignore
    # tool_choices controls how the LLM may call tools: AUTO (the LLM decides
    # whether to call a tool), REQUIRED (it must call one), or NONE (tool calls
    # are forbidden).
    special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])

    # State attributes, used during act()
    tool_calls: List[ToolCall] = Field(default_factory=list)
    _current_base64_image: Optional[str] = None

    max_steps: int = 30
    max_observe: Optional[Union[int, bool]] = None
```
```python
async def think(self) -> bool:
    """Process current state and decide next actions using tools"""
    if self.next_step_prompt:
        user_msg = Message.user_message(self.next_step_prompt)
        self.messages += [user_msg]

    try:
        # Get response with tool options
        response = await self.llm.ask_tool(
            messages=self.messages,
            system_msgs=(
                [Message.system_message(self.system_prompt)]
                if self.system_prompt
                else None
            ),
            tools=self.available_tools.to_params(),
            tool_choice=self.tool_choices,
        )
    except ValueError:
        raise
    except Exception as e:
        # Check if this is a RetryError containing TokenLimitExceeded
        if hasattr(e, "__cause__") and isinstance(e.__cause__, TokenLimitExceeded):
            token_limit_error = e.__cause__
            logger.error(
                f"🚨 Token limit error (from RetryError): {token_limit_error}"
            )
            self.memory.add_message(
                Message.assistant_message(
                    f"Maximum token limit reached, cannot continue execution: {str(token_limit_error)}"
                )
            )
            self.state = AgentState.FINISHED
            return False
        raise

    self.tool_calls = tool_calls = (
        response.tool_calls if response and response.tool_calls else []
    )
    content = response.content if response and response.content else ""

    # Log response info
    logger.info(f"✨ {self.name}'s thoughts: {content}")
    logger.info(
        f"🛠️ {self.name} selected {len(tool_calls) if tool_calls else 0} tools to use"
    )
    if tool_calls:
        logger.info(
            f"🧰 Tools being prepared: {[call.function.name for call in tool_calls]}"
        )
        logger.info(f"🔧 Tool arguments: {tool_calls[0].function.arguments}")

    try:
        if response is None:
            raise RuntimeError("No response received from the LLM")

        # Handle different tool_choices modes
        if self.tool_choices == ToolChoice.NONE:
            if tool_calls:
                logger.warning(
                    f"🤔 Hmm, {self.name} tried to use tools when they weren't available!"
                )
            if content:
                self.memory.add_message(Message.assistant_message(content))
                return True
            return False

        # Create and add assistant message
        assistant_msg = (
            Message.from_tool_calls(content=content, tool_calls=self.tool_calls)
            if self.tool_calls
            else Message.assistant_message(content)
        )
        self.memory.add_message(assistant_msg)

        if self.tool_choices == ToolChoice.REQUIRED and not self.tool_calls:
            return True  # Will be handled in act()

        # For 'auto' mode, continue with content if no commands but content exists
        if self.tool_choices == ToolChoice.AUTO and not self.tool_calls:
            return bool(content)

        return bool(self.tool_calls)
    except Exception as e:
        logger.error(f"🚨 Oops! The {self.name}'s thinking process hit a snag: {e}")
        self.memory.add_message(
            Message.assistant_message(
                f"Error encountered while processing: {str(e)}"
            )
        )
        return False
```

This is the agent's "think" phase, the core of the entire decision process:
- Prepare input: if there is a next_step_prompt, append it to the conversation history as a new user message to guide the LLM's next round of thinking.
- Call the LLM: invoke self.llm.ask_tool, an LLM interface built specifically for tool calling; it sends the conversation history, the system prompt, the available tool list (self.available_tools), and the tool-choice mode (self.tool_choices) to the LLM together.
- Error handling: handle exceptions robustly, especially TokenLimitExceeded; if the token limit is exceeded, log the error and finish the agent.
- Parse the response: extract tool_calls (the tools the LLM plans to execute) and content (its free-text reasoning) from the LLM's response.
- Logging: print detailed logs, including the LLM's thoughts, which tools it chose, and the tool arguments, which is very useful for debugging.
- Handle the modes: branch on the value of self.tool_choices (NONE, REQUIRED, AUTO); for example, in NONE mode a warning is emitted if the LLM still tries to call tools.
- Update memory: append the LLM's response (reasoning plus tool-call requests) to memory as an assistant message.
- Return the decision: return a boolean, True meaning there are tools to execute (the act phase should run), False meaning there are none.
```python
async def act(self) -> str:
    """Execute tool calls and handle their results"""
    if not self.tool_calls:
        if self.tool_choices == ToolChoice.REQUIRED:
            raise ValueError(TOOL_CALL_REQUIRED)

        # Return last message content if no tool calls
        return self.messages[-1].content or "No content or commands to execute"

    results = []
    for command in self.tool_calls:
        # Reset base64_image for each tool call
        self._current_base64_image = None

        result = await self.execute_tool(command)

        if self.max_observe:
            result = result[: self.max_observe]

        logger.info(
            f"🎯 Tool '{command.function.name}' completed its mission! Result: {result}"
        )

        # Add tool response to memory
        tool_msg = Message.tool_message(
            content=result,
            tool_call_id=command.id,
            name=command.function.name,
            base64_image=self._current_base64_image,
        )
        self.memory.add_message(tool_msg)
        results.append(result)

    return "\n\n".join(results)
```

The act method is the agent's executor. It takes the decisions (tool calls) that the LLM made during the think phase, actually executes those tools, collects their outputs, and feeds those outputs back into the agent's memory. This closes a complete think-act-observe loop:
- Check preconditions: if self.tool_calls is empty, either raise or return directly, depending on the tool_choices mode.
- Iterate: loop over all tool_calls produced in the think phase.
- Execute a single tool: for each command (a ToolCall object), call self.execute_tool to actually run it.
- Handle the result: take the tool's output and truncate it according to max_observe.
- Update memory: wrap each tool result in a tool_message (result content, the matching tool_call_id, and so on) and add it to memory; this is essential for the LLM to understand the outcome of its previous action in the next think step.
- Aggregate and return: join all tool results into a single string, which serves as the final result of the current step.
```python
async def execute_tool(self, command: ToolCall) -> str:
    """Execute a single tool call with robust error handling"""
    if not command or not command.function or not command.function.name:
        return "Error: Invalid command format"

    name = command.function.name
    if name not in self.available_tools.tool_map:
        return f"Error: Unknown tool '{name}'"

    try:
        # Parse arguments
        args = json.loads(command.function.arguments or "{}")

        # Execute the tool
        logger.info(f"🔧 Activating tool: '{name}'...")
        result = await self.available_tools.execute(name=name, tool_input=args)

        # Handle special tools
        await self._handle_special_tool(name=name, result=result)

        # Check if result is a ToolResult with base64_image
        if hasattr(result, "base64_image") and result.base64_image:
            # Store the base64_image for later use in tool_message
            self._current_base64_image = result.base64_image

        # Format result for display (standard case)
        observation = (
            f"Observed output of cmd `{name}` executed:\n{str(result)}"
            if result
            else f"Cmd `{name}` completed with no output"
        )
        return observation
    except json.JSONDecodeError:
        error_msg = f"Error parsing arguments for {name}: Invalid JSON format"
        logger.error(
            f"📝 Oops! The arguments for '{name}' don't make sense - invalid JSON, arguments:{command.function.arguments}"
        )
        return f"Error: {error_msg}"
    except Exception as e:
        error_msg = f"⚠️ Tool '{name}' encountered a problem: {str(e)}"
        logger.exception(error_msg)
        return f"Error: {error_msg}"
```

Whereas act decides which tools to run, execute_tool does the actual running. It is the lower-level helper responsible for executing a single tool call:
- Validate: check that the tool name exists in available_tools.
- Parse arguments: the LLM returns tool arguments as a JSON string; json.loads parses them into a Python dict.
- Execute: call self.available_tools.execute() with the tool name and the parsed arguments to really run the tool.
- Special handling: call _handle_special_tool to check whether the tool triggers special logic (such as finishing the agent).
- Format the result: wrap the tool's raw return value into an LLM-friendly "Observation" string.
- Error handling: catch all plausible exceptions (JSON parse errors, errors thrown by the tool itself) and return the error as a formatted string, so that a single failing tool cannot crash the agent.
```python
async def cleanup(self):
    """Clean up resources used by the agent's tools."""
    logger.info(f"🧹 Cleaning up resources for agent '{self.name}'...")
    for tool_name, tool_instance in self.available_tools.tool_map.items():
        if hasattr(tool_instance, "cleanup") and asyncio.iscoroutinefunction(
            tool_instance.cleanup
        ):
            try:
                logger.debug(f"🧼 Cleaning up tool: {tool_name}")
                await tool_instance.cleanup()
            except Exception as e:
                logger.error(
                    f"🚨 Error cleaning up tool '{tool_name}': {e}", exc_info=True
                )
    logger.info(f"✨ Cleanup complete for agent '{self.name}'.")

async def run(self, request: Optional[str] = None) -> str:
    """Run the agent with cleanup when done."""
    try:
        return await super().run(request)
    finally:
        await self.cleanup()
```
At first glance this run override looks redundant: since think() and act() already implement the tool-calling logic, run seemingly belongs at a higher abstraction layer. On closer thought, though:
1. Why override run()? Not to call tools, but to attach a uniform end-of-lifecycle cleanup at this layer; higher layers can still inherit it.
2. Is there a responsibility conflict? No: run only wraps the start and end of the flow, and control of the logic stays in think()/act().
app/agent/manus.py: the main agent (integrates the MCP client and the built-in tools).
@model_validator(mode="after") is a decorator from the Pydantic data-validation library. mode="after" means the validator runs after all of the model's fields have been parsed and validated; it receives the model instance as input and is expected to return a (possibly modified) instance. So when a Manus object is created, Pydantic first assigns fields like name, description, and system_prompt; only once those base fields are set does it call the initialize_helper method.

-> "Manus" is a Python type hint stating that initialize_helper returns an object of type Manus.

Why the string "Manus" rather than Manus itself? This is a forward reference. In Python, referring to a class from inside its own body, before the class is fully defined, can raise a NameError. Quoting the name makes it a string that type checkers (such as MyPy) resolve later, once all classes are defined, avoiding circular-reference and undefined-name problems.
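A minimal sketch of the pattern just described (the class body is illustrative, not copied from manus.py):

```python
from pydantic import BaseModel, model_validator


class Manus(BaseModel):
    name: str = "Manus"
    description: str = "A versatile agent"  # hypothetical field values

    @model_validator(mode="after")
    def initialize_helper(self) -> "Manus":
        # Runs only after name/description above are validated and assigned,
        # so derived setup can safely read them here.
        return self
```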
Why is initialize_helper() not called from create(), with the initialization instead hooked in beforehand via @model_validator(mode="after"), splitting construction in two? Converging all initialization logic into create() and defining the construction flow explicitly would arguably be cleaner.
```python
async def connect_mcp_server(
```

connect_mcp_server uses the mcp_clients object to establish a connection to the given server (via SSE or stdio) and records that connection. It then fetches the tools just discovered on the new server from mcp_clients and adds them to self.available_tools, dynamically extending the agent's toolbox.

Disconnecting reverses this: it drops the connection to the server and removes the connection record. To remove the disconnected server's tools, it first filters out all the non-MCP base tools, recreates a ToolCollection from them, and then adds back the tools of the MCP servers that are still connected. This guarantees the agent never tries to use a tool it can no longer reach.

This approach is cumbersome: a tool registry plus an enabled-state table, as sketched below, would be far more maintainable and extensible than rebuilding the ToolCollection by hand every time.
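A hedged sketch of that registry design (all names here are hypothetical, not OpenManus APIs): each tool is registered once with its originating server, and disconnecting flips an enabled flag instead of rebuilding the collection:

```python
from dataclasses import dataclass, field


@dataclass
class RegisteredTool:
    tool: object            # the BaseTool instance
    server_id: str | None   # None for built-in (non-MCP) tools
    enabled: bool = True


@dataclass
class ToolRegistry:
    _tools: dict[str, RegisteredTool] = field(default_factory=dict)

    def register(self, name: str, tool: object, server_id: str | None = None) -> None:
        self._tools[name] = RegisteredTool(tool, server_id)

    def set_server_enabled(self, server_id: str, enabled: bool) -> None:
        # Disconnect/reconnect becomes a flag flip, not a rebuild
        for entry in self._tools.values():
            if entry.server_id == server_id:
                entry.enabled = enabled

    def active_tools(self) -> dict[str, object]:
        # What the agent exposes to the LLM at any moment
        return {name: e.tool for name, e in self._tools.items() if e.enabled}
```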
5. LLM Interaction Layer
app/llm.py: a unified interface to multiple model providers (OpenAI/Azure/Bedrock), covering:
- Token counting
```python
def _calculate_high_detail_tokens(self, width: int, height: int) -> int:
    """Calculate tokens for high detail images based on dimensions"""
    # Step 1: Scale to fit in MAX_SIZE x MAX_SIZE square
    if width > self.MAX_SIZE or height > self.MAX_SIZE:
        scale = self.MAX_SIZE / max(width, height)
        width = int(width * scale)
        height = int(height * scale)

    # Step 2: Scale so shortest side is HIGH_DETAIL_TARGET_SHORT_SIDE
    scale = self.HIGH_DETAIL_TARGET_SHORT_SIDE / min(width, height)
    scaled_width = int(width * scale)
    scaled_height = int(height * scale)

    # Step 3: Count number of 512px tiles
    tiles_x = math.ceil(scaled_width / self.TILE_SIZE)
    tiles_y = math.ceil(scaled_height / self.TILE_SIZE)
    total_tiles = tiles_x * tiles_y

    # Step 4: Calculate final token count
    return (
        total_tiles * self.HIGH_DETAIL_TILE_TOKENS
    ) + self.LOW_DETAIL_IMAGE_TOKENS

def count_content(self, content: Union[str, List[Union[str, dict]]]) -> int:
    """Calculate tokens for message content"""
    if not content:
        return 0

    if isinstance(content, str):
        return self.count_text(content)

    token_count = 0
    for item in content:
        if isinstance(item, str):
            token_count += self.count_text(item)
        elif isinstance(item, dict):
            if "text" in item:
                token_count += self.count_text(item["text"])
            elif "image_url" in item:
                token_count += self.count_image(item)
    return token_count
```

When an LLM processes a large image, it does not simply charge per pixel: the image is broken into tiles, each tile costs a fixed number of tokens, and a base fee (the low-detail image cost) is added on top.
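A worked example, assuming the constants match OpenAI's published vision pricing (MAX_SIZE = 2048, HIGH_DETAIL_TARGET_SHORT_SIDE = 768, TILE_SIZE = 512, HIGH_DETAIL_TILE_TOKENS = 170, LOW_DETAIL_IMAGE_TOKENS = 85; the actual values live in app/llm.py):

```python
import math

MAX_SIZE, SHORT_SIDE, TILE, TILE_TOKENS, BASE = 2048, 768, 512, 170, 85
w, h = 4096, 2048                        # hypothetical input image

# Step 1: fit within 2048x2048 -> scale 0.5 -> 2048x1024
if max(w, h) > MAX_SIZE:
    s = MAX_SIZE / max(w, h)
    w, h = int(w * s), int(h * s)

# Step 2: shortest side to 768 -> scale 0.75 -> 1536x768
s = SHORT_SIDE / min(w, h)
w, h = int(w * s), int(h * s)

# Step 3: count 512px tiles -> 3 * 2 = 6
tiles = math.ceil(w / TILE) * math.ceil(h / TILE)

# Step 4: 6 * 170 + 85 = 1105 tokens
print(tiles * TILE_TOKENS + BASE)        # -> 1105
```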
- Message formatting
- Tool-call handling
- A retry mechanism
Only a fragment of the retry decorator survives in the excerpt:

```python
# Don't retry TokenLimitExceeded
```

The @retry decorator applies to a function or method. Its core purpose is to strengthen the function's fault tolerance: when facing temporary, recoverable errors (network jitter, a briefly overloaded server), the call is retried automatically and intelligently instead of failing outright. It is equivalent to:

```python
original_get_completion = get_completion  # save the original function
get_completion = retry(...)(original_get_completion)  # replace it with the wrapped function returned by retry
```
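A hedged sketch of such a policy using the tenacity library; the parameter values are illustrative, and the excerpt does not show OpenManus's actual decorator arguments:

```python
from tenacity import (
    retry,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_random_exponential,
)


class TokenLimitExceeded(Exception):
    """Stand-in for app.exceptions.TokenLimitExceeded."""


@retry(
    retry=retry_if_not_exception_type(TokenLimitExceeded),  # never retry token-limit errors
    wait=wait_random_exponential(min=1, max=60),            # exponential backoff with jitter
    stop=stop_after_attempt(6),                             # give up after 6 attempts
)
async def get_completion(prompt: str) -> str:
    ...  # call the LLM API; transient failures are retried automatically
```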
6. Data Models
app/schema.py: defines the core data structures: Message / ToolCall / Memory, etc.
7. Tool Framework
app/tool/base.py: the abstract tool base class BaseTool and the standardized output type ToolResult.
app/tool/tool_collection.py: the tool collection manager:

```python
class ToolCollection:
```
When the execute method is called, it receives name and tool_input (a dict such as {"text": "hello", "count": 2}); tool = self.tool_map.get(name) retrieves the corresponding BaseTool and passes tool_input into it. BaseTool defines __call__:

```python
class BaseTool(ABC, BaseModel):
```
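The excerpt is truncated; a hedged sketch of the shape being described, where __call__ simply forwards to an abstract execute (the field names follow the discussion, the body is a guess):

```python
from abc import ABC, abstractmethod
from typing import Any, Optional

from pydantic import BaseModel


class BaseTool(ABC, BaseModel):
    name: str
    description: str
    parameters: Optional[dict] = None  # JSON-schema description for the LLM

    class Config:
        arbitrary_types_allowed = True

    async def __call__(self, **kwargs) -> Any:
        # Calling the tool object delegates to its execute() implementation
        return await self.execute(**kwargs)

    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        """Run the tool with the given input."""
```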
app/tool/mcp.py: the client-side MCP protocol implementation (a proxy for remote tools):

```python
async def connect_stdio(
```

"Connecting over stdio" means the main program (here, the client hosting the Manus agent) does not reach a remote server over the network (HTTP or TCP/IP). Instead, it launches another program locally (acting as the "server") and communicates with it bidirectionally through that process's standard input/output pipes. As long as a program can read and write stdin/stdout and speaks the MCP protocol, the Python client can integrate with it seamlessly.
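A hedged sketch of a stdio connection using the official mcp Python SDK (the server command is a placeholder; OpenManus wraps this logic inside its own MCP client tool rather than using it directly like this):

```python
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def connect_stdio_example() -> None:
    # Launch a local process as the MCP "server" and talk to it over its stdio pipes
    params = StdioServerParameters(command="python", args=["-m", "app.mcp.server"])
    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools = await session.list_tools()  # discover the tools the server exposes
            print([t.name for t in tools.tools])
```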
app/mcp/server.py: the server-side MCP implementation (exposes the toolset).
Concrete tool implementations:
bash.py / python_execute.py
browser_use_tool.py / web_search.py
file_operators.py / ask_human.py / terminate.py
8. Sandbox Environment
app/sandbox/client.py: a Docker-based environment for executing code safely.
9. Exception Handling
app/exceptions.py: custom exception classes (e.g. ToolError / TokenLimitExceeded).