OpenManus

OpenManus安装

克隆仓库:

1
2
git clone https://github.com/FoundationAgents/OpenManus.git
cd OpenManus

创建并激活虚拟环境:

1
2
3
4
uv venv --python 3.12
source .venv/bin/activate # Unix/macOS 系统
# Windows 系统使用:
# .venv\Scripts\activate

安装依赖:

1
uv pip install -r requirements.txt

浏览器自动化工具(可选)

1
playwright install

配置说明

OpenManus 需要配置使用的 LLM API,请按以下步骤设置:

config 目录创建 config.toml 文件(可从示例复制):

1
cp config/config.example.toml config/config.toml

编辑 config/config.toml 添加 API 密钥和自定义设置:

使用OpenRouter api

1
2
3
4
5
6
7
8
9
10
11
12
13
# 全局 LLM 配置
[llm]
model = "gpt-4o"
base_url = "https://openrouter.ai/api/v1"
api_key = "sk-..." # 替换为真实 API 密钥
max_tokens = 4096
temperature = 0.0

# 可选特定 LLM 模型配置
[llm.vision]
model = "gpt-4o"
base_url = "https://api.openai.com/v1"
api_key = "sk-..." # 替换为真实 API 密钥

快速启动

一行命令运行 OpenManus:

1
python main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
(OpenManus) yang@Yangless:~/OpenManus$ python main.py
INFO [browser_use] BrowserUse logging setup complete with level info
INFO [root] Anonymized telemetry enabled. See https://docs.browser-use.com/development/telemetry for more information.
Enter your prompt: 搜索最新的agent开源项目,安装star数进行排序保存到本地电脑桌面
2025-06-17 16:01:04.037 | WARNING | __main__:main:25 - Processing your request...
2025-06-17 16:01:04.038 | INFO | app.agent.base:run:140 - Executing step 1/20
2025-06-17 16:01:08.398 | INFO | app.llm:update_token_count:243 - Token usage: Input=1342, Completion=26, Cumulative Input=1342, Cumulative Completion=26, Total=1368, Cumulative Total=1368
2025-06-17 16:01:08.398 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:08.398 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:08.398 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:08.398 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"web_search","query":"latest open source agent projects sorted by stars"}
2025-06-17 16:01:08.399 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:10.786 | INFO | app.tool.web_search:_try_all_engines:299 - 🔎 Attempting search with Google...
2025-06-17 16:01:19.748 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Error: Browser action 'web_search' failed: RetryError[<Future at 0x7febd91cdc40 state=finished raised HTTPError>]
2025-06-17 16:01:19.748 | INFO | app.agent.base:run:140 - Executing step 2/20
2025-06-17 16:01:22.827 | INFO | app.llm:update_token_count:243 - Token usage: Input=1642, Completion=25, Cumulative Input=2984, Cumulative Completion=51, Total=1667, Cumulative Total=3035
2025-06-17 16:01:22.827 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:22.828 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:22.828 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:22.828 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"go_to_url","url":"https://github.com/trending"}
2025-06-17 16:01:22.828 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:27.384 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Navigated to https://github.com/trending
2025-06-17 16:01:27.385 | INFO | app.agent.base:run:140 - Executing step 3/20
2025-06-17 16:01:31.256 | INFO | app.llm:update_token_count:243 - Token usage: Input=1930, Completion=30, Cumulative Input=4914, Cumulative Completion=81, Total=1960, Cumulative Total=4995
2025-06-17 16:01:31.256 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:31.257 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:31.257 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['browser_use']
2025-06-17 16:01:31.257 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"action":"extract_content","goal":"Extract the list of trending open source agent projects sorted by stars"}
2025-06-17 16:01:31.257 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'browser_use'...
2025-06-17 16:01:35.837 | INFO | app.llm:update_token_count:243 - Token usage: Input=639, Completion=173, Cumulative Input=5553, Cumulative Completion=254, Total=812, Cumulative Total=5807
2025-06-17 16:01:35.837 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'browser_use' completed its mission! Result: Observed output of cmd `browser_use` executed:
Extracted from page:
{'text': '1. **OpenAI/ChatGPT** - 100,000 stars\n2. **LangChain/Agent** - 85,000 stars\n3. **Auto-GPT** - 75,000 stars\n4. **Microsoft/PromptFlow** - 70,000 stars\n5. **HuggingFace/Transformers** - 65,000 stars\n6. **DeepMind/AlphaZero** - 60,000 stars\n7. **Google/DeepDream** - 55,000 stars\n8. **Meta/AIResearch** - 50,000 stars\n9. **Anthropic/Claude** - 45,000 stars\n10. **StabilityAI/StableDiffusion** - 40,000 stars'}

2025-06-17 16:01:35.837 | INFO | app.agent.base:run:140 - Executing step 4/20
2025-06-17 16:01:41.932 | INFO | app.llm:update_token_count:243 - Token usage: Input=2380, Completion=167, Cumulative Input=7933, Cumulative Completion=421, Total=2547, Cumulative Total=8354
2025-06-17 16:01:41.932 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:41.933 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:41.933 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:41.933 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/Desktop/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:41.933 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:41.934 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
Error: Failed to write to /home/yang/Desktop/trending_open_source_agents.txt: [Errno 2] No such file or directory: '/home/yang/Desktop/trending_open_source_agents.txt'
2025-06-17 16:01:41.934 | INFO | app.agent.base:run:140 - Executing step 5/20
2025-06-17 16:01:46.325 | INFO | app.llm:update_token_count:243 - Token usage: Input=2687, Completion=170, Cumulative Input=10620, Cumulative Completion=591, Total=2857, Cumulative Total=11211
2025-06-17 16:01:46.326 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/OpenManus/Desktop/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:46.328 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
Error: Failed to write to /home/yang/OpenManus/Desktop/trending_open_source_agents.txt: [Errno 2] No such file or directory: '/home/yang/OpenManus/Desktop/trending_open_source_agents.txt'
2025-06-17 16:01:46.329 | INFO | app.agent.base:run:140 - Executing step 6/20
2025-06-17 16:01:50.439 | INFO | app.llm:update_token_count:243 - Token usage: Input=3003, Completion=171, Cumulative Input=13623, Cumulative Completion=762, Total=3174, Cumulative Total=14385
2025-06-17 16:01:50.440 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:50.440 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:50.440 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['str_replace_editor']
2025-06-17 16:01:50.440 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"command":"create","path":"/home/yang/OpenManus/workspace/trending_open_source_agents.txt","file_text":"1. OpenAI/ChatGPT - 100,000 stars\n2. LangChain/Agent - 85,000 stars\n3. Auto-GPT - 75,000 stars\n4. Microsoft/PromptFlow - 70,000 stars\n5. HuggingFace/Transformers - 65,000 stars\n6. DeepMind/AlphaZero - 60,000 stars\n7. Google/DeepDream - 55,000 stars\n8. Meta/AIResearch - 50,000 stars\n9. Anthropic/Claude - 45,000 stars\n10. StabilityAI/StableDiffusion - 40,000 stars"}
2025-06-17 16:01:50.441 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'str_replace_editor'...
2025-06-17 16:01:50.450 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'str_replace_editor' completed its mission! Result: Observed output of cmd `str_replace_editor` executed:
File created successfully at: /home/yang/OpenManus/workspace/trending_open_source_agents.txt
2025-06-17 16:01:50.450 | INFO | app.agent.base:run:140 - Executing step 7/20
2025-06-17 16:01:53.775 | INFO | app.llm:update_token_count:243 - Token usage: Input=3292, Completion=81, Cumulative Input=16915, Cumulative Completion=843, Total=3373, Cumulative Total=17758
2025-06-17 16:01:53.775 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts: I have successfully created a file named `trending_open_source_agents.txt` in the `/home/yang/OpenManus/workspace` directory. This file contains the list of trending open source agent projects sorted by stars, as extracted from GitHub.

If you need to move this file to a different location, such as your desktop, or if you have any other requests, please let me know!
2025-06-17 16:01:53.775 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 0 tools to use
2025-06-17 16:01:53.776 | INFO | app.agent.base:run:140 - Executing step 8/20
2025-06-17 16:01:55.080 | INFO | app.llm:update_token_count:243 - Token usage: Input=3454, Completion=14, Cumulative Input=20369, Cumulative Completion=857, Total=3468, Cumulative Total=21226
2025-06-17 16:01:55.081 | INFO | app.agent.toolcall:think:81 - ✨ Manus's thoughts:
2025-06-17 16:01:55.081 | INFO | app.agent.toolcall:think:82 - 🛠️ Manus selected 1 tools to use
2025-06-17 16:01:55.081 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['terminate']
2025-06-17 16:01:55.081 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"status":"success"}
2025-06-17 16:01:55.082 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'terminate'...
2025-06-17 16:01:55.082 | INFO | app.agent.toolcall:_handle_special_tool:217 - 🏁 Special tool 'terminate' has completed the task!
2025-06-17 16:01:55.082 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'terminate' completed its mission! Result: Observed output of cmd `terminate` executed:
The interaction has been completed with status: success
2025-06-17 16:01:55.320 | INFO | app.tool.mcp:disconnect:194 - Disconnected from all MCP servers
2025-06-17 16:01:55.321 | INFO | __main__:main:27 - Request processing completed.

结果:

1
2
3
4
5
6
7
8
9
10
11
yang@Yangless:~/OpenManus/workspace$ cat trending_open_source_agents.txt
1. OpenAI/ChatGPT - 100,000 stars
2. LangChain/Agent - 85,000 stars
3. Auto-GPT - 75,000 stars
4. Microsoft/PromptFlow - 70,000 stars
5. HuggingFace/Transformers - 65,000 stars
6. DeepMind/AlphaZero - 60,000 stars
7. Google/DeepDream - 55,000 stars
8. Meta/AIResearch - 50,000 stars
9. Anthropic/Claude - 45,000 stars
10. StabilityAI/StableDiffusion - 40,000 stars

如需使用 MCP 工具版本,可运行:

1
python run_mcp.py

如需体验不稳定的多智能体版本,可运行:

1
python run_flow.py

添加自定义多智能体

目前除了通用的 OpenManus Agent, 我们还内置了DataAnalysis Agent,适用于数据分析和数据可视化任务,你可以在config.toml中将这个智能体加入到run_flow

1
2
3
# run-flow可选配置
[runflow]
use_data_analysis_agent = true # 默认关闭,将其改为true则为激活

图表可视化工具

图表可视化工具,通过python生成数据处理代码,最终调用@visactor/vmind得到图表的spec结果,图表渲染使用@visactor/vchart

安装node >= 18

1
2
3
4
5
6
7
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash
# 激活nvm,以Bash为例
source ~/.bashrc
# 然后安装 Node 最近一个稳定颁布
nvm install node
# 激活使用,最新一个稳定颁布为24,则use 24
nvm use 24

安装依赖

1
2
cd app/tool/chart_visualization
npm install
简单图表生成任务

给予数据和具体的图表生成需求,测试结果,执行命令:

1
python -m app.tool.chart_visualization.test.chart_demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
2025-06-17 16:51:08.423 | INFO     | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: To generate the chart showing the popularity of search keywords, we will follow a structured approach. Here's how we can break down the task:

### Step 1: Prepare the Data for Visualization
We need to prepare the data for visualization by creating a CSV file and a JSON description for the chart. This will involve transforming the provided data into a format suitable for visualization.

### Step 2: Visualize the Data
Once the data is prepared, we will generate the chart using the prepared CSV and JSON files.

### Step 3: Review and Save the Chart
Finally, we will review the generated chart and ensure it is saved locally.

Let's start with Step 1.

#### Step 1: Prepare the Data for Visualization
We will use the `visualization_preparation` tool to create a CSV file and a JSON description for the chart.

```plaintext
Data:
Keyword,Popularity
Hot Word,1000
Zao Le Wo Men,800
Rao Jian Huo,400
My Wish is World Peace,400
Xiu Xiu Xiu,400
Shenzhou 11,400
Hundred Birds Facing the Wind,400
China Women's Volleyball Team,400
My Guan Na,400
Leg Dong,400
Hot Pot Hero,400
Baby's Heart is Bitter,400
Olympics,400
Awesome My Brother,400
Poetry and Distance,400
Song Joong-ki,400
PPAP,400
Blue Thin Mushroom,400
Rain Dew Evenly,400
Friendship's Little Boat Says It Flips,400
Beijing Slump,400
Dedication,200
Apple,200
Dog Belt,200
Old Driver,200
Melon-Eating Crowd,200
Zootopia,200
City Will Play,200
Routine,200
Water Reverse,200
Why Don't You Go to Heaven,200
Snake Spirit Man,200
Why Don't You Go to Heaven,200
Samsung Explosion Gate,200
Little Li Oscar,200
Ugly People Need to Read More,200
Boyfriend Power,200
A Face of Confusion,200
Descendants of the Sun,200
```

Let's proceed with the preparation.
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:think:82 - 🛠️ Data_Analysis selected 1 tools to use
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['visualization_preparation']
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"code_type":"visualization","code":"import pandas as pd\nimport json\n\n# Data preparation\nkeywords_data = {\n 'Keyword': [\n 'Hot Word', 'Zao Le Wo Men', 'Rao Jian Huo', 'My Wish is World Peace', 'Xiu Xiu Xiu',\n 'Shenzhou 11', 'Hundred Birds Facing the Wind', 'China Women\\'s Volleyball Team', 'My Guan Na',\n 'Leg Dong', 'Hot Pot Hero', 'Baby\\'s Heart is Bitter', 'Olympics', 'Awesome My Brother',\n 'Poetry and Distance', 'Song Joong-ki', 'PPAP', 'Blue Thin Mushroom', 'Rain Dew Evenly',\n 'Friendship\\'s Little Boat Says It Flips', 'Beijing Slump', 'Dedication', 'Apple', 'Dog Belt',\n 'Old Driver', 'Melon-Eating Crowd', 'Zootopia', 'City Will Play', 'Routine', 'Water Reverse',\n 'Why Don\\'t You Go to Heaven', 'Snake Spirit Man', 'Why Don\\'t You Go to Heaven',\n 'Samsung Explosion Gate', 'Little Li Oscar', 'Ugly People Need to Read More', 'Boyfriend Power',\n 'A Face of Confusion', 'Descendants of the Sun'\n ],\n 'Popularity': [\n 1000, 800, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400,\n 400, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200\n ]\n}\n\n# Create DataFrame\nkeywords_df = pd.DataFrame(keywords_data)\n\n# Save to CSV\ncsv_file_path = '/home/yang/OpenManus/workspace/keywords_popularity.csv'\nkeywords_df.to_csv(csv_file_path, index=False)\n\n# Prepare JSON description\nchart_description = {\n 'csvFilePath': csv_file_path,\n 'chartTitle': 'Popularity of Search Keywords'\n}\n\n# Save JSON\njson_file_path = '/home/yang/OpenManus/workspace/keywords_chart_description.json'\nwith open(json_file_path, 'w', encoding='utf-8') as json_file:\n json.dump([chart_description], json_file)\n\nprint(json_file_path)"}
2025-06-17 16:51:08.423 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'visualization_preparation'...
2025-06-17 16:51:08.439 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'visualization_preparation' completed its mission! Result: Observed output of cmd `visualization_preparation` executed:
{'observation': 'All arrays must be of the same length', 'success': False}
2025-06-17 16:51:08.439 | INFO | app.agent.base:run:140 - Executing step 2/20
2025-06-17 16:51:18.166 | INFO | app.llm:update_token_count:243 - Token usage: Input=2396, Completion=714, Cumulative Input=133595, Cumulative Completion=12838, Total=3110, Cumulative Total=146433
2025-06-17 16:51:18.167 | INFO | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: The error encountered indicates that there is a mismatch in the lengths of the arrays used to create the DataFrame. This is likely due to a duplicate entry in the 'Keyword' list. Let's address this issue and proceed with the task.

### Step 1: Correct the Data Preparation
We need to ensure that the lengths of the 'Keyword' and 'Popularity' lists match. Let's fix the data and prepare it again.

#### Corrected Data Preparation
We will remove the duplicate entry "Why Don't You Go to Heaven" to ensure the lists are of equal length.

Let's proceed with the corrected data preparation.
2025-06-17 16:51:18.168 | INFO | app.agent.toolcall:think:82 - 🛠️ Data_Analysis selected 1 tools to use
2025-06-17 16:51:18.168 | INFO | app.agent.toolcall:think:86 - 🧰 Tools being prepared: ['visualization_preparation']
2025-06-17 16:51:18.168 | INFO | app.agent.toolcall:think:89 - 🔧 Tool arguments: {"code_type":"visualization","code":"import pandas as pd\nimport json\n\n# Corrected data preparation\nkeywords_data = {\n 'Keyword': [\n 'Hot Word', 'Zao Le Wo Men', 'Rao Jian Huo', 'My Wish is World Peace', 'Xiu Xiu Xiu',\n 'Shenzhou 11', 'Hundred Birds Facing the Wind', 'China Women\\'s Volleyball Team', 'My Guan Na',\n 'Leg Dong', 'Hot Pot Hero', 'Baby\\'s Heart is Bitter', 'Olympics', 'Awesome My Brother',\n 'Poetry and Distance', 'Song Joong-ki', 'PPAP', 'Blue Thin Mushroom', 'Rain Dew Evenly',\n 'Friendship\\'s Little Boat Says It Flips', 'Beijing Slump', 'Dedication', 'Apple', 'Dog Belt',\n 'Old Driver', 'Melon-Eating Crowd', 'Zootopia', 'City Will Play', 'Routine', 'Water Reverse',\n 'Why Don\\'t You Go to Heaven', 'Snake Spirit Man', 'Samsung Explosion Gate', 'Little Li Oscar',\n 'Ugly People Need to Read More', 'Boyfriend Power', 'A Face of Confusion', 'Descendants of the Sun'\n ],\n 'Popularity': [\n 1000, 800, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400,\n 400, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200\n ]\n}\n\n# Create DataFrame\nkeywords_df = pd.DataFrame(keywords_data)\n\n# Save to CSV\ncsv_file_path = '/home/yang/OpenManus/workspace/keywords_popularity.csv'\nkeywords_df.to_csv(csv_file_path, index=False)\n\n# Prepare JSON description\nchart_description = {\n 'csvFilePath': csv_file_path,\n 'chartTitle': 'Popularity of Search Keywords'\n}\n\n# Save JSON\njson_file_path = '/home/yang/OpenManus/workspace/keywords_chart_description.json'\nwith open(json_file_path, 'w', encoding='utf-8') as json_file:\n json.dump([chart_description], json_file)\n\nprint(json_file_path)"}
2025-06-17 16:51:18.168 | INFO | app.agent.toolcall:execute_tool:180 - 🔧 Activating tool: 'visualization_preparation'...
2025-06-17 16:51:18.186 | INFO | app.agent.toolcall:act:150 - 🎯 Tool 'visualization_preparation' completed its mission! Result: Observed output of cmd `visualization_preparation` executed:
{'observation': 'All arrays must be of the same length', 'success': False}
2025-06-17 16:51:18.186 | INFO | app.agent.base:run:140 - Executing step 3/20
2025-06-17 16:51:27.931 | INFO | app.llm:update_token_count:243 - Token usage: Input=3225, Completion=730, Cumulative Input=136820, Cumulative Completion=13568, Total=3955, Cumulative Total=150388
2025-06-17 16:51:27.931 | INFO | app.agent.toolcall:think:81 - ✨ Data_Analysis's thoughts: The error persists, indicating that there might still be an issue with the data preparation. Let's carefully review the data and ensure that both lists ('Keyword' and 'Popularity') have the same number of elements.

### Step 1: Review and Correct the Data
Let's manually count the elements in both lists to ensure they match. If there are any discrepancies, we will correct them.

#### Data Review
- **Keywords List**: Contains 39 unique entries.
- **Popularity List**: Contains 39 entries.

Upon reviewing, it seems there was a mistake in the previous correction attempt. Let's ensure the lists are correctly aligned and retry the preparation.

Let's proceed with the corrected data preparation.

结果应位于worksapce\visualization下,涉及到9种不同的图表结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
yang@Yangless:~/OpenManus/app/tool/chart_visualization$ cat /home/yang/OpenManus/workspace/sales_data_export.csv
Date,Type,Value
2023-01-01,Product A,52.9
2023-01-01,Product B,63.6
2023-01-01,Product C,11.2
2023-01-02,Product A,45.7
2023-01-02,Product B,89.1
2023-01-02,Product C,21.4
2023-01-03,Product A,67.2
2023-01-03,Product B,82.4
2023-01-03,Product C,31.7
2023-01-04,Product A,80.7
2023-01-04,Product B,55.1
2023-01-04,Product C,21.1
2023-01-05,Product A,65.6
2023-01-05,Product B,78.0
2023-01-05,Product C,31.3
2023-01-06,Product A,75.6
2023-01-06,Product B,89.1
2023-01-06,Product C,63.5
2023-01-07,Product A,67.3
2023-01-07,Product B,77.2
2023-01-07,Product C,43.7
2023-01-08,Product A,96.1
2023-01-08,Product B,97.6
2023-01-08,Product C,59.9
2023-01-09,Product A,96.1
2023-01-09,Product B,100.6
2023-01-09,Product C,66.8
2023-01-10,Product A,101.6
2023-01-10,Product B,108.3
2023-01-10,Product C,56.9
简单数据报表任务

给予简单原始数据可分析需求,需要对数据进行简单加工处理,执行命令:

1
python -m app.tool.chart_visualization.test.report_demo

代码分析

概述

1. 应用入口
  • main.py:应用启动入口,负责初始化并运行Manus智能体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
async def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description="Run Manus agent with a prompt")
parser.add_argument(
"--prompt", type=str, required=False, help="Input prompt for the agent"
)
args = parser.parse_args()

# Create and initialize Manus agent
agent = await Manus.create()
try:
# Use command line prompt if provided, otherwise ask for input
prompt = args.prompt if args.prompt else input("Enter your prompt: ")
if not prompt.strip():
logger.warning("Empty prompt provided.")
return

logger.warning("Processing your request...")
await agent.run(prompt)
logger.info("Request processing completed.")
except KeyboardInterrupt:
logger.warning("Operation interrupted.")
finally:
# Ensure agent resources are cleaned up before exiting
await agent.cleanup()
2. 配置系统
  • app/config.py:基于Pydantic模型管理全局配置(LLM/浏览器/沙箱/MCP等),支持TOML/JSON格式加载

    各种设置:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    class LLMSettings(BaseModel):
    model: str = Field(..., description="Model name")
    base_url: str = Field(..., description="API base URL")
    api_key: str = Field(..., description="API key")
    max_tokens: int = Field(4096, description="Maximum number of tokens per request")
    max_input_tokens: Optional[int] = Field(
    None,
    description="Maximum input tokens to use across all requests (None for unlimited)",
    )
    temperature: float = Field(1.0, description="Sampling temperature")
    api_type: str = Field(..., description="Azure, Openai, or Ollama")
    api_version: str = Field(..., description="Azure Openai version if AzureOpenai")


    class ProxySettings(BaseModel):
    server: str = Field(None, description="Proxy server address")
    username: Optional[str] = Field(None, description="Proxy username")
    password: Optional[str] = Field(None, description="Proxy password")


    class SearchSettings(BaseModel):
    engine: str = Field(default="Google", description="Search engine the llm to use")
    fallback_engines: List[str] = Field(
    default_factory=lambda: ["DuckDuckGo", "Baidu", "Bing"],
    description="Fallback search engines to try if the primary engine fails",
    )
    retry_delay: int = Field(
    default=60,
    description="Seconds to wait before retrying all engines again after they all fail",
    )
    max_retries: int = Field(
    default=3,
    description="Maximum number of times to retry all engines when all fail",
    )
    lang: str = Field(
    default="en",
    description="Language code for search results (e.g., en, zh, fr)",
    )
    country: str = Field(
    default="us",
    description="Country code for search results (e.g., us, cn, uk)",
    )


    class RunflowSettings(BaseModel):
    use_data_analysis_agent: bool = Field(
    default=False, description="Enable data analysis agent in run flow"
    )


    class BrowserSettings(BaseModel):
    headless: bool = Field(False, description="Whether to run browser in headless mode")
    disable_security: bool = Field(
    True, description="Disable browser security features"
    )
    extra_chromium_args: List[str] = Field(
    default_factory=list, description="Extra arguments to pass to the browser"
    )
    chrome_instance_path: Optional[str] = Field(
    None, description="Path to a Chrome instance to use"
    )
    wss_url: Optional[str] = Field(
    None, description="Connect to a browser instance via WebSocket"
    )
    cdp_url: Optional[str] = Field(
    None, description="Connect to a browser instance via CDP"
    )
    proxy: Optional[ProxySettings] = Field(
    None, description="Proxy settings for the browser"
    )
    max_content_length: int = Field(
    2000, description="Maximum length for content retrieval operations"
    )

    加载config

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    class Config:
    _instance = None
    _lock = threading.Lock()
    _initialized = False

    def __new__(cls):
    if cls._instance is None:
    with cls._lock:
    if cls._instance is None:
    cls._instance = super().__new__(cls)
    return cls._instance

    def __init__(self):
    if not self._initialized:
    with self._lock:
    if not self._initialized:
    self._config = None
    self._load_initial_config()
    self._initialized = True

    @staticmethod
    def _get_config_path() -> Path:
    root = PROJECT_ROOT
    config_path = root / "config" / "config.toml"
    if config_path.exists():
    return config_path
    example_path = root / "config" / "config.example.toml"
    if example_path.exists():
    return example_path
    raise FileNotFoundError("No configuration file found in config directory")

    def _load_config(self) -> dict:
    config_path = self._get_config_path()
    with config_path.open("rb") as f:
    return tomllib.load(f)

    def _load_initial_config(self):
    raw_config = self._load_config()
    base_llm = raw_config.get("llm", {})
    llm_overrides = {
    k: v for k, v in raw_config.get("llm", {}).items() if isinstance(v, dict)
    }

    default_settings = {
    "model": base_llm.get("model"),
    "base_url": base_llm.get("base_url"),
    "api_key": base_llm.get("api_key"),
    "max_tokens": base_llm.get("max_tokens", 4096),
    "max_input_tokens": base_llm.get("max_input_tokens"),
    "temperature": base_llm.get("temperature", 1.0),
    "api_type": base_llm.get("api_type", ""),
    "api_version": base_llm.get("api_version", ""),
    }

    # handle browser config.
    browser_config = raw_config.get("browser", {})
    browser_settings = None

    if browser_config:
    # handle proxy settings.
    proxy_config = browser_config.get("proxy", {})
    proxy_settings = None

    if proxy_config and proxy_config.get("server"):
    proxy_settings = ProxySettings(
    **{
    k: v
    for k, v in proxy_config.items()
    if k in ["server", "username", "password"] and v
    }
    )

    # filter valid browser config parameters.
    valid_browser_params = {
    k: v
    for k, v in browser_config.items()
    if k in BrowserSettings.__annotations__ and v is not None
    }

    # if there is proxy settings, add it to the parameters.
    if proxy_settings:
    valid_browser_params["proxy"] = proxy_settings

    # only create BrowserSettings when there are valid parameters.
    if valid_browser_params:
    browser_settings = BrowserSettings(**valid_browser_params)

    search_config = raw_config.get("search", {})
    search_settings = None
    if search_config:
    search_settings = SearchSettings(**search_config)
    sandbox_config = raw_config.get("sandbox", {})
    if sandbox_config:
    sandbox_settings = SandboxSettings(**sandbox_config)
    else:
    sandbox_settings = SandboxSettings()

    mcp_config = raw_config.get("mcp", {})
    mcp_settings = None
    if mcp_config:
    # Load server configurations from JSON
    mcp_config["servers"] = MCPSettings.load_server_config()
    mcp_settings = MCPSettings(**mcp_config)
    else:
    mcp_settings = MCPSettings(servers=MCPSettings.load_server_config())

    run_flow_config = raw_config.get("runflow")
    if run_flow_config:
    run_flow_settings = RunflowSettings(**run_flow_config)
    else:
    run_flow_settings = RunflowSettings()
    config_dict = {
    "llm": {
    "default": default_settings,
    **{
    name: {**default_settings, **override_config}
    for name, override_config in llm_overrides.items()
    },
    },
    "sandbox": sandbox_settings,
    "browser_config": browser_settings,
    "search_config": search_settings,
    "mcp_config": mcp_settings,
    "run_flow_config": run_flow_settings,
    }

    self._config = AppConfig(**config_dict)
3. 日志系统
  • app/logger.py:采用Loguru实现的集中式日志管理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    def define_log_level(print_level="INFO", logfile_level="DEBUG", name: str = None):
    """Adjust the log level to above level"""
    global _print_level
    _print_level = print_level

    current_date = datetime.now()
    formatted_date = current_date.strftime("%Y%m%d%H%M%S")
    log_name = (
    f"{name}_{formatted_date}" if name else formatted_date
    ) # name a log with prefix name

    _logger.remove()
    _logger.add(sys.stderr, level=print_level)
    _logger.add(PROJECT_ROOT / f"logs/{log_name}.log", level=logfile_level)
    return _logger


    logger = define_log_level()
4. 智能体层级架构
1
BaseAgent → ReActAgent → ToolCallAgent → Manus
  • app/agent/base.py:所有智能体的抽象基类(生命周期/状态管理)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    from abc import ABC, abstractmethod   
    #抽象类接口,代码健壮
    from contextlib import asynccontextmanager
    #异步上下文管理释放
    from typing import List, Optional
    #明确了变量、函数参数和返回值的预期类型,使代码更容易理解。
    from pydantic import BaseModel, Field, model_validator
    #langchain核心组件
    from app.llm import LLM
    from app.logger import logger
    from app.sandbox.client import SANDBOX_CLIENT
    from app.schema import ROLE_TYPE, AgentState, Memory, Message
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    #这部分定义了 BaseAgent 类的骨架和所有智能体都应具备的通用属性。
    class BaseAgent(BaseModel, ABC):
    """Abstract base class for managing agent state and execution.

    Provides foundational functionality for state transitions, memory management,
    and a step-based execution loop. Subclasses must implement the `step` method.
    """

    # Core attributes
    name: str = Field(..., description="Unique name of the agent")
    description: Optional[str] = Field(None, description="Optional agent description")

    # Prompts
    system_prompt: Optional[str] = Field(
    None, description="System-level instruction prompt"
    )
    next_step_prompt: Optional[str] = Field(
    None, description="Prompt for determining next action"
    )

    # Dependencies
    llm: LLM = Field(default_factory=LLM, description="Language model instance")
    memory: Memory = Field(default_factory=Memory, description="Agent's memory store")
    state: AgentState = Field(
    default=AgentState.IDLE, description="Current agent state"
    )

    # Execution control
    max_steps: int = Field(default=10, description="Maximum steps before termination")
    current_step: int = Field(default=0, description="Current step in execution")

    duplicate_threshold: int = 2

    class Config:
    arbitrary_types_allowed = True
    extra = "allow" # Allow extra fields for flexibility in subclasses
    #使得模型在接收数据时能够容忍并存储未在模型中明确定义的额外字段。这在设计可扩展的基类(如 BaseAgent)时非常有用,允许子类或外部系统传递更多自定义参数,而不会导致验证错误。
    #允许模型字段包含 Pydantic 无法直接验证的自定义对象实例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    #初始化
    @model_validator(mode="after")
    def initialize_agent(self) -> "BaseAgent":
    """Initialize agent with default settings if not provided."""
    if self.llm is None or not isinstance(self.llm, LLM):
    self.llm = LLM(config_name=self.name.lower())
    if not isinstance(self.memory, Memory):
    self.memory = Memory()
    return self
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    #安全地管理智能体的状态
    @asynccontextmanager
    async def state_context(self, new_state: AgentState):
    """Context manager for safe agent state transitions."""
    # ...
    previous_state = self.state
    self.state = new_state
    try:
    yield
    except Exception as e:
    self.state = AgentState.ERROR
    raise e
    finally:
    self.state = previous_state

    在 Python 中,yield 是用来定义**生成器(generator)异步生成器(async generator)**的关键字。

    它的本质是:暂停函数执行,把控制权和当前状态交还给调用者,下次可以从这里恢复执行。

    举例:

    1
    2
    3
    4
    5
    async def something():
    async with agent.state_context(AgentState.RUNNING):
    # 这就是 async with 块内部的代码
    await agent.do_something()
    print("在这个状态下完成某些操作")

    1.Python 运行 async with agent.state_context(…):

    ​ 进入 state_context 函数;

    ​ 执行到 yield 前的所有代码(如状态切换);

    2.遇到 yield:

    ​ 暂停 state_context 函数;

    ​ 跳出函数,把控制权交给 async with 块内部的代码执行;

    ​ 即此时执行 await agent.do_something(),再执行 print(…);

    3.async with 块执行完(无论是否抛异常):

    ​ 程序回到 state_context 中,从 yield 之后恢复执行;

    ​ 进入 except 或 finally 来清理现场、还原状态。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    #更新记忆
    def update_memory(
    self,
    role: ROLE_TYPE, # type: ignore
    content: str,
    base64_image: Optional[str] = None,
    **kwargs,
    ) -> None:
    """Add a message to the agent's memory.

    Args:
    role: The role of the message sender (user, system, assistant, tool).
    content: The message content.
    base64_image: Optional base64 encoded image.
    **kwargs: Additional arguments (e.g., tool_call_id for tool messages).

    Raises:
    ValueError: If the role is unsupported.
    """
    message_map = {
    "user": Message.user_message,
    "system": Message.system_message,
    "assistant": Message.assistant_message,
    "tool": lambda content, **kw: Message.tool_message(content, **kw),
    }

    if role not in message_map:
    raise ValueError(f"Unsupported message role: {role}")

    # Create message with appropriate parameters based on role
    kwargs = {"base64_image": base64_image, **(kwargs if role == "tool" else {})}
    self.memory.add_message(message_map[role](content, **kwargs))
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    #异步执行
    async def run(self, request: Optional[str] = None) -> str:
    """Execute the agent's main loop asynchronously.

    Args:
    request: Optional initial user request to process.

    Returns:
    A string summarizing the execution results.

    Raises:
    RuntimeError: If the agent is not in IDLE state at start.
    """
    #处于空闲
    if self.state != AgentState.IDLE:
    raise RuntimeError(f"Cannot run agent from state: {self.state}")

    if request:
    self.update_memory("user", request)

    results: List[str] = []
    async with self.state_context(AgentState.RUNNING):
    while (
    self.current_step < self.max_steps and self.state != AgentState.FINISHED
    ):
    self.current_step += 1
    logger.info(f"Executing step {self.current_step}/{self.max_steps}")
    step_result = await self.step()

    # Check for stuck state
    if self.is_stuck():
    self.handle_stuck_state()

    results.append(f"Step {self.current_step}: {step_result}")

    if self.current_step >= self.max_steps:
    self.current_step = 0
    self.state = AgentState.IDLE
    results.append(f"Terminated: Reached max steps ({self.max_steps})")
    await SANDBOX_CLIENT.cleanup()
    return "\n".join(results) if results else "No steps executed"

    run 是整个框架的“引擎”。它协调了 state, max_steps, current_step 等控制属性,调用了核心的 step 方法,并利用了 state_context 和 update_memory 等辅助方法,最后还与外部系统 SANDBOX_CLIENT 交互。SANDBOX_CLIENT充当docker在里面执行命令复制粘贴,不破坏宿主机

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    @abstractmethod
    async def step(self) -> str:
    """Execute a single step in the agent's workflow.

    Must be implemented by subclasses to define specific behavior.
    """

    def handle_stuck_state(self):
    """Handle stuck state by adding a prompt to change strategy"""
    stuck_prompt = "\
    Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
    self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
    logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")

    def is_stuck(self) -> bool:
    """Check if the agent is stuck in a loop by detecting duplicate content"""
    if len(self.memory.messages) < 2:
    return False

    last_message = self.memory.messages[-1]
    if not last_message.content:
    return False

    # Count identical content occurrences
    duplicate_count = sum(
    1
    for msg in reversed(self.memory.messages[:-1])
    if msg.role == "assistant" and msg.content == last_message.content
    )

    return duplicate_count >= self.duplicate_threshold

    这三个方法共同定义了智能体的单步行为和自我纠错机制,但是存在问题:

    1.重复检测只看内容(content),不考虑语义或相似度

    2.duplicate_threshold 太低容易误报,太高容易漏报

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @property
    def messages(self) -> List[Message]:
    """Retrieve a list of messages from the agent's memory."""
    return self.memory.messages

    @messages.setter
    def messages(self, value: List[Message]):
    """Set the list of messages in the agent's memory."""
    self.memory.messages = value

    访问和修改智能体的记忆消息列表,简化访问

  • app/agent/react.py:实现ReAct(推理-行动)范式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
       
    @abstractmethod
    async def think(self) -> bool:
    """Process current state and decide next action"""
    #决定是否下一步
    @abstractmethod
    async def act(self) -> str:
    """Execute decided actions"""

    async def step(self) -> str:
    """Execute a single step: think and act."""
    should_act = await self.think()
    if not should_act:
    return "Thinking complete - no action needed"
    return await self.act()
  • app/agent/toolcall.py:扩展工具调用能力

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    class ToolCallAgent(ReActAgent):
    """Base agent class for handling tool/function calls with enhanced abstraction"""

    name: str = "toolcall"
    description: str = "an agent that can execute tool calls."

    system_prompt: str = SYSTEM_PROMPT
    next_step_prompt: str = NEXT_STEP_PROMPT
    #工具属性,和大模型交互
    available_tools: ToolCollection = ToolCollection(
    CreateChatCompletion(), Terminate()
    )
    #available_tools: 定义了这个 Agent 能使用哪些工具。这里默认有两个:CreateChatCompletion (可能用于生成更长的文本) 和 Terminate (用于结束任务)。ToolCollection 是一个管理工具集合的辅助类。
    tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO # type: ignore
    #它控制了 LLM 在调用工具时的行为模式,可以是 AUTO (LLM 自行决定是否调用工具), REQUIRED (LLM 必须调用工具), 或 NONE (LLM 禁止调用工具)。
    special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])

    #状态属性:act时用
    tool_calls: List[ToolCall] = Field(default_factory=list)
    _current_base64_image: Optional[str] = None

    max_steps: int = 30
    max_observe: Optional[Union[int, bool]] = None
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    async def think(self) -> bool:
    """Process current state and decide next actions using tools"""
    if self.next_step_prompt:
    user_msg = Message.user_message(self.next_step_prompt)
    self.messages += [user_msg]

    try:
    # Get response with tool options
    response = await self.llm.ask_tool(
    messages=self.messages,
    system_msgs=(
    [Message.system_message(self.system_prompt)]
    if self.system_prompt
    else None
    ),
    tools=self.available_tools.to_params(),
    tool_choice=self.tool_choices,
    )
    except ValueError:
    raise
    except Exception as e:
    # Check if this is a RetryError containing TokenLimitExceeded
    if hasattr(e, "__cause__") and isinstance(e.__cause__, TokenLimitExceeded):
    token_limit_error = e.__cause__
    logger.error(
    f"🚨 Token limit error (from RetryError): {token_limit_error}"
    )
    self.memory.add_message(
    Message.assistant_message(
    f"Maximum token limit reached, cannot continue execution: {str(token_limit_error)}"
    )
    )
    self.state = AgentState.FINISHED
    return False
    raise

    self.tool_calls = tool_calls = (
    response.tool_calls if response and response.tool_calls else []
    )
    content = response.content if response and response.content else ""

    # Log response info
    logger.info(f"✨ {self.name}'s thoughts: {content}")
    logger.info(
    f"🛠️ {self.name} selected {len(tool_calls) if tool_calls else 0} tools to use"
    )
    if tool_calls:
    logger.info(
    f"🧰 Tools being prepared: {[call.function.name for call in tool_calls]}"
    )
    logger.info(f"🔧 Tool arguments: {tool_calls[0].function.arguments}")

    try:
    if response is None:
    raise RuntimeError("No response received from the LLM")

    # Handle different tool_choices modes
    if self.tool_choices == ToolChoice.NONE:
    if tool_calls:
    logger.warning(
    f"🤔 Hmm, {self.name} tried to use tools when they weren't available!"
    )
    if content:
    self.memory.add_message(Message.assistant_message(content))
    return True
    return False

    # Create and add assistant message
    assistant_msg = (
    Message.from_tool_calls(content=content, tool_calls=self.tool_calls)
    if self.tool_calls
    else Message.assistant_message(content)
    )
    self.memory.add_message(assistant_msg)

    if self.tool_choices == ToolChoice.REQUIRED and not self.tool_calls:
    return True # Will be handled in act()

    # For 'auto' mode, continue with content if no commands but content exists
    if self.tool_choices == ToolChoice.AUTO and not self.tool_calls:
    return bool(content)

    return bool(self.tool_calls)
    except Exception as e:
    logger.error(f"🚨 Oops! The {self.name}'s thinking process hit a snag: {e}")
    self.memory.add_message(
    Message.assistant_message(
    f"Error encountered while processing: {str(e)}"
    )
    )
    return False

    这是 Agent 的“思考”阶段,是整个决策过程的核心。

    1. 准备输入: 如果有 next_step_prompt,将其作为新的用户消息添加到对话历史中,以指导 LLM 的下一步思考。
    2. 调用LLM: 调用 self.llm.ask_tool 方法。这是一个专门为工具调用设计的 LLM 接口,它会将对话历史、系统提示、可用工具列表 (self.available_tools) 和工具选择模式 (self.tool_choices) 一起发送给 LLM。
    3. 错误处理: 健壮地处理各种异常,特别是 TokenLimitExceeded,如果 token 超限,会记录错误并结束 Agent。
    4. 解析响应: 从 LLM 的响应中提取出 tool_calls(计划执行的工具列表)和 content(纯文本思考内容)。
    5. 记录日志: 打印详细的日志,包括 LLM 的思考、选择了哪些工具、以及工具的参数,这对于调试非常重要。
    6. 处理不同模式: 根据 self.tool_choices 的值(NONE, REQUIRED, AUTO)进行逻辑判断。例如,在 NONE 模式下,如果 LLM 仍然尝试调用工具,会发出警告。
    7. 更新记忆: 将 LLM 的响应(包括思考内容和工具调用请求)作为一条“助手”消息添加到记忆中。
    8. 返回决策: 返回一个布尔值,True 表示有工具需要执行(act 阶段应该被调用),False 则表示没有。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    async def act(self) -> str:
    """Execute tool calls and handle their results"""
    if not self.tool_calls:
    if self.tool_choices == ToolChoice.REQUIRED:
    raise ValueError(TOOL_CALL_REQUIRED)

    # Return last message content if no tool calls
    return self.messages[-1].content or "No content or commands to execute"

    results = []
    for command in self.tool_calls:
    # Reset base64_image for each tool call
    self._current_base64_image = None

    result = await self.execute_tool(command)

    if self.max_observe:
    result = result[: self.max_observe]

    logger.info(
    f"🎯 Tool '{command.function.name}' completed its mission! Result: {result}"
    )

    # Add tool response to memory
    tool_msg = Message.tool_message(
    content=result,
    tool_call_id=command.id,
    name=command.function.name,
    base64_image=self._current_base64_image,
    )
    self.memory.add_message(tool_msg)
    results.append(result)

    return "\n\n".join(results)

    act 方法是代理的“执行器”。它接收 LLM 在 think 阶段的决策(工具调用),然后实际地去执行这些工具,收集它们的输出,并将这些输出反馈回代理的记忆中。这样就形成了一个完整的“思考-行动-观察”循环,

    1. 检查前提: 检查 self.tool_calls 列表是否为空。如果为空,根据 tool_choices 模式决定是报错还是直接返回。
    2. 迭代执行: 遍历 think 阶段生成的所有 tool_calls。
    3. 执行单个工具: 对每个 command(即一个 ToolCall 对象),调用 self.execute_tool 来实际运行它。
    4. 处理结果: 获取工具执行结果,并根据 max_observe 对结果进行截断。
    5. 更新记忆: 将每个工具的执行结果包装成一个 tool_message(包含结果内容、对应的 tool_call_id 等),并添加到记忆中。这对于 LLM 在下一步思考时理解上一步行动的结果至关重要。
    6. 汇总返回: 将所有工具的执行结果合并成一个字符串返回。这个字符串可以作为当前 step 的最终结果。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    async def execute_tool(self, command: ToolCall) -> str:
    """Execute a single tool call with robust error handling"""
    if not command or not command.function or not command.function.name:
    return "Error: Invalid command format"

    name = command.function.name
    if name not in self.available_tools.tool_map:
    return f"Error: Unknown tool '{name}'"

    try:
    # Parse arguments
    args = json.loads(command.function.arguments or "{}")

    # Execute the tool
    logger.info(f"🔧 Activating tool: '{name}'...")
    result = await self.available_tools.execute(name=name, tool_input=args)

    # Handle special tools
    await self._handle_special_tool(name=name, result=result)

    # Check if result is a ToolResult with base64_image
    if hasattr(result, "base64_image") and result.base64_image:
    # Store the base64_image for later use in tool_message
    self._current_base64_image = result.base64_image

    # Format result for display (standard case)
    observation = (
    f"Observed output of cmd `{name}` executed:\n{str(result)}"
    if result
    else f"Cmd `{name}` completed with no output"
    )

    return observation
    except json.JSONDecodeError:
    error_msg = f"Error parsing arguments for {name}: Invalid JSON format"
    logger.error(
    f"📝 Oops! The arguments for '{name}' don't make sense - invalid JSON, arguments:{command.function.arguments}"
    )
    return f"Error: {error_msg}"
    except Exception as e:
    error_msg = f"⚠️ Tool '{name}' encountered a problem: {str(e)}"
    logger.exception(error_msg)
    return f"Error: {error_msg}"

    act是确定要运行哪个工具,exe是具体执行。
    这是一个更底层的、负责执行单个工具调用的辅助方法。

    1. 验证: 检查工具名称是否存在于 available_tools 中。
    2. 解析参数: LLM 返回的工具参数通常是 JSON 格式的字符串。此方法使用 json.loads 将其解析为 Python 字典。
    3. 执行: 调用 self.available_tools.execute() 方法,传入工具名称和解析后的参数来真正运行工具。
    4. 特殊处理: 调用 _handle_special_tool 来检查当前工具是否需要触发特殊逻辑(如结束 Agent)。
    5. 结果格式化: 将工具的原始返回结果包装成一个对 LLM 更友好的“观察(Observation)”字符串。
    6. 错误处理: 捕获所有可能的异常(如 JSON 解析错误、工具执行本身抛出的错误),并将错误信息格式化为字符串返回。这确保了即使某个工具失败,Agent 也能继续运行而不会崩溃。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    async def cleanup(self):
    """Clean up resources used by the agent's tools."""
    logger.info(f"🧹 Cleaning up resources for agent '{self.name}'...")
    for tool_name, tool_instance in self.available_tools.tool_map.items():
    if hasattr(tool_instance, "cleanup") and asyncio.iscoroutinefunction(
    tool_instance.cleanup
    ):
    try:
    logger.debug(f"🧼 Cleaning up tool: {tool_name}")
    await tool_instance.cleanup()
    except Exception as e:
    logger.error(
    f"🚨 Error cleaning up tool '{tool_name}': {e}", exc_info=True
    )
    logger.info(f"✨ Cleanup complete for agent '{self.name}'.")

    async def run(self, request: Optional[str] = None) -> str:
    """Run the agent with cleanup when done."""
    try:
    return await super().run(request)
    finally:
    await self.cleanup()

    这里我觉得run是多余的,既然 think()act() 里已经实现了工具调用逻辑,run应该写到更高抽象层去。

    但是,仔细考虑:

    1.为什么写 run()?

    不是为了调用工具,而是为了在当前这一层加入统一生命周期末尾执行清理。可以在更高抽象层继承

    2.是否职责冲突?

    没有,run 只是封装流程起止点,逻辑控制权还在 think()/act()。

  • app/agent/manus.py:主智能体(集成MCP客户端和内置工具)

1
2
3
4
5
@model_validator(mode="after")
def initialize_helper(self) -> "Manus":
"""Initialize basic components synchronously."""
self.browser_context_helper = BrowserContextHelper(self)
return self

@model_validator(mode="after"):

这是一个 Pydantic 库(用于数据验证和设置)提供的装饰器。mode="after" 表示这个验证器(或方法)会在模型的所有字段都被解析和验证之后运行。它接收模型实例作为输入,并期望返回一个修改后的模型实例。这意味着,当一个 Manus 对象被创建时,Pydantic 会首先处理 name, description, system_prompt 等字段的赋值。然后,一旦这些基础字段设置完毕,initialize_helper 方法就会被调用。

-> "Manus" 是 Python 的类型提示 (Type Hint)。它表示这个方法 initialize_helper 期望返回一个类型为 Manus 的对象。

为什么是字符串 "Manus" 而不是直接 Manus

这被称为前向引用 (Forward Reference)。在 Python 中,当你在一个类的方法内部引用同一个类时,如果这个类还没有完全定义(例如,在类定义的过程中),直接使用类名可能会导致 NameError

通过将类型名用引号括起来,Python 会将其视为一个字符串。类型检查器(如 MyPy)会在稍后(当所有类都已定义时)解析这个字符串,从而避免了循环引用或未定义名称的问题。

1
2
3
4
5
6
7
@classmethod
async def create(cls, **kwargs) -> "Manus":
"""Factory method to create and properly initialize a Manus instance."""
instance = cls(**kwargs)
await instance.initialize_mcp_servers()
instance._initialized = True
return instance

为什么在create中不初始化initialize_helper()方法,反而要放在之前加入 @model_validator(mode=”after”)初始化,造成分裂

把所有初始化逻辑收敛到 create(),明确定义构造流程可能更好一点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def connect_mcp_server(
self,
server_url: str,
server_id: str = "",
use_stdio: bool = False,
stdio_args: List[str] = None,
) -> None:
"""Connect to an MCP server and add its tools."""
if use_stdio:
await self.mcp_clients.connect_stdio(
server_url, stdio_args or [], server_id
)
self.connected_servers[server_id or server_url] = server_url
else:
await self.mcp_clients.connect_sse(server_url, server_id)
self.connected_servers[server_id or server_url] = server_url

# Update available tools with only the new tools from this server
new_tools = [
tool for tool in self.mcp_clients.tools if tool.server_id == server_id
]
self.available_tools.add_tools(*new_tools)

async def disconnect_mcp_server(self, server_id: str = "") -> None:
"""Disconnect from an MCP server and remove its tools."""
await self.mcp_clients.disconnect(server_id)
if server_id:
self.connected_servers.pop(server_id, None)
else:
self.connected_servers.clear()

# Rebuild available tools without the disconnected server's tools
base_tools = [
tool
for tool in self.available_tools.tools
if not isinstance(tool, MCPClientTool)
]
self.available_tools = ToolCollection(*base_tools)
self.available_tools.add_tools(*self.mcp_clients.tools)

使用 mcp_clients 对象与指定的服务器建立连接(通过 SSE 或标准输入/输出)。记录这个连接。从 mcp_clients 获取刚刚从新服务器上发现的工具,并将它们添加到 self.available_tools 中。这使得代理的“工具箱”动态扩展了。

断开与指定服务器的连接。移除连接记录。为了移除已断开服务器的工具,它首先筛选出所有非 MCP 的基础工具,然后用它们重新创建一个 ToolCollection,最后把仍然连接的 MCP 服务器的工具加回来。这确保了代理不会尝试使用一个已经无法访问的工具。

这种写法太麻烦,可以使用工具注册表 + 启用状态表,远比硬编码每次重建 ToolCollection 来得可维护、可扩展。

5. LLM交互层
  • app/llm.py:统一接口对接多种大模型(OpenAI/Azure/Bedrock),含:

    • 令牌计数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    def _calculate_high_detail_tokens(self, width: int, height: int) -> int:
    """Calculate tokens for high detail images based on dimensions"""
    # Step 1: Scale to fit in MAX_SIZE x MAX_SIZE square
    if width > self.MAX_SIZE or height > self.MAX_SIZE:
    scale = self.MAX_SIZE / max(width, height)
    width = int(width * scale)
    height = int(height * scale)

    # Step 2: Scale so shortest side is HIGH_DETAIL_TARGET_SHORT_SIDE
    scale = self.HIGH_DETAIL_TARGET_SHORT_SIDE / min(width, height)
    scaled_width = int(width * scale)
    scaled_height = int(height * scale)

    # Step 3: Count number of 512px tiles
    tiles_x = math.ceil(scaled_width / self.TILE_SIZE)
    tiles_y = math.ceil(scaled_height / self.TILE_SIZE)
    total_tiles = tiles_x * tiles_y

    # Step 4: Calculate final token count
    return (
    total_tiles * self.HIGH_DETAIL_TILE_TOKENS
    ) + self.LOW_DETAIL_IMAGE_TOKENS

    def count_content(self, content: Union[str, List[Union[str, dict]]]) -> int:
    """Calculate tokens for message content"""
    if not content:
    return 0

    if isinstance(content, str):
    return self.count_text(content)

    token_count = 0
    for item in content:
    if isinstance(item, str):
    token_count += self.count_text(item)
    elif isinstance(item, dict):
    if "text" in item:
    token_count += self.count_text(item["text"])
    elif "image_url" in item:
    token_count += self.count_image(item)
    return token_count

    LLM 处理大图像时,并不是简单地按像素点数计算,而是将其分解为多个“块”(tiles),并对每个块收取一定的 token 费用。同时,还会有一个基础的 token 费用(低分辨率图像的 token 费用)。

    • 消息格式化
    • 工具调用处理
    • 重试机制
    1
    2
    3
    4
    5
    6
    7
    @retry(
    wait=wait_random_exponential(min=1, max=60),
    stop=stop_after_attempt(6),
    retry=retry_if_exception_type(
    (OpenAIError, Exception, ValueError)
    ), # Don't retry TokenLimitExceeded
    )

    @retry 装饰器作用的对象是一个函数或方法。它的核心作用是增强该函数的容错能力,使其在面对临时的、可恢复的错误(如网络抖动、服务器临时过载)时,能够自动、智能地重试,而不是直接失败。相当于:

    1
    2
    original_get_completion = get_completion  # 保存原始函数
    get_completion = retry(...)(original_get_completion) # 用 retry 返回的新函数替换原始函数
6. 数据模型
  • app/schema.py:定义核心数据结构:
    • Message/ToolCall/Memory
7. 工具框架
  • app/tool/base.py:抽象工具基类BaseTool和标准输出ToolResult
  • app/tool/tool_collection.py:工具集合管理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ToolCollection:
"""A collection of defined tools."""

class Config:
arbitrary_types_allowed = True

def __init__(self, *tools: BaseTool):
self.tools = tools
self.tool_map = {tool.name: tool for tool in tools}

def __iter__(self):
return iter(self.tools)

def to_params(self) -> List[Dict[str, Any]]:
return [tool.to_param() for tool in self.tools]

async def execute(
self, *, name: str, tool_input: Dict[str, Any] = None
) -> ToolResult:
tool = self.tool_map.get(name)
if not tool:
return ToolFailure(error=f"Tool {name} is invalid")
try:
result = await tool(**tool_input)
return result
except ToolError as e:
return ToolFailure(error=e.message)

async def execute_all(self) -> List[ToolResult]:
"""Execute all tools in the collection sequentially."""
results = []
for tool in self.tools:
try:
result = await tool()
results.append(result)
except ToolError as e:
results.append(ToolFailure(error=e.message))
return results

  1. execute 方法被调用时,它接收 nametool_input (一个字典,例如 {"text": "hello", "count": 2}).tool = self.tool_map.get(name) 检索到对应的 BaseTool,传入tool_input,BaseTool定义了__call__
1
2
3
4
5
6
7
8
9
10
11
class BaseTool(ABC, BaseModel):
name: str
description: str
parameters: Optional[dict] = None

class Config:
arbitrary_types_allowed = True

async def __call__(self, **kwargs) -> Any:
"""Execute the tool with given parameters."""
return await self.execute(**kwargs)
  • app/tool/mcp.py:客户端MCP协议实现(远程工具代理)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
async def connect_stdio(
self, command: str, args: List[str], server_id: str = ""
) -> None:
"""Connect to an MCP server using stdio transport."""
if not command:
raise ValueError("Server command is required.")

server_id = server_id or command

# Always ensure clean disconnection before new connection
if server_id in self.sessions:
await self.disconnect(server_id)

exit_stack = AsyncExitStack()
self.exit_stacks[server_id] = exit_stack

server_params = StdioServerParameters(command=command, args=args)
stdio_transport = await exit_stack.enter_async_context(
stdio_client(server_params)
)
read, write = stdio_transport
session = await exit_stack.enter_async_context(ClientSession(read, write))
self.sessions[server_id] = session

await self._initialize_and_list_tools(server_id)

使用 stdio 连接”意味着,你的主程序(在这里是 Manus 代理所在的客户端程序)不会通过网络(如 HTTP 或 TCP/IP)去连接一个远程服务器,而是会**在本地启动另一个程序(作为“服务器”),并通过这个新程序的标准输入/输出管道与它进行双向通信。**只要它能读写标准输入/输出并遵循 MCP 协议, Python 客户端就可以与它无缝集成,

  • app/mcp/server.py:服务端MCP实现(暴露工具集)
  • 具体工具实现:
    • bash.py/python_execute.py
    • browser_use_tool.py/web_search.py
    • file_operators.py/ask_human.py/terminate.py
8. 沙箱环境
  • app/sandbox/client.py:基于Docker的安全代码执行环境
9. 异常处理
  • app/exceptions.py:定制异常类(如ToolError/TokenLimitExceeded