This blog is part of the ADK Masterclass - Hands-On Series. Events are the fundamental building blocks of ADK's execution model. Understanding events is key to building responsive, streaming agents and debugging complex agent behaviors.
1. What are Events?
Events in ADK represent every significant occurrence during an agent's interaction lifecycle. They are immutable records that capture user messages, agent replies, tool requests, results, state changes, and errors. When we call runner.run_async(), it returns an async generator that yields events as they happen.
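To make the "yields events as they happen" idea concrete, here is a minimal sketch that uses only the standard library. `DemoEvent` and `demo_run_async` are hypothetical stand-ins, not ADK classes; they only mimic the shape of consuming an async generator with `async for`.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass(frozen=True)
class DemoEvent:
    """Hypothetical stand-in for an ADK event (not the real Event class)."""
    author: str
    text: str


async def demo_run_async(message: str) -> AsyncIterator[DemoEvent]:
    """Hypothetical stand-in for runner.run_async(): yields events one by one."""
    yield DemoEvent(author="demo_agent", text="Working on it")
    await asyncio.sleep(0)  # simulate work happening between events
    yield DemoEvent(author="demo_agent", text=f"Echo: {message}")


async def collect(message: str) -> list[DemoEvent]:
    """Handle each event as soon as the generator produces it."""
    seen = []
    async for event in demo_run_async(message):
        seen.append(event)  # a real app would render or log the event here
    return seen


if __name__ == "__main__":
    for event in asyncio.run(collect("hi")):
        print(f"{event.author}: {event.text}")
```

The loop body runs once per event, so the caller can react to the first event before the second one exists.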
Event Structure
Events build upon an LlmResponse structure and add critical ADK metadata. The official import is:
```python
from google.adk.events import Event, EventActions
from google.genai import types
```
Note: The Event class import is optional for basic usage. When you iterate over runner.run_async(), Python's duck typing lets you access event properties directly without explicit type references. However, importing Event and EventActions is recommended for type hints, IDE autocomplete, and better code documentation.
Key event properties:
| Property | Description |
|---|---|
| author | Identifies the message source ('user' or agent name) |
| invocation_id | Tracks the entire interaction cycle |
| id | Unique identifier for this specific event |
| timestamp | Event creation time |
| content | The actual message payload (text, function calls, etc.) |
| actions | Signals for side-effects and control flow |
| partial | Boolean flag indicating streaming (incomplete) content |
In practice, events give us:
- Real-time feedback: See what the agent is doing as it happens
- Streaming responses: Display text as it's generated
- Debugging: Trace the exact sequence of operations
- Session history: Events are stored in the session for context
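As a small illustration of how these properties help with debugging, the sketch below groups events by invocation_id and orders each group by timestamp. `DemoEvent` is a hypothetical stand-in that carries only a few of the properties listed above; the IDs and timestamps are made up for the example.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoEvent:
    """Hypothetical stand-in with a subset of the event properties."""
    invocation_id: str
    id: str
    author: str
    timestamp: float


def group_by_invocation(events):
    """Group events per interaction cycle, ordered by creation time."""
    groups = defaultdict(list)
    for event in events:
        groups[event.invocation_id].append(event)
    return {
        inv_id: sorted(group, key=lambda e: e.timestamp)
        for inv_id, group in groups.items()
    }


if __name__ == "__main__":
    history = [
        DemoEvent("inv-1", "e2", "demo_agent", 2.0),
        DemoEvent("inv-2", "e3", "user", 3.0),
        DemoEvent("inv-1", "e1", "user", 1.0),
    ]
    for inv_id, group in group_by_invocation(history).items():
        print(inv_id, [e.id for e in group])
```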
2. Why Use Events?
Without events, we'd have to wait for the entire agent execution to complete before seeing any output. For complex agents that call multiple tools or generate long responses, this could mean waiting several seconds with no feedback. Events solve this by:
- Enabling streaming UX: Show text as it's generated, just like ChatGPT
- Providing visibility: See when tools are called and what results they return
- Supporting debugging: Trace exactly what happened during execution
- Enabling cancellation: Stop execution early if needed
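The cancellation point can be sketched with plain Python: because events arrive through an async generator, the consumer can stop iterating at any time. The generator below is a hypothetical stand-in for an event stream, and the character limit is an arbitrary stop condition; how ADK itself cleans up an interrupted run is not covered here.

```python
import asyncio
from typing import AsyncGenerator


async def demo_event_stream() -> AsyncGenerator[str, None]:
    """Hypothetical stand-in for a stream of text events."""
    for chunk in ["alpha ", "beta ", "gamma ", "delta "]:
        await asyncio.sleep(0)  # simulate waiting for the next event
        yield chunk


async def read_until(limit: int) -> str:
    """Consume events until at least `limit` characters have arrived."""
    stream = demo_event_stream()
    received = ""
    try:
        async for chunk in stream:
            received += chunk
            if len(received) >= limit:
                break  # stop early; later events are never produced
    finally:
        await stream.aclose()  # close the generator explicitly
    return received


if __name__ == "__main__":
    print(asyncio.run(read_until(8)))
```

Closing the generator in a `finally` block keeps cleanup deterministic instead of leaving it to garbage collection.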
3. Identifying Event Types
ADK uses a single Event class. You classify events by examining their properties using helper methods:
| Event Type | How to Identify | Access Data |
|---|---|---|
| Text Content | Check event.content has text parts | event.content.parts[0].text |
| Tool Call Request | Use event.get_function_calls() | call.name, call.args |
| Tool Result | Use event.get_function_responses() | response.name, response.response |
| Streaming Content | Check event.partial == True | Incomplete text chunks that will continue |
| State/Artifact Updates | Check event.actions.state_delta or artifact_delta | Pure metadata events with no content |
| Final Response | Use event.is_final_response() | Returns True for displayable content |
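The checks in the table can be combined into one small classifier. This is a sketch rather than an ADK utility: `classify_event` and its labels are our own names, and `make_demo_event` builds a hypothetical stand-in object that exposes the same attribute and method names as the table, so the example runs without the library.

```python
from types import SimpleNamespace


def classify_event(event) -> str:
    """Return a coarse label for an event using the checks from the table."""
    if event.get_function_calls():
        return "tool_call"
    if event.get_function_responses():
        return "tool_result"
    parts = event.content.parts if event.content and event.content.parts else []
    if any(getattr(part, "text", None) for part in parts):
        return "streaming_text" if event.partial else "text"
    actions = event.actions
    if actions and (actions.state_delta or actions.artifact_delta):
        return "state_or_artifact_update"
    return "other"


def make_demo_event(text=None, calls=(), responses=(), partial=False, state_delta=None):
    """Build a hypothetical stand-in event (not a real ADK Event)."""
    parts = [SimpleNamespace(text=text)] if text else []
    return SimpleNamespace(
        content=SimpleNamespace(parts=parts),
        partial=partial,
        actions=SimpleNamespace(state_delta=state_delta or {}, artifact_delta={}),
        get_function_calls=lambda: list(calls),
        get_function_responses=lambda: list(responses),
    )


if __name__ == "__main__":
    print(classify_event(make_demo_event(text="Hello")))
    print(classify_event(make_demo_event(text="Hel", partial=True)))
    print(classify_event(make_demo_event(state_delta={"step": 1})))
```

Tool calls and results are checked first because such events may carry no text at all.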
The is_final_response() Method
Use this built-in helper to filter events for user-facing output. It returns True when:
- The event is a tool result with the skip_summarization flag set
- The event contains a call to a long-running tool
- The event is a complete text message with no tool calls or responses, and it is not streaming (partial)
This prevents intermediate steps from cluttering user-facing output.
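The three conditions above can be written out as a plain function. This is a sketch of the described logic, not the library's source code; `DemoEvent`, its boolean fields, and `looks_final` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass


@dataclass
class DemoEvent:
    """Hypothetical stand-in that reduces an event to the flags we need."""
    has_function_calls: bool = False
    has_function_responses: bool = False
    partial: bool = False
    skip_summarization: bool = False
    long_running_tool: bool = False


def looks_final(event: DemoEvent) -> bool:
    """Mirror the three conditions described for is_final_response()."""
    # 1. A tool result that should be shown as-is, without LLM summarization
    if event.has_function_responses and event.skip_summarization:
        return True
    # 2. An event tied to a long-running tool
    if event.long_running_tool:
        return True
    # 3. Complete text: no tool calls, no tool responses, not a partial chunk
    return not (
        event.has_function_calls
        or event.has_function_responses
        or event.partial
    )


if __name__ == "__main__":
    print(looks_final(DemoEvent()))
    print(looks_final(DemoEvent(partial=True)))
    print(looks_final(DemoEvent(has_function_responses=True, skip_summarization=True)))
```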
Event Actions
The event.actions field carries operational signals for control flow:
| Action | Description |
|---|---|
| state_delta | Key-value pairs of modified session state |
| artifact_delta | Updated artifact versions or metadata |
| transfer_to_agent | Routes control to a named agent |
| escalate | Boolean that terminates agent loops |
| skip_summarization | Prevents LLM processing of tool results |
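To show what "key-value pairs of modified session state" means in practice, the sketch below folds a sequence of state_delta dictionaries into a state snapshot. It only illustrates the idea with plain dicts; `apply_state_deltas` is a hypothetical helper, and how ADK applies deltas internally is not shown here.

```python
def apply_state_deltas(initial_state: dict, deltas: list) -> dict:
    """Fold state deltas into a copy of the initial state, in event order."""
    state = dict(initial_state)  # copy so the caller's dict is untouched
    for delta in deltas:
        state.update(delta)  # later deltas overwrite earlier values
    return state


if __name__ == "__main__":
    start = {"topic": "math"}
    deltas = [{"last_answer": "4"}, {"topic": "geometry"}]
    print(apply_state_deltas(start, deltas))
```

Each delta lists only the keys that changed, so replaying them in order reconstructs the state after any given event.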
4. Tutorial
Prerequisites
- Google AI Studio API Key
- Python 3.9+ installed
Setup Environment
```bash
# Create and activate virtual environment
python3 -m venv .venv
source .venv/bin/activate

# Install dependencies
pip install google-adk python-dotenv

# Set our API key
export GOOGLE_API_KEY=our_api_key_here
```
4.1. Processing Events
Let's handle different event types during agent execution using the correct ADK methods:
```python
import asyncio

from google.adk.agents import Agent
from google.adk.events import Event, EventActions
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

agent = Agent(
    model="gemini-2.5-flash",
    name="event_demo_agent",
    instruction="You are a helpful assistant.",
)

session_service = InMemorySessionService()
runner = Runner(agent=agent, app_name="event_app", session_service=session_service)


async def process_with_events(message: str):
    """Process a message and handle all event types using ADK methods."""
    content = types.Content(role="user", parts=[types.Part(text=message)])
    print(f"\nUser: {message}")
    print(f"{'=' * 50}")

    async for event in runner.run_async(
        user_id="user_1",
        session_id="session_1",
        new_message=content,
    ):
        # Print event metadata
        print(f"\n[Event] Author: {event.author}, ID: {event.id}")

        # Check for function calls (tool requests)
        function_calls = event.get_function_calls()
        if function_calls:
            for call in function_calls:
                print(f" → Tool Call: {call.name}")
                print(f" Arguments: {call.args}")

        # Check for function responses (tool results)
        function_responses = event.get_function_responses()
        if function_responses:
            for response in function_responses:
                print(f" ← Tool Result: {response.name}")
                print(f" Response: {response.response}")

        # Check for text content
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, 'text') and part.text:
                    streaming_status = "[Streaming...]" if event.partial else "[Complete]"
                    print(f" {streaming_status} {part.text[:100]}...")

        # Check for state changes
        if event.actions and event.actions.state_delta:
            print(f" State Updated: {event.actions.state_delta}")

        # Check if this is a final response for display
        if event.is_final_response():
            print(" ✓ This is a final response")

    print(f"\n{'=' * 50}")


async def main():
    # Create the session up front; run_async looks it up by its IDs
    await session_service.create_session(
        app_name="event_app", user_id="user_1", session_id="session_1"
    )
    await process_with_events("What is 2 + 2?")


if __name__ == "__main__":
    asyncio.run(main())
```
4.2. Filtering Final Responses
Use is_final_response() to show only user-facing content:
```python
async def get_agent_reply(message: str) -> str:
    """Get only the final, displayable response from the agent."""
    content = types.Content(role="user", parts=[types.Part(text=message)])
    final_text = ""

    async for event in runner.run_async(
        user_id="user_1",
        session_id="session_1",
        new_message=content,
    ):
        # Only process events meant for user display
        if event.is_final_response():
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if hasattr(part, 'text') and part.text:
                        final_text = part.text

    return final_text
```
4.3. Building a Streaming UI
For real-time streaming, check the partial flag to detect incomplete chunks:
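```python
from google.adk.agents.run_config import RunConfig, StreamingMode


async def stream_response(message: str) -> str:
    """Stream response text in real-time as it's generated."""
    content = types.Content(role="user", parts=[types.Part(text=message)])
    streamed_text = ""  # text already printed from partial chunks
    final_text = ""

    async for event in runner.run_async(
        user_id="user_1",
        session_id="session_1",
        new_message=content,
        # Assumption: partial events are only emitted with SSE streaming enabled
        run_config=RunConfig(streaming_mode=StreamingMode.SSE),
    ):
        # Skip non-agent events
        if event.author == "user":
            continue

        # Collect the text carried by this event
        parts = event.content.parts if event.content and event.content.parts else []
        text = "".join(part.text for part in parts if getattr(part, "text", None))
        if not text:
            continue

        if event.partial:
            # Partial events carry the next chunk; print it right away
            print(text, end="", flush=True)
            streamed_text += text
        else:
            # The complete event repeats the full text, so print it only
            # if nothing was streamed, then finish the line
            if not streamed_text:
                print(text, end="", flush=True)
            print()
            final_text = text
            streamed_text = ""

    return final_text
```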
4.4. Tracking State Changes
Monitor session state modifications through event actions:
```python
async def track_state_changes(message: str):
    """Monitor all state changes during agent execution."""
    content = types.Content(role="user", parts=[types.Part(text=message)])
    state_history = []

    async for event in runner.run_async(
        user_id="user_1",
        session_id="session_1",
        new_message=content,
    ):
        # Check for state modifications
        if event.actions and event.actions.state_delta:
            for key, value in event.actions.state_delta.items():
                state_history.append({
                    "event_id": event.id,
                    "author": event.author,
                    "key": key,
                    "value": value,
                })
                print(f"State changed: {key} = {value}")

        # Check for artifact updates
        if event.actions and event.actions.artifact_delta:
            print(f"Artifact updated: {event.actions.artifact_delta}")

    return state_history
```
4.5. Monitoring Tool Execution
Track tool calls and their results for debugging and logging:
```python
async def monitor_tools(message: str):
    """Log all tool calls and responses during execution."""
    content = types.Content(role="user", parts=[types.Part(text=message)])
    tool_log = []

    async for event in runner.run_async(
        user_id="user_1",
        session_id="session_1",
        new_message=content,
    ):
        # Log tool call requests
        function_calls = event.get_function_calls()
        if function_calls:
            for call in function_calls:
                entry = {
                    "type": "call",
                    "tool": call.name,
                    "args": call.args,
                    "event_id": event.id,
                }
                tool_log.append(entry)
                print(f"Calling: {call.name}({call.args})")

        # Log tool responses
        function_responses = event.get_function_responses()
        if function_responses:
            for response in function_responses:
                entry = {
                    "type": "response",
                    "tool": response.name,
                    "result": response.response,
                    "event_id": event.id,
                }
                tool_log.append(entry)
                print(f"Result from {response.name}: {response.response}")

    return tool_log
```
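Once a tool log exists, a quick sanity check is to see whether every call received a response. The sketch below works on plain dictionaries shaped like the tool_log entries above; `pending_tools` is a hypothetical helper, the sample log is made up, and matching by tool name is a simplifying assumption.

```python
from collections import Counter


def pending_tools(tool_log: list) -> dict:
    """Count calls per tool that have no matching response yet."""
    pending = Counter()
    for entry in tool_log:
        if entry["type"] == "call":
            pending[entry["tool"]] += 1
        elif entry["type"] == "response":
            pending[entry["tool"]] -= 1
    # Keep only tools with more calls than responses
    return {tool: count for tool, count in pending.items() if count > 0}


if __name__ == "__main__":
    sample_log = [
        {"type": "call", "tool": "get_weather", "args": {"city": "Paris"}, "event_id": "e1"},
        {"type": "response", "tool": "get_weather", "result": {"temp": 20}, "event_id": "e2"},
        {"type": "call", "tool": "get_time", "args": {}, "event_id": "e3"},
    ]
    print(pending_tools(sample_log))
```

An empty result means every logged call was answered; a non-empty one points at the tool to investigate first.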
Summary
In this tutorial, we learned how ADK's event system works:
- Event Structure: Events are immutable records with author, id, invocation_id, timestamp, content, and actions
- Identifying Events: Use get_function_calls(), get_function_responses(), and is_final_response() to classify events
- Event Actions: The actions field carries control signals like state_delta, artifact_delta, transfer_to_agent, and escalate
- Streaming: Check event.partial to detect incomplete content chunks during real-time streaming
- Final Responses: Use is_final_response() to filter events meant for user display
We have now completed the Core Components section of the ADK Masterclass! These fundamentals form the foundation for building robust, production-ready agents.