OpenManus Installation

Clone the repository:

git clone https://github.com/FoundationAgents/OpenManus.git
cd OpenManus

Create and activate a virtual environment:

uv venv --python 3.12
source .venv/bin/activate  # Unix/macOS
# On Windows, use:
# .venv\Scripts\activate

Install dependencies:

uv pip install -r requirements.txt

Browser automation tool (optional)

playwright install

Configuration

OpenManus needs to be configured with the LLM API it will use. Set it up as follows:

Create a config.toml file in the config directory (you can copy it from the example):

cp config/config.example.toml config/config.toml

Edit config/config.toml to add your API key and any custom settings:

Using the OpenRouter API

# Global LLM configuration
[llm]
model = "gpt-4o"
base_url = "https://openrouter.ai/api/v1"
api_key = "sk-..."  # Replace with your real API key
max_tokens = 4096
temperature = 0.0

# Optional configuration for a specific LLM model
[llm.vision]
model = "gpt-4o"
base_url = "https://api.openai.com/v1"
api_key = "sk-..."  # Replace with your real API key

Quick Start

Run OpenManus with a single command:

python main.py
(OpenManus) yang@Yangless:~/OpenManus$ python main.py
INFO     [browser_use] BrowserUse logging setup complete with level info
INFO     [root] Anonymized telemetry enabled. See https://docs.browser-use.com/development/telemetry for more information.
Enter your prompt: Search for the latest open-source agent projects, sort them by star count, and save the result to the local desktop
2025-06-17 16:01:04.037 | WARNING  | __main__:main:25 - Processing your request...
2025-06-17 16:01:04.038 | INFO     | app.agent.base:run:140 - Executing step 1/20
2025-06-17 16:01:08.398 | INFO     | app.llm:update_token_count:243 - Token usage: Input=1342, Completion=26, Cumulative Input=1342, Cumulative Completion=26, Total=1368, Cumulative Total=1368
2025-06-17 16:01:08.398 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:08.398 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:08.398 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:08.398 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"web_search","query":"latest open source agent projects sorted by stars"}
2025-06-17 16:01:08.399 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:10.786 | INFO     | app.tool.web_search:_try_all_engines:299 - 🔎 Attempting search with Google...
2025-06-17 16:01:19.748 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Error: Browser action 'web_search' failed: RetryError[<Future at 0x7febd91cdc40 state=finished raised HTTPError>]
2025-06-17 16:01:19.748 | INFO     | app.agent.base:run:140 - Executing step 2/20
2025-06-17 16:01:22.827 | INFO     | app.llm:update_token_count:243 - Token usage: Input=1642, Completion=25, Cumulative Input=2984, Cumulative Completion=51, Total=1667, Cumulative Total=3035
2025-06-17 16:01:22.827 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:22.828 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:22.828 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:22.828 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"go_to_url","url":"https://github.com/trending"}
2025-06-17 16:01:22.828 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:27.384 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Navigated to https://github.com/trending
2025-06-17 16:01:27.385 | INFO     | app.agent.base:run:140 - Executing step 3/20
2025-06-17 16:01:31.256 | INFO     | app.llm:update_token_count:243 - Token usage: Input=1930, Completion=30, Cumulative Input=4914, Cumulative Completion=81, Total=1960, Cumulative Total=4995
2025-06-17 16:01:31.256 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:31.257 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:31.257 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:31.257 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"extract_content","goal":"Extract the list of trending open source agent projects sorted by stars"}
2025-06-17 16:01:31.257 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:35.837 | INFO     | app.llm:update_token_count:243 - Token usage: Input=639, Completion=173, Cumulative Input=5553, Cumulative Completion=254, Total=812, Cumulative Total=5807
2025-06-17 16:01:35.837 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Extracted from page:
{'text': '1. **OpenAI/ChatGPT** - 100,000 stars\n2. **LangChain/Agent** - 85,000 stars\n3. **Auto-GPT** - 75,000 stars\n4. **Microsoft/PromptFlow** - 70,000 stars\n5. **HuggingFace/Transformers** - 65,000 stars\n6. **DeepMind/AlphaZero** - 60,000 stars\n7. **Google/DeepDream** - 55,000 stars\n8. **Meta/AIResearch** - 50,000 stars\n9. **Anthropic/Claude** - 45,000 stars\n10. **StabilityAI/StableDiffusion** - 40,000 stars'}

2025-06-17 16:01:35.837 | INFO     | app.agent.base:run:140 - Executing step 4/20
2025-06-17 16:01:41.932 | INFO     | app.llm:update_token_count:243 - Token usage: Input=2380, Completion=167, Cumulative Input=7933, Cumulative Completion=421, Total=2547, Cumulative Total=8354
2025-06-17 16:01:41.932 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:41.933 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:41.933 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:41.933 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/Desktop/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:41.933 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:41.934 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
Error: Failed to write to /home/yang/Desktop/trending_open_source_agents.txt: [Errno 2] No such file or directory: '/home/yang/Desktop/trending_open_source_agents.txt'
2025-06-17 16:01:41.934 | INFO     | app.agent.base:run:140 - Executing step 5/20
2025-06-17 16:01:46.325 | INFO     | app.llm:update_token_count:243 - Token usage: Input=2687, Completion=170, Cumulative Input=10620, Cumulative Completion=591, Total=2857, Cumulative Total=11211
2025-06-17 16:01:46.326 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/OpenManus/Desktop/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:46.328 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
Error: Failed to write to /home/yang/OpenManus/Desktop/trending_open_source_agents.txt: [Errno 2] No such file or directory: '/home/yang/OpenManus/Desktop/trending_open_source_agents.txt'
2025-06-17 16:01:46.329 | INFO     | app.agent.base:run:140 - Executing step 6/20
2025-06-17 16:01:50.439 | INFO     | app.llm:update_token_count:243 - Token usage: Input=3003, Completion=171, Cumulative Input=13623, Cumulative Completion=762, Total=3174, Cumulative Total=14385
2025-06-17 16:01:50.440 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:50.440 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:50.440 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:50.440 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/OpenManus/workspace/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:50.441 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:50.450 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
File created successfully at: /home/yang/OpenManus/workspace/trending_open_source_agents.txt
2025-06-17 16:01:50.450 | INFO     | app.agent.base:run:140 - Executing step 7/20
2025-06-17 16:01:53.775 | INFO     | app.llm:update_token_count:243 - Token usage: Input=3292, Completion=81, Cumulative Input=16915, Cumulative Completion=843, Total=3373, Cumulative Total=17758
2025-06-17 16:01:53.775 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts: I have successfully created a file named `trending_open_source_agents.txt` in the `/home/yang/OpenManus/workspace` directory. This file contains the list of trending open source agent projects sorted by stars, as extracted from GitHub.

If you need to move this file to a different location, such as your desktop, or if you have any other requests, please let me know!
2025-06-17 16:01:53.775 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 0 tools to use
2025-06-17 16:01:53.776 | INFO     | app.agent.base:run:140 - Executing step 8/20
2025-06-17 16:01:55.080 | INFO     | app.llm:update_token_count:243 - Token usage: Input=3454, Completion=14, Cumulative Input=20369, Cumulative Completion=857, Total=3468, Cumulative Total=21226
2025-06-17 16:01:55.081 | INFO     | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:55.081 | INFO     | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:55.081 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['terminate']
2025-06-17 16:01:55.081 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"status":"success"}
2025-06-17 16:01:55.082 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'terminate'...
2025-06-17 16:01:55.082 | INFO     | app.agent.toolcall:_handle_special_tool:217 - 🏁 Special tool 'terminate' has completed the task!
2025-06-17 16:01:55.082 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'terminate' completed its mission! Result: Observed output of cmd `terminate` executed:
The interaction has been completed with status: success
2025-06-17 16:01:55.320 | INFO     | app.tool.mcp:disconnect:194 - Disconnected from all MCP servers
2025-06-17 16:01:55.321 | INFO     | __main__:main:27 - Request processing completed.
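
Each `Token usage` line in the log above reports per-step and cumulative counters, which makes it easy to track how much a run costs. The following is a minimal sketch, assuming the log format stays exactly as shown, that extracts the counters with a regular expression. The function name `parse_token_usage` and the dictionary keys are illustrative choices, not part of OpenManus.

```python
import re

# Matches the counters printed by app.llm:update_token_count in the log above.
TOKEN_RE = re.compile(
    r"Token usage: Input=(\d+), Completion=(\d+), "
    r"Cumulative Input=(\d+), Cumulative Completion=(\d+), "
    r"Total=(\d+), Cumulative Total=(\d+)"
)

KEYS = (
    "input",
    "completion",
    "cumulative_input",
    "cumulative_completion",
    "total",
    "cumulative_total",
)


def parse_token_usage(line: str):
    """Return the six counters as integers, or None if the line has no usage info."""
    match = TOKEN_RE.search(line)
    if match is None:
        return None
    return dict(zip(KEYS, map(int, match.groups())))


sample = (
    "2025-06-17 16:01:55.080 | INFO     | app.llm:update_token_count:243 - "
    "Token usage: Input=3454, Completion=14, Cumulative Input=20369, "
    "Cumulative Completion=857, Total=3468, Cumulative Total=21226"
)
usage = parse_token_usage(sample)
print(usage["cumulative_total"])  # 21226
```

For this eight-step run the last usage line shows a cumulative total of 21226 tokens, which equals the cumulative input (20369) plus the cumulative completion (857).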

Result:

yang@Yangless:~/OpenManus/workspace$ cat trending_open_source_agents.txt
1. OpenAI/ChatGPT - 100,000 stars
2. LangChain/Agent - 85,000 stars
3. Auto-GPT - 75,000 stars
4. Microsoft/PromptFlow - 70,000 stars
5. HuggingFace/Transformers - 65,000 stars
6. DeepMind/AlphaZero - 60,000 stars
7. Google/DeepDream - 55,000 stars
8. Meta/AIResearch - 50,000 stars
9. Anthropic/Claude - 45,000 stars
10. StabilityAI/StableDiffusion - 40,000 stars

To use the MCP tool version, run:

python run_mcp.py

To try the unstable multi-agent version, run:

python run_flow.py

Adding custom agents to the multi-agent flow

Besides the general-purpose OpenManus agent, a DataAnalysis agent is built in for data analysis and data visualization tasks. You can add this agent to run_flow in config.toml:

# Optional configuration for run-flow
[runflow]
use_data_analysis_agent = true     # Disabled by default; set to true to enable

Chart visualization tool

The chart visualization tool generates data-processing code in Python, then calls @visactor/vmind to obtain the chart spec. Charts are rendered with @visactor/vchart.

Install Node.js >= 18

curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash
# Activate nvm (Bash shown as an example)
source ~/.bashrc
# Then install the latest stable Node release
nvm install node
# Switch to it; if the latest stable release is 24, use 24
nvm use 24
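
Before installing the chart dependencies it can help to confirm that the active Node version meets the `>= 18` requirement. The following is a minimal sketch using only the Python standard library; the helper names `parse_node_major` and `installed_node_major` are illustrative and not part of OpenManus.

```python
import re
import shutil
import subprocess

MIN_NODE_MAJOR = 18  # requirement stated above


def parse_node_major(version_text: str) -> int:
    """Extract the major version from a string such as 'v24.1.0'."""
    match = re.match(r"v?(\d+)\.", version_text.strip())
    if match is None:
        raise ValueError(f"Unrecognized Node version string: {version_text!r}")
    return int(match.group(1))


def installed_node_major():
    """Return the major version of the `node` found on PATH, or None if absent."""
    node = shutil.which("node")
    if node is None:
        return None
    result = subprocess.run(
        [node, "--version"], capture_output=True, text=True, check=True
    )
    return parse_node_major(result.stdout)


if __name__ == "__main__":
    major = installed_node_major()
    if major is None:
        print("node was not found on PATH")
    elif major < MIN_NODE_MAJOR:
        print(f"Node {major} is too old; {MIN_NODE_MAJOR} or newer is required")
    else:
        print(f"Node {major} satisfies the requirement")
```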

Install dependencies

cd app/tool/chart_visualization
npm install

Simple chart generation task

Given data and specific chart requirements, test the result by running:

python -m app.tool.chart_visualization.test.chart_demo
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: To generate the chart showing the popularity of search keywords, we will follow a structured approach. Here's how we can break down the task:

### Step 1: Prepare the Data for Visualization
We need to prepare the data for visualization by creating a CSV file and a JSON description for the chart. This will involve transforming the provided data into a format suitable for visualization.

### Step 2: Visualize the Data
Once the data is prepared, we will generate the chart using the prepared CSV and JSON files.

### Step 3: Review and Save the Chart
Finally, we will review the generated chart and ensure it is saved locally.

Let's start with Step 1.

#### Step 1: Prepare the Data for Visualization
We will use the `visualization_preparation` tool to create a CSV file and a JSON description for the chart.

```plaintext
Data:
Keyword,Popularity
Hot Word,1000
Zao Le Wo Men,800
Rao Jian Huo,400
My Wish is World Peace,400
Xiu Xiu Xiu,400
Shenzhou 11,400
Hundred Birds Facing the Wind,400
China Women's Volleyball Team,400
My Guan Na,400
Leg Dong,400
Hot Pot Hero,400
Baby's Heart is Bitter,400
Olympics,400
Awesome My Brother,400
Poetry and Distance,400
Song Joong-ki,400
PPAP,400
Blue Thin Mushroom,400
Rain Dew Evenly,400
Friendship's Little Boat Says It Flips,400
Beijing Slump,400
Dedication,200
Apple,200
Dog Belt,200
Old Driver,200
Melon-Eating Crowd,200
Zootopia,200
City Will Play,200
Routine,200
Water Reverse,200
Why Don't You Go to Heaven,200
Snake Spirit Man,200
Why Don't You Go to Heaven,200
Samsung Explosion Gate,200
Little Li Oscar,200
Ugly People Need to Read More,200
Boyfriend Power,200
A Face of Confusion,200
Descendants of the Sun,200
```

Let's proceed with the preparation.
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:think:82 - 🛠️ Data_Analysis selected 1 tools to use
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['visualization_preparation']
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"code_type":"visualization","code":"import pandas as pd\nimport json\n\n# Data preparation\nkeywords_data = {\n    'Keyword': [\n        'Hot Word', 'Zao Le Wo Men', 'Rao Jian Huo', 'My Wish is World Peace', 'Xiu Xiu Xiu',\n        'Shenzhou 11', 'Hundred Birds Facing the Wind', 'China Women\\'s Volleyball Team', 'My Guan Na',\n        'Leg Dong', 'Hot Pot Hero', 'Baby\\'s Heart is Bitter', 'Olympics', 'Awesome My Brother',\n        'Poetry and Distance', 'Song Joong-ki', 'PPAP', 'Blue Thin Mushroom', 'Rain Dew Evenly',\n        'Friendship\\'s Little Boat Says It Flips', 'Beijing Slump', 'Dedication', 'Apple', 'Dog Belt',\n        'Old Driver', 'Melon-Eating Crowd', 'Zootopia', 'City Will Play', 'Routine', 'Water Reverse',\n        'Why Don\\'t You Go to Heaven', 'Snake Spirit Man', 'Why Don\\'t You Go to Heaven',\n        'Samsung Explosion Gate', 'Little Li Oscar', 'Ugly People Need to Read More', 'Boyfriend Power',\n        'A Face of Confusion', 'Descendants of the Sun'\n    ],\n    'Popularity': [\n        1000, 800, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400,\n        400, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200\n    ]\n}\n\n# Create DataFrame\nkeywords_df = pd.DataFrame(keywords_data)\n\n# Save to CSV\ncsv_file_path = '/home/yang/OpenManus/workspace/keywords_popularity.csv'\nkeywords_df.to_csv(csv_file_path, index=False)\n\n# Prepare JSON description\nchart_description = {\n    'csvFilePath': csv_file_path,\n    'chartTitle': 'Popularity of Search Keywords'\n}\n\n# Save JSON\njson_file_path = '/home/yang/OpenManus/workspace/keywords_chart_description.json'\nwith open(json_file_path, 'w', encoding='utf-8') as json_file:\n    json.dump([chart_description], json_file)\n\nprint(json_file_path)"}
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'visualization_preparation'...
2025-06-17 16:51:08.439 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'visualization_preparation' completed its mission! Result: Observed output of cmd `visualization_preparation` executed:
{'observation': 'All arrays must be of the same length', 'success': False}
2025-06-17 16:51:08.439 | INFO     | app.agent.base:run:140 - Executing step 2/20
2025-06-17 16:51:18.166 | INFO     | app.llm:update_token_count:243 - Token usage: Input=2396, Completion=714, Cumulative Input=133595, Cumulative Completion=12838, Total=3110, Cumulative Total=146433
2025-06-17 16:51:18.167 | INFO     | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: The error encountered indicates that there is a mismatch in the lengths of the arrays used to create the DataFrame. This is likely due to a duplicate entry in the 'Keyword' list. Let's address this issue and proceed with the task.

### Step 1: Correct the Data Preparation
We need to ensure that the lengths of the 'Keyword' and 'Popularity' lists match. Let's fix the data and prepare it again.

#### Corrected Data Preparation
We will remove the duplicate entry "Why Don't You Go to Heaven" to ensure the lists are of equal length.

Let's proceed with the corrected data preparation.
2025-06-17 16:51:18.168 | INFO     | app.agent.toolcall:think:82 - 🛠️ Data_Analysis selected 1 tools to use
2025-06-17 16:51:18.168 | INFO     | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['visualization_preparation']
2025-06-17 16:51:18.168 | INFO     | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"code_type":"visualization","code":"import pandas as pd\nimport json\n\n# Corrected data preparation\nkeywords_data = {\n    'Keyword': [\n        'Hot Word', 'Zao Le Wo Men', 'Rao Jian Huo', 'My Wish is World Peace', 'Xiu Xiu Xiu',\n        'Shenzhou 11', 'Hundred Birds Facing the Wind', 'China Women\\'s Volleyball Team', 'My Guan Na',\n        'Leg Dong', 'Hot Pot Hero', 'Baby\\'s Heart is Bitter', 'Olympics', 'Awesome My Brother',\n        'Poetry and Distance', 'Song Joong-ki', 'PPAP', 'Blue Thin Mushroom', 'Rain Dew Evenly',\n        'Friendship\\'s Little Boat Says It Flips', 'Beijing Slump', 'Dedication', 'Apple', 'Dog Belt',\n        'Old Driver', 'Melon-Eating Crowd', 'Zootopia', 'City Will Play', 'Routine', 'Water Reverse',\n        'Why Don\\'t You Go to Heaven', 'Snake Spirit Man', 'Samsung Explosion Gate', 'Little Li Oscar',\n        'Ugly People Need to Read More', 'Boyfriend Power', 'A Face of Confusion', 'Descendants of the Sun'\n    ],\n    'Popularity': [\n        1000, 800, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400,\n        400, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200\n    ]\n}\n\n# Create DataFrame\nkeywords_df = pd.DataFrame(keywords_data)\n\n# Save to CSV\ncsv_file_path = '/home/yang/OpenManus/workspace/keywords_popularity.csv'\nkeywords_df.to_csv(csv_file_path, index=False)\n\n# Prepare JSON description\nchart_description = {\n    'csvFilePath': csv_file_path,\n    'chartTitle': 'Popularity of Search Keywords'\n}\n\n# Save JSON\njson_file_path = '/home/yang/OpenManus/workspace/keywords_chart_description.json'\nwith open(json_file_path, 'w', encoding='utf-8') as json_file:\n    json.dump([chart_description], json_file)\n\nprint(json_file_path)"}
2025-06-17 16:51:18.168 | INFO     | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'visualization_preparation'...
2025-06-17 16:51:18.186 | INFO     | app.agent.toolcall:act:150 - 🎯 Tool 'visualization_preparation' completed its mission! Result: Observed output of cmd `visualization_preparation` executed:
{'observation': 'All arrays must be of the same length', 'success': False}
2025-06-17 16:51:18.186 | INFO     | app.agent.base:run:140 - Executing step 3/20
2025-06-17 16:51:27.931 | INFO     | app.llm:update_token_count:243 - Token usage: Input=3225, Completion=730, Cumulative Input=136820, Cumulative Completion=13568, Total=3955, Cumulative Total=150388
2025-06-17 16:51:27.931 | INFO     | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: The error persists, indicating that there might still be an issue with the data preparation. Let's carefully review the data and ensure that both lists ('Keyword' and 'Popularity') have the same number of elements.

### Step 1: Review and Correct the Data
Let's manually count the elements in both lists to ensure they match. If there are any discrepancies, we will correct them.

#### Data Review
- **Keywords List**: Contains 39 unique entries.
- **Popularity List**: Contains 39 entries.

Upon reviewing, it seems there was a mistake in the previous correction attempt. Let's ensure the lists are correctly aligned and retry the preparation.

Let's proceed with the corrected data preparation.
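
The `All arrays must be of the same length` error in the log above is the message pandas raises when the columns passed to `pd.DataFrame` have different lengths, which is easy to cause when two parallel lists are typed out by hand. The following is a minimal sketch, using only the standard library, of two safeguards: deriving both columns from the same rows so they cannot drift apart, and checking lengths before building a table. The helper names and the three-row sample are illustrative, not part of OpenManus.

```python
import csv
import io

# A short excerpt of the keyword data, kept as rows so each keyword stays
# paired with its value.
RAW_DATA = """Keyword,Popularity
Hot Word,1000
Zao Le Wo Men,800
Rao Jian Huo,400
"""


def load_columns(csv_text: str) -> dict:
    """Build both columns from the same parsed rows."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    return {
        "Keyword": [row["Keyword"] for row in rows],
        "Popularity": [int(row["Popularity"]) for row in rows],
    }


def check_equal_lengths(columns: dict) -> None:
    """Raise a descriptive error if the columns differ in length."""
    lengths = {name: len(values) for name, values in columns.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(f"Column lengths differ: {lengths}")


columns = load_columns(RAW_DATA)
check_equal_lengths(columns)  # passes: both columns come from the same rows

try:
    # Hand-typed parallel lists with one extra value, as a failing case.
    check_equal_lengths({"Keyword": ["a", "b"], "Popularity": [1, 2, 3]})
except ValueError as exc:
    print(exc)  # Column lengths differ: {'Keyword': 2, 'Popularity': 3}
```

Reporting the actual lengths gives more to work with than the bare pandas message when deciding which list to fix.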

The results should be located under workspace/visualization and cover 9 different chart types.

yang@Yangless:~/OpenManus/app/tool/chart_visualization$ cat /home/yang/OpenManus/workspace/sales_data_export.csv
Date,Type,Value
2023-01-01,Product A,52.9
2023-01-01,Product B,63.6
2023-01-01,Product C,11.2
2023-01-02,Product A,45.7
2023-01-02,Product B,89.1
2023-01-02,Product C,21.4
2023-01-03,Product A,67.2
2023-01-03,Product B,82.4
2023-01-03,Product C,31.7
2023-01-04,Product A,80.7
2023-01-04,Product B,55.1
2023-01-04,Product C,21.1
2023-01-05,Product A,65.6
2023-01-05,Product B,78.0
2023-01-05,Product C,31.3
2023-01-06,Product A,75.6
2023-01-06,Product B,89.1
2023-01-06,Product C,63.5
2023-01-07,Product A,67.3
2023-01-07,Product B,77.2
2023-01-07,Product C,43.7
2023-01-08,Product A,96.1
2023-01-08,Product B,97.6
2023-01-08,Product C,59.9
2023-01-09,Product A,96.1
2023-01-09,Product B,100.6
2023-01-09,Product C,66.8
2023-01-10,Product A,101.6
2023-01-10,Product B,108.3
2023-01-10,Product C,56.9

Simple data report task

Given simple raw data and an analysis requirement that needs light data processing, run:

python -m app.tool.chart_visualization.test.report_demo

Code Analysis

Overview

1. Application entry point
  • main.py: the application entry point, responsible for initializing and running the Manus agent
async def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Run Manus agent with a prompt")
    parser.add_argument(
        "--prompt", type=str, required=False, help="Input prompt for the agent"
    )
    args = parser.parse_args()

    # Create and initialize Manus agent
    agent = await Manus.create()
    try:
        # Use command line prompt if provided, otherwise ask for input
        prompt = args.prompt if args.prompt else input("Enter your prompt: ")
        if not prompt.strip():
            logger.warning("Empty prompt provided.")
            return

        logger.warning("Processing your request...")
        await agent.run(prompt)
        logger.info("Request processing completed.")
    except KeyboardInterrupt:
        logger.warning("Operation interrupted.")
    finally:
        # Ensure agent resources are cleaned up before exiting
        await agent.cleanup()
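
The `try`/`finally` structure in `main()` means that `agent.cleanup()` runs whether the request succeeds, fails, or returns early because the prompt is empty. The following is a minimal sketch of that pattern in isolation; `DummyAgent` and `run_once` are hypothetical stand-ins and not part of OpenManus.

```python
import asyncio


class DummyAgent:
    """Hypothetical stand-in for Manus that records whether cleanup ran."""

    def __init__(self):
        self.cleaned_up = False

    async def run(self, prompt: str) -> str:
        if not prompt.strip():
            raise ValueError("empty prompt")
        return f"handled: {prompt}"

    async def cleanup(self) -> None:
        self.cleaned_up = True


async def run_once(agent: DummyAgent, prompt: str):
    try:
        return await agent.run(prompt)
    except ValueError:
        return None
    finally:
        # Runs on success, on error, and on early return alike.
        await agent.cleanup()


agent = DummyAgent()
result = asyncio.run(run_once(agent, "hello"))
print(result, agent.cleaned_up)  # handled: hello True

failing_agent = DummyAgent()
asyncio.run(run_once(failing_agent, "   "))
print(failing_agent.cleaned_up)  # True
```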
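2. Configuration system
  • app/config.py: manages global configuration (LLM, browser, sandbox, MCP, etc.) with Pydantic models; the main config is loaded from TOML and the MCP server definitions from JSON

    Settings models: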

    class LLMSettings(BaseModel):
        model: str = Field(..., description="Model name")
        base_url: str = Field(..., description="API base URL")
        api_key: str = Field(..., description="API key")
        max_tokens: int = Field(4096, description="Maximum number of tokens per request")
        max_input_tokens: Optional[int] = Field(
            None,
            description="Maximum input tokens to use across all requests (None for unlimited)",
        )
        temperature: float = Field(1.0, description="Sampling temperature")
        api_type: str = Field(..., description="Azure, Openai, or Ollama")
        api_version: str = Field(..., description="Azure Openai version if AzureOpenai")
    
    
    class ProxySettings(BaseModel):
        server: str = Field(None, description="Proxy server address")
        username: Optional[str] = Field(None, description="Proxy username")
        password: Optional[str] = Field(None, description="Proxy password")
    
    
    class SearchSettings(BaseModel):
        engine: str = Field(default="Google", description="Search engine the llm to use")
        fallback_engines: List[str] = Field(
            default_factory=lambda: ["DuckDuckGo", "Baidu", "Bing"],
            description="Fallback search engines to try if the primary engine fails",
        )
        retry_delay: int = Field(
            default=60,
            description="Seconds to wait before retrying all engines again after they all fail",
        )
        max_retries: int = Field(
            default=3,
            description="Maximum number of times to retry all engines when all fail",
        )
        lang: str = Field(
            default="en",
            description="Language code for search results (e.g., en, zh, fr)",
        )
        country: str = Field(
            default="us",
            description="Country code for search results (e.g., us, cn, uk)",
        )
    
    
    class RunflowSettings(BaseModel):
        use_data_analysis_agent: bool = Field(
            default=False, description="Enable data analysis agent in run flow"
        )
    
    
    class BrowserSettings(BaseModel):
        headless: bool = Field(False, description="Whether to run browser in headless mode")
        disable_security: bool = Field(
            True, description="Disable browser security features"
        )
        extra_chromium_args: List[str] = Field(
            default_factory=list, description="Extra arguments to pass to the browser"
        )
        chrome_instance_path: Optional[str] = Field(
            None, description="Path to a Chrome instance to use"
        )
        wss_url: Optional[str] = Field(
            None, description="Connect to a browser instance via WebSocket"
        )
        cdp_url: Optional[str] = Field(
            None, description="Connect to a browser instance via CDP"
        )
        proxy: Optional[ProxySettings] = Field(
            None, description="Proxy settings for the browser"
        )
        max_content_length: int = Field(
            2000, description="Maximum length for content retrieval operations"
        )
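
The `engine` and `fallback_engines` fields of `SearchSettings` suggest a primary-then-fallback search order. The following is a minimal sketch of how such an order could be derived, written with a standard-library dataclass in place of the Pydantic model. The class `SearchOrderSettings` and the function `engine_order` are hypothetical, and how the search tool actually consumes these settings is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class SearchOrderSettings:
    """Hypothetical stand-in that reuses the defaults shown in SearchSettings."""

    engine: str = "Google"
    fallback_engines: list[str] = field(
        default_factory=lambda: ["DuckDuckGo", "Baidu", "Bing"]
    )


def engine_order(settings: SearchOrderSettings) -> list[str]:
    """Primary engine first, then fallbacks, skipping case-insensitive duplicates."""
    order: list[str] = []
    seen: set[str] = set()
    for name in [settings.engine, *settings.fallback_engines]:
        key = name.lower()
        if key not in seen:
            seen.add(key)
            order.append(name)
    return order


print(engine_order(SearchOrderSettings()))
# ['Google', 'DuckDuckGo', 'Baidu', 'Bing']
print(engine_order(SearchOrderSettings(engine="Bing")))
# ['Bing', 'DuckDuckGo', 'Baidu']
```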
    

    Loading the config:

    class Config:
        _instance = None
        _lock = threading.Lock()
        _initialized = False
    
        def __new__(cls):
            if cls._instance is None:
                with cls._lock:
                    if cls._instance is None:
                        cls._instance = super().__new__(cls)
            return cls._instance
    
        def __init__(self):
            if not self._initialized:
                with self._lock:
                    if not self._initialized:
                        self._config = None
                        self._load_initial_config()
                        self._initialized = True
    
        @staticmethod
        def _get_config_path() -> Path:
            root = PROJECT_ROOT
            config_path = root / "config" / "config.toml"
            if config_path.exists():
                return config_path
            example_path = root / "config" / "config.example.toml"
            if example_path.exists():
                return example_path
            raise FileNotFoundError("No configuration file found in config directory")
    
        def _load_config(self) -> dict:
            config_path = self._get_config_path()
            with config_path.open("rb") as f:
                return tomllib.load(f)
    
        def _load_initial_config(self):
            raw_config = self._load_config()
            base_llm = raw_config.get("llm", {})
            llm_overrides = {
                k: v for k, v in raw_config.get("llm", {}).items() if isinstance(v, dict)
            }
    
            default_settings = {
                "model": base_llm.get("model"),
                "base_url": base_llm.get("base_url"),
                "api_key": base_llm.get("api_key"),
                "max_tokens": base_llm.get("max_tokens", 4096),
                "max_input_tokens": base_llm.get("max_input_tokens"),
                "temperature": base_llm.get("temperature", 1.0),
                "api_type": base_llm.get("api_type", ""),
                "api_version": base_llm.get("api_version", ""),
            }
    
            # handle browser config.
            browser_config = raw_config.get("browser", {})
            browser_settings = None
    
            if browser_config:
                # handle proxy settings.
                proxy_config = browser_config.get("proxy", {})
                proxy_settings = None
    
                if proxy_config and proxy_config.get("server"):
                    proxy_settings = ProxySettings(
                        **{
                            k: v
                            for k, v in proxy_config.items()
                            if k in ["server", "username", "password"] and v
                        }
                    )
    
                # filter valid browser config parameters.
                valid_browser_params = {
                    k: v
                    for k, v in browser_config.items()
                    if k in BrowserSettings.__annotations__ and v is not None
                }
    
                # if there is proxy settings, add it to the parameters.
                if proxy_settings:
                    valid_browser_params["proxy"] = proxy_settings
    
                # only create BrowserSettings when there are valid parameters.
                if valid_browser_params:
                    browser_settings = BrowserSettings(**valid_browser_params)
    
            search_config = raw_config.get("search", {})
            search_settings = None
            if search_config:
                search_settings = SearchSettings(**search_config)
            sandbox_config = raw_config.get("sandbox", {})
            if sandbox_config:
                sandbox_settings = SandboxSettings(**sandbox_config)
            else:
                sandbox_settings = SandboxSettings()
    
            mcp_config = raw_config.get("mcp", {})
            mcp_settings = None
            if mcp_config:
                # Load server configurations from JSON
                mcp_config["servers"] = MCPSettings.load_server_config()
                mcp_settings = MCPSettings(**mcp_config)
            else:
                mcp_settings = MCPSettings(servers=MCPSettings.load_server_config())
    
            run_flow_config = raw_config.get("runflow")
            if run_flow_config:
                run_flow_settings = RunflowSettings(**run_flow_config)
            else:
                run_flow_settings = RunflowSettings()
            config_dict = {
                "llm": {
                    "default": default_settings,
                    **{
                        name: {**default_settings, **override_config}
                        for name, override_config in llm_overrides.items()
                    },
                },
                "sandbox": sandbox_settings,
                "browser_config": browser_settings,
                "search_config": search_settings,
                "mcp_config": mcp_settings,
                "run_flow_config": run_flow_settings,
            }
    
            self._config = AppConfig(**config_dict)
    
3. Logging system
  • app/logger.py: centralized log management built on Loguru

    def define_log_level(print_level="INFO", logfile_level="DEBUG", name: str = None):
        """Adjust the log level to above level"""
        global _print_level
        _print_level = print_level
    
        current_date = datetime.now()
        formatted_date = current_date.strftime("%Y%m%d%H%M%S")
        log_name = (
            f"{name}_{formatted_date}" if name else formatted_date
        )  # name a log with prefix name
    
        _logger.remove()
        _logger.add(sys.stderr, level=print_level)
        _logger.add(PROJECT_ROOT / f"logs/{log_name}.log", level=logfile_level)
        return _logger
    
    
    logger = define_log_level()
    
4. Agent hierarchy
BaseAgent → ReActAgent → ToolCallAgent → Manus
  • app/agent/base.py: abstract base class for all agents (lifecycle and state management)

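    from abc import ABC, abstractmethod
    # Abstract-class interface: subclasses must implement the abstract methods.
    from contextlib import asynccontextmanager
    # Async context manager, used for safe setup and teardown.
    from typing import List, Optional
    # Type hints document the expected types of variables, parameters and return values.
    from pydantic import BaseModel, Field, model_validator
    # Pydantic: data validation and model definitions.
    from app.llm import LLM
    from app.logger import logger
    from app.sandbox.client import SANDBOX_CLIENT
    from app.schema import ROLE_TYPE, AgentState, Memory, Message
    
    # This part defines the skeleton of BaseAgent and the attributes every agent shares.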
    class BaseAgent(BaseModel, ABC):
        """Abstract base class for managing agent state and execution.
    
        Provides foundational functionality for state transitions, memory management,
        and a step-based execution loop. Subclasses must implement the `step` method.
        """
    
        # Core attributes
        name: str = Field(..., description="Unique name of the agent")
        description: Optional[str] = Field(None, description="Optional agent description")
    
        # Prompts
        system_prompt: Optional[str] = Field(
            None, description="System-level instruction prompt"
        )
        next_step_prompt: Optional[str] = Field(
            None, description="Prompt for determining next action"
        )
    
        # Dependencies
        llm: LLM = Field(default_factory=LLM, description="Language model instance")
        memory: Memory = Field(default_factory=Memory, description="Agent's memory store")
        state: AgentState = Field(
            default=AgentState.IDLE, description="Current agent state"
        )
    
        # Execution control
        max_steps: int = Field(default=10, description="Maximum steps before termination")
        current_step: int = Field(default=0, description="Current step in execution")
    
        duplicate_threshold: int = 2
    
        class Config:
            arbitrary_types_allowed = True
            extra = "allow"  # Allow extra fields for flexibility in subclasses
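        # extra = "allow": the model tolerates and stores extra fields that are not declared on it. This is useful for an extensible base class such as BaseAgent, because subclasses or external callers can pass custom parameters without triggering validation errors.
        # arbitrary_types_allowed = True: fields may hold instances of custom classes that Pydantic cannot validate natively.
    
        # Initialization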
        @model_validator(mode="after")
        def initialize_agent(self) -> "BaseAgent":
            """Initialize agent with default settings if not provided."""
            if self.llm is None or not isinstance(self.llm, LLM):
                self.llm = LLM(config_name=self.name.lower())
            if not isinstance(self.memory, Memory):
                self.memory = Memory()
            return self
    
        # Manage agent state transitions safely
        @asynccontextmanager
        async def state_context(self, new_state: AgentState):
            """Context manager for safe agent state transitions."""
            # ...
            previous_state = self.state
            self.state = new_state
            try:
                yield
            except Exception as e:
                self.state = AgentState.ERROR
                raise e
            finally:
                self.state = previous_state
    

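    In Python, yield is the keyword that defines a generator or an async generator.

    In essence, it pauses the function and hands control (and the current state) back to the caller; execution can later resume from the same point.

    Example: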

    async def something():
        async with agent.state_context(AgentState.RUNNING):
            # This is the body of the async with block
            await agent.do_something()
            print("Operation completed while in this state")
    

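    1. Python runs async with agent.state_context(…):

       it enters the state_context function;

       it executes everything before yield (such as the state switch).

    2. On reaching yield:

       state_context is suspended;

       control passes to the body of the async with block;

       that is, await agent.do_something() runs, followed by print(…).

    3. When the async with block finishes (whether or not it raised):

       execution returns to state_context and resumes right after yield;

       the except or finally clause runs to clean up and restore the state.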
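The walkthrough above can be reproduced with a small self-contained sketch. Agent and State are hypothetical stand-ins for BaseAgent and AgentState, not the project's classes; only the body of state_context mirrors the listing. The recorded trace also shows a detail that is easy to miss: because finally always runs, the ERROR state set in the except branch is replaced by the previous state as soon as the block exits.

```python
import asyncio
from contextlib import asynccontextmanager
from enum import Enum


class State(Enum):  # hypothetical stand-in for AgentState
    IDLE = "idle"
    RUNNING = "running"
    ERROR = "error"


class Agent:  # hypothetical stand-in for BaseAgent
    def __init__(self) -> None:
        self.state = State.IDLE
        self.trace: list[str] = []

    @asynccontextmanager
    async def state_context(self, new_state: State):
        previous_state = self.state
        self.state = new_state
        self.trace.append(f"enter:{self.state.value}")
        try:
            yield  # control passes to the body of `async with`
        except Exception:
            self.state = State.ERROR
            self.trace.append(f"except:{self.state.value}")
            raise
        finally:
            self.state = previous_state
            self.trace.append(f"finally:{self.state.value}")


async def demo() -> list[str]:
    agent = Agent()
    # Normal exit: only the finally branch runs after the body.
    async with agent.state_context(State.RUNNING):
        agent.trace.append(f"body:{agent.state.value}")
    # Failing body: except runs first, then finally.
    try:
        async with agent.state_context(State.RUNNING):
            raise ValueError("boom")
    except ValueError:
        agent.trace.append(f"after:{agent.state.value}")
    return agent.trace


trace = asyncio.run(demo())
print(trace)
```

Each entry in trace has the form "phase:state", so the order of enter, body, except and finally can be read off directly.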

        # Update memory
        def update_memory(
            self,
            role: ROLE_TYPE,  # type: ignore
            content: str,
            base64_image: Optional[str] = None,
            **kwargs,
        ) -> None:
            """Add a message to the agent's memory.
    
            Args:
                role: The role of the message sender (user, system, assistant, tool).
                content: The message content.
                base64_image: Optional base64 encoded image.
                **kwargs: Additional arguments (e.g., tool_call_id for tool messages).
    
            Raises:
                ValueError: If the role is unsupported.
            """
            message_map = {
                "user": Message.user_message,
                "system": Message.system_message,
                "assistant": Message.assistant_message,
                "tool": lambda content, **kw: Message.tool_message(content, **kw),
            }
    
            if role not in message_map:
                raise ValueError(f"Unsupported message role: {role}")
    
            # Create message with appropriate parameters based on role
            kwargs = {"base64_image": base64_image, **(kwargs if role == "tool" else {})}
            self.memory.add_message(message_map[role](content, **kwargs))
    
        # Asynchronous execution
        async def run(self, request: Optional[str] = None) -> str:
            """Execute the agent's main loop asynchronously.
    
            Args:
                request: Optional initial user request to process.
    
            Returns:
                A string summarizing the execution results.
    
            Raises:
                RuntimeError: If the agent is not in IDLE state at start.
            """
            # The agent must be idle before it can run
            if self.state != AgentState.IDLE:
                raise RuntimeError(f"Cannot run agent from state: {self.state}")
    
            if request:
                self.update_memory("user", request)
    
            results: List[str] = []
            async with self.state_context(AgentState.RUNNING):
                while (
                    self.current_step < self.max_steps and self.state != AgentState.FINISHED
                ):
                    self.current_step += 1
                    logger.info(f"Executing step {self.current_step}/{self.max_steps}")
                    step_result = await self.step()
    
                    # Check for stuck state
                    if self.is_stuck():
                        self.handle_stuck_state()
    
                    results.append(f"Step {self.current_step}: {step_result}")
    
                if self.current_step >= self.max_steps:
                    self.current_step = 0
                    self.state = AgentState.IDLE
                    results.append(f"Terminated: Reached max steps ({self.max_steps})")
            await SANDBOX_CLIENT.cleanup()
            return "\n".join(results) if results else "No steps executed"
    

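    run is the "engine" of the whole framework. It coordinates the control attributes state, max_steps and current_step, calls the core step method, relies on the helpers state_context and update_memory, and finally interacts with the external SANDBOX_CLIENT. SANDBOX_CLIENT is the client for the Docker sandbox: commands are executed and files are copied inside the container, so the host machine is left untouched.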

        @abstractmethod
        async def step(self) -> str:
            """Execute a single step in the agent's workflow.
    
            Must be implemented by subclasses to define specific behavior.
            """
    
        def handle_stuck_state(self):
            """Handle stuck state by adding a prompt to change strategy"""
            stuck_prompt = "\
            Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
            self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
            logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
    
        def is_stuck(self) -> bool:
            """Check if the agent is stuck in a loop by detecting duplicate content"""
            if len(self.memory.messages) < 2:
                return False
    
            last_message = self.memory.messages[-1]
            if not last_message.content:
                return False
    
            # Count identical content occurrences
            duplicate_count = sum(
                1
                for msg in reversed(self.memory.messages[:-1])
                if msg.role == "assistant" and msg.content == last_message.content
            )
    
            return duplicate_count >= self.duplicate_threshold
    

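    Together, these three methods define the agent's single-step behavior and its self-correction mechanism, but there are problems:

    1. Duplicate detection compares only exact content; it ignores semantics and similarity.

    2. duplicate_threshold is hard to tune: too low causes false positives, too high causes misses.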
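One possible way to soften the first problem is to compare messages by similarity instead of exact equality. The sketch below uses difflib.SequenceMatcher from the standard library. The function is_stuck_fuzzy, the plain list of assistant message strings and the 0.9 ratio are illustrative assumptions, not part of OpenManus.

```python
from difflib import SequenceMatcher
from typing import List


def is_stuck_fuzzy(
    assistant_messages: List[str],
    duplicate_threshold: int = 2,
    min_ratio: float = 0.9,  # assumed cut-off for "near duplicate"
) -> bool:
    """Return True if the last message closely matches enough earlier ones."""
    if len(assistant_messages) < 2 or not assistant_messages[-1]:
        return False
    last = assistant_messages[-1]
    near_duplicates = sum(
        1
        for earlier in assistant_messages[:-1]
        if SequenceMatcher(None, earlier, last).ratio() >= min_ratio
    )
    return near_duplicates >= duplicate_threshold


repeated = [
    "I will search the web for agent projects.",
    "I will search the web for agent projects!",
    "I will search the web for agent projects",
]
varied = ["Open the browser", "Save the file to disk", "Terminate"]
print(is_stuck_fuzzy(repeated), is_stuck_fuzzy(varied))
```

Exact comparison would treat the three repeated messages as different because of the trailing punctuation. Character-level similarity still ignores meaning, so this is only a partial answer to the issue raised above.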

        @property
        def messages(self) -> List[Message]:
            """Retrieve a list of messages from the agent's memory."""
            return self.memory.messages
    
        @messages.setter
        def messages(self, value: List[Message]):
            """Set the list of messages in the agent's memory."""
            self.memory.messages = value
    

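    Property accessors that read and replace the message list in the agent's memory, to simplify access.

  • app/agent/react.py: implements the ReAct (Reason + Act) paradigm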

    
        @abstractmethod
        async def think(self) -> bool:
            """Process current state and decide next action"""
        # think() returns whether to proceed to the act step
        @abstractmethod
        async def act(self) -> str:
            """Execute decided actions"""
    
        async def step(self) -> str:
            """Execute a single step: think and act."""
            should_act = await self.think()
            if not should_act:
                return "Thinking complete - no action needed"
            return await self.act()
    
  • app/agent/toolcall.py: adds tool-calling capability

    class ToolCallAgent(ReActAgent):
        """Base agent class for handling tool/function calls with enhanced abstraction"""
    
        name: str = "toolcall"
        description: str = "an agent that can execute tool calls."
    
        system_prompt: str = SYSTEM_PROMPT
        next_step_prompt: str = NEXT_STEP_PROMPT
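        # Tool attributes, used when interacting with the LLM
        available_tools: ToolCollection = ToolCollection(
            CreateChatCompletion(), Terminate()
        )
        # available_tools defines which tools this agent can use. Two are available by default: CreateChatCompletion (probably used to generate longer text) and Terminate (ends the task). ToolCollection is a helper class that manages a set of tools.
        tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO  # type: ignore
        # tool_choices controls how the LLM may call tools: AUTO (the LLM decides whether to call a tool), REQUIRED (the LLM must call a tool), or NONE (the LLM must not call tools).
        special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])
    
        # State attributes, used by act()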
        tool_calls: List[ToolCall] = Field(default_factory=list)
        _current_base64_image: Optional[str] = None
    
        max_steps: int = 30
        max_observe: Optional[Union[int, bool]] = None
    
        async def think(self) -> bool:
            """Process current state and decide next actions using tools"""
            if self.next_step_prompt:
                user_msg = Message.user_message(self.next_step_prompt)
                self.messages += [user_msg]
    
            try:
                # Get response with tool options
                response = await self.llm.ask_tool(
                    messages=self.messages,
                    system_msgs=(
                        [Message.system_message(self.system_prompt)]
                        if self.system_prompt
                        else None
                    ),
                    tools=self.available_tools.to_params(),
                    tool_choice=self.tool_choices,
                )
            except ValueError:
                raise
            except Exception as e:
                # Check if this is a RetryError containing TokenLimitExceeded
                if hasattr(e, "__cause__") and isinstance(e.__cause__, TokenLimitExceeded):
                    token_limit_error = e.__cause__
                    logger.error(
                        f"🚨 Token limit error (from RetryError): {token_limit_error}"
                    )
                    self.memory.add_message(
                        Message.assistant_message(
                            f"Maximum token limit reached, cannot continue execution: {str(token_limit_error)}"
                        )
                    )
                    self.state = AgentState.FINISHED
                    return False
                raise
    
            self.tool_calls = tool_calls = (
                response.tool_calls if response and response.tool_calls else []
            )
            content = response.content if response and response.content else ""
    
            # Log response info
            logger.info(f"✨ {self.name}'s thoughts: {content}")
            logger.info(
                f"🛠️ {self.name} selected {len(tool_calls) if tool_calls else 0} tools to use"
            )
            if tool_calls:
                logger.info(
                    f"🧰 Tools being prepared: {[call.function.name for call in tool_calls]}"
                )
                logger.info(f"🔧 Tool arguments: {tool_calls[0].function.arguments}")
    
            try:
                if response is None:
                    raise RuntimeError("No response received from the LLM")
    
                # Handle different tool_choices modes
                if self.tool_choices == ToolChoice.NONE:
                    if tool_calls:
                        logger.warning(
                            f"🤔 Hmm, {self.name} tried to use tools when they weren't available!"
                        )
                    if content:
                        self.memory.add_message(Message.assistant_message(content))
                        return True
                    return False
    
                # Create and add assistant message
                assistant_msg = (
                    Message.from_tool_calls(content=content, tool_calls=self.tool_calls)
                    if self.tool_calls
                    else Message.assistant_message(content)
                )
                self.memory.add_message(assistant_msg)
    
                if self.tool_choices == ToolChoice.REQUIRED and not self.tool_calls:
                    return True  # Will be handled in act()
    
                # For 'auto' mode, continue with content if no commands but content exists
                if self.tool_choices == ToolChoice.AUTO and not self.tool_calls:
                    return bool(content)
    
                return bool(self.tool_calls)
            except Exception as e:
                logger.error(f"🚨 Oops! The {self.name}'s thinking process hit a snag: {e}")
                self.memory.add_message(
                    Message.assistant_message(
                        f"Error encountered while processing: {str(e)}"
                    )
                )
                return False
    

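    This is the agent's "thinking" phase, the core of the decision process.

    1. Prepare the input: if next_step_prompt is set, it is appended to the conversation history as a new user message to guide the LLM's next step.
    2. Call the LLM: self.llm.ask_tool is an LLM interface designed for tool calling. It sends the conversation history, the system prompt, the list of available tools (self.available_tools) and the tool-choice mode (self.tool_choices) to the LLM.
    3. Handle errors: exceptions are handled defensively, TokenLimitExceeded in particular. If the token limit is exceeded, the error is logged and the agent is set to FINISHED.
    4. Parse the response: tool_calls (the tools the LLM plans to run) and content (the plain-text reasoning) are extracted from the response.
    5. Log: detailed logs record the LLM's thoughts, the tools selected and the arguments of the first tool call, which is important for debugging.
    6. Handle the modes: the logic branches on self.tool_choices (NONE, REQUIRED, AUTO). In NONE mode, for example, a warning is logged if the LLM still tries to call a tool.
    7. Update memory: the LLM's response (its reasoning and any tool-call requests) is added to memory as an "assistant" message.
    8. Return the decision: a boolean is returned. True means the act phase should run (normally because there are tool calls to execute); False means there is nothing to do.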
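Steps 6 to 8 boil down to a small decision table. The sketch below restates it as a pure function. should_act and this string-valued ToolChoice enum are hypothetical names for illustration; the real method also writes to memory and logs along the way.

```python
from enum import Enum
from typing import List


class ToolChoice(str, Enum):  # hypothetical stand-in for the project's ToolChoice
    NONE = "none"
    AUTO = "auto"
    REQUIRED = "required"


def should_act(tool_choice: ToolChoice, content: str, tool_calls: List[str]) -> bool:
    """Mirror the boolean that think() returns for a given LLM response."""
    if tool_choice == ToolChoice.NONE:
        # Tools are disabled: continue only if there is text content.
        return bool(content)
    if tool_choice == ToolChoice.REQUIRED and not tool_calls:
        # act() is still entered; it raises because a tool call was mandatory.
        return True
    if tool_choice == ToolChoice.AUTO and not tool_calls:
        # No tool calls: continue only if the LLM produced text.
        return bool(content)
    return bool(tool_calls)


print(should_act(ToolChoice.AUTO, "", ["browser_use"]))
```

In AUTO mode the function returns True for a response that contains text but no tool calls, so act is entered even when there is nothing to execute; it then simply returns the last message content.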
        async def act(self) -> str:
            """Execute tool calls and handle their results"""
            if not self.tool_calls:
                if self.tool_choices == ToolChoice.REQUIRED:
                    raise ValueError(TOOL_CALL_REQUIRED)
    
                # Return last message content if no tool calls
                return self.messages[-1].content or "No content or commands to execute"
    
            results = []
            for command in self.tool_calls:
                # Reset base64_image for each tool call
                self._current_base64_image = None
    
                result = await self.execute_tool(command)
    
                if self.max_observe:
                    result = result[: self.max_observe]
    
                logger.info(
                    f"🎯 Tool '{command.function.name}' completed its mission! Result: {result}"
                )
    
                # Add tool response to memory
                tool_msg = Message.tool_message(
                    content=result,
                    tool_call_id=command.id,
                    name=command.function.name,
                    base64_image=self._current_base64_image,
                )
                self.memory.add_message(tool_msg)
                results.append(result)
    
            return "\n\n".join(results)
    

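    act is the agent's "executor". It takes the decisions made by the LLM in the think phase (the tool calls), actually runs those tools, collects their output and feeds it back into the agent's memory. This closes the "think, act, observe" loop.

    1. Check preconditions: check whether self.tool_calls is empty. If it is, the tool_choices mode decides whether to raise an error or return immediately.
    2. Iterate: loop over all tool_calls produced by the think phase.
    3. Run one tool: for each command (a ToolCall object), call self.execute_tool to actually run it.
    4. Process the result: take the tool's result and truncate it according to max_observe.
    5. Update memory: wrap each result in a tool_message (with the result content, the matching tool_call_id and so on) and add it to memory. The LLM needs this to understand the outcome of the previous action when it thinks about the next step.
    6. Aggregate and return: join all tool results into one string and return it. This string serves as the final result of the current step.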
        async def execute_tool(self, command: ToolCall) -> str:
            """Execute a single tool call with robust error handling"""
            if not command or not command.function or not command.function.name:
                return "Error: Invalid command format"
    
            name = command.function.name
            if name not in self.available_tools.tool_map:
                return f"Error: Unknown tool '{name}'"
    
            try:
                # Parse arguments
                args = json.loads(command.function.arguments or "{}")
    
                # Execute the tool
                logger.info(f"🔧 Activating tool: '{name}'...")
                result = await self.available_tools.execute(name=name, tool_input=args)
    
                # Handle special tools
                await self._handle_special_tool(name=name, result=result)
    
                # Check if result is a ToolResult with base64_image
                if hasattr(result, "base64_image") and result.base64_image:
                    # Store the base64_image for later use in tool_message
                    self._current_base64_image = result.base64_image
    
                # Format result for display (standard case)
                observation = (
                    f"Observed output of cmd `{name}` executed:\n{str(result)}"
                    if result
                    else f"Cmd `{name}` completed with no output"
                )
    
                return observation
            except json.JSONDecodeError:
                error_msg = f"Error parsing arguments for {name}: Invalid JSON format"
                logger.error(
                    f"📝 Oops! The arguments for '{name}' don't make sense - invalid JSON, arguments:{command.function.arguments}"
                )
                return f"Error: {error_msg}"
            except Exception as e:
                error_msg = f"⚠️ Tool '{name}' encountered a problem: {str(e)}"
                logger.exception(error_msg)
                return f"Error: {error_msg}"
    

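    act decides which tools to run; execute_tool does the actual execution. It is a lower-level helper responsible for a single tool call.

    1. Validate: check that the tool name exists in available_tools.
    2. Parse arguments: the tool arguments returned by the LLM are usually a JSON string; json.loads turns them into a Python dict.
    3. Execute: call self.available_tools.execute() with the tool name and the parsed arguments to actually run the tool.
    4. Special handling: call _handle_special_tool to check whether the tool triggers special logic (such as ending the agent).
    5. Format the result: wrap the tool's raw return value in an "observation" string that is easier for the LLM to consume.
    6. Handle errors: catch all exceptions (JSON parsing errors, errors raised by the tool itself) and return the error message as a string. Even if a tool fails, the agent keeps running instead of crashing.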
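The key property of these steps is that every failure path ends in a string rather than an exception. A minimal standalone sketch of that idea follows. The TOOLS dict and the add tool are hypothetical, and the special-tool and image handling of the real method are left out.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict


async def add(a: int, b: int) -> int:  # hypothetical tool for the demo
    return a + b


TOOLS: Dict[str, Callable[..., Awaitable[Any]]] = {"add": add}


async def execute_tool(name: str, raw_arguments: str) -> str:
    """Run one tool call and always return a string, even on failure."""
    if name not in TOOLS:
        return f"Error: Unknown tool '{name}'"
    try:
        # The LLM sends arguments as a JSON string; empty means no arguments.
        args = json.loads(raw_arguments or "{}")
        result = await TOOLS[name](**args)
        return f"Observed output of cmd `{name}` executed:\n{result}"
    except json.JSONDecodeError:
        return f"Error: Error parsing arguments for {name}: Invalid JSON format"
    except Exception as exc:  # tool failures become observations, not crashes
        return f"Error: Tool '{name}' encountered a problem: {exc}"


ok = asyncio.run(execute_tool("add", '{"a": 1, "b": 2}'))
bad_json = asyncio.run(execute_tool("add", "{a: 1"))
bad_args = asyncio.run(execute_tool("add", '{"a": 1}'))
unknown = asyncio.run(execute_tool("missing", "{}"))
print(ok, bad_json, bad_args, unknown, sep="\n")
```

Because the error text is returned as an observation, it ends up in memory and the LLM can react to it on the next step, for example by fixing the arguments.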
        async def cleanup(self):
            """Clean up resources used by the agent's tools."""
            logger.info(f"🧹 Cleaning up resources for agent '{self.name}'...")
            for tool_name, tool_instance in self.available_tools.tool_map.items():
                if hasattr(tool_instance, "cleanup") and asyncio.iscoroutinefunction(
                    tool_instance.cleanup
                ):
                    try:
                        logger.debug(f"🧼 Cleaning up tool: {tool_name}")
                        await tool_instance.cleanup()
                    except Exception as e:
                        logger.error(
                            f"🚨 Error cleaning up tool '{tool_name}': {e}", exc_info=True
                        )
            logger.info(f"✨ Cleanup complete for agent '{self.name}'.")
    
        async def run(self, request: Optional[str] = None) -> str:
            """Run the agent with cleanup when done."""
            try:
                return await super().run(request)
            finally:
                await self.cleanup()
    

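    At first glance run looks redundant here: think() and act() already implement the tool-calling logic, so run could live in a higher abstraction layer.

    On closer inspection, though:

    1. Why override run()?

    Not to call tools, but to run cleanup at the end of the lifecycle at this layer. Higher layers inherit this behavior.

    2. Does it conflict with other responsibilities?

    No. run only wraps the start and end of the flow; the control logic stays in think()/act().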
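The pattern discussed above, an override that only adds a guaranteed cleanup around the inherited loop, can be sketched as follows. Base and WithCleanup are hypothetical classes, and the fail flag exists only to simulate an error inside the loop.

```python
import asyncio
from typing import List


class Base:  # hypothetical stand-in for the parent agent class
    def __init__(self) -> None:
        self.events: List[str] = []

    async def run(self, fail: bool = False) -> str:
        self.events.append("run")
        if fail:
            raise RuntimeError("step failed")
        return "done"


class WithCleanup(Base):
    async def cleanup(self) -> None:
        self.events.append("cleanup")

    async def run(self, fail: bool = False) -> str:
        # Only the end of the lifecycle is wrapped; the loop itself is inherited.
        try:
            return await super().run(fail)
        finally:
            await self.cleanup()


async def demo() -> List[str]:
    agent = WithCleanup()
    await agent.run()
    try:
        await agent.run(fail=True)
    except RuntimeError:
        agent.events.append("error propagated")
    return agent.events


events = asyncio.run(demo())
print(events)
```

Cleanup runs on both the successful and the failing call, and the exception still reaches the caller.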

  • app/agent/manus.py: the main agent (integrates the MCP client and built-in tools)

    @model_validator(mode="after")
    def initialize_helper(self) -> "Manus":
        """Initialize basic components synchronously."""
        self.browser_context_helper = BrowserContextHelper(self)
        return self

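@model_validator(mode="after"):

This is a decorator provided by Pydantic (a data validation and settings library). mode="after" means the validator runs after all fields of the model have been parsed and validated. It receives the model instance and is expected to return the (possibly modified) instance. So when a Manus object is created, Pydantic first assigns fields such as name, description and system_prompt; once those are set, initialize_helper is called.

-> "Manus" is a Python type hint. It states that initialize_helper is expected to return an object of type Manus.

Why the string "Manus" rather than Manus itself?

This is called a forward reference. When a method refers to its own class while that class is still being defined, the class name is not yet bound, so using it directly can raise NameError.

Putting the type name in quotes makes Python treat it as a string. Type checkers such as mypy resolve the string later, once all classes are defined, which avoids circular-reference and undefined-name problems.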
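A small sketch of the quoted form, using a hypothetical Node class. typing.get_type_hints performs the deferred lookup that the paragraph above describes.

```python
from typing import get_type_hints


class Node:  # hypothetical class for illustration
    def clone(self) -> "Node":  # quoted: the name Node is not bound yet at this point
        return Node()


# The string annotation is resolved on demand, after the class exists.
hints = get_type_hints(Node.clone)
print(hints["return"] is Node)
```

Until it is resolved, the annotation is stored as the plain string "Node" in Node.clone.__annotations__.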

    @classmethod
    async def create(cls, **kwargs) -> "Manus":
        """Factory method to create and properly initialize a Manus instance."""
        instance = cls(**kwargs)
        await instance.initialize_mcp_servers()
        instance._initialized = True
        return instance

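Why is initialize_helper() not called in create(), but instead run earlier through @model_validator(mode="after")? This splits initialization across two places.

Gathering all initialization logic in create() and defining the construction flow explicitly might be a better choice.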
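A minimal sketch of that alternative, with all setup gathered in one async factory. Worker, _connect and the helper attribute are hypothetical names; this illustrates the pattern only and is not how Manus is written.

```python
import asyncio
from typing import List, Optional


class Worker:  # hypothetical class, not the real Manus
    def __init__(self, name: str) -> None:
        self.name = name
        self.helper: Optional[str] = None
        self.servers: List[str] = []
        self.initialized = False

    async def _connect(self, servers: List[str]) -> None:
        await asyncio.sleep(0)  # placeholder for real network I/O
        self.servers = list(servers)

    @classmethod
    async def create(cls, name: str, servers: List[str]) -> "Worker":
        """Single entry point: synchronous and asynchronous setup in one place."""
        instance = cls(name)
        instance.helper = f"helper-for-{name}"  # synchronous setup
        await instance._connect(servers)  # asynchronous setup
        instance.initialized = True
        return instance


worker = asyncio.run(Worker.create("demo", ["server-a"]))
print(worker.helper, worker.servers, worker.initialized)
```

The trade-off is that an object built directly with Worker("x") has no helper at all. A validator, by contrast, runs on every construction, so keeping the synchronous part there guarantees the helper exists even when create() is bypassed.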

    async def connect_mcp_server(
        self,
        server_url: str,
        server_id: str = "",
        use_stdio: bool = False,
        stdio_args: List[str] = None,
    ) -> None:
        """Connect to an MCP server and add its tools."""
        if use_stdio:
            await self.mcp_clients.connect_stdio(
                server_url, stdio_args or [], server_id
            )
            self.connected_servers[server_id or server_url] = server_url
        else:
            await self.mcp_clients.connect_sse(server_url, server_id)
            self.connected_servers[server_id or server_url] = server_url

        # Update available tools with only the new tools from this server
        new_tools = [
            tool for tool in self.mcp_clients.tools if tool.server_id == server_id
        ]
        self.available_tools.add_tools(*new_tools)

    async def disconnect_mcp_server(self, server_id: str = "") -> None:
        """Disconnect from an MCP server and remove its tools."""
        await self.mcp_clients.disconnect(server_id)
        if server_id:
            self.connected_servers.pop(server_id, None)
        else:
            self.connected_servers.clear()

        # Rebuild available tools without the disconnected server's tools
        base_tools = [
            tool
            for tool in self.available_tools.tools
            if not isinstance(tool, MCPClientTool)
        ]
        self.available_tools = ToolCollection(*base_tools)
        self.available_tools.add_tools(*self.mcp_clients.tools)

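connect_mcp_server uses the mcp_clients object to connect to the given server (over SSE or standard input/output) and records the connection. It then takes the tools just discovered on the new server from mcp_clients and adds them to self.available_tools, so the agent's "toolbox" grows dynamically.

disconnect_mcp_server disconnects from the given server and removes the connection record. To remove the tools of the disconnected server, it first filters out all non-MCP base tools, rebuilds a ToolCollection from them, and then adds back the tools of the MCP servers that are still connected. This ensures the agent does not try to use a tool that is no longer reachable.

This approach is cumbersome. A tool registry combined with an enabled-state table would be easier to maintain and extend than rebuilding the ToolCollection on every disconnect.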
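A rough sketch of what such a registry could look like. ToolRegistry and all of its methods are a hypothetical design, not existing OpenManus code; tools are reduced to names to keep the example short.

```python
from typing import Dict, List, Set


class ToolRegistry:  # hypothetical design, not part of OpenManus
    def __init__(self) -> None:
        self._owner: Dict[str, str] = {}  # tool name -> server id ("" for built-ins)
        self._enabled: Set[str] = set()  # names of tools currently usable

    def register(self, name: str, server_id: str = "") -> None:
        self._owner[name] = server_id
        self._enabled.add(name)

    def disable_server(self, server_id: str) -> None:
        """Disable every tool owned by one server without rebuilding anything."""
        for name, owner in self._owner.items():
            if owner == server_id:
                self._enabled.discard(name)

    def enable_server(self, server_id: str) -> None:
        for name, owner in self._owner.items():
            if owner == server_id:
                self._enabled.add(name)

    def active(self) -> List[str]:
        return sorted(self._enabled)


registry = ToolRegistry()
registry.register("terminate")
registry.register("search", server_id="mcp-1")
registry.register("fetch", server_id="mcp-2")
registry.disable_server("mcp-1")
print(registry.active())
```

Disconnecting a server becomes a state change on the table, and reconnecting only needs enable_server instead of a fresh tool discovery.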

5. LLM interaction layer
  • app/llm.py: a unified interface to several LLM providers (OpenAI/Azure/Bedrock), including:

    • Token counting
        def _calculate_high_detail_tokens(self, width: int, height: int) -> int:
            """Calculate tokens for high detail images based on dimensions"""
            # Step 1: Scale to fit in MAX_SIZE x MAX_SIZE square
            if width > self.MAX_SIZE or height > self.MAX_SIZE:
                scale = self.MAX_SIZE / max(width, height)
                width = int(width * scale)
                height = int(height * scale)
    
            # Step 2: Scale so shortest side is HIGH_DETAIL_TARGET_SHORT_SIDE
            scale = self.HIGH_DETAIL_TARGET_SHORT_SIDE / min(width, height)
            scaled_width = int(width * scale)
            scaled_height = int(height * scale)
    
            # Step 3: Count number of 512px tiles
            tiles_x = math.ceil(scaled_width / self.TILE_SIZE)
            tiles_y = math.ceil(scaled_height / self.TILE_SIZE)
            total_tiles = tiles_x * tiles_y
    
            # Step 4: Calculate final token count
            return (
                total_tiles * self.HIGH_DETAIL_TILE_TOKENS
            ) + self.LOW_DETAIL_IMAGE_TOKENS
    
        def count_content(self, content: Union[str, List[Union[str, dict]]]) -> int:
            """Calculate tokens for message content"""
            if not content:
                return 0
    
            if isinstance(content, str):
                return self.count_text(content)
    
            token_count = 0
            for item in content:
                if isinstance(item, str):
                    token_count += self.count_text(item)
                elif isinstance(item, dict):
                    if "text" in item:
                        token_count += self.count_text(item["text"])
                    elif "image_url" in item:
                        token_count += self.count_image(item)
            return token_count
    

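    When an LLM processes a large image, the cost is not computed per pixel. The image is split into tiles, each tile costs a fixed number of tokens, and a base cost (the token cost of a low-detail image) is added on top.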
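A worked example makes the four steps concrete. The constant values below are assumptions: the excerpt does not show them, and they are taken here from OpenAI's published image-token scheme (2048 px limit, 768 px short side, 512 px tiles, 170 tokens per tile, 85 base tokens).

```python
import math

# Assumed constants, not shown in the excerpt above.
MAX_SIZE = 2048
HIGH_DETAIL_TARGET_SHORT_SIDE = 768
TILE_SIZE = 512
HIGH_DETAIL_TILE_TOKENS = 170
LOW_DETAIL_IMAGE_TOKENS = 85


def high_detail_tokens(width: int, height: int) -> int:
    # Step 1: shrink to fit inside MAX_SIZE x MAX_SIZE if needed
    if width > MAX_SIZE or height > MAX_SIZE:
        scale = MAX_SIZE / max(width, height)
        width, height = int(width * scale), int(height * scale)
    # Step 2: rescale so the shortest side equals the target
    scale = HIGH_DETAIL_TARGET_SHORT_SIDE / min(width, height)
    scaled_w, scaled_h = int(width * scale), int(height * scale)
    # Step 3: count the tiles needed to cover the scaled image
    tiles = math.ceil(scaled_w / TILE_SIZE) * math.ceil(scaled_h / TILE_SIZE)
    # Step 4: per-tile cost plus the fixed base cost
    return tiles * HIGH_DETAIL_TILE_TOKENS + LOW_DETAIL_IMAGE_TOKENS


print(high_detail_tokens(1024, 1024))  # 768x768 -> 2x2 tiles -> 765
print(high_detail_tokens(2048, 4096))  # 768x1536 -> 2x3 tiles -> 1105
```

With these values a 1024x1024 image is scaled to 768x768 and covered by 4 tiles, giving 4 * 170 + 85 = 765 tokens. A 2048x4096 image is first halved to 1024x2048, then scaled to 768x1536, which needs 6 tiles and 1105 tokens.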

    • Message formatting
    • Tool-call handling
    • Retry mechanism
        @retry(
            wait=wait_random_exponential(min=1, max=60),
            stop=stop_after_attempt(6),
            retry=retry_if_exception_type(
                (OpenAIError, Exception, ValueError)
            ),  # Don't retry TokenLimitExceeded
        )
    

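    The @retry decorator is applied to a function or method. Its purpose is to make that function fault tolerant: on transient, recoverable errors (network jitter, a temporarily overloaded server) it retries automatically instead of failing immediately. It is equivalent to: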

    original_get_completion = get_completion  # keep the original function
    get_completion = retry(...)(original_get_completion)  # rebind the name to the wrapper returned by retry
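To show what such a wrapper does internally, here is a hand-rolled retry decorator with randomized exponential backoff, written with the standard library only. It is a simplified stand-in, not the implementation of the retry library used in the listing; the parameter names and the injectable sleep function are assumptions made for the demo.

```python
import functools
import random
import time
from typing import Callable, List


def retry(
    max_attempts: int = 6,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
):
    """Simplified retry with randomized exponential backoff."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    # The upper bound doubles each attempt, capped at max_delay.
                    upper = min(max_delay, base_delay * 2 ** (attempt - 1))
                    sleep(random.uniform(0, upper))

        return wrapper

    return decorator


calls: List[int] = []


def flaky() -> str:
    """Fails twice, then succeeds, like a briefly overloaded server."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# Explicit rebinding, the long form of the decorator syntax; sleep is stubbed out.
flaky_with_retry = retry(max_attempts=5, sleep=lambda _: None)(flaky)
result = flaky_with_retry()
print(result, len(calls))
```

The call succeeds on the third attempt, and the caller never sees the two transient errors.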
    
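6. Data models
  • app/schema.py: defines the core data structures:
    • Message/ToolCall/Memory
7. Tool framework
  • app/tool/base.py: the abstract tool base class BaseTool and the standard output type ToolResult
  • app/tool/tool_collection.py: the tool collection manager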
class ToolCollection:
    """A collection of defined tools."""

    class Config:
        arbitrary_types_allowed = True

    def __init__(self, *tools: BaseTool):
        self.tools = tools
        self.tool_map = {tool.name: tool for tool in tools}

    def __iter__(self):
        return iter(self.tools)

    def to_params(self) -> List[Dict[str, Any]]:
        return [tool.to_param() for tool in self.tools]

    async def execute(
        self, *, name: str, tool_input: Dict[str, Any] = None
    ) -> ToolResult:
        tool = self.tool_map.get(name)
        if not tool:
            return ToolFailure(error=f"Tool {name} is invalid")
        try:
            result = await tool(**tool_input)
            return result
        except ToolError as e:
            return ToolFailure(error=e.message)

    async def execute_all(self) -> List[ToolResult]:
        """Execute all tools in the collection sequentially."""
        results = []
        for tool in self.tools:
            try:
                result = await tool()
                results.append(result)
            except ToolError as e:
                results.append(ToolFailure(error=e.message))
        return results
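  1. When execute is called, it receives name and tool_input (a dict such as {"text": "hello", "count": 2}). tool = self.tool_map.get(name) looks up the matching BaseTool, which is then called with tool_input unpacked as keyword arguments; this works because BaseTool defines __call__: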
class BaseTool(ABC, BaseModel):
    name: str
    description: str
    parameters: Optional[dict] = None

    class Config:
        arbitrary_types_allowed = True

    async def __call__(self, **kwargs) -> Any:
        """Execute the tool with given parameters."""
        return await self.execute(**kwargs)
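The lookup-then-call path can be traced end to end with a stripped-down sketch. Tool is a simplified stand-in for BaseTool without Pydantic, and Repeat and dispatch are hypothetical names.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict


class Tool(ABC):  # simplified stand-in for BaseTool, without Pydantic
    name: str

    async def __call__(self, **kwargs: Any) -> Any:
        # Calling the instance forwards to execute().
        return await self.execute(**kwargs)

    @abstractmethod
    async def execute(self, **kwargs: Any) -> Any:
        ...


class Repeat(Tool):  # hypothetical tool
    name = "repeat"

    async def execute(self, text: str, count: int) -> str:
        return text * count


tool_map: Dict[str, Tool] = {tool.name: tool for tool in (Repeat(),)}


async def dispatch(name: str, tool_input: Dict[str, Any]) -> Any:
    tool = tool_map[name]
    return await tool(**tool_input)  # dict keys become keyword arguments


output = asyncio.run(dispatch("repeat", {"text": "hello", "count": 2}))
print(output)
```

Because the dict is unpacked with **, its keys must match the parameter names of the tool's execute method; a missing or extra key raises TypeError at call time.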
  • app/tool/mcp.py: client-side MCP implementation (proxies for remote tools)
    async def connect_stdio(
        self, command: str, args: List[str], server_id: str = ""
    ) -> None:
        """Connect to an MCP server using stdio transport."""
        if not command:
            raise ValueError("Server command is required.")

        server_id = server_id or command

        # Always ensure clean disconnection before new connection
        if server_id in self.sessions:
            await self.disconnect(server_id)

        exit_stack = AsyncExitStack()
        self.exit_stacks[server_id] = exit_stack

        server_params = StdioServerParameters(command=command, args=args)
        stdio_transport = await exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        read, write = stdio_transport
        session = await exit_stack.enter_async_context(ClientSession(read, write))
        self.sessions[server_id] = session

        await self._initialize_and_list_tools(server_id)

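"Connecting over stdio" means that the main program (here, the client program hosting the Manus agent) does not connect to a remote server over the network (HTTP or TCP/IP). Instead it starts another program locally to act as the "server" and communicates with it in both directions through that program's standard input and output pipes. Any program that can read and write standard input/output and follows the MCP protocol can be integrated with the Python client.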
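The transport idea can be shown with the standard library alone. In the sketch below the parent starts a child Python process and exchanges one JSON line with it over stdin/stdout. The message shape is made up for the demo and is not the MCP wire protocol; CHILD_SOURCE and call_child are hypothetical names.

```python
import json
import subprocess
import sys

# Child program: reads one JSON line from stdin, answers with one JSON line on stdout.
CHILD_SOURCE = (
    "import sys, json\n"
    "request = json.loads(sys.stdin.readline())\n"
    "reply = {'id': request['id'], 'result': request['text'].upper()}\n"
    "sys.stdout.write(json.dumps(reply) + '\\n')\n"
)


def call_child(text: str) -> dict:
    """Start the child locally and talk to it over its stdin/stdout pipes."""
    request = json.dumps({"id": 1, "text": text}) + "\n"
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE],
        input=request,
        capture_output=True,
        text=True,
        check=True,
        timeout=30,
    )
    return json.loads(completed.stdout)


reply = call_child("ping")
print(reply)
```

No port is opened and nothing leaves the machine. A real stdio server stays alive and answers many requests over the same pipes, whereas this child handles one request and exits.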

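  • app/mcp/server.py: server-side MCP implementation (exposes the tool set)
  • Concrete tool implementations:
    • bash.py/python_execute.py
    • browser_use_tool.py/web_search.py
    • file_operators.py/ask_human.py/terminate.py
8. Sandbox environment
  • app/sandbox/client.py: Docker-based environment for safe code execution
9. Exception handling
  • app/exceptions.py: custom exception classes (such as ToolError/TokenLimitExceeded)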