Hands on Knative — Part 2


In my previous post, I talked about Knative Serving for rapid deployment and autoscaling of serverless containers. Knative Serving is great if you want your services to be synchronously triggered by HTTP calls. However, in the serverless microservices world, asynchronous triggers are more common and useful. That’s when Knative Eventing comes into play.

In this second part of the Hands on Knative series, I want to introduce Knative Eventing and show some examples from my Knative Tutorial on how to integrate it with various services.

What is Knative Eventing?

Knative Eventing works hand-in-hand with Knative Serving and provides primitives for loosely coupled, event-driven services. A typical Knative Eventing architecture has four main components:

  • Source (aka Producer) reads events from the actual source and forwards them downstream to a Channel or, less commonly, directly to a Service.
  • Channel receives events from the source, saves them to its underlying storage (more on this later), and fans them out to all subscribers.
  • Subscription bridges a Channel and a Service (or another Channel).
  • Service (aka Consumer) is the Knative Service consuming the event stream.

Let’s look at these in more detail.

Source, Channel and Subscription

The ultimate goal of Knative Eventing is to route events from a source to a service and it does that with the primitives I mentioned before: Source, Channel and Subscription.

Source reads events from the actual source and forwards them downstream. As of today, Knative supports reading events from Kubernetes, GitHub, Google Cloud Pub/Sub, AWS SQS, Containers, and CronJobs.
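For example, a CronJobSource emits an event on a schedule with a fixed payload. The following is only a sketch based on the v1alpha1 samples; the name heartbeat-cron and the sink are illustrative, and field names can differ between Knative versions:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: heartbeat-cron
spec:
  schedule: "*/2 * * * *"
  data: '{"message": "Hello from cron"}'
  sink:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Channel
    name: my-channel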

Once the event is pulled into Knative, it needs to be saved either in-memory or somewhere more durable like Kafka or Google Cloud Pub/Sub. This happens with a Channel. It has multiple implementations to support different options.
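For instance, if the Kafka ClusterChannelProvisioner from eventing-contrib is installed, a Kafka-backed Channel would look much like the in-memory one shown later in this post, only with a different provisioner name. The name kafka below is an assumption; check what your installation actually registers:

apiVersion: eventing.knative.dev/v1alpha1
kind: Channel
metadata:
  name: kafka-backed-channel
spec:
  provisioner:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: ClusterChannelProvisioner
    name: kafka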

From Channel, the event is delivered to all interested Knative Services or other Channels. This might be one-to-one or a fanout. Subscription determines the nature of this delivery and acts like a bridge between the Channel and the Knative service.

Now that we understand the basics of Knative Eventing, let’s take a look at a concrete example.

Hello World Eventing

For Hello World Eventing, let’s read messages from Google Cloud Pub/Sub and log them out in a Knative Service. My Hello World Eventing tutorial has all the details, but to recap, this is what we need to set up:

  1. A GcpPubSubSource to read messages from Google Cloud Pub/Sub.
  2. A Channel to save the messages in-memory.
  3. A Subscription to link the Channel to the Knative Service.
  4. A Knative Service to receive messages and log them out.

gcp-pubsub-source.yaml defines the GcpPubSubSource. It points to a Pub/Sub topic called testing, holds the credentials to access Pub/Sub, and specifies which Channel events should be forwarded to:

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: GcpPubSubSource
metadata:
  name: testing-source
spec:
  gcpCredsSecret:  # A secret in the knative-sources namespace
    name: google-cloud-key
    key: key.json
  googleCloudProject: knative-atamel  # Replace this
  topic: testing
  sink:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Channel
    name: pubsub-test

Next, we define the Channel with channel.yaml. In this case, we’re simply saving messages in-memory:

apiVersion: eventing.knative.dev/v1alpha1  
kind: Channel  
metadata:  
  name: pubsub-test  
spec:  
  provisioner:  
    apiVersion: eventing.knative.dev/v1alpha1  
    kind: ClusterChannelProvisioner  
    name: in-memory-channel

Go ahead and create the source and the channel:

kubectl apply -f gcp-pubsub-source.yaml  
kubectl apply -f channel.yaml

You can see that the source and channel have been created, and that a source pod is running as well:

kubectl get gcppubsubsource  
NAME             AGE  
testing-source   1m

kubectl get channel  
NAME          AGE  
pubsub-test   1m

kubectl get pods  
NAME                                              READY     STATUS      
gcppubsub-testing-source-qjvnk-64fd74df6b-ffzmt   2/2       Running

Finally, we can create the Knative Service and link it to the Channel with a subscription in a subscriber.yaml file:

apiVersion: serving.knative.dev/v1alpha1  
kind: Service
metadata:  
  name: message-dumper-csharp  
spec:  
  runLatest:  
    configuration:  
      revisionTemplate:  
        spec:  
          container:  
            # Replace {username} with your actual DockerHub username
            image: docker.io/{username}/message-dumper-csharp:v1

---
apiVersion: eventing.knative.dev/v1alpha1
kind: Subscription
metadata:
  name: gcppubsub-source-sample-csharp
spec:
  channel:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Channel
    name: pubsub-test
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: message-dumper-csharp

As you can see, message-dumper-csharp is just a regular Knative Service, but it’s triggered asynchronously via Knative Eventing through its Subscription.

kubectl apply -f subscriber.yaml

service.serving.knative.dev "message-dumper-csharp" created  
subscription.eventing.knative.dev "gcppubsub-source-sample-csharp" configured
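For context, the consumer side needs no eventing-specific code: Knative Eventing delivers events to the service as plain HTTP POST requests. A minimal sketch of such a dumper, assuming ASP.NET Core (the tutorial has the actual message-dumper-csharp source), could look like this:

using System.IO;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void Configure(IApplicationBuilder app, ILogger<Startup> logger)
    {
        app.Run(async context =>
        {
            // Knative Eventing POSTs the event payload to the service root.
            using (var reader = new StreamReader(context.Request.Body))
            {
                var content = await reader.ReadToEndAsync();
                logger.LogInformation($"C# Message Dumper received message: {content}");
                await context.Response.WriteAsync("OK");
            }
        });
    }
}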

Once all the yaml files are applied, you can send a message to the Pub/Sub topic using gcloud:

gcloud pubsub topics publish testing --message="Hello World"

And you should be able to see pods created for the service:

kubectl get pods  
NAME                                                      READY     STATUS    RESTARTS   AGE
gcppubsub-testing-source-qjvnk-64fd74df6b-ffzmt           2/2       Running   0          3m
message-dumper-csharp-00001-deployment-568cdd4bbb-grnzq   3/3       Running   0          30s

The service logs the Base64-encoded message under Data:

info: message_dumper_csharp.Startup[0]
      C# Message Dumper received message: {"ID":"198012587785403","Data":"SGVsbG8gV29ybGQ=","Attributes":null,"PublishTime":"2019-01-21T15:25:58.25Z"}
info: Microsoft.AspNetCore.Hosting.Internal.WebHost[2]
      Request finished in 29.9881ms 200
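The Data field is simply the Base64-encoded Pub/Sub payload; decoding it gives back the original message. A quick standalone sketch in C#:

using System;
using System.Text;

class DecodeData
{
    static void Main()
    {
        // "SGVsbG8gV29ybGQ=" is the Data value from the log line above.
        var bytes = Convert.FromBase64String("SGVsbG8gV29ybGQ=");
        Console.WriteLine(Encoding.UTF8.GetString(bytes));  // prints: Hello World
    }
}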

Check out my Hello World Eventing tutorial for more details on the steps and the actual code.

Integrate with Cloud Storage and Vision API

Knative Eventing truly shines when you try to connect completely unrelated services in a seamless way. In my Integrate with Vision API tutorial, I show how to connect Google Cloud Storage and Google Cloud Vision API using Knative Eventing.

Cloud Storage is a globally available data storage service. A bucket can be configured to emit Pub/Sub messages whenever an image is saved. We can then listen for these Pub/Sub messages with Knative Eventing and pass them to a Knative Service. In the service, we make a Vision API call with the image and extract labels out of it using Machine Learning. All the details are explained in the tutorial but I want to point out a few things here.
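Wiring the bucket to Pub/Sub is a one-time setup outside Knative. Assuming a bucket named your-bucket (a placeholder) and the testing topic from earlier, it can be done with gsutil:

gsutil notification create -t testing -f json gs://your-bucket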

First, all outbound traffic is blocked by default in Knative. This means you cannot even make Vision API calls from your Knative Service out of the box. This surprised me initially, so make sure you configure outbound network access.
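At the time of writing, one documented way to do this was to edit the config-network ConfigMap in the knative-serving namespace; treat the following as a sketch and check the Knative docs for your version:

kubectl edit configmap config-network --namespace knative-serving

and then widen the outbound range, for example to allow all outbound traffic:

istio.sidecar.includeOutboundIPRanges: "*"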

Second, whenever an image is saved to Cloud Storage, it emits a CloudEvent. Knative Eventing in general works with CloudEvents, so you need to parse the incoming requests as CloudEvents and extract the information you need, such as the event type and the location of the image file:

var cloudEvent = JsonConvert.DeserializeObject<CloudEvent>(content);
var eventType = cloudEvent.Attributes["eventType"];
var storageUrl = ConstructStorageUrl(cloudEvent);
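ConstructStorageUrl isn’t shown above; the real helper lives in the tutorial source. A plausible sketch, assuming the Cloud Storage notification exposes bucketId and objectId attributes on the event, could look like this:

// Hedged sketch; the tutorial's actual helper may differ.
// The Vision API accepts gs:// URIs for images stored in Cloud Storage.
private static string ConstructStorageUrl(CloudEvent cloudEvent)
{
    return cloudEvent == null
        ? null
        : string.Format("gs://{0}/{1}",
            cloudEvent.Attributes["bucketId"], cloudEvent.Attributes["objectId"]);
}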

With this information, it’s quite easy to construct a Storage URL for the image and make a Vision API call with that URL. The full source code is explained in the tutorial, but here’s the relevant part:

var visionClient = ImageAnnotatorClient.Create();
var labels = await visionClient.DetectLabelsAsync(Image.FromUri(storageUrl), maxResults: 10);
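To turn those annotations into the log line shown further down, the label descriptions just need to be joined. A small sketch, assuming logger is an injected ILogger and using System.Linq is in scope:

var labelsText = string.Join(",", labels.Select(label => label.Description));
logger.LogInformation($"This picture is labelled: {labelsText}");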

Once the code is ready, we can hook our service to Knative Eventing by defining a subscriber.yaml. It’s very similar to the previous one. We’re re-using the existing Source and Channel, so we don’t have to recreate them. We just create a new Subscription to point to our new Knative Service with the Vision API container:

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: vision-csharp
spec:
  runLatest:
    configuration:
      revisionTemplate:
        spec:
          container:
            # Replace {username} with your actual DockerHub username
            image: docker.io/{username}/vision-csharp:v1
---
apiVersion: eventing.knative.dev/v1alpha1
kind: Subscription
metadata:
  name: gcppubsub-source-vision-csharp
spec:
  channel:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Channel
    name: pubsub-test
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: vision-csharp

Once everything is created with kubectl apply, whenever you save an image to your Cloud Storage bucket, you should see the Knative Service logging the labels of that image.
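To try it, copy an image into the bucket (the file and bucket names below are placeholders):

gsutil cp beach.jpg gs://your-bucket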

For example, I have a picture from one of my favorite places:

Ipanema Beach in Rio de Janeiro

When I save that image to the bucket, I can see the following labels from the Vision API in the logs:

info: vision_csharp.Startup[0]
      This picture is labelled: Sea,Coast,Water,Sunset,Horizon
info: Microsoft.AspNetCore.Hosting.Internal.WebHost[2]
      Request finished in 1948.3204ms 200

As you can see, we connected one service (Cloud Storage) to another service (Vision API) using Knative Eventing. This is just one example, but the possibilities are limitless. In the Integrate with Translation API part of the tutorial, I show how to connect Pub/Sub to the Translation API.

That’s it for Knative Eventing. In the next and final post of the series, I will be talking about Knative Build.

