Buffer workflow executions with a Cloud Tasks queue


Introduction

In my previous post, I talked about how you can use a parent workflow to execute child workflows in parallel for faster overall processing time and easier detection of errors. Another useful pattern is to use a Cloud Tasks queue to create Workflows executions and that’s the topic of this post.

When your application experiences a sudden surge of traffic, it’s natural to want to handle the increased load by creating a high number of concurrent workflow executions. However, Google Cloud’s Workflows enforces quotas to prevent abuse and ensure fair resource allocation. These quotas limit the maximum number of concurrent workflow executions per region, per project. For example, Workflows currently enforces a default maximum of 2,000 concurrent executions. Once this limit is reached, any new executions beyond the quota fail with an HTTP 429 error.
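Before introducing a queue, callers sometimes handle 429 responses with client-side retry and exponential backoff. The helper below is an illustrative sketch, not part of any Workflows client library; it becomes brittle under sustained load, which is what motivates the queue-based approach in this post:

```python
import time

def call_with_backoff(fn, max_attempts=5, base_delay=1.0):
    """Retry fn while it signals rate limiting (HTTP 429), backing off exponentially.

    fn is any zero-argument callable returning an object with a
    .status_code attribute (e.g. a wrapped HTTP call).
    """
    for attempt in range(max_attempts):
        resp = fn()
        if resp.status_code != 429:
            return resp
        # Back off 1s, 2s, 4s, ... before retrying.
        time.sleep(base_delay * (2 ** attempt))
    return resp
```

This works for occasional spikes, but under a sustained burst every client keeps retrying and the quota stays saturated; a queue smooths the load at the source instead.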

A Cloud Tasks queue can help. Rather than creating workflow executions directly, you can add workflow execution tasks to a Cloud Tasks queue and let Cloud Tasks drain the queue at a rate you define. This allows for better utilization of your Workflows quota and ensures smooth execution of workflows.

https://storage.googleapis.com/gweb-cloudblog-publish/images/0_workflows_with_queue.max-1300x1300.png

Let’s dive into how to set this up.

Create a Cloud Tasks queue

We’ll start by creating a Cloud Tasks queue. The Cloud Tasks queue acts as a buffer between the parent workflow and the child workflows, allowing us to regulate the rate of executions.

Create the Cloud Tasks queue (initially with no dispatch rate limits) with the desired name and location:

QUEUE=queue-workflow-child
LOCATION=us-central1

gcloud tasks queues create $QUEUE --location=$LOCATION

Now that we have our queue in place, let’s proceed to set up the child workflow.

Create and deploy a child workflow

The child workflow performs a specific task and returns a result to the parent workflow.

Create workflow-child.yaml to define the child workflow:

main:
  params: [args]
  steps:
    - init:
        assign:
          - iteration: ${args.iteration}
    - wait:
        call: sys.sleep
        args:
          seconds: 10
    - return_message:
        return: ${"Hello world" + string(iteration)}

In this example, the child workflow receives an iteration argument from the parent workflow, simulates work by waiting for 10 seconds, and returns a string as the result.
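The child workflow’s logic is small enough to mirror in a few lines of Python, which makes the shape of the result explicit (the `wait_seconds` parameter is added here only so the sketch can be exercised without a real 10-second pause):

```python
import time

def run_child(iteration: int, wait_seconds: int = 10) -> str:
    """Mirror of workflow-child.yaml: wait, then return a greeting."""
    time.sleep(wait_seconds)                 # corresponds to the sys.sleep step
    return "Hello world" + str(iteration)    # corresponds to return_message
```

Each child execution is independent: it receives one `iteration` value and returns one string, which is why the parent can fan out as many of them as the queue will dispatch.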

Deploy the child workflow:

gcloud workflows deploy workflow-child --source=workflow-child.yaml --location=$LOCATION

Create and deploy a parent workflow

Next, create a parent workflow in workflow-parent.yaml.

The workflow assigns some constants first. Note that it references the child workflow name and the queue name created earlier, which is how the parent and child workflows are linked:

main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
          - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
          - workflow_child_name: "workflow-child"
          - queue_name: "queue-workflow-child"

In the next step, Workflows creates and adds a large number of tasks (each wrapping an HTTP request that executes the child workflow) to the Cloud Tasks queue:

- enqueue_tasks_to_execute_child_workflow:
      for:
        value: iteration
        range: [1, 100]
        steps:
            - iterate:
                assign:
                  - data:
                      iteration: ${iteration}
                  - exec:
                      # Need to wrap into argument for Workflows args.
                      argument: ${json.encode_to_string(data)}
            - create_task_to_execute_child_workflow:
                call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                args:
                    parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                    body:
                      task:
                        httpRequest:
                          body: ${base64.encode(json.encode(exec))}
                          url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                          oauthToken:
                            serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

Note that task creation is a non-blocking call in Workflows: the parent only enqueues tasks. Cloud Tasks takes care of running those tasks to execute the child workflows asynchronously.
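The double encoding in the `body` field above is easy to miss: the Workflows execution API expects a JSON object whose `argument` field is itself a JSON *string*, and Cloud Tasks expects the HTTP request body base64-encoded. The following sketch reproduces what the workflow expressions compute:

```python
import base64
import json

# The payload passed to the child workflow.
data = {"iteration": 1}

# Workflows executions API expects {"argument": "<JSON string>"},
# so the payload is JSON-encoded once here...
exec_body = {"argument": json.dumps(data)}     # matches the 'exec' variable

# ...JSON-encoded again as the request body, then base64-encoded
# because Cloud Tasks httpRequest.body carries raw bytes.
http_body = base64.b64encode(json.dumps(exec_body).encode()).decode()

print(http_body)
```

Decoding `http_body` and reading its `argument` field gets you back to the original `{"iteration": 1}` payload, which is exactly what arrives in the child workflow’s `args`.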

Deploy the parent workflow:

gcloud workflows deploy workflow-parent --source=workflow-parent.yaml --location=$LOCATION

Execute the parent workflow with no dispatch rate limits

Time to execute the parent workflow:

gcloud workflows run workflow-parent --location=$LOCATION

As the parent workflow is running, you can see parallel executions of the child workflow, all starting at roughly the same time:

https://storage.googleapis.com/gweb-cloudblog-publish/images/1_parallel_executions_allsametime.max-1000x1000.png

In this case, 100 executions is well under the concurrency limit for Workflows. Quota issues may arise if you submit thousands of executions at once. This is when a Cloud Tasks queue and its rate limits become useful.

Execute the parent workflow with dispatch rate limits

Let’s now apply a rate limit to the Cloud Tasks queue. In this case, 1 dispatch per second:

gcloud tasks queues update $QUEUE --max-dispatches-per-second=1 --location=$LOCATION
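With a dispatch rate in place, the time to drain the queue is simple arithmetic: roughly the number of queued tasks divided by the dispatch rate. A quick sketch:

```python
def drain_seconds(num_tasks: int, dispatches_per_second: float) -> float:
    """Approximate time for Cloud Tasks to drain a queue at a fixed dispatch rate."""
    return num_tasks / dispatches_per_second

# 100 tasks at 1 dispatch/second take about 100 seconds to drain.
print(drain_seconds(100, 1.0))
```

This is why the run below spreads the 100 child executions over roughly 100 seconds instead of launching them all at once; pick a rate that keeps the resulting concurrency (drain rate × child duration) below your quota.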

Execute the parent workflow again:

gcloud workflows run workflow-parent --location=$LOCATION

This time, you see a smoother execution rate (1 execution request per second):

https://storage.googleapis.com/gweb-cloudblog-publish/images/2_parallel_executions_buffered.max-900x900.png

Summary

By introducing a Cloud Tasks queue before executing a workflow and experimenting with different dispatch rates and concurrency settings, you can better utilize your Workflows quota and stay below the limits without triggering unnecessary quota-related failures.

Check out the Buffer HTTP requests with Cloud Tasks codelab, if you want to get more hands-on experience with Cloud Tasks. As always, feel free to contact me on Twitter @meteatamel for any questions or feedback.
