New Batch connector for Workflows



Workflows just released a new connector for Batch that greatly simplifies creating and running Batch jobs from Workflows. Let’s take a look at how you can use it.

Recap: Batch and Workflows

Batch is a fully managed service to schedule, queue, and execute batch jobs on Google’s infrastructure. These batch jobs run on Compute Engine VM instances, but they are managed by the Batch service, so you don’t have to provision and manage VM instances yourself.

Workflows is an orchestration service that enables you to call other services in the order that you define.

When you run a batch job, you typically need to provision other resources before or after the batch job or do some cleanup. Workflows is great for these tasks, in addition to managing the lifecycle of your batch jobs.

Without the Batch connector

Let’s look at a concrete use case to understand how Batch and Workflows work together, without using the connector yet.

Say you have a container that calculates prime numbers in batches and saves them to a Cloud Storage bucket. You can use Workflows to create and manage a Batch job that runs these containers in parallel. You can also use Workflows for pre- and post-processing steps around the Batch job, for example, creating a bucket for the results.

First, you need to define some variables for the workflow, such as the Batch API URL:

main:
  params: [args]
  steps:
    - init:
        assign:
          - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - region: "us-central1"
          - batchApi: "batch.googleapis.com/v1"
          - batchApiUrl: ${"https://" + batchApi + "/projects/" + projectId + "/locations/" + region + "/jobs"}
          - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
          - jobId: ${"job-primegen-" + string(int(sys.now()))}
          - bucket: ${projectId + "-" + jobId}

Next, create a bucket to save the Batch job results:

    - createBucket:
        call: googleapis.storage.v1.buckets.insert
        args:
          query:
            project: ${projectId}
          body:
            name: ${bucket}

And create and start a Batch job by calling the Batch API:

    - createAndRunBatchJob:
        call: http.post
        args:
          url: ${batchApiUrl}
          query:
            job_id: ${jobId}
          headers:
            Content-Type: application/json
          auth:
            type: OAuth2
          body:
            taskGroups:
              taskSpec:
                runnables:
                  - container:
                      imageUri: ${imageUri}
                    environment:
                      variables:
                        BUCKET: ${bucket}
              # Run 6 tasks on 2 VMs
              taskCount: 6
              parallelism: 2
            logsPolicy:
              destination: CLOUD_LOGGING
        result: createAndRunBatchJobResponse

At this point, the batch job is running, but you need to wait for it to complete. That means you need to get the job state, check whether it succeeded, and if not, sleep and check again in a loop:

    - getJob:
        call: http.get
        args:
          url: ${batchApiUrl + "/" + jobId}
          auth:
            type: OAuth2
        result: getJobResult
    - checkState:
        switch:
          - condition: ${getJobResult.body.status.state == "SUCCEEDED"}
            next: returnResult
        next: sleep
    - sleep:
        call: sys.sleep
        args:
          seconds: 10
        next: getJob
    - returnResult:
        return:
          jobId: ${jobId}
          bucket: ${bucket}
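
Note that the loop above only exits when the job reaches SUCCEEDED, so a job that ends in FAILED would keep the workflow polling indefinitely. Here’s a minimal sketch of a checkState variant that also stops on failure. The failJob step name and its error message are illustrative and not part of the original sample:

```yaml
    - checkState:
        switch:
          - condition: ${getJobResult.body.status.state == "SUCCEEDED"}
            next: returnResult
          # FAILED is a terminal Batch job state, so stop polling
          - condition: ${getJobResult.body.status.state == "FAILED"}
            next: failJob
        next: sleep
    - sleep:
        call: sys.sleep
        args:
          seconds: 10
        next: getJob
    - returnResult:
        return:
          jobId: ${jobId}
          bucket: ${bucket}
    # Hypothetical step: only reached from checkState when the job failed
    - failJob:
        raise: ${"Batch job " + jobId + " failed"}
```

Raising an error marks the workflow execution as failed, which makes a broken batch job visible instead of leaving the execution stuck in the loop.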

This isn’t so bad, but it’s a bit of extra logic just to manage the lifecycle of the Batch job.

With the Batch connector

The new Batch connector creates the batch job and waits for it to finish. Instead of calling the Batch API directly, you can use the Batch connector like this:

    - createAndRunBatchJob:
        call: googleapis.batch.v1.projects.locations.jobs.create
        args:
          parent: ${"projects/" + projectId + "/locations/" + region}
          jobId: ${jobId}
          body:
            taskGroups:
              taskSpec:
                runnables:
                  - container:
                      imageUri: ${imageUri}
                    environment:
                      variables:
                        BUCKET: ${bucket}
              # Run 6 tasks on 2 VMs
              taskCount: 6
              parallelism: 2
            logsPolicy:
              destination: CLOUD_LOGGING
        result: createAndRunBatchJobResponse

And you can get rid of all the logic that checks the job state and sleeps in a loop. Sweet!
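
Because the connector blocks until the job finishes, a long job can outlast the connector’s default wait. Workflows connectors generally accept a connector_params block to adjust that wait. The sketch below assumes the Batch connector honors it the same way; the 4-hour timeout and the polling values are arbitrary illustrations, not settings from the original sample:

```yaml
    - createAndRunBatchJob:
        call: googleapis.batch.v1.projects.locations.jobs.create
        args:
          parent: ${"projects/" + projectId + "/locations/" + region}
          jobId: ${jobId}
          body:
            taskGroups:
              taskSpec:
                runnables:
                  - container:
                      imageUri: ${imageUri}
                    environment:
                      variables:
                        BUCKET: ${bucket}
              taskCount: 6
              parallelism: 2
            logsPolicy:
              destination: CLOUD_LOGGING
          connector_params:
            # Assumed value: wait up to 4 hours (in seconds) for the job
            timeout: 14400
            # Assumed values: poll every 10s at first, backing off to 60s
            polling_policy:
              initial_delay: 10
              max_delay: 60
              multiplier: 2
        result: createAndRunBatchJobResponse
```

If you try this, check the connector reference for the limits and defaults that apply to your job before relying on these numbers.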

Here’s the full primegen sample if you want to run it yourself.


Batch is great for long-running tasks and Workflows is great for managing the lifecycle of Batch jobs and the tasks around it. With the new Batch connector, this is even easier.

If you have questions or feedback, feel free to reach out to me on Twitter @meteatamel.

