The OnReceiveEvents handler is a custom module hook that allows you to intercept and process events received from
supported message providers before they are delivered to GraphQL subscription clients.
This handler is called whenever a batch of events is received from a provider, giving you the opportunity to filter, transform, enrich,
or validate events before they reach your subscribers.
This handler is particularly useful for:
- Event filtering: Remove unwanted events based on custom logic
- Data transformation: Modify event payloads to match client expectations
- Event enrichment: Add additional data to events from external sources
- Authentication and authorization: Filter events based on user permissions
- Monitoring and analytics: Log or track events for observability
If there is no active subscription, this handler is not executed, even if new messages arrive at the provider.
This is because the Router will not listen for messages on the provider topic/queue until at least one
client subscribes to a particular subscription.
Handler Interface
In order to use the OnReceiveEvents handler you need to create a Custom Module which implements
the StreamReceiveEventHandler interface.
type StreamReceiveEventHandler interface {
	// OnReceiveEvents is called whenever a batch of events is received from a provider,
	// before delivering them to clients.
	// The hook will be called once for each active subscription, therefore it is advised to
	// avoid resource heavy computation or blocking tasks whenever possible.
	// The events argument contains all events from a batch and is shared between
	// all active subscribers of these events.
	// Use events.All() to iterate through them and event.Clone() to create mutable copies, when needed.
	// Returning an error will result in the subscription being closed and the error being logged.
	OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error)
}
type StreamReceiveEventHandlerContext interface {
	// Context is a context for handlers.
	// If it is cancelled, the handler should stop processing.
	Context() context.Context
	// Request is the initial client request that started the subscription
	Request() *http.Request
	// Logger is the logger for the request
	Logger() *zap.Logger
	// Operation is the GraphQL operation
	Operation() OperationContext
	// Authentication is the authentication for the request
	Authentication() authentication.Authentication
	// SubscriptionEventConfiguration the subscription event configuration
	SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
	// NewEvent creates a new event that can be used in the subscription.
	NewEvent(data []byte) datasource.MutableStreamEvent
}
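The interface comments note that the batch is shared between all active subscribers, so a handler that changes payloads should work on copies rather than on the shared events. The following is a minimal sketch of that copy-before-modify pattern. It uses a small stand-in type instead of the router's types: `event`, `clone`, and `redactField` are hypothetical names chosen for illustration and are not part of the router API. In a real module you would iterate with events.All() and copy with event.Clone().

```go
package main

import "encoding/json"

// event is a hypothetical stand-in for a stream event. It is NOT the
// router's datasource.StreamEvent type.
type event struct {
	data []byte
}

// clone returns a deep copy so the shared original stays untouched.
func (e event) clone() event {
	return event{data: append([]byte(nil), e.data...)}
}

// redactField returns copies of the shared events with the given top-level
// JSON field removed. Events whose payload is not a JSON object are returned
// as unmodified copies.
func redactField(shared []event, field string) []event {
	out := make([]event, 0, len(shared))
	for _, ev := range shared {
		cp := ev.clone()
		var payload map[string]json.RawMessage
		if err := json.Unmarshal(cp.data, &payload); err == nil && payload != nil {
			delete(payload, field)
			if b, err := json.Marshal(payload); err == nil {
				cp.data = b
			}
		}
		out = append(out, cp)
	}
	return out
}
```

Because every event is copied before it is changed, other subscribers that receive the same batch still see the original payloads.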
The OnReceiveEvents handler is executed asynchronously for each active subscription when events are received from the provider.
To control resource usage and prevent overwhelming your system, you can configure the maximum number of concurrent handlers using
the max_concurrent_handlers configuration option.
events:
  subscription_hooks:
    on_receive_events:
      max_concurrent_handlers: 100 # Default: 100
This limit applies per Trigger, not globally.
When the maximum number of concurrent handlers for a topic is reached, the router will not poll new events from the message queue until a handler finishes and becomes available again.
To avoid delivering events out of order to subscription clients, the router waits for all handlers to complete before polling the next batch of events.
This waiting period is configurable via the router settings:
events:
  subscription_hooks:
    on_receive_events:
      handler_timeout: 1s # default: 5s
If the timeout is reached, the router immediately polls the next batch of events, which may result in out-of-order delivery to subscription clients.
In this case, a warning is logged.
It is recommended to use ctx.Context(), which is cancelled in such situations.
You can use this context to abort any long-running operations:
func (m *CosmoStreamsModule) OnReceiveEvents(
	ctx core.StreamReceiveEventHandlerContext,
	events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
	for _, event := range events.All() {
		select {
		case <-ctx.Context().Done():
			ctx.Logger().Debug("context cancelled, stopping processing and returning no events to subscriber")
			return datasource.StreamEvents{}, nil
		default:
			// process event here...
			_ = event // placeholder so the example compiles until real processing is added
		}
	}
	return events, nil
}
The router does not abort the handler when the context is cancelled. Instead, it proceeds to receive the next batch of events from the provider.
In this case, events may be delivered out of order because long-running handlers are still processing the previous batch.
While the handler limit is reached, the router will not poll the next batch of events from the provider.
This effectively means that the subscription will not receive updates until a handler becomes free again.
Error Handling
When the OnReceiveEvents handler returns an error, the router takes the following actions:
- Send Returned Events: Alongside the error, you can return events if you wish to send them to the client before the connection closes
- Subscription Closure: The affected subscription is immediately closed for the client that encountered the error
- Error Logging: The error is logged by the router with details about the subscription, provider, and field name
- Error Deduplication: If multiple subscriptions experience the same error for the same events, the router deduplicates the error messages in the logs to prevent spam
- No Error Propagation: The error is not sent directly to the GraphQL client - the subscription simply closes
Returning an error from OnReceiveEvents will close the subscription for that specific client. Use this only when you want to terminate the subscription due to unrecoverable conditions. For filtering events, return an empty event list instead of an error.
The error is logged by the router, but it won’t be sent to the client.
From the view of the client the subscription closes server-side without a reason. We are working on a solution for this.
Example of proper error handling:
func (m *MyEventHandler) OnReceiveEvents(
	ctx core.StreamReceiveEventHandlerContext,
	events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
	// someCondition and criticalSystemFailure are placeholders for your own checks.

	// For recoverable issues, filter events instead of returning errors
	if someCondition {
		return datasource.NewStreamEvents(nil), nil // Empty events, no error
	}

	// Only return errors for unrecoverable conditions
	if criticalSystemFailure {
		return datasource.NewStreamEvents(nil), errors.New("critical system failure - closing subscription")
	}

	return events, nil
}
Usage Example
Complete Custom Module with Event Bypass
The following example contains a complete Custom Module implementation, including handler registration,
with a handler that will simply bypass events unchanged. This is not useful on its own but demonstrates
how to register your OnReceiveEvents Custom Module.
package module

import (
	"github.com/wundergraph/cosmo/router/core"
	"github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
	"go.uber.org/zap"
)

func init() {
	// Register your module with the router
	core.RegisterModule(&EventBypassModule{})
}

const ModuleID = "eventBypassModule"

// EventBypassModule demonstrates a complete custom module implementation
// that implements StreamReceiveEventHandler but simply passes events through unchanged
type EventBypassModule struct{}

// OnReceiveEvents passes all events through unchanged
func (m *EventBypassModule) OnReceiveEvents(
	ctx core.StreamReceiveEventHandlerContext,
	events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
	logger := ctx.Logger()
	logger.Debug("Processing events - bypassing unchanged",
		zap.Int("event_count", events.Len()),
	)

	// Simply return the events unchanged
	return events, nil
}

// Module returns the module information for registration
func (m *EventBypassModule) Module() core.ModuleInfo {
	return core.ModuleInfo{
		ID: ModuleID,
		New: func() core.Module {
			return &EventBypassModule{}
		},
	}
}

// Interface guards to ensure we implement the required interfaces
var (
	_ core.StreamReceiveEventHandler = (*EventBypassModule)(nil)
)
Restrict Handler to run on certain subscriptions and providers
Most of the time you want your hook to only deal with a certain subscription.
The OnReceiveEvents Handler is run for every subscription configured for Cosmo Streams.
You can access the name of the subscription you care about and return early if it’s not the right one.
func (m *SelectiveEventHandler) OnReceiveEvents(
	ctx core.StreamReceiveEventHandlerContext,
	events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
	subConfig := ctx.SubscriptionEventConfiguration()

	// Bypass handler if it's not the right subscription
	if subConfig.RootFieldName() != "employeeUpdated" {
		return events, nil
	}

	// And / or you can decide to process events only from a specific provider configured in the Router
	if subConfig.ProviderID() != "my-nats" {
		return events, nil
	}

	// Your specific event processing logic goes here.
	// As a placeholder, every event is copied over unchanged.
	processedEvents := make([]datasource.StreamEvent, 0, events.Len())
	for _, event := range events.All() {
		processedEvents = append(processedEvents, event)
	}

	return datasource.NewStreamEvents(processedEvents), nil
}
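When a hook should handle more than one subscription, the chain of early returns can be replaced with a lookup table. The sketch below is one possible way to do this, assuming you compare the values returned by ProviderID() and RootFieldName(). The `target` type, the `handled` table, and `shouldHandle` are hypothetical helpers, and the `my-kafka` / `employeeCreated` entry is made up for illustration.

```go
package main

// target identifies a subscription by provider ID and root field name.
// Hypothetical helper type, not part of the router API.
type target struct {
	providerID string
	rootField  string
}

// handled lists the provider/field pairs the hook should process.
// The entries are example values only.
var handled = map[target]bool{
	{providerID: "my-nats", rootField: "employeeUpdated"}:  true,
	{providerID: "my-kafka", rootField: "employeeCreated"}: true,
}

// shouldHandle reports whether the hook should process events for the
// given provider and root field. Unknown pairs are bypassed.
func shouldHandle(providerID, rootField string) bool {
	return handled[target{providerID: providerID, rootField: rootField}]
}
```

Inside the handler, a single check such as `if !shouldHandle(subConfig.ProviderID(), subConfig.RootFieldName()) { return events, nil }` would then cover all configured pairs.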
Filter out events based on the client's authentication token claims
You can use ctx.Authentication() to access authentication data, such as tokens, if available.
Based on that, you can filter out events if the token lacks the required claim.
func (m *EventFilterModule) OnReceiveEvents(
	ctx core.StreamReceiveEventHandlerContext,
	events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
	logger := ctx.Logger()
	auth := ctx.Authentication()

	// Without authentication, close the subscription
	if auth == nil {
		return datasource.NewStreamEvents(nil),
			errors.New("no authentication present, closing subscription")
	}

	// Check JWT claims; without claims, close the subscription
	claims := auth.Claims()
	if claims == nil {
		return datasource.NewStreamEvents(nil),
			errors.New("no claims present, closing subscription")
	}

	// Check for admin role claim
	roleClaimValue, hasRole := claims["role"]
	if !hasRole {
		logger.Debug("No role claim, blocking all events")
		return datasource.NewStreamEvents(nil), nil
	}

	userRole, ok := roleClaimValue.(string)
	if !ok || userRole != "admin" {
		logger.Debug("User is not admin, blocking all events",
			zap.Any("role", roleClaimValue),
		)
		return datasource.NewStreamEvents(nil), nil
	}

	// User is admin - pass all events through
	logger.Debug("Admin user authorized, passing all events",
		zap.Int("event_count", events.Len()),
	)
	return events, nil
}
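The example above is all-or-nothing: a client either receives every event or none. Claims can also drive per-event decisions. The following is a minimal sketch of that idea on raw JSON payloads, independent of the router's types. The `tenant` claim, the `tenantId` payload field, and the `filterByTenant` function are assumptions made for illustration.

```go
package main

import "encoding/json"

// filterByTenant keeps only the payloads whose top-level "tenantId" field
// equals the "tenant" claim. Both names are hypothetical examples.
// Payloads that are not valid JSON are dropped, and a missing or
// non-string claim results in no payloads being kept.
func filterByTenant(claims map[string]any, payloads [][]byte) [][]byte {
	tenant, ok := claims["tenant"].(string)
	if !ok || tenant == "" {
		return nil // no usable claim: deliver nothing
	}
	kept := make([][]byte, 0, len(payloads))
	for _, p := range payloads {
		var body struct {
			TenantID string `json:"tenantId"`
		}
		if err := json.Unmarshal(p, &body); err != nil {
			continue // drop payloads that cannot be inspected
		}
		if body.TenantID == tenant {
			kept = append(kept, p)
		}
	}
	return kept
}
```

Dropping events that cannot be inspected is a deliberately conservative choice here; depending on your data, passing them through may be more appropriate.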
Certain providers enrich their messages with metadata accessible by the Router.
Kafka and NATS, for example, have the option to add headers to messages.
Here’s an example that filters out all messages coming from a Kafka instance where a header indicates
it’s not meant for GraphQL subscriptions.
import (
	"github.com/wundergraph/cosmo/router/core"
	"github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
	"github.com/wundergraph/cosmo/router/pkg/pubsub/kafka"
	"go.uber.org/zap"
)

func (m *HeaderFilterModule) OnReceiveEvents(
	ctx core.StreamReceiveEventHandlerContext,
	events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
	logger := ctx.Logger()
	subConfig := ctx.SubscriptionEventConfiguration()

	// Only process events from Kafka providers.
	// Pass through unchanged for non-Kafka providers.
	if subConfig.ProviderType() != datasource.ProviderTypeKafka {
		return events, nil
	}

	// Log which provider and subscription field this batch belongs to
	logger.Debug("Processing Kafka events for subscription",
		zap.String("provider_id", subConfig.ProviderID()),
		zap.String("field_name", subConfig.RootFieldName()),
		zap.Int("event_count", events.Len()),
	)

	filteredEvents := make([]datasource.StreamEvent, 0, events.Len())
	for _, event := range events.All() {
		// Check if this is a Kafka event with headers
		if kafkaEvent, ok := event.(*kafka.Event); ok {
			headers := kafkaEvent.GetHeaders()
			// Filter out events with "internal" header set to "true"
			if internalHeader, exists := headers["internal"]; exists {
				if string(internalHeader) == "true" {
					logger.Debug("Filtering out internal event")
					continue
				}
			}
		}
		// Include this event in the results
		filteredEvents = append(filteredEvents, event)
	}

	logger.Debug("Filtered events by headers",
		zap.Int("original_count", events.Len()),
		zap.Int("filtered_count", len(filteredEvents)),
	)
	return datasource.NewStreamEvents(filteredEvents), nil
}