The Convergence of Three Paradigms
Imagine orchestrating multiple specialized AI agents working in perfect harmony to solve complex, multi-step problems. Each agent possesses unique expertise — from data analysis to strategic planning to implementation. The challenge? Coordinating their efforts reliably at scale while maintaining enterprise-grade resilience.
As we started automating a business process with AI agents, we hit a critical gap. Standard frameworks could manage agent-to-agent workflows, but real-world operations also need to interact with many external systems and with humans. This called for a more robust solution: a durable workflow engine.
This is where three powerful paradigms converge: Google’s Agent Development Kit (ADK) for building sophisticated AI agents, the Agent-to-Agent (A2A) protocol for seamless inter-agent communication, and Temporal’s durable execution engine for orchestrating complex workflows.
Let me walk you through the lessons learned, the architecture we designed, and the best practices that emerged from this journey.
Part I: The Foundation — Google ADK
Beyond Simple Prompts: The ADK Agent Paradigm
The Google Agent Development Kit isn’t just another LLM wrapper — it’s a comprehensive framework for building production-ready AI agents. At its core, ADK treats agents as first-class software components with well-defined capabilities, state management, and tool integration.
The ADK agent paradigm treats each agent as a sophisticated software component with specific capabilities:
# Example: Specialized Analysis Agent Pattern
from google.adk.agents import Agent
from google.genai import types as genai_types

specialist_agent = Agent(
name="domain_specialist",
model="gemini-2.5-pro", # Model selection based on task requirements
instruction="""
You are a domain expert with specialized knowledge. Your capabilities:
1. Analyze complex domain-specific patterns
2. Identify relationships and dependencies
3. Extract key insights and metadata
4. Generate structured recommendations
Provide analysis in structured JSON format with actionable insights.
""",
tools=[analysis_tool, extraction_tool],
generate_content_config=genai_types.GenerateContentConfig(
temperature=0.1, # Low temperature for consistency
max_output_tokens=4096
)
)
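To make the paradigm concrete, here is a minimal sketch of invoking such an agent locally, based on the ADK quickstart pattern. The app, user, and session identifiers are placeholders, and exact session-service signatures may vary between ADK versions.
# Sketch: running the specialist agent locally (identifiers are illustrative)
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types as genai_types

session_service = InMemorySessionService()

async def ask_specialist(question: str) -> str:
    # Sessions hold conversation state, including keys written by tools
    await session_service.create_session(
        app_name="demo_app", user_id="user_1", session_id="session_1"
    )
    runner = Runner(
        agent=specialist_agent, app_name="demo_app", session_service=session_service
    )
    message = genai_types.Content(role="user", parts=[genai_types.Part(text=question)])
    async for event in runner.run_async(
        user_id="user_1", session_id="session_1", new_message=message
    ):
        if event.is_final_response() and event.content and event.content.parts:
            return event.content.parts[0].text
    return ""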
The Tool Ecosystem: Extending Agent Capabilities
ADK’s tool system transforms agents from conversational interfaces into action-oriented systems. Each tool provides specific capabilities while maintaining type safety and error handling:
from google.adk.tools import FunctionTool
from google.adk.tools.context import ToolContext

def domain_analysis_tool(
input_content: str,
analysis_type: str,
tool_context: ToolContext
) -> dict:
"""
Performs domain-specific analysis with context preservation.
"""
try:
# Parse input using domain-specific logic
parser = DomainParser()
parsed_structure = parser.parse(input_content)

# Extract insights using specialized algorithms
analysis_result = {
"status": "success",
"structure_analysis": extract_structure(parsed_structure),
"pattern_recognition": identify_patterns(parsed_structure),
"dependency_mapping": map_dependencies(parsed_structure),
"complexity_metrics": calculate_metrics(parsed_structure),
"recommendations": generate_recommendations(parsed_structure)
}

# Persist to session state for downstream agents
tool_context.state[f"analysis_{analysis_type}"] = analysis_result

return analysis_result
    except Exception as e:
        return {
            "status": "error",
            "message": f"Analysis failed: {str(e)}"
        }

# Register as ADK tool
domain_analyzer_tool = FunctionTool(domain_analysis_tool)
I also used MCP tools for GitHub and Jira so the agents could document the whole workflow, just as we do when implementing a new system or modernizing an existing one.
Part II: The Communication Layer — A2A Protocol Implementation
From Islands to Networks: The A2A Revolution
The Agent-to-Agent protocol transforms isolated AI agents into collaborative networks. Built on JSON-RPC 2.0 over HTTP(S), A2A provides a standardized communication layer that’s both simple and powerful.
The implementation leverages ADK’s recently launched built-in A2A utilities to expose each agent as an HTTP service:
# Pattern: Converting ADK Agent to A2A Service
from google.adk.a2a.utils.agent_to_a2a import to_a2a
from agent_module import specialized_agent

# Convert ADK agent to A2A ASGI application
app = to_a2a(specialized_agent, port=8001)

# The agent becomes accessible via:
# - HTTP endpoint: /process
# - Agent discovery: /.well-known/agent.json
# - Health checks: /health
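The returned object is a standard ASGI application, so it can be served with any ASGI server. Below is a minimal sketch using uvicorn; the module name agent_service and the port are assumptions, not part of the original setup.
# Sketch: serving the A2A app (module name and port are illustrative)
import uvicorn

if __name__ == "__main__":
    # Assumes this file is agent_service.py and exposes the `app` created above
    uvicorn.run("agent_service:app", host="0.0.0.0", port=8001)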
The Custom Agent Registry: Orchestrating Specialized Services
Multiple specialized agents operate as independent A2A services, registered in a central registry pattern:
# Pattern: Distributed Agent Registry (Streamlined)
AGENT_REGISTRY = {
"analysis_agent": {
"endpoint": "service:8001",
"description": "Comprehensive data and dependency analysis",
"capabilities": ["pattern_analysis", "dependency_mapping", "complexity_metrics"]
},
"strategy_agent": {
"endpoint": "service:8002",
"description": "Strategic planning and architecture design",
"capabilities": ["strategy_planning", "architecture_design", "risk_assessment"]
},
"transformation_agent": {
"endpoint": "service:8003",
"description": "Code transformation and quality assurance",
"capabilities": ["transformation", "test_generation", "quality_validation"]
},
"deployment_agent": {
"endpoint": "service:8004",
"description": "Deployment orchestration and monitoring",
"capabilities": ["deployment_automation", "monitoring_setup", "rollback_management"]
}
}
JSON-RPC Communication: The Protocol in Action
The beauty of A2A lies in its simplicity. Here’s how agents communicate in our system:
# Pattern: A2A Client Implementation
class A2AAgentClient:
"""Unified client for agent-to-agent communication""" async def call_agent(self, agent_name: str, input_data: dict) -> dict:
"""Call an agent via A2A protocol"""
endpoint = self.get_agent_endpoint(agent_name)
# Construct JSON-RPC request following A2A specification
json_rpc_payload = {
"jsonrpc": "2.0",
"method": "process_message",
"params": {
"message": input_data.get("prompt", ""),
"context": {
"task_id": input_data.get("task_id"),
"previous_results": input_data.get("previous_results", {}),
"workflow_context": input_data.get("workflow_context"),
"execution_step": input_data.get("step_number")
}
},
"id": str(uuid.uuid4())
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{endpoint}/process",
json=json_rpc_payload,
timeout=aiohttp.ClientTimeout(total=300)
) as response:
result = await response.json()
# Extract result from JSON-RPC response
if "error" in result:
raise AgentError(f"Agent {agent_name} error: {result['error']}")
return result.get("result", {})
Part III: The Orchestration Engine — Temporal Workflows
Durable Execution: The Foundation of Reliability
Temporal transforms our multi-agent system from a fragile chain of HTTP calls into a resilient, self-healing workflow. Every state transition, every agent call, every piece of data is durably persisted, allowing the system to recover from any failure.
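For readers new to Temporal, the workflow and activities shown below are hosted by a worker and started through a client. Here is a minimal wiring sketch, assuming a local Temporal service; the task-queue name, workflow id, and input values are illustrative.
# Sketch: registering the workflow/activities and starting an execution
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="hitl-processing",
        workflows=[HITLProcessingWorkflow],
        # ...plus the remaining step activities used by the workflow
        activities=[call_analysis_agent_activity, request_human_approval_activity],
    )

    async with worker:
        result = await client.execute_workflow(
            HITLProcessingWorkflow.run,
            {"task_id": "task-001", "input_sources": "orders_db,payments_api"},
            id="hitl-task-001",
            task_queue="hitl-processing",
        )
        print(result["status"])

asyncio.run(main())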
The Human-in-the-Loop Processing Workflow
The Temporal workflow orchestrates a streamlined 4-step process with human approval gates:
# Pattern: HITL Multi-Agent Workflow Orchestration
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta

@workflow.defn
class HITLProcessingWorkflow:
"""
Orchestrates a 4-step processing pipeline with human approval gates.
Each step includes agent execution followed by human review.
""" @workflow.run
async def run(self, input_data: dict) -> dict:
"""Execute the HITL workflow with durable state and approval gates"""
task_id = input_data["task_id"]
input_sources = input_data["input_sources"]
# Initialize workflow context
workflow_context = {
"task_id": task_id,
"workflow_id": workflow.info().workflow_id,
"start_time": workflow.now(),
"results": {},
"approvals": {}
}
# Step 1: Analysis Agent + Human Review
analysis_result = await workflow.execute_activity(
call_analysis_agent_activity,
args=[input_sources, task_id, workflow_context],
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(minutes=1),
maximum_attempts=3
)
)
# Human approval gate for analysis
analysis_approval = await workflow.execute_activity(
request_human_approval_activity,
args=["analysis", analysis_result, workflow_context],
start_to_close_timeout=timedelta(hours=24) # Allow 24h for human review
)
if not analysis_approval["approved"]:
return {"status": "rejected_at_analysis", "reason": analysis_approval["reason"]}
workflow_context["results"]["analysis"] = analysis_result
workflow_context["approvals"]["analysis"] = analysis_approval
# Step 2: Strategy Agent + Human Review
strategy_result = await workflow.execute_activity(
call_strategy_agent_activity,
args=[analysis_result, task_id, workflow_context],
start_to_close_timeout=timedelta(minutes=15)
)
strategy_approval = await workflow.execute_activity(
request_human_approval_activity,
args=["strategy", strategy_result, workflow_context],
start_to_close_timeout=timedelta(hours=24)
)
if not strategy_approval["approved"]:
return {"status": "rejected_at_strategy", "reason": strategy_approval["reason"]}
# Steps 3-4 follow the same pattern: Agent execution + Human approval
return {
"status": "completed",
"task_id": task_id,
"workflow_id": workflow.info().workflow_id,
"duration": (workflow.now() - workflow_context["start_time"]).total_seconds(),
"results": workflow_context["results"],
"approvals": workflow_context["approvals"]
}
Activities: The Bridge Between Temporal, A2A, and Human Approval
Activities encapsulate agent calls and human approval processes, providing error handling and data transformation:
@activity.defn
async def call_analysis_agent_activity(
input_sources: str,
task_id: str,
workflow_context: dict
) -> dict:
"""Activity that calls the Analysis Agent via A2A""" try:
# Prepare input for the agent
input_data = {
"prompt": f"Analyze data sources: {input_sources}",
"task_id": task_id,
"workflow_id": workflow_context["workflow_id"],
"analysis_type": "comprehensive",
"include_metrics": True
}
# Call agent via A2A protocol
client = A2AAgentClient()
result = await client.call_agent("analysis_agent", input_data)
# Store large results externally to avoid workflow size limits
if len(json.dumps(result)) > 100_000: # 100KB threshold
storage = PayloadStorage()
payload_id = await storage.store_payload(result)
return {"payload_ref": payload_id, "status": "success"}
return result
except Exception as e:
# Activity failures trigger Temporal's retry mechanism
raise ActivityError(f"Analysis failed: {str(e)}")
@activity.defn
async def request_human_approval_activity(
step_name: str,
agent_result: dict,
workflow_context: dict
) -> dict:
"""Activity that requests human approval via web UI""" try:
# Create approval request in database
approval_request = {
"workflow_id": workflow_context["workflow_id"],
"task_id": workflow_context["task_id"],
"step_name": step_name,
"agent_result": agent_result,
"status": "pending_approval",
"created_at": datetime.now().isoformat(),
"timeout_at": (datetime.now() + timedelta(hours=24)).isoformat()
}
# Store approval request for UI to display
approval_storage = ApprovalStorage()
approval_id = await approval_storage.create_approval_request(approval_request)
# Send notification to approvers (email, Slack, etc.)
notification_service = NotificationService()
await notification_service.notify_approvers(
f"Approval needed for {step_name} in workflow {workflow_context['workflow_id']}",
approval_id,
step_name
)
# Wait for human decision (polling or signal-based)
while True:
approval_status = await approval_storage.get_approval_status(approval_id)
if approval_status["status"] == "approved":
return {
"approved": True,
"approval_id": approval_id,
"approver": approval_status["approver"],
"approved_at": approval_status["approved_at"],
"comments": approval_status.get("comments", "")
}
elif approval_status["status"] == "rejected":
return {
"approved": False,
"approval_id": approval_id,
"approver": approval_status["approver"],
"rejected_at": approval_status["rejected_at"],
"reason": approval_status.get("reason", "No reason provided")
}
# Wait 30 seconds before checking again
await asyncio.sleep(30)
except Exception as e:
raise ActivityError(f"Human approval request failed: {str(e)}")
The Approval UI System
The human approval system integrates with a web-based dashboard that presents agent results for review:
# Pattern: Web-Based Approval Dashboard
class ApprovalUIService:
"""Handles the web UI for human approvals""" async def get_pending_approvals(self, user_id: str) -> List[dict]:
"""Get all pending approval requests for a user"""
approval_storage = ApprovalStorage()
pending_requests = await approval_storage.get_pending_for_user(user_id)
# Enrich with agent result summaries
enriched_requests = []
for request in pending_requests:
summary = await self.generate_result_summary(request["agent_result"])
enriched_requests.append({
**request,
"summary": summary,
"time_remaining": self.calculate_time_remaining(request["timeout_at"])
})
return enriched_requests
async def submit_approval_decision(
self,
approval_id: str,
user_id: str,
decision: str,
comments: str = ""
) -> dict:
"""Submit an approval decision (approve/reject)"""
approval_storage = ApprovalStorage()
decision_data = {
"approval_id": approval_id,
"approver": user_id,
"decision": decision, # "approved" or "rejected"
"comments": comments,
"decided_at": datetime.now().isoformat()
}
# Update approval status
await approval_storage.update_approval_decision(decision_data)
# Log the decision for audit trail
audit_logger.info("approval_decision", extra={
"approval_id": approval_id,
"approver": user_id,
"decision": decision,
"workflow_context": await approval_storage.get_workflow_context(approval_id)
})
return {"status": "success", "decision_recorded": True}
Benefits of Human-in-the-Loop Architecture
The HITL approach provides several critical advantages for production AI systems:
1. Quality Assurance
Human experts review AI outputs before proceeding to next stages, catching edge cases and errors that automated testing might miss.
2. Risk Mitigation
Prevents cascading failures from early-stage errors and allows for course correction before significant resources are invested.
3. Continuous Learning
Human feedback can be used to improve agent performance over time, creating valuable training data for future model improvements.
4. Stakeholder Confidence
Maintains human oversight for critical business processes and provides transparency into AI decision-making.
Part IV: Managing Scale — The Payload Storage Pattern
The Challenge: Temporal’s Size Limits
Temporal workflows have payload size limits (typically 2MB). When processing large datasets, agents generate extensive reports that can exceed these limits. Enter the payload storage pattern:
Implementation: Smart Data Management
# Pattern: External Payload Storage
class PayloadStorage:
"""Manages large data payloads outside of Temporal workflows""" def __init__(self):
self.storage_dir = Path("external_storage/payloads")
self.storage_dir.mkdir(parents=True, exist_ok=True)
async def store_payload(self, data: Any, reference_key: str = None) -> str:
"""Store large payload and return reference ID"""
# Generate unique ID if not provided
if reference_key is None:
reference_key = self._generate_payload_id(data)
file_path = self.storage_dir / f"{reference_key}.json"
# Store with metadata
payload_data = {
"metadata": {
"id": reference_key,
"timestamp": datetime.now().isoformat(),
"size_bytes": len(json.dumps(data).encode()),
"checksum": hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
},
"data": data
}
# Async write for performance
async with aiofiles.open(file_path, 'w') as f:
await f.write(json.dumps(payload_data, indent=2, default=str))
return reference_key
async def resolve_payload(self, reference: Any) -> Any:
"""Resolve data from reference or return as-is"""
# If not a reference, return the data directly
if not isinstance(reference, dict) or "payload_ref" not in reference:
return reference
# Load from storage
file_path = self.storage_dir / f"{reference['payload_ref']}.json"
async with aiofiles.open(file_path, 'r') as f:
content = await f.read()
payload_data = json.loads(content)
# Verify integrity
stored_checksum = payload_data["metadata"]["checksum"]
computed_checksum = hashlib.sha256(
json.dumps(payload_data["data"], sort_keys=True).encode()
).hexdigest()
if stored_checksum != computed_checksum:
raise ValueError(f"Payload integrity check failed for {reference['payload_ref']}")
return payload_data["data"]
Part V: Production Considerations
Health Monitoring and Observability
Every production system needs comprehensive monitoring. Our implementation includes multi-layer health checks:
# Pattern: Comprehensive Health Monitoring
class A2AAgentClient:
    async def check_all_agents_health(self) -> dict:
"""Comprehensive health check for distributed agent fleet"""
health_status = {
"timestamp": datetime.now().isoformat(),
"agents": {},
"overall_status": "healthy"
}
async with aiohttp.ClientSession() as session:
for agent_name, agent_info in self.agent_registry.items():
    endpoint = agent_info["endpoint"]
try:
# Check basic connectivity
async with session.get(
f"{endpoint}/health",
timeout=aiohttp.ClientTimeout(total=5)
) as response:
if response.status == 200:
# Verify agent card is accessible
async with session.get(
f"{endpoint}/.well-known/agent.json",
timeout=aiohttp.ClientTimeout(total=5)
) as card_response:
if card_response.status == 200:
agent_card = await card_response.json()
health_status["agents"][agent_name] = {
"status": "healthy",
"version": agent_card.get("version", "unknown"),
"capabilities": agent_card.get("capabilities", [])
}
else:
health_status["agents"][agent_name] = {
"status": "degraded",
"reason": "Agent discovery unavailable"
}
else:
health_status["agents"][agent_name] = {
"status": "unhealthy",
"http_status": response.status
}
health_status["overall_status"] = "degraded"
except asyncio.TimeoutError:
health_status["agents"][agent_name] = {
"status": "unhealthy",
"reason": "Connection timeout"
}
health_status["overall_status"] = "degraded"
except Exception as e:
health_status["agents"][agent_name] = {
"status": "unhealthy",
"reason": str(e)
}
health_status["overall_status"] = "degraded"
return health_status
Graceful Degradation and Fallbacks
Production systems must handle partial failures gracefully:
# Pattern: Resilient Agent Communication with Fallbacks
@activity.defn
async def call_agent_with_fallback(
agent_name: str,
input_data: dict,
fallback_data: dict = None
) -> dict:
"""Call agent with automatic fallback strategies""" try:
# Attempt real agent call
client = A2AAgentClient()
# Check agent health first
health = await client.check_agent_health(agent_name)
if health["status"] != "healthy":
raise Exception(f"Agent {agent_name} is {health['status']}")
# Call the agent
result = await client.call_agent(agent_name, input_data)
result["execution_mode"] = "live_agent"
return result
except Exception as e:
# Log the failure for monitoring and debugging
logger.error(f"Agent {agent_name} failed: {e}")
# Use fallback data if available
if fallback_data:
return {
"execution_mode": "fallback",
"fallback_reason": str(e),
"data": fallback_data
}
# Generate synthetic response as last resort
return {
"execution_mode": "synthetic",
"status": "completed_with_synthetic_data",
"message": f"Agent {agent_name} unavailable, using synthetic response",
"data": generate_synthetic_response(agent_name, input_data)
}
Security and Authentication
While this example uses local deployment, production systems require robust security:
# Pattern: Production Security for A2A Communication
class SecureA2AClient:
    def __init__(self):
self.auth_token = os.environ.get("A2A_AUTH_TOKEN")
self.tls_cert = os.environ.get("A2A_TLS_CERT")
async def call_agent_secure(self, agent_name: str, input_data: dict) -> dict:
"""Secure agent communication with authentication and encryption"""
headers = {
"Authorization": f"Bearer {self.auth_token}",
"X-Request-ID": str(uuid.uuid4()),
"X-Workflow-ID": input_data.get("workflow_id", "unknown")
}
# Use HTTPS with certificate verification
ssl_context = ssl.create_default_context()
if self.tls_cert:
ssl_context.load_cert_chain(self.tls_cert)
connector = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=connector) as session:
# Call agent with secure connection
async with session.post(
f"https://{agent_name}.agents.internal/process",
json=self._encrypt_payload(input_data),
headers=headers
) as response:
encrypted_result = await response.json()
return self._decrypt_payload(encrypted_result)
Part VI: Lessons from Production
1. Start with Observability
Before deploying a single agent, we instrumented comprehensive logging and metrics:
# Structured logging for every agent interaction
logger.info("agent_call", extra={
"agent": agent_name,
"workflow_id": workflow_id,
"step_number": step_number,
"input_size": len(json.dumps(input_data)),
"duration_ms": duration,
"status": result.get("status"),
"execution_mode": result.get("execution_mode")
})
2. Design for Partial Failure
In a distributed agent system, assume at least one service will be degraded at any time. Design workflows to continue even when agents fail, marking sections as “completed_with_fallback” rather than failing entirely.
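At the workflow level this can be as simple as recording each step's execution mode and downgrading the overall status instead of aborting. Here is a hedged sketch that reuses the fallback activity from Part V; cached_strategy_defaults and strategy_input are hypothetical stand-ins, not names from the original system.
# Sketch: continue on degraded steps instead of failing the run
strategy_result = await workflow.execute_activity(
    call_agent_with_fallback,
    args=["strategy_agent", strategy_input, cached_strategy_defaults],
    start_to_close_timeout=timedelta(minutes=15),
)
workflow_context["results"]["strategy"] = strategy_result
if strategy_result.get("execution_mode") != "live_agent":
    # Surface the degradation in the final status rather than raising
    workflow_context["status"] = "completed_with_fallback"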
3. Optimize for Cost and Performance
Different agents require different model configurations based on their tasks; a configuration sketch follows the list below:
- Data Analysis: Uses fast models (Gemini 2.5 Flash) for speed
- Strategic Planning: Uses advanced models (Gemini 2.5 Pro) for complex reasoning
- Pattern Extraction: Uses low temperature (0.1) for consistency
- Creative Generation: Uses higher temperature (0.7) for variety
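One way to encode these choices is a small per-agent configuration map that an agent factory reads at startup. The names and values below are illustrative defaults drawn from the list above, not tuned recommendations.
# Sketch: per-agent model configuration (illustrative values)
AGENT_MODEL_CONFIG = {
    "analysis_agent":   {"model": "gemini-2.5-flash", "temperature": 0.1},  # fast, consistent
    "strategy_agent":   {"model": "gemini-2.5-pro",   "temperature": 0.2},  # complex reasoning
    "extraction_agent": {"model": "gemini-2.5-flash", "temperature": 0.1},  # deterministic output
    "generation_agent": {"model": "gemini-2.5-pro",   "temperature": 0.7},  # creative variety
}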
4. Cache Agent Responses
Agent responses are often deterministic for the same input. Implement caching at multiple levels:
# Pattern: Multi-level Caching Strategy
context_cache_key = hashlib.sha256(
f"{task_id}:{input_hash}:{analysis_type}".encode()
).hexdigest()

if context_cache_key in cache:
    return cache[context_cache_key]
5. Version Everything
Agents evolve. We version agent definitions, prompts, and even workflow definitions:
# Pattern: Comprehensive Agent Versioning
{
"name": "domain-analysis-agent",
"version": "2.3.1",
"api_version": "1.0",
"model_version": "gemini-2.5-flash-002",
"prompt_version": "2024-11-15",
"capabilities": [...],
"changelog": "Enhanced pattern recognition algorithms"
}
Conclusion: The Power of Integration
The integration of Google ADK, A2A protocol, and Temporal isn’t just a technical achievement — it’s a paradigm shift in how we approach complex AI systems. By combining:
- ADK’s agent intelligence — Sophisticated AI capabilities with tool integration. I will talk more about the “moat” agents need in a coming post.
- A2A’s communication standards — Seamless inter-agent collaboration and security
- Temporal’s durable execution — Enterprise-grade reliability and fault tolerance
We’ve created architectural patterns that can tackle complex, multi-step problems that require coordinated intelligence across specialized domains.
The patterns we’ve explored — from payload storage to health monitoring, from graceful degradation to version management — aren’t domain-specific. They’re blueprints for any complex multi-agent system that needs to operate reliably at scale.
As we move into an era where AI agents become the primary interface for complex system interactions, the foundations we lay today with frameworks like ADK, protocols like A2A, and orchestrators like Temporal will determine whether these systems are fragile experiments or robust production solutions.
The concepts are proven, the patterns are battle-tested, and the future is collaborative. Welcome to the age of orchestrated intelligence. I will publish a sample repository later, once I am done scrubbing the business code out :)
Technical Resources
Architecture Diagrams
Complete System Architecture
Data Flow Through the System
Drawing on our experience with multi-agent systems, this implementation provides a foundational philosophy and repeatable patterns for building robust, distributed agentic systems.