Knative Eventing: Concepts, Core Components, and Practical Usage Patterns (Source‑to‑Sink, Channel & Subscription, Broker & Trigger)
This article introduces Knative Eventing concepts and core components, then demonstrates three practical usage patterns—Source‑to‑Sink, Channel and Subscription, and Broker and Trigger—through step‑by‑step examples, Kubernetes manifests, and command‑line operations, highlighting event flow, filtering, and reply handling.
Continuing from the previous article, this guide explains Knative Eventing concepts and core components, then walks through three main usage patterns with concrete examples.
Source to Sink
The Source to Sink pattern is the simplest event flow in Knative Eventing. It uses two Knative resources: a Source and a Sink. The Source is configured with the address of the event consumer (the Sink), and events are sent directly from the source to a single service (a Knative Service or a core Kubernetes Service). The source does not wait for a reply; any reply from the sink is ignored, and the source is responsible for retries or queuing when the target service is unavailable.
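Besides an object reference, a Knative sink can also be addressed by a plain URI. As a sketch (the source name here is hypothetical, and the URL assumes the in-cluster address that Knative Serving gives the event-display Service used below):

```yaml
# Sketch only: a PingSource targeting its sink by URI instead of an
# object reference. The URI assumes the cluster-local address of the
# event-display Service; verify it against your cluster.
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: uri-sink-source
  namespace: eventing-example
spec:
  schedule: "*/1 * * * *"
  data: '{"message": "Hello via a URI sink"}'
  sink:
    uri: http://event-display.eventing-example.svc.cluster.local
```

A URI sink is useful when the consumer is not an addressable Kubernetes object, such as an external HTTP endpoint.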
First, create a namespace called eventing-example where all resources will be deployed:
[root@dev ~]# kubectl create namespace eventing-example
namespace/eventing-example created

Define a Sink service to consume events:
# event-display.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: eventing-example
spec:
  template:
    spec:
      containers:
        - image: docker.io/meteatamel/event-display:v1

Define a PingSource that sends a message every minute to the event-display service:
# simple-source.yaml
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: simple-source
  namespace: eventing-example
spec:
  schedule: "*/1 * * * *"
  data: '{"message": "Hello world! Simple source to Sink."}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

Create the resources with kubectl:
# Create Sink
[root@dev source-sink]# kubectl apply -f event-display.yaml
service.serving.knative.dev/event-display created
# Create Source
[root@dev source-sink]# kubectl apply -f simple-source.yaml
pingsource.sources.knative.dev/simple-source created

Verify the service and source are running:
# Check pods
[root@dev source-sink]# kubectl -n eventing-example get pods
NAME                                              READY   STATUS    RESTARTS   AGE
event-display-00001-deployment-7fbc9d9b94-wwbv2   2/2     Running   0          95s

Check the logs to see the event being received every minute:
# Logs
[root@dev source-sink]# kubectl -n eventing-example logs event-display-00001-deployment-7fbc9d9b94-wwbv2 -f --tail 100
...output omitted...
info: event_display.Startup[0]
Received CloudEvent
ID: ba08f8eb-65e4-4c49-a311-35a1aa4ba80c
Source: /apis/v1/namespaces/eventing-example/pingsources/simple-source
Type: dev.knative.sources.ping
Data: {"message":"Hello world! Simple source to Sink."}
...output omitted...

When testing is finished, stop the services:
# Delete resources
[root@dev source-sink]# kubectl delete -f simple-source.yaml
pingsource.sources.knative.dev "simple-source" deleted
[root@dev source-sink]# kubectl delete -f event-display.yaml
service.serving.knative.dev "event-display" deleted

Channel and Subscription
The Channel and Subscription pattern introduces a Channel that can be backed by various technologies (In‑Memory, Apache Kafka, NATS Streaming, etc.). A Channel fans events out to one or more subscriber services, each registered through a Subscription. Messages in the channel follow the CloudEvents format, but this pattern provides no event filtering: every subscriber receives every event.
Example steps:
Define an InMemoryChannel named my-channel:

# channel.yaml
apiVersion: messaging.knative.dev/v1
kind: InMemoryChannel
metadata:
  name: my-channel
  namespace: eventing-example
Create a PingSource that sends a message to my-channel every minute:

# channel-source.yaml
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: channel-source
  namespace: eventing-example
spec:
  schedule: "*/1 * * * *"
  data: '{"message": "Hello world! Channel and Subscription."}'
  sink:
    ref:
      apiVersion: messaging.knative.dev/v1
      kind: InMemoryChannel
      name: my-channel
Define two subscriber services:

# chan-pub-services.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: sub-service1
  namespace: eventing-example
spec:
  template:
    spec:
      containers:
        - image: docker.io/meteatamel/event-display:v1
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: sub-service2
  namespace: eventing-example
spec:
  template:
    spec:
      containers:
        - image: docker.io/meteatamel/event-display:v1
Create two Subscription objects linking the channel to the subscriber services:

# subscriptions.yaml
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: subscription1
  namespace: eventing-example
spec:
  channel:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel
    name: my-channel
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: sub-service1
---
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: subscription2
  namespace: eventing-example
spec:
  channel:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel
    name: my-channel
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: sub-service2
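Subscriptions also accept per-subscription delivery options, which govern retries and where undeliverable events go. A minimal sketch, assuming a hypothetical error-handler Service that is not part of this example:

```yaml
# Sketch: delivery options on a Subscription. Failed deliveries are
# retried with exponential backoff starting at one second; events
# that still cannot be delivered go to the dead-letter sink.
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: subscription-with-dlq
  namespace: eventing-example
spec:
  channel:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel
    name: my-channel
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: sub-service1
  delivery:
    retry: 3
    backoffPolicy: exponential
    backoffDelay: PT1S
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: error-handler  # hypothetical service
```

Without a dead-letter sink, events that exhaust their retries are dropped, so this is worth configuring for anything beyond a demo.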
Apply all resources:
# Apply services
[root@dev channel-subscription]# kubectl apply -f chan-pub-services.yaml
service.serving.knative.dev/sub-service1 created
service.serving.knative.dev/sub-service2 created
# Apply channel
[root@dev channel-subscription]# kubectl apply -f channel.yaml
inmemorychannel.messaging.knative.dev/my-channel created
# Apply subscriptions
[root@dev channel-subscription]# kubectl apply -f subscriptions.yaml
subscription.messaging.knative.dev/subscription1 created
subscription.messaging.knative.dev/subscription2 created
# Apply source
[root@dev channel-subscription]# kubectl apply -f channel-source.yaml
pingsource.sources.knative.dev/channel-source created

Verify the resources are ready:
# Channel
[root@dev channel-subscription]# kubectl -n eventing-example get channel
NAME URL AGE READY REASON
my-channel http://my-channel-kn-channel.eventing-example.svc.cluster.local 40s True
# Services
[root@dev channel-subscription]# kubectl -n eventing-example get pod
NAME READY STATUS RESTARTS AGE
sub-service1-00001-deployment-7d9869d6c7-gzl68 2/2 Running 0 34s
sub-service2-00001-deployment-986464c64-7nxx7 2/2 Running 0 34s
# Subscriptions
[root@dev channel-subscription]# kubectl -n eventing-example get subscription
NAME AGE READY REASON
subscription1 63s True
subscription2 63s True
# Source
[root@dev channel-subscription]# kubectl -n eventing-example get sources
NAME SINK SCHEDULE AGE READY REASON
channel-source http://my-channel-kn-channel.eventing-example.svc.cluster.local */1 * * * * 14s True

Check the logs of the subscriber services to see the events:
# sub-service1 logs
[root@dev channel-subscription]# kubectl -n eventing-example logs sub-service1-00001-deployment-7d9869d6c7-gzl68 -f --tail 100
...output omitted...
info: event_display.Startup[0]
Received CloudEvent
ID: c11f3717-c03d-4884-ba02-fec881bc144b
Source: /apis/v1/namespaces/eventing-example/pingsources/channel-source
Type: dev.knative.sources.ping
Data: {"message":"Hello world! Channel and Subscription."}
...output omitted...

After testing, you can delete the resources:
# Delete
[root@dev channel-subscription]# kubectl delete -f subscriptions.yaml
subscription.messaging.knative.dev "subscription1" deleted
subscription.messaging.knative.dev "subscription2" deleted
[root@dev channel-subscription]# kubectl delete -f channel-source.yaml
pingsource.sources.knative.dev "channel-source" deleted
[root@dev channel-subscription]# kubectl delete -f channel.yaml
inmemorychannel.messaging.knative.dev "my-channel" deleted
[root@dev channel-subscription]# kubectl delete -f chan-pub-services.yaml
service.serving.knative.dev "sub-service1" deleted
service.serving.knative.dev "sub-service2" deleted

Broker and Trigger
The Broker and Trigger pattern adds event filtering capabilities. A Broker implicitly creates an internal channel. Trigger objects subscribe to the broker and apply attribute‑based filters before forwarding events to subscriber services.
Define a broker backed by an InMemoryChannel:

# my-broker.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-broker
  namespace: eventing-example
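When no spec is given, the broker falls back to the cluster's default channel implementation. As a sketch of how the backing channel can be selected explicitly, spec.config can point at a ConfigMap holding a channel template; the names below follow a default Knative installation, so verify them against your cluster:

```yaml
# Sketch: selecting the broker's backing channel explicitly via
# spec.config. The referenced ConfigMap (in knative-eventing) holds
# a channel-template-spec describing the channel kind to create.
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-broker
  namespace: eventing-example
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: config-br-default-channel
    namespace: knative-eventing
```

Pointing spec.config at a different ConfigMap is how a production broker would be backed by Kafka or another durable channel instead of the in-memory implementation.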
Create a PingSource that sends events to the broker:

# broker-source.yaml
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: broker-source
  namespace: eventing-example
spec:
  schedule: "*/1 * * * *"
  data: '{"message": "Hello world! Broker and Trigger."}'
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: my-broker
Define two subscriber services:

# broker-services.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: broker-service1
  namespace: eventing-example
spec:
  template:
    spec:
      containers:
        - image: docker.io/meteatamel/event-display:v1
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: broker-service2
  namespace: eventing-example
spec:
  template:
    spec:
      containers:
        - image: docker.io/meteatamel/event-display:v1
Create two triggers that forward events of type dev.knative.sources.ping to the respective services:

# broker-trigger.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-trigger1
  namespace: eventing-example
spec:
  broker: my-broker
  filter:
    attributes:
      type: dev.knative.sources.ping
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: broker-service1
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-trigger2
  namespace: eventing-example
spec:
  broker: my-broker
  filter:
    attributes:
      type: dev.knative.sources.ping
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: broker-service2
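A Trigger filter can also match on several CloudEvent attributes at once; every attribute listed must match exactly for an event to be forwarded. A sketch (the trigger name is hypothetical; the source value is the one the PingSource stamps on its events, as seen later in the logs):

```yaml
# Sketch: filtering on both type and source. All attributes under
# filter.attributes must match exactly (AND semantics) before the
# broker delivers the event to the subscriber.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: ping-from-broker-source  # hypothetical name
  namespace: eventing-example
spec:
  broker: my-broker
  filter:
    attributes:
      type: dev.knative.sources.ping
      source: /apis/v1/namespaces/eventing-example/pingsources/broker-source
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: broker-service1
```

This is how one broker can serve many producers while each consumer sees only the events it cares about.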
Apply all resources:
# Services
[root@dev broker-trigger]# kubectl apply -f broker-services.yaml
service.serving.knative.dev/broker-service1 created
service.serving.knative.dev/broker-service2 created
# Broker
[root@dev broker-trigger]# kubectl apply -f my-broker.yaml
broker.eventing.knative.dev/my-broker created
# Triggers
[root@dev broker-trigger]# kubectl apply -f broker-trigger.yaml
trigger.eventing.knative.dev/my-trigger1 created
trigger.eventing.knative.dev/my-trigger2 created
# Source
[root@dev broker-trigger]# kubectl apply -f broker-source.yaml
pingsource.sources.knative.dev/broker-source created

Verify the resources are ready:
# Broker
[root@dev broker-trigger]# kubectl -n eventing-example get broker
NAME URL AGE READY REASON
my-broker http://broker-ingress.knative-eventing.svc.cluster.local/eventing-example/my-broker 2m6s True
# Services
[root@dev broker-trigger]# kubectl -n eventing-example get pod
NAME READY STATUS RESTARTS AGE
broker-service1-00001-deployment-67d97c76dd-zgnfk 2/2 Running 0 9s
broker-service2-00001-deployment-668b8d96f9-llfmk 2/2 Running 0 9s
# Triggers
[root@dev broker-trigger]# kubectl -n eventing-example get trigger
NAME BROKER SUBSCRIBER_URI AGE READY REASON
my-trigger1 my-broker http://broker-service1.eventing-example.svc.cluster.local 49s True
my-trigger2 my-broker http://broker-service2.eventing-example.svc.cluster.local 49s True
# Source
[root@dev broker-trigger]# kubectl -n eventing-example get sources
NAME SINK SCHEDULE AGE READY REASON
broker-source http://broker-ingress.knative-eventing.svc.cluster.local/eventing-example/my-broker */1 * * * * 10m True

Check the logs of the subscriber services to confirm event delivery:
# broker-service1 logs
[root@dev broker-trigger]# kubectl -n eventing-example logs broker-service1-00001-deployment-67d97c76dd-zgnfk -f --tail 100
...output omitted...
info: event_display.Startup[0]
Received CloudEvent
ID: 44bfc923-f361-4a8d-9de3-69940979f916
Source: /apis/v1/namespaces/eventing-example/pingsources/broker-source
Type: dev.knative.sources.ping
Data: {"message":"Hello world! Broker and Trigger."}
...output omitted...
# broker-service2 logs (same event)
[root@dev broker-trigger]# kubectl -n eventing-example logs broker-service2-00001-deployment-668b8d96f9-llfmk -f --tail 100
...output omitted...
info: event_display.Startup[0]
Received CloudEvent
ID: 44bfc923-f361-4a8d-9de3-69940979f916
Source: /apis/v1/namespaces/eventing-example/pingsources/broker-source
Type: dev.knative.sources.ping
Data: {"message":"Hello world! Broker and Trigger."}
...output omitted...

To demonstrate reply handling, modify broker-service2 to use the event-display-with-reply image, add a third service, and create a new trigger that listens for the reply event type dev.knative.samples.hifromknative:
# Updated broker-services.yaml (adds broker-service3 and changes the image of broker-service2)
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: broker-service1
  namespace: eventing-example
spec:
  template:
    spec:
      containers:
        - image: docker.io/meteatamel/event-display:v1
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: broker-service2
  namespace: eventing-example
spec:
  template:
    spec:
      containers:
        - image: docker.io/meteatamel/event-display-with-reply:v1
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: broker-service3
  namespace: eventing-example
spec:
  template:
    spec:
      containers:
        - image: docker.io/meteatamel/event-display:v1

# Updated broker-trigger.yaml (adds my-trigger3 for reply events)
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-trigger3
  namespace: eventing-example
spec:
  broker: my-broker
  filter:
    attributes:
      type: dev.knative.samples.hifromknative
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: broker-service3

Apply the updates:
# Apply services
[root@dev broker-trigger]# kubectl apply -f broker-services.yaml
service.serving.knative.dev/broker-service1 configured
service.serving.knative.dev/broker-service2 configured
service.serving.knative.dev/broker-service3 created
# Apply new trigger
[root@dev broker-trigger]# kubectl apply -f broker-trigger.yaml
trigger.eventing.knative.dev/my-trigger3 created

Verify the new service and trigger are ready, then check the logs. broker-service2 now replies with a CloudEvent of type dev.knative.samples.hifromknative, which is routed back through the broker and consumed by broker-service3 via my-trigger3:
# broker-service2 logs (shows reply)
info: event_display_with_reply.Startup[0]
Received CloudEvent ...
info: event_display_with_reply.Startup[0]
Replying with CloudEvent
ID: fdc05330-a8cd-4b6a-9347-6ea62f954e92
Type: dev.knative.samples.hifromknative
Data: "This is a Knative reply!"
# broker-service3 logs (receives reply)
info: event_display.Startup[0]
Received CloudEvent
Type: dev.knative.samples.hifromknative
Data: "This is a Knative reply!"

When testing is complete, delete all resources:
# Delete
[root@dev broker-trigger]# kubectl delete -f broker-source.yaml
pingsource.sources.knative.dev "broker-source" deleted
[root@dev broker-trigger]# kubectl delete -f broker-trigger.yaml
trigger.eventing.knative.dev "my-trigger1" deleted
trigger.eventing.knative.dev "my-trigger2" deleted
trigger.eventing.knative.dev "my-trigger3" deleted
[root@dev broker-trigger]# kubectl delete -f my-broker.yaml
broker.eventing.knative.dev "my-broker" deleted
[root@dev broker-trigger]# kubectl delete -f broker-services.yaml
service.serving.knative.dev "broker-service1" deleted
service.serving.knative.dev "broker-service2" deleted
service.serving.knative.dev "broker-service3" deleted

Conclusion
This article explored three primary Knative Eventing patterns—Source‑to‑Sink, Channel & Subscription, and Broker & Trigger. By combining simple event delivery with more advanced routing, filtering, and reply mechanisms, Knative Eventing enables developers to build highly scalable, event‑driven architectures that handle asynchronous data efficiently. As the project evolves, new features and patterns will further expand its capabilities for modern cloud‑native applications.
Follow "360 Zhihui Cloud" for more technical articles and product updates.
360 Smart Cloud
Official service account of 360 Smart Cloud, dedicated to building a high-quality, secure, highly available, convenient, and stable one‑stop cloud service platform.