
Plan-Execute-Verify: Claude SDK Implementation

The Plan-Execute-Verify (PEV) pattern separates concerns into three specialized components, each optimized for its role. This architecture is far more robust than a simple ReAct loop and is well suited to production systems.

Best for:

  • Production applications
  • Complex multi-step workflows
  • Quality-critical operations
  • Tasks that demand reliability
  • Systems that need error recovery

Overkill for:

  • Simple linear workflows
  • Quick prototypes
  • Learning exercises
  • Tasks with fewer than 3 steps

If you are just learning or building a simple workflow, see react-pattern.md instead.

```mermaid
graph TB
    subgraph ControlLoop["Control Loop"]
        Step1[1. PLAN: create a structured plan with criteria]
        Step2[2. For each step:]
        Step2a[a. EXECUTE: run the step]
        Step2b[b. VERIFY: check the output]
        Step2c[c. On failure: retry with feedback or replan]
        Step3[3. FINAL VALIDATION: check the entire plan]
        Step1 --> Step2
        Step2 --> Step2a
        Step2a --> Step2b
        Step2b --> Step2c
        Step2c --> Step2
        Step2c --> Step3
    end
    subgraph Components["Components"]
        Planner[Planner<br/>Opus<br/><br/>Creates detailed plans]
        Executor[Executor<br/>Sonnet<br/><br/>Executes single steps]
        Verifier[Verifier<br/>Haiku<br/><br/>Checks output quality]
    end
    ControlLoop --> Planner
    ControlLoop --> Executor
    ControlLoop --> Verifier
```

The Planner

**Purpose:** Create a detailed, structured plan with acceptance criteria

**Model:** Claude Opus 4.6 (the most capable model for planning)

Responsibilities:

  • Understand the high-level goal
  • Decompose it into concrete, measurable steps
  • Define acceptance criteria for each step
  • Identify dependencies between steps
  • Plan for error cases

**Input:** user request + context

**Output:** structured plan (JSON)

The Executor

**Purpose:** Execute one step at a time with a focus on quality

**Model:** Claude Sonnet 4.5 (balanced speed/capability)

Responsibilities:

  • Perform only the current step's actions
  • Use context from previous steps
  • Meet the acceptance criteria
  • Report a confidence level
  • Record any issues encountered

**Input:** step definition + previous results + feedback

**Output:** step result + artifacts

The Verifier

**Purpose:** Validate step output against the acceptance criteria

**Model:** Claude Haiku 4.5 (fast/cheap) + deterministic checks

Responsibilities:

  • Check every acceptance criterion
  • Provide pass/fail evidence
  • Decide: pass, retry, or replan
  • Give specific feedback for improvement
  • Track quality metrics

**Input:** step definition + execution result

**Output:** verification result + feedback
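
As a concrete illustration, a "retry" verdict built from the data classes defined later in this guide might look like this (the values are illustrative, not produced by a real run):

```python
# Illustrative only: a "retry" verdict with actionable feedback for the executor
verification = VerificationResult(
    overall_pass=False,
    criteria_results=[
        CriterionResult(
            criterion="Each issue has a location reference",
            passed=False,
            evidence="Issue #2 lacks a section reference",
            confidence=0.95,
            feedback="Add a section reference to issue #2",
        )
    ],
    action=VerificationAction.RETRY,
    feedback_for_executor="Add a section reference to issue #2",
    severity="minor",
)
```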

Let's build the Plan-Execute-Verify agent step by step, understanding each component before assembling them.

First, we need data models to represent plans, steps, and results. Here is the most important one, Step:

```python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum

class StepStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class Step:
    """A single step in the plan"""
    step_id: str
    name: str
    description: str
    actions: List[Dict[str, Any]]
    acceptance_criteria: List[str]  # How to verify success
    expected_outputs: List[str]     # Artifacts to create
    dependencies: List[str] = field(default_factory=list)
    estimated_complexity: str = "medium"
    status: StepStatus = StepStatus.PENDING
    feedback: List[str] = field(default_factory=list)
    retry_count: int = 0

    def add_feedback(self, feedback: str):
        """Add feedback from verification"""
        self.feedback.append(feedback)
```

Key design elements:

  • Acceptance criteria - measurable conditions for success
  • Dependencies - enable parallel execution of independent steps
  • Feedback loop - the verifier can steer retries
  • Retry tracking - know when to give up and replan

We also need Plan, StepResult, VerificationResult, and other supporting classes (see the full implementation below).
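
To make the model concrete, here is a small illustrative step (the tool name, criteria, and feedback text are examples, not part of any real plan):

```python
# Hypothetical step definition with measurable criteria
step = Step(
    step_id="2",
    name="Deep review of each document",
    description="Review every document and record issues with locations",
    actions=[{"tool": "read_file", "params": {"path": "contracts/msa.md"}}],
    acceptance_criteria=[
        "Every issue includes a location reference",
        "Every issue includes a recommendation",
    ],
    expected_outputs=["LEGAL_NOTICES.md"],
    dependencies=["1"],
)
# After a failed verification, feedback steers the next attempt
step.add_feedback("Issue 'Vague payment terms' missing recommendation field")
```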

The planner uses Opus 4.6 to create structured plans. The system prompt is critical:

```python
PLANNER_SYSTEM_PROMPT = """You are an expert planning specialist.
Your job: Create detailed, structured plans for complex tasks.
Output format (strict JSON):
{
  "goal": "High-level objective",
  "context": {...},
  "steps": [
    {
      "step_id": "1",
      "name": "Brief step name",
      "description": "Detailed description",
      "actions": [{"tool": "tool_name", "params": {...}}],
      "acceptance_criteria": [
        "Specific, measurable criterion 1",
        "Specific, measurable criterion 2"
      ],
      "expected_outputs": ["artifact1", "artifact2"],
      "dependencies": ["step_id_1"],
      "estimated_complexity": "low|medium|high"
    }
  ],
  "rollback_strategy": "What to do if plan fails",
  "success_criteria": ["Overall success criterion 1"]
}
Guidelines:
- Make acceptance criteria SPECIFIC and MEASURABLE
- Include dependencies between steps
- Think about failure cases
- Break complex steps into smaller ones
"""
```

Why this works:

  • The strict JSON format guarantees parseable output
  • Specific, measurable criteria enable automated verification
  • Dependencies allow parallel execution
  • Complexity estimates help with cost optimization
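
Parsed into Python, a conforming plan might look like this minimal (illustrative) instance, which `Plan.from_json` in the full implementation below can load directly:

```python
# Illustrative plan dict matching the schema above
plan_data = {
    "goal": "Review all legal documents and create reports",
    "context": {"folder_path": "/project/legal_docs"},
    "steps": [
        {
            "step_id": "1",
            "name": "Scan and categorize documents",
            "description": "List all files and group them by document type",
            "actions": [{"tool": "list_files", "params": {"folder": "/project/legal_docs"}}],
            "acceptance_criteria": ["All files in the folder are categorized"],
            "expected_outputs": ["document_inventory"],
            "dependencies": [],
            "estimated_complexity": "low"
        }
    ],
    "rollback_strategy": "Report partial findings and stop",
    "success_criteria": ["REVIEW_SUMMARY.md created with status"]
}
plan = Plan.from_json(plan_data)
```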

Step 3: The Planner - Creating and Replanning


The planner has two main methods:

```python
import anthropic
import json

class PlannerComponent:
    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)

    def create_plan(self, user_request: str, context: Dict[str, Any]) -> Plan:
        """Create the initial plan"""
        prompt = f"""
Create a detailed plan for this request:
REQUEST: {user_request}
CONTEXT:
{json.dumps(context, indent=2)}
Provide a complete plan in JSON format.
"""
        response = self.client.messages.create(
            model="claude-opus-4-6",  # Most capable for planning
            max_tokens=8000,
            system=PLANNER_SYSTEM_PROMPT,
            messages=[{"role": "user", "content": prompt}]
        )
        plan_json = self._extract_json(response.content[0].text)
        return Plan.from_json(plan_json)
```

Replanning happens when a step fails critically (see the full implementation below).
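
The trigger is sketched below; the `replan` method itself appears in the full implementation:

```python
# Sketch: replanning when verification demands a different approach
verification = verifier.verify(step, result)
if verification.action == VerificationAction.REPLAN:
    # New plan that addresses the root cause and reuses completed work
    plan = planner.replan(plan, step, verification)
```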

The executor uses Sonnet 4.5 to balance speed and capability:

```python
EXECUTOR_SYSTEM_PROMPT = """You are a step execution specialist.
Your job: Execute ONE step at a time with high quality.
You receive:
1. Current step to execute (with acceptance criteria)
2. Full plan context
3. Results from previous steps
4. Any feedback from previous attempts
Response format (strict JSON):
{
  "reasoning": "Why you're taking these specific actions",
  "actions_taken": [...],
  "step_output": {
    "artifacts": ["files created"],
    "findings": "What you discovered",
    "meets_criteria": {"criterion_1": true}
  },
  "confidence": 0.95,
  "notes": "Any concerns or issues"
}
Rules:
- Focus ONLY on the current step
- Be thorough - meet EVERY acceptance criterion
- Reference previous step outputs when needed
- Report any errors in notes
"""
```

Key characteristics:

  • One step at a time (prevents running ahead of the plan)
  • Self-assessment via a confidence score
  • Structured output for verification

The executor builds context from previous steps and feedback:

```python
class ExecutorComponent:
    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.results_cache: Dict[str, StepResult] = {}

    def execute_step(self, step: Step, plan_context: Dict,
                     previous_results: Dict[str, StepResult]) -> StepResult:
        """Execute a single step"""
        # Build context from dependencies
        prev_results_context = {}
        for dep_id in step.dependencies:
            if dep_id in previous_results:
                result = previous_results[dep_id]
                prev_results_context[dep_id] = {
                    "outputs": result.step_output,
                    "artifacts": result.artifacts
                }
        # Include feedback from previous attempts
        feedback_context = ""
        if step.feedback:
            feedback_context = f"""
PREVIOUS ATTEMPTS FAILED VERIFICATION:
{chr(10).join(f"- {fb}" for fb in step.feedback)}
Please address these issues in this attempt.
"""
        # Assemble the prompt and execute with tool calling
        # (prompt construction is shown in the full implementation below)
        tools = self._get_tools()
        response = self.client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=8000,
            system=EXECUTOR_SYSTEM_PROMPT,
            tools=tools,
            messages=[{"role": "user", "content": prompt}]
        )
        # Process the response and return the result
        # ... (see full implementation)
```

Execution flow:

  1. Gather context from completed dependencies
  2. Include feedback from failed attempts
  3. Execute with tool calling
  4. Return a structured result
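
Across retries, feedback accumulates on the step itself, so each attempt sees everything that went wrong before. A sketch:

```python
# Sketch: the second attempt's prompt includes the prior failure
step.add_feedback("Issue 'Vague payment terms' missing recommendation field")
result = executor.execute_step(step, plan.context, executor.results_cache)
# execute_step injects a "PREVIOUS ATTEMPTS FAILED VERIFICATION" block into the prompt
```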

The verifier runs fast deterministic checks first, then semantic verification with Haiku 4.5:

```python
class VerifierComponent:
    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.deterministic = DeterministicChecker()

    def verify(self, step: Step, result: StepResult) -> VerificationResult:
        """
        Two-stage verification:
        1. Fast deterministic checks (files exist, size > 0)
        2. LLM semantic verification (criteria met)
        """
        # Stage 1: fast checks
        det_result = self.deterministic.verify(step, result)
        if not det_result["passed"]:
            return VerificationResult(
                overall_pass=False,
                action=VerificationAction.RETRY,
                feedback_for_executor=det_result["feedback"],
                # ... (see full implementation)
            )
        # Stage 2: semantic verification with Haiku
        return self._llm_verify(step, result)
```

The deterministic checker (fast, no LLM calls):

```python
import os

class DeterministicChecker:
    """Fast, rule-based verification checks"""
    def verify(self, step: Step, result: StepResult) -> Dict:
        """Run deterministic checks"""
        # Check that artifacts exist
        for artifact in result.artifacts:
            if not os.path.exists(artifact):
                return {
                    "passed": False,
                    "evidence": f"Artifact {artifact} not found",
                    "feedback": f"File {artifact} was not created."
                }
        # Check file sizes
        for artifact in result.artifacts:
            if artifact.endswith('.md'):
                size = os.path.getsize(artifact)
                if size < 100:
                    return {"passed": False, "feedback": f"File {artifact} too small"}
        # Check confidence level
        if result.confidence < 0.5:
            return {"passed": False, "feedback": "Low confidence"}
        return {"passed": True}
```

Why two stages:

  • Deterministic checks catch obvious failures immediately (no API cost)
  • The LLM check verifies semantic quality against the acceptance criteria
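
In practice, the composition means an obvious failure is rejected before any tokens are spent. A sketch:

```python
# Sketch: stage 1 short-circuits, so no Haiku call is made for obvious failures
verification = verifier.verify(step, result)  # deterministic checks run first
if not verification.overall_pass:
    print(verification.feedback_for_executor)
    # e.g. "File LEGAL_NOTICES.md was not created. Verify write_file succeeded."
```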

Finally, the main agent orchestrates everything:

```python
class LegalReviewAgent:
    """Main agent orchestrating the Plan-Execute-Verify loop"""
    def __init__(self, api_key: str):
        self.planner = PlannerComponent(api_key)
        self.executor = ExecutorComponent(api_key)
        self.verifier = VerifierComponent(api_key)

    def run(self, user_request: str, folder_path: str) -> Optional[Plan]:
        """
        Main control loop:
        1. PLAN: create a structured plan
        2. For each step: EXECUTE -> VERIFY (with retry/replan)
        3. FINAL VALIDATION
        """
        # Phase 1: planning
        context = {"folder_path": folder_path}
        plan = self.planner.create_plan(user_request, context)
        # Phase 2: execution with verification
        for step in plan.steps:
            success = self._execute_step_with_verification(step, plan)
            if not success:
                # Replan and start over
                # (a REPLAN VerificationResult is constructed here;
                #  see the full implementation)
                plan = self.planner.replan(plan, step, verification)
                return self.run(user_request, folder_path)
        # Phase 3: final validation
        final_check = self.verifier.verify_entire_plan(plan)
        return plan if final_check.overall_pass else None
```

Retry logic (up to 3 attempts per step):

```python
def _execute_step_with_verification(self, step: Step, plan: Plan) -> bool:
    """Execute a step with retry/replan logic"""
    max_retries = 3
    for attempt in range(1, max_retries + 1):
        step.status = StepStatus.IN_PROGRESS
        # EXECUTE
        result = self.executor.execute_step(
            step, plan.context, self.executor.results_cache
        )
        # VERIFY
        verification = self.verifier.verify(step, result)
        if verification.overall_pass:
            step.status = StepStatus.COMPLETE
            return True
        elif verification.action == VerificationAction.RETRY:
            if attempt < max_retries:
                step.add_feedback(verification.feedback_for_executor)
                step.retry_count += 1
            else:
                step.status = StepStatus.FAILED
                return False
        else:  # REPLAN
            step.status = StepStatus.FAILED
            return False
    return False
```

Control flow:

  1. Plan → execute each step → verify
  2. If verification fails: retry with feedback (up to 3 times)
  3. If it still fails: replan the entire workflow
  4. Final validation ensures all criteria are met

Here's the full working code combining all components:

````python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import anthropic
import json
import os

# ============================================================================
# DATA MODELS
# ============================================================================

class StepStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETE = "complete"
    FAILED = "failed"

class VerificationAction(Enum):
    PASS = "pass"
    RETRY = "retry"
    REPLAN = "replan"

@dataclass
class Step:
    """A single step in the plan"""
    step_id: str
    name: str
    description: str
    actions: List[Dict[str, Any]]
    acceptance_criteria: List[str]
    expected_outputs: List[str]
    dependencies: List[str] = field(default_factory=list)
    estimated_complexity: str = "medium"
    status: StepStatus = StepStatus.PENDING
    feedback: List[str] = field(default_factory=list)
    retry_count: int = 0

    def add_feedback(self, feedback: str):
        self.feedback.append(feedback)

    def to_dict(self) -> Dict:
        return {
            "step_id": self.step_id,
            "name": self.name,
            "description": self.description,
            "actions": self.actions,
            "acceptance_criteria": self.acceptance_criteria,
            "expected_outputs": self.expected_outputs,
            "dependencies": self.dependencies,
            "estimated_complexity": self.estimated_complexity
        }

@dataclass
class Plan:
    """Complete plan with context"""
    goal: str
    context: Dict[str, Any]
    steps: List[Step]
    success_criteria: List[str]
    rollback_strategy: str = ""

    @classmethod
    def from_json(cls, data: Dict) -> 'Plan':
        steps = [
            Step(
                step_id=s["step_id"],
                name=s["name"],
                description=s["description"],
                actions=s["actions"],
                acceptance_criteria=s["acceptance_criteria"],
                expected_outputs=s["expected_outputs"],
                dependencies=s.get("dependencies", []),
                estimated_complexity=s.get("estimated_complexity", "medium")
            )
            for s in data["steps"]
        ]
        return cls(
            goal=data["goal"],
            context=data["context"],
            steps=steps,
            success_criteria=data["success_criteria"],
            rollback_strategy=data.get("rollback_strategy", "")
        )

    def get_all_artifacts(self) -> List[str]:
        """Get all artifacts produced by completed steps"""
        artifacts = []
        for step in self.steps:
            if step.status == StepStatus.COMPLETE:
                artifacts.extend(step.expected_outputs)
        return artifacts

@dataclass
class StepResult:
    """Result from executing a step"""
    step_id: str
    reasoning: str
    actions_taken: List[Dict[str, Any]]
    step_output: Dict[str, Any]
    confidence: float
    notes: str
    artifacts: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict:
        return {
            "step_id": self.step_id,
            "reasoning": self.reasoning,
            "actions_taken": self.actions_taken,
            "step_output": self.step_output,
            "confidence": self.confidence,
            "notes": self.notes,
            "artifacts": self.artifacts
        }

@dataclass
class CriterionResult:
    """Result for a single acceptance criterion"""
    criterion: str
    passed: bool
    evidence: str
    confidence: float
    feedback: Optional[str] = None

@dataclass
class VerificationResult:
    """Result from verifying a step"""
    overall_pass: bool
    criteria_results: List[CriterionResult]
    action: VerificationAction
    feedback_for_executor: Optional[str]
    severity: Optional[str]

    @classmethod
    def from_json(cls, data: Dict) -> 'VerificationResult':
        criteria_results = [
            CriterionResult(
                criterion=c["criterion"],
                passed=c["passed"],
                evidence=c["evidence"],
                confidence=c["confidence"],
                feedback=c.get("feedback")
            )
            for c in data["criteria_results"]
        ]
        return cls(
            overall_pass=data["overall_pass"],
            criteria_results=criteria_results,
            action=VerificationAction(data["action"]),
            feedback_for_executor=data.get("feedback_for_executor"),
            severity=data.get("severity")
        )

# ============================================================================
# PLANNER COMPONENT
# ============================================================================

PLANNER_SYSTEM_PROMPT = """You are an expert planning specialist.
Your job: Create detailed, structured plans for complex tasks.
Output format (strict JSON):
{
  "goal": "High-level objective",
  "context": {
    "key": "value",
    "any": "relevant context"
  },
  "steps": [
    {
      "step_id": "1",
      "name": "Brief step name",
      "description": "Detailed description of what to do",
      "actions": [
        {"tool": "tool_name", "params": {"param": "value"}}
      ],
      "acceptance_criteria": [
        "Specific, measurable criterion 1",
        "Specific, measurable criterion 2"
      ],
      "expected_outputs": ["artifact1", "artifact2"],
      "dependencies": ["step_id_1", "step_id_2"],
      "estimated_complexity": "low|medium|high"
    }
  ],
  "rollback_strategy": "What to do if plan fails",
  "success_criteria": [
    "Overall success criterion 1",
    "Overall success criterion 2"
  ]
}
Guidelines:
- Make acceptance criteria SPECIFIC and MEASURABLE
- Include dependencies between steps
- Think about failure cases
- Make criteria verifiable by code or small model
- Break complex steps into smaller ones
- Use "low" complexity for simple file operations, "high" for complex analysis
"""

class PlannerComponent:
    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)

    def create_plan(self, user_request: str, context: Dict[str, Any]) -> Plan:
        """Create initial plan"""
        prompt = f"""
Create a detailed plan for this request:
REQUEST: {user_request}
CONTEXT:
{json.dumps(context, indent=2)}
Provide a complete plan in JSON format.
"""
        response = self.client.messages.create(
            model="claude-opus-4-6",
            max_tokens=8000,
            system=PLANNER_SYSTEM_PROMPT,
            messages=[{"role": "user", "content": prompt}]
        )
        plan_json = self._extract_json(response.content[0].text)
        return Plan.from_json(plan_json)

    def replan(self, original_plan: Plan, failed_step: Step,
               verification: VerificationResult) -> Plan:
        """Create new plan after critical failure"""
        completed_steps = [s for s in original_plan.steps if s.status == StepStatus.COMPLETE]
        completed_summary = "\n".join([
            f"- {s.step_id}: {s.name} (✓)"
            for s in completed_steps
        ])
        prompt = f"""
The original plan failed critically. Create a NEW plan.
ORIGINAL PLAN:
{json.dumps(original_plan.__dict__, default=str, indent=2)}
FAILED STEP:
{json.dumps(failed_step.to_dict(), indent=2)}
FAILURE REASON:
{verification.feedback_for_executor}
COMPLETED STEPS:
{completed_summary}
Requirements for new plan:
1. Address the root cause of failure
2. Reuse completed work where possible
3. Adjust approach based on what we learned
4. Add more robust acceptance criteria
5. Consider alternative strategies
Provide the new plan in JSON format.
"""
        response = self.client.messages.create(
            model="claude-opus-4-6",
            max_tokens=8000,
            system=PLANNER_SYSTEM_PROMPT + "\n\nYou are REPLANNING after a failure.",
            messages=[{"role": "user", "content": prompt}]
        )
        plan_json = self._extract_json(response.content[0].text)
        return Plan.from_json(plan_json)

    def _extract_json(self, text: str) -> Dict:
        """Extract JSON from LLM response"""
        if "```json" in text:
            start = text.find("```json") + 7
            end = text.find("```", start)
            json_str = text[start:end].strip()
        elif "```" in text:
            start = text.find("```") + 3
            end = text.find("```", start)
            json_str = text[start:end].strip()
        else:
            json_str = text.strip()
        return json.loads(json_str)

# ============================================================================
# EXECUTOR COMPONENT
# ============================================================================

EXECUTOR_SYSTEM_PROMPT = """You are a step execution specialist.
Your job: Execute ONE step at a time with high quality.
You receive:
1. Current step to execute (with acceptance criteria)
2. Full plan context
3. Results from previous steps
4. Any feedback from previous attempts
Response format (strict JSON):
{
  "reasoning": "Why you're taking these specific actions",
  "actions_taken": [
    {"tool": "tool_name", "params": {...}, "result": "what happened"}
  ],
  "step_output": {
    "artifacts": ["files created", "data extracted"],
    "findings": "What you discovered",
    "meets_criteria": {
      "criterion_1": true,
      "criterion_2": true
    }
  },
  "confidence": 0.95,
  "notes": "Any concerns, edge cases, or issues"
}
Rules:
- Focus ONLY on the current step
- Be thorough - meet EVERY acceptance criterion
- If you can't meet a criterion, explain in notes
- Reference previous step outputs when needed
- Report any errors in notes
- Be specific in your outputs
"""

class ExecutorComponent:
    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.results_cache: Dict[str, StepResult] = {}

    def execute_step(self, step: Step, plan_context: Dict,
                     previous_results: Dict[str, StepResult]) -> StepResult:
        """Execute a single step"""
        # Build feedback context if step has been retried
        feedback_context = ""
        if step.feedback:
            feedback_context = f"""
PREVIOUS ATTEMPTS FAILED VERIFICATION:
{chr(10).join(f"- {fb}" for fb in step.feedback)}
Please address these issues in this attempt.
"""
        # Build previous results context
        prev_results_context = {}
        for dep_id in step.dependencies:
            if dep_id in previous_results:
                result = previous_results[dep_id]
                prev_results_context[dep_id] = {
                    "outputs": result.step_output,
                    "artifacts": result.artifacts
                }
        prompt = f"""
Execute this step:
STEP: {step.name}
DESCRIPTION: {step.description}
ACTIONS TO TAKE:
{json.dumps(step.actions, indent=2)}
ACCEPTANCE CRITERIA (you MUST meet these):
{json.dumps(step.acceptance_criteria, indent=2)}
EXPECTED OUTPUTS:
{json.dumps(step.expected_outputs, indent=2)}
PREVIOUS STEP RESULTS:
{json.dumps(prev_results_context, indent=2)}
PLAN CONTEXT:
{json.dumps(plan_context, indent=2)}
{feedback_context}
Execute the step and provide results in JSON format.
"""
        # Use tool calling for actual execution
        tools = self._get_tools()
        response = self.client.messages.create(
            model="claude-sonnet-4-5",
            max_tokens=8000,
            system=EXECUTOR_SYSTEM_PROMPT,
            tools=tools,
            messages=[{"role": "user", "content": prompt}]
        )
        # Process response and execute tools
        result = self._process_response(response, step)
        # Cache result
        self.results_cache[step.step_id] = result
        return result

    def _get_tools(self) -> List[Dict]:
        """Define available tools"""
        return [
            {
                "name": "read_file",
                "description": "Read a file from disk",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "File path to read"}
                    },
                    "required": ["path"]
                }
            },
            {
                "name": "write_file",
                "description": "Write content to a file",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "File path to write"},
                        "content": {"type": "string", "description": "Content to write"}
                    },
                    "required": ["path", "content"]
                }
            },
            {
                "name": "list_files",
                "description": "List files in a directory",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "folder": {"type": "string", "description": "Folder path"}
                    },
                    "required": ["folder"]
                }
            }
        ]

    def _process_response(self, response, step: Step) -> StepResult:
        """Process LLM response and execute tools"""
        actions_taken = []
        artifacts = []
        # Execute any tool calls
        for block in response.content:
            if block.type == "tool_use":
                tool_result = self._execute_tool(block.name, block.input)
                actions_taken.append({
                    "tool": block.name,
                    "params": block.input,
                    "result": tool_result
                })
                # Track artifacts (files created)
                if block.name == "write_file":
                    artifacts.append(block.input["path"])
        # Extract structured output from final text block
        text_content = next((b.text for b in response.content if hasattr(b, 'text')), "{}")
        try:
            output_json = self._extract_json(text_content)
        except (json.JSONDecodeError, ValueError):
            output_json = {
                "reasoning": "Executed actions",
                "step_output": {},
                "confidence": 0.8,
                "notes": ""
            }
        return StepResult(
            step_id=step.step_id,
            reasoning=output_json.get("reasoning", ""),
            actions_taken=actions_taken,
            step_output=output_json.get("step_output", {}),
            confidence=output_json.get("confidence", 0.8),
            notes=output_json.get("notes", ""),
            artifacts=artifacts
        )

    def _execute_tool(self, tool_name: str, params: Dict) -> str:
        """Execute a tool and return result"""
        try:
            if tool_name == "read_file":
                with open(params["path"], 'r') as f:
                    content = f.read()
                return f"Success: Read {len(content)} characters"
            elif tool_name == "write_file":
                os.makedirs(os.path.dirname(params["path"]) or ".", exist_ok=True)
                with open(params["path"], 'w') as f:
                    f.write(params["content"])
                return f"Success: Wrote {len(params['content'])} characters"
            elif tool_name == "list_files":
                files = os.listdir(params["folder"])
                return f"Success: Found {len(files)} files"
            else:
                return f"Error: Unknown tool {tool_name}"
        except Exception as e:
            return f"Error: {str(e)}"

    def _extract_json(self, text: str) -> Dict:
        """Extract JSON from text"""
        if "```json" in text:
            start = text.find("```json") + 7
            end = text.find("```", start)
            json_str = text[start:end].strip()
        elif "```" in text:
            start = text.find("```") + 3
            end = text.find("```", start)
            json_str = text[start:end].strip()
        else:
            json_str = text.strip()
        return json.loads(json_str)

# ============================================================================
# VERIFIER COMPONENT
# ============================================================================

VERIFIER_SYSTEM_PROMPT = """You are a verification specialist.
Your job: Objectively verify if a step was executed correctly.
You receive:
1. Step definition with acceptance criteria
2. Executor's output and artifacts
3. Access to verify artifacts
Response format (strict JSON):
{
  "overall_pass": true/false,
  "criteria_results": [
    {
      "criterion": "All documents identified",
      "passed": true,
      "evidence": "Found 3 documents in output",
      "confidence": 0.99
    },
    {
      "criterion": "Each issue has location reference",
      "passed": false,
      "evidence": "Issue #2 lacks location",
      "confidence": 0.95,
      "feedback": "Add section reference to issue about payment terms"
    }
  ],
  "action": "pass|retry|replan",
  "feedback_for_executor": "Specific actionable feedback if retry",
  "severity": "minor|major|critical"
}
Verification rules:
- Be strict but fair
- Provide specific evidence for each criterion
- "retry" = fixable with feedback (max 3 attempts)
- "replan" = fundamental issue, need different approach
- Give actionable feedback, not vague criticism
- Minor = cosmetic issues, major = missing requirements, critical = wrong approach
"""

class VerifierComponent:
    def __init__(self, api_key: str):
        self.client = anthropic.Anthropic(api_key=api_key)
        self.deterministic = DeterministicChecker()

    def verify(self, step: Step, result: StepResult) -> VerificationResult:
        """
        Two-stage verification:
        1. Fast deterministic checks
        2. LLM semantic verification
        """
        # Stage 1: Deterministic checks
        det_result = self.deterministic.verify(step, result)
        if not det_result["passed"]:
            return VerificationResult(
                overall_pass=False,
                criteria_results=[
                    CriterionResult(
                        criterion="Deterministic checks",
                        passed=False,
                        evidence=det_result["evidence"],
                        confidence=1.0,
                        feedback=det_result["feedback"]
                    )
                ],
                action=VerificationAction.RETRY,
                feedback_for_executor=det_result["feedback"],
                severity="major"
            )
        # Stage 2: LLM semantic verification
        return self._llm_verify(step, result)

    def _llm_verify(self, step: Step, result: StepResult) -> VerificationResult:
        """Semantic verification with LLM"""
        # Read artifacts if any
        artifacts_content = {}
        for artifact in result.artifacts:
            if os.path.exists(artifact):
                with open(artifact, 'r') as f:
                    artifacts_content[artifact] = f.read()[:5000]
        prompt = f"""
Verify this step execution:
STEP DEFINITION:
{json.dumps(step.to_dict(), indent=2)}
EXECUTOR OUTPUT:
{json.dumps(result.to_dict(), indent=2)}
ARTIFACTS CONTENT:
{json.dumps(artifacts_content, indent=2)}
Check each acceptance criterion carefully. Be objective and strict.
Provide verification result in JSON format.
"""
        response = self.client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=4000,
            system=VERIFIER_SYSTEM_PROMPT,
            messages=[{"role": "user", "content": prompt}]
        )
        verification_json = self._extract_json(response.content[0].text)
        return VerificationResult.from_json(verification_json)

    def verify_entire_plan(self, plan: Plan) -> VerificationResult:
        """Final validation of complete plan"""
        # Check all steps completed
        incomplete = [s for s in plan.steps if s.status != StepStatus.COMPLETE]
        if incomplete:
            return VerificationResult(
                overall_pass=False,
                criteria_results=[
                    CriterionResult(
                        criterion="All steps complete",
                        passed=False,
                        evidence=f"{len(incomplete)} steps incomplete",
                        confidence=1.0
                    )
                ],
                action=VerificationAction.REPLAN,
                feedback_for_executor="Not all steps completed",
                severity="critical"
            )
        # Check success criteria
        artifacts = plan.get_all_artifacts()
        criteria_results = []
        for criterion in plan.success_criteria:
            passed = any(artifact.lower() in criterion.lower() or
                         criterion.lower() in artifact.lower()
                         for artifact in artifacts)
            criteria_results.append(
                CriterionResult(
                    criterion=criterion,
                    passed=passed,
                    evidence=f"Artifacts: {artifacts}",
                    confidence=0.9
                )
            )
        overall_pass = all(cr.passed for cr in criteria_results)
        return VerificationResult(
            overall_pass=overall_pass,
            criteria_results=criteria_results,
            action=VerificationAction.PASS if overall_pass else VerificationAction.REPLAN,
            feedback_for_executor=None if overall_pass else "Success criteria not met",
            severity=None if overall_pass else "major"
        )

    def _extract_json(self, text: str) -> Dict:
        """Extract JSON from text"""
        if "```json" in text:
            start = text.find("```json") + 7
            end = text.find("```", start)
            json_str = text[start:end].strip()
        elif "```" in text:
            start = text.find("```") + 3
            end = text.find("```", start)
            json_str = text[start:end].strip()
        else:
            json_str = text.strip()
        return json.loads(json_str)

class DeterministicChecker:
    """Fast, rule-based verification checks"""
    def verify(self, step: Step, result: StepResult) -> Dict:
        """Run deterministic checks"""
        # Check artifacts exist
        for artifact in result.artifacts:
            if not os.path.exists(artifact):
                return {
                    "passed": False,
                    "evidence": f"Artifact {artifact} not found",
                    "feedback": f"File {artifact} was not created. Verify write_file succeeded."
                }
        # Check file sizes for written files
        for artifact in result.artifacts:
            if artifact.endswith('.md'):
                size = os.path.getsize(artifact)
                if size < 100:
                    return {
                        "passed": False,
                        "evidence": f"File {artifact} is only {size} bytes",
                        "feedback": f"File {artifact} is too small ({size} bytes). Ensure complete content was written."
                    }
        # Check confidence level
        if result.confidence < 0.5:
            return {
                "passed": False,
                "evidence": f"Executor confidence only {result.confidence:.2%}",
                "feedback": "Low executor confidence. Review the step execution."
            }
        return {"passed": True}

# ============================================================================
# MAIN CONTROL LOOP
# ============================================================================

class LegalReviewAgent:
    """Main agent orchestrating Plan-Execute-Verify loop"""
    def __init__(self, api_key: str):
        self.planner = PlannerComponent(api_key)
        self.executor = ExecutorComponent(api_key)
        self.verifier = VerifierComponent(api_key)

    def run(self, user_request: str, folder_path: str) -> Optional[Plan]:
        """
        Main control loop:
        1. PLAN: Create structured plan
        2. For each step: EXECUTE -> VERIFY (with retry/replan)
        3. FINAL VALIDATION
        """
        print("=" * 60)
        print("🏗️ PHASE 1: PLANNING")
        print("=" * 60)
        # Create initial plan
        context = {"folder_path": folder_path}
        plan = self.planner.create_plan(user_request, context)
        self._display_plan(plan)
        print("\n" + "=" * 60)
        print("⚙️ PHASE 2: EXECUTION WITH VERIFICATION")
        print("=" * 60)
        # Execute each step with verification
        for step in plan.steps:
            success = self._execute_step_with_verification(step, plan)
            if not success:
                print(f"\n❌ Step {step.step_id} failed critically, replanning...")
                verification = VerificationResult(
                    overall_pass=False,
                    criteria_results=[],
                    action=VerificationAction.REPLAN,
                    feedback_for_executor=f"Step {step.step_id} failed after retries",
                    severity="critical"
                )
                plan = self.planner.replan(plan, step, verification)
                return self.run(user_request, folder_path)
        print("\n" + "=" * 60)
        print("🎯 PHASE 3: FINAL VALIDATION")
        print("=" * 60)
        # Final validation
        final_check = self.verifier.verify_entire_plan(plan)
        if final_check.overall_pass:
            print("✅ Task completed successfully!")
            print(f"\n📄 Deliverables:")
            for artifact in plan.get_all_artifacts():
                print(f"  - {artifact}")
            return plan
        else:
            print("❌ Final validation failed:")
            print(f"  {final_check.feedback_for_executor}")
            return None

    def _execute_step_with_verification(self, step: Step, plan: Plan) -> bool:
        """Execute a step with retry/replan logic"""
        print(f"\n▶️ Step {step.step_id}: {step.name}")
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            print(f"  🔨 Attempt {attempt}/{max_retries}...")
            step.status = StepStatus.IN_PROGRESS
            # EXECUTE
            result = self.executor.execute_step(
                step,
                plan.context,
                self.executor.results_cache
            )
            print(f"  📊 Confidence: {result.confidence:.2%}")
            # VERIFY
            print(f"  🔍 Verifying...")
            verification = self.verifier.verify(step, result)
            if verification.overall_pass:
                print(f"  ✅ Passed verification")
                step.status = StepStatus.COMPLETE
                return True
            elif verification.action == VerificationAction.RETRY:
                print(f"  ⚠️ Failed verification ({verification.severity})")
                print(f"  💬 {verification.feedback_for_executor}")
                if attempt < max_retries:
                    step.add_feedback(verification.feedback_for_executor)
                    step.retry_count += 1
                else:
                    print(f"  ❌ Max retries reached")
                    step.status = StepStatus.FAILED
                    return False
            else:  # REPLAN
                print(f"  🔄 Critical issue, needs replanning")
                step.status = StepStatus.FAILED
                return False
        return False

    def _display_plan(self, plan: Plan):
        """Display plan to user"""
        print(f"\n📋 Goal: {plan.goal}")
        print(f"\nSteps ({len(plan.steps)}):")
        for step in plan.steps:
            deps = f" (depends on: {', '.join(step.dependencies)})" if step.dependencies else ""
            print(f"  {step.step_id}. {step.name} [{step.estimated_complexity}]{deps}")
        print(f"\nSuccess Criteria:")
        for criterion in plan.success_criteria:
            print(f"  - {criterion}")

# Usage
if __name__ == "__main__":
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    agent = LegalReviewAgent(api_key)
    result = agent.run(
        user_request="Review all legal documents and create comprehensive reports",
        folder_path="/project/legal_docs"
    )
````

What You Get:

  • ~1300 lines of production-ready code
  • Separation of concerns (Plan/Execute/Verify)
  • Built-in error recovery and quality assurance
  • Observable, testable architecture

For the complete legal review walkthrough, see ai-agent-case-study.md.
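
A sample run of the agent produces output like this: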

```
============================================================
🏗️ PHASE 1: PLANNING
============================================================
🤖 Planner (Opus 4.6): Analyzing request and creating plan...

📋 Goal: Comprehensive legal review with quality assurance

Steps (4):
  1. Scan and categorize documents [low]
  2. Deep review of each document [high]
  3. Generate legal notices document [medium]
  4. Generate executive summary [low]

Success Criteria:
  - All documents reviewed with findings
  - LEGAL_NOTICES.md created with detailed findings
  - REVIEW_SUMMARY.md created with status

============================================================
⚙️ PHASE 2: EXECUTION WITH VERIFICATION
============================================================

▶️ Step 1: Scan and categorize documents
  🔨 Attempt 1/3...
  📊 Confidence: 99.00%
  🔍 Verifying...
  ✅ Passed verification

▶️ Step 2: Deep review of each document
  🔨 Attempt 1/3...
  📊 Confidence: 92.00%
  🔍 Verifying...
  ⚠️ Failed verification (minor)
  💬 Issue 'Vague payment terms' missing recommendation field
  🔨 Attempt 2/3...
  📊 Confidence: 96.00%
  🔍 Verifying...
  ✅ Passed verification

▶️ Step 3: Generate legal notices document
  🔨 Attempt 1/3...
  📊 Confidence: 97.00%
  🔍 Verifying...
  ✅ Passed verification

▶️ Step 4: Generate executive summary
  🔨 Attempt 1/3...
  📊 Confidence: 98.00%
  🔍 Verifying...
  ✅ Passed verification

============================================================
🎯 PHASE 3: FINAL VALIDATION
============================================================
✅ Task completed successfully!

📄 Deliverables:
  - LEGAL_NOTICES.md
  - REVIEW_SUMMARY.md
```

How PEV compares with ReAct:

| Feature | ReAct Pattern | Plan-Execute-Verify |
| --- | --- | --- |
| Quality assurance | None | Multi-stage verification |
| Error recovery | Manual / retries the same way | Intelligent retries with feedback + replanning |
| Transparency | One action at a time | Structured plan + progress tracking |
| Cost efficiency | Can waste tokens | Targeted retries, cheap verification |
| Parallelization | Sequential only | Independent steps can run in parallel |
| Observability | Basic logging | Rich metrics per component |
| Production-ready | ❌ No | ✅ Yes |
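
The sections below sketch optional extensions for production use. First, an observability middleware that records metrics for every plan, step, and final result: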
```python
class ObservabilityMiddleware:
    """Track metrics for monitoring"""
    def __init__(self):
        self.metrics = []

    def log_plan(self, plan: Plan):
        self.metrics.append({
            "event": "plan_created",
            "num_steps": len(plan.steps),
            "complexity": sum(
                {"low": 1, "medium": 2, "high": 3}[s.estimated_complexity]
                for s in plan.steps
            )
        })

    def log_step_execution(self, step: Step, result: StepResult, verification: VerificationResult):
        self.metrics.append({
            "event": "step_completed",
            "step_id": step.step_id,
            "retries": step.retry_count,
            "confidence": result.confidence,
            "passed_verification": verification.overall_pass,
            "verification_action": verification.action.value
        })

    def log_final_result(self, success: bool, plan: Plan):
        self.metrics.append({
            "event": "task_completed",
            "success": success,
            "total_steps": len(plan.steps),
            "total_retries": sum(s.retry_count for s in plan.steps)
        })
```
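
Next, caching: identical step executions can be served from a cache keyed by a hash of the step definition: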
```python
import hashlib

class ResultCache:
    """Cache step results for identical executions"""
    def __init__(self):
        self.cache = {}

    def get_cache_key(self, step: Step) -> str:
        """Generate cache key from step definition"""
        step_str = json.dumps(step.to_dict(), sort_keys=True)
        return hashlib.sha256(step_str.encode()).hexdigest()

    def get(self, step: Step) -> Optional[StepResult]:
        key = self.get_cache_key(step)
        return self.cache.get(key)

    def set(self, step: Step, result: StepResult):
        key = self.get_cache_key(step)
        self.cache[key] = result
```
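
For high-stakes workflows, a human reviewer can gate the plan and any high-complexity steps: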
```python
class HumanReviewer:
    """Allow human approval at critical points"""
    def approve_plan(self, plan: Plan) -> bool:
        """Let human review plan before execution"""
        print("\n" + "=" * 60)
        print("PLAN APPROVAL REQUIRED")
        print("=" * 60)
        print(f"Goal: {plan.goal}")
        for step in plan.steps:
            print(f"  {step.step_id}. {step.name} [{step.estimated_complexity}]")
        response = input("\nApprove this plan? (y/n): ")
        return response.lower() == 'y'

    def approve_critical_step(self, step: Step, result: StepResult) -> bool:
        """Review critical steps before proceeding"""
        if step.estimated_complexity == "high":
            print(f"\n⚠️ Critical step completed: {step.name}")
            print(f"Confidence: {result.confidence:.2%}")
            print(f"Artifacts: {result.artifacts}")
            response = input("\nProceed? (y/n): ")
            return response.lower() == 'y'
        return True
```
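
Independent steps can also run concurrently: a topological sort groups steps into dependency levels, and each level executes in parallel: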
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelExecutor:
    """Execute independent steps in parallel"""
    def __init__(self, executor: ExecutorComponent, max_workers: int = 3):
        self.executor = executor
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)

    async def execute_steps_parallel(self, steps: List[Step], plan: Plan) -> Dict[str, StepResult]:
        """Execute steps with no dependencies in parallel"""
        # Group steps by dependency level
        levels = self._topological_sort(steps)
        results = {}
        loop = asyncio.get_running_loop()
        for level_steps in levels:
            # Execute all steps in this level concurrently
            tasks = [
                loop.run_in_executor(
                    self.thread_pool,
                    self.executor.execute_step,
                    step,
                    plan.context,
                    results
                )
                for step in level_steps
            ]
            level_results = await asyncio.gather(*tasks)
            for step, result in zip(level_steps, level_results):
                results[step.step_id] = result
        return results

    def _topological_sort(self, steps: List[Step]) -> List[List[Step]]:
        """Group steps by dependency level for parallel execution"""
        levels = []
        remaining = set(s.step_id for s in steps)
        step_map = {s.step_id: s for s in steps}
        while remaining:
            # Find steps with no unsatisfied dependencies
            ready = [
                step_id for step_id in remaining
                if all(dep not in remaining for dep in step_map[step_id].dependencies)
            ]
            if not ready:
                raise ValueError("Circular dependency detected")
            levels.append([step_map[sid] for sid in ready])
            remaining -= set(ready)
        return levels
```
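
Finally, a cost optimizer estimates spend before execution from per-model token prices: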
```python
class CostOptimizer:
    """Optimize API costs"""
    MODEL_COSTS = {
        "claude-opus-4-6": {"input": 15.00, "output": 75.00},  # per 1M tokens
        "claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
        "claude-haiku-4-5": {"input": 0.80, "output": 4.00}
    }

    def estimate_plan_cost(self, plan: Plan) -> float:
        """Estimate cost before execution"""
        # Planning: 1 Opus call with ~2k input, ~3k output
        planning_cost = (
            (2000 / 1_000_000) * self.MODEL_COSTS["claude-opus-4-6"]["input"] +
            (3000 / 1_000_000) * self.MODEL_COSTS["claude-opus-4-6"]["output"]
        )
        # Execution: Sonnet for each step
        execution_cost = 0
        for step in plan.steps:
            complexity_tokens = {
                "low": (500, 500),
                "medium": (1500, 1500),
                "high": (3000, 3000)
            }
            input_tokens, output_tokens = complexity_tokens[step.estimated_complexity]
            execution_cost += (
                (input_tokens / 1_000_000) * self.MODEL_COSTS["claude-sonnet-4-5"]["input"] +
                (output_tokens / 1_000_000) * self.MODEL_COSTS["claude-sonnet-4-5"]["output"]
            )
        # Verification: Haiku for each step
        verification_cost = len(plan.steps) * (
            (800 / 1_000_000) * self.MODEL_COSTS["claude-haiku-4-5"]["input"] +
            (400 / 1_000_000) * self.MODEL_COSTS["claude-haiku-4-5"]["output"]
        )
        total = planning_cost + execution_cost + verification_cost
        print(f"\n💰 Estimated Cost: ${total:.4f}")
        print(f"  Planning: ${planning_cost:.4f}")
        print(f"  Execution: ${execution_cost:.4f}")
        print(f"  Verification: ${verification_cost:.4f}")
        return total
```
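
For the four-step sample plan above (complexities low, high, medium, low), the estimate works out roughly as follows. This is a sketch; the token counts are the optimizer's own assumptions, not measurements:

```python
# Rough arithmetic for the sample plan, using the rates above
# Planning (Opus):    2000/1M * $15 + 3000/1M * $75             = $0.2550
# Execution (Sonnet): steps low + high + medium + low
#   low:     500/1M * $3 +  500/1M * $15                        = $0.0090 (x2)
#   medium: 1500/1M * $3 + 1500/1M * $15                        = $0.0270
#   high:   3000/1M * $3 + 3000/1M * $15                        = $0.0540
# Verification (Haiku): 4 * (800/1M * $0.80 + 400/1M * $4.00)   = $0.0090
# Total                                                         ≈ $0.3630
optimizer = CostOptimizer()
optimizer.estimate_plan_cost(plan)  # prints the breakdown for a real Plan object
```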

Use Plan-Execute-Verify when:

  • Building production systems
  • Quality and reliability are critical
  • Workflows are complex and multi-step (5+ steps)
  • Operations are expensive or irreversible
  • You need error recovery and resilience
  • You need observability and metrics
  • You have time to architect properly

Skip it when:

  • Workflows are simple and linear (< 3 steps)
  • Building quick prototypes or MVPs
  • Learning AI agent fundamentals
  • Time to market is critical
  • The overhead is not justified by the task's complexity

Start with ReAct to learn; use PEV in production.

The Plan-Execute-Verify pattern provides:

  • Separation of concerns: each component is specialized
  • Quality assurance: built-in verification
  • Error recovery: intelligent retries and replanning
  • Observability: rich metrics and logging
  • Cost efficiency: the right model for each task
  • Production readiness: a battle-tested architecture

This architecture scales from simple document processing to complex multi-agent systems.