Last updated:
0 purchases
httpxsse 0.4.0
httpx-sse
Consume Server-Sent Event (SSE) messages with HTTPX.
Table of contents
Installation
Quickstart
How-To
API Reference
Installation
NOTE: This is beta software. Please be sure to pin your dependencies.
pip install httpx-sse=="0.4.*"
Quickstart
httpx-sse provides the connect_sse and aconnect_sse helpers for connecting to an SSE endpoint. The resulting EventSource object exposes the .iter_sse() and .aiter_sse() methods to iterate over the server-sent events.
Example usage:
import httpx
from httpx_sse import connect_sse
with httpx.Client() as client:
with connect_sse(client, "GET", "http://localhost:8000/sse") as event_source:
for sse in event_source.iter_sse():
print(sse.event, sse.data, sse.id, sse.retry)
You can try this against this example Starlette server (credit):
# Requirements: pip install uvicorn starlette sse-starlette
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse
async def numbers(minimum, maximum):
for i in range(minimum, maximum + 1):
await asyncio.sleep(0.9)
yield {"data": i}
async def sse(request):
generator = numbers(1, 5)
return EventSourceResponse(generator)
routes = [
Route("/sse", endpoint=sse)
]
app = Starlette(routes=routes)
if __name__ == "__main__":
uvicorn.run(app)
How-To
Calling into Python web apps
You can call into Python web apps with HTTPX and httpx-sse to test SSE endpoints directly.
Here's an example of calling into a Starlette ASGI app...
import asyncio
import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
async def auth_events(request):
async def events():
yield {
"event": "login",
"data": '{"user_id": "4135"}',
}
return EventSourceResponse(events())
app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])
async def main():
async with httpx.AsyncClient(app=app) as client:
async with aconnect_sse(
client, "GET", "http://localhost:8000/sse/auth/"
) as event_source:
events = [sse async for sse in event_source.aiter_sse()]
(sse,) = events
assert sse.event == "login"
assert sse.json() == {"user_id": "4135"}
asyncio.run(main())
Handling reconnections
(Advanced)
SSETransport and AsyncSSETransport don't have reconnection built-in. This is because how to perform retries is generally dependent on your use case. As a result, if the connection breaks while attempting to read from the server, you will get an httpx.ReadError from iter_sse() (or aiter_sse()).
However, httpx-sse does allow implementing reconnection by using the Last-Event-ID and reconnection time (in milliseconds), exposed as sse.id and sse.retry respectively.
Here's how you might achieve this using stamina...
import time
from typing import Iterator
import httpx
from httpx_sse import connect_sse, ServerSentEvent
from stamina import retry
def iter_sse_retrying(client, method, url):
last_event_id = ""
reconnection_delay = 0.0
# `stamina` will apply jitter and exponential backoff on top of
# the `retry` reconnection delay sent by the server.
@retry(on=httpx.ReadError)
def _iter_sse():
nonlocal last_event_id, reconnection_delay
time.sleep(reconnection_delay)
headers = {"Accept": "text/event-stream"}
if last_event_id:
headers["Last-Event-ID"] = last_event_id
with connect_sse(client, method, url, headers=headers) as event_source:
for sse in event_source.iter_sse():
last_event_id = sse.id
if sse.retry is not None:
reconnection_delay = sse.retry / 1000
yield sse
return _iter_sse()
Usage:
with httpx.Client() as client:
for sse in iter_sse_retrying(client, "GET", "http://localhost:8000/sse"):
print(sse.event, sse.data)
API Reference
connect_sse
def connect_sse(
client: httpx.Client,
method: str,
url: Union[str, httpx.URL],
**kwargs,
) -> ContextManager[EventSource]
Connect to an SSE endpoint and return an EventSource context manager.
This sets Cache-Control: no-store on the request, as per the SSE spec, as well as Accept: text/event-stream.
If the response Content-Type is not text/event-stream, this will raise an SSEError.
aconnect_sse
async def aconnect_sse(
client: httpx.AsyncClient,
method: str,
url: Union[str, httpx.URL],
**kwargs,
) -> AsyncContextManager[EventSource]
An async equivalent to connect_sse.
EventSource
def __init__(response: httpx.Response)
Helper for working with an SSE response.
response
The underlying httpx.Response.
iter_sse
def iter_sse() -> Iterator[ServerSentEvent]
Decode the response content and yield corresponding ServerSentEvent.
Example usage:
for sse in event_source.iter_sse():
...
aiter_sse
async def iter_sse() -> AsyncIterator[ServerSentEvent]
An async equivalent to iter_sse.
ServerSentEvent
Represents a server-sent event.
event: str - Defaults to "message".
data: str - Defaults to "".
id: str - Defaults to "".
retry: str | None - Defaults to None.
Methods:
json() -> Any - Returns sse.data decoded as JSON.
SSEError
An error that occurred while making a request to an SSE endpoint.
Parents:
httpx.TransportError
License
MIT
Changelog
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog.
0.4.0 - 2023-12-22
Removed
Dropped Python 3.7 support, as it has reached EOL. (Pull #21)
Added
Add official support for Python 3.12. (Pull #21)
Fixed
Allow Content-Type that contain but are not strictly text/event-stream. (Pull #22 by @dbuades)
Improve error message when Content-Type is missing. (Pull #20 by @jamesbraza)
0.3.1 - 2023-06-01
Added
Add __repr__() for ServerSentEvent model, which may help with debugging and other tasks. (Pull #16)
0.3.0 - 2023-04-27
Changed
Raising an SSEError if the response content type is not text/event-stream is now performed as part of iter_sse() / aiter_sse(), instead of connect_sse() / aconnect_sse(). This allows inspecting the response before iterating on server-sent events, such as checking for error responses. (Pull #12)
0.2.0 - 2023-03-27
Changed
connect_sse() and aconnect_sse() now require a method argument: connect_sse(client, "GET", "https://example.org"). This provides support for SSE requests with HTTP verbs other than GET. (Pull #7)
0.1.0 - 2023-02-05
Initial release
Added
Add connect_sse, aconnect_sse(), ServerSentEvent and SSEError.
For personal and professional use. You cannot resell or redistribute these repositories in their original state.
There are no reviews.