This blog is part of the ADK Masterclass - Hands-On Series. Events are the fundamental building blocks of ADK's execution model. Understanding events is key to building responsive, streaming agents and debugging complex agent behaviors.
1. What are Events?
Events represent discrete actions or outputs that occur during agent execution. When we call runner.run_async(), it returns an async generator that yields events as they happen. This event-by-event model gives us:
- Real-time feedback: See what the agent is doing as it happens
- Streaming responses: Display text as it's generated
- Debugging: Trace the exact sequence of operations
- Session history: Events are stored in the session for context
2. Why Use Events?
Without events, we'd have to wait for the entire agent execution to complete before seeing any output. For complex agents that call multiple tools or generate long responses, this could mean waiting several seconds with no feedback. Events solve this by:
- Enabling streaming UX: Show text as it's generated, just like ChatGPT
- Providing visibility: See when tools are called and what results they return
- Supporting debugging: Trace exactly what happened during execution
- Enabling cancellation: Stop execution early if needed
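To make the contrast concrete, here is a minimal, self-contained sketch of consuming an async generator event by event. It uses a mock stream standing in for the real ADK runner, so treat the names as illustrative:

```python
import asyncio

async def mock_event_stream():
    """Stand-in for runner.run_async(); yields events as work happens."""
    for chunk in ["Thinking...", "Calling tool...", "Here is the answer."]:
        await asyncio.sleep(0)  # yield control, as real I/O would
        yield chunk

async def main():
    received = []
    # Each event is available immediately -- no waiting for the full run
    async for event in mock_event_stream():
        received.append(event)
        print(f"[event] {event}")
    return received

events = asyncio.run(main())
```

Without the `async for`, we would only see output after all three steps finished; with it, each step surfaces the moment it happens.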
3. Event Types
| Event Type | Description | Key Properties |
|---|---|---|
| `ContentEvent` | Text content from the LLM | `content`, `partial` |
| `ToolCallEvent` | Agent requests tool execution | `tool_name`, `args` |
| `ToolResultEvent` | Result from tool execution | `tool_name`, `result` |
| `ErrorEvent` | Error during execution | `error`, `message` |
| `EndEvent` | Agent turn completed | `final_response` |
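The table above naturally suggests a dispatch loop. The sketch below uses stand-in dataclasses named after the table rows purely for illustration; depending on your installed ADK version, events may be exposed differently (for example, as a single event class with helper accessors):

```python
from dataclasses import dataclass

# Stand-in event classes mirroring the table; illustrative only.
@dataclass
class ContentEvent:
    content: str
    partial: bool

@dataclass
class ToolCallEvent:
    tool_name: str
    args: dict

@dataclass
class EndEvent:
    final_response: str

def describe(event) -> str:
    """Dispatch on event type, returning a one-line summary."""
    if isinstance(event, ContentEvent):
        return f"text: {event.content!r} (partial={event.partial})"
    if isinstance(event, ToolCallEvent):
        return f"tool call: {event.tool_name}({event.args})"
    if isinstance(event, EndEvent):
        return f"done: {event.final_response!r}"
    return f"unknown event: {type(event).__name__}"

summaries = [describe(e) for e in [
    ContentEvent("Hel", partial=True),
    ToolCallEvent("calculator", {"expr": "2+2"}),
    EndEvent("4"),
]]
```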
4. Tutorial
Prerequisites
- Google AI Studio API Key
- Python 3.9+ installed
Setup Environment
```bash
# Create and activate virtual environment
python3 -m venv .venv
source .venv/bin/activate

# Install dependencies
pip install google-adk python-dotenv

# Set your API key
export GOOGLE_API_KEY=your_api_key_here
```
4.1. Processing Events
Let's handle different event types during agent execution:
```python
import asyncio

from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

agent = Agent(
    model="gemini-2.5-flash",
    name="event_demo_agent",
    instruction="You are a helpful assistant.",
)

session_service = InMemorySessionService()
runner = Runner(agent=agent, app_name="event_app", session_service=session_service)

async def process_with_events(message: str):
    """Process a message and handle all event types."""
    content = types.Content(role="user", parts=[types.Part(text=message)])

    print(f"\nUser: {message}\n")
    print("Events:")
    print("-" * 40)

    async for event in runner.run_async(
        user_id="user_1",
        session_id="session_1",
        new_message=content,
    ):
        # Handle content events (streaming text)
        if hasattr(event, 'content') and event.content:
            for part in event.content.parts:
                if hasattr(part, 'text') and part.text:
                    is_partial = getattr(event, 'partial', False)
                    status = "[Streaming]" if is_partial else "[Complete]"
                    print(f"{status}: {part.text[:50]}...")

        # Handle tool calls
        elif hasattr(event, 'tool_calls'):
            for call in event.tool_calls:
                print(f"[Tool Call] {call.name}({call.args})")

        # Handle tool results
        elif hasattr(event, 'tool_result'):
            print(f"[Tool Result] {event.tool_result}")

        # Handle errors
        elif hasattr(event, 'error'):
            print(f"[Error] {event.error}")

    print("-" * 40)

async def main():
    # The session must exist before run_async can append events to it
    await session_service.create_session(
        app_name="event_app", user_id="user_1", session_id="session_1"
    )
    await process_with_events("What is 2 + 2?")

if __name__ == "__main__":
    asyncio.run(main())
```
4.2. Building a Streaming UI
We can use events to build a real-time chat interface:
```python
async def stream_response(message: str) -> str:
    """Stream response text character by character."""
    content = types.Content(role="user", parts=[types.Part(text=message)])
    full_response = ""

    async for event in runner.run_async(
        user_id="user_1",
        session_id="session_1",
        new_message=content,
    ):
        if hasattr(event, 'content') and event.content:
            for part in event.content.parts:
                if hasattr(part, 'text') and part.text:
                    # Get new text since last event
                    new_text = part.text[len(full_response):]
                    full_response = part.text

                    # Print new characters (simulating streaming UI)
                    print(new_text, end="", flush=True)

    print()  # New line after streaming completes
    return full_response
```
Real-time Streaming Capabilities
Events enable real-time streaming capabilities:
- Token-by-token streaming: Display text as the LLM generates it
- Progress indicators: Show tool execution status
- Cancellation: Stop generation mid-stream if needed
- Multi-modal streaming: Handle text, images, and audio events
```python
# Example: Streaming with timeout/cancellation
import asyncio

async def stream_with_timeout(message: str, timeout_seconds: float = 30.0):
    """Stream with a timeout for long-running requests."""
    content = types.Content(role="user", parts=[types.Part(text=message)])

    try:
        # Note: asyncio.timeout requires Python 3.11+
        async with asyncio.timeout(timeout_seconds):
            async for event in runner.run_async(
                user_id="user_1",
                session_id="session_1",
                new_message=content,
            ):
                # Process events...
                yield event
    except asyncio.TimeoutError:
        print("[Timeout] Request timed out")
        raise
```
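Because `asyncio.timeout` only exists on Python 3.11+, and the prerequisites allow 3.9, here is one hedged alternative using `asyncio.wait_for`, shown with a mock stream standing in for `runner.run_async`:

```python
import asyncio

async def mock_run_async():
    """Stand-in for runner.run_async(); yields a couple of events."""
    for event in ["chunk-1", "chunk-2"]:
        await asyncio.sleep(0.01)
        yield event

async def drain(stream):
    """Collect all events from an async generator into a list."""
    return [event async for event in stream]

async def main():
    # wait_for wraps a coroutine, so drain the stream inside one
    try:
        return await asyncio.wait_for(drain(mock_run_async()), timeout=5.0)
    except asyncio.TimeoutError:
        print("[Timeout] Request timed out")
        raise

events = asyncio.run(main())
```

Note the trade-off: this buffers all events before returning rather than yielding them mid-stream. For a per-event timeout on older interpreters, wrap each iteration step in `asyncio.wait_for` instead.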
Summary
We have now completed the Core Components section of the ADK Masterclass! We covered:
- Sessions & State: Managing conversation context and data
- Context Management: Caching and compression for performance
- Callbacks: Intercepting and customizing agent behavior
- Artifacts: Storing files and binary data
- Events: The event system powering execution and streaming
These core components form the foundation for building robust, production-ready agents. In the next sections, we'll cover Evaluation & Annotation, Deployment, and Observability.