Beyond Job Queues: Introducing Ductwork for Ruby
Ruby has a very mature background job ecosystem. Between Sidekiq, GoodJob, Resque, and Solid Queue there are many, uh, solid options to choose from. But what happens when your background work isn't a single job, but a multi-step pipeline or workflow?
The Problem With Workflows in Ruby
Using existing background job features like batching, callbacks, and continuations, you can build pipelines, but the ergonomics are quite rough. Consider a data enrichment workflow: fetch user records, enrich each one from an external API, validate the results, and then persist everything to the database. With traditional job infrastructure, you end up with something like:
class FetchRecordsJob
  include Sidekiq::Job

  def perform
    records = User.needs_enrichment.pluck(:id)
    batch = Sidekiq::Batch.new
    # Pass the batch ID along by hand so the callback can find it later.
    batch.on(:complete, EnrichmentBatchCallback, "batch_id" => batch.bid)
    batch.jobs do
      records.each { |id| EnrichRecordJob.perform_async(id) }
    end
  end
end

class EnrichmentBatchCallback
  def on_complete(status, options)
    ValidateResultsJob.perform_async(options["batch_id"])
  end
end

# ...
See the problems?
The workflow is invisible. To understand the flow, you have to trace through four files, following callback registrations and job names. There's no single place that says "this is the workflow."
Data passing is manual. Each job has to know where to find its input. We're storing batch_id everywhere and querying for results in every subsequent job. The framework doesn't help.
Testing is painful. Want to test ValidateResultsJob? You need to either mock EnrichmentResult.where or set up the full batch infrastructure. There's no way to test the job in true isolation.
Error handling is scattered. What happens if enrichment fails for some records but not others? What if validation fails? Each scenario requires custom code in different places.
It works, but it's not fun. And it definitely doesn't scale to more complex workflows.
Introducing Ductwork
Ductwork is a framework specifically designed for creating complex job pipelines and workflows in Ruby. It makes building multi-step background processes simple, maintainable, testable, durable, and maybe even a little fun. (GitHub Repo)
Here's the same data enrichment workflow in Ductwork:
class DataEnrichmentPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(FetchRecords)
      .expand(to: EnrichRecord)
      .chain(ValidateResults)
      .chain(PersistResults)
      .collapse(into: NotifyResult)
  end
end
class FetchRecords < Ductwork::Step
  def execute
    User.needs_enrichment.pluck(:id)
  end
end

# ...
The entire workflow is expressive and visible in one place. You can read it left-to-right and top-to-bottom. The DSL defines the transitions, which lets the library handle coordination, data passing, and parallel execution. Each step is a self-contained Ruby class you can test in isolation.
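To see why a fluent DSL can keep a whole workflow in one place, here is a toy, plain-Ruby sketch of a builder that records transitions as data. The `ToyPipeline` class and its methods are hypothetical and are not Ductwork's internals; the point is only that each call appends a transition and returns the builder, so the definition becomes a data structure a runner could inspect and execute.

```ruby
# Toy model of a fluent pipeline DSL (hypothetical; not Ductwork's code).
class ToyPipeline
  # One recorded transition: its type plus the step(s) it points at.
  Transition = Struct.new(:type, :steps)

  attr_reader :transitions

  def initialize
    @transitions = []
  end

  def start(step)
    record(:start, [step])
  end

  def chain(step)
    record(:chain, [step])
  end

  def expand(to:)
    record(:expand, [to])
  end

  def collapse(into:)
    record(:collapse, [into])
  end

  private

  # Append the transition and return self so calls can be chained.
  def record(type, steps)
    @transitions << Transition.new(type, steps)
    self
  end
end

definition = ToyPipeline.new
  .start(:fetch_records)
  .expand(to: :enrich_record)
  .chain(:validate_results)
  .collapse(into: :notify_result)

definition.transitions.map(&:type) # => [:start, :expand, :chain, :collapse]
```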
Why Ductwork?
Simple
Define workflows declaratively with a fluent DSL. No more hunting through callback definitions to trace your workflow logic. Just simple, readable Ruby.
Maintainable
Each step is a plain Ruby object with a single responsibility. Need to add fraud detection? Insert a step. Need to reorder operations? Move a line. Need to remove a step that's no longer needed? Delete it. Pipelines keep track of their own definitions, so you avoid manual versioning hell.
Testable
Steps are isolated units. Test them individually without mocking job infrastructure. A Step takes input, does work, returns output. No framework magic to work around. A few included helpers and RSpec matchers make it even easier to write your tests.
Durable
Pipeline execution state is stored in the database. Query which pipelines are running, where they are in their lifecycle, and what failed. When something goes wrong at 3am, you check the included dashboard or run a few queries. After a configurable number of consecutive failures, the pipeline halts so you can debug.
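The halting idea can be pictured with a small plain-Ruby sketch: retry a unit of work, count consecutive failures, and stop once a configurable limit is reached. The `run_with_halt` helper, its `max_failures:` parameter, and the `PipelineHalted` error are hypothetical names for illustration only; Ductwork's actual configuration and retry semantics are described in its documentation.

```ruby
# Hypothetical sketch: retry a block until it succeeds or fails
# `max_failures` times in a row, then halt with an error.
class PipelineHalted < StandardError; end

def run_with_halt(max_failures:)
  failures = 0
  begin
    yield
  rescue StandardError => e
    failures += 1
    # Give up once the consecutive-failure limit is reached.
    raise PipelineHalted, "halted after #{failures} failures: #{e.message}" if failures >= max_failures

    retry
  end
end

# A flaky step that succeeds on its third attempt.
attempts = 0
result = run_with_halt(max_failures: 3) do
  attempts += 1
  raise "flaky API" if attempts < 3

  :ok
end

begin
  run_with_halt(max_failures: 2) { raise "still down" }
rescue PipelineHalted => e
  puts e.message # prints "halted after 2 failures: still down"
end
```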
Ductwork is purpose-built to handle workflow complexity and do the orchestration for you. No developer wants to spend more time writing plumbing, so don't!
Core Concepts
Let's go over a few high-level concepts to better understand how Ductwork works.
The Pipeline DSL
Ductwork provides five transitions that cover the patterns you'll encounter in real workflows.
Chain
The simplest transition. One step sequentially flows into another. The output of the previous step becomes the input of the next step (more on that below).
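Conceptually, a chain behaves like a left fold over the steps: each step's return value is handed to the next. The sketch below models steps as plain Ruby lambdas and uses a hypothetical `run_chain` helper; real Ductwork steps are `Ductwork::Step` classes whose inputs and outputs are persisted, so this shows only the data flow, not the execution model.

```ruby
# Plain-Ruby model of a chain: each step's return value becomes the
# next step's input. Lambdas stand in for step classes.
step_a = ->(_input) { [3, 1, 2] }
step_b = ->(numbers) { numbers.sort }
step_c = ->(sorted) { sorted.sum }

def run_chain(steps, initial_input = nil)
  steps.reduce(initial_input) { |input, step| step.call(input) }
end

run_chain([step_a, step_b, step_c]) # => 6
```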
class MyPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA).chain(StepB).chain(StepC)
  end
end
Expand/Collapse
The expand transition is like a "foreach" loop. It takes a collection from the previous step and creates a step instance for each element. All expanded steps can run in parallel.
The collapse transition is the inverse. It waits for all parallel steps to complete and aggregates their results into an array.
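In plain Ruby terms, expand/collapse is a fan-out/fan-in: run one unit of work per element, then gather the results in input order. This sketch uses in-process threads and a hypothetical `expand_and_collapse` helper only to show the shape of the pattern; it is not how Ductwork schedules or persists expanded steps.

```ruby
# Fan-out/fan-in sketch of expand/collapse using plain threads.
def expand_and_collapse(items, &step)
  # Expand: one thread per element, each running the same step.
  threads = items.map { |item| Thread.new(item) { |value| step.call(value) } }
  # Collapse: Thread#value waits for each thread; results keep input order.
  threads.map(&:value)
end

user_ids = [1, 2, 3]
results = expand_and_collapse(user_ids) { |id| { user_id: id, score: id * 10 } }
# results => [{ user_id: 1, score: 10 }, { user_id: 2, score: 20 }, { user_id: 3, score: 30 }]
```

If any thread raises, `Thread#value` re-raises that error, so the collapse never produces a partial array.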
class ProcessUserPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(FetchUserIDs)          # Returns [1, 2, 3]
      .expand(to: ProcessUser)            # Creates 3 parallel steps
      .collapse(into: AggregateResults)   # Receives array of results
  end
end
Divide/Combine
The divide transition splits execution into parallel branches, passing the same input to each. Unlike expand, divide creates a fixed set of parallel steps as defined in the DSL.
The combine transition merges branches back together once they have all completed successfully.
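Divide/combine differs from expand in that the branches are a fixed set of different steps, all receiving the same input. Here is a plain-Ruby sketch of that shape, with hypothetical lambdas standing in for `EnrichFromAPIOne` and `EnrichFromAPITwo` and a hypothetical `divide_and_combine` helper. The assumption that the combining step receives an array of branch outputs is mine; check Ductwork's documentation for the exact input it passes.

```ruby
# Divide/combine sketch: every branch gets the same input, and the
# combine block runs only after all branches have finished.
enrich_one = ->(_user) { { company: "Acme" } }
enrich_two = ->(user) { { twitter: "@#{user[:name]}" } }

def divide_and_combine(input, branches, &combine)
  # Divide: run each branch in its own thread with the same input.
  threads = branches.map { |branch| Thread.new { branch.call(input) } }
  # Combine: Thread#value waits for each branch (and re-raises on failure).
  combine.call(threads.map(&:value))
end

user = { name: "ada" }
merged = divide_and_combine(user, [enrich_one, enrich_two]) do |outputs|
  outputs.reduce({}) { |acc, data| acc.merge(data) }
end
# merged => { company: "Acme", twitter: "@ada" }
```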
class EnrichUserPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(FetchUserData)
      .divide(to: [EnrichFromAPIOne, EnrichFromAPITwo])
      .combine(into: MergeEnrichmentData)
  end
end
The Step
Steps are the building blocks of every pipeline. Each step is a plain Ruby class that inherits from Ductwork::Step, receives its input through initialize, and does its work in execute. This design is intentional: steps are single-purpose, self-contained, and trivially testable.
class EnrichRecord < Ductwork::Step
  # The previous step's output (a user ID) arrives as the constructor argument.
  def initialize(user_id)
    @user_id = user_id
  end

  def execute
    user = User.find(@user_id)
    enrichment_data = EnrichmentAPI.fetch(user.email)

    {
      user_id: user.id,
      score: enrichment_data.score,
      metadata: enrichment_data.attributes
    }
  end
end

RSpec.describe EnrichRecord do
  it "returns enrichment data for a user" do
    user = create(:user, email: Faker::Internet.email)
    allow(EnrichmentAPI).to receive(:fetch).and_return(
      double(score: 85, attributes: { industry: "tech" })
    )

    result = EnrichRecord.new(user.id).execute

    expect(result[:score]).to eq(85)
  end
end
Steps can be as simple or as complex as your domain requires. A step might send an email, call an external API, run a database query, or orchestrate other services. Ductwork just manages the flow between steps and ensures data gets where it needs to go.
Data Flow and Global Context
Data flows through the pipeline in two ways.
Return values (step-to-step) pass data linearly. The return value of one step becomes the input argument to the next step. This is explicit and easy to trace, because both input and output data are stored in the database.
Global context (pipeline-wide) provides a key/value store accessible from any step. It's useful for sidecar data that all steps may need: "current" user IDs, trace identifiers, configuration fetched early in the pipeline, etc.
class InitializeTracing < Ductwork::Step
  def execute
    context.set("trace_id", SecureRandom.uuid)
  end
end

class ProcessData < Ductwork::Step
  def execute
    trace_id = context.get("trace_id")
    Ductwork.logger.info("[#{trace_id}] Processing...")
  end
end
Global context is write-once by default, which prevents race conditions between parallel steps and keeps data flow predictable.
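To make the write-once rule concrete, here is a minimal sketch of a write-once key/value store in plain Ruby. The `WriteOnceContext` class and its `AlreadySetError` are hypothetical; they mirror the `context.set` and `context.get` calls above but are not Ductwork's implementation. Raising on a second write is an assumption of this sketch, not documented Ductwork behavior.

```ruby
# Hypothetical write-once key/value store mirroring context.set/get.
class WriteOnceContext
  class AlreadySetError < StandardError; end

  def initialize
    @data = {}
    @lock = Mutex.new
  end

  # The first write for a key wins; any later write for it raises.
  def set(key, value)
    @lock.synchronize do
      raise AlreadySetError, "#{key} is already set" if @data.key?(key)

      @data[key] = value
    end
  end

  def get(key)
    @lock.synchronize { @data[key] }
  end
end

context = WriteOnceContext.new
context.set("trace_id", "abc-123")
context.get("trace_id") # => "abc-123"

begin
  context.set("trace_id", "other")
rescue WriteOnceContext::AlreadySetError => e
  puts e.message # prints "trace_id is already set"
end
```

Because the mutex serializes concurrent `set` calls, two parallel writers racing on the same key cannot both succeed; the second gets an error instead of silently overwriting the first.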
Ductwork Pro
For teams running workflows in critical production environments, Ductwork Pro adds features and support:
- More Concurrency Controls - Control and tune every process and thread count in Ductwork's hybrid concurrency model
- Step Timeout - Enforce maximum runtime for a single step or all steps in a pipeline
- Step Delay - Delay a step's execution for a set period of time; the delay doesn't count against its runtime
- Priority Support - Direct communication via email
Get Started
The next time you're designing a complex background job, see how Ductwork can make your pipeline implementation simpler and more robust:
$ bundle add ductwork
$ bin/rails generate ductwork:install
$ bin/rails db:migrate
$ bin/ductwork start
There is way too much to cover in a single introductory article, so check out the full documentation and examples at getductwork.io.