Building a Proactive GenAI Agent with Real-Time Context

In this tutorial, we will build a sophisticated, end-to-end solution featuring a Generative AI agent that can proactively manage airline flight disruptions for high-value customers. This agent's effectiveness hinges on its ability to access and act upon real-time context—a capability we will power using DeltaStream.

We will walk through every step, from generating simulated real-time data to building the data pipeline and exposing it to an agent through a Model Context Protocol (MCP) server.

The Use Case: An airline wants to create a "digital concierge" agent. When a flight is canceled, this agent should immediately identify affected "Platinum" or "Gold" tier passengers and rebook them on the next best flight before the customer is even aware of the disruption. Acting within the "golden seconds" after a cancellation is critical, as the best alternative seats disappear almost instantly.

Architecture Overview

Our system will have five main components:

  1. Data Generator: A Java application that simulates real-time flight events, bookings, and customer profile updates, publishing them to Kafka.

  2. Apache Kafka: The message bus that will transport our live event streams.

  3. DeltaStream: The real-time context engine. It will ingest the raw Kafka streams and use SQL to join and transform them into an actionable, materialized view of "at-risk" passengers.

  4. MCP Server: A lightweight Java server that exposes the materialized view from DeltaStream via a simple API, acting as the bridge to our GenAI agent.

  5. GenAI Agent (OpenAI): An agent configured in a platform such as OpenAI's Agent Builder; it will call our MCP server to get the real-time context needed to make intelligent decisions.

Prerequisites

Before you begin, ensure you have the following:

  • Java (JDK 17 or later) and Maven installed.

  • Access to an Apache Kafka cluster.

  • A DeltaStream account.

  • ngrok or a similar tool to expose your local server to the internet.

Step 1: Generate the Real-Time Data

First, we'll create a Java application to simulate our airline's operations. This generator will produce three streams of data into three distinct Kafka topics. The full data generator Java class is available in the DeltaStream examples repository.

Before running the code, create the three required topics in your Kafka cluster:

  • flight_events

  • booking_events

  • customer_profiles

Now, compile and run the AirlineDataGenerator. You should see log messages indicating that data is being produced to your Kafka topics.
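The generator itself lives in the repository, but it helps to see the shape of one event before wiring up the pipeline. As an illustrative sketch only (the field names mirror the flight_events schema we declare in Step 2; the class name and sample values are hypothetical), a single flight event could be serialized like this:

```java
import java.time.Instant;

// Illustrative only: mirrors the flight_events schema declared in DeltaStream
// in Step 2. The real generator (AirlineDataGenerator) lives in the examples repo.
public class FlightEventSketch {

    // Build the JSON payload for one flight event.
    static String flightEventJson(String flightId, String status,
                                  String origin, String destination,
                                  Instant scheduledDeparture, Instant eventTime) {
        return String.format(
            "{\"flight_id\":\"%s\",\"flight_status\":\"%s\",\"origin\":\"%s\","
            + "\"destination\":\"%s\",\"scheduled_departure\":\"%s\","
            + "\"event_timestamp\":\"%s\"}",
            flightId, status, origin, destination, scheduledDeparture, eventTime);
    }

    public static void main(String[] args) {
        System.out.println(flightEventJson("FL-0123", "CANCELED", "SFO", "JFK",
                Instant.parse("2024-01-01T09:00:00Z"), Instant.now()));
    }
}
```

A real producer would hand this string to a KafkaProducer<String, String>, keyed by flight_id so events for the same flight stay in order within a partition.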

Step 2: Build the Real-Time Pipeline in DeltaStream

Next, we will define the logic in DeltaStream to process these raw Kafka streams into a clean, actionable materialized view. Log into your DeltaStream environment and execute the following SQL statements.

-- Step 1: Create STREAMS to represent the raw Kafka topics.
-- DeltaStream will read from these topics continuously.

CREATE STREAM flight_events (
    flight_id            VARCHAR,
    flight_status        VARCHAR,
    origin               VARCHAR,
    destination          VARCHAR,
    scheduled_departure  TIMESTAMP_LTZ,
    event_timestamp      TIMESTAMP_LTZ
) WITH (
    'topic' = 'flight_events',
    'value.format' = 'json',
    'timestamp' = 'event_timestamp'
);


CREATE STREAM booking_events (
    booking_id           VARCHAR,
    flight_id            VARCHAR,
    passenger_id         VARCHAR,
    seat                 VARCHAR,
    booking_timestamp    TIMESTAMP_LTZ
) WITH (
    'topic' = 'booking_events',
    'value.format' = 'json',
    'timestamp' = 'booking_timestamp'
);

-- For customer profiles, which is dimensional data (slowly changing),
-- we create a CHANGELOG keyed by passenger_id, so each passenger
-- keeps only their latest profile.
CREATE CHANGELOG customer_profiles_changelog (
    passenger_id         VARCHAR,
    first_name           VARCHAR,
    last_name            VARCHAR,
    loyalty_tier         VARCHAR,
    preferences          STRUCT<seating VARCHAR>,
    update_timestamp     TIMESTAMP_LTZ,
    PRIMARY KEY(passenger_id)
) WITH (
    'topic' = 'customer_profiles',
    'value.format' = 'json',
    'timestamp' = 'update_timestamp'
);


-- Step 2: Enrich bookings with customer profiles, then create the final
-- Materialized View. This is the core of our real-time context generation:
-- it joins the streams and filters for high-value customers on disrupted flights.

CREATE STREAM enriched_booking_events AS
SELECT 
    booking_id,
    flight_id,
    be.passenger_id,
    seat,
    booking_timestamp,
    first_name,
    last_name,
    loyalty_tier,
    preferences
FROM booking_events be
LEFT JOIN customer_profiles_changelog cpc WITH ('source.idle.timeout.millis' = 1000)
ON be.passenger_id = cpc.passenger_id;


CREATE MATERIALIZED VIEW disrupted_premium_passengers AS
SELECT
        ebe.passenger_id,
        ebe.first_name || ' ' || ebe.last_name AS full_name,
        ebe.loyalty_tier,
        f.flight_id,
        f.flight_status AS flight_status,
        f.origin,
        f.destination,
        f.event_timestamp AS disruption_timestamp,
        ebe.preferences->seating AS seating_preference
    FROM flight_events AS f
    -- Join flights with their bookings.
    -- We use a temporal window to limit the state we need to keep.
    -- This assumes a booking happens reasonably close to a flight event.
    INNER JOIN enriched_booking_events AS ebe 
    WITHIN  1 DAY
    ON f.flight_id = ebe.flight_id
    WHERE
        -- We only care about disruptions
        f.flight_status IN ('CANCELED', 'DELAYED')
        AND
        -- We only care about our premium customers
        ebe.loyalty_tier IN ('PLATINUM', 'GOLD');

Once these statements are executed, DeltaStream is actively processing your Kafka data. As soon as a CANCELED or DELAYED event arrives for a flight carrying a "GOLD" or "PLATINUM" passenger, a row appears in the disrupted_premium_passengers Materialized View almost immediately.
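Each row of this view is exactly the context the agent will later receive. As a rough, hypothetical Java mirror of that row shape (the field names follow the view's SELECT list; the record itself is not part of the pipeline):

```java
// Hypothetical Java mirror of one row in the disrupted_premium_passengers view.
// Field names correspond to the columns selected in the SQL above.
public record DisruptedPassenger(
        String passengerId,
        String fullName,
        String loyaltyTier,
        String flightId,
        String flightStatus,
        String origin,
        String destination,
        String disruptionTimestamp,
        String seatingPreference) {

    // Sanity check a consumer could apply: the view only ever holds
    // PLATINUM or GOLD passengers, by construction of its WHERE clause.
    boolean isPremium() {
        return "PLATINUM".equals(loyaltyTier) || "GOLD".equals(loyaltyTier);
    }
}
```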

Step 3: Expose the Real-Time Context via MCP Server

Now we need a bridge between DeltaStream and our GenAI agent. The MCP server provides a standard interface that OpenAI's Agent Builder can understand.

3.1 MCP Server Code

Use the FlightConciergeMCPServer.java class from the examples repository. It creates a simple web server that listens for POST requests, queries DeltaStream's API, and returns the result.
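To make the moving parts concrete, here is a heavily simplified stand-in, not the repository code: a JDK-only HTTP server with the DeltaStream lookup stubbed to a canned row. The /mcp path matches what we later register with the agent; the canned response and class name are illustrative assumptions:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Simplified stand-in for FlightConciergeMCPServer: a JDK-only HTTP server
// answering POSTs on /mcp. The DeltaStream lookup is stubbed with a canned
// row; the real server queries the disrupted_premium_passengers view.
public class MiniContextServer {

    // Stub for the DeltaStream query; field names mirror the materialized view.
    static String contextFor(String passengerId) {
        return "{\"passenger_id\":\"" + passengerId + "\","
             + "\"loyalty_tier\":\"PLATINUM\",\"flight_status\":\"CANCELED\"}";
    }

    public static HttpServer start(int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/mcp", exchange -> {
            // Real code would parse the passenger id out of the request body.
            byte[] body = contextFor("PASS-1042").getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        return server;
    }
}
```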

3.2 Configure and Run the MCP Server

  1. Edit the Code: Open the FlightConciergeMCPServer.java file and replace the placeholder values for DS_STATEMENTS_URL and DS_AUTH_TOKEN with your own DeltaStream organization URL and a valid bearer token.

  2. Compile and Run: Build and run the Java application. It will start a server, typically on port 8080.
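The server's outbound call to DeltaStream is an authenticated HTTPS POST using the two values you just configured. A hedged sketch of building that request with java.net.http follows; the "statement" payload key and the example URL are assumptions, so check DeltaStream's API documentation for the exact request shape:

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch of the authenticated POST the MCP server sends to DeltaStream.
// The "statement" payload key and URL shape are assumptions; real code
// should also JSON-escape the SQL string before embedding it.
public class DsRequestBuilder {

    static HttpRequest build(String statementsUrl, String token, String sql) {
        return HttpRequest.newBuilder(URI.create(statementsUrl))
                .header("Authorization", "Bearer " + token)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"statement\":\"" + sql + "\"}"))
                .build();
    }
}
```

Sending it is then a one-liner with HttpClient.newHttpClient().send(request, BodyHandlers.ofString()).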

Step 4: Connect the Agent

The final step is to connect our real-time context to a GenAI agent.

  1. Expose Your Server: Your MCP server is running locally. To make it accessible to a cloud service like OpenAI, use ngrok.

    ngrok http 8080

    ngrok will give you a public HTTPS URL (e.g., https://<unique-id>.ngrok-free.app).

  2. Configure the Agent: In OpenAI's Agent Builder (or a similar platform), create a new agent.

    • From the Tools section add a new MCP Server and provide the ngrok URL, making sure to append the /mcp path (e.g., https://<unique-id>.ngrok-free.app/mcp). The builder will automatically discover the get_passenger_context tool defined in our MCP server.

    • Give the agent its instructions. This is the crucial prompt that defines its personality and goal.

    Example Agent Instructions:

    You are an elite airline concierge agent. Your primary goal is to manage flight disruptions for Platinum and Gold tier passengers with extreme urgency. When alerted, your first and only action should be to use the get_passenger_context tool to fetch real-time data for the affected passenger. Based on the context, your task is to find a new flight and communicate the solution to the customer clearly and calmly. Do not ask the user for information; use your tools.
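When the agent decides to use the tool, the platform sends a JSON-RPC "tools/call" request to your /mcp endpoint, as defined by the Model Context Protocol. A sketch of that payload built as a Java string (the passenger_id argument name is an assumption about how the tool's input schema is defined in the server):

```java
// Build a JSON-RPC 2.0 "tools/call" request per the Model Context Protocol.
// The argument name passenger_id is an assumption about the tool's schema.
public class ToolCallSketch {

    static String toolCall(int id, String passengerId) {
        return String.format(
            "{\"jsonrpc\":\"2.0\",\"id\":%d,\"method\":\"tools/call\","
            + "\"params\":{\"name\":\"get_passenger_context\","
            + "\"arguments\":{\"passenger_id\":\"%s\"}}}",
            id, passengerId);
    }
}
```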

Step 5: See It in Action!

With everything running, watch the magic happen:

  1. The AirlineDataGenerator produces a CANCELED event for a flight carrying a "PLATINUM" passenger.

  2. The event flows through Kafka to DeltaStream in milliseconds.

  3. The disrupted_premium_passengers Materialized View is instantly updated with the passenger's details.

  4. Trigger your agent (e.g., in the Agent Builder playground) with a simple prompt like "Handle disruption for PASS-1042".

  5. The agent will call your MCP server, which queries DeltaStream and returns the fresh, actionable context.

  6. The agent, now fully aware of the situation, can proceed to complete its mission.

Congratulations! You have successfully built a sophisticated, real-time AI agent that solves a critical business problem by leveraging the power of fresh, streaming data.
