Beyond Job Queues: Introducing Ductwork for Ruby


Ruby has a very mature background job ecosystem. Between Sidekiq, GoodJob, Resque, and Solid Queue there are many, uh, solid options to choose from. But what happens when your background work isn't a single job, but a multi-step pipeline or workflow?

The Problem With Workflows in Ruby

Using existing background job features like batching, callbacks, and continuations, you can build pipelines, but the ergonomics are quite rough. Consider a data enrichment workflow: fetch user records, enrich each one from an external API, validate the results, and then persist everything to the database. With traditional job infrastructure you end up with something like:

class FetchRecordsJob
  include Sidekiq::Job

  def perform
    records = User.needs_enrichment.pluck(:id)
    batch = Sidekiq::Batch.new
    batch.on(:complete, EnrichmentBatchCallback)
    batch.jobs do
      records.each { |id| EnrichRecordJob.perform_async(id) }
    end
  end
end

class EnrichmentBatchCallback
  def on_complete(status, options)
    ValidateResultsJob.perform_async(options["batch_id"])
  end
end

# ...

See the problems?

The workflow is invisible. To understand the flow, you have to trace through four files, following callback registrations and job names. There's no single place that says "this is the workflow."

Data passing is manual. Each job has to know where to find its input. We're storing batch_id everywhere and querying for results in every subsequent job. The framework doesn't help.

Testing is painful. Want to test ValidateResultsJob? You need to either mock EnrichmentResult.where or set up the full batch infrastructure. There's no way to test the job in true isolation.

Error handling is scattered. What happens if enrichment fails for some records but not others? What if validation fails? Each scenario requires custom code in different places. It works, but it's not fun. And it definitely doesn't scale to more complex workflows.

Introducing Ductwork

Ductwork is a framework specifically designed for creating complex job pipelines and workflows in Ruby. It makes building multi-step background processes simple, maintainable, testable, durable, and maybe even a little fun. (GitHub Repo)

Here's the same data enrichment workflow in Ductwork:

class DataEnrichmentPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(FetchRecords)
            .expand(to: EnrichRecord)
            .chain(ValidateResults)
            .chain(PersistResults)
            .collapse(NotifyResult)
  end
end

class FetchRecords < Ductwork::Step
  def execute
    User.needs_enrichment.pluck(:id)
  end
end

# ...

The entire workflow is expressive and visible in one place. You can read it left-to-right and top-to-bottom. The DSL defines the transitions, which lets the library handle coordination, data passing, and parallel execution. Each step is a self-contained Ruby class you can test in isolation.

Why Ductwork?

Simple

Define workflows declaratively with a fluent DSL. No more hunting through callback definitions to trace your workflow logic. Just simple, readable Ruby.

Maintainable

Each step is a plain Ruby object with a single responsibility. Need to add fraud detection? Insert a step. Need to reorder operations? Move a line. Need to remove a step that's no longer needed? Delete it. Pipelines keep track of their definition without manual versioning hell.

Testable

Steps are isolated units. Test them individually without mocking job infrastructure. A Step takes input, does work, returns output. No framework magic to work around. A few included helpers and RSpec matchers make it even easier to write your tests.

Durable

Pipeline execution state is stored in the database. Query which pipelines are running, where they are in their lifecycle, and what failed. When something goes wrong at 3am, you're checking the included dashboard or making a few queries. Configurable consecutive failures halt the pipeline so you can debug.

Ductwork is purpose-built to handle workflow complexity and do the orchestration for you. No developer wants to spend more time writing plumbing, so don't!

Core Concepts

Let's go over a few of Ductwork's high-level concepts to understand how it works.

The Pipeline DSL

Ductwork provides five transitions that cover the patterns you'll encounter in real workflows.

Chain

The simplest transition. One step sequentially flows into another. The output of the previous step becomes the input of the next step (more on that below).

class MyPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA).chain(StepB).chain(StepC)
  end
end
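To make the data handoff concrete, here's a sketch of what chain does with return values, using plain Ruby objects in place of Ductwork's scheduler. StepA and StepB are hypothetical steps, and the manual handoff at the bottom is exactly the wiring Ductwork performs for you:

```ruby
# Hypothetical steps; Ductwork would invoke these and pass the data itself.
class StepA
  def execute
    [1, 2, 3]
  end
end

class StepB
  def execute(numbers)
    numbers.map { |n| n * 10 }
  end
end

# The handoff Ductwork automates: StepA's output is StepB's input.
output_a = StepA.new.execute           # [1, 2, 3]
output_b = StepB.new.execute(output_a) # [10, 20, 30]
```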

Expand/Collapse

The expand transition is like a "foreach" loop. It takes a collection from the previous step and creates a step instance for each element. All expanded steps can run in parallel.

The collapse transition is the inverse. It waits for all parallel steps to complete and aggregates their results into an array.

class ProcessUserPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(FetchUserIDs)               # Returns [1, 2, 3]
            .expand(to: ProcessUser)           # Creates 3 parallel steps
            .collapse(into: AggregateResults)  # Receives array of results
  end
end
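Conceptually, expand/collapse behaves like the plain-Ruby sketch below. These are stand-in classes, not Ductwork's API, and the map runs serially here for clarity where Ductwork would run each expanded step in parallel:

```ruby
class FetchUserIDs
  def execute
    [1, 2, 3]
  end
end

class ProcessUser
  def execute(user_id)
    user_id * 2
  end
end

class AggregateResults
  def execute(results)
    results.sum
  end
end

ids = FetchUserIDs.new.execute                         # [1, 2, 3]
results = ids.map { |id| ProcessUser.new.execute(id) } # one step per element
total = AggregateResults.new.execute(results)          # collapse sees all results
```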

Divide/Combine

The divide transition splits execution into parallel branches, passing the same input to each. Unlike expand, divide creates a fixed set of parallel steps as defined in the DSL.

The combine transition merges branches back together once they have all completed successfully.

class EnrichUserPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(FetchUserData)
            .divide(to: [EnrichFromAPIOne, EnrichFromAPITwo])
            .combine(into: MergeEnrichmentData)
  end
end
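The semantics can be sketched in plain Ruby like this. The class bodies are illustrative stand-ins, and the assumption (worth verifying against the docs) is that the combining step receives an array of branch results:

```ruby
class FetchUserData
  def execute
    { email: "user@example.com" }
  end
end

class EnrichFromAPIOne
  def execute(data)
    { source: "api_one", score: 85 }
  end
end

class EnrichFromAPITwo
  def execute(data)
    { source: "api_two", score: 90 }
  end
end

class MergeEnrichmentData
  def execute(branch_results)
    { scores: branch_results.map { |r| r[:score] } }
  end
end

# Each branch gets the same input; combine merges the branch outputs.
input = FetchUserData.new.execute
branches = [EnrichFromAPIOne, EnrichFromAPITwo].map { |step| step.new.execute(input) }
merged = MergeEnrichmentData.new.execute(branches)
```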

The Step

Steps are the building blocks of every pipeline. Each step is a plain Ruby class that inherits from Ductwork::Step and implements initialize and execute. This design is intentional. Steps are single-purpose, self-contained, and trivially testable.

class EnrichRecord < Ductwork::Step
  def execute(user_id)
    user = User.find(user_id)
    enrichment_data = EnrichmentAPI.fetch(user.email)
    
    {
      user_id: user.id,
      score: enrichment_data.score,
      metadata: enrichment_data.attributes
    }
  end
end

RSpec.describe EnrichRecord do
  it "returns enrichment data for a user" do
    user = create(:user, email: Faker::Internet.email)
    allow(EnrichmentAPI).to receive(:fetch).and_return(
      OpenStruct.new(score: 85, attributes: { industry: "tech" })
    )
    
    result = EnrichRecord.new.execute(user.id)
    
    expect(result[:score]).to eq(85)
  end
end

Steps can be as simple or as complex as your domain requires. A step might send an email, call an external API, run a database query, or orchestrate other services. Ductwork just manages the flow between steps and ensures data gets where it needs to go.

Data Flow and Global Context

Data flows through the pipeline in two ways.

Return values (step-to-step) pass data linearly. The return value of one step becomes the input argument to the next step. This is explicit and easy to trace, as both input and output data are stored in the database.

Global context (pipeline-wide) provides a key/value store accessible from any step. It's useful for sidecar data that all steps may need: "current" user IDs, trace identifiers, configuration fetched early in the pipeline, etc.

class InitializeTracing < Ductwork::Step
  def execute
    context.set("trace_id", SecureRandom.uuid)
  end
end

class ProcessData < Ductwork::Step
  def execute
    trace_id = context.get("trace_id")
    Ductwork.logger.info("[#{trace_id}] Processing...")
  end
end

Global context is write-once by default, which prevents race conditions between parallel steps and keeps data flow predictable.
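As a minimal illustration of that write-once behavior, here's a toy key/value store with the same semantics. This is not Ductwork's actual context class, just a sketch of the guarantee it provides:

```ruby
# Toy write-once store: a key can be set exactly once, then only read.
class WriteOnceContext
  def initialize
    @store = {}
  end

  def set(key, value)
    raise KeyError, "#{key} is already set" if @store.key?(key)
    @store[key] = value
  end

  def get(key)
    @store[key]
  end
end

context = WriteOnceContext.new
context.set("trace_id", "abc-123")
context.get("trace_id")            # "abc-123"
# context.set("trace_id", "other") # would raise KeyError
```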

Ductwork Pro

For teams running workflows in critical production environments, Ductwork Pro adds features and support:

  • More Concurrency Controls - Control and tune every process and thread count in Ductwork's hybrid concurrency model
  • Step Timeout - Enforce maximum runtime for a single step or all steps in a pipeline
  • Step Delay - Delay a step's execution for a set period of time that doesn't count against its runtime
  • Priority Support - with direct communication via email

Get Started

The next time you're designing a complex background job, see how you can make pipeline implementation simpler and more robust:

$ bundle add ductwork
$ bin/rails generate ductwork:install
$ bin/rails db:migrate
$ bin/ductwork start

There is way too much to go over in a single introductory article, so check out the full documentation and examples at getductwork.io.