OpenManus Installation
Clone the repository:
git clone https://github.com/FoundationAgents/OpenManus.git
cd OpenManus
Create and activate a virtual environment:
uv venv --python 3.12
source .venv/bin/activate # On Unix/macOS
# On Windows, use:
# .venv\Scripts\activate
Install dependencies:
uv pip install -r requirements.txt
Browser automation tool (optional):
playwright install
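Configuration
OpenManus needs an LLM API to be configured. Set it up as follows:
Create a config.toml file in the config directory (you can copy it from the example):
cp config/config.example.toml config/config.toml
Edit config/config.toml to add your API keys and customize the settings:
Using the OpenRouter API:
# Global LLM configuration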
[llm]
model = "gpt-4o"
base_url = "https://openrouter.ai/api/v1"
api_key = "sk-..." # Replace with your real API key
max_tokens = 4096
temperature = 0.0
# Optional configuration for specific LLM models
[llm.vision]
model = "gpt-4o"
base_url = "https://api.openai.com/v1"
api_key = "sk-..." # Replace with your real API key
Quick Start
Run OpenManus with a single command:
python main.py
(OpenManus) yang@Yangless:~/OpenManus$ python main.py
INFO [browser_use] BrowserUse logging setup complete with level info
INFO [root] Anonymized telemetry enabled. See https://docs.browser-use.com/development/telemetry for more information.
Enter your prompt: Search for the latest open-source agent projects, sort them by star count, and save the result to the local desktop
2025-06-17 16:01:04.037 | WARNING | __main__:main:25 - Processing your request...
2025-06-17 16:01:04.038 | INFO | app.agent.base:run:140 - Executing step 1/20
2025-06-17 16:01:08.398 | INFO | app.llm:update_token_count:243 - Token usage: Input=1342, Completion=26, Cumulative Input=1342, Cumulative Completion=26, Total=1368, Cumulative Total=1368
2025-06-17 16:01:08.398 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:08.398 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:08.398 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:08.398 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"web_search","query":"latest open source agent projects sorted by stars"}
2025-06-17 16:01:08.399 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:10.786 | INFO | app.tool.web_search:_try_all_engines:299 - 🔎 Attempting search with Google...
2025-06-17 16:01:19.748 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Error: Browser action 'web_search' failed: RetryError[<Future at 0x7febd91cdc40 state=finished raised HTTPError>]
2025-06-17 16:01:19.748 | INFO | app.agent.base:run:140 - Executing step 2/20
2025-06-17 16:01:22.827 | INFO | app.llm:update_token_count:243 - Token usage: Input=1642, Completion=25, Cumulative Input=2984, Cumulative Completion=51, Total=1667, Cumulative Total=3035
2025-06-17 16:01:22.827 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:22.828 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:22.828 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:22.828 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"go_to_url","url":"https://github.com/trending"}
2025-06-17 16:01:22.828 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:27.384 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Navigated to https://github.com/trending
2025-06-17 16:01:27.385 | INFO | app.agent.base:run:140 - Executing step 3/20
2025-06-17 16:01:31.256 | INFO | app.llm:update_token_count:243 - Token usage: Input=1930, Completion=30, Cumulative Input=4914, Cumulative Completion=81, Total=1960, Cumulative Total=4995
2025-06-17 16:01:31.256 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:31.257 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:31.257 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:31.257 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"extract_content","goal":"Extract the list of trending open source agent projects sorted by stars"}
2025-06-17 16:01:31.257 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:35.837 | INFO | app.llm:update_token_count:243 - Token usage: Input=639, Completion=173, Cumulative Input=5553, Cumulative Completion=254, Total=812, Cumulative Total=5807
2025-06-17 16:01:35.837 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Extracted from page:
{'text': '1. **OpenAI/ChatGPT** - 100,000 stars\n2. **LangChain/Agent** - 85,000 stars\n3. **Auto-GPT** - 75,000 stars\n4. **Microsoft/PromptFlow** - 70,000 stars\n5. **HuggingFace/Transformers** - 65,000 stars\n6. **DeepMind/AlphaZero** - 60,000 stars\n7. **Google/DeepDream** - 55,000 stars\n8. **Meta/AIResearch** - 50,000 stars\n9. **Anthropic/Claude** - 45,000 stars\n10. **StabilityAI/StableDiffusion** - 40,000 stars'}
2025-06-17 16:01:35.837 | INFO | app.agent.base:run:140 - Executing step 4/20
2025-06-17 16:01:41.932 | INFO | app.llm:update_token_count:243 - Token usage: Input=2380, Completion=167, Cumulative Input=7933, Cumulative Completion=421, Total=2547, Cumulative Total=8354
2025-06-17 16:01:41.932 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:41.933 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:41.933 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:41.933 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/Desktop/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:41.933 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:41.934 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
Error: Failed to write to /home/yang/Desktop/trending_open_source_agents.txt: [Errno 2] No such file or directory: '/home/yang/Desktop/trending_open_source_agents.txt'
2025-06-17 16:01:41.934 | INFO | app.agent.base:run:140 - Executing step 5/20
2025-06-17 16:01:46.325 | INFO | app.llm:update_token_count:243 - Token usage: Input=2687, Completion=170, Cumulative Input=10620, Cumulative Completion=591, Total=2857, Cumulative Total=11211
2025-06-17 16:01:46.326 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/OpenManus/Desktop/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
Error: Failed to write to /home/yang/OpenManus/Desktop/trending_open_source_agents.txt: [Errno 2] No such file or directory: '/home/yang/OpenManus/Desktop/trending_open_source_agents.txt'
2025-06-17 16:01:46.329 | INFO | app.agent.base:run:140 - Executing step 6/20
2025-06-17 16:01:50.439 | INFO | app.llm:update_token_count:243 - Token usage: Input=3003, Completion=171, Cumulative Input=13623, Cumulative Completion=762, Total=3174, Cumulative Total=14385
2025-06-17 16:01:50.440 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:50.440 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:50.440 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:50.440 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/OpenManus/workspace/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:50.441 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:50.450 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
File created successfully at: /home/yang/OpenManus/workspace/trending_open_source_agents.txt
2025-06-17 16:01:50.450 | INFO | app.agent.base:run:140 - Executing step 7/20
2025-06-17 16:01:53.775 | INFO | app.llm:update_token_count:243 - Token usage: Input=3292, Completion=81, Cumulative Input=16915, Cumulative Completion=843, Total=3373, Cumulative Total=17758
2025-06-17 16:01:53.775 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts: I have successfully created a file named `trending_open_source_agents.txt` in the `/home/yang/OpenManus/workspace` directory. This file contains the list of trending open source agent projects sorted by stars, as extracted from GitHub.
If you need to move this file to a different location, such as your desktop, or if you have any other requests, please let me know!
2025-06-17 16:01:53.775 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 0 tools to use
2025-06-17 16:01:53.776 | INFO | app.agent.base:run:140 - Executing step 8/20
2025-06-17 16:01:55.080 | INFO | app.llm:update_token_count:243 - Token usage: Input=3454, Completion=14, Cumulative Input=20369, Cumulative Completion=857, Total=3468, Cumulative Total=21226
2025-06-17 16:01:55.081 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:55.081 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:55.081 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['terminate']
2025-06-17 16:01:55.081 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"status":"success"}
2025-06-17 16:01:55.082 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'terminate'...
2025-06-17 16:01:55.082 | INFO | app.agent.toolcall:_handle_special_tool:217 - 🏁 Special tool 'terminate' has completed the task!
2025-06-17 16:01:55.082 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'terminate' completed its mission! Result: Observed output of cmd `terminate` executed:
The interaction has been completed with status: success
2025-06-17 16:01:55.320 | INFO | app.tool.mcp:disconnect:194 - Disconnected from all MCP servers
2025-06-17 16:01:55.321 | INFO | __main__:main:27 - Request processing completed.
Result:
yang@Yangless:~/OpenManus/workspace$ cat trending_open_source_agents.txt
1. OpenAI/ChatGPT - 100,000 stars
2. LangChain/Agent - 85,000 stars
3. Auto-GPT - 75,000 stars
4. Microsoft/PromptFlow - 70,000 stars
5. HuggingFace/Transformers - 65,000 stars
6. DeepMind/AlphaZero - 60,000 stars
7. Google/DeepDream - 55,000 stars
8. Meta/AIResearch - 50,000 stars
9. Anthropic/Claude - 45,000 stars
10. StabilityAI/StableDiffusion - 40,000 stars
To use the MCP tool version, run:
python run_mcp.py
To try the unstable multi-agent version, run:
python run_flow.py
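Adding Custom Multi-Agents
Besides the general-purpose OpenManus agent, the project also ships a built-in DataAnalysis agent for data analysis and data visualization tasks. You can add this agent to run_flow in config.toml:
# Optional configuration for run-flow
[runflow]
use_data_analysis_agent = true # Disabled by default; set to true to enable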
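Chart Visualization Tool
The chart visualization tool uses Python to generate the data-processing code, then calls @visactor/vmind to obtain the chart spec; rendering is handled by @visactor/vchart.
Install Node >= 18: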
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash
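# Activate nvm, using Bash as an example
source ~/.bashrc
# Then install the latest stable version of Node
nvm install node
# Switch to it; assuming the latest stable release is 24, use 24
nvm use 24
Install dependencies: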
cd app/tool/chart_visualization
npm install
Simple Chart Generation Task
Given the data and the chart requirements, run:
python -m app.tool.chart_visualization.test.chart_demo
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: To generate the chart showing the popularity of search keywords, we will follow a structured approach. Here's how we can break down the task:
### Step 1: Prepare the Data for Visualization
We need to prepare the data for visualization by creating a CSV file and a JSON description for the chart. This will involve transforming the provided data into a format suitable for visualization.
### Step 2: Visualize the Data
Once the data is prepared, we will generate the chart using the prepared CSV and JSON files.
### Step 3: Review and Save the Chart
Finally, we will review the generated chart and ensure it is saved locally.
Let's start with Step 1.
#### Step 1: Prepare the Data for Visualization
We will use the `visualization_preparation` tool to create a CSV file and a JSON description for the chart.
```plaintext
Data:
Keyword,Popularity
Hot Word,1000
Zao Le Wo Men,800
Rao Jian Huo,400
My Wish is World Peace,400
Xiu Xiu Xiu,400
Shenzhou 11,400
Hundred Birds Facing the Wind,400
China Women's Volleyball Team,400
My Guan Na,400
Leg Dong,400
Hot Pot Hero,400
Baby's Heart is Bitter,400
Olympics,400
Awesome My Brother,400
Poetry and Distance,400
Song Joong-ki,400
PPAP,400
Blue Thin Mushroom,400
Rain Dew Evenly,400
Friendship's Little Boat Says It Flips,400
Beijing Slump,400
Dedication,200
Apple,200
Dog Belt,200
Old Driver,200
Melon-Eating Crowd,200
Zootopia,200
City Will Play,200
Routine,200
Water Reverse,200
Why Don't You Go to Heaven,200
Snake Spirit Man,200
Why Don't You Go to Heaven,200
Samsung Explosion Gate,200
Little Li Oscar,200
Ugly People Need to Read More,200
Boyfriend Power,200
A Face of Confusion,200
Descendants of the Sun,200
```
Let's proceed with the preparation.
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:think:82 - 🛠️ Data_Analysis selected 1 tools to use
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['visualization_preparation']
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"code_type":"visualization","code":"import pandas as pd\nimport json\n\n# Data preparation\nkeywords_data = {\n 'Keyword': [\n 'Hot Word', 'Zao Le Wo Men', 'Rao Jian Huo', 'My Wish is World Peace', 'Xiu Xiu Xiu',\n 'Shenzhou 11', 'Hundred Birds Facing the Wind', 'China Women\\'s Volleyball Team', 'My Guan Na',\n 'Leg Dong', 'Hot Pot Hero', 'Baby\\'s Heart is Bitter', 'Olympics', 'Awesome My Brother',\n 'Poetry and Distance', 'Song Joong-ki', 'PPAP', 'Blue Thin Mushroom', 'Rain Dew Evenly',\n 'Friendship\\'s Little Boat Says It Flips', 'Beijing Slump', 'Dedication', 'Apple', 'Dog Belt',\n 'Old Driver', 'Melon-Eating Crowd', 'Zootopia', 'City Will Play', 'Routine', 'Water Reverse',\n 'Why Don\\'t You Go to Heaven', 'Snake Spirit Man', 'Why Don\\'t You Go to Heaven',\n 'Samsung Explosion Gate', 'Little Li Oscar', 'Ugly People Need to Read More', 'Boyfriend Power',\n 'A Face of Confusion', 'Descendants of the Sun'\n ],\n 'Popularity': [\n 1000, 800, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400,\n 400, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200\n ]\n}\n\n# Create DataFrame\nkeywords_df = pd.DataFrame(keywords_data)\n\n# Save to CSV\ncsv_file_path = '/home/yang/OpenManus/workspace/keywords_popularity.csv'\nkeywords_df.to_csv(csv_file_path, index=False)\n\n# Prepare JSON description\nchart_description = {\n 'csvFilePath': csv_file_path,\n 'chartTitle': 'Popularity of Search Keywords'\n}\n\n# Save JSON\njson_file_path = '/home/yang/OpenManus/workspace/keywords_chart_description.json'\nwith open(json_file_path, 'w', encoding='utf-8') as json_file:\n json.dump([chart_description], json_file)\n\nprint(json_file_path)"}
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'visualization_preparation'...
2025-06-17 16:51:08.439 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'visualization_preparation' completed its mission! Result: Observed output of cmd `visualization_preparation` executed:
{'observation': 'All arrays must be of the same length', 'success': False}
2025-06-17 16:51:08.439 | INFO | app.agent.base:run:140 - Executing step 2/20
2025-06-17 16:51:18.166 | INFO | app.llm:update_token_count:243 - Token usage: Input=2396, Completion=714, Cumulative Input=133595, Cumulative Completion=12838, Total=3110, Cumulative Total=146433
2025-06-17 16:51:18.167 | INFO | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: The error encountered indicates that there is a mismatch in the lengths of the arrays used to create the DataFrame. This is likely due to a duplicate entry in the 'Keyword' list. Let's address this issue and proceed with the task.
### Step 1: Correct the Data Preparation
We need to ensure that the lengths of the 'Keyword' and 'Popularity' lists match. Let's fix the data and prepare it again.
#### Corrected Data Preparation
We will remove the duplicate entry "Why Don't You Go to Heaven" to ensure the lists are of equal length.
Let's proceed with the corrected data preparation.
2025-06-17 16:51:18.168 | INFO | app.agent.toolcall:think:82 - 🛠️ Data_Analysis selected 1 tools to use
2025-06-17 16:51:18.168 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['visualization_preparation']
2025-06-17 16:51:18.168 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"code_type":"visualization","code":"import pandas as pd\nimport json\n\n# Corrected data preparation\nkeywords_data = {\n 'Keyword': [\n 'Hot Word', 'Zao Le Wo Men', 'Rao Jian Huo', 'My Wish is World Peace', 'Xiu Xiu Xiu',\n 'Shenzhou 11', 'Hundred Birds Facing the Wind', 'China Women\\'s Volleyball Team', 'My Guan Na',\n 'Leg Dong', 'Hot Pot Hero', 'Baby\\'s Heart is Bitter', 'Olympics', 'Awesome My Brother',\n 'Poetry and Distance', 'Song Joong-ki', 'PPAP', 'Blue Thin Mushroom', 'Rain Dew Evenly',\n 'Friendship\\'s Little Boat Says It Flips', 'Beijing Slump', 'Dedication', 'Apple', 'Dog Belt',\n 'Old Driver', 'Melon-Eating Crowd', 'Zootopia', 'City Will Play', 'Routine', 'Water Reverse',\n 'Why Don\\'t You Go to Heaven', 'Snake Spirit Man', 'Samsung Explosion Gate', 'Little Li Oscar',\n 'Ugly People Need to Read More', 'Boyfriend Power', 'A Face of Confusion', 'Descendants of the Sun'\n ],\n 'Popularity': [\n 1000, 800, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400,\n 400, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200\n ]\n}\n\n# Create DataFrame\nkeywords_df = pd.DataFrame(keywords_data)\n\n# Save to CSV\ncsv_file_path = '/home/yang/OpenManus/workspace/keywords_popularity.csv'\nkeywords_df.to_csv(csv_file_path, index=False)\n\n# Prepare JSON description\nchart_description = {\n 'csvFilePath': csv_file_path,\n 'chartTitle': 'Popularity of Search Keywords'\n}\n\n# Save JSON\njson_file_path = '/home/yang/OpenManus/workspace/keywords_chart_description.json'\nwith open(json_file_path, 'w', encoding='utf-8') as json_file:\n json.dump([chart_description], json_file)\n\nprint(json_file_path)"}
2025-06-17 16:51:18.168 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'visualization_preparation'...
2025-06-17 16:51:18.186 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'visualization_preparation' completed its mission! Result: Observed output of cmd `visualization_preparation` executed:
{'observation': 'All arrays must be of the same length', 'success': False}
2025-06-17 16:51:18.186 | INFO | app.agent.base:run:140 - Executing step 3/20
2025-06-17 16:51:27.931 | INFO | app.llm:update_token_count:243 - Token usage: Input=3225, Completion=730, Cumulative Input=136820, Cumulative Completion=13568, Total=3955, Cumulative Total=150388
2025-06-17 16:51:27.931 | INFO | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: The error persists, indicating that there might still be an issue with the data preparation. Let's carefully review the data and ensure that both lists ('Keyword' and 'Popularity') have the same number of elements.
### Step 1: Review and Correct the Data
Let's manually count the elements in both lists to ensure they match. If there are any discrepancies, we will correct them.
#### Data Review
- **Keywords List**: Contains 39 unique entries.
- **Popularity List**: Contains 39 entries.
Upon reviewing, it seems there was a mistake in the previous correction attempt. Let's ensure the lists are correctly aligned and retry the preparation.
Let's proceed with the corrected data preparation.
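The "All arrays must be of the same length" message in the log above is the error pandas raises when the columns passed to DataFrame have different lengths. A cheap guard is to compare the lengths before building the frame, so the mismatch is reported with actual numbers. The sketch below uses plain Python and toy data; column_lengths and check_same_length are hypothetical helpers, not part of OpenManus.

```python
def column_lengths(columns: dict[str, list]) -> dict[str, int]:
    """Map each column name to the number of values it holds."""
    return {name: len(values) for name, values in columns.items()}


def check_same_length(columns: dict[str, list]) -> None:
    """Raise ValueError naming every column length if they are not all equal."""
    lengths = column_lengths(columns)
    if len(set(lengths.values())) > 1:
        raise ValueError(f"Column lengths differ: {lengths}")


# Toy data for illustration only: three keywords but two popularity values.
toy_columns = {
    "Keyword": ["Olympics", "PPAP", "Zootopia"],
    "Popularity": [400, 400],
}

if __name__ == "__main__":
    try:
        check_same_length(toy_columns)
    except ValueError as exc:
        print(exc)
```

Calling check_same_length right before pd.DataFrame(...) would turn the opaque failure into a message that says which column is short.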
The results should be located under workspace/visualization and cover 9 different chart types.
yang@Yangless:~/OpenManus/app/tool/chart_visualization$ cat /home/yang/OpenManus/workspace/sales_data_export.csv
Date,Type,Value
2023-01-01,Product A,52.9
2023-01-01,Product B,63.6
2023-01-01,Product C,11.2
2023-01-02,Product A,45.7
2023-01-02,Product B,89.1
2023-01-02,Product C,21.4
2023-01-03,Product A,67.2
2023-01-03,Product B,82.4
2023-01-03,Product C,31.7
2023-01-04,Product A,80.7
2023-01-04,Product B,55.1
2023-01-04,Product C,21.1
2023-01-05,Product A,65.6
2023-01-05,Product B,78.0
2023-01-05,Product C,31.3
2023-01-06,Product A,75.6
2023-01-06,Product B,89.1
2023-01-06,Product C,63.5
2023-01-07,Product A,67.3
2023-01-07,Product B,77.2
2023-01-07,Product C,43.7
2023-01-08,Product A,96.1
2023-01-08,Product B,97.6
2023-01-08,Product C,59.9
2023-01-09,Product A,96.1
2023-01-09,Product B,100.6
2023-01-09,Product C,66.8
2023-01-10,Product A,101.6
2023-01-10,Product B,108.3
2023-01-10,Product C,56.9
Simple Data Report Task
Given simple raw data and the analysis requirements, run:
python -m app.tool.chart_visualization.test.report_demo
Code Analysis
Overview
1. Application entry point
main.py: the application entry point, responsible for initializing and running the Manus agent
async def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Run Manus agent with a prompt")
    parser.add_argument(
        "--prompt", type=str, required=False, help="Input prompt for the agent"
    )
    args = parser.parse_args()

    # Create and initialize Manus agent
    agent = await Manus.create()
    try:
        # Use command line prompt if provided, otherwise ask for input
        prompt = args.prompt if args.prompt else input("Enter your prompt: ")
        if not prompt.strip():
            logger.warning("Empty prompt provided.")
            return

        logger.warning("Processing your request...")
        await agent.run(prompt)
        logger.info("Request processing completed.")
    except KeyboardInterrupt:
        logger.warning("Operation interrupted.")
    finally:
        # Ensure agent resources are cleaned up before exiting
        await agent.cleanup()
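2. Configuration system
app/config.py: manages global configuration (LLM, browser, sandbox, MCP, and so on) with Pydantic models, and supports loading the settings from TOML/JSON: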
class LLMSettings(BaseModel):
    model: str = Field(..., description="Model name")
    base_url: str = Field(..., description="API base URL")
    api_key: str = Field(..., description="API key")
    max_tokens: int = Field(4096, description="Maximum number of tokens per request")
    max_input_tokens: Optional[int] = Field(
        None,
        description="Maximum input tokens to use across all requests (None for unlimited)",
    )
    temperature: float = Field(1.0, description="Sampling temperature")
    api_type: str = Field(..., description="Azure, Openai, or Ollama")
    api_version: str = Field(..., description="Azure Openai version if AzureOpenai")


class ProxySettings(BaseModel):
    server: str = Field(None, description="Proxy server address")
    username: Optional[str] = Field(None, description="Proxy username")
    password: Optional[str] = Field(None, description="Proxy password")


class SearchSettings(BaseModel):
    engine: str = Field(default="Google", description="Search engine the llm to use")
    fallback_engines: List[str] = Field(
        default_factory=lambda: ["DuckDuckGo", "Baidu", "Bing"],
        description="Fallback search engines to try if the primary engine fails",
    )
    retry_delay: int = Field(
        default=60,
        description="Seconds to wait before retrying all engines again after they all fail",
    )
    max_retries: int = Field(
        default=3,
        description="Maximum number of times to retry all engines when all fail",
    )
    lang: str = Field(
        default="en",
        description="Language code for search results (e.g., en, zh, fr)",
    )
    country: str = Field(
        default="us",
        description="Country code for search results (e.g., us, cn, uk)",
    )


class RunflowSettings(BaseModel):
    use_data_analysis_agent: bool = Field(
        default=False, description="Enable data analysis agent in run flow"
    )


class BrowserSettings(BaseModel):
    headless: bool = Field(False, description="Whether to run browser in headless mode")
    disable_security: bool = Field(
        True, description="Disable browser security features"
    )
    extra_chromium_args: List[str] = Field(
        default_factory=list, description="Extra arguments to pass to the browser"
    )
    chrome_instance_path: Optional[str] = Field(
        None, description="Path to a Chrome instance to use"
    )
    wss_url: Optional[str] = Field(
        None, description="Connect to a browser instance via WebSocket"
    )
    cdp_url: Optional[str] = Field(
        None, description="Connect to a browser instance via CDP"
    )
    proxy: Optional[ProxySettings] = Field(
        None, description="Proxy settings for the browser"
    )
    max_content_length: int = Field(
        2000, description="Maximum length for content retrieval operations"
    )
Loading the config:
class Config:
    _instance = None
    _lock = threading.Lock()
    _initialized = False

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if not self._initialized:
            with self._lock:
                if not self._initialized:
                    self._config = None
                    self._load_initial_config()
                    self._initialized = True

    @staticmethod
    def _get_config_path() -> Path:
        root = PROJECT_ROOT
        config_path = root / "config" / "config.toml"
        if config_path.exists():
            return config_path
        example_path = root / "config" / "config.example.toml"
        if example_path.exists():
            return example_path
        raise FileNotFoundError("No configuration file found in config directory")

    def _load_config(self) -> dict:
        config_path = self._get_config_path()
        with config_path.open("rb") as f:
            return tomllib.load(f)

    def _load_initial_config(self):
        raw_config = self._load_config()
        base_llm = raw_config.get("llm", {})
        llm_overrides = {
            k: v for k, v in raw_config.get("llm", {}).items() if isinstance(v, dict)
        }

        default_settings = {
            "model": base_llm.get("model"),
            "base_url": base_llm.get("base_url"),
            "api_key": base_llm.get("api_key"),
            "max_tokens": base_llm.get("max_tokens", 4096),
            "max_input_tokens": base_llm.get("max_input_tokens"),
            "temperature": base_llm.get("temperature", 1.0),
            "api_type": base_llm.get("api_type", ""),
            "api_version": base_llm.get("api_version", ""),
        }

        # handle browser config.
        browser_config = raw_config.get("browser", {})
        browser_settings = None

        if browser_config:
            # handle proxy settings.
            proxy_config = browser_config.get("proxy", {})
            proxy_settings = None

            if proxy_config and proxy_config.get("server"):
                proxy_settings = ProxySettings(
                    **{
                        k: v
                        for k, v in proxy_config.items()
                        if k in ["server", "username", "password"] and v
                    }
                )

            # filter valid browser config parameters.
            valid_browser_params = {
                k: v
                for k, v in browser_config.items()
                if k in BrowserSettings.__annotations__ and v is not None
            }

            # if there is proxy settings, add it to the parameters.
            if proxy_settings:
                valid_browser_params["proxy"] = proxy_settings

            # only create BrowserSettings when there are valid parameters.
            if valid_browser_params:
                browser_settings = BrowserSettings(**valid_browser_params)

        search_config = raw_config.get("search", {})
        search_settings = None
        if search_config:
            search_settings = SearchSettings(**search_config)

        sandbox_config = raw_config.get("sandbox", {})
        if sandbox_config:
            sandbox_settings = SandboxSettings(**sandbox_config)
        else:
            sandbox_settings = SandboxSettings()

        mcp_config = raw_config.get("mcp", {})
        mcp_settings = None
        if mcp_config:
            # Load server configurations from JSON
            mcp_config["servers"] = MCPSettings.load_server_config()
            mcp_settings = MCPSettings(**mcp_config)
        else:
            mcp_settings = MCPSettings(servers=MCPSettings.load_server_config())

        run_flow_config = raw_config.get("runflow")
        if run_flow_config:
            run_flow_settings = RunflowSettings(**run_flow_config)
        else:
            run_flow_settings = RunflowSettings()

        config_dict = {
            "llm": {
                "default": default_settings,
                **{
                    name: {**default_settings, **override_config}
                    for name, override_config in llm_overrides.items()
                },
            },
            "sandbox": sandbox_settings,
            "browser_config": browser_settings,
            "search_config": search_settings,
            "mcp_config": mcp_settings,
            "run_flow_config": run_flow_settings,
        }

        self._config = AppConfig(**config_dict)
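The merge at the end of _load_initial_config means that a named section such as [llm.vision] inherits every value from [llm] and overrides only the keys it sets itself. The sketch below isolates that dictionary merge on a plain dict that stands in for the parsed TOML. It is a simplification: merge_llm_sections is a hypothetical name, and it treats every non-table key as a default rather than the fixed key list used in the excerpt.

```python
def merge_llm_sections(raw_llm: dict) -> dict:
    """Build per-name LLM settings: base values plus per-section overrides.

    Nested tables (dict values) are treated as named overrides; everything
    else is a default shared by all sections.
    """
    overrides = {k: v for k, v in raw_llm.items() if isinstance(v, dict)}
    defaults = {k: v for k, v in raw_llm.items() if not isinstance(v, dict)}
    merged = {"default": defaults}
    for name, override in overrides.items():
        # Later keys win, so the override replaces matching defaults.
        merged[name] = {**defaults, **override}
    return merged


# Stand-in for what tomllib would return for the [llm] table shown earlier.
raw_llm = {
    "model": "gpt-4o",
    "base_url": "https://openrouter.ai/api/v1",
    "max_tokens": 4096,
    "vision": {"base_url": "https://api.openai.com/v1"},
}

if __name__ == "__main__":
    for name, settings in merge_llm_sections(raw_llm).items():
        print(name, settings)
```

With this layout, the vision entry keeps the base model and max_tokens but points at its own base_url.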
3. Logging system
app/logger.py: centralized log management implemented with Loguru
def define_log_level(print_level="INFO", logfile_level="DEBUG", name: str = None):
    """Adjust the log level to above level"""
    global _print_level
    _print_level = print_level

    current_date = datetime.now()
    formatted_date = current_date.strftime("%Y%m%d%H%M%S")
    log_name = (
        f"{name}_{formatted_date}" if name else formatted_date
    )  # name a log with prefix name

    _logger.remove()
    _logger.add(sys.stderr, level=print_level)
    _logger.add(PROJECT_ROOT / f"logs/{log_name}.log", level=logfile_level)
    return _logger


logger = define_log_level()
4. Agent hierarchy
BaseAgent → ReActAgent → ToolCallAgent → Manus
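A rough way to picture this chain: the base class owns the step loop, the ReAct layer splits each step into think and act, and the concrete agent decides which tool to run. The sketch below is a toy model under those assumptions, not the OpenManus source. The think/act split is inferred from the app.agent.toolcall:think and app.agent.toolcall:act entries in the log earlier, and ToyToolAgent is a hypothetical stand-in for ToolCallAgent and Manus.

```python
import asyncio
from abc import ABC, abstractmethod


class BaseAgent(ABC):
    """Owns the execution loop; subclasses supply step()."""

    def __init__(self, max_steps: int = 3) -> None:
        self.max_steps = max_steps
        self.finished = False

    @abstractmethod
    async def step(self) -> str:
        """Run one step and return a short description of what happened."""

    async def run(self) -> list[str]:
        results = []
        for _ in range(self.max_steps):
            if self.finished:
                break
            results.append(await self.step())
        return results


class ReActAgent(BaseAgent, ABC):
    """Splits a step into a think phase and an act phase (assumed layout)."""

    @abstractmethod
    async def think(self) -> bool:
        """Decide what to do next; return False if no action is needed."""

    @abstractmethod
    async def act(self) -> str:
        """Carry out the action chosen in think()."""

    async def step(self) -> str:
        if not await self.think():
            return "no action needed"
        return await self.act()


class ToyToolAgent(ReActAgent):
    """Hypothetical stand-in for ToolCallAgent/Manus with a scripted tool list."""

    def __init__(self) -> None:
        super().__init__(max_steps=5)
        self.pending = ["browser_use", "terminate"]
        self.selected = ""

    async def think(self) -> bool:
        if not self.pending:
            return False
        self.selected = self.pending.pop(0)
        return True

    async def act(self) -> str:
        # A "terminate" tool ends the loop, mirroring the log shown earlier.
        if self.selected == "terminate":
            self.finished = True
        return f"ran {self.selected}"


if __name__ == "__main__":
    print(asyncio.run(ToyToolAgent().run()))
```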
app/agent/base.py: the abstract base class for all agents (lifecycle and state management)
from abc import ABC, abstractmethod  # abstract class interface, which makes the code more robust
from contextlib import asynccontextmanager  # async context manager for acquiring and releasing resources
from typing import List, Optional  # type hints state the expected types of variables, parameters and return values, making the code easier to read

from pydantic import BaseModel, Field, model_validator  # Pydantic data models and validation

from app.llm import LLM
from app.logger import logger
from app.sandbox.client import SANDBOX_CLIENT
from app.schema import ROLE_TYPE, AgentState, Memory, Message


# This part defines the skeleton of the BaseAgent class and the common attributes every agent should have.
class BaseAgent(BaseModel, ABC):
    """Abstract base class for managing agent state and execution.

    Provides foundational functionality for state transitions, memory management,
    and a step-based execution loop. Subclasses must implement the `step` method.
    """

    # Core attributes
    name: str = Field(..., description="Unique name of the agent")
    description: Optional[str] = Field(None, description="Optional agent description")

    # Prompts
    system_prompt: Optional[str] = Field(
        None, description="System-level instruction prompt"
    )
    next_step_prompt: Optional[str] = Field(
        None, description="Prompt for determining next action"
    )

    # Dependencies
    llm: LLM = Field(default_factory=LLM, description="Language model instance")
    memory: Memory = Field(default_factory=Memory, description="Agent's memory store")
    state: AgentState = Field(
        default=AgentState.IDLE, description="Current agent state"
    )

    # Execution control
    max_steps: int = Field(default=10, description="Maximum steps before termination")
    current_step: int = Field(default=0, description="Current step in execution")

    duplicate_threshold: int = 2

    class Config:
        arbitrary_types_allowed = True
        extra = "allow"  # Allow extra fields for flexibility in subclasses
        # extra = "allow" lets the model tolerate and store fields that are not
        # explicitly declared on it. This is useful for an extensible base class
        # such as BaseAgent: subclasses or external code can pass additional
        # custom parameters without triggering a validation error.
        # arbitrary_types_allowed lets model fields hold instances of custom
        # objects that Pydantic cannot validate directly.

    # Initialization
    @model_validator(mode="after")
    def initialize_agent(self) -> "BaseAgent":
        """Initialize agent with default settings if not provided."""
        if self.llm is None or not isinstance(self.llm, LLM):
            self.llm = LLM(config_name=self.name.lower())
        if not isinstance(self.memory, Memory):
            self.memory = Memory()
        return self

    # Safely manage the agent's state
    @asynccontextmanager
    async def state_context(self, new_state: AgentState):
        """Context manager for safe agent state transitions."""
        # ...
        previous_state = self.state
        self.state = new_state
        try:
            yield
        except Exception as e:
            self.state = AgentState.ERROR
            raise e
        finally:
            self.state = previous_state
In Python, yield is the keyword used to define a **generator or an async generator**. In essence, it pauses the function and hands control back to the caller while keeping the function's current state, so execution can later resume from that exact point.
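Example:
async def something():
    async with agent.state_context(AgentState.RUNNING):
        # This is the code inside the async with block
        await agent.do_something()
        print("Finished the work that belongs to this state")
1. Python runs async with agent.state_context(…):
it enters the state_context function;
it executes everything before yield (such as the state switch);
2. On reaching yield:
state_context is suspended;
control leaves the function and passes to the code inside the async with block;
that is, await agent.do_something() runs, followed by print(…);
3. When the async with block finishes (whether or not it raised):
execution returns to state_context and resumes at the yield; if the block raised, the exception is re-raised at that point;
the except and/or finally branches then run to clean up and restore the state.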
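The three steps above can be traced with a small standalone sketch. ToyAgent, State, and the trace list are hypothetical names made up for this illustration; only the shape of state_context follows the excerpt above.

```python
import asyncio
from contextlib import asynccontextmanager
from enum import Enum


class State(Enum):
    IDLE = "idle"
    RUNNING = "running"
    ERROR = "error"


class ToyAgent:
    """Hypothetical stand-in for an agent; only the state handling is modelled."""

    def __init__(self):
        self.state = State.IDLE
        self.trace = []  # records which branches of state_context ran

    @asynccontextmanager
    async def state_context(self, new_state):
        previous = self.state
        self.state = new_state
        self.trace.append("enter")
        try:
            yield  # control passes to the body of the async with block
        except Exception:
            self.state = State.ERROR
            self.trace.append("except")
            raise
        finally:
            self.state = previous  # runs on both the normal and the error path
            self.trace.append("finally")


async def run_ok():
    agent = ToyAgent()
    async with agent.state_context(State.RUNNING):
        inside = agent.state  # observed while the block is running
    return agent, inside


async def run_failing():
    agent = ToyAgent()
    try:
        async with agent.state_context(State.RUNNING):
            raise ValueError("boom")
    except ValueError:
        pass
    return agent


ok_agent, state_inside = asyncio.run(run_ok())
failed_agent = asyncio.run(run_failing())
```

In this sketch the finally branch runs after except, so the state ends up restored to its previous value even on the error path; the ERROR state is only visible in between.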
    # Update memory
    def update_memory(
        self,
        role: ROLE_TYPE,  # type: ignore
        content: str,
        base64_image: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Add a message to the agent's memory.

        Args:
            role: The role of the message sender (user, system, assistant, tool).
            content: The message content.
            base64_image: Optional base64 encoded image.
            **kwargs: Additional arguments (e.g., tool_call_id for tool messages).

        Raises:
            ValueError: If the role is unsupported.
        """
        message_map = {
            "user": Message.user_message,
            "system": Message.system_message,
            "assistant": Message.assistant_message,
            "tool": lambda content, **kw: Message.tool_message(content, **kw),
        }

        if role not in message_map:
            raise ValueError(f"Unsupported message role: {role}")

        # Create message with appropriate parameters based on role
        kwargs = {"base64_image": base64_image, **(kwargs if role == "tool" else {})}
        self.memory.add_message(message_map[role](content, **kwargs))

    # Asynchronous execution
    async def run(self, request: Optional[str] = None) -> str:
        """Execute the agent's main loop asynchronously.

        Args:
            request: Optional initial user request to process.

        Returns:
            A string summarizing the execution results.

        Raises:
            RuntimeError: If the agent is not in IDLE state at start.
        """
        # The agent must be idle
        if self.state != AgentState.IDLE:
            raise RuntimeError(f"Cannot run agent from state: {self.state}")

        if request:
            self.update_memory("user", request)

        results: List[str] = []
        async with self.state_context(AgentState.RUNNING):
            while (
                self.current_step < self.max_steps and self.state != AgentState.FINISHED
            ):
                self.current_step += 1
                logger.info(f"Executing step {self.current_step}/{self.max_steps}")
                step_result = await self.step()

                # Check for stuck state
                if self.is_stuck():
                    self.handle_stuck_state()

                results.append(f"Step {self.current_step}: {step_result}")

            if self.current_step >= self.max_steps:
                self.current_step = 0
                self.state = AgentState.IDLE
                results.append(f"Terminated: Reached max steps ({self.max_steps})")
        await SANDBOX_CLIENT.cleanup()
        return "\n".join(results) if results else "No steps executed"

run is the scheduling entry point of the whole framework. It coordinates the control attributes state, max_steps, and current_step, calls the core step method, and uses state_context and update_memory to handle state and memory. At the end it also talks to the external SANDBOX_CLIENT, which in effect runs commands inside Docker so that they do not pollute the host machine directly.

    @abstractmethod
    async def step(self) -> str:
        """Execute a single step in the agent's workflow.

        Must be implemented by subclasses to define specific behavior.
        """

    def handle_stuck_state(self):
        """Handle stuck state by adding a prompt to change strategy"""
        stuck_prompt = "\
        Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
        self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
        logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")

    def is_stuck(self) -> bool:
        """Check if the agent is stuck in a loop by detecting duplicate content"""
        if len(self.memory.messages) < 2:
            return False

        last_message = self.memory.messages[-1]
        if not last_message.content:
            return False

        # Count identical content occurrences
        duplicate_count = sum(
            1
            for msg in reversed(self.memory.messages[:-1])
            if msg.role == "assistant" and msg.content == last_message.content
        )

        return duplicate_count >= self.duplicate_threshold

These three methods define the agent's single-step behavior and its self-correction mechanism, but there are two obvious weaknesses:
1. Duplicate detection compares content by exact string equality and ignores semantic or near-duplicate similarity.
2. duplicate_threshold is a single fixed value: set too low it produces false positives, set too high it misses real loops.
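The first weakness is easy to reproduce in isolation. The sketch below copies the counting logic of is_stuck into a free function; Msg is a hypothetical stand-in for the real Message class, and the sample texts are invented.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical minimal message: just a role and a text body."""
    role: str
    content: str


def is_stuck(messages, duplicate_threshold=2):
    """Same counting rule as the excerpt: exact equality with the last message."""
    if len(messages) < 2:
        return False
    last = messages[-1]
    if not last.content:
        return False
    duplicate_count = sum(
        1
        for msg in reversed(messages[:-1])
        if msg.role == "assistant" and msg.content == last.content
    )
    return duplicate_count >= duplicate_threshold


# Three byte-identical replies: two earlier copies match the last one.
exact_repeats = [Msg("assistant", "retry search") for _ in range(3)]

# Three replies a human would call the same, differing only in case or punctuation.
near_repeats = [
    Msg("assistant", "retry search"),
    Msg("assistant", "retry search."),
    Msg("assistant", "Retry search"),
]

exact_flagged = is_stuck(exact_repeats)
near_flagged = is_stuck(near_repeats)
```

With the default threshold of 2, only the byte-identical sequence is reported as stuck; the near-duplicates pass unnoticed.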
    @property
    def messages(self) -> List[Message]:
        """Retrieve a list of messages from the agent's memory."""
        return self.memory.messages

    @messages.setter
    def messages(self, value: List[Message]):
        """Set the list of messages in the agent's memory."""
        self.memory.messages = value

This wraps the memory's message list in a property, so call sites can write self.messages instead of self.memory.messages.
app/agent/react.py: implements the ReAct (reason-act) paradigm
    @abstractmethod
    async def think(self) -> bool:
        """Process current state and decide next action"""
        # Decides whether the act phase should follow

    @abstractmethod
    async def act(self) -> str:
        """Execute decided actions"""

    async def step(self) -> str:
        """Execute a single step: think and act."""
        should_act = await self.think()
        if not should_act:
            return "Thinking complete - no action needed"
        return await self.act()

app/agent/toolcall.py: adds tool-calling capability
class ToolCallAgent(ReActAgent):
    """Base agent class for handling tool/function calls with enhanced abstraction"""

    name: str = "toolcall"
    description: str = "an agent that can execute tool calls."

    system_prompt: str = SYSTEM_PROMPT
    next_step_prompt: str = NEXT_STEP_PROMPT

    # Tool attributes, used when talking to the LLM
    # available_tools defines which tools this agent can use. Two are present by default:
    # CreateChatCompletion (probably used to generate longer text) and Terminate (used to end
    # the task). ToolCollection is a helper class that manages the set of tools.
    available_tools: ToolCollection = ToolCollection(
        CreateChatCompletion(), Terminate()
    )
    # tool_choices controls how the LLM may call tools: AUTO (the LLM decides whether to call a
    # tool), REQUIRED (the LLM must call a tool), or NONE (the LLM must not call tools).
    tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO  # type: ignore
    special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])

    # State attributes, used by act
    tool_calls: List[ToolCall] = Field(default_factory=list)
    _current_base64_image: Optional[str] = None

    max_steps: int = 30
    max_observe: Optional[Union[int, bool]] = None

    async def think(self) -> bool:
        """Process current state and decide next actions using tools"""
        if self.next_step_prompt:
            user_msg = Message.user_message(self.next_step_prompt)
            self.messages += [user_msg]

        try:
            # Get response with tool options
            response = await self.llm.ask_tool(
                messages=self.messages,
                system_msgs=(
                    [Message.system_message(self.system_prompt)]
                    if self.system_prompt
                    else None
                ),
                tools=self.available_tools.to_params(),
                tool_choice=self.tool_choices,
            )
        except ValueError:
            raise
        except Exception as e:
            # Check if this is a RetryError containing TokenLimitExceeded
            if hasattr(e, "__cause__") and isinstance(e.__cause__, TokenLimitExceeded):
                token_limit_error = e.__cause__
                logger.error(
                    f"🚨 Token limit error (from RetryError): {token_limit_error}"
                )
                self.memory.add_message(
                    Message.assistant_message(
                        f"Maximum token limit reached, cannot continue execution: {str(token_limit_error)}"
                    )
                )
                self.state = AgentState.FINISHED
                return False
            raise

        self.tool_calls = tool_calls = (
            response.tool_calls if response and response.tool_calls else []
        )
        content = response.content if response and response.content else ""

        # Log response info
        logger.info(f"✨ {self.name}'s thoughts: {content}")
        logger.info(
            f"🛠️ {self.name} selected {len(tool_calls) if tool_calls else 0} tools to use"
        )
        if tool_calls:
            logger.info(
                f"🧰 Tools being prepared: {[call.function.name for call in tool_calls]}"
            )
            logger.info(f"🔧 Tool arguments: {tool_calls[0].function.arguments}")

        try:
            if response is None:
                raise RuntimeError("No response received from the LLM")

            # Handle different tool_choices modes
            if self.tool_choices == ToolChoice.NONE:
                if tool_calls:
                    logger.warning(
                        f"🤔 Hmm, {self.name} tried to use tools when they weren't available!"
                    )
                if content:
                    self.memory.add_message(Message.assistant_message(content))
                    return True
                return False

            # Create and add assistant message
            assistant_msg = (
                Message.from_tool_calls(content=content, tool_calls=self.tool_calls)
                if self.tool_calls
                else Message.assistant_message(content)
            )
            self.memory.add_message(assistant_msg)

            if self.tool_choices == ToolChoice.REQUIRED and not self.tool_calls:
                return True  # Will be handled in act()

            # For 'auto' mode, continue with content if no commands but content exists
            if self.tool_choices == ToolChoice.AUTO and not self.tool_calls:
                return bool(content)

            return bool(self.tool_calls)
        except Exception as e:
            logger.error(f"🚨 Oops! The {self.name}'s thinking process hit a snag: {e}")
            self.memory.add_message(
                Message.assistant_message(
                    f"Error encountered while processing: {str(e)}"
                )
            )
            return False

This is the agent's "thinking" phase, which turns the current context into a decision about the next step.
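- Prepare input: if next_step_prompt is set, it is appended to the conversation history as a new user message to steer the LLM's next step.
- Call the LLM: self.llm.ask_tool is called. This interface is built for tool calling and sends the conversation history, the system prompt, the list of available tools (self.available_tools), and the tool choice mode (self.tool_choices) to the LLM.
- Error handling: ValueError is re-raised as is. If the exception wraps a TokenLimitExceeded, the error is logged, a note is written to memory, the agent state is set to FINISHED, and think returns False. Any other exception is re-raised.
- Parse the response: tool_calls (the tools the LLM plans to run) and content (the plain-text reasoning) are extracted from the LLM response.
- Logging: the LLM's thoughts, the selected tools, and the tool arguments are logged to make debugging easier.
- Handle the modes: the logic branches on self.tool_choices (NONE, REQUIRED, AUTO). For example, in NONE mode a warning is logged if the LLM still tries to call a tool.
- Update memory: the LLM response (reasoning text plus tool call requests) is added to memory as an "assistant" message.
- Return the decision: a boolean is returned. True means the act phase should run, normally because there are tool calls to execute; it is also returned in AUTO mode when the reply has text but no tool calls, and in REQUIRED mode without tool calls so that act can report the problem. False means there is nothing to act on.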
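The return rule in the last bullet can be pulled out into a pure function for inspection. This is a sketch that mirrors the branches of think shown above; should_act and the local ToolChoice enum are names invented here, not part of OpenManus.

```python
from enum import Enum


class ToolChoice(str, Enum):
    """Local stand-in for the three tool choice modes named in the text."""
    NONE = "none"
    AUTO = "auto"
    REQUIRED = "required"


def should_act(tool_choice, tool_calls, content):
    """Return the boolean that think() would return for a given LLM reply."""
    if tool_choice == ToolChoice.NONE:
        # Tool calls are ignored in this mode; only text keeps the step going.
        return bool(content)
    if tool_choice == ToolChoice.REQUIRED and not tool_calls:
        # act() is still entered so that it can raise the "tool call required" error.
        return True
    if tool_choice == ToolChoice.AUTO and not tool_calls:
        return bool(content)
    return bool(tool_calls)


decisions = {
    "auto, text only": should_act(ToolChoice.AUTO, [], "some thoughts"),
    "auto, empty reply": should_act(ToolChoice.AUTO, [], ""),
    "auto, one tool call": should_act(ToolChoice.AUTO, ["call"], ""),
    "required, no tool call": should_act(ToolChoice.REQUIRED, [], ""),
    "none, tool call but no text": should_act(ToolChoice.NONE, ["call"], ""),
}
```

Laying the cases out this way makes it visible that True does not always mean "there is a tool to run".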
    async def act(self) -> str:
        """Execute tool calls and handle their results"""
        if not self.tool_calls:
            if self.tool_choices == ToolChoice.REQUIRED:
                raise ValueError(TOOL_CALL_REQUIRED)

            # Return last message content if no tool calls
            return self.messages[-1].content or "No content or commands to execute"

        results = []
        for command in self.tool_calls:
            # Reset base64_image for each tool call
            self._current_base64_image = None

            result = await self.execute_tool(command)

            if self.max_observe:
                result = result[: self.max_observe]

            logger.info(
                f"🎯 Tool '{command.function.name}' completed its mission! Result: {result}"
            )

            # Add tool response to memory
            tool_msg = Message.tool_message(
                content=result,
                tool_call_id=command.id,
                name=command.function.name,
                base64_image=self._current_base64_image,
            )
            self.memory.add_message(tool_msg)
            results.append(result)

        return "\n\n".join(results)

The act method is the agent's "executor". It takes the tool calls the LLM produced in the think phase, actually runs those tools, collects their output, and writes the results back into memory. This closes the think-act-observe loop.
- Check preconditions: it checks whether self.tool_calls is empty. If it is, the tool_choices mode decides whether to raise an error or return directly.
- Iterate: it loops over all tool_calls generated in the think phase.
- Run a single tool: for each command (a ToolCall object) it calls self.execute_tool to actually run it.
- Process the result: it takes the tool's result and truncates it according to max_observe.
- Update memory: each result is wrapped in a tool_message (with the result content, the matching tool_call_id, and so on) and added to memory for the LLM to read in the next step.
- Aggregate and return: all tool results are joined into one string, which becomes the result of the current step.
    async def execute_tool(self, command: ToolCall) -> str:
        """Execute a single tool call with robust error handling"""
        if not command or not command.function or not command.function.name:
            return "Error: Invalid command format"

        name = command.function.name
        if name not in self.available_tools.tool_map:
            return f"Error: Unknown tool '{name}'"

        try:
            # Parse arguments
            args = json.loads(command.function.arguments or "{}")

            # Execute the tool
            logger.info(f"🔧 Activating tool: '{name}'...")
            result = await self.available_tools.execute(name=name, tool_input=args)

            # Handle special tools
            await self._handle_special_tool(name=name, result=result)

            # Check if result is a ToolResult with base64_image
            if hasattr(result, "base64_image") and result.base64_image:
                # Store the base64_image for later use in tool_message
                self._current_base64_image = result.base64_image

            # Format result for display (standard case)
            observation = (
                f"Observed output of cmd `{name}` executed:\n{str(result)}"
                if result
                else f"Cmd `{name}` completed with no output"
            )

            return observation
        except json.JSONDecodeError:
            error_msg = f"Error parsing arguments for {name}: Invalid JSON format"
            logger.error(
                f"📝 Oops! The arguments for '{name}' don't make sense - invalid JSON, arguments:{command.function.arguments}"
            )
            return f"Error: {error_msg}"
        except Exception as e:
            error_msg = f"⚠️ Tool '{name}' encountered a problem: {str(e)}"
            logger.exception(error_msg)
            return f"Error: {error_msg}"

act schedules the tools; execute_tool runs a single tool call.
- Validate: it checks that the tool name exists in available_tools.
- Parse arguments: the tool arguments returned by the LLM are usually a JSON string. This method uses json.loads to parse them into a Python dict.
- Execute: it calls self.available_tools.execute() with the tool name and the parsed arguments.
- Special handling: it calls _handle_special_tool to check whether the current tool needs special logic (such as ending the agent).
- Format the result: the raw tool result is wrapped in an "Observation" string that the LLM can read in the next step.
- Error handling: JSON parse errors and tool execution exceptions are caught and returned as formatted error strings. A single failing tool therefore does not stop the agent from continuing.
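The "errors become strings" idea in the last bullet can be shown with a toy dispatcher. Everything here (the shout tool, the TOOLS dict, the shortened messages) is hypothetical; the sketch only mirrors the control flow of execute_tool, not the real ToolCollection.

```python
import asyncio
import json


async def shout(text: str) -> str:
    """Hypothetical tool: returns its input in upper case."""
    return text.upper()


TOOLS = {"shout": shout}  # stand-in for available_tools.tool_map


async def execute_tool(name: str, raw_arguments: str) -> str:
    """Run one tool call; every failure is reported as a string, never raised."""
    if name not in TOOLS:
        return f"Error: Unknown tool '{name}'"
    try:
        args = json.loads(raw_arguments or "{}")  # LLM arguments arrive as a JSON string
        result = await TOOLS[name](**args)
        return (
            f"Observed output of cmd `{name}` executed:\n{result}"
            if result
            else f"Cmd `{name}` completed with no output"
        )
    except json.JSONDecodeError:
        return f"Error: Error parsing arguments for {name}: Invalid JSON format"
    except Exception as e:  # e.g. a missing or unexpected keyword argument
        return f"Error: Tool '{name}' encountered a problem: {e}"


ok = asyncio.run(execute_tool("shout", '{"text": "hi"}'))
bad_json = asyncio.run(execute_tool("shout", '{"text": '))
wrong_args = asyncio.run(execute_tool("shout", '{"msg": "hi"}'))
unknown = asyncio.run(execute_tool("missing", "{}"))
```

Because every branch returns a string, the caller can store the outcome in memory as an observation whether or not the tool succeeded.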
    async def cleanup(self):
        """Clean up resources used by the agent's tools."""
        logger.info(f"🧹 Cleaning up resources for agent '{self.name}'...")
        for tool_name, tool_instance in self.available_tools.tool_map.items():
            if hasattr(tool_instance, "cleanup") and asyncio.iscoroutinefunction(
                tool_instance.cleanup
            ):
                try:
                    logger.debug(f"🧼 Cleaning up tool: {tool_name}")
                    await tool_instance.cleanup()
                except Exception as e:
                    logger.error(
                        f"🚨 Error cleaning up tool '{tool_name}': {e}", exc_info=True
                    )
        logger.info(f"✨ Cleanup complete for agent '{self.name}'.")

    async def run(self, request: Optional[str] = None) -> str:
        """Run the agent with cleanup when done."""
        try:
            return await super().run(request)
        finally:
            await self.cleanup()

At first glance I thought run was redundant here: think() and act() already contain the tool-calling logic, so run seemed to belong in a higher abstraction layer. On closer inspection:
- Why write run()?
It is not there to call tools. It adds a uniform cleanup step at the end of the lifecycle for this layer. Admittedly, this part could also live in a higher-level base class and be inherited.
- Does it conflict with other responsibilities?
No. run only wraps the start and end of the flow; the actual control logic stays in think()/act().
app/agent/manus.py: the main agent (integrates the MCP client and the built-in tools)
    @model_validator(mode="after")
    def initialize_helper(self) -> "Manus":
        """Initialize basic components synchronously."""
        self.browser_context_helper = BrowserContextHelper(self)
        return self
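@model_validator(mode="after"):
This is a decorator provided by Pydantic for model validation and initialization. mode="after" means the validator runs after all fields have been parsed and validated. When a Manus object is created, Pydantic first processes fields such as name, description, and system_prompt, and calls initialize_helper once those basic fields are in place.
-> "Manus" is a Python type hint stating that initialize_helper is expected to return a Manus object.
Why the string "Manus" rather than Manus itself?
This is a forward reference. When a class refers to itself inside its own definition, the class is not fully defined yet, so writing the bare class name can raise a NameError.
With the type name in quotes, Python treats it as a plain string at definition time. Type checkers (such as MyPy) resolve the string later, which avoids circular-reference and undefined-name problems.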
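A minimal sketch of a forward reference, using a hypothetical Node class instead of Manus. The annotation is stored as a string and only turned into the real class when something asks for it, here via typing.get_type_hints.

```python
from typing import get_type_hints


class Node:
    """Hypothetical class whose method returns an instance of the same class."""

    def clone(self) -> "Node":  # quoted: Node is not bound yet while the class body runs
        return Node()


# The raw annotation is still the string that was written in the source.
raw_annotation = Node.clone.__annotations__["return"]

# get_type_hints resolves the string to the actual class object. The class is passed in
# explicitly so that this also works when the snippet is not run at module level.
resolved = get_type_hints(Node.clone, localns={"Node": Node})["return"]
```

The same resolution step is what a type checker performs statically.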
    @classmethod
    async def create(cls, **kwargs) -> "Manus":
        """Factory method to create and properly initialize a Manus instance."""
        instance = cls(**kwargs)
        await instance.initialize_mcp_servers()
        instance._initialized = True
        return instance
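Why is initialize_helper() not called from create(), but placed in the earlier @model_validator(mode="after") instead? The initialization reads as split across two places.
Moving all initialization logic into create(), so that the construction flow is defined in one spot, would probably be cleaner.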
    async def connect_mcp_server(
        self,
        server_url: str,
        server_id: str = "",
        use_stdio: bool = False,
        stdio_args: List[str] = None,
    ) -> None:
        """Connect to an MCP server and add its tools."""
        if use_stdio:
            await self.mcp_clients.connect_stdio(
                server_url, stdio_args or [], server_id
            )
            self.connected_servers[server_id or server_url] = server_url
        else:
            await self.mcp_clients.connect_sse(server_url, server_id)
            self.connected_servers[server_id or server_url] = server_url

        # Update available tools with only the new tools from this server
        new_tools = [
            tool for tool in self.mcp_clients.tools if tool.server_id == server_id
        ]
        self.available_tools.add_tools(*new_tools)

    async def disconnect_mcp_server(self, server_id: str = "") -> None:
        """Disconnect from an MCP server and remove its tools."""
        await self.mcp_clients.disconnect(server_id)
        if server_id:
            self.connected_servers.pop(server_id, None)
        else:
            self.connected_servers.clear()

        # Rebuild available tools without the disconnected server's tools
        base_tools = [
            tool
            for tool in self.available_tools.tools
            if not isinstance(tool, MCPClientTool)
        ]
        self.available_tools = ToolCollection(*base_tools)
        self.available_tools.add_tools(*self.mcp_clients.tools)
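Connecting goes through mcp_clients, which opens a connection to the given server (SSE or standard input/output). After the connection is recorded, the tools exposed by the new server are taken from mcp_clients and appended to self.available_tools. In other words, the agent's toolbox is extended dynamically at runtime.
On disconnect, the connection record is removed first. To drop the tools of the disconnected server, the method filters out all non-MCP base tools, builds a new ToolCollection from them, and then adds back the MCP tools that are still connected. This keeps the agent from calling tools that are no longer available.
This approach is cumbersome. A tool registry plus an enabled-state table would likely be easier to maintain and extend than rebuilding the ToolCollection on every disconnect.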
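One possible shape for the registry idea, as a sketch only. ToolEntry, ToolRegistry, and the sample tool names are hypothetical and do not exist in OpenManus; the point is that disconnecting a server flips a flag instead of rebuilding the collection.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToolEntry:
    tool: object               # the tool instance itself
    server_id: Optional[str]   # None marks a built-in (non-MCP) tool
    enabled: bool = True


@dataclass
class ToolRegistry:
    entries: Dict[str, ToolEntry] = field(default_factory=dict)

    def register(self, name: str, tool: object, server_id: Optional[str] = None) -> None:
        """Add a tool once; it stays registered even while disabled."""
        self.entries[name] = ToolEntry(tool, server_id)

    def set_server_enabled(self, server_id: str, enabled: bool) -> None:
        """Enable or disable every tool that belongs to one server."""
        for entry in self.entries.values():
            if entry.server_id == server_id:
                entry.enabled = enabled

    def active_names(self) -> List[str]:
        """Names of the tools that may currently be offered to the LLM."""
        return [name for name, entry in self.entries.items() if entry.enabled]


registry = ToolRegistry()
registry.register("terminate", object())                        # built-in tool
registry.register("remote_search", object(), server_id="srv-a")
registry.register("remote_fetch", object(), server_id="srv-b")

registry.set_server_enabled("srv-a", False)   # "disconnect" srv-a
after_disconnect = registry.active_names()

registry.set_server_enabled("srv-a", True)    # reconnecting needs no re-registration
after_reconnect = registry.active_names()
```

Whether this is actually simpler depends on how tools are re-discovered after a reconnect, which this sketch does not cover.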
5. LLM interaction layer
app/llm.py: a unified interface to multiple LLM providers (OpenAI/Azure/Bedrock), including:
- Token counting
    def _calculate_high_detail_tokens(self, width: int, height: int) -> int:
        """Calculate tokens for high detail images based on dimensions"""
        # Step 1: Scale to fit in MAX_SIZE x MAX_SIZE square
        if width > self.MAX_SIZE or height > self.MAX_SIZE:
            scale = self.MAX_SIZE / max(width, height)
            width = int(width * scale)
            height = int(height * scale)

        # Step 2: Scale so shortest side is HIGH_DETAIL_TARGET_SHORT_SIDE
        scale = self.HIGH_DETAIL_TARGET_SHORT_SIDE / min(width, height)
        scaled_width = int(width * scale)
        scaled_height = int(height * scale)

        # Step 3: Count number of 512px tiles
        tiles_x = math.ceil(scaled_width / self.TILE_SIZE)
        tiles_y = math.ceil(scaled_height / self.TILE_SIZE)
        total_tiles = tiles_x * tiles_y

        # Step 4: Calculate final token count
        return (
            total_tiles * self.HIGH_DETAIL_TILE_TOKENS
        ) + self.LOW_DETAIL_IMAGE_TOKENS

    def count_content(self, content: Union[str, List[Union[str, dict]]]) -> int:
        """Calculate tokens for message content"""
        if not content:
            return 0

        if isinstance(content, str):
            return self.count_text(content)

        token_count = 0
        for item in content:
            if isinstance(item, str):
                token_count += self.count_text(item)
            elif isinstance(item, dict):
                if "text" in item:
                    token_count += self.count_text(item["text"])
                elif "image_url" in item:
                    token_count += self.count_image(item)
        return token_count

The token cost of a large image is not computed from its raw pixel count. The image is scaled and split into tiles, each tile is charged a fixed number of tokens, and a base cost (the token cost of a low-detail image) is added on top.
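A worked example of the four steps as a standalone function. The constant values below are assumptions: the excerpt only shows their names (plus the "512px tiles" comment), and the numbers used here follow the image-token accounting OpenAI has published for high-detail images.

```python
import math

# Assumed values; only the names and the 512px tile size appear in the excerpt.
MAX_SIZE = 2048
HIGH_DETAIL_TARGET_SHORT_SIDE = 768
TILE_SIZE = 512
HIGH_DETAIL_TILE_TOKENS = 170
LOW_DETAIL_IMAGE_TOKENS = 85


def high_detail_tokens(width: int, height: int) -> int:
    """Standalone version of the four steps in _calculate_high_detail_tokens."""
    # Step 1: shrink so the image fits inside a MAX_SIZE x MAX_SIZE square
    if width > MAX_SIZE or height > MAX_SIZE:
        scale = MAX_SIZE / max(width, height)
        width, height = int(width * scale), int(height * scale)

    # Step 2: scale so the shorter side equals the target length
    scale = HIGH_DETAIL_TARGET_SHORT_SIDE / min(width, height)
    scaled_width, scaled_height = int(width * scale), int(height * scale)

    # Step 3: count the tiles needed to cover the scaled image
    tiles = math.ceil(scaled_width / TILE_SIZE) * math.ceil(scaled_height / TILE_SIZE)

    # Step 4: per-tile cost plus the fixed base cost
    return tiles * HIGH_DETAIL_TILE_TOKENS + LOW_DETAIL_IMAGE_TOKENS


# 1024x1024 -> scaled to 768x768 -> 2 x 2 tiles -> 4 * 170 + 85
square = high_detail_tokens(1024, 1024)   # 765

# 4096x2048 -> 2048x1024 -> 1536x768 -> 3 x 2 tiles -> 6 * 170 + 85
wide = high_detail_tokens(4096, 2048)     # 1105
```

Under these assumed constants, quadrupling the pixel count raises the cost from 765 to 1105 tokens, which is far from proportional.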
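- Message formatting
- Tool-call handling
- Retry mechanism
    @retry(
        wait=wait_random_exponential(min=1, max=60),
        stop=stop_after_attempt(6),
        retry=retry_if_exception_type(
            (OpenAIError, Exception, ValueError)
        ),  # Don't retry TokenLimitExceeded
    )
The @retry decorator is applied to a function or method. It makes that function more fault tolerant: on temporary, recoverable errors (such as network jitter or a briefly overloaded server) the call is retried automatically instead of failing right away. Note that the tuple passed to retry_if_exception_type includes Exception, so despite the inline comment the filter appears to match every exception type; this would be consistent with think() looking for a TokenLimitExceeded wrapped inside a RetryError. Applying the decorator is equivalent to:
original_get_completion = get_completion  # keep the original function
get_completion = retry(...)(original_get_completion)  # replace it with the new function returned by retry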
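To make the "decorator equals reassignment" point concrete, here is a hand-written stand-in for a retry decorator. It is not tenacity: there is no waiting or backoff, and the names retry, max_attempts, retry_on, and flaky are made up for this sketch.

```python
import functools


def retry(max_attempts: int, retry_on=(Exception,)):
    """Minimal retry decorator factory: re-run the function until it succeeds."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the last error propagate

        return wrapper

    return decorator


calls = {"count": 0}


def flaky():
    """Fails twice with a transient error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# Same effect as writing @retry(max_attempts=6) above the definition of flaky.
flaky_with_retry = retry(max_attempts=6)(flaky)
result = flaky_with_retry()
```

The caller sees a single successful call, while the wrapped function actually ran three times.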
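6. Data models
app/schema.py: defines the core data structures: Message/ToolCall/Memory and others
7. Tool framework
app/tool/base.py: the abstract tool base class BaseTool and the standard output type ToolResult
app/tool/tool_collection.py: the tool collection manager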
class ToolCollection:
    """A collection of defined tools."""

    class Config:
        arbitrary_types_allowed = True

    def __init__(self, *tools: BaseTool):
        self.tools = tools
        self.tool_map = {tool.name: tool for tool in tools}

    def __iter__(self):
        return iter(self.tools)

    def to_params(self) -> List[Dict[str, Any]]:
        return [tool.to_param() for tool in self.tools]

    async def execute(
        self, *, name: str, tool_input: Dict[str, Any] = None
    ) -> ToolResult:
        tool = self.tool_map.get(name)
        if not tool:
            return ToolFailure(error=f"Tool {name} is invalid")
        try:
            result = await tool(**tool_input)
            return result
        except ToolError as e:
            return ToolFailure(error=e.message)

    async def execute_all(self) -> List[ToolResult]:
        """Execute all tools in the collection sequentially."""
        results = []
        for tool in self.tools:
            try:
                result = await tool()
                results.append(result)
            except ToolError as e:
                results.append(ToolFailure(error=e.message))
        return results
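- When execute is called, it receives name and tool_input (a dict, for example {"text": "hello", "count": 2}). tool = self.tool_map.get(name) looks up the matching BaseTool, which is then called with tool_input unpacked as keyword arguments. This works because BaseTool defines __call__: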
class BaseTool(ABC, BaseModel):
    name: str
    description: str
    parameters: Optional[dict] = None

    class Config:
        arbitrary_types_allowed = True

    async def __call__(self, **kwargs) -> Any:
        """Execute the tool with given parameters."""
        return await self.execute(**kwargs)
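The lookup-then-call path can be followed end to end with a toy tool. This sketch leaves out Pydantic and the ToolResult types; ToyBaseTool, Repeat, and the module-level tool_map are hypothetical stand-ins for the real classes.

```python
import asyncio
from abc import ABC, abstractmethod


class ToyBaseTool(ABC):
    """Stand-in for BaseTool: calling the instance forwards to execute()."""

    name = "base"

    async def __call__(self, **kwargs):
        return await self.execute(**kwargs)

    @abstractmethod
    async def execute(self, **kwargs):
        ...


class Repeat(ToyBaseTool):
    """Hypothetical tool that repeats a text a given number of times."""

    name = "repeat"

    async def execute(self, text: str, count: int) -> str:
        return " ".join([text] * count)


tool_map = {tool.name: tool for tool in (Repeat(),)}


async def execute(name: str, tool_input: dict) -> str:
    tool = tool_map.get(name)          # look the tool up by name
    if tool is None:
        return f"Tool {name} is invalid"
    return await tool(**tool_input)    # dict keys become keyword arguments of execute()


output = asyncio.run(execute("repeat", {"text": "hello", "count": 2}))
missing = asyncio.run(execute("nope", {}))
```

Because the instance is callable, the collection does not need to know which method a tool uses internally.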
app/tool/mcp.py: client-side MCP protocol implementation (proxy for remote tools)
    async def connect_stdio(
        self, command: str, args: List[str], server_id: str = ""
    ) -> None:
        """Connect to an MCP server using stdio transport."""
        if not command:
            raise ValueError("Server command is required.")

        server_id = server_id or command

        # Always ensure clean disconnection before new connection
        if server_id in self.sessions:
            await self.disconnect(server_id)

        exit_stack = AsyncExitStack()
        self.exit_stacks[server_id] = exit_stack

        server_params = StdioServerParameters(command=command, args=args)
        stdio_transport = await exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        read, write = stdio_transport
        session = await exit_stack.enter_async_context(ClientSession(read, write))
        self.sessions[server_id] = session

        await self._initialize_and_list_tools(server_id)
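Connecting over stdio means that the main program (here, the client program that hosts the Manus agent) does not reach a remote server over the network (HTTP or TCP/IP). Instead, it **starts another program locally as the "server" and talks to it in both directions through that program's standard input/output pipes.** Any program that can read and write standard input/output and follows the MCP protocol can be integrated with the Python client.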
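The transport idea itself, separate from MCP, can be tried with the standard library alone. In this sketch the parent starts a child Python process and exchanges one JSON line with it over pipes. The message fields (id, text, echo) are invented for the illustration and are not MCP messages.

```python
import json
import subprocess
import sys

# Source of the child "server": read JSON lines from stdin, answer on stdout.
CHILD_SOURCE = (
    "import sys, json\n"
    "for line in sys.stdin:\n"
    "    request = json.loads(line)\n"
    "    reply = {'id': request['id'], 'echo': request['text']}\n"
    "    print(json.dumps(reply), flush=True)\n"
)

# Start the child locally; no socket or network address is involved.
proc = subprocess.Popen(
    [sys.executable, "-c", CHILD_SOURCE],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)

# Parent -> child: one request per line on the child's stdin.
proc.stdin.write(json.dumps({"id": 1, "text": "ping"}) + "\n")
proc.stdin.flush()

# Child -> parent: one reply per line on the child's stdout.
reply = json.loads(proc.stdout.readline())

# Closing stdin ends the child's read loop, so it exits on its own.
proc.stdin.close()
exit_code = proc.wait(timeout=10)
proc.stdout.close()
```

A real MCP client adds session setup and a defined message format on top of this pipe pair, which is what stdio_client and ClientSession handle in the excerpt above.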
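app/mcp/server.py: server-side MCP implementation (exposes the tool set)
- Concrete tool implementations:
bash.py / python_execute.py
browser_use_tool.py / web_search.py
file_operators.py / ask_human.py / terminate.py
8. Sandbox environment
app/sandbox/client.py: a Docker-based environment for safe code execution
9. Exception handling
app/exceptions.py: custom exception classes (such as ToolError/TokenLimitExceeded)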