OpenManus Installation

Clone the repository:

git clone https://github.com/FoundationAgents/OpenManus.git
cd OpenManus

Create and activate a virtual environment:

uv venv --python 3.12
source .venv/bin/activate  # Unix/macOS
# On Windows, use:
# .venv\Scripts\activate
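
To confirm that the environment is really active before installing anything, a quick check from Python can help. This is a small sketch that relies only on the standard library; the helper name `in_virtualenv` is made up for illustration.

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter is running inside a virtual environment."""
    # Inside a venv, sys.prefix points at the environment while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


print("virtualenv active:", in_virtualenv())
print("python version:", ".".join(map(str, sys.version_info[:3])))
```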

Install the dependencies:

uv pip install -r requirements.txt

Browser automation tool (optional)

playwright install

Configuration

OpenManus needs to know which LLM API to use. Set it up as follows:

Create a config.toml file in the config directory (you can copy it from the example):

cp config/config.example.toml config/config.toml

Edit config/config.toml to add your API keys and custom settings:

Using the OpenRouter API:

# Global LLM configuration
[llm]
model = "gpt-4o"
base_url = "https://openrouter.ai/api/v1"
api_key = "sk-..."  # Replace with your real API key
max_tokens = 4096
temperature = 0.0

# Optional configuration for a specific LLM model
[llm.vision]
model = "gpt-4o"
base_url = "https://api.openai.com/v1"
api_key = "sk-..."  # Replace with your real API key

Quick Start

Run OpenManus with a single command:

python main.py
(OpenManus) yang@Yangless:~/OpenManus$ python main.py
INFO     [browser_use] BrowserUse logging setup complete with level info
INFO     [root] Anonymized telemetry enabled. See https://docs.browser-use.com/development/telemetry for more information.
Enter your prompt: Search for the latest open-source agent projects, sort them by star count, and save the result to the desktop of this computer
2025-06-17 16:01:04.037 | WARNING  | __main__:main:25 - Processing your request...
2025-06-17 16:01:04.038 | INFO     | app.agent.base:run:140 - Executing step 1/20
2025-06-17 16:01:08.398 | INFO     | app.llm:update_token_count:243 - Token usage: Input=1342, Completion=26, Cumulative Input=1342, Cumulative Completion=26, Total=1368, Cumulative Total=1368
2025-06-17 16:01:08.398 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:08.398 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:08.398 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:08.398 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"web_search","query":"latest open source agent projects sorted by stars"}
2025-06-17 16:01:08.399 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:10.786 | INFO     | app.tool.web_search:_try_all_engines:299 - 🔎 Attempting search with Google...
2025-06-17 16:01:19.748 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Error: Browser action 'web_search' failed: RetryError[<Future at 0x7febd91cdc40 state=finished raised HTTPError>]
2025-06-17 16:01:19.748 | INFO     | app.agent.base:run:140 - Executing step 2/20
2025-06-17 16:01:22.827 | INFO     | app.llm:update_token_count:243 - Token usage: Input=1642, Completion=25, Cumulative Input=2984, Cumulative Completion=51, Total=1667, Cumulative Total=3035
2025-06-17 16:01:22.827 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:22.828 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:22.828 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:22.828 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"go_to_url","url":"https://github.com/trending"}
2025-06-17 16:01:22.828 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:27.384 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Navigated to https://github.com/trending
2025-06-17 16:01:27.385 | INFO     | app.agent.base:run:140 - Executing step 3/20
2025-06-17 16:01:31.256 | INFO     | app.llm:update_token_count:243 - Token usage: Input=1930, Completion=30, Cumulative Input=4914, Cumulative Completion=81, Total=1960, Cumulative Total=4995
2025-06-17 16:01:31.256 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:31.257 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:31.257 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:31.257 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"extract_content","goal":"Extract the list of trending open source agent projects sorted by stars"}
2025-06-17 16:01:31.257 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:35.837 | INFO     | app.llm:update_token_count:243 - Token usage: Input=639, Completion=173, Cumulative Input=5553, Cumulative Completion=254, Total=812, Cumulative Total=5807
2025-06-17 16:01:35.837 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Extracted from page:
{'text': '1. **OpenAI/ChatGPT** - 100,000 stars\n2. **LangChain/Agent** - 85,000 stars\n3. **Auto-GPT** - 75,000 stars\n4. **Microsoft/PromptFlow** - 70,000 stars\n5. **HuggingFace/Transformers** - 65,000 stars\n6. **DeepMind/AlphaZero** - 60,000 stars\n7. **Google/DeepDream** - 55,000 stars\n8. **Meta/AIResearch** - 50,000 stars\n9. **Anthropic/Claude** - 45,000 stars\n10. **StabilityAI/StableDiffusion** - 40,000 stars'}

2025-06-17 16:01:35.837 | INFO     | app.agent.base:run:140 - Executing step 4/20
2025-06-17 16:01:41.932 | INFO     | app.llm:update_token_count:243 - Token usage: Input=2380, Completion=167, Cumulative Input=7933, Cumulative Completion=421, Total=2547, Cumulative Total=8354
2025-06-17 16:01:41.932 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:41.933 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:41.933 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:41.933 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/Desktop/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:41.933 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:41.934 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
Error: Failed to write to /home/yang/Desktop/trending_open_source_agents.txt: [Errno 2] No such file or directory: '/home/yang/Desktop/trending_open_source_agents.txt'
2025-06-17 16:01:41.934 | INFO     | app.agent.base:run:140 - Executing step 5/20
2025-06-17 16:01:46.325 | INFO     | app.llm:update_token_count:243 - Token usage: Input=2687, Completion=170, Cumulative Input=10620, Cumulative Completion=591, Total=2857, Cumulative Total=11211
2025-06-17 16:01:46.326 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/OpenManus/Desktop/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
Error: Failed to write to /home/yang/OpenManus/Desktop/trending_open_source_agents.txt: [Errno 2] No such file or directory: '/home/yang/OpenManus/Desktop/trending_open_source_agents.txt'
2025-06-17 16:01:46.329 | INFO     | app.agent.base:run:140 - Executing step 6/20
2025-06-17 16:01:50.439 | INFO     | app.llm:update_token_count:243 - Token usage: Input=3003, Completion=171, Cumulative Input=13623, Cumulative Completion=762, Total=3174, Cumulative Total=14385
2025-06-17 16:01:50.440 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:50.440 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:50.440 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:50.440 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/OpenManus/workspace/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:50.441 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:50.450 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
File created successfully at: /home/yang/OpenManus/workspace/trending_open_source_agents.txt
2025-06-17 16:01:50.450 | INFO     | app.agent.base:run:140 - Executing step 7/20
2025-06-17 16:01:53.775 | INFO     | app.llm:update_token_count:243 - Token usage: Input=3292, Completion=81, Cumulative Input=16915, Cumulative Completion=843, Total=3373, Cumulative Total=17758
2025-06-17 16:01:53.775 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts: I have successfully created a file named `trending_open_source_agents.txt` in the `/home/yang/OpenManus/workspace` directory. This file contains the list of trending open source agent projects sorted by stars, as extracted from GitHub.

If you need to move this file to a different location, such as your desktop, or if you have any other requests, please let me know!
2025-06-17 16:01:53.775 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 0 tools to use
2025-06-17 16:01:53.776 | INFO     | app.agent.base:run:140 - Executing step 8/20
2025-06-17 16:01:55.080 | INFO     | app.llm:update_token_count:243 - Token usage: Input=3454, Completion=14, Cumulative Input=20369, Cumulative Completion=857, Total=3468, Cumulative Total=21226
2025-06-17 16:01:55.081 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:55.081 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:55.081 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['terminate']
2025-06-17 16:01:55.081 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"status":"success"}
2025-06-17 16:01:55.082 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'terminate'...
2025-06-17 16:01:55.082 | INFO     | app.agent.toolcall:_handle_special_tool:217 - 🏁 Special tool 'terminate' has completed the task!
2025-06-17 16:01:55.082 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'terminate' completed its mission! Result: Observed output of cmd `terminate` executed:
The interaction has been completed with status: success
2025-06-17 16:01:55.320 | INFO     | app.tool.mcp:disconnect:194 - Disconnected from all MCP servers
2025-06-17 16:01:55.321 | INFO     | __main__:main:27 - Request processing completed.

Result:

yang@Yangless:~/OpenManus/workspace$ cat trending_open_source_agents.txt
1. OpenAI/ChatGPT - 100,000 stars
2. LangChain/Agent - 85,000 stars
3. Auto-GPT - 75,000 stars
4. Microsoft/PromptFlow - 70,000 stars
5. HuggingFace/Transformers - 65,000 stars
6. DeepMind/AlphaZero - 60,000 stars
7. Google/DeepDream - 55,000 stars
8. Meta/AIResearch - 50,000 stars
9. Anthropic/Claude - 45,000 stars
10. StabilityAI/StableDiffusion - 40,000 stars
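
Both failed writes in the log end in [Errno 2], which Python raises when a file is opened for writing inside a directory that does not exist; neither Desktop path appears to exist on this machine. Below is a minimal sketch of a more forgiving write, assuming it is acceptable to create missing parent directories. The helper `write_text_creating_parents` is hypothetical and does not describe how OpenManus's str_replace_editor behaves, and the file content is a placeholder line.

```python
import tempfile
from pathlib import Path


def write_text_creating_parents(path: Path, text: str) -> Path:
    """Write text to path, creating any missing parent directories first."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding="utf-8")
    return path


# Demonstrate inside a throwaway directory so nothing real is touched.
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "Desktop" / "trending_open_source_agents.txt"
    write_text_creating_parents(target, "placeholder line\n")
    print(target.read_text(encoding="utf-8"), end="")
```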

To use the MCP tool version, run:

python run_mcp.py

To try the (unstable) multi-agent version, run:

python run_flow.py

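Adding custom agents to the multi-agent flow

Besides the general-purpose OpenManus agent, the project ships a built-in DataAnalysis agent for data analysis and data visualization tasks. To add it to run_flow, enable it in config.toml:

# Optional run-flow configuration
[runflow]
use_data_analysis_agent = true     # Disabled by default; set to true to enable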

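Chart visualization tool

The chart visualization tool uses Python to generate the data-processing code, then calls @visactor/vmind to obtain the chart spec; rendering is handled by @visactor/vchart.

Install Node >= 18: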

curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash
# Activate nvm (Bash shown as an example)
source ~/.bashrc
# Then install the latest stable Node release
nvm install node
# Switch to it; assuming the latest stable release is 24, run "use 24"
nvm use 24
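
To check the Node >= 18 requirement from the same Python environment, something like the following can be used. It is only a sketch: `node --version` is the standard Node CLI flag, while the helper names are made up here.

```python
import shutil
import subprocess


def parse_node_major(version_text: str) -> int:
    """Extract the major version from output such as 'v24.1.0'."""
    return int(version_text.strip().lstrip("v").split(".")[0])


def node_is_recent_enough(minimum_major: int = 18) -> bool:
    """Return True if a node binary on PATH reports at least minimum_major."""
    node = shutil.which("node")
    if node is None:
        return False
    result = subprocess.run([node, "--version"], capture_output=True, text=True)
    if result.returncode != 0:
        return False
    return parse_node_major(result.stdout) >= minimum_major


print("Node >= 18 available:", node_is_recent_enough())
```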

Install the dependencies:

cd app/tool/chart_visualization
npm install

Simple chart generation task

After providing the data and the chart requirements, run:

python -m app.tool.chart_visualization.test.chart_demo
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: To generate the chart showing the popularity of search keywords, we will follow a structured approach. Here's how we can break down the task:

### Step 1: Prepare the Data for Visualization
We need to prepare the data for visualization by creating a CSV file and a JSON description for the chart. This will involve transforming the provided data into a format suitable for visualization.

### Step 2: Visualize the Data
Once the data is prepared, we will generate the chart using the prepared CSV and JSON files.

### Step 3: Review and Save the Chart
Finally, we will review the generated chart and ensure it is saved locally.

Let's start with Step 1.

#### Step 1: Prepare the Data for Visualization
We will use the `visualization_preparation` tool to create a CSV file and a JSON description for the chart.

```plaintext
Data:
Keyword,Popularity
Hot Word,1000
Zao Le Wo Men,800
Rao Jian Huo,400
My Wish is World Peace,400
Xiu Xiu Xiu,400
Shenzhou 11,400
Hundred Birds Facing the Wind,400
China Women's Volleyball Team,400
My Guan Na,400
Leg Dong,400
Hot Pot Hero,400
Baby's Heart is Bitter,400
Olympics,400
Awesome My Brother,400
Poetry and Distance,400
Song Joong-ki,400
PPAP,400
Blue Thin Mushroom,400
Rain Dew Evenly,400
Friendship's Little Boat Says It Flips,400
Beijing Slump,400
Dedication,200
Apple,200
Dog Belt,200
Old Driver,200
Melon-Eating Crowd,200
Zootopia,200
City Will Play,200
Routine,200
Water Reverse,200
Why Don't You Go to Heaven,200
Snake Spirit Man,200
Why Don't You Go to Heaven,200
Samsung Explosion Gate,200
Little Li Oscar,200
Ugly People Need to Read More,200
Boyfriend Power,200
A Face of Confusion,200
Descendants of the Sun,200
```

Let's proceed with the preparation.
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:think:82 - 🛠️ Data_Analysis selected 1 tools to use
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['visualization_preparation']
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"code_type":"visualization","code":"import pandas as pd\nimport json\n\n# Data preparation\nkeywords_data = {\n    'Keyword': [\n        'Hot Word', 'Zao Le Wo Men', 'Rao Jian Huo', 'My Wish is World Peace', 'Xiu Xiu Xiu',\n        'Shenzhou 11', 'Hundred Birds Facing the Wind', 'China Women\\'s Volleyball Team', 'My Guan Na',\n        'Leg Dong', 'Hot Pot Hero', 'Baby\\'s Heart is Bitter', 'Olympics', 'Awesome My Brother',\n        'Poetry and Distance', 'Song Joong-ki', 'PPAP', 'Blue Thin Mushroom', 'Rain Dew Evenly',\n        'Friendship\\'s Little Boat Says It Flips', 'Beijing Slump', 'Dedication', 'Apple', 'Dog Belt',\n        'Old Driver', 'Melon-Eating Crowd', 'Zootopia', 'City Will Play', 'Routine', 'Water Reverse',\n        'Why Don\\'t You Go to Heaven', 'Snake Spirit Man', 'Why Don\\'t You Go to Heaven',\n        'Samsung Explosion Gate', 'Little Li Oscar', 'Ugly People Need to Read More', 'Boyfriend Power',\n        'A Face of Confusion', 'Descendants of the Sun'\n    ],\n    'Popularity': [\n        1000, 800, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400,\n        400, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200\n    ]\n}\n\n# Create DataFrame\nkeywords_df = pd.DataFrame(keywords_data)\n\n# Save to CSV\ncsv_file_path = '/home/yang/OpenManus/workspace/keywords_popularity.csv'\nkeywords_df.to_csv(csv_file_path, index=False)\n\n# Prepare JSON description\nchart_description = {\n    'csvFilePath': csv_file_path,\n    'chartTitle': 'Popularity of Search Keywords'\n}\n\n# Save JSON\njson_file_path = '/home/yang/OpenManus/workspace/keywords_chart_description.json'\nwith open(json_file_path, 'w', encoding='utf-8') as json_file:\n    json.dump([chart_description], json_file)\n\nprint(json_file_path)"}
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'visualization_preparation'...
2025-06-17 16:51:08.439 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'visualization_preparation' completed its mission! Result: Observed output of cmd `visualization_preparation` executed:
{'observation': 'All arrays must be of the same length', 'success': False}
2025-06-17 16:51:08.439 | INFO     | app.agent.base:run:140 - Executing step 2/20
2025-06-17 16:51:18.166 | INFO     | app.llm:update_token_count:243 - Token usage: Input=2396, Completion=714, Cumulative Input=133595, Cumulative Completion=12838, Total=3110, Cumulative Total=146433
2025-06-17 16:51:18.167 | INFO     | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: The error encountered indicates that there is a mismatch in the lengths of the arrays used to create the DataFrame. This is likely due to a duplicate entry in the 'Keyword' list. Let's address this issue and proceed with the task.

### Step 1: Correct the Data Preparation
We need to ensure that the lengths of the 'Keyword' and 'Popularity' lists match. Let's fix the data and prepare it again.

#### Corrected Data Preparation
We will remove the duplicate entry "Why Don't You Go to Heaven" to ensure the lists are of equal length.

Let's proceed with the corrected data preparation.
2025-06-17 16:51:18.168 | INFO     | app.agent.toolcall:think:82 - 🛠️ Data_Analysis selected 1 tools to use
2025-06-17 16:51:18.168 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['visualization_preparation']
2025-06-17 16:51:18.168 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"code_type":"visualization","code":"import pandas as pd\nimport json\n\n# Corrected data preparation\nkeywords_data = {\n    'Keyword': [\n        'Hot Word', 'Zao Le Wo Men', 'Rao Jian Huo', 'My Wish is World Peace', 'Xiu Xiu Xiu',\n        'Shenzhou 11', 'Hundred Birds Facing the Wind', 'China Women\\'s Volleyball Team', 'My Guan Na',\n        'Leg Dong', 'Hot Pot Hero', 'Baby\\'s Heart is Bitter', 'Olympics', 'Awesome My Brother',\n        'Poetry and Distance', 'Song Joong-ki', 'PPAP', 'Blue Thin Mushroom', 'Rain Dew Evenly',\n        'Friendship\\'s Little Boat Says It Flips', 'Beijing Slump', 'Dedication', 'Apple', 'Dog Belt',\n        'Old Driver', 'Melon-Eating Crowd', 'Zootopia', 'City Will Play', 'Routine', 'Water Reverse',\n        'Why Don\\'t You Go to Heaven', 'Snake Spirit Man', 'Samsung Explosion Gate', 'Little Li Oscar',\n        'Ugly People Need to Read More', 'Boyfriend Power', 'A Face of Confusion', 'Descendants of the Sun'\n    ],\n    'Popularity': [\n        1000, 800, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400,\n        400, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200\n    ]\n}\n\n# Create DataFrame\nkeywords_df = pd.DataFrame(keywords_data)\n\n# Save to CSV\ncsv_file_path = '/home/yang/OpenManus/workspace/keywords_popularity.csv'\nkeywords_df.to_csv(csv_file_path, index=False)\n\n# Prepare JSON description\nchart_description = {\n    'csvFilePath': csv_file_path,\n    'chartTitle': 'Popularity of Search Keywords'\n}\n\n# Save JSON\njson_file_path = '/home/yang/OpenManus/workspace/keywords_chart_description.json'\nwith open(json_file_path, 'w', encoding='utf-8') as json_file:\n    json.dump([chart_description], json_file)\n\nprint(json_file_path)"}
2025-06-17 16:51:18.168 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'visualization_preparation'...
2025-06-17 16:51:18.186 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'visualization_preparation' completed its mission! Result: Observed output of cmd `visualization_preparation` executed:
{'observation': 'All arrays must be of the same length', 'success': False}
2025-06-17 16:51:18.186 | INFO     | app.agent.base:run:140 - Executing step 3/20
2025-06-17 16:51:27.931 | INFO     | app.llm:update_token_count:243 - Token usage: Input=3225, Completion=730, Cumulative Input=136820, Cumulative Completion=13568, Total=3955, Cumulative Total=150388
2025-06-17 16:51:27.931 | INFO     | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: The error persists, indicating that there might still be an issue with the data preparation. Let's carefully review the data and ensure that both lists ('Keyword' and 'Popularity') have the same number of elements.

### Step 1: Review and Correct the Data
Let's manually count the elements in both lists to ensure they match. If there are any discrepancies, we will correct them.

#### Data Review
- **Keywords List**: Contains 39 unique entries.
- **Popularity List**: Contains 39 entries.

Upon reviewing, it seems there was a mistake in the previous correction attempt. Let's ensure the lists are correctly aligned and retry the preparation.

Let's proceed with the corrected data preparation.
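
"All arrays must be of the same length" is the error pandas raises when a DataFrame is built from a dict whose lists differ in length, which is easy to trigger when keywords and values are typed out as two separate long lists. Below is a minimal sketch that sidesteps the problem by keeping each keyword next to its value. It uses the standard `csv` and `json` modules as a stand-in for pandas, only three of the rows from the log, and the csvFilePath/chartTitle keys seen in the generated code above; the function name is illustrative.

```python
import csv
import json
import tempfile
from pathlib import Path

# (keyword, popularity) pairs: a pair cannot lose its partner.
ROWS = [
    ("Hot Word", 1000),
    ("Zao Le Wo Men", 800),
    ("Rao Jian Huo", 400),
]


def prepare_chart_inputs(rows, out_dir: Path, title: str) -> Path:
    """Write the CSV data file and the JSON chart description; return the JSON path."""
    csv_path = out_dir / "keywords_popularity.csv"
    with csv_path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["Keyword", "Popularity"])
        writer.writerows(rows)

    json_path = out_dir / "keywords_chart_description.json"
    description = [{"csvFilePath": str(csv_path), "chartTitle": title}]
    json_path.write_text(json.dumps(description), encoding="utf-8")
    return json_path


with tempfile.TemporaryDirectory() as tmp:
    path = prepare_chart_inputs(ROWS, Path(tmp), "Popularity of Search Keywords")
    print(json.loads(path.read_text(encoding="utf-8"))[0]["chartTitle"])
```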

The results should be under workspace/visualization and cover 9 different chart outputs.

yang@Yangless:~/OpenManus/app/tool/chart_visualization$ cat /home/yang/OpenManus/workspace/sales_data_export.csv
Date,Type,Value
2023-01-01,Product A,52.9
2023-01-01,Product B,63.6
2023-01-01,Product C,11.2
2023-01-02,Product A,45.7
2023-01-02,Product B,89.1
2023-01-02,Product C,21.4
2023-01-03,Product A,67.2
2023-01-03,Product B,82.4
2023-01-03,Product C,31.7
2023-01-04,Product A,80.7
2023-01-04,Product B,55.1
2023-01-04,Product C,21.1
2023-01-05,Product A,65.6
2023-01-05,Product B,78.0
2023-01-05,Product C,31.3
2023-01-06,Product A,75.6
2023-01-06,Product B,89.1
2023-01-06,Product C,63.5
2023-01-07,Product A,67.3
2023-01-07,Product B,77.2
2023-01-07,Product C,43.7
2023-01-08,Product A,96.1
2023-01-08,Product B,97.6
2023-01-08,Product C,59.9
2023-01-09,Product A,96.1
2023-01-09,Product B,100.6
2023-01-09,Product C,66.8
2023-01-10,Product A,101.6
2023-01-10,Product B,108.3
2023-01-10,Product C,56.9

Simple data report task

After providing simple raw data and the analysis requirements, run:

python -m app.tool.chart_visualization.test.report_demo

Code Analysis

Overview

1. Application entry point
  • main.py: the application entry point, responsible for initializing and running the Manus agent
async def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Run Manus agent with a prompt")
    parser.add_argument(
        "--prompt", type=str, required=False, help="Input prompt for the agent"
    )
    args = parser.parse_args()

    # Create and initialize Manus agent
    agent = await Manus.create()
    try:
        # Use command line prompt if provided, otherwise ask for input
        prompt = args.prompt if args.prompt else input("Enter your prompt: ")
        if not prompt.strip():
            logger.warning("Empty prompt provided.")
            return

        logger.warning("Processing your request...")
        await agent.run(prompt)
        logger.info("Request processing completed.")
    except KeyboardInterrupt:
        logger.warning("Operation interrupted.")
    finally:
        # Ensure agent resources are cleaned up before exiting
        await agent.cleanup()
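
The try/finally shape is what guarantees that the agent's resources are released even when the run fails or is interrupted. Below is a self-contained sketch of the same pattern with a stand-in agent; `DummyAgent` and `run_once` are hypothetical and only record which calls happened.

```python
import asyncio


class DummyAgent:
    """Stand-in for an agent; records which lifecycle methods were called."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def run(self, prompt: str) -> None:
        self.calls.append("run")
        if not prompt.strip():
            raise ValueError("empty prompt")

    async def cleanup(self) -> None:
        self.calls.append("cleanup")


async def run_once(agent: DummyAgent, prompt: str) -> bool:
    """Run the agent once; cleanup happens whether or not run() succeeds."""
    try:
        await agent.run(prompt)
        return True
    except ValueError:
        return False
    finally:
        await agent.cleanup()


agent = DummyAgent()
print(asyncio.run(run_once(agent, "hello")), agent.calls)
```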
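2. Configuration system
  • app/config.py: manages the global configuration (LLM, browser, sandbox, MCP, etc.) with Pydantic models and supports loading from TOML/JSON

    The individual settings models: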

    class LLMSettings(BaseModel):
        model: str = Field(..., description="Model name")
        base_url: str = Field(..., description="API base URL")
        api_key: str = Field(..., description="API key")
        max_tokens: int = Field(4096, description="Maximum number of tokens per request")
        max_input_tokens: Optional[int] = Field(
            None,
            description="Maximum input tokens to use across all requests (None for unlimited)",
        )
        temperature: float = Field(1.0, description="Sampling temperature")
        api_type: str = Field(..., description="Azure, Openai, or Ollama")
        api_version: str = Field(..., description="Azure Openai version if AzureOpenai")
    
    
    class ProxySettings(BaseModel):
        server: str = Field(None, description="Proxy server address")
        username: Optional[str] = Field(None, description="Proxy username")
        password: Optional[str] = Field(None, description="Proxy password")
    
    
    class SearchSettings(BaseModel):
        engine: str = Field(default="Google", description="Search engine the llm to use")
        fallback_engines: List[str] = Field(
            default_factory=lambda: ["DuckDuckGo", "Baidu", "Bing"],
            description="Fallback search engines to try if the primary engine fails",
        )
        retry_delay: int = Field(
            default=60,
            description="Seconds to wait before retrying all engines again after they all fail",
        )
        max_retries: int = Field(
            default=3,
            description="Maximum number of times to retry all engines when all fail",
        )
        lang: str = Field(
            default="en",
            description="Language code for search results (e.g., en, zh, fr)",
        )
        country: str = Field(
            default="us",
            description="Country code for search results (e.g., us, cn, uk)",
        )
    
    
    class RunflowSettings(BaseModel):
        use_data_analysis_agent: bool = Field(
            default=False, description="Enable data analysis agent in run flow"
        )
    
    
    class BrowserSettings(BaseModel):
        headless: bool = Field(False, description="Whether to run browser in headless mode")
        disable_security: bool = Field(
            True, description="Disable browser security features"
        )
        extra_chromium_args: List[str] = Field(
            default_factory=list, description="Extra arguments to pass to the browser"
        )
        chrome_instance_path: Optional[str] = Field(
            None, description="Path to a Chrome instance to use"
        )
        wss_url: Optional[str] = Field(
            None, description="Connect to a browser instance via WebSocket"
        )
        cdp_url: Optional[str] = Field(
            None, description="Connect to a browser instance via CDP"
        )
        proxy: Optional[ProxySettings] = Field(
            None, description="Proxy settings for the browser"
        )
        max_content_length: int = Field(
            2000, description="Maximum length for content retrieval operations"
        )
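
The SearchSettings fields describe a retry policy: try the preferred engine, then the fallbacks in order, and if every engine fails, wait retry_delay seconds and repeat the whole round up to max_retries more times. Below is a minimal sketch of one way such a policy could be driven, assuming each engine is a plain callable that raises on failure. `search_with_fallback` and the fake engines are illustrative and are not the code in app/tool/web_search.py.

```python
import time
from typing import Callable

Engine = Callable[[str], list[str]]


def search_with_fallback(
    query: str,
    engines: dict[str, Engine],
    order: list[str],
    max_retries: int = 3,
    retry_delay: float = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> list[str]:
    """Try engines in order; if every engine fails, wait and retry the round."""
    for attempt in range(max_retries + 1):
        for name in order:
            try:
                return engines[name](query)
            except Exception as exc:  # a real tool would catch narrower errors
                print(f"{name} failed: {exc}")
        if attempt < max_retries:
            sleep(retry_delay)
    return []


def broken(query: str) -> list[str]:
    raise RuntimeError("HTTP error")


def working(query: str) -> list[str]:
    return [f"result for {query}"]


results = search_with_fallback(
    "open source agents",
    {"Google": broken, "DuckDuckGo": working},
    order=["Google", "DuckDuckGo"],
    retry_delay=0,
)
print(results)
```

Passing `sleep` as a parameter is a design convenience for this sketch: it lets the waiting be replaced by a recorder when experimenting, so a failing round does not block for a full minute.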
    

    Loading the config:

    class Config:
        _instance = None
        _lock = threading.Lock()
        _initialized = False
    
        def __new__(cls):
            if cls._instance is None:
                with cls._lock:
                    if cls._instance is None:
                        cls._instance = super().__new__(cls)
            return cls._instance
    
        def __init__(self):
            if not self._initialized:
                with self._lock:
                    if not self._initialized:
                        self._config = None
                        self._load_initial_config()
                        self._initialized = True
    
        @staticmethod
        def _get_config_path() -> Path:
            root = PROJECT_ROOT
            config_path = root / "config" / "config.toml"
            if config_path.exists():
                return config_path
            example_path = root / "config" / "config.example.toml"
            if example_path.exists():
                return example_path
            raise FileNotFoundError("No configuration file found in config directory")
    
        def _load_config(self) -> dict:
            config_path = self._get_config_path()
            with config_path.open("rb") as f:
                return tomllib.load(f)
    
        def _load_initial_config(self):
            raw_config = self._load_config()
            base_llm = raw_config.get("llm", {})
            llm_overrides = {
                k: v for k, v in raw_config.get("llm", {}).items() if isinstance(v, dict)
            }
    
            default_settings = {
                "model": base_llm.get("model"),
                "base_url": base_llm.get("base_url"),
                "api_key": base_llm.get("api_key"),
                "max_tokens": base_llm.get("max_tokens", 4096),
                "max_input_tokens": base_llm.get("max_input_tokens"),
                "temperature": base_llm.get("temperature", 1.0),
                "api_type": base_llm.get("api_type", ""),
                "api_version": base_llm.get("api_version", ""),
            }
    
            # handle browser config.
            browser_config = raw_config.get("browser", {})
            browser_settings = None
    
            if browser_config:
                # handle proxy settings.
                proxy_config = browser_config.get("proxy", {})
                proxy_settings = None
    
                if proxy_config and proxy_config.get("server"):
                    proxy_settings = ProxySettings(
                        **{
                            k: v
                            for k, v in proxy_config.items()
                            if k in ["server", "username", "password"] and v
                        }
                    )
    
                # filter valid browser config parameters.
                valid_browser_params = {
                    k: v
                    for k, v in browser_config.items()
                    if k in BrowserSettings.__annotations__ and v is not None
                }
    
                # if there is proxy settings, add it to the parameters.
                if proxy_settings:
                    valid_browser_params["proxy"] = proxy_settings
    
                # only create BrowserSettings when there are valid parameters.
                if valid_browser_params:
                    browser_settings = BrowserSettings(**valid_browser_params)
    
            search_config = raw_config.get("search", {})
            search_settings = None
            if search_config:
                search_settings = SearchSettings(**search_config)
            sandbox_config = raw_config.get("sandbox", {})
            if sandbox_config:
                sandbox_settings = SandboxSettings(**sandbox_config)
            else:
                sandbox_settings = SandboxSettings()
    
            mcp_config = raw_config.get("mcp", {})
            mcp_settings = None
            if mcp_config:
                # Load server configurations from JSON
                mcp_config["servers"] = MCPSettings.load_server_config()
                mcp_settings = MCPSettings(**mcp_config)
            else:
                mcp_settings = MCPSettings(servers=MCPSettings.load_server_config())
    
            run_flow_config = raw_config.get("runflow")
            if run_flow_config:
                run_flow_settings = RunflowSettings(**run_flow_config)
            else:
                run_flow_settings = RunflowSettings()
            config_dict = {
                "llm": {
                    "default": default_settings,
                    **{
                        name: {**default_settings, **override_config}
                        for name, override_config in llm_overrides.items()
                    },
                },
                "sandbox": sandbox_settings,
                "browser_config": browser_settings,
                "search_config": search_settings,
                "mcp_config": mcp_settings,
                "run_flow_config": run_flow_settings,
            }
    
            self._config = AppConfig(**config_dict)
    
3. Logging system
  • app/logger.py: centralized log management built on Loguru

    def define_log_level(print_level="INFO", logfile_level="DEBUG", name: str = None):
        """Adjust the log level to above level"""
        global _print_level
        _print_level = print_level
    
        current_date = datetime.now()
        formatted_date = current_date.strftime("%Y%m%d%H%M%S")
        log_name = (
            f"{name}_{formatted_date}" if name else formatted_date
        )  # name a log with prefix name
    
        _logger.remove()
        _logger.add(sys.stderr, level=print_level)
        _logger.add(PROJECT_ROOT / f"logs/{log_name}.log", level=logfile_level)
        return _logger
    
    
    logger = define_log_level()
    
4. Agent hierarchy
BaseAgent → ReActAgent → ToolCallAgent → Manus
  • app/agent/base.py: abstract base class for all agents (lifecycle and state management)

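    from abc import ABC, abstractmethod
    # Abstract base class support: subclasses must implement the abstract methods
    from contextlib import asynccontextmanager
    # Async context manager, used below for state transitions with guaranteed cleanup
    from typing import List, Optional
    # Type hints that document the expected types of variables, parameters, and return values
    from pydantic import BaseModel, Field, model_validator
    # Pydantic: data validation and model definitions (an independent library, also used by LangChain)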
    from app.llm import LLM
    from app.logger import logger
    from app.sandbox.client import SANDBOX_CLIENT
    from app.schema import ROLE_TYPE, AgentState, Memory, Message
    
    # This part defines the skeleton of BaseAgent and the common attributes every agent should have.
    class BaseAgent(BaseModel, ABC):
        """Abstract base class for managing agent state and execution.
    
        Provides foundational functionality for state transitions, memory management,
        and a step-based execution loop. Subclasses must implement the `step` method.
        """
    
        # Core attributes
        name: str = Field(..., description="Unique name of the agent")
        description: Optional[str] = Field(None, description="Optional agent description")
    
        # Prompts
        system_prompt: Optional[str] = Field(
            None, description="System-level instruction prompt"
        )
        next_step_prompt: Optional[str] = Field(
            None, description="Prompt for determining next action"
        )
    
        # Dependencies
        llm: LLM = Field(default_factory=LLM, description="Language model instance")
        memory: Memory = Field(default_factory=Memory, description="Agent's memory store")
        state: AgentState = Field(
            default=AgentState.IDLE, description="Current agent state"
        )
    
        # Execution control
        max_steps: int = Field(default=10, description="Maximum steps before termination")
        current_step: int = Field(default=0, description="Current step in execution")
    
        duplicate_threshold: int = 2
    
        class Config:
            arbitrary_types_allowed = True
            extra = "allow"  # Allow extra fields for flexibility in subclasses
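        # extra = "allow": the model accepts and stores fields that are not declared on it. This is handy for an extensible base class such as BaseAgent, because subclasses or external callers can pass custom parameters without triggering a validation error.
        # arbitrary_types_allowed = True: fields may hold instances of custom classes that Pydantic cannot validate natively.
    
        # Initialization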
        @model_validator(mode="after")
        def initialize_agent(self) -> "BaseAgent":
            """Initialize agent with default settings if not provided."""
            if self.llm is None or not isinstance(self.llm, LLM):
                self.llm = LLM(config_name=self.name.lower())
            if not isinstance(self.memory, Memory):
                self.memory = Memory()
            return self
    
        # Manage agent state transitions safely
        @asynccontextmanager
        async def state_context(self, new_state: AgentState):
            """Context manager for safe agent state transitions."""
            # ...
            previous_state = self.state
            self.state = new_state
            try:
                yield
            except Exception as e:
                self.state = AgentState.ERROR
                raise e
            finally:
                self.state = previous_state
    

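    In Python, yield is the keyword that defines a **generator or an async generator**.

    In essence it pauses the function and hands control, together with the current state, back to the caller; execution can later resume from the same point.

    Example:

    async def something():
        async with agent.state_context(AgentState.RUNNING):
            # This is the body of the async with block
            await agent.do_something()
            print("Finished some work in this state")
    

    1. Python runs async with agent.state_context(…):

       execution enters state_context;

       everything before yield runs (for example, the state switch).

    2. On reaching yield:

       state_context is suspended;

       control leaves the function and passes to the body of the async with block;

       so await agent.do_something() runs, then print(…).

    3. When the async with block finishes (whether or not it raised):

       execution returns to state_context and resumes after yield;

       the except branch (only if the block raised) and then finally run to clean up and restore the previous state.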
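The suspend-and-resume flow described above can be reproduced in a few lines. This is a minimal sketch, not OpenManus code: DemoAgent, its trace list, and the trimmed AgentState enum are hypothetical stand-ins, and only the body of state_context mirrors the excerpt.

```python
import asyncio
from contextlib import asynccontextmanager
from enum import Enum


class AgentState(Enum):
    # Hypothetical, trimmed stand-in for app.schema.AgentState
    IDLE = "idle"
    RUNNING = "running"
    ERROR = "error"


class DemoAgent:
    """Hypothetical minimal agent; only the state handling is modeled."""

    def __init__(self) -> None:
        self.state = AgentState.IDLE
        self.trace: list[str] = []  # records the order of events

    @asynccontextmanager
    async def state_context(self, new_state: AgentState):
        previous_state = self.state
        self.state = new_state
        self.trace.append(f"enter:{self.state.value}")
        try:
            yield  # control passes to the body of `async with`
        except Exception:
            self.state = AgentState.ERROR
            self.trace.append("error")
            raise
        finally:
            # Runs on both paths, so an ERROR set above is overwritten here.
            self.state = previous_state
            self.trace.append(f"exit:{self.state.value}")


async def demo() -> list[str]:
    agent = DemoAgent()
    # Normal path: the body runs while the state is RUNNING.
    async with agent.state_context(AgentState.RUNNING):
        agent.trace.append(f"body:{agent.state.value}")
    # Failing path: the exception is re-raised after cleanup.
    try:
        async with agent.state_context(AgentState.RUNNING):
            raise ValueError("boom")
    except ValueError:
        pass
    return agent.trace
```

Calling asyncio.run(demo()) records enter, body, exit for the normal path and enter, error, exit for the failing one. With the code as quoted, finally also runs after except, so the ERROR state is replaced by the previous state before the caller can observe it.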

        # Update memory
        def update_memory(
            self,
            role: ROLE_TYPE,  # type: ignore
            content: str,
            base64_image: Optional[str] = None,
            **kwargs,
        ) -> None:
            """Add a message to the agent's memory.
    
            Args:
                role: The role of the message sender (user, system, assistant, tool).
                content: The message content.
                base64_image: Optional base64 encoded image.
                **kwargs: Additional arguments (e.g., tool_call_id for tool messages).
    
            Raises:
                ValueError: If the role is unsupported.
            """
            message_map = {
                "user": Message.user_message,
                "system": Message.system_message,
                "assistant": Message.assistant_message,
                "tool": lambda content, **kw: Message.tool_message(content, **kw),
            }
    
            if role not in message_map:
                raise ValueError(f"Unsupported message role: {role}")
    
            # Create message with appropriate parameters based on role
            kwargs = {"base64_image": base64_image, **(kwargs if role == "tool" else {})}
            self.memory.add_message(message_map[role](content, **kwargs))
    
        # Asynchronous execution
        async def run(self, request: Optional[str] = None) -> str:
            """Execute the agent's main loop asynchronously.
    
            Args:
                request: Optional initial user request to process.
    
            Returns:
                A string summarizing the execution results.
    
            Raises:
                RuntimeError: If the agent is not in IDLE state at start.
            """
            # The agent must be idle before it can run
            if self.state != AgentState.IDLE:
                raise RuntimeError(f"Cannot run agent from state: {self.state}")
    
            if request:
                self.update_memory("user", request)
    
            results: List[str] = []
            async with self.state_context(AgentState.RUNNING):
                while (
                    self.current_step < self.max_steps and self.state != AgentState.FINISHED
                ):
                    self.current_step += 1
                    logger.info(f"Executing step {self.current_step}/{self.max_steps}")
                    step_result = await self.step()
    
                    # Check for stuck state
                    if self.is_stuck():
                        self.handle_stuck_state()
    
                    results.append(f"Step {self.current_step}: {step_result}")
    
                if self.current_step >= self.max_steps:
                    self.current_step = 0
                    self.state = AgentState.IDLE
                    results.append(f"Terminated: Reached max steps ({self.max_steps})")
            await SANDBOX_CLIENT.cleanup()
            return "\n".join(results) if results else "No steps executed"
    

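    run is the framework's scheduling entry point: it coordinates control attributes such as state, max_steps, and current_step, calls the core step method, and relies on state_context and update_memory to handle state and memory. At the end it also calls the external SANDBOX_CLIENT to clean up. SANDBOX_CLIENT effectively runs commands inside Docker so that they do not touch the host directly.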

        @abstractmethod
        async def step(self) -> str:
            """Execute a single step in the agent's workflow.
    
            Must be implemented by subclasses to define specific behavior.
            """
    
        def handle_stuck_state(self):
            """Handle stuck state by adding a prompt to change strategy"""
            stuck_prompt = "\
            Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
            self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
            logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
    
        def is_stuck(self) -> bool:
            """Check if the agent is stuck in a loop by detecting duplicate content"""
            if len(self.memory.messages) < 2:
                return False
    
            last_message = self.memory.messages[-1]
            if not last_message.content:
                return False
    
            # Count identical content occurrences
            duplicate_count = sum(
                1
                for msg in reversed(self.memory.messages[:-1])
                if msg.role == "assistant" and msg.content == last_message.content
            )
    
            return duplicate_count >= self.duplicate_threshold
    

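    These three methods define the agent's single-step behavior and its self-correction mechanism, but there are two obvious weaknesses:

    1. Duplicate detection compares content by exact equality only; it ignores semantics and similarity.

    2. duplicate_threshold is hard to tune: too low and it raises false alarms, too high and it misses real loops.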
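One way to soften the first weakness is to compare messages by similarity rather than exact equality. The sketch below is an illustration only and not part of OpenManus: is_stuck_fuzzy and the min_ratio parameter are hypothetical, and difflib's character-level ratio is a cheap stand-in for real semantic similarity.

```python
from difflib import SequenceMatcher


def is_stuck_fuzzy(contents, threshold=2, min_ratio=0.9):
    """Return True if the last entry closely matches at least `threshold` earlier entries.

    `contents` is a list of assistant message strings, oldest first.
    """
    if len(contents) < 2 or not contents[-1]:
        return False
    last = contents[-1]
    near_duplicates = sum(
        1
        for earlier in contents[:-1]
        if earlier and SequenceMatcher(None, earlier, last).ratio() >= min_ratio
    )
    return near_duplicates >= threshold


history = [
    "I will search the web for agent projects.",
    "I will search the web for agent projects!",
    "I will search the web for agent projects",
    "I will search the web for agent projects.",
]
stuck = is_stuck_fuzzy(history)
```

An exact comparison would count only one earlier duplicate in history, below the default threshold of 2, while the fuzzy version counts all three near-identical messages. The threshold still has to be tuned, so the second weakness remains.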

        @property
        def messages(self) -> List[Message]:
            """Retrieve a list of messages from the agent's memory."""
            return self.memory.messages
    
        @messages.setter
        def messages(self, value: List[Message]):
            """Set the list of messages in the agent's memory."""
            self.memory.messages = value
    

    This wraps the memory's message list in a property, so call sites are a little shorter.

  • app/agent/react.py: implements the ReAct (reasoning and acting) paradigm

    
        @abstractmethod
        async def think(self) -> bool:
            """Process current state and decide next action"""
        # The bool returned by think() decides whether act() runs next
        @abstractmethod
        async def act(self) -> str:
            """Execute decided actions"""
    
        async def step(self) -> str:
            """Execute a single step: think and act."""
            should_act = await self.think()
            if not should_act:
                return "Thinking complete - no action needed"
            return await self.act()
    
  • app/agent/toolcall.py: adds tool-calling capability

    class ToolCallAgent(ReActAgent):
        """Base agent class for handling tool/function calls with enhanced abstraction"""
    
        name: str = "toolcall"
        description: str = "an agent that can execute tool calls."
    
        system_prompt: str = SYSTEM_PROMPT
        next_step_prompt: str = NEXT_STEP_PROMPT
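        # Tool attributes, used when talking to the LLM
        available_tools: ToolCollection = ToolCollection(
            CreateChatCompletion(), Terminate()
        )
        # available_tools defines which tools this agent can use. There are two defaults: CreateChatCompletion (possibly used to generate longer text) and Terminate (ends the task). ToolCollection is a helper class that manages a set of tools.
        tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO  # type: ignore
        # tool_choices controls how the LLM may call tools: AUTO (the LLM decides whether to call a tool), REQUIRED (the LLM must call a tool), or NONE (the LLM must not call tools).
        special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])
    
        # State attributes, used by act()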
        tool_calls: List[ToolCall] = Field(default_factory=list)
        _current_base64_image: Optional[str] = None
    
        max_steps: int = 30
        max_observe: Optional[Union[int, bool]] = None
    
        async def think(self) -> bool:
            """Process current state and decide next actions using tools"""
            if self.next_step_prompt:
                user_msg = Message.user_message(self.next_step_prompt)
                self.messages += [user_msg]
    
            try:
                # Get response with tool options
                response = await self.llm.ask_tool(
                    messages=self.messages,
                    system_msgs=(
                        [Message.system_message(self.system_prompt)]
                        if self.system_prompt
                        else None
                    ),
                    tools=self.available_tools.to_params(),
                    tool_choice=self.tool_choices,
                )
            except ValueError:
                raise
            except Exception as e:
                # Check if this is a RetryError containing TokenLimitExceeded
                if hasattr(e, "__cause__") and isinstance(e.__cause__, TokenLimitExceeded):
                    token_limit_error = e.__cause__
                    logger.error(
                        f"🚨 Token limit error (from RetryError): {token_limit_error}"
                    )
                    self.memory.add_message(
                        Message.assistant_message(
                            f"Maximum token limit reached, cannot continue execution: {str(token_limit_error)}"
                        )
                    )
                    self.state = AgentState.FINISHED
                    return False
                raise
    
            self.tool_calls = tool_calls = (
                response.tool_calls if response and response.tool_calls else []
            )
            content = response.content if response and response.content else ""
    
            # Log response info
            logger.info(f"✨ {self.name}'s thoughts: {content}")
            logger.info(
                f"🛠️ {self.name} selected {len(tool_calls) if tool_calls else 0} tools to use"
            )
            if tool_calls:
                logger.info(
                    f"🧰 Tools being prepared: {[call.function.name for call in tool_calls]}"
                )
                logger.info(f"🔧 Tool arguments: {tool_calls[0].function.arguments}")
    
            try:
                if response is None:
                    raise RuntimeError("No response received from the LLM")
    
                # Handle different tool_choices modes
                if self.tool_choices == ToolChoice.NONE:
                    if tool_calls:
                        logger.warning(
                            f"🤔 Hmm, {self.name} tried to use tools when they weren't available!"
                        )
                    if content:
                        self.memory.add_message(Message.assistant_message(content))
                        return True
                    return False
    
                # Create and add assistant message
                assistant_msg = (
                    Message.from_tool_calls(content=content, tool_calls=self.tool_calls)
                    if self.tool_calls
                    else Message.assistant_message(content)
                )
                self.memory.add_message(assistant_msg)
    
                if self.tool_choices == ToolChoice.REQUIRED and not self.tool_calls:
                    return True  # Will be handled in act()
    
                # For 'auto' mode, continue with content if no commands but content exists
                if self.tool_choices == ToolChoice.AUTO and not self.tool_calls:
                    return bool(content)
    
                return bool(self.tool_calls)
            except Exception as e:
                logger.error(f"🚨 Oops! The {self.name}'s thinking process hit a snag: {e}")
                self.memory.add_message(
                    Message.assistant_message(
                        f"Error encountered while processing: {str(e)}"
                    )
                )
                return False
    

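    This is the agent's "think" phase; it turns the current context into a decision about the next step.

    1. Prepare input: if next_step_prompt is set, it is appended to the conversation history as a new user message to steer the LLM's next step.
    2. Call the LLM: self.llm.ask_tool is the tool-calling interface. It sends the conversation history, the system prompt, the list of available tools (self.available_tools), and the tool choice mode (self.tool_choices) to the LLM.
    3. Error handling: ValueError is re-raised unchanged. If another exception wraps TokenLimitExceeded, the error is logged, an assistant message is recorded, and the agent is marked FINISHED; any other exception is re-raised.
    4. Parse the response: extract tool_calls (the tools the LLM plans to run) and content (its plain-text reasoning) from the response.
    5. Logging: log the LLM's thoughts, the selected tools, and the arguments of the first tool call, which helps with troubleshooting.
    6. Handle the modes: branch on self.tool_choices (NONE, REQUIRED, AUTO). In NONE mode, for example, a warning is logged if the LLM still tries to call a tool.
    7. Update memory: add the LLM's response (reasoning plus any tool call requests) to memory as an assistant message.
    8. Return the decision: a boolean telling step() whether to call act(). True usually means there are tool calls to run, but it is also returned when there is text content and no tool call (AUTO and NONE modes), and in REQUIRED mode without tool calls, where act() then raises the error.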
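The return value in step 8 is easier to follow when the branches are pulled out into a pure function. This is a sketch of the decision logic as it appears in the quoted think(); the should_act helper and the local ToolChoice enum are hypothetical and exist only for illustration.

```python
from enum import Enum


class ToolChoice(Enum):
    # Hypothetical local copy of the three modes named in the text
    NONE = "none"
    AUTO = "auto"
    REQUIRED = "required"


def should_act(mode: ToolChoice, has_tool_calls: bool, has_content: bool) -> bool:
    """Mirror the boolean that think() returns, without any side effects."""
    if mode is ToolChoice.NONE:
        # Tool calls are ignored; only text content keeps the step going.
        return has_content
    if mode is ToolChoice.REQUIRED and not has_tool_calls:
        # act() is still entered so that it can raise the "tool call required" error.
        return True
    if mode is ToolChoice.AUTO and not has_tool_calls:
        return has_content
    return has_tool_calls
```

For example, should_act(ToolChoice.AUTO, False, True) is True even though there is nothing to execute; act() then simply returns the last message's content.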
        async def act(self) -> str:
            """Execute tool calls and handle their results"""
            if not self.tool_calls:
                if self.tool_choices == ToolChoice.REQUIRED:
                    raise ValueError(TOOL_CALL_REQUIRED)
    
                # Return last message content if no tool calls
                return self.messages[-1].content or "No content or commands to execute"
    
            results = []
            for command in self.tool_calls:
                # Reset base64_image for each tool call
                self._current_base64_image = None
    
                result = await self.execute_tool(command)
    
                if self.max_observe:
                    result = result[: self.max_observe]
    
                logger.info(
                    f"🎯 Tool '{command.function.name}' completed its mission! Result: {result}"
                )
    
                # Add tool response to memory
                tool_msg = Message.tool_message(
                    content=result,
                    tool_call_id=command.id,
                    name=command.function.name,
                    base64_image=self._current_base64_image,
                )
                self.memory.add_message(tool_msg)
                results.append(result)
    
            return "\n\n".join(results)
    

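    act is the agent's executor. It takes the tool calls the LLM produced in the think phase, actually runs them, collects the output, and writes the results back to memory. That closes the think-act-observe loop.

    1. Check the precondition: if self.tool_calls is empty, either raise an error (REQUIRED mode) or return the content of the last message.
    2. Iterate: loop over all tool_calls produced by think.
    3. Run each tool: for each command (a ToolCall object), call self.execute_tool to actually run it.
    4. Process the result: take the tool's result and, if max_observe is set, truncate it to that length.
    5. Update memory: wrap each result in a tool message (result content, the matching tool_call_id, and so on) and add it to memory for the LLM to read on the next step.
    6. Aggregate and return: join all tool results into one string, which becomes the result of the current step.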
        async def execute_tool(self, command: ToolCall) -> str:
            """Execute a single tool call with robust error handling"""
            if not command or not command.function or not command.function.name:
                return "Error: Invalid command format"
    
            name = command.function.name
            if name not in self.available_tools.tool_map:
                return f"Error: Unknown tool '{name}'"
    
            try:
                # Parse arguments
                args = json.loads(command.function.arguments or "{}")
    
                # Execute the tool
                logger.info(f"🔧 Activating tool: '{name}'...")
                result = await self.available_tools.execute(name=name, tool_input=args)
    
                # Handle special tools
                await self._handle_special_tool(name=name, result=result)
    
                # Check if result is a ToolResult with base64_image
                if hasattr(result, "base64_image") and result.base64_image:
                    # Store the base64_image for later use in tool_message
                    self._current_base64_image = result.base64_image
    
                # Format result for display (standard case)
                observation = (
                    f"Observed output of cmd `{name}` executed:\n{str(result)}"
                    if result
                    else f"Cmd `{name}` completed with no output"
                )
    
                return observation
            except json.JSONDecodeError:
                error_msg = f"Error parsing arguments for {name}: Invalid JSON format"
                logger.error(
                    f"📝 Oops! The arguments for '{name}' don't make sense - invalid JSON, arguments:{command.function.arguments}"
                )
                return f"Error: {error_msg}"
            except Exception as e:
                error_msg = f"⚠️ Tool '{name}' encountered a problem: {str(e)}"
                logger.exception(error_msg)
                return f"Error: {error_msg}"
    

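    act schedules the tools; execute_tool runs a single tool call.

    1. Validate: check that the command is well formed and that the tool name exists in available_tools.
    2. Parse arguments: the LLM usually returns tool arguments as a JSON string; json.loads turns it into a Python dict.
    3. Execute: call self.available_tools.execute() with the tool name and the parsed arguments.
    4. Special handling: call _handle_special_tool to check whether this tool triggers special logic (such as ending the agent).
    5. Format the result: wrap the raw return value in an "Observation" string for the LLM to read.
    6. Error handling: catch JSON parse errors and tool execution exceptions and return them as formatted error strings. A single failing tool therefore does not stop the agent.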
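The "errors become strings" idea in step 6 can be shown in isolation. The sketch below is a simplified stand-in, not the OpenManus method: the TOOLS dict and the echo tool are hypothetical, and special-tool handling and image results are left out.

```python
import asyncio
import json


async def echo(text: str) -> str:
    """Hypothetical tool used only for this illustration."""
    return text.upper()


TOOLS = {"echo": echo}  # hypothetical registry: tool name -> coroutine function


async def execute_tool(name: str, raw_arguments: str) -> str:
    """Run one tool call and always return a string, even on failure."""
    if name not in TOOLS:
        return f"Error: Unknown tool '{name}'"
    try:
        args = json.loads(raw_arguments or "{}")  # the LLM sends arguments as JSON text
        result = await TOOLS[name](**args)
        return f"Observed output of cmd `{name}` executed:\n{result}"
    except json.JSONDecodeError:
        return f"Error: Error parsing arguments for {name}: Invalid JSON format"
    except Exception as e:
        # Any tool failure is reported back to the LLM instead of crashing the loop.
        return f"Error: Tool '{name}' encountered a problem: {e}"
```

Because every branch returns a string, the caller can append the result to memory unconditionally, and the LLM gets a chance to react to the error on the next step.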
        async def cleanup(self):
            """Clean up resources used by the agent's tools."""
            logger.info(f"🧹 Cleaning up resources for agent '{self.name}'...")
            for tool_name, tool_instance in self.available_tools.tool_map.items():
                if hasattr(tool_instance, "cleanup") and asyncio.iscoroutinefunction(
                    tool_instance.cleanup
                ):
                    try:
                        logger.debug(f"🧼 Cleaning up tool: {tool_name}")
                        await tool_instance.cleanup()
                    except Exception as e:
                        logger.error(
                            f"🚨 Error cleaning up tool '{tool_name}': {e}", exc_info=True
                        )
            logger.info(f"✨ Cleanup complete for agent '{self.name}'.")
    
        async def run(self, request: Optional[str] = None) -> str:
            """Run the agent with cleanup when done."""
            try:
                return await super().run(request)
            finally:
                await self.cleanup()
    

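    At first glance run looked redundant to me: think() and act() already contain the tool-calling logic, so run seemed to belong in a higher abstraction layer.

    On closer inspection, though:

    1. Why override run()?

    It is not there to call tools. It adds uniform cleanup at the end of the lifecycle at this layer. Admittedly, this could also live in a higher abstraction layer and be inherited.

    2. Does it conflict with other responsibilities?

    No. run only wraps the start and end of the flow; the actual control logic stays in think() / act().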

  • app/agent/manus.py: the main agent (integrates the MCP client and built-in tools)

    @model_validator(mode="after")
    def initialize_helper(self) -> "Manus":
        """Initialize basic components synchronously."""
        self.browser_context_helper = BrowserContextHelper(self)
        return self

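@model_validator(mode="after"):

This is a decorator provided by Pydantic for model validation and initialization. mode="after" means the validator runs after all fields have been parsed and validated. When a Manus object is created, Pydantic first processes fields such as name, description, and system_prompt, and calls initialize_helper once those base fields are in place.

-> "Manus" is a Python type hint stating that initialize_helper is expected to return a Manus object.

Why the string "Manus" rather than Manus itself?

This is a forward reference. While the class body is executing, the class name is not bound yet, so using the bare name in an annotation can raise NameError.

With the name in quotes, Python stores the annotation as a plain string. Type checkers such as mypy resolve the string later, which sidesteps circular references and undefined names.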
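A tiny example makes the forward reference visible. The Node class here is hypothetical and unrelated to OpenManus; it only shows that a quoted annotation is stored as a string.

```python
class Node:
    """Hypothetical class that refers to itself in a return annotation."""

    def __init__(self, value: int) -> None:
        self.value = value

    def clone(self) -> "Node":
        # While this method is being defined, the name Node is not bound yet,
        # so the annotation is written as a string.
        return Node(self.value)


# The annotation is kept as the literal string, not as the class object.
stored_annotation = Node.clone.__annotations__["return"]
```

Here stored_annotation is the string "Node". Writing -> Node without quotes inside the class body would raise NameError, unless annotation evaluation is deferred, for example with from __future__ import annotations.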

    @classmethod
    async def create(cls, **kwargs) -> "Manus":
        """Factory method to create and properly initialize a Manus instance."""
        instance = cls(**kwargs)
        await instance.initialize_mcp_servers()
        instance._initialized = True
        return instance

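Why is initialize_helper() not called from create(), but placed in the earlier @model_validator(mode="after") instead? The split reads a little awkwardly.

Consolidating all initialization in create(), so that the construction flow is defined in one place, might be cleaner.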
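One plausible reason for the split, offered as a guess rather than the authors' stated intent: constructors and validators are synchronous, while connecting to servers needs await. The async factory pattern below shows that division in general form; the Service class and its _connect method are hypothetical.

```python
import asyncio


class Service:
    """Hypothetical class standing in for any object that needs async setup."""

    def __init__(self, name: str) -> None:
        # __init__ cannot await, so only synchronous setup happens here.
        self.name = name
        self.helper = f"helper-for-{name}"
        self.connected = False

    async def _connect(self) -> None:
        await asyncio.sleep(0)  # placeholder for real async I/O
        self.connected = True

    @classmethod
    async def create(cls, name: str) -> "Service":
        """Build the instance, then finish the parts that require await."""
        instance = cls(name)
        await instance._connect()
        return instance


service = asyncio.run(Service.create("demo"))
```

With this layout, Service("demo") still yields a usable but unconnected object, whereas Service.create("demo") returns a fully initialized one. Moving the synchronous helper setup into create() as well would work, but objects built directly through the constructor would then lack it.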

    async def connect_mcp_server(
        self,
        server_url: str,
        server_id: str = "",
        use_stdio: bool = False,
        stdio_args: List[str] = None,
    ) -> None:
        """Connect to an MCP server and add its tools."""
        if use_stdio:
            await self.mcp_clients.connect_stdio(
                server_url, stdio_args or [], server_id
            )
            self.connected_servers[server_id or server_url] = server_url
        else:
            await self.mcp_clients.connect_sse(server_url, server_id)
            self.connected_servers[server_id or server_url] = server_url

        # Update available tools with only the new tools from this server
        new_tools = [
            tool for tool in self.mcp_clients.tools if tool.server_id == server_id
        ]
        self.available_tools.add_tools(*new_tools)

    async def disconnect_mcp_server(self, server_id: str = "") -> None:
        """Disconnect from an MCP server and remove its tools."""
        await self.mcp_clients.disconnect(server_id)
        if server_id:
            self.connected_servers.pop(server_id, None)
        else:
            self.connected_servers.clear()

        # Rebuild available tools without the disconnected server's tools
        base_tools = [
            tool
            for tool in self.available_tools.tools
            if not isinstance(tool, MCPClientTool)
        ]
        self.available_tools = ToolCollection(*base_tools)
        self.available_tools.add_tools(*self.mcp_clients.tools)

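This connects to the given server through mcp_clients (over SSE or standard input/output), records the connection, then takes the tools exposed by the new server from mcp_clients and appends them to self.available_tools. In other words, the agent's toolbox grows dynamically at run time.

On disconnect, the connection record is removed first. To drop the disconnected server's tools, the method keeps only the non-MCP base tools, rebuilds a ToolCollection from them, and then adds back the MCP tools that are still connected. This keeps the agent from calling tools that are no longer available.

This is cumbersome. A tool registry plus an enabled-state table would likely be easier to maintain and extend than rebuilding the ToolCollection on every disconnect.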
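The registry idea can be sketched as follows. This is one possible shape, not an OpenManus API: ToolRegistry and all of its method names are hypothetical, and tools are represented by plain objects.

```python
from dataclasses import dataclass, field


@dataclass
class ToolRegistry:
    """Hypothetical registry: tools are registered once, then toggled per server."""

    _tools: dict = field(default_factory=dict)    # tool name -> (server_id, tool)
    _enabled: dict = field(default_factory=dict)  # server_id -> bool

    def register(self, name, tool, server_id="builtin"):
        self._tools[name] = (server_id, tool)
        self._enabled.setdefault(server_id, True)

    def set_server_enabled(self, server_id, enabled):
        # Disconnecting a server only flips a flag; nothing is rebuilt.
        self._enabled[server_id] = enabled

    def active_tools(self):
        """Return only the tools whose server is currently enabled."""
        return {
            name: tool
            for name, (server_id, tool) in self._tools.items()
            if self._enabled.get(server_id, False)
        }


registry = ToolRegistry()
registry.register("terminate", object())
registry.register("search", object(), server_id="mcp-a")
registry.set_server_enabled("mcp-a", False)
remaining = sorted(registry.active_tools())
```

After the server mcp-a is disabled, remaining contains only terminate, and re-enabling the server brings search back without registering it again. The trade-off is that stale tool objects stay in memory until they are explicitly removed.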

5. LLM interaction layer
  • app/llm.py: a unified interface to several LLM providers (OpenAI/Azure/Bedrock), including:

    • Token counting
        def _calculate_high_detail_tokens(self, width: int, height: int) -> int:
            """Calculate tokens for high detail images based on dimensions"""
            # Step 1: Scale to fit in MAX_SIZE x MAX_SIZE square
            if width > self.MAX_SIZE or height > self.MAX_SIZE:
                scale = self.MAX_SIZE / max(width, height)
                width = int(width * scale)
                height = int(height * scale)
    
            # Step 2: Scale so shortest side is HIGH_DETAIL_TARGET_SHORT_SIDE
            scale = self.HIGH_DETAIL_TARGET_SHORT_SIDE / min(width, height)
            scaled_width = int(width * scale)
            scaled_height = int(height * scale)
    
            # Step 3: Count number of 512px tiles
            tiles_x = math.ceil(scaled_width / self.TILE_SIZE)
            tiles_y = math.ceil(scaled_height / self.TILE_SIZE)
            total_tiles = tiles_x * tiles_y
    
            # Step 4: Calculate final token count
            return (
                total_tiles * self.HIGH_DETAIL_TILE_TOKENS
            ) + self.LOW_DETAIL_IMAGE_TOKENS
    
        def count_content(self, content: Union[str, List[Union[str, dict]]]) -> int:
            """Calculate tokens for message content"""
            if not content:
                return 0
    
            if isinstance(content, str):
                return self.count_text(content)
    
            token_count = 0
            for item in content:
                if isinstance(item, str):
                    token_count += self.count_text(item)
                elif isinstance(item, dict):
                    if "text" in item:
                        token_count += self.count_text(item["text"])
                    elif "image_url" in item:
                        token_count += self.count_image(item)
            return token_count
    

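    When an LLM processes a large image, the cost is not computed from the raw pixel count. The image is split into tiles, each tile costs a fixed number of tokens, and a base cost (the token cost of a low-detail image) is added on top.

    • Message formatting
    • Tool call handling
    • Retry mechanism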
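A worked calculation shows how the tile rule plays out. The function below repeats the four steps from the quoted method as a standalone sketch; the constant values are assumptions, since the excerpt does not show them (only the 512 px tile size is hinted at in a comment).

```python
import math

# Assumed values; the excerpt references these names but does not define them.
MAX_SIZE = 2048
HIGH_DETAIL_TARGET_SHORT_SIDE = 768
TILE_SIZE = 512
HIGH_DETAIL_TILE_TOKENS = 170
LOW_DETAIL_IMAGE_TOKENS = 85


def high_detail_tokens(width: int, height: int) -> int:
    """Token estimate for a high-detail image, following the four quoted steps."""
    # Step 1: shrink to fit inside a MAX_SIZE x MAX_SIZE square.
    if width > MAX_SIZE or height > MAX_SIZE:
        scale = MAX_SIZE / max(width, height)
        width, height = int(width * scale), int(height * scale)
    # Step 2: scale so the shortest side equals the target length.
    scale = HIGH_DETAIL_TARGET_SHORT_SIDE / min(width, height)
    scaled_width, scaled_height = int(width * scale), int(height * scale)
    # Step 3: count tiles, rounding up in each direction.
    tiles = math.ceil(scaled_width / TILE_SIZE) * math.ceil(scaled_height / TILE_SIZE)
    # Step 4: per-tile cost plus the base cost.
    return tiles * HIGH_DETAIL_TILE_TOKENS + LOW_DETAIL_IMAGE_TOKENS


square = high_detail_tokens(1024, 1024)
tall = high_detail_tokens(2048, 4096)
```

Under these assumed constants, a 1024×1024 image is scaled to 768×768, which is 2×2 tiles, so 4 × 170 + 85 = 765 tokens. A 2048×4096 image becomes 1024×2048, then 768×1536, which is 2×3 tiles, so 6 × 170 + 85 = 1105 tokens.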
        @retry(
            wait=wait_random_exponential(min=1, max=60),
            stop=stop_after_attempt(6),
            retry=retry_if_exception_type(
                (OpenAIError, Exception, ValueError)
            ),  # Don't retry TokenLimitExceeded
        )
    

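    The @retry decorator is applied to a function or method. It makes that function more fault tolerant: on transient, recoverable errors (network jitter, temporary server overload) it retries automatically instead of failing immediately. It is equivalent to:

    original_get_completion = get_completion  # keep the original function
    get_completion = retry(...)(original_get_completion)  # replace it with the new function returned by retry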
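To make the wrapping concrete, here is a hand-written retry decorator. It is a simplified stand-in for illustration and not the library used in the excerpt: the parameters max_attempts, base_delay, and retry_on are hypothetical, and the backoff is plain exponential without randomization.

```python
import functools
import time


def retry(max_attempts=3, base_delay=0.0, retry_on=(Exception,)):
    """Hypothetical, simplified retry decorator with exponential backoff."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(base_delay * 2 ** (attempt - 1))

        return wrapper

    return decorator


calls = {"count": 0}


@retry(max_attempts=4, retry_on=(ConnectionError,))
def flaky():
    """Fails twice with a transient error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = flaky()
```

The caller sees a single successful call, while calls["count"] shows that the function actually ran three times. Exceptions outside retry_on propagate on the first failure, which is the behavior one would want for non-recoverable errors.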
    
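6. Data models
  • app/schema.py: defines the core data structures:
    • Message/ToolCall/Memory
7. Tool framework
  • app/tool/base.py: the abstract tool base class BaseTool and the standard output type ToolResult
  • app/tool/tool_collection.py: tool collection manager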
class ToolCollection:
    """A collection of defined tools."""

    class Config:
        arbitrary_types_allowed = True

    def __init__(self, *tools: BaseTool):
        self.tools = tools
        self.tool_map = {tool.name: tool for tool in tools}

    def __iter__(self):
        return iter(self.tools)

    def to_params(self) -> List[Dict[str, Any]]:
        return [tool.to_param() for tool in self.tools]

    async def execute(
        self, *, name: str, tool_input: Dict[str, Any] = None
    ) -> ToolResult:
        tool = self.tool_map.get(name)
        if not tool:
            return ToolFailure(error=f"Tool {name} is invalid")
        try:
            result = await tool(**tool_input)
            return result
        except ToolError as e:
            return ToolFailure(error=e.message)

    async def execute_all(self) -> List[ToolResult]:
        """Execute all tools in the collection sequentially."""
        results = []
        for tool in self.tools:
            try:
                result = await tool()
                results.append(result)
            except ToolError as e:
                results.append(ToolFailure(error=e.message))
        return results
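  1. When execute is called, it receives name and tool_input (a dict, for example {"text": "hello", "count": 2}). tool = self.tool_map.get(name) looks up the matching BaseTool, which is then called with tool_input unpacked as keyword arguments; BaseTool defines __call__ for this: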
class BaseTool(ABC, BaseModel):
    name: str
    description: str
    parameters: Optional[dict] = None

    class Config:
        arbitrary_types_allowed = True

    async def __call__(self, **kwargs) -> Any:
        """Execute the tool with given parameters."""
        return await self.execute(**kwargs)
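The call path from a dict of arguments to a tool's execute method can be tried out with a stripped-down version. This sketch drops Pydantic for brevity; the Repeat tool and the run_tool helper are hypothetical.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class BaseTool(ABC):
    """Simplified stand-in without Pydantic; only the call path is modeled."""

    name: str = "base"

    async def __call__(self, **kwargs) -> Any:
        # Calling the instance forwards the keyword arguments to execute().
        return await self.execute(**kwargs)

    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        ...


class Repeat(BaseTool):
    """Hypothetical tool used only for this illustration."""

    name = "repeat"

    async def execute(self, text: str, count: int = 1) -> str:
        return text * count


async def run_tool(tool: BaseTool, tool_input: dict) -> Any:
    # Same shape as `await tool(**tool_input)` in ToolCollection.execute
    return await tool(**tool_input)


output = asyncio.run(run_tool(Repeat(), {"text": "hello", "count": 2}))
```

Because the dict is unpacked with **, its keys must match the parameter names of execute; an unexpected key raises TypeError, which the agent-level error handling then turns into an error string.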
  • app/tool/mcp.py: client-side MCP protocol implementation (remote tool proxy)
    async def connect_stdio(
        self, command: str, args: List[str], server_id: str = ""
    ) -> None:
        """Connect to an MCP server using stdio transport."""
        if not command:
            raise ValueError("Server command is required.")

        server_id = server_id or command

        # Always ensure clean disconnection before new connection
        if server_id in self.sessions:
            await self.disconnect(server_id)

        exit_stack = AsyncExitStack()
        self.exit_stacks[server_id] = exit_stack

        server_params = StdioServerParameters(command=command, args=args)
        stdio_transport = await exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        read, write = stdio_transport
        session = await exit_stack.enter_async_context(ClientSession(read, write))
        self.sessions[server_id] = session

        await self._initialize_and_list_tools(server_id)

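A stdio connection means that the main program (here, the client program hosting the Manus agent) does not reach a remote server over the network (HTTP or TCP/IP). Instead, it **starts another program locally as the "server" and talks to it in both directions through that program's standard input/output pipes.** Any program that can read and write standard input/output and follows the MCP protocol can be integrated with the Python client.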
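The transport idea, stripped of the protocol, looks like this. The sketch starts a child Python process and exchanges one line with it over pipes. It does not speak MCP: the child program and the ask_child helper are hypothetical and only illustrate the stdin/stdout channel.

```python
import asyncio
import sys

# Hypothetical child "server": reads one line from stdin and writes it back upper-cased.
CHILD_CODE = (
    "import sys; "
    "line = sys.stdin.readline(); "
    "sys.stdout.write(line.upper()); "
    "sys.stdout.flush()"
)


async def ask_child(message: str) -> str:
    """Start the child locally and talk to it through its stdin/stdout pipes."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        "-c",
        CHILD_CODE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    # communicate() writes the input, closes stdin, and waits for the child to exit.
    stdout, _ = await proc.communicate(message.encode() + b"\n")
    return stdout.decode().strip()
```

No port is opened and no network stack is involved; the parent owns the child's lifetime. A real MCP session keeps the process alive and exchanges structured messages over the same two pipes instead of a single line.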

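  • app/mcp/server.py: server-side MCP implementation (exposes the tool set)
  • Concrete tool implementations:
    • bash.py/python_execute.py
    • browser_use_tool.py/web_search.py
    • file_operators.py/ask_human.py/terminate.py
8. Sandbox environment
  • app/sandbox/client.py: Docker-based secure code execution environment
9. Exception handling
  • app/exceptions.py: custom exception classes (such as ToolError/TokenLimitExceeded)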