The OnPublishEvents handler is a custom module hook that lets you intercept and process events before they are sent to providers as the result of GraphQL mutations. It is called whenever a batch of events is about to be published to a provider, giving you the opportunity to filter, transform, enrich, or validate those events first. This handler is particularly useful for:
  • Event validation: Ensure events meet specific criteria before publishing
  • Data transformation: Modify event payloads to match provider expectations
  • Event enrichment: Add additional metadata or context to events
  • Authentication and authorization: Filter events based on user permissions
  • Monitoring and analytics: Log or track outgoing events for observability
This handler is executed only when a GraphQL mutation triggers event publishing. Unlike OnReceiveEvents, this handler processes outgoing events to providers, not incoming events from subscriptions.

Handler Interface

To use the OnPublishEvents handler, create a Custom Module that implements the StreamPublishEventHandler interface.
type StreamPublishEventHandler interface {
    // OnPublishEvents is called each time a batch of events is going to be sent to a provider.
    // The events argument contains all events from a batch.
    // Use events.All() to iterate through them and event.Clone() to create mutable copies when needed.
    // Returning an error will result in a GraphQL error being returned to the client; the error can be
    // customized by returning a StreamHookError.
    OnPublishEvents(ctx StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error)
}

type StreamPublishEventHandlerContext interface {
    // Request is the original request received by the router.
    Request() *http.Request
    // Logger is the logger for the request
    Logger() *zap.Logger
    // Operation is the GraphQL operation
    Operation() OperationContext
    // Authentication is the authentication for the request
    Authentication() authentication.Authentication
    // PublishEventConfiguration is the publish event configuration
    PublishEventConfiguration() datasource.PublishEventConfiguration
    // NewEvent creates a new event that can be included in the returned events.
    NewEvent(data []byte) datasource.MutableStreamEvent
}

Error Handling

As mentioned in the Publish Overview section, the return type of a Cosmo Streams mutation must use the type PublishEventResult, which declares a boolean success field.
Implementations of OnPublishEvents return two values: events and error. When error is not nil, the success field in the client's response is set to false, and the error is logged to the router's console output. Any events you return are always sent to the provider, even when you also return an error. This is useful when you partially processed the batch before hitting an error. If you don't want to send any events to the provider, return datasource.NewStreamEvents(nil). See the code examples below for a demonstration.
When the OnPublishEvents handler returns an error, the router takes the following actions:
  1. The client receives a response where the success field is false
  2. Any returned events are still sent to the message provider
  3. The error is logged by the router with details about the mutation, provider, and field name
Here is an example of proper error handling:
func (m *MyEventHandler) OnPublishEvents(
	ctx core.StreamPublishEventHandlerContext,
	events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
	// For validation failures, don't send events to the provider
	// and return a success=false to the client.
	for _, event := range events.All() {
		if !isValidEvent(event.GetData()) {
			return datasource.NewStreamEvents(nil), errors.New("invalid event data - publication rejected")
		}
	}

	// In case of partial processing,
	// you can send the successfully processed events to the provider
	// and still return an error.
	if failureAfterPartialProcessing {
		return partialEvents, errors.New("error during data processing")
	}

	return events, nil
}

Usage Example

Complete Custom Module with Event Bypass

The following example shows a complete Custom Module implementation, including handler registration, with a handler that simply passes events through unchanged. This demonstrates how to register your OnPublishEvents Custom Module.
package module

import (
    "github.com/wundergraph/cosmo/router/core"
    "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
    "go.uber.org/zap"
)

func init() {
    // Register your module with the router
    core.RegisterModule(&EventPublishModule{})
}

const ModuleID = "eventPublishModule"

// EventPublishModule demonstrates a complete custom module implementation
// that implements StreamPublishEventHandler but simply passes events through unchanged
type EventPublishModule struct {}

// OnPublishEvents passes events through unchanged.
func (m *EventPublishModule) OnPublishEvents(
    ctx core.StreamPublishEventHandlerContext,
    events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
    ctx.Logger().Debug("Passing events through unchanged",
        zap.Int("event_count", events.Len()),
    )
    return events, nil
}

// Module returns the module information for registration
func (m *EventPublishModule) Module() core.ModuleInfo {
    return core.ModuleInfo{
        ID: ModuleID,
        New: func() core.Module {
            return &EventPublishModule{}
        },
    }
}

// Interface guards to ensure we implement the required interfaces
var (
    _ core.StreamPublishEventHandler = (*EventPublishModule)(nil)
)

Restrict the handler to certain mutations and providers

Most of the time you want your hook to only handle certain mutations. The OnPublishEvents handler runs for every mutation configured for Cosmo Streams, so you can check the root field name and return early when it is not the one you care about.
func (m *SelectivePublishHandler) OnPublishEvents(
    ctx core.StreamPublishEventHandlerContext, 
    events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
    logger := ctx.Logger()
    pubConfig := ctx.PublishEventConfiguration()

    // Bypass handler if it's not the right mutation
    if pubConfig.RootFieldName() != "updateEmployee" {
        return events, nil
    }

    // And / or you can decide to bypass in case it's not the right provider
    // you want to deal with here.
    if pubConfig.ProviderID() != "my-kafka" {
        return events, nil
    }

    // Your specific event processing logic goes here, producing a
    // processedEvents slice ([]datasource.StreamEvent)
    // ...

    return datasource.NewStreamEvents(processedEvents), nil
}

Prevent unauthorized users from sending Cosmo Streams mutation events to providers

You can use ctx.Authentication() to validate that only authorized users can publish events to specific providers. This is useful for securing mutation operations that trigger event publishing.
func (m *AuthEventHandler) OnPublishEvents(
    ctx core.StreamPublishEventHandlerContext, 
    events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
    logger := ctx.Logger()
    auth := ctx.Authentication()
    
    // Require authentication for publishing events
    if auth == nil {
        logger.Warn("Unauthenticated user attempted to publish events")
        return nil, errors.New("authentication required to publish events")
    }
    
    // Check JWT claims for required permissions
    claims := auth.Claims()
    if claims == nil {
        return nil, errors.New("invalid authentication token")
    }
    
    // Check for required role to publish events
    roleClaimValue, hasRole := claims["role"]
    if !hasRole {
        return nil, errors.New("missing role claim - publication denied")
    }
    
    userRole, ok := roleClaimValue.(string)
    if !ok || (userRole != "admin" && userRole != "publisher") {
        logger.Warn("User without publish permissions attempted to publish events", 
            zap.Any("role", roleClaimValue),
        )
        return nil, errors.New("insufficient permissions to publish events")
    }
    
    // User is authorized - allow event publishing
    logger.Debug("Authorized user publishing events", 
        zap.String("role", userRole),
        zap.Int("event_count", events.Len()),
    )
    return events, nil
}

Attach headers to Kafka events

You can attach headers to Kafka events before sending them to providers.
func (m *EventPublishModule) OnPublishEvents(
	ctx core.StreamPublishEventHandlerContext,
	events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
	// Bypass handler in case it's not about Kafka events
	if ctx.PublishEventConfiguration().ProviderType() != datasource.ProviderTypeKafka {
		return events, nil
	}

	eventsWithHeaders := make([]datasource.StreamEvent, 0, events.Len())
	for _, evt := range events.All() {
		// To set headers, first clone the event to make it mutable
		clonedEvent := evt.Clone()
		kafkaEvent, ok := clonedEvent.(*kafka.MutableEvent)
		if !ok {
			rootFieldName := ctx.PublishEventConfiguration().RootFieldName()
			ctx.Logger().
				With(zap.String("root_field_name", rootFieldName)).
				Warn("got non-kafka event in kafka based handler, this should not happen")
			// Skip the type assertion failure instead of dereferencing a nil
			// pointer; forward the original event unchanged
			eventsWithHeaders = append(eventsWithHeaders, evt)
			continue
		}

		kafkaEvent.Headers["event_source"] = []byte("graphql_mutation")
		eventsWithHeaders = append(eventsWithHeaders, kafkaEvent)
	}

	return datasource.NewStreamEvents(eventsWithHeaders), nil
}