Event Socket Streaming Endpoint (SSE) - Webhooks Drop-In
This feature is EXPERIMENTAL. Learn what this means...
Many of Unique's FSI clients do not allow their developers or staff to install tunneling tools like ngrok™, for a variety of reasons. Deploying for each development iteration ruins the developer experience and, ultimately, adoption.
The Event Socket acts as a drop-in tool that streams recent events from Unique FinanceGPT and emits them so that, to developers, it feels like being called by a webhook.
- 1 Rationale
- 1.1 Scope
- 2 Architecture
- 2.1 Event Socket
- 3 Limitations
- 4 Security
- 4.1 Accessibility
- 5 Python Example
Rationale
Unique's ICPs are FSI clients. In this case specifically, their developers (FSI Developers, hereafter Developers) most often work on heavily governed computers, for good reasons. This interferes with certain software architecture patterns that Unique leverages, especially when developing locally with or against them.
Unique FinanceGPT chat empowers extensibility via its App Repository and the respective Webhook Workers and Schedulers. That means the Unique platform (in all Deployment models) actively pushes content securely to a remote machine that is naturally exposed to the same network.
This push principle is not achievable for some Developers. While there are some workarounds that these Developers could try, most would realistically violate a company policy…
Unique does not encourage that, so it introduced the Event Socket, which allows developers to open an event stream from their own machines without the need to open up a public endpoint to the internet.
Scope
The Event Socket service allows consumers to subscribe to events and open a long-lived HTTP EventSource connection. Events from the Unique FinanceGPT platform are sent via the connection.
The connection does not provide any delivery guarantee and automatically closes (without warning) after 10 minutes (also see Limitations below).
For these reasons, Unique discourages using the Event Socket for any non-development use case.
Architecture
The Event Socket is subscribed to the same Event Bus as the Webhook Services themselves. While the Webhooks send HTTP requests to registered endpoints with retry mechanisms, the Event Socket will publish events to subscribed consumers via server-sent events.
The Event Socket is a dedicated service that can be scaled independently from the rest of Unique FinanceGPT to accommodate the client's needs and scale.
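To illustrate what "via server-sent events" means on the wire, here is a minimal sketch of parsing a single SSE block by hand, following the standard text/event-stream framing. The function name parse_sse_block and the sample frame are hypothetical; the JSON field names (event, payload, chatId) are taken from the Python Example on this page, and the actual frames sent by the Event Socket may carry additional fields.

```python
import json


def parse_sse_block(block: str) -> dict:
    """Parse one server-sent event block (the text up to a blank line)."""
    fields = {}
    data_lines = []
    for line in block.splitlines():
        if not line or line.startswith(":"):
            continue  # blank lines and comment lines carry no field
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # the format allows one optional leading space
        if name == "data":
            data_lines.append(value)  # multiple data lines are joined below
        else:
            fields[name] = value
    fields["data"] = "\n".join(data_lines)
    return fields


# Hypothetical frame for illustration only; "chat_123" is a made-up ID.
raw = (
    'data: {"event": "unique.chat.user-message.created", '
    '"payload": {"chatId": "chat_123"}}\n'
)
frame = parse_sse_block(raw)
event = json.loads(frame["data"])
print(event["event"])
```

In practice a library such as sseclient handles this framing, so hand parsing is only useful for debugging a raw stream.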
Event Socket
In order to receive events from the Event Socket service, consumers must open an EventSource stream to the Event Socket service:
export BASE_URL=gateway.<tenant>.unique.app
export SUBSCRIPTIONS=unique.chat.user-message.created
curl --location "https://$BASE_URL/public/event-socket/events/stream?subscriptions=$SUBSCRIPTIONS" \
--header 'x-app-id: <UNIQUE-APP-ID>' \
--header 'x-company-id: <UNIQUE-COMPANY-ID>' \
--header 'Authorization: ••••••'
The endpoint is secured like all other Public API endpoints and requires an x-app-id and an API key sent via the Authorization header as a Bearer token. The current x-company-id must also be provided.
By using the mandatory subscriptions query argument, consumers define which events they want to receive via the subscription. Multiple events can be separated by a comma.
In Python, the sseclient library supports server-sent events. See the Python Example section below.
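The request above can also be assembled in Python. The following is a minimal sketch, assuming the same path, headers, and comma-separated subscriptions argument as the curl call; the helper name build_stream_request and all values passed to it are hypothetical placeholders, and "example.second.event" is not a real event name.

```python
from urllib.parse import urlencode


def build_stream_request(base_url, app_id, company_id, api_key, subscriptions):
    """Return (url, headers) for opening the event stream."""
    # Join event names with commas and keep the commas unescaped in the query.
    query = urlencode({"subscriptions": ",".join(subscriptions)}, safe=",")
    url = f"https://{base_url}/public/event-socket/events/stream?{query}"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "x-app-id": app_id,
        "x-company-id": company_id,
    }
    return url, headers


# Placeholder values; replace them with your tenant, IDs, and API key.
url, headers = build_stream_request(
    base_url="gateway.example.unique.app",
    app_id="app_1",
    company_id="company_1",
    api_key="key_1",
    subscriptions=["unique.chat.user-message.created", "example.second.event"],
)
print(url)
```

The returned pair can be passed to an SSE client, for example as SSEClient(url=url, headers=headers).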
Limitations
Timeout
The connection will time out after 10 minutes, no matter how many events are sent over the line. The consumer must implement proper timeout detection and renew the subscription to keep receiving events.
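One way to renew the subscription is to wrap the stream in a reconnect loop. The sketch below is an assumption about how a consumer might do this, not a prescribed pattern: consume_with_reconnect, open_stream, handle, max_reconnects, and backoff_seconds are hypothetical names, and the demo uses simulated streams instead of a real connection. Events emitted while the stream is closed are still lost.

```python
import time
from typing import Callable, Iterable


def consume_with_reconnect(
    open_stream: Callable[[], Iterable[str]],
    handle: Callable[[str], None],
    max_reconnects: int = 3,
    backoff_seconds: float = 1.0,
) -> int:
    """Consume events and reopen the stream whenever it ends or fails.

    Returns how many times the stream was opened.
    """
    opened = 0
    while True:
        opened += 1
        try:
            for data in open_stream():
                handle(data)
        except OSError as exc:  # includes ConnectionError and socket failures
            print(f"Stream failed: {exc}")
        # Getting here means the stream ended, e.g. after the 10-minute timeout.
        if opened > max_reconnects:
            return opened
        time.sleep(backoff_seconds)


# Demo with simulated streams: each call to fake_open yields one batch, then ends.
demo_batches = [["first", "second"], ["third"]]
demo_seen = []


def fake_open():
    return demo_batches.pop(0) if demo_batches else []


demo_opened = consume_with_reconnect(
    fake_open, demo_seen.append, max_reconnects=2, backoff_seconds=0
)
print(demo_opened, demo_seen)
```

With the script from the Python Example section, open_stream could be a function that yields event.data for each event returned by get_sse_client(), so that every reconnect opens a fresh subscription.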
No Retry
The Event Socket service has no knowledge of which consumers are supposed to receive events. It only sends events to active subscriptions. If the connection is not up and running, the consumer application will not receive any events, and there is no replay or retry mechanism.
For production purposes, Webhooks must be used.
Security
Accessibility
These mechanisms allow developers to de facto attach their machines to a test or even production system, a scenario most clients actively want to avoid (and a main reason why developers are not allowed to expose their machines in the first place).
Unique is fully aware of this risk and mitigates it in the following ways:
Developers leveraging this mechanism have access to the same data already, as the stream is On Behalf Of (OBO).
Developers and clients who want to leverage these services have actively acknowledged the identified risks in written form and accepted the Terms of Use.
Python Example
Below is a Python script that demonstrates how to consume the unique.chat.user-message.created subscription and respond with a dummy assistant message in the corresponding chat. To use this example, make sure to update API_ID, API_KEY, BASE_URL, and COMPANY_ID with your specific details. Additionally, specify the ASSISTANT_ID that this application should interact with to ensure it only responds to the designated assistant.
import json
import logging

import unique_sdk
from sseclient import SSEClient

# Subscription (multiple events can be separated by a comma)
SUBSCRIPTIONS = "unique.chat.user-message.created"

# Company ID
COMPANY_ID = "<YOUR_COMPANY_ID>"

# App credentials
API_ID = "<YOUR_API_ID>"
API_KEY = "<YOUR_API_KEY>"
BASE_URL = "gateway.<tenant>.unique.app"

# Assistant on which this application should run
ASSISTANT_ID = "<ASSISTANT_ID>"

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

unique_sdk.api_key = API_KEY
unique_sdk.app_id = API_ID


def get_sse_client() -> SSEClient:
    # Open the event stream with the same URL and headers as the curl example
    url = f"https://{BASE_URL}/public/event-socket/events/stream?subscriptions={SUBSCRIPTIONS}"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "x-app-id": API_ID,
        "x-company-id": COMPANY_ID,
    }
    return SSEClient(url=url, headers=headers)


def process_event(event_data):
    try:
        event = json.loads(event_data)
        logger.debug(f"Event details: {event}")
    except json.JSONDecodeError as e:
        logger.error(f"JSON decoding error: {e}")
        return

    # Only react to subscribed events that target the designated assistant
    if (
        event
        and event["event"] in SUBSCRIPTIONS.split(",")
        and event["payload"]["assistantId"] == ASSISTANT_ID
    ):
        logger.info(f"Event subscription received: {SUBSCRIPTIONS}")
        # Send a message back to the user
        try:
            unique_sdk.Message.create(
                user_id=event["userId"],
                company_id=event["companyId"],
                chatId=event["payload"]["chatId"],
                assistantId=ASSISTANT_ID,
                text="Hello from Unique! 🚀",
                role="ASSISTANT",
            )
        except Exception as e:
            logger.error(f"Error sending message: {e}")


def event_socket():
    # Iterate over the stream; the loop ends when the connection closes
    for event in get_sse_client():
        logger.debug("New event received.")
        if event.data:
            process_event(event.data)


if __name__ == "__main__":
    event_socket()
Authors: Sandro Camastral, Konstantin Krauss, Fabian Schläpfer
© 2024 Unique AG. All rights reserved.