The SubscriptionOnStart handler is a custom module hook that allows you to intercept and customize the initialization of GraphQL subscriptions.
This handler is called once when a subscription starts, giving you the opportunity to validate permissions, send initial events, or perform setup logic.
This handler is particularly useful for:
- Subscription authentication: Validate JWT tokens or user permissions before allowing subscriptions
- Initial event delivery: Send welcome messages or current state to new subscribers
- Subscription logging: Track subscription attempts and user behavior
- Connection validation: Ensure clients meet specific criteria before subscribing
- Rate limiting: Control subscription attempts per user or client
- State initialization: Initialize state used by other handlers such as
OnReceiveEvents or OnPublishEvents of the same module
Handler Interface
In order to use the SubscriptionOnStart handler you need to create a Custom Module which implements the SubscriptionOnStartHandler interface.
type SubscriptionOnStartHandler interface {
// SubscriptionOnStart is called once at subscription start
// The error is propagated to the client.
SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error
}
type SubscriptionOnStartHandlerContext interface {
// Request is the original request received by the router.
Request() *http.Request
// Logger is the logger for the request
Logger() *zap.Logger
// Operation is the GraphQL operation
Operation() OperationContext
// Authentication is the authentication for the request
Authentication() authentication.Authentication
// SubscriptionEventConfiguration is the subscription event configuration (will return nil for engine subscription)
SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
// WriteEvent writes an event to the stream of the current subscription
// It returns true if the event was written to the stream, false if the event was dropped
WriteEvent(event datasource.StreamEvent) bool
// NewEvent creates a new event that can be used in the subscription.
NewEvent(data []byte) datasource.MutableStreamEvent
}
Error Handling
When you return an error from the SubscriptionOnStart handler, the router responds to the client with an error event and closes the subscription.
You can choose to log a generic error or a custom error response with more details for the client.
return errors.New("my handler error")
This will result in an internal server error response to the client.
{
"errors": [
{
"message": "Internal server error"
}
]
}
Whereas you can return a custom error response with more details for the client.
return core.NewHttpGraphqlError(
"my graphql error",
http.StatusText(http.StatusForbidden),
http.StatusForbidden
)
This will result in a error response with more details for the client.
{
"errors": [
{
"message": "my graphql error",
"extensions": {
"statusCode": 403,
"code": "Forbidden"
}
}
]
}
Errors are not logged automatically by the router. If you need the error to be logged, you can use ctx.Logger() to log the error yourself.
Usage Example
Complete Custom Module with Event Bypass
The following example demonstrates how to register a passive SubscriptionOnStart handler that logs subscription attempts but allows all subscriptions to proceed normally.
package module
import (
"github.com/wundergraph/cosmo/router/core"
"go.uber.org/zap"
)
func init() {
// Register your module with the router
core.RegisterModule(&SubscriptionStartModule{})
}
const ModuleID = "subscriptionStartModule"
// SubscriptionStartModule demonstrates a passive subscription start handler
type SubscriptionStartModule struct{}
// SubscriptionOnStart logs subscription attempts and allows them to proceed
func (m *SubscriptionStartModule) SubscriptionOnStart(
ctx core.SubscriptionOnStartHandlerContext,
) error {
logger := ctx.Logger()
config := ctx.SubscriptionEventConfiguration()
// Log subscription details
logger.Info("Subscription started",
zap.String("field_name", config.RootFieldName()),
zap.String("provider_id", config.ProviderID()),
zap.String("provider_type", string(config.ProviderType())),
)
// Allow subscription to proceed
return nil
}
// Module returns the module information for registration
func (m *SubscriptionStartModule) Module() core.ModuleInfo {
return core.ModuleInfo{
ID: ModuleID,
New: func() core.Module {
return &SubscriptionStartModule{}
},
}
}
// Interface guards to ensure we implement the required interfaces
var (
_ core.SubscriptionOnStartHandler = (*SubscriptionStartModule)(nil)
)
Return initial events
You can use ctx.WriteEvent() to send initial or welcome events to subscribers immediately when they connect.
This is useful for providing current state or welcome messages.
func (m *SubscriptionStartModule) SubscriptionOnStart(
ctx core.SubscriptionOnStartHandlerContext,
) error {
// Bypass the handler on other subscriptions
if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" {
return nil
}
// Create an initial event with minimal required fields
// The router will resolve all other fields requested by the subscriber
initialEventData := `{ "__typename": "Employee", "id": 1 }`
initialEvent := ctx.NewEvent([]byte(initialEventData))
success := ctx.WriteEvent(initialEvent)
if !success {
ctx.Logger().Warn("Failed to send initial event to subscriber")
}
return nil
}
The payload data used to create a new event has to follow a specific format.
It has to be a valid JSON object that contains the __typename field to identify the entity type we want to return
for this subscription. The other field in this case is id, which represents the entity key of Employee types as defined in the schema.
The router will use this information to resolve all fields requested by the subscriber to generate a complete response.
Prevent subscriptions on missing token claims
This example validates JWT tokens and blocks subscriptions for users without the required “role” claim, demonstrating proper authentication enforcement.
func (m *SubscriptionStartModule) SubscriptionOnStart(ctx core.SubscriptionOnStartHandlerContext) error {
// Only check "employeeUpdated" subscription
if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" {
return nil
}
auth := ctx.Authentication()
if auth == nil {
return core.NewHttpGraphqlError("unauthorized", http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
}
// Check for specific "admin" role
roleValue, hasRole := auth.Claims()["role"]
if !hasRole {
return core.NewHttpGraphqlError("missing role claim", http.StatusText(http.StatusForbidden), http.StatusForbidden)
}
role, ok := roleValue.(string)
if !ok || role != "admin" {
return core.NewHttpGraphqlError("admin role required", http.StatusText(http.StatusForbidden), http.StatusForbidden)
}
return nil
}