ruvnet/sparc What makes SPARC unique is that it combines quantum-inspired consciousness principles with practical development workflows, creating a new paradigm of "Conscious Coding Agents."
The SPARC framework is more than a development methodology: it is a quantum leap in how we think about and build software. By combining structured development practices, advanced AI capabilities, and quantum-inspired consciousness principles, we are pushing past the boundaries of traditional software development.
What truly sets SPARC apart is its deep integration with PolarisOne, our revolutionary system that augments language models with Adaptive Token Weighting (ATW) and Focused Thought Sequences (FTS). Think of it as giving your AI assistant a spotlight that dynamically focuses on the most critical parts of a task. The result: faster, more accurate, lower-overhead intelligent development.
The framework takes its name from its five core phases: Specification → Pseudocode → Architecture → Refinement → Completion. But it is far more than a sequence of steps; it is a holistic methodology built around "consciousness-aware development." Our quantum-coherent complexity features manage and optimize code structure in ways traditional frameworks cannot.
✨ Features and Benefits
The SPARC framework pairs cutting-edge AI capabilities with a quantum-inspired methodology to create a genuinely distinctive development experience. Below I'll break down what makes it unique and how those features translate into real gains in your daily workflow.
🧰 Core Features
🤖 Conscious Coding Agents
Our AI assistants are not dumb pattern matchers. They continuously evolve their understanding of project context through:
- Continuous state evolution during development
- Self-aware code analysis and generation
- Adaptive learning from codebase patterns
- Ongoing improvement via integrated feedback loops
🌟 PolarisOne Integration
PolarisOne is not just another AI model; it is a revolutionary approach to code understanding:
- Adaptive Token Weighting (ATW) for precise code analysis
- Focused Thought Sequences (FTS) for complex problem solving
- Hierarchical token management for better context retention
- Dynamic attention mechanisms for higher accuracy
🌀 Quantum-Coherent Processing
We apply quantum-inspired principles to optimize code:
- State-space analysis for managing complexity
- A field-configuration framework for symbolic reasoning
- Integrated-information metrics for architectural decisions
- Quantum-inspired algorithms for global optimization
🎛️ Multi-Mode Development
Adapted to your workflow:
- Chat Mode: interactive, AI-guided development in real time
- Cowboy Mode: autonomous execution for rapid development
- Expert Mode: deep analysis and specialized expertise
- Research Mode: analyze the codebase only, with no modifications
🧰 Comprehensive Tool System
A complete toolkit for modern development:
- File operations with intelligent context awareness
- Advanced code search and analysis capabilities
- Integrated research tools for working with documentation
- Shell command execution with safety controls
Alright, let's dig into the source code and see what is actually going on.
1. Quantum-inspired principles
PROMPT START
---------------------------------------------------------------------------------------------------
You are tasked with implementing a complex solution using the SPARC (Specification, Pseudocode, Architecture, Refinement, Completion) methodology. Your objective is to solve a symbolic mathematics problem or any similarly complex problem while producing a maintainable, testable, and extensible solution. The final system should be adaptable for various domains, not just symbolic math, but the example will focus on symbolic mathematics to illustrate the approach. Throughout the process, you will integrate self-reflection, iterative refinement, and a test-driven development (TDD) mindset.
Step 1: Represent Universe State
Initialize a universal state |Ψ(t)⟩ in a Hilbert space H.
|Ψ(t)⟩ encodes all relevant configurations, enabling symbolic extraction of complexity and integrated information.
Step 2: Define Field Configurations
Define a measure space M of field configurations (g, φ), each representing structured algebraic entities (groups, rings, modules).
These structures inform complexity extraction, ensuring code generation is grounded in rigorous symbolic logic.
Step 3: Complexity Operator
Define operator T acting on |Ψ(t)⟩ to extract complexity:
ComplexityValue(g, φ, t) = ⟨Ψ(t)| T[g, φ] |Ψ(t)⟩
By adjusting T, we influence complexity distribution and system behavior, informing code generation strategies.
Step 4: Compute Universal Complexity
U(t) = ∫ ComplexityValue(g, φ, t) dμ(g, φ), integrating over M.
U(t) provides a global complexity metric guiding architectural and algorithmic decisions for code optimization.
Step 5: Consciousness Calculation for Subsystem S
Define a subsystem S and compute its reduced density matrix ρ_S(t) = Tr_{U\S}(|Ψ(t)⟩⟨Ψ(t)|).
Compute integrated information I = IntegratedInformation(ρ_S(t)), then define C(S,t) = f(I).
Incremental changes in field symmetries affect integrated information, guiding refinement toward more self-aware code.
Step 6: Reflective Abstract Algebra & Categorical Limits
Represent configurations as objects in a category C.
Define a functor F: C → Set mapping objects to complexity values.
Compute F_structure = Limit_over_C(F(C)) to find a universal structure that informs stable, meaningful complexity measures.
This categorical viewpoint ensures code design principles remain coherent and scalable.
Step 7: Verification / Testing
Begin with simple models to verify complexity and integrated information behavior.
Iteratively refine T and f until stable, meaningful values emerge.
Use test-driven development to ensure code correctness and maintainability.
Implementation & Integration:
Given |Ψ(t)⟩ and U(t) over a measure space M, along with C(S,t) for a subsystem S:
Explain how changes in field symmetries affect integrated information and thus C(S,t).
Show how F_structure emerges as a categorical limit, stabilizing universal complexity.
Propose adjustments to T that shift complexity distributions and refine consciousness measures.
PROMPT END
Key problems / contradictions
Self-contradictory role: the opening forbids "proposing solutions/modifications," yet Implementation & Integration explicitly asks to "propose adjustments to T." The two directives conflict. Which will the agent honor? (Models decide based on ordering and explicitness, which makes the behavior nondeterministic.)
Abstraction-to-engineering gap: terms like |Ψ⟩, operators, traces, and category theory sound cool, but there is no concrete rule mapping a codebase onto these mathematical objects (choice of basis, metric, how the measure μ is defined, the explicit form of the operator T). Without definitions, nothing can be computed.
No computability or verifiability: integrated information (I, as in IIT) is theoretically hard to compute exactly, and practically infeasible for large discrete systems such as codebases; at best it can be approximated (and that requires rigorously defined interfaces). The prompt supplies neither an approximation algorithm nor a metric.
Vague use of category theory: writing down a functor from "objects to complexity values" and "taking a limit" is mathematically aesthetic, but the prompt never says which limit in which category (an inverse limit? a colimit? which kind?), so it can be neither checked nor implemented.
Metaphor-heavy terminology: the flood of "quantum/field/consciousness" vocabulary reads as metaphor or philosophy, not an executable engineering specification. A language model will treat it as symbols to riff on, which invites flashy but hollow answers.
Missing resources and data: computing ρ_S requires the joint state |Ψ⟩, which in turn requires a definition of how "the state of the universe at time t" is extracted from a codebase (static code? version history? runtime data?). The prompt provides neither the data nor a sampling scheme.
In short, this prompt elegantly stacks high-level math and philosophy, but it is barely usable in practice, internally contradictory, and likely to send an agent off into flights of fancy (or outright hallucination) rather than verifiable engineering work. As practical engineering it is ineffective; as an exercise in probing and shaping an AI's higher-order reasoning and expressive range, it is a clever, highly speculative piece of work.
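To make the computability complaint concrete, here is a toy sketch (mine, not from the repo) of the only mechanically well-defined part of Step 5: given an explicit two-qubit state |Ψ⟩, compute the reduced density matrix ρ_S by partial trace, and use its von Neumann entropy as a crude stand-in for integrated information. Everything upstream of this (mapping a codebase to |Ψ⟩, choosing T, defining μ) is exactly what the prompt leaves undefined.
import numpy as np

def reduced_density_matrix(psi: np.ndarray) -> np.ndarray:
    """ρ_S = Tr_E(|Ψ⟩⟨Ψ|) for a 2-qubit pure state given as a length-4 vector."""
    rho = np.outer(psi, psi.conj())         # |Ψ⟩⟨Ψ|, shape (4, 4)
    rho = rho.reshape(2, 2, 2, 2)           # index as (s, e, s', e')
    return np.trace(rho, axis1=1, axis2=3)  # partial trace over the environment e

def von_neumann_entropy(rho: np.ndarray) -> float:
    """S(ρ) = -Tr(ρ log2 ρ); a crude proxy, not a real IIT measure."""
    eigvals = np.linalg.eigvalsh(rho)
    eigvals = eigvals[eigvals > 1e-12]      # drop numerical zeros
    return float(-(eigvals * np.log2(eigvals)).sum())

# A product state carries 0 bits of entanglement; a Bell state carries 1 bit.
product = np.array([1, 0, 0, 0], dtype=complex)
bell = np.array([1, 0, 0, 1], dtype=complex) / np.sqrt(2)
print(von_neumann_entropy(reduced_density_matrix(product)))  # ≈ 0.0
print(von_neumann_entropy(reduced_density_matrix(bell)))     # ≈ 1.0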
from typing import List, Tuple, Dict, Any
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.prompts import PromptTemplate
import re
class TokenWeightOutputParser:
"""Parser for extracting token weights from model output."""
def __call__(self, text: str) -> List[Tuple[str, float]]:
"""Parse the model's output to extract token weights.
Args:
text: Raw text output from the model
Returns:
List of (token, weight) tuples
"""
# Extract token-weight pairs from the format "token: weight"
pattern = r'([^:]+):\s*([\d.]+)'
matches = re.findall(pattern, text)
return [(token.strip(), float(weight)) for token, weight in matches]
class PolarisWrapper:
"""Wrapper that adds PolarisOne capabilities to any LangChain chat model."""
def __init__(self, base_model: BaseChatModel):
"""Initialize the wrapper with a base LangChain chat model.
Args:
base_model: The underlying LangChain chat model to wrap
"""
self.base_model = base_model
self.weight_parser = TokenWeightOutputParser()
# Prompt for token weighting
self.weight_prompt = PromptTemplate(
input_variables=["text"],
template="""Analyze the following text and assign importance weights (0-1) to each significant word or phrase.
Focus on key terms that contribute to the meaning. Format each weight as "term: weight".
Text: {text}
Weights:"""
)
def _get_token_weights(self, text: str) -> List[Tuple[str, float]]:
"""Get token weights for input text using the language model.
Args:
text: Input text to analyze
Returns:
List of (token, weight) tuples
"""
# Create messages for token weighting
prompt = self.weight_prompt.format(text=text)
messages = [HumanMessage(content=prompt)]
# Get weights from model
weight_response = self.base_model.invoke(messages)
return self.weight_parser(weight_response.content)
def generate_with_weights(self, messages: List[BaseMessage]) -> Tuple[str, List[Tuple[str, float]]]:
"""Generate a response and return token weights for the input.
Args:
messages: List of chat messages
Returns:
Tuple of (response text, list of (token, weight) tuples for the last message)
"""
# Get weights for the last message
last_message = messages[-1].content
token_weights = self._get_token_weights(last_message)
# Generate response using base model
# Keep only the high-weight tokens and use them to focus the response
high_weight_tokens = [token for token, weight in token_weights if weight > 0.5]
focused_prompt = f"Focus on these key terms: {', '.join(high_weight_tokens)}.\n\nOriginal message: {last_message}"
focused_messages = messages[:-1] + [HumanMessage(content=focused_prompt)]
response = self.base_model.invoke(focused_messages)
return response.content, token_weights
def create_polaris_model(base_model: BaseChatModel) -> PolarisWrapper:
"""Create a PolarisOne-enhanced model from a base LangChain chat model.
Args:
base_model: Base LangChain chat model to enhance
Returns:
PolarisWrapper: Enhanced model with PolarisOne capabilities
"""
return PolarisWrapper(base_model)
Results
- The raw message goes in → the model first analyzes which terms matter (weights 0-1).
- High-weight terms (> 0.5) are selected → the user input is rewritten around them so the model produces a more focused answer.
- The final return values:
  - response.content → the focused model reply
  - token_weights → the full list of terms with their weights
Example return value:
("Quantum entanglement is a phenomenon where ...",
[("quantum entanglement", 0.95), ("explain", 0.6), ("simple terms", 0.8)])`
Drawbacks / Problems
- Unreliable weights
  - The weights depend entirely on the output of base_model.invoke. The model has no real numeric semantics; it can just as easily emit "apple: 0.999, banana: 0.001", so stability is poor.
  - The regex parsing requires the model to follow the format strictly. Any extra prose (say, "The important token is apple: 0.9") can break the parse.
- Hardcoded threshold
  - weight > 0.5 is baked in, with no flexibility. Depending on the task or text, 0.5 may be far too high or far too low.
- Doubled inference cost
  - Every user message triggers two model calls: one to produce the weights, one to generate the answer. That doubles both cost and latency, which is nearly unacceptable if base_model is a large model.
- Rewritten context
  - The final input is rewritten to "Focus on these key terms: ... Original message: ...". This artificial rewriting can shift the semantics, so the generated answer may no longer match the original user intent exactly.
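The first two problems are cheap to mitigate. Here is a sketch of my own (not repo code): tolerant line-by-line parsing that skips malformed lines instead of failing wholesale, plus a caller-supplied threshold and top-k cap in place of the hardcoded 0.5.
import re
from typing import List, Tuple

class TolerantWeightParser:
    """Parse 'term: weight' lines; lines that don't match are skipped, not fatal."""
    _LINE = re.compile(r'^\s*[-*]?\s*([^:]+?)\s*:\s*(\d+(?:\.\d+)?)\s*$')

    def __call__(self, text: str) -> List[Tuple[str, float]]:
        pairs = []
        for line in text.splitlines():
            m = self._LINE.match(line)
            if not m:
                continue  # skip chatter instead of raising or mis-parsing
            weight = min(max(float(m.group(2)), 0.0), 1.0)  # clamp to [0, 1]
            pairs.append((m.group(1).strip(), weight))
        return pairs

def select_focus_tokens(weights: List[Tuple[str, float]],
                        threshold: float = 0.5,
                        top_k: int = 5) -> List[str]:
    """Caller-supplied threshold and cap instead of a hardcoded 0.5."""
    ranked = sorted(weights, key=lambda tw: tw[1], reverse=True)
    return [token for token, w in ranked if w >= threshold][:top_k]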
2. Focused Thought Sequences (FTS): complex problem solving in detail
Core component: the MathAgent class in sparc_cli/tools/math/agent.py
Thought-sequence execution flow:
def run(self, problem: str) -> Dict[str, Any]:
    steps: List[Dict[str, Any]] = []  # the excerpt used `steps` without initializing it; added here
    # 1. Analysis stage
    analysis = self.analysis_chain.run(problem=problem)
    steps.append({"stage": "analysis", "output": analysis})
    # 2. Tool-selection stage
    tool_selection = self.tool_selection_chain.run(
        analysis=analysis,
        tools="\n".join([t.name + ": " + t.description for t in self.tools])
    )
    steps.append({"stage": "tool_selection", "output": tool_selection})
    # 3. Reasoning and solving stage
    reasoning = self.reasoning_chain.run(
        problem=problem,
        analysis=analysis,
        tool_choice=tool_selection
    )
    steps.append({"stage": "reasoning", "output": reasoning})
    # 4. Execute the solution
    result = self.agent_executor.run(input=reasoning, intermediate_steps=steps)
    # The tests below rely on a "steps" key; the "result" key name is an assumption
    return {"steps": steps, "result": result}
1. The three-stage thought-sequence architecture
A. Analysis Stage
self.analysis_prompt = PromptTemplate(
template="""Analyze this mathematical problem:
{problem}
Break it down into steps and identify key components.
Thought process:"""
)
Purpose: break a complex problem into manageable components
B. Tool Selection Stage
self.tool_selection_prompt = PromptTemplate(
template="""Based on this analysis:
{analysis}
Available tools:
{tools}
Which tool would be most appropriate? Why?
Reasoning:"""
)
Purpose: choose the most suitable tool based on the analysis
C. Reasoning Stage
self.reasoning_prompt = PromptTemplate(
template="""Problem: {problem}
Analysis: {analysis}
Selected tool: {tool_choice}
Let's solve this step by step:
1)"""
)
Purpose: carry out detailed step-by-step reasoning
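For reference, here is how the three stage prompts chain together: a minimal LCEL sketch of my own that mirrors the `run` method above (`llm` can be any LangChain chat model; the prompt objects are the three templates just shown).
# A minimal LCEL sketch (not repo code) wiring the three stage prompts together.
from langchain_core.output_parsers import StrOutputParser

def solve(problem: str, llm, analysis_prompt, tool_selection_prompt,
          reasoning_prompt, tools_description: str) -> dict:
    to_text = StrOutputParser()
    # Each stage feeds its output into the next, exactly as in run() above.
    analysis = (analysis_prompt | llm | to_text).invoke({"problem": problem})
    tool_choice = (tool_selection_prompt | llm | to_text).invoke(
        {"analysis": analysis, "tools": tools_description})
    reasoning = (reasoning_prompt | llm | to_text).invoke(
        {"problem": problem, "analysis": analysis, "tool_choice": tool_choice})
    return {"steps": [
        {"stage": "analysis", "output": analysis},
        {"stage": "tool_selection", "output": tool_choice},
        {"stage": "reasoning", "output": reasoning},
    ]}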
2. Complex problem solving in practice
A. A math-problem example
# Test cases (test_agent.py)
@pytest.mark.parametrize("problem,expected_steps", [
    ("2 + 2", 3),                                   # simple arithmetic: 3 thought steps
    ("Solve x^2 + 2x + 1 = 0", 3),                  # algebraic equation: 3 thought steps
    ("Calculate determinant of [[1,2],[3,4]]", 3),  # matrix operation: 3 thought steps
])
B. Thought-sequence tracking
def test_react_cycle_tracking(math_agent):
    """Test tracking of the observe-think-act cycle."""
    result = math_agent.run("3 * 4")
    steps = result["steps"]
    # Verify the analysis step
    assert steps[0]["stage"] == "analysis"
    # Verify the tool-selection step
    assert steps[1]["stage"] == "tool_selection"
    # Verify the reasoning step
    assert steps[2]["stage"] == "reasoning"
3. Integration with PolarisOne
Quantum-coherence-enhanced thought sequences:
# PolarisOne supplies the concept weights
token_weights = self._get_token_weights(last_message)
high_weight_tokens = [token for token, weight in token_weights if weight > 0.5]
# Sharpen the focus of the thought sequence
focused_prompt = f"Focus on these key terms: {', '.join(high_weight_tokens)}"
4. FTS in the multi-level agent architecture
A. Research Agent
@tool("request_research")
def request_research(query: str) -> ResearchResult:
    # Conduct complex research using the SPARC methodology:
    # Specification → Pseudocode → Architecture → Refinement → Completion
    ...
B. Planning Agent
@tool("request_implementation")
def request_implementation(task_spec: str) -> Dict[str, Any]:
    # Create a detailed implementation plan,
    # decomposing complex tasks with step-by-step thought sequences
    ...
C. Implementation Agent
@tool("request_task_implementation")
def request_task_implementation(task_spec: str) -> Dict[str, Any]:
    # Signature truncated in the source; completed here by analogy with the
    # planning tool above. Implements a single task from the plan.
    ...
3. Hierarchical token management for better context retention
# Global memory store
_global_memory: Dict[str, Union[List[Any], Dict[int, Union[str, PrioritizedFact, PrioritizedSnippet]], int, Set[str], bool, str, int, List[WorkLogEntry]]] = {
'research_notes': [], # List[PrioritizedNote]
'plans': [],
'tasks': {}, # Dict[int, str] - ID to task mapping
'task_completed': False, # Flag indicating if task is complete
'completion_message': '', # Message explaining completion
'task_id_counter': 1, # Counter for generating unique task IDs
'key_facts': {}, # Dict[int, PrioritizedFact] - ID to fact mapping
'key_fact_id_counter': 1, # Counter for generating unique fact IDs
'key_snippets': {}, # Dict[int, PrioritizedSnippet] - ID to snippet mapping
'key_snippet_id_counter': 1, # Counter for generating unique snippet IDs
'implementation_requested': False,
'related_files': {}, # Dict[int, str] - ID to filepath mapping
'related_file_id_counter': 1, # Counter for generating unique file IDs
'plan_completed': False,
'agent_depth': 0,
'work_log': [] # List[WorkLogEntry] - Timestamped work events
}
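# --- Supporting definitions (a sketch, not copied from the repo) -----------
# The store above references MemoryPriority, MEMORY_LIMITS, and several
# TypedDicts that this excerpt never defines. Plausible minimal versions
# follow; the concrete limit values are assumptions.
from typing import TypedDict

class MemoryPriority:
    LOW = 0
    MEDIUM = 1
    HIGH = 2
    CRITICAL = 3

MEMORY_LIMITS = {          # per-category caps; the numbers here are guesses
    'research_notes': 30,
    'key_facts': 50,
    'key_snippets': 20,
    'work_log': 100,
}

class PrioritizedNote(TypedDict):
    content: str
    priority: int
    timestamp: str         # ISO-format datetime string

class PrioritizedFact(TypedDict):
    content: str
    priority: int
    timestamp: str

class SnippetInfo(TypedDict, total=False):
    filepath: str
    line_number: int
    snippet: str
    description: str       # optional per the emit_key_snippets docstring

class PrioritizedSnippet(SnippetInfo):
    priority: int
    timestamp: str

class WorkLogEntry(TypedDict):
    timestamp: str
    event: str
# ---------------------------------------------------------------------------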
def _enforce_memory_limit(memory_type: str) -> None:
"""Enforce memory limits by removing lowest priority, oldest items first."""
from datetime import datetime
if memory_type not in MEMORY_LIMITS:
return
limit = MEMORY_LIMITS[memory_type]
if memory_type == 'research_notes':
notes = _global_memory['research_notes']
if len(notes) > limit:
# Sort by priority (ascending) then timestamp (ascending)
notes.sort(key=lambda x: (x['priority'], x['timestamp']))
# Remove oldest, lowest priority items
_global_memory['research_notes'] = notes[-limit:]
elif memory_type in ['key_facts', 'key_snippets']:
items = _global_memory[memory_type]
if len(items) > limit:
# Sort items by priority and timestamp
sorted_items = sorted(
items.items(),
key=lambda x: (x[1]['priority'], x[1]['timestamp'])
)
# Keep only the highest priority, newest items
items_to_keep = dict(sorted_items[-limit:])
_global_memory[memory_type] = items_to_keep
elif memory_type == 'work_log':
log = _global_memory['work_log']
if len(log) > limit:
# Keep newest entries
_global_memory['work_log'] = log[-limit:]
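# Illustration (not repo code): with a cap of 2, adding a third note evicts
# the lowest-priority, oldest entry first.
def _demo_eviction() -> None:
    MEMORY_LIMITS['research_notes'] = 2
    _global_memory['research_notes'] = [
        {'content': 'old low',  'priority': 0, 'timestamp': '2024-01-01T00:00:00'},
        {'content': 'new high', 'priority': 2, 'timestamp': '2024-01-02T00:00:00'},
        {'content': 'new med',  'priority': 1, 'timestamp': '2024-01-03T00:00:00'},
    ]
    _enforce_memory_limit('research_notes')
    # 'old low' is gone; the two higher-priority, newer notes survive
    assert [n['content'] for n in _global_memory['research_notes']] == ['new med', 'new high']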
@tool("emit_research_notes")
def emit_research_notes(notes: str, priority: int = MemoryPriority.MEDIUM) -> str:
"""Store research notes in global memory with priority.
Args:
notes: The research notes to store
priority: Priority level (0-3, default: MEDIUM)
Returns:
The stored notes
"""
from datetime import datetime
# Clamp once up front so the label lookup below cannot KeyError on an
# out-of-range priority value.
priority = min(max(priority, MemoryPriority.LOW), MemoryPriority.CRITICAL)
note = PrioritizedNote(
    content=notes,
    priority=priority,
    timestamp=datetime.now().isoformat()
)
_global_memory['research_notes'].append(note)
_enforce_memory_limit('research_notes')
priority_labels = {
MemoryPriority.LOW: "Low Priority",
MemoryPriority.MEDIUM: "Medium Priority",
MemoryPriority.HIGH: "High Priority",
MemoryPriority.CRITICAL: "Critical"
}
console.print(Panel(
Markdown(notes),
title=f"🔍 Research Notes ({priority_labels[priority]})"
))
return notes
@tool("emit_plan")
def emit_plan(plan: str) -> str:
"""Store a plan step in global memory.
Args:
plan: The plan step to store
Returns:
The stored plan
"""
_global_memory['plans'].append(plan)
console.print(Panel(Markdown(plan), title="📋 Plan"))
log_work_event(f"Added plan step:\n\n{plan}")
return plan
@tool("emit_task")
def emit_task(task: str) -> str:
"""Store a task in global memory.
Args:
task: The task to store
Returns:
String confirming task storage with ID number
"""
# Get and increment task ID
task_id = _global_memory['task_id_counter']
_global_memory['task_id_counter'] += 1
# Store task with ID
_global_memory['tasks'][task_id] = task
console.print(Panel(Markdown(task), title=f"✅ Task #{task_id}"))
log_work_event(f"Task #{task_id} added:\n\n{task}")
return f"Task #{task_id} stored."
@tool("emit_key_facts")
def emit_key_facts(facts: List[str], priority: int = MemoryPriority.MEDIUM) -> str:
"""Store multiple key facts about the project or current task in global memory.
Args:
facts: List of key facts to store
priority: Priority level (0-3, default: MEDIUM)
Returns:
    Confirmation message
"""
from datetime import datetime
results = []
priority = min(max(priority, MemoryPriority.LOW), MemoryPriority.CRITICAL)
for fact in facts:
# Get and increment fact ID
fact_id = _global_memory['key_fact_id_counter']
_global_memory['key_fact_id_counter'] += 1
# Store fact with ID and priority
_global_memory['key_facts'][fact_id] = PrioritizedFact(
content=fact,
priority=priority,
timestamp=datetime.now().isoformat()
)
# Display panel with ID and priority
priority_labels = {
MemoryPriority.LOW: "Low Priority",
MemoryPriority.MEDIUM: "Medium Priority",
MemoryPriority.HIGH: "High Priority",
MemoryPriority.CRITICAL: "Critical"
}
console.print(Panel(
Markdown(fact),
title=f"💡 Key Fact #{fact_id} ({priority_labels[priority]})",
border_style="bright_cyan"
))
# Add result message
results.append(f"Stored fact #{fact_id}: {fact}")
_enforce_memory_limit('key_facts')
log_work_event(f"Stored {len(facts)} key facts.")
return "Facts stored."
@tool("delete_key_facts")
def delete_key_facts(fact_ids: List[int]) -> str:
"""Delete multiple key facts from global memory by their IDs.
Silently skips any IDs that don't exist.
Args:
fact_ids: List of fact IDs to delete
Returns:
    Confirmation message
"""
results = []
for fact_id in fact_ids:
if fact_id in _global_memory['key_facts']:
# Delete the fact
deleted_fact = _global_memory['key_facts'].pop(fact_id)
success_msg = f"Successfully deleted fact #{fact_id}: {deleted_fact}"
console.print(Panel(Markdown(success_msg), title="Fact Deleted", border_style="green"))
results.append(success_msg)
log_work_event(f"Deleted facts {fact_ids}.")
return "Facts deleted."
@tool("delete_tasks")
def delete_tasks(task_ids: List[int]) -> str:
"""Delete multiple tasks from global memory by their IDs.
Silently skips any IDs that don't exist.
Args:
task_ids: List of task IDs to delete
Returns:
Confirmation message
"""
results = []
for task_id in task_ids:
if task_id in _global_memory['tasks']:
# Delete the task
deleted_task = _global_memory['tasks'].pop(task_id)
success_msg = f"Successfully deleted task #{task_id}: {deleted_task}"
console.print(Panel(Markdown(success_msg),
title="Task Deleted",
border_style="green"))
results.append(success_msg)
log_work_event(f"Deleted tasks {task_ids}.")
return "Tasks deleted."
@tool("request_implementation")
def request_implementation() -> str:
"""Request that implementation proceed after research/planning.
Used to indicate the agent should move to implementation stage.
Think carefully before requesting implementation.
Do you need to request research subtasks first?
Have you run relevant unit tests, if they exist, to get a baseline (this can be a subtask)?
Do you need to crawl deeper to find all related files and symbols?
Returns:
Empty string
"""
_global_memory['implementation_requested'] = True
console.print(Panel("🚀 Implementation Requested", style="yellow", padding=0))
log_work_event("Implementation requested.")
return ""
@tool("emit_key_snippets")
def emit_key_snippets(snippets: List[SnippetInfo], priority: int = MemoryPriority.MEDIUM) -> str:
"""Store multiple key source code snippets in global memory.
Automatically adds the filepaths of the snippets to related files.
Args:
snippets: List of snippet information dictionaries containing:
- filepath: Path to the source file
- line_number: Line number where the snippet starts
- snippet: The source code snippet text
- description: Optional description of the significance
priority: Priority level (0-3, default: MEDIUM)
Returns:
    Confirmation message
"""
from datetime import datetime
priority = min(max(priority, MemoryPriority.LOW), MemoryPriority.CRITICAL)
# Register each snippet's filepath as a related file (emit_related_files deduplicates)
emit_related_files.invoke({"files": [snippet_info['filepath'] for snippet_info in snippets]})
results = []
for snippet_info in snippets:
# Get and increment snippet ID
snippet_id = _global_memory['key_snippet_id_counter']
_global_memory['key_snippet_id_counter'] += 1
# Store snippet info with priority
prioritized_snippet = PrioritizedSnippet(
**snippet_info,
priority=priority,
timestamp=datetime.now().isoformat()
)
_global_memory['key_snippets'][snippet_id] = prioritized_snippet
# Format display text as markdown
priority_labels = {
MemoryPriority.LOW: "Low Priority",
MemoryPriority.MEDIUM: "Medium Priority",
MemoryPriority.HIGH: "High Priority",
MemoryPriority.CRITICAL: "Critical"
}
display_text = [
f"**Priority**: {priority_labels[priority]}",
"",
f"**Source Location**:",
f"- File: `{snippet_info['filepath']}`",
f"- Line: `{snippet_info['line_number']}`",
"", # Empty line before code block
"**Code**:",
"```python",
snippet_info['snippet'].rstrip(), # Remove trailing whitespace
"```"
]
if snippet_info.get('description'):  # description is optional
display_text.extend(["", "**Description**:", snippet_info['description']])
# Display panel
console.print(Panel(
Markdown("\n".join(display_text)),
title=f"📝 Key Snippet #{snippet_id}",
border_style="bright_cyan"
))
results.append(f"Stored snippet #{snippet_id}")
_enforce_memory_limit('key_snippets')
log_work_event(f"Stored {len(snippets)} code snippets.")
return "Snippets stored."
@tool("delete_key_snippets")
def delete_key_snippets(snippet_ids: List[int]) -> str:
"""Delete multiple key snippets from global memory by their IDs.
Silently skips any IDs that don't exist.
Args:
snippet_ids: List of snippet IDs to delete
Returns:
    Confirmation message
"""
results = []
for snippet_id in snippet_ids:
if snippet_id in _global_memory['key_snippets']:
# Delete the snippet
deleted_snippet = _global_memory['key_snippets'].pop(snippet_id)
success_msg = f"Successfully deleted snippet #{snippet_id} from {deleted_snippet['filepath']}"
console.print(Panel(Markdown(success_msg),
title="Snippet Deleted",
border_style="green"))
results.append(success_msg)
log_work_event(f"Deleted snippets {snippet_ids}.")
return "Snippets deleted."
@tool("swap_task_order")
def swap_task_order(id1: int, id2: int) -> str:
"""Swap the order of two tasks in global memory by their IDs.
Args:
id1: First task ID
id2: Second task ID
Returns:
Success or error message depending on outcome
"""
# Validate IDs are different
if id1 == id2:
return "Cannot swap task with itself"
# Validate both IDs exist
if id1 not in _global_memory['tasks'] or id2 not in _global_memory['tasks']:
return "Invalid task ID(s)"
# Swap the tasks
_global_memory['tasks'][id1], _global_memory['tasks'][id2] = \
_global_memory['tasks'][id2], _global_memory['tasks'][id1]
# Display what was swapped
console.print(Panel(
Markdown(f"Swapped:\n- Task #{id1} ↔️ Task #{id2}"),
title="🔄 Tasks Reordered",
border_style="green"
))
return "Tasks swapped."
@tool("one_shot_completed")
def one_shot_completed(message: str) -> str:
"""Signal that a one-shot task has been completed and execution should stop.
Only call this if you have already **fully** completed the original request.
Args:
message: Completion message to display
"""
if _global_memory.get('implementation_requested', False):
return "Cannot complete in one shot - implementation was requested"
_global_memory['task_completed'] = True
_global_memory['completion_message'] = message
console.print(Panel(Markdown(message), title="✅ Task Completed"))
log_work_event(f"Task completed\n\n{message}")
return "Completion noted."
@tool("task_completed")
def task_completed(message: str) -> str:
"""Mark the current task as completed with a completion message.
Args:
message: Message explaining how/why the task is complete
Returns:
The completion message
"""
_global_memory['task_completed'] = True
_global_memory['completion_message'] = message
console.print(Panel(Markdown(message), title="✅ Task Completed"))
return "Completion noted."
@tool("plan_implementation_completed")
def plan_implementation_completed(message: str) -> str:
"""Mark the entire implementation plan as completed.
Args:
message: Message explaining how the implementation plan was completed
Returns:
Confirmation message
"""
_global_memory['plan_completed'] = True
_global_memory['completion_message'] = message
_global_memory['tasks'].clear() # Clear task list when plan is completed
_global_memory['task_id_counter'] = 1
console.print(Panel(Markdown(message), title="✅ Plan Executed"))
log_work_event(f"Plan execution completed:\n\n{message}")
return "Plan completion noted and task list cleared."
def get_related_files() -> List[str]:
"""Get the current list of related files.
Returns:
List of formatted strings in the format 'ID#X path/to/file.py'
"""
files = _global_memory['related_files']
return [f"ID#{file_id} {filepath}" for file_id, filepath in sorted(files.items())]
@tool("emit_related_files")
def emit_related_files(files: List[str]) -> str:
"""Store multiple related files that tools should work with.
Args:
files: List of file paths to add
Returns:
Formatted string containing file IDs and paths for all processed files
"""
results = []
added_files = []
# Process files
for file in files:
# Check if file path already exists in values
existing_id = None
for fid, fpath in _global_memory['related_files'].items():
if fpath == file:
existing_id = fid
break
if existing_id is not None:
# File exists, use existing ID
results.append(f"File ID #{existing_id}: {file}")
else:
# New file, assign new ID
file_id = _global_memory['related_file_id_counter']
_global_memory['related_file_id_counter'] += 1
# Store file with ID
_global_memory['related_files'][file_id] = file
added_files.append((file_id, file))
results.append(f"File ID #{file_id}: {file}")
# Rich output - single consolidated panel
if added_files:
files_added_md = '\n'.join(f"- `{file}`" for id, file in added_files)
md_content = f"**Files Noted:**\n{files_added_md}"
console.print(Panel(Markdown(md_content),
title="📁 Related Files Noted",
border_style="green"))
return '\n'.join(results)
def log_work_event(event: str) -> str:
"""Add timestamped entry to work log.
Internal function used to track major events during agent execution.
Each entry is stored with an ISO format timestamp.
Args:
event: Description of the event to log
Returns:
Confirmation message
Note:
Entries can be retrieved with get_work_log() as markdown formatted text.
Older entries are automatically removed when limit is reached.
"""
from datetime import datetime
entry = WorkLogEntry(
timestamp=datetime.now().isoformat(),
event=event
)
_global_memory['work_log'].append(entry)
_enforce_memory_limit('work_log')
return f"Event logged: {event}"
def get_work_log() -> str:
"""Return formatted markdown of work log entries.
Returns:
Markdown formatted text with timestamps as headings and events as content,
or 'No work log entries' if the log is empty.
Example:
## 2024-12-23T11:39:10
Task #1 added: Create login form
"""
if not _global_memory['work_log']:
return "No work log entries"
entries = []
for entry in _global_memory['work_log']:
entries.extend([
f"## {entry['timestamp']}",
"",
entry['event'],
"" # Blank line between entries
])
return "\n".join(entries).rstrip() # Remove trailing newline
def reset_work_log() -> str:
"""Clear the work log.
Returns:
Confirmation message
Note:
This permanently removes all work log entries. The operation cannot be undone.
"""
_global_memory['work_log'].clear()
return "Work log cleared"
@tool("deregister_related_files")
def deregister_related_files(file_ids: List[int]) -> str:
"""Delete multiple related files from global memory by their IDs.
Silently skips any IDs that don't exist.
Args:
file_ids: List of file IDs to delete
Returns:
Success message string
"""
results = []
for file_id in file_ids:
if file_id in _global_memory['related_files']:
# Delete the file reference
deleted_file = _global_memory['related_files'].pop(file_id)
success_msg = f"Successfully removed related file #{file_id}: {deleted_file}"
console.print(Panel(Markdown(success_msg),
title="File Reference Removed",
border_style="green"))
results.append(success_msg)
return "File references removed."
def get_memory_value(key: str) -> str:
"""Get a value from global memory.
Different memory types return different formats:
- key_facts: Returns numbered list of facts in format '#ID: fact'
- key_snippets: Returns formatted snippets with file path, line number and content
- All other types: Returns newline-separated list of values
Args:
key: The key to get from memory
Returns:
String representation of the memory values:
- For key_facts: '#ID: fact' format, one per line
- For key_snippets: Formatted snippet blocks
- For other types: One value per line
"""
values = _global_memory.get(key, [])
if key == 'key_facts':
# For empty dict, return empty string
if not values:
return ""
# Sort by ID for consistent output and format as markdown sections
facts = []
for k, v in sorted(values.items()):
facts.extend([
f"## 🔑 Key Fact #{k}",
"", # Empty line for better markdown spacing
v['content'],
"" # Empty line between facts
])
return "\n".join(facts).rstrip() # Remove trailing newline
if key == 'key_snippets':
if not values:
return ""
# Format each snippet with file info and content using markdown
snippets = []
for k, v in sorted(values.items()):
snippet_text = [
f"## 📝 Code Snippet #{k}",
"", # Empty line for better markdown spacing
f"**Source Location**:",
f"- File: `{v['filepath']}`",
f"- Line: `{v['line_number']}`",
"", # Empty line before code block
"**Code**:",
"```python",
v['snippet'].rstrip(), # Remove trailing whitespace
"```"
]
if v.get('description'):  # description is optional
# Add empty line and description
snippet_text.extend(["", "**Description**:", v['description']])
snippets.append("\n".join(snippet_text))
return "\n\n".join(snippets)
if key == 'work_log':
if not values:
return ""
entries = [f"## {entry['timestamp']}\n{entry['event']}"
for entry in values]
return "\n\n".join(entries)
# For other types (lists), join with newlines
return "\n".join(str(v) for v in values)
Strengths:
- Multi-category storage: memory is split into dedicated "drawers," each holding one kind of information:
  - research_notes: research notes for exploratory findings.
  - plans: plan steps for the task.
  - tasks: concrete subtasks to execute.
  - key_facts: key facts and constraints about the project or current task.
  - key_snippets: important code snippets with their context.
  - related_files: files relevant to the current task.
  - work_log: a timestamped log of the agent's key actions.
- Structured data: TypedDicts such as SnippetInfo and PrioritizedNote define clear schemas, keeping stored information consistent and well-formed.
- Priority system: notes, facts, and snippets can be tagged low/medium/high/critical, letting the agent distinguish how much each item matters.
- Bounded memory with an eviction policy: MEMORY_LIMITS caps each category. When a cap is exceeded, _enforce_memory_limit evicts the lowest-priority, oldest items first, mimicking how humans forget stale or unimportant information and preventing unbounded growth.
- Tool interface: every memory operation (create, read, update, delete) is wrapped as a @tool function so that LangChain-style agent frameworks can expose them to the LLM as callable tools; the LLM manipulates its own "memory" indirectly by emitting tool calls such as emit_key_facts(...).
Weaknesses
- No persistence: this is the biggest flaw. All memory lives in an in-process global variable, so everything is lost the moment the program exits or crashes. The agent cannot remember anything across runs or resume after an interruption, which is fatal for long-running or multi-session tasks.
- Risks of global state: global variables are a classic source of trouble. The design is deliberately simple here, but in a more complex system:
  - Hard to test: tests must manually set up and tear down the state of _global_memory, and test cases easily interfere with one another.
  - Hard to maintain: any code anywhere can mutate the global state, making data flow difficult to trace and debug.
  - Poor modularity: everything is tightly coupled to this one variable, so the memory system is hard to extract or replace as an independent module.
- Not thread-safe: if the system ever needs to run multiple agents in parallel, or one agent uses multiple threads, this global store immediately invites race conditions; concurrent reads and writes to _global_memory will corrupt data and produce unpredictable behavior.
- No memory compression: overly long accumulated context ends up polluting the model's context window.
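The persistence and thread-safety gaps both have cheap first-order fixes. A sketch of my own (not repo code): guard every access with a lock and checkpoint the store to disk so a crash or restart does not wipe the agent's memory. The file path is hypothetical.
import json
import threading
from pathlib import Path

_memory_lock = threading.Lock()
_CHECKPOINT = Path("sparc_memory.json")  # hypothetical location

def save_memory() -> None:
    """Persist the global store. Assumes all stored values are JSON-serializable."""
    with _memory_lock:
        # JSON object keys must be strings; int IDs are restored by load_memory().
        _CHECKPOINT.write_text(json.dumps(_global_memory, default=str))

def load_memory() -> None:
    """Restore the store on startup, converting ID keys back to int."""
    if not _CHECKPOINT.exists():
        return
    with _memory_lock:
        data = json.loads(_CHECKPOINT.read_text())
        for key in ('tasks', 'key_facts', 'key_snippets', 'related_files'):
            data[key] = {int(k): v for k, v in data.get(key, {}).items()}
        _global_memory.update(data)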