Event-Driven Image Processing Pipeline with Knative Eventing


In this post, I want to talk about an event-driven image processing pipeline that I built recently using Knative Eventing. Along the way, I’ll tell you about event sources, custom events, and other components provided by Knative that simplify the development of event-driven architectures.

Requirements

Let’s first talk about the basic requirements I had for the image processing pipeline:

  1. Users upload files to an input bucket and get processed images in an output bucket.
  2. Uploaded images are filtered (e.g., no adult or violent images) before being sent through the pipeline.
  3. The pipeline can contain any number of processing services that can be added or removed as needed. For the initial pipeline, I decided to go with 3 services: resizer, watermarker, and labeler. The resizer resizes large images, the watermarker adds a watermark to resized images, and the labeler extracts information about images (labels) and saves it.

Requirement #3 is especially important. I wanted to be able to add services to the pipeline as I need them or create multiple pipelines with different services chained together.

Knative Eventing Goodness

Knative Eventing provides a number of components that help with building such a pipeline, namely:

  1. Eventing Sources let you read external events into your cluster, and Knative-GCP provides a number of ready-made sources for reading events from various Google Cloud services.
  2. Broker and Trigger provide an eventing backbone where the right events are delivered to the right consumers without producers or consumers having to know how the events are routed.
  3. Custom Events and Event Replies: In Knative, all events are CloudEvents. Having a standard event format, with SDKs in various languages to read and write it, is useful. Moreover, Knative supports custom events and event replies: any service can receive an event, do some processing, create a custom event with new data, and reply back to the Broker so that other services can read the custom event. This is very useful in pipelines where each service does a little bit of work and passes the message on.

Architecture

Here’s the architecture of the image processing pipeline. It’s deployed to Google Kubernetes Engine (GKE) on Google Cloud:

Image processing pipeline architecture

  1. An image is saved to an input Cloud Storage bucket.
  2. The resulting Cloud Storage event is read into Knative by CloudStorageSource.
  3. The filter service receives the Cloud Storage event. It uses the Vision API to determine whether the image is safe. If so, it creates a custom CloudEvent of type dev.knative.samples.fileuploaded and passes it onwards.
  4. The resizer service receives the fileuploaded event, resizes the image using the ImageSharp library, saves the resized image to the output bucket, creates a custom CloudEvent of type dev.knative.samples.fileresized, and passes the event onwards.
  5. The watermarker service receives the fileresized event, adds a watermark to the image using the ImageSharp library, and saves the image to the output bucket.
  6. The labeler also receives the fileuploaded event, extracts labels from the image with the Vision API, and saves the labels to the output bucket.
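The fan-out in steps 4 and 6 comes entirely from Trigger filters: both the resizer and the labeler subscribe to fileuploaded, so the Broker delivers a copy to each. The sketch below is a hypothetical in-memory model of that routing, not Knative code. The real Broker does this over HTTP, and Trigger filters can match on several attributes, whereas this model only compares the event type. The trigger names and event types are the ones used in this pipeline.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of Broker/Trigger routing (not Knative code).
// A trigger here is just an event-type filter plus a subscriber name.
public record Trigger(string Name, string Type, string Subscriber);

public static class BrokerModel
{
    // The four triggers used in this pipeline.
    public static readonly List<Trigger> Triggers = new List<Trigger>
    {
        new Trigger("trigger-filter", "com.google.cloud.storage.object.finalize", "filter"),
        new Trigger("trigger-resizer", "dev.knative.samples.fileuploaded", "resizer"),
        new Trigger("trigger-labeler", "dev.knative.samples.fileuploaded", "labeler"),
        new Trigger("trigger-watermarker", "dev.knative.samples.fileresized", "watermarker"),
    };

    // Every trigger whose filter matches the event type receives a copy of the event.
    public static List<string> Route(string eventType) =>
        Triggers.Where(t => t.Type == eventType)
                .Select(t => t.Subscriber)
                .ToList();
}
```

In this model, adding a service to the pipeline means adding one more entry to the trigger list; nothing upstream changes, which is requirement #3 in miniature.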

Build the pipeline

The instructions for building the pipeline, along with the code and configuration, are in my Knative Tutorial. Let me highlight the main parts.

Read Cloud Storage events

To read Google Cloud Storage events, create a CloudStorageSource and point it to the bucket you want to listen to events from:

apiVersion: events.cloud.google.com/v1alpha1
kind: CloudStorageSource
metadata:
  name: storagesource-images-input
spec:
  bucket: knative-atamel-images-input
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1beta1
      kind: Broker
      name: default

Once the events are read in, they are fed into the Broker in the default namespace.

Filter service

The filter service receives all the storage events and decides which images are safe to pass on.
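A storage event only says which object changed, so the first job is to pull out the bucket and object name and build the gs:// URL that the Vision API reads. Here is a minimal sketch with System.Text.Json, assuming the event data is the Cloud Storage object resource JSON (which carries bucket and name fields). The StorageEventData class is hypothetical, and the tutorial’s own code may parse the event differently.

```csharp
using System.Text.Json;

// Hypothetical helper: extracts bucket and object name from the Cloud Storage
// event data (assumed to be the object resource JSON) and builds a gs:// URL.
public static class StorageEventData
{
    public static (string Bucket, string Name, string StorageUrl) Parse(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;
        // Other fields (contentType, size, ...) are ignored here.
        var bucket = root.GetProperty("bucket").GetString()!;
        var name = root.GetProperty("name").GetString()!;
        return (bucket, name, $"gs://{bucket}/{name}");
    }
}
```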

First, you need to create a Trigger to listen for com.google.cloud.storage.object.finalize events:

apiVersion: eventing.knative.dev/v1beta1
kind: Trigger
metadata:
  name: trigger-filter
spec:
  filter:
    attributes:
      type: com.google.cloud.storage.object.finalize
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: filter

Then, create the Knative Service to handle those events:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: filter
spec:
  template:
    spec:
      containers:
        - image: docker.io/meteatamel/filter:v1
          env:
            - name: BUCKET
              value: "knative-atamel-images-input"

The code is in the filter folder, but in short, the service uses the Vision API’s SafeSearch detection to decide whether the image is safe:

private async Task<bool> IsPictureSafe(string storageUrl)
{
    var visionClient = ImageAnnotatorClient.Create();
    // SafeSearch rates each category with a Likelihood value
    var response = await visionClient.DetectSafeSearchAsync(Image.FromUri(storageUrl));
    // The picture is safe only if every category is rated below Possible
    return response.Adult < Likelihood.Possible
        && response.Medical < Likelihood.Possible
        && response.Racy < Likelihood.Possible
        && response.Spoof < Likelihood.Possible
        && response.Violence < Likelihood.Possible;
}
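The safety rule can be tried without calling the API. In this dependency-free sketch, the enum mirrors the order of the Vision API’s Likelihood values (Unknown, VeryUnlikely, Unlikely, Possible, Likely, VeryLikely), and SafeSearchResult is a hypothetical stand-in for the SafeSearch response; only the comparison rule is taken from IsPictureSafe.

```csharp
// Local mirror of the Vision API Likelihood ordering (Unknown = 0 ... VeryLikely = 5).
public enum Likelihood { Unknown = 0, VeryUnlikely = 1, Unlikely = 2, Possible = 3, Likely = 4, VeryLikely = 5 }

// Hypothetical stand-in for the SafeSearch response.
public record SafeSearchResult(
    Likelihood Adult, Likelihood Medical, Likelihood Racy, Likelihood Spoof, Likelihood Violence);

public static class SafetyCheck
{
    // Same rule as IsPictureSafe: every category must be rated strictly below the threshold.
    public static bool IsSafe(SafeSearchResult r, Likelihood threshold = Likelihood.Possible) =>
        r.Adult < threshold && r.Medical < threshold && r.Racy < threshold
        && r.Spoof < threshold && r.Violence < threshold;
}
```

Because the comparison is strict and Unknown sorts lowest, an image the API could not rate still passes. If that is not what you want, also reject Likelihood.Unknown.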

Once the filter service determines that the picture is safe, it sends out a custom CloudEvent of type dev.knative.samples.fileuploaded whose data field is simply the bucket and file name:

// Reply payload: just the bucket and file name, e.g. {"bucket":"...","name":"..."}
var replyData = JsonConvert.SerializeObject(new { bucket, name });
await eventWriter.Write(replyData, context);
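The eventWriter used above isn’t shown in the post. As a rough sketch of what such a reply can look like on the wire, assuming the CloudEvents HTTP binding in binary content mode (context attributes travel as ce-* headers and the data is the body of the service’s HTTP response to the Broker), here is a hypothetical builder. The source value is a placeholder, and System.Text.Json stands in for Json.NET.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical builder for a fileuploaded reply in CloudEvents binary content mode.
public static class FileUploadedReply
{
    public static (Dictionary<string, string> Headers, string Body) Build(string bucket, string name)
    {
        var headers = new Dictionary<string, string>
        {
            // Required context attributes: specversion, id, type, source.
            ["ce-specversion"] = "1.0",
            ["ce-id"] = Guid.NewGuid().ToString(),
            ["ce-type"] = "dev.knative.samples.fileuploaded",
            ["ce-source"] = "urn:example:filter", // hypothetical source URI
            // In binary mode, datacontenttype maps to the Content-Type header.
            ["Content-Type"] = "application/json",
        };
        // Same payload as the post: just the bucket and the file name.
        var body = JsonSerializer.Serialize(new { bucket, name });
        return (headers, body);
    }
}
```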

Resizer service

The code for the resizer is in the resizer folder.

It only responds to dev.knative.samples.fileuploaded events as defined in its trigger:

apiVersion: eventing.knative.dev/v1beta1
kind: Trigger
metadata:
  name: trigger-resizer
spec:
  filter:
    attributes:
      type: dev.knative.samples.fileuploaded
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: resizer

It reads and resizes images using ImageSharp and emits another custom event, dev.knative.samples.fileresized, for the next service, the watermarker, to pick up.
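The post doesn’t spell out the resize rule, so the following is a sketch under assumptions: large images are shrunk to fit a 400x400 box while keeping their aspect ratio (similar in spirit to ImageSharp’s ResizeMode.Max), and the output name follows the beach-400x400.png pattern seen in the test run below. ResizePlan is hypothetical and does no image I/O.

```csharp
using System;
using System.IO;

// Hypothetical resize arithmetic: fit inside a box, keep aspect ratio, never enlarge.
public static class ResizePlan
{
    public static (int Width, int Height) FitWithin(int width, int height, int maxWidth, int maxHeight)
    {
        // Only large images are resized; small ones keep their size.
        if (width <= maxWidth && height <= maxHeight) return (width, height);
        double scale = Math.Min((double)maxWidth / width, (double)maxHeight / height);
        return (Math.Max(1, (int)Math.Round(width * scale)),
                Math.Max(1, (int)Math.Round(height * scale)));
    }

    // Assumed naming pattern, e.g. "beach.jpg" with a 400x400 box -> "beach-400x400.png".
    public static string OutputName(string inputName, int maxWidth, int maxHeight) =>
        $"{Path.GetFileNameWithoutExtension(inputName)}-{maxWidth}x{maxHeight}.png";
}
```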

Watermarker service

You can find the details of the watermarker service in the watermarker folder, but in a nutshell, it receives fileresized events from the resizer, reads the resized image, and adds a Google Cloud Platform watermark to it before saving it to the output bucket.
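Two small pieces of arithmetic sit around the actual drawing. In this hypothetical sketch, the naming rule is inferred from the output listing (beach-400x400.png becomes beach-400x400-watermark.jpeg), while the bottom-right placement and the 10-pixel margin are assumptions for illustration, not the tutorial’s values.

```csharp
using System;
using System.IO;

// Hypothetical helpers for a watermarker-like service.
public static class WatermarkPlan
{
    // Naming rule inferred from the output listing: append "-watermark", save as JPEG.
    public static string OutputName(string resizedName) =>
        $"{Path.GetFileNameWithoutExtension(resizedName)}-watermark.jpeg";

    // Top-left point at which to draw a mark of the given size so it sits in the
    // bottom-right corner, `margin` pixels from each edge (clamped to the image).
    public static (int X, int Y) BottomRight(
        int imageWidth, int imageHeight, int markWidth, int markHeight, int margin = 10) =>
        (Math.Max(0, imageWidth - markWidth - margin),
         Math.Max(0, imageHeight - markHeight - margin));
}
```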

Labeler service

The labeler receives the same fileuploaded event as the resizer. Instead of changing the image, it uses the Vision API to extract labels from it:

private async Task<string> ExtractLabelsAsync(string storageUrl)
{
    var visionClient = ImageAnnotatorClient.Create();
    // Ask the Vision API for up to 10 labels for the image
    var labels = await visionClient.DetectLabelsAsync(Image.FromUri(storageUrl), maxResults: 10);

    // Keep the top 3 labels, plus any further labels scoring above 0.5
    var orderedLabels = labels
        .OrderByDescending(x => x.Score)
        .TakeWhile((x, i) => i <= 2 || x.Score > 0.50)
        .Select(x => x.Description);

    // Comma-separated list, e.g. "Sky,Body of water,Sea"
    return string.Join(",", orderedLabels);
}
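The TakeWhile condition is easy to misread, so here it is on its own, exercised with made-up scores. Label is a hypothetical stand-in for the Vision API’s EntityAnnotation; the ordering, the i <= 2 rule, and the 0.50 threshold are copied from ExtractLabelsAsync.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the Vision API's EntityAnnotation.
public record Label(string Description, float Score);

public static class LabelSelection
{
    // Sort by score, always keep the top three, then continue while the score stays above 0.5.
    public static string TopLabels(IEnumerable<Label> labels) =>
        string.Join(",", labels
            .OrderByDescending(x => x.Score)
            .TakeWhile((x, i) => i <= 2 || x.Score > 0.50)
            .Select(x => x.Description));
}
```

Because the labels are sorted by descending score, TakeWhile behaves like Where here: once a label beyond the top three scores 0.5 or less, every later label does too. The result is the top three labels plus any others above 0.5.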

Then, it saves those labels to a text file in the output bucket.

Test the pipeline

If everything is set up correctly, you should see all the triggers in the READY state:

kubectl get trigger

NAME                  READY   REASON   BROKER    SUBSCRIBER_URI
trigger-filter        True             default   http://filter.default.svc.cluster.local
trigger-labeler       True             default   http://labeler.default.svc.cluster.local
trigger-resizer       True             default   http://resizer.default.svc.cluster.local
trigger-watermarker   True             default   http://watermarker.default.svc.cluster.local

I upload the following picture from my favorite beach (Ipanema in Rio de Janeiro) to the input bucket:

Beach with sunset

After a few seconds, I see 3 files in my output bucket:

gsutil ls gs://knative-atamel-images-output

gs://knative-atamel-images-output/beach-400x400-watermark.jpeg
gs://knative-atamel-images-output/beach-400x400.png
gs://knative-atamel-images-output/beach-labels.txt

We can see the labels Sky,Body of water,Sea,Nature,Coast,Water,Sunset,Horizon,Cloud,Shore in the text file and the following resized and watermarked image:

Beach with sunset

Wrap up

As I mentioned, the complete example is in my Knative Tutorial repo. The image pipeline is flexible: you can add and remove services as you wish via triggers. This is the beauty of event-driven architectures. Even though reading and writing events (in this case, CloudEvents) takes a little more work, it allows a high degree of flexibility.

If you have questions/comments, feel free to reach out to me on Twitter (@meteatamel).

