Building a Proactive GenAI Agent with Real-Time Context
In this tutorial, we will build an end-to-end solution: a Generative AI agent that proactively manages airline flight disruptions for high-value customers. The agent's effectiveness hinges on its ability to access and act on real-time context, a capability we will power with DeltaStream.
We will walk through every step, from generating simulated real-time data to building the data pipeline and exposing it to an agent through a Model Context Protocol (MCP) server.
The Use Case: An airline wants to create a "digital concierge" agent. When a flight is canceled, this agent should immediately identify affected "Platinum" or "Gold" tier passengers and rebook them on the next best flight before the customer is even aware of the disruption. Acting within the "golden seconds" after a cancellation is critical, as the best alternative seats disappear almost instantly.
Architecture Overview
Our system will have five main components:
Data Generator: A Java application that simulates real-time flight events, bookings, and customer profile updates, publishing them to Kafka.
Apache Kafka: The message bus that will transport our live event streams.
DeltaStream: The real-time context engine. It ingests the raw Kafka streams and uses SQL to join and transform them into an actionable, materialized view of "at-risk" passengers.
MCP Server: A lightweight Java server that exposes the materialized view from DeltaStream via a simple API, acting as the bridge to our GenAI agent.
GenAI Agent (OpenAI): The agent configured in a platform like OpenAI's Agent Builder, which will call our MCP server to get the real-time context needed to make intelligent decisions.
Prerequisites
Before you begin, ensure you have the following:
Java (JDK 17 or later) and Maven installed.
Access to an Apache Kafka cluster.
A DeltaStream account.
ngrok or a similar tool to expose your local server to the internet.
Step 1: Generate the Real-Time Data
First, we'll create a Java application to simulate our airline's operations. This generator will produce three streams of data into three distinct Kafka topics. See the data generator Java class in the DeltaStream examples repository.
Before running the code, create the three required topics in your Kafka cluster:
flight_events
booking_events
customer_profiles
Now, compile and run the AirlineDataGenerator. You should see log messages indicating that data is being produced to your Kafka topics.
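The records the generator produces should match the schemas we will declare in DeltaStream in the next step. As a sketch, here is how a single flight event payload might be built; the FlightEventFactory class and its method are hypothetical illustrations, not code from the examples repository:

```java
import java.time.Instant;

// Hypothetical helper mirroring the fields of the flight_events topic.
public class FlightEventFactory {

    // Builds a JSON payload for one flight event. In the real generator,
    // this payload would be sent to Kafka with a KafkaProducer.
    public static String flightEventJson(String flightId, String status,
                                         String origin, String destination,
                                         Instant scheduledDeparture) {
        return String.format(
            "{\"flight_id\":\"%s\",\"flight_status\":\"%s\","
            + "\"origin\":\"%s\",\"destination\":\"%s\","
            + "\"scheduled_departure\":\"%s\",\"event_timestamp\":\"%s\"}",
            flightId, status, origin, destination,
            scheduledDeparture, Instant.now());
    }

    public static void main(String[] args) {
        System.out.println(flightEventJson(
            "FL-0042", "CANCELED", "SFO", "JFK",
            Instant.parse("2024-01-01T09:00:00Z")));
    }
}
```

The booking_events and customer_profiles payloads follow the same pattern with their respective fields.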
Step 2: Build the Real-Time Pipeline in DeltaStream
Next, we will define the logic in DeltaStream to process these raw Kafka streams into a clean, actionable materialized view. Log into your DeltaStream environment and execute the following SQL statements.
-- Step 1: Create STREAMS to represent the raw Kafka topics.
-- DeltaStream will read from these topics continuously.
CREATE STREAM flight_events (
flight_id VARCHAR,
flight_status VARCHAR,
origin VARCHAR,
destination VARCHAR,
scheduled_departure TIMESTAMP_LTZ,
event_timestamp TIMESTAMP_LTZ
) WITH (
'topic' = 'flight_events',
'value.format' = 'json',
'timestamp' = 'event_timestamp'
);
CREATE STREAM booking_events (
booking_id VARCHAR,
flight_id VARCHAR,
passenger_id VARCHAR,
seat VARCHAR,
booking_timestamp TIMESTAMP_LTZ
) WITH (
'topic' = 'booking_events',
'value.format' = 'json',
'timestamp' = 'booking_timestamp'
);
-- Customer profiles are dimensional (slowly changing) data,
-- so we create a changelog stream keyed by passenger_id.
CREATE CHANGELOG customer_profiles_changelog (
passenger_id VARCHAR,
first_name VARCHAR,
last_name VARCHAR,
loyalty_tier VARCHAR,
preferences STRUCT<seating VARCHAR>,
update_timestamp TIMESTAMP_LTZ,
PRIMARY KEY(passenger_id)
) WITH (
'topic' = 'customer_profiles',
'value.format' = 'json',
'timestamp' = 'update_timestamp'
);
-- Step 2: Create an enriched stream, then the final Materialized View.
-- The view is the core of our real-time context generation: it joins the
-- streams and filters for high-value customers on disrupted flights.
CREATE STREAM enriched_booking_events AS
SELECT
booking_id,
flight_id,
be.passenger_id,
seat,
booking_timestamp,
first_name,
last_name,
loyalty_tier,
preferences
FROM booking_events be
LEFT JOIN customer_profiles_changelog cpc WITH ('source.idle.timeout.millis' = 1000)
ON be.passenger_id = cpc.passenger_id;
CREATE MATERIALIZED VIEW disrupted_premium_passengers AS
SELECT
ebe.passenger_id,
ebe.first_name || ' ' || ebe.last_name AS full_name,
ebe.loyalty_tier,
f.flight_id,
f.flight_status AS flight_status,
f.origin,
f.destination,
f.event_timestamp AS disruption_timestamp,
ebe.preferences->seating AS seating_preference
FROM flight_events AS f
-- Join flights with their bookings.
-- We use a temporal window to limit the state we need to keep.
-- This assumes a booking happens reasonably close to a flight event.
INNER JOIN enriched_booking_events AS ebe
WITHIN 1 DAY
ON f.flight_id = ebe.flight_id
WHERE
-- We only care about disruptions
f.flight_status IN ('CANCELED', 'DELAYED')
AND
-- We only care about our premium customers
ebe.loyalty_tier IN ('PLATINUM', 'GOLD');
Once these statements are executed, DeltaStream is actively processing your Kafka data. As soon as a CANCELED or DELAYED event occurs for a flight carrying a "GOLD" or "PLATINUM" passenger, an entry will instantly appear in the disrupted_premium_passengers Materialized View.
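Before wiring up the agent, it is worth querying the view directly to confirm rows appear. The sketch below builds the SQL and a JSON request body a client might send to DeltaStream's SQL API; the `{"statement": ...}` body shape is an assumption here, so check DeltaStream's REST documentation for the actual contract:

```java
public class MaterializedViewQuery {

    // Builds the SQL used to look up one passenger's disruption context.
    // NOTE: naive quote-escaping for illustration only; a real client
    // should use stricter sanitization or parameterized statements.
    public static String contextQuery(String passengerId) {
        String safeId = passengerId.replace("'", "''");
        return "SELECT * FROM disrupted_premium_passengers "
             + "WHERE passenger_id = '" + safeId + "';";
    }

    // Wraps the SQL in a JSON body. The {"statement": ...} shape is an
    // assumed request format, not a documented DeltaStream contract.
    public static String requestBody(String passengerId) {
        return "{\"statement\":\""
             + contextQuery(passengerId).replace("\"", "\\\"") + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(requestBody("PASS-1042"));
    }
}
```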
Step 3: Expose the Real-Time Context via MCP Server
Now we need a bridge between DeltaStream and our GenAI agent. The following MCP server code provides a standard interface that OpenAI's Agent Builder can understand.
3.1 MCP Server Code
Use the Java code from the examples repository. It creates a simple web server that listens for POST requests, queries DeltaStream's API, and returns the result.
3.2 Configure and Run the MCP Server
Edit the Code: Open the FlightConciergeMCPServer.java file and replace the placeholder values for DS_STATEMENTS_URL and DS_AUTH_TOKEN with your own DeltaStream organization URL and a valid bearer token.
Compile and Run: Build and run the Java application. It will start a server, typically on port 8080.
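Once the server is up, you can exercise it without the agent. The sketch below builds (but does not send) an HTTP request to the local server using the JDK's built-in HTTP client; the tools/call body shown is an assumed shape for invoking get_passenger_context and should be matched against the actual server code:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class McpRequestSketch {

    // Builds a POST to the locally running MCP server. The JSON-RPC-style
    // body here is an assumed shape for calling the get_passenger_context
    // tool; verify it against the server code in the examples repository.
    public static HttpRequest buildToolCall(String baseUrl, String passengerId) {
        String body = "{\"method\":\"tools/call\","
                    + "\"params\":{\"name\":\"get_passenger_context\","
                    + "\"arguments\":{\"passenger_id\":\"" + passengerId + "\"}}}";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/mcp"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = buildToolCall("http://localhost:8080", "PASS-1042");
        System.out.println(req.method() + " " + req.uri());
    }
}
```

Sending the request with HttpClient.newHttpClient().send(...) against the running server should return the passenger context as JSON.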
Step 4: Connect the Agent
The final step is to connect our real-time context to a GenAI agent.
Expose Your Server: Your MCP server is running locally. To make it accessible to a cloud service like OpenAI, use ngrok:
ngrok http 8080
ngrok will give you a public HTTPS URL (e.g., https://<unique-id>.ngrok-free.app).
Configure the Agent: In OpenAI's Agent Builder (or a similar platform), create a new agent. From the Tools section, add a new MCP Server and provide the ngrok URL, making sure to append the /mcp path (e.g., https://<unique-id>.ngrok-free.app/mcp). The builder will automatically discover the get_passenger_context tool defined in our MCP server.
Give the agent its instructions. This is the crucial prompt that defines its personality and goal.
Example Agent Instructions:
You are an elite airline concierge agent. Your primary goal is to manage flight disruptions for Platinum and Gold tier passengers with extreme urgency. When alerted, your first and only action should be to use the get_passenger_context tool to fetch real-time data for the affected passenger. Based on the context, your task is to find a new flight and communicate the solution to the customer clearly and calmly. Do not ask the user for information; use your tools.
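For reference, the tool that Agent Builder discovers is described by a JSON definition served by the MCP server. The sketch below shows what such a definition for get_passenger_context might look like; the exact schema lives in the server code in the examples repository, so treat this as an assumption:

```java
public class ToolSchemaSketch {

    // An assumed tools/list entry for get_passenger_context; the real
    // definition is in the MCP server code in the examples repository.
    public static String toolDefinition() {
        return "{"
             + "\"name\":\"get_passenger_context\","
             + "\"description\":\"Fetch real-time disruption context "
             + "for a passenger from DeltaStream\","
             + "\"inputSchema\":{"
             +   "\"type\":\"object\","
             +   "\"properties\":{\"passenger_id\":{\"type\":\"string\"}},"
             +   "\"required\":[\"passenger_id\"]"
             + "}}";
    }

    public static void main(String[] args) {
        System.out.println(toolDefinition());
    }
}
```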
Step 5: See It in Action!
With everything running, watch the magic happen:
The AirlineDataGenerator produces a CANCELED event for a flight carrying a "PLATINUM" passenger.
The event flows through Kafka to DeltaStream in milliseconds.
The disrupted_premium_passengers Materialized View is instantly updated with the passenger's details.
Trigger your agent (e.g., in the Agent Builder playground) with a simple prompt like "Handle disruption for PASS-1042".
The agent will call your MCP server, which queries DeltaStream and returns the fresh, actionable context.
The agent, now fully aware of the situation, can proceed to complete its mission.
Congratulations! You have successfully built a sophisticated, real-time AI agent that solves a critical business problem by leveraging the power of fresh, streaming data.