Real-time OpenAI Response Streaming

Estimated read time: 12 min

The Realtime API enables you to build low-latency, multi-modal conversational experiences. It currently supports text and audio as both input and output, as well as function calling.

Quickstart

The Realtime API is a WebSocket interface that is designed to run on the server. To help you get started quickly, we’ve created a console demo application that shows some of the features of the API.

Overview

The Realtime API is a stateful, event-based API that communicates over a WebSocket. The WebSocket connection requires the following parameters:

URL: wss://api.openai.com/v1/realtime
Query Parameters: ?model=gpt-4o-realtime-preview-2024-10-01
Headers:
Authorization: Bearer YOUR_API_KEY
OpenAI-Beta: realtime=v1

Below is a simple example using Python and FastAPI:

📦openAI-realtime
 ┣ 📂app
 ┃ ┣ 📂routes
 ┃ ┃ ┣ 📜health_check.py
 ┃ ┃ ┗ 📜realtime.py
 ┃ ┣ 📜config.py
 ┃ ┗ 📜main.py
 ┗ 📂client
   ┗ 📜client.html

Create a .env file in the root directory with the following content. Use the appropriate URL depending on whether you are using OpenAI or Azure OpenAI:

OPENAI_API_KEY="your-openai-api-key"
OPENAI_REALTIME_URL="wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
USE_AZURE_OPENAI=False
 

For Azure OpenAI, replace the URL as follows and set USE_AZURE_OPENAI to True:

OPENAI_API_KEY="your-azure-openai-api-key"
OPENAI_REALTIME_URL="wss://<sub-domain>.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=gpt-4o-realtime-preview"
USE_AZURE_OPENAI=True
 
 

app/routes/realtime.py

 

 

 

import asyncio
import json
import logging
import websockets
import traceback

from fastapi import WebSocket, WebSocketDisconnect, APIRouter
from app.config import VENDOR_WS_URL, API_KEY, USE_AZURE_OPENAI

realtime_router = APIRouter()

# Set up logging
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Determine appropriate headers.
# OpenAI's Realtime endpoint authenticates with a bearer token plus the
# "OpenAI-Beta: realtime=v1" opt-in header, whereas Azure OpenAI authenticates
# with an "api-key" header. The two branches were previously swapped.
# NOTE: this relies on USE_AZURE_OPENAI being a genuine boolean; a value
# produced by calling bool() on a raw environment string is True even for
# the text "False".
extra_headers = (
    {"api-key": API_KEY}
    if USE_AZURE_OPENAI
    else {
        "Authorization": f"Bearer {API_KEY}",
        "openai-beta": "realtime=v1",
    }
)

async def relay_messages(client_ws: WebSocket, vendor_ws):
    """Relay messages between client and vendor WebSockets.

    Two pumps run concurrently, one per direction. As soon as either pump
    finishes (disconnect or error) the other is cancelled and this coroutine
    returns.

    Args:
        client_ws: The accepted FastAPI WebSocket for the client side.
        vendor_ws: The open ``websockets`` connection to the vendor endpoint.
    """

    async def client_to_vendor():
        """Forward JSON payloads from the client to the vendor."""
        try:
            while True:
                # Read raw text and parse it here: receive_json() raises on
                # malformed JSON, which would terminate the relay instead of
                # reaching the warning branch below.
                raw = await client_ws.receive_text()
                try:
                    data = json.loads(raw)
                except json.JSONDecodeError:
                    data = None
                if data and json_validator(data):
                    await vendor_ws.send(json.dumps(data))
                else:
                    warning_msg = "Invalid data: payload should be JSON."
                    logging.warning(warning_msg)
                    await send_text_safe(client_ws, warning_msg)
        except WebSocketDisconnect:
            logging.info("Client WebSocket disconnected.")
        except Exception as e:
            # logging.exception records the traceback alongside the message.
            logging.exception(f"Error in client_to_vendor: {e}")

    async def vendor_to_client():
        """Forward every vendor frame to the client unchanged."""
        try:
            while True:
                data = await vendor_ws.recv()
                # recv() yields bytes for binary frames; send_text() only
                # accepts str, so route binary frames through send_bytes().
                if isinstance(data, bytes):
                    await client_ws.send_bytes(data)
                else:
                    await client_ws.send_text(data)
        except websockets.exceptions.ConnectionClosed as e:
            logging.info(f"Vendor WebSocket disconnected: {e}")
        except Exception as e:
            logging.exception(f"Error in vendor_to_client: {e}")

    tasks = [
        asyncio.create_task(client_to_vendor()),
        asyncio.create_task(vendor_to_client()),
    ]
    try:
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Runs on normal completion and when this coroutine itself is
        # cancelled, so neither pump is left running in the background.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

@realtime_router.websocket("/realtime")
async def websocket_endpoint(websocket: WebSocket):
    """Handle WebSocket connections from clients.

    Accepts the client connection, opens a connection to the vendor endpoint
    using the module-level ``extra_headers``, and relays traffic between the
    two until either side closes. Failures are logged and, where possible,
    reported to the client as a text message.

    Args:
        websocket: The incoming client WebSocket.
    """
    # `client` may be absent depending on the ASGI server, so guard the lookup.
    client_ip = websocket.client.host if websocket.client else "unknown"
    logging.info(f"Client connected: {client_ip}")
    await websocket.accept()

    try:
        # NOTE(review): `extra_headers` is the keyword used by the websockets
        # release pinned for this project; confirm before upgrading.
        async with websockets.connect(
            VENDOR_WS_URL, extra_headers=extra_headers
        ) as vendor_ws:
            logging.info("Connected to vendor WebSocket.")
            await relay_messages(websocket, vendor_ws)
    except websockets.exceptions.InvalidHandshake as e:
        error_msg = f"Vendor WebSocket handshake failed: {e}"
        logging.error(error_msg)
        await send_text_safe(websocket, error_msg)
    except WebSocketDisconnect:
        logging.info(f"Client disconnected: {client_ip}")
    except Exception as e:
        logging.exception(f"Unexpected error: {e}")
        await send_text_safe(websocket, f"Unexpected error: {e}")

async def send_text_safe(ws: WebSocket, message: str):
    """Safely send messages to the client WebSocket.

    Best-effort delivery: any failure while sending is logged and swallowed
    so that error-reporting paths never raise a second exception.

    Args:
        ws: The client WebSocket to write to.
        message: The text frame to send.
    """
    try:
        await ws.send_text(message)
    except Exception as e:
        logging.error(f"Error sending message to client: {e}")

def json_validator(data) -> bool:
    """Validate if the input data is JSON.

    Args:
        data: A ``dict`` (already-parsed JSON object) or a string/bytes value
            holding serialized JSON.

    Returns:
        True for any ``dict`` and for text that ``json.loads`` can parse;
        False for blank strings, unparseable text, undecodable bytes, and
        values of any other type.
    """
    # Debug-level logging instead of print() keeps payloads out of stdout.
    logging.debug("data: %s", data)

    # A dict is already a parsed JSON object.
    if isinstance(data, dict):
        return True

    # Blank strings are rejected up front.
    if isinstance(data, str) and data.strip() == "":
        return False

    try:
        json.loads(data)
    except (ValueError, TypeError):
        # ValueError covers JSONDecodeError and the UnicodeDecodeError raised
        # for undecodable bytes; TypeError covers non-text inputs.
        return False
    return True

 

 

 

app/routes/health_check.py

 

 

 

from fastapi import APIRouter
from fastapi.responses import JSONResponse

health_check_router = APIRouter()

@health_check_router.get("/")
async def health_check():
    """Liveness probe: respond with HTTP 200 and a static status payload."""
    return JSONResponse(content={"status": "Iam Alive!"}, status_code=200)

 

 

 

app/config.py

 

 

 

import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Configuration variables
VENDOR_WS_URL = os.getenv("OPENAI_REALTIME_URL")
API_KEY = os.getenv("OPENAI_API_KEY")
# Environment values are strings, and bool() of any non-empty string
# (including "False") is True, so the flag is parsed explicitly instead.
USE_AZURE_OPENAI = os.getenv("USE_AZURE_OPENAI", "").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}

 

 

 

app/main.py

 

 

 

from fastapi import FastAPI
from app.routes.health_check import health_check_router
from app.routes.realtime import realtime_router

# ASGI application object served by Uvicorn.
app = FastAPI()

# Attach each route group to the application.
app.include_router(router=health_check_router)
app.include_router(router=realtime_router)

 

 

 

client/client.html

 

 

 

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket Client</title>
</head>
<body>
    <h1>WebSocket Client</h1>
    <button id="connectButton">Connect to WebSocket</button>
    <br><br>
    <textarea id="messageInput" placeholder="Enter your message here…" rows="4" cols="50"></textarea>
    <br>
    <button id="sendButton" disabled>Send Message</button>
    <h2>Messages:</h2>
    <pre id="messages"></pre>

    <script>
        // Minimal test client for the /realtime relay endpoint.
        let websocket;
        const connectButton = document.getElementById('connectButton');
        const sendButton = document.getElementById('sendButton');
        const messageInput = document.getElementById('messageInput');
        const messagesDisplay = document.getElementById('messages');

        connectButton.addEventListener('click', () => {
            websocket = new WebSocket('ws://localhost:8000/realtime');

            websocket.onopen = () => {
                messagesDisplay.textContent += 'Connected to WebSocket server.\n';
                sendButton.disabled = false;
            };

            websocket.onmessage = (event) => {
                messagesDisplay.textContent += 'Received: ' + event.data + '\n';
            };

            websocket.onclose = () => {
                messagesDisplay.textContent += 'WebSocket connection closed.\n';
                sendButton.disabled = true;
            };

            websocket.onerror = (error) => {
                // WebSocket error events carry no `message` property, so log
                // the event object and show a generic notice instead.
                console.error(error);
                messagesDisplay.textContent += 'WebSocket error (see console for details).\n';
            };
        });

        sendButton.addEventListener('click', () => {
            const message = messageInput.value.trim();
            if (message !== '') {
                websocket.send(message);
                messagesDisplay.textContent += 'Sent: ' + message + '\n';
                messageInput.value = '';
            } else {
                alert('Please enter a message to send.');
            }
        });
    </script>
</body>
</html>

 

 

 

requirements.txt

fastapi==0.115.0
uvicorn==0.31.0
websockets==13.1
python-dotenv==1.0.1

Install dependencies:

 

 

 

pip install -r requirements.txt

Usage (https://github.com/Geo-Joy/openai-realtime-fastapi/blob/main/README.md#usage)

To start the application, use Uvicorn to run the FastAPI server from the project root:

uvicorn app.main:app --reload

 

 

 

 

The application should now be running on http://localhost:8000.

 

Reference Links

Realtime API Documentation
Azure Realtime Audio SDK Samples

 

The Realtime API enables you to build low-latency, multi-modal conversational experiences. It currently supports text and audio as both input and output, as well as function calling.

Quickstart

The Realtime API is a WebSocket interface that is designed to run on the server. To help you get started quickly, we’ve created a console demo application that shows some of the features of the API.

Overview

The Realtime API is a stateful, event-based API that communicates over a WebSocket. The WebSocket connection requires the following parameters:

URL: wss://api.openai.com/v1/realtime
Query Parameters: ?model=gpt-4o-realtime-preview-2024-10-01
Headers:
Authorization: Bearer YOUR_API_KEY
OpenAI-Beta: realtime=v1

Below is a simple example using Python and FastAPI:

📦openAI-realtime
 ┣ 📂app
 ┃ ┣ 📂routes
 ┃ ┃ ┣ 📜health_check.py
 ┃ ┃ ┗ 📜realtime.py
 ┃ ┣ 📜config.py
 ┃ ┗ 📜main.py
 ┗ 📂client
   ┗ 📜client.html

Create a .env file in the root directory with the following content. Use the appropriate URL depending on whether you are using OpenAI or Azure OpenAI:

OPENAI_API_KEY="your-openai-api-key"
OPENAI_REALTIME_URL="wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
USE_AZURE_OPENAI=False

For Azure OpenAI, replace the URL as follows and set USE_AZURE_OPENAI to True:

OPENAI_API_KEY="your-azure-openai-api-key"
OPENAI_REALTIME_URL="wss://<sub-domain>.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=gpt-4o-realtime-preview"
USE_AZURE_OPENAI=True

app/routes/realtime.py

import asyncio
import json
import logging
import websockets
import traceback

from fastapi import WebSocket, WebSocketDisconnect, APIRouter
from app.config import VENDOR_WS_URL, API_KEY, USE_AZURE_OPENAI

realtime_router = APIRouter()

# Set up logging
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Determine appropriate headers.
# OpenAI's Realtime endpoint authenticates with a bearer token plus the
# "OpenAI-Beta: realtime=v1" opt-in header, whereas Azure OpenAI authenticates
# with an "api-key" header. The two branches were previously swapped.
# NOTE: this relies on USE_AZURE_OPENAI being a genuine boolean; a value
# produced by calling bool() on a raw environment string is True even for
# the text "False".
extra_headers = (
    {"api-key": API_KEY}
    if USE_AZURE_OPENAI
    else {
        "Authorization": f"Bearer {API_KEY}",
        "openai-beta": "realtime=v1",
    }
)

async def relay_messages(client_ws: WebSocket, vendor_ws):
    """Relay messages between client and vendor WebSockets.

    Two pumps run concurrently, one per direction. As soon as either pump
    finishes (disconnect or error) the other is cancelled and this coroutine
    returns.

    Args:
        client_ws: The accepted FastAPI WebSocket for the client side.
        vendor_ws: The open ``websockets`` connection to the vendor endpoint.
    """

    async def client_to_vendor():
        """Forward JSON payloads from the client to the vendor."""
        try:
            while True:
                # Read raw text and parse it here: receive_json() raises on
                # malformed JSON, which would terminate the relay instead of
                # reaching the warning branch below.
                raw = await client_ws.receive_text()
                try:
                    data = json.loads(raw)
                except json.JSONDecodeError:
                    data = None
                if data and json_validator(data):
                    await vendor_ws.send(json.dumps(data))
                else:
                    warning_msg = "Invalid data: payload should be JSON."
                    logging.warning(warning_msg)
                    await send_text_safe(client_ws, warning_msg)
        except WebSocketDisconnect:
            logging.info("Client WebSocket disconnected.")
        except Exception as e:
            # logging.exception records the traceback alongside the message.
            logging.exception(f"Error in client_to_vendor: {e}")

    async def vendor_to_client():
        """Forward every vendor frame to the client unchanged."""
        try:
            while True:
                data = await vendor_ws.recv()
                # recv() yields bytes for binary frames; send_text() only
                # accepts str, so route binary frames through send_bytes().
                if isinstance(data, bytes):
                    await client_ws.send_bytes(data)
                else:
                    await client_ws.send_text(data)
        except websockets.exceptions.ConnectionClosed as e:
            logging.info(f"Vendor WebSocket disconnected: {e}")
        except Exception as e:
            logging.exception(f"Error in vendor_to_client: {e}")

    tasks = [
        asyncio.create_task(client_to_vendor()),
        asyncio.create_task(vendor_to_client()),
    ]
    try:
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Runs on normal completion and when this coroutine itself is
        # cancelled, so neither pump is left running in the background.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

@realtime_router.websocket("/realtime")
async def websocket_endpoint(websocket: WebSocket):
    """Handle WebSocket connections from clients.

    Accepts the client connection, opens a connection to the vendor endpoint
    using the module-level ``extra_headers``, and relays traffic between the
    two until either side closes. Failures are logged and, where possible,
    reported to the client as a text message.

    Args:
        websocket: The incoming client WebSocket.
    """
    # `client` may be absent depending on the ASGI server, so guard the lookup.
    client_ip = websocket.client.host if websocket.client else "unknown"
    logging.info(f"Client connected: {client_ip}")
    await websocket.accept()

    try:
        # NOTE(review): `extra_headers` is the keyword used by the websockets
        # release pinned for this project; confirm before upgrading.
        async with websockets.connect(
            VENDOR_WS_URL, extra_headers=extra_headers
        ) as vendor_ws:
            logging.info("Connected to vendor WebSocket.")
            await relay_messages(websocket, vendor_ws)
    except websockets.exceptions.InvalidHandshake as e:
        error_msg = f"Vendor WebSocket handshake failed: {e}"
        logging.error(error_msg)
        await send_text_safe(websocket, error_msg)
    except WebSocketDisconnect:
        logging.info(f"Client disconnected: {client_ip}")
    except Exception as e:
        logging.exception(f"Unexpected error: {e}")
        await send_text_safe(websocket, f"Unexpected error: {e}")

async def send_text_safe(ws: WebSocket, message: str):
    """Safely send messages to the client WebSocket.

    Best-effort delivery: any failure while sending is logged and swallowed
    so that error-reporting paths never raise a second exception.

    Args:
        ws: The client WebSocket to write to.
        message: The text frame to send.
    """
    try:
        await ws.send_text(message)
    except Exception as e:
        logging.error(f"Error sending message to client: {e}")

def json_validator(data) -> bool:
    """Validate if the input data is JSON.

    Args:
        data: A ``dict`` (already-parsed JSON object) or a string/bytes value
            holding serialized JSON.

    Returns:
        True for any ``dict`` and for text that ``json.loads`` can parse;
        False for blank strings, unparseable text, undecodable bytes, and
        values of any other type.
    """
    # Debug-level logging instead of print() keeps payloads out of stdout.
    logging.debug("data: %s", data)

    # A dict is already a parsed JSON object.
    if isinstance(data, dict):
        return True

    # Blank strings are rejected up front.
    if isinstance(data, str) and data.strip() == "":
        return False

    try:
        json.loads(data)
    except (ValueError, TypeError):
        # ValueError covers JSONDecodeError and the UnicodeDecodeError raised
        # for undecodable bytes; TypeError covers non-text inputs.
        return False
    return True

app/routes/health_check.py

from fastapi import APIRouter
from fastapi.responses import JSONResponse

health_check_router = APIRouter()

@health_check_router.get("/")
async def health_check():
    """Liveness probe: respond with HTTP 200 and a static status payload."""
    return JSONResponse(content={"status": "Iam Alive!"}, status_code=200)

app/config.py

import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Configuration variables
VENDOR_WS_URL = os.getenv("OPENAI_REALTIME_URL")
API_KEY = os.getenv("OPENAI_API_KEY")
# Environment values are strings, and bool() of any non-empty string
# (including "False") is True, so the flag is parsed explicitly instead.
USE_AZURE_OPENAI = os.getenv("USE_AZURE_OPENAI", "").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}

app/main.py

from fastapi import FastAPI
from app.routes.health_check import health_check_router
from app.routes.realtime import realtime_router

app = FastAPI()

# Registering routers
app.include_router(health_check_router)
app.include_router(realtime_router)   client/client.html   <!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket Client</title>
</head>
<body>
    <h1>WebSocket Client</h1>
    <button id="connectButton">Connect to WebSocket</button>
    <br><br>
    <textarea id="messageInput" placeholder="Enter your message here…" rows="4" cols="50"></textarea>
    <br>
    <button id="sendButton" disabled>Send Message</button>
    <h2>Messages:</h2>
    <pre id="messages"></pre>

    <script>
        // Minimal test client for the /realtime relay endpoint.
        let websocket;
        const connectButton = document.getElementById('connectButton');
        const sendButton = document.getElementById('sendButton');
        const messageInput = document.getElementById('messageInput');
        const messagesDisplay = document.getElementById('messages');

        connectButton.addEventListener('click', () => {
            websocket = new WebSocket('ws://localhost:8000/realtime');

            websocket.onopen = () => {
                messagesDisplay.textContent += 'Connected to WebSocket server.\n';
                sendButton.disabled = false;
            };

            websocket.onmessage = (event) => {
                messagesDisplay.textContent += 'Received: ' + event.data + '\n';
            };

            websocket.onclose = () => {
                messagesDisplay.textContent += 'WebSocket connection closed.\n';
                sendButton.disabled = true;
            };

            websocket.onerror = (error) => {
                // WebSocket error events carry no `message` property, so log
                // the event object and show a generic notice instead.
                console.error(error);
                messagesDisplay.textContent += 'WebSocket error (see console for details).\n';
            };
        });

        sendButton.addEventListener('click', () => {
            const message = messageInput.value.trim();
            if (message !== '') {
                websocket.send(message);
                messagesDisplay.textContent += 'Sent: ' + message + '\n';
                messageInput.value = '';
            } else {
                alert('Please enter a message to send.');
            }
        });
    </script>
</body>
</html>

requirements.txt

fastapi==0.115.0
uvicorn==0.31.0
websockets==13.1
python-dotenv==1.0.1

Install dependencies:

pip install -r requirements.txt

Usage (https://github.com/Geo-Joy/openai-realtime-fastapi/blob/main/README.md#usage)

To start the application, use Uvicorn to run the FastAPI server from the project root:

uvicorn app.main:app --reload

The application should now be running on http://localhost:8000.

Reference Links

Realtime API Documentation
Azure Realtime Audio SDK Samples

Read More Technology Blogs by SAP articles

#SAP

#SAPTechnologyblog

You May Also Like

More From Author