Open-source · durable functions for Postgres

Durable, crash-proof functions
built into Postgres

Orchestrate retries, scheduling, parallel fan-out, and conditional branching with a tiny SQL DSL. Built on Postgres + a background worker — no containers, no external services, just Postgres.

live execution

              

Parallel aggregation · df.start( a & b & c ~> … ) — three queries fan out, then join into one result.

🔧 Without pg_durable

-- Goal: run 3 aggregations in parallel, then refresh a dashboard
--       — with retries and crash recovery. Here's the plumbing.

-- 1. Set up job queues and state tables
-- Work queue polled by poll_and_execute. Lifecycle:
-- pending -> running -> completed | failed (re-queued to pending on retry).
-- status/attempts/max_attempts are NOT NULL because the retry CASE
-- (attempts + 1 >= max_attempts) evaluates to NULL on a NULL operand and
-- would silently re-queue the job forever.
CREATE TABLE job_queue (
    id SERIAL PRIMARY KEY,
    payload JSONB NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending'
        CONSTRAINT job_queue_status_check
        CHECK (status IN ('pending', 'running', 'completed', 'failed')),
    attempts INT NOT NULL DEFAULT 0
        CONSTRAINT job_queue_attempts_check CHECK (attempts >= 0),
    max_attempts INT NOT NULL DEFAULT 3
        CONSTRAINT job_queue_max_attempts_check CHECK (max_attempts >= 1),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- Set when a worker claims the job; locked_by holds that backend's PID as text.
    locked_at TIMESTAMPTZ,
    locked_by TEXT
);

-- Per-job outcome log: a result payload and/or an error message.
-- recover_crashed_jobs appends a row here for every job it resets.
CREATE TABLE job_results (
    id           SERIAL,
    job_id       INT,
    result       JSONB,
    error        TEXT,
    completed_at TIMESTAMPTZ DEFAULT now(),
    CONSTRAINT job_results_pkey PRIMARY KEY (id),
    CONSTRAINT job_results_job_id_fkey
        FOREIGN KEY (job_id) REFERENCES job_queue (id)
);

-- One progress record per job (step counter plus free-form JSONB data).
-- NOTE(review): no function in this listing reads or writes this table.
CREATE TABLE job_state (
    job_id       INT,
    current_step INT DEFAULT 0,
    step_data    JSONB DEFAULT '{}',
    updated_at   TIMESTAMPTZ DEFAULT now(),
    CONSTRAINT job_state_pkey PRIMARY KEY (job_id),
    CONSTRAINT job_state_job_id_fkey
        FOREIGN KEY (job_id) REFERENCES job_queue (id)
);

-- 2. Write a polling worker function
-- Claim the oldest claimable pending job and run it once.
-- On success the job becomes 'completed'. On failure attempts is bumped,
-- the error text is written to job_results, the claim is released, and the
-- job is either re-queued ('pending') or marked 'failed' once attempts
-- reaches max_attempts. Does nothing when no job is available.
CREATE OR REPLACE FUNCTION poll_and_execute()
RETURNS void AS $$
DECLARE
    job RECORD;
BEGIN
    -- SKIP LOCKED lets several workers poll concurrently without blocking
    -- on, or double-claiming, the same row.
    SELECT * INTO job FROM job_queue
    WHERE status = 'pending'
      AND (locked_at IS NULL
           OR locked_at < now() - interval '5 min')
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED;

    -- FOUND is the documented way to ask whether SELECT INTO matched a row.
    IF NOT FOUND THEN
        RETURN;
    END IF;

    UPDATE job_queue
    SET status = 'running',
        locked_at = now(),
        locked_by = pg_backend_pid()::text
    WHERE id = job.id;

    -- 3. Execute with manual retry logic
    BEGIN
        -- NOTE(review): execute_step is not defined in this listing; if it is
        -- missing at runtime every job lands in the handler below.
        PERFORM execute_step(job.id, job.payload);
        UPDATE job_queue SET status = 'completed'
        WHERE id = job.id;
    EXCEPTION WHEN OTHERS THEN
        -- Work done inside this sub-block was rolled back; the 'running'
        -- claim made above was not, so release it here.
        UPDATE job_queue
        SET attempts = attempts + 1,
            status = CASE
              WHEN attempts + 1 >= max_attempts
              THEN 'failed' ELSE 'pending' END,
            locked_at = NULL,
            locked_by = NULL
        WHERE id = job.id;

        -- Keep the failure reason instead of discarding it.
        INSERT INTO job_results (job_id, error)
        VALUES (job.id, SQLERRM);
    END;
END;
$$ LANGUAGE plpgsql;

-- 4. Track step coordination manually
-- One row per step of a job's workflow. Steps run in step_order; a step
-- becomes runnable once every row listed in depends_on is 'completed'.
CREATE TABLE workflow_steps (
    id SERIAL PRIMARY KEY,
    job_id INT NOT NULL REFERENCES job_queue(id),
    step_order INT NOT NULL,
    step_name TEXT NOT NULL,
    step_query TEXT NOT NULL,            -- SQL text run with EXECUTE
    -- NOT NULL + CHECK: a NULL or misspelled status would never match any
    -- of the status filters used by the coordinator functions.
    status TEXT NOT NULL DEFAULT 'pending'
        CONSTRAINT workflow_steps_status_check
        CHECK (status IN ('pending', 'running', 'completed', 'failed')),
    result JSONB,
    error TEXT,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    -- workflow_steps.id values (not step_order numbers) that must complete first.
    depends_on INT[]
);

-- Run, in step_order, every pending step of job p_job_id whose dependencies
-- have all completed. Steps with unfinished dependencies are left 'pending'.
-- The first step that raises is marked 'failed' with its error text, and the
-- function returns without touching later steps.
CREATE OR REPLACE FUNCTION advance_workflow(p_job_id INT)
RETURNS void AS $$
DECLARE
    step RECORD;
    all_deps_done BOOLEAN;
BEGIN
    FOR step IN
        SELECT * FROM workflow_steps
        WHERE job_id = p_job_id AND status = 'pending'
        ORDER BY step_order, id
    LOOP
        -- Runnable when no listed dependency is missing or unfinished.
        -- Dependencies are resolved only within this job, so a step that
        -- belongs to another job can never satisfy them. unnest(NULL) and
        -- unnest('{}') yield no rows, so a step without dependencies is runnable.
        all_deps_done := NOT EXISTS (
            SELECT 1
            FROM unnest(step.depends_on) AS d(dep_id)
            WHERE NOT EXISTS (
                SELECT 1
                FROM workflow_steps AS ws
                WHERE ws.id = d.dep_id
                  AND ws.job_id = p_job_id
                  AND ws.status = 'completed'
            )
        );

        IF NOT all_deps_done THEN
            CONTINUE;
        END IF;

        UPDATE workflow_steps
        SET status = 'running', started_at = now()
        WHERE id = step.id;

        BEGIN
            EXECUTE step.step_query;
            UPDATE workflow_steps
            SET status = 'completed',
                completed_at = now()
            WHERE id = step.id;
        EXCEPTION WHEN OTHERS THEN
            -- The step's own work was rolled back; record why, then stop.
            UPDATE workflow_steps
            SET status = 'failed',
                error = SQLERRM,
                completed_at = now()
            WHERE id = step.id;
            RETURN;
        END;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- 5. Build crash recovery from scratch
-- Reset jobs that have been 'running' for over 10 minutes under a backend
-- PID no longer present in pg_stat_activity. For each such job:
--   * attempts is bumped, and the job is re-queued ('pending') or marked
--     'failed' once max_attempts is reached (same rule as poll_and_execute);
--   * its 'running' steps go back to 'pending';
--   * a note naming the interrupted step is written to job_results.
-- NOTE(review): PIDs can be reused by new backends, so the liveness test is a heuristic.
CREATE OR REPLACE FUNCTION recover_crashed_jobs()
RETURNS void AS $$
DECLARE
    crashed RECORD;
    crashed_step TEXT;
BEGIN
    FOR crashed IN
        SELECT jq.* FROM job_queue AS jq
        WHERE jq.status = 'running'
          AND jq.locked_at < now() - interval '10 min'
          AND NOT EXISTS (
              SELECT 1 FROM pg_stat_activity
              WHERE pid = jq.locked_by::int
          )
    LOOP
        -- Capture the interrupted step BEFORE the reset below. Afterwards no
        -- 'running' step remains, the lookup returns NULL, and the
        -- concatenated message collapses to NULL.
        SELECT ws.step_name INTO crashed_step
        FROM workflow_steps AS ws
        WHERE ws.job_id = crashed.id
          AND ws.status = 'running'
        ORDER BY ws.started_at, ws.id
        LIMIT 1;

        UPDATE job_queue
        SET status = CASE
              WHEN attempts + 1 >= max_attempts
              THEN 'failed' ELSE 'pending' END,
            locked_at = NULL,
            locked_by = NULL,
            attempts = attempts + 1
        WHERE id = crashed.id;

        UPDATE workflow_steps
        SET status = 'pending',
            started_at = NULL,
            error = NULL
        WHERE job_id = crashed.id
          AND status = 'running';

        INSERT INTO job_results (job_id, error)
        VALUES (crashed.id,
            'Recovered from crash at step ' ||
            coalesce(crashed_step, '(none running)'));
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- 6. Custom status tracking and monitoring
-- One-row progress summary for job p_job_id:
--   * its status and step counts (total / completed / failed);
--   * the step currently running;
--   * the time since the job was queued;
--   * the most recent step error.
-- Returns no rows when the job does not exist.
CREATE OR REPLACE FUNCTION get_job_status(p_job_id INT)
RETURNS TABLE (
    job_status TEXT,
    total_steps INT,
    completed_steps INT,
    failed_steps INT,
    current_step TEXT,
    elapsed_time INTERVAL,
    last_error TEXT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        jq.status,
        counts.n_total,
        counts.n_completed,
        counts.n_failed,
        -- If more than one step is 'running', report the earliest-started one
        -- so repeated calls return the same answer.
        (SELECT ws.step_name
         FROM workflow_steps AS ws
         WHERE ws.job_id = jq.id
           AND ws.status = 'running'
         ORDER BY ws.started_at, ws.id
         LIMIT 1),
        now() - jq.created_at,
        (SELECT ws.error
         FROM workflow_steps AS ws
         WHERE ws.job_id = jq.id
           AND ws.status = 'failed'
         ORDER BY ws.completed_at DESC NULLS LAST, ws.id DESC
         LIMIT 1)
    FROM job_queue AS jq
    -- A single pass over the job's steps instead of three count subqueries;
    -- an aggregate over zero rows still yields one row of zeros.
    CROSS JOIN LATERAL (
        SELECT
            count(*)::int                                          AS n_total,
            (count(*) FILTER (WHERE ws.status = 'completed'))::int AS n_completed,
            (count(*) FILTER (WHERE ws.status = 'failed'))::int    AS n_failed
        FROM workflow_steps AS ws
        WHERE ws.job_id = jq.id
    ) AS counts
    WHERE jq.id = p_job_id;
END;
$$ LANGUAGE plpgsql;

-- 7. Parallel execution coordinator
-- Execute the listed steps of job p_job_id one after another. Despite the
-- name, the FOREACH loop runs them sequentially in this backend.
-- Each step is marked 'running', then 'completed' or 'failed' with its error
-- text; a failure does not stop the remaining steps. The job is marked
-- 'failed' if any step failed or an id does not belong to p_job_id.
-- A NULL p_step_ids is treated as an empty list.
CREATE OR REPLACE FUNCTION run_parallel_steps(
    p_job_id INT,
    p_step_ids INT[]
) RETURNS void AS $$
DECLARE
    step_id INT;
    step RECORD;
    failed BOOLEAN := false;
BEGIN
    -- FOREACH raises on a NULL array expression.
    FOREACH step_id IN ARRAY coalesce(p_step_ids, '{}'::int[]) LOOP
        SELECT * INTO step FROM workflow_steps
        WHERE id = step_id AND job_id = p_job_id;

        -- Without this guard, an id belonging to another job fell through.
        -- The UPDATEs below then modified that job's step, and EXECUTE was
        -- handed a NULL query string.
        IF NOT FOUND THEN
            RAISE WARNING 'step % does not belong to job %', step_id, p_job_id;
            failed := true;
            CONTINUE;
        END IF;

        UPDATE workflow_steps
        SET status = 'running', started_at = now()
        WHERE id = step.id;

        BEGIN
            EXECUTE step.step_query;
            UPDATE workflow_steps
            SET status = 'completed',
                completed_at = now()
            WHERE id = step.id;
        EXCEPTION WHEN OTHERS THEN
            UPDATE workflow_steps
            SET status = 'failed',
                error = SQLERRM,
                completed_at = now()
            WHERE id = step.id;
            failed := true;
        END;
    END LOOP;

    IF failed THEN
        UPDATE job_queue SET status = 'failed'
        WHERE id = p_job_id;
    END IF;
END;
$$ LANGUAGE plpgsql;

-- 8. Variable passing between steps
-- Named values shared between the steps of one job: at most one value per
-- (job_id, var_name). set_step_var overwrites on conflict, and
-- substitute_vars reads them back.
CREATE TABLE step_variables (
    job_id      INT REFERENCES job_queue (id),
    var_name    TEXT NOT NULL,
    var_value   JSONB,
    set_by_step INT,
    created_at  TIMESTAMPTZ DEFAULT now(),
    CONSTRAINT step_variables_pkey PRIMARY KEY (job_id, var_name)
);

-- Upsert variable p_name for job p_job_id and record which step last set it.
-- On overwrite only var_value and set_by_step change; created_at keeps the
-- time of the first write.
CREATE OR REPLACE FUNCTION set_step_var(
    p_job_id INT, p_name TEXT,
    p_value JSONB, p_step INT
) RETURNS void AS $$
BEGIN
    INSERT INTO step_variables (job_id, var_name, var_value, set_by_step)
    VALUES (p_job_id, p_name, p_value, p_step)
    ON CONFLICT (job_id, var_name) DO UPDATE
    SET var_value   = EXCLUDED.var_value,
        set_by_step = EXCLUDED.set_by_step;
END;
$$ LANGUAGE plpgsql;

-- Return p_query with each "$name" placeholder replaced by the text form of
-- job p_job_id's variable "name" from step_variables.
-- Rules:
--   * Longer names are substituted first, so "$order" cannot rewrite the
--     prefix of "$order_id".
--   * A variable whose value is SQL NULL becomes the keyword NULL; replace()
--     with a NULL replacement would otherwise null out the entire query.
-- NOTE(review): JSON strings render with double quotes ("abc"), which SQL
-- reads as an identifier, and values are not escaped. Do not route
-- untrusted values through this into EXECUTE.
CREATE OR REPLACE FUNCTION substitute_vars(
    p_job_id INT, p_query TEXT
) RETURNS TEXT AS $$
DECLARE
    v RECORD;
    result TEXT := p_query;
BEGIN
    FOR v IN
        SELECT var_name, var_value
        FROM step_variables
        WHERE job_id = p_job_id
        ORDER BY length(var_name) DESC, var_name
    LOOP
        result := replace(result,
            '$' || v.var_name,
            coalesce(v.var_value::text, 'NULL'));
    END LOOP;
    RETURN result;
END;
$$ LANGUAGE plpgsql;

-- 9. Scheduling and cron support
-- Recurring job definitions consumed by check_scheduled_jobs.
CREATE TABLE scheduled_jobs (
    id          SERIAL,
    cron_expr   TEXT NOT NULL,
    job_payload JSONB NOT NULL,   -- copied into job_queue.payload when due
    last_run    TIMESTAMPTZ,
    next_run    TIMESTAMPTZ,      -- NULL is treated as "due now"
    enabled     BOOLEAN DEFAULT true,
    CONSTRAINT scheduled_jobs_pkey PRIMARY KEY (id)
);

-- Enqueue one job_queue row per enabled schedule that is due (next_run
-- unset or already past), and stamp that schedule's last_run.
-- next_run is never advanced here, so a due schedule is enqueued again on
-- every call; computing it requires an external cron parser.
CREATE OR REPLACE FUNCTION check_scheduled_jobs()
RETURNS void AS $$
BEGIN
    WITH due AS (
        UPDATE scheduled_jobs
        SET last_run = now()
        WHERE enabled = true
          AND (next_run IS NULL
               OR next_run <= now())
        RETURNING job_payload
    )
    INSERT INTO job_queue (payload)
    SELECT job_payload FROM due;
END;
$$ LANGUAGE plpgsql;

-- 10. Cleanup and maintenance
-- Delete finished ('completed' or 'failed') jobs created more than
-- p_retention ago, along with every row that references them.
-- Returns the number of job_queue rows removed.
CREATE OR REPLACE FUNCTION cleanup_old_jobs(
    p_retention INTERVAL DEFAULT '30 days'
) RETURNS INT AS $$
DECLARE
    doomed INT[];
    deleted INT;
BEGIN
    -- Select and row-lock the victims once, so every DELETE below acts on
    -- exactly the same set. Re-running the filter per statement could pick
    -- up a job that finished mid-cleanup, leave its children behind, and
    -- trip the foreign keys on the final DELETE.
    SELECT coalesce(array_agg(j.id), '{}')
    INTO doomed
    FROM (
        SELECT id FROM job_queue
        WHERE status IN ('completed', 'failed')
          AND created_at < now() - p_retention
        FOR UPDATE
    ) AS j;

    DELETE FROM step_variables WHERE job_id = ANY (doomed);
    DELETE FROM workflow_steps WHERE job_id = ANY (doomed);
    DELETE FROM job_results    WHERE job_id = ANY (doomed);
    -- job_state also references job_queue(id). Without this DELETE, any job
    -- with a state row would make the final DELETE fail.
    DELETE FROM job_state      WHERE job_id = ANY (doomed);

    DELETE FROM job_queue WHERE id = ANY (doomed);
    GET DIAGNOSTICS deleted = ROW_COUNT;
    RETURN deleted;
END;
$$ LANGUAGE plpgsql;

-- 11. ...and only NOW can you wire up the actual workflow
-- Insert the job, its three independent aggregation steps, and a final
-- refresh step that depends on exactly those three.
-- depends_on holds workflow_steps.id values, so the ids come from RETURNING
-- rather than being hard-coded. A literal ARRAY[1,2,3] only matches the
-- first job on a fresh table; later jobs would point at the first job's steps.
WITH job AS (
    INSERT INTO job_queue (payload)
    VALUES ('{"name":"refresh-dashboard"}')
    RETURNING id
),
aggregations AS (
    INSERT INTO workflow_steps (job_id, step_order, step_name, step_query)
    SELECT job.id, 1, v.name, v.query
    FROM job
    CROSS JOIN (VALUES
        ('count_users',  'SELECT count(*) FROM users'),
        ('count_orders', 'SELECT count(*) FROM orders'),
        ('sum_revenue',  'SELECT sum(amount) FROM orders')
    ) AS v(name, query)
    RETURNING id
)
INSERT INTO workflow_steps
    (job_id, step_order, step_name, step_query, depends_on)
SELECT
    job.id,
    2,
    'refresh_dash',
    'REFRESH MATERIALIZED VIEW metrics',
    (SELECT array_agg(a.id ORDER BY a.id) FROM aggregations AS a)
FROM job;
-- ...then schedule the worker, poll, coordinate the parallel
-- steps, handle failures, recover crashes — see all of the above.

300+ lines of boilerplate

  • 🔧 Queue setup & configuration
  • 🔄 Worker management & polling
  • 📊 Message handling & state tracking
  • ❌ Error handling & retries
  • 🔗 Manual step coordination

⚡ With pg_durable

-- Parallel aggregation: 3 queries fan out, then refresh the dashboard
-- NOTE(review): operator meanings here come from this page's prose
-- (& = run in parallel, ~> = then run in sequence), not from visible code.
SELECT df.start(
    -- Three independent queries combined with & (fan-out).
    'SELECT count(*) FROM users'     &
    'SELECT count(*) FROM orders'    &
    'SELECT sum(amount) FROM orders'

    -- Runs after the fan-out joins.
    -- NOTE(review): the plain-SQL version runs REFRESH MATERIALIZED VIEW
    -- metrics at this point; confirm whether ~> expects SQL text or a label.
    ~> 'refresh dashboard',

    -- NOTE(review): the meaning of this second argument is not shown on this page.
    'metrics'
);

โœ๏ธ You write the SQL. pg_durable handles everything else.

Queue management, state tracking, crash recovery, step coordination, and retries — pg_durable is the orchestration engine.

CREATE EXTENSION IF NOT EXISTS pg_durable;  -- no error if already installed, so safe to re-run

Enable in any PostgreSQL 17 database. View full setup guide →

Why pg_durable

🛡️

Durable by default

Every step checkpoints state to PostgreSQL. Workflows survive crashes, restarts, and connection drops. Resume exactly where you left off.

Learn more →
🔁

Automatic retries

Built-in retry logic for flaky operations. When a step fails, only that step retries — the rest of your workflow continues. No manual error handling code needed.

Learn more →
🔎

Full observability in SQL

All workflow state lives in Postgres tables. Query execution history, inspect step outputs, and debug failures with standard SQL. No external dashboards.

Learn more →
⚡

Parallel execution

Fan out independent work with the & operator or df.join(). Run aggregations, API calls, or ETL steps concurrently with automatic coordination.

Learn more →

What you can build

From data pipelines to database maintenance to cloud-connected workflows — here are the patterns pg_durable handles, each backed by a copy-paste-ready scenario.

📚 Full Scenarios & Use Cases Guide

Every pattern below, written out end to end

One consolidated guide: core orchestration patterns (ETL, parallel aggregation, scheduling, branching), standard operational scenarios (vacuum, bloat & wraparound remediation), and Azure integration examples (Functions, HTTP, human approval) — all with runnable SQL.

→

🔗 ETL Pipelines

Chain cleanup → transform → load with sequential guarantees. Each step waits for the previous one. Failures stop the pipeline cleanly.

~> sequence |=> variables

📊 Parallel Aggregation

Count users + sum revenue + check inventory simultaneously. Fan out to multiple queries and wait for all to complete.

& parallel df.join()

📦 Order Processing

Capture an order ID, pass it through validation, processing, and completion steps. Variables flow between steps automatically.

|=> capture $var substitution df.sleep()

โฐ Scheduled Jobs

Poll APIs, archive records, or sync data on a cron schedule. Loops run forever and survive restarts.

@> loop df.wait_for_schedule()

🔀 Conditional Branching

Check pending jobs, row counts, or flags โ€” then process or skip based on the result. Branch logic lives in SQL, not application code.

df.if() ?> conditional

✅ Multi-step Validation

Fetch data, validate schema, check business rules, then approve or reject. Each step is checkpointed โ€” failures don't lose progress.

~> sequence df.if() |=> variables

🧹 Database Maintenance

Detect autovacuum blockers, table bloat, or wraparound risk, surface findings for review, wait for approval, then remediate — durably, even across restarts.

?> conditional df.wait_for_signal() @> loop

โ˜๏ธ Azure Functions & HTTP

Call Azure Functions or any allowlisted HTTPS endpoint straight from SQL with df.http() โ€” chunk documents, enrich rows, or classify records inline.

df.http() ~> sequence

🙋 Human-in-the-Loop Approval

Auto-approve routine work and pause high-stakes actions (large invoices, destructive ops) until a human signals approval — like the invoice-approval example.

df.wait_for_signal() df.if() @> loop
🤖 AI-assisted authoring

Let your AI assistant write the SQL

You describe the workflow in plain English — Copilot writes correct durable-function SQL.

Skip the syntax. Just describe what you want.

This repo ships a reusable agent skill, pg-durable-sql, that teaches GitHub Copilot and other agents how to generate correct durable-function SQL โ€” operators, variable substitution, loops, parallel joins, and more.

📦 100% open source

Open-source durable functions for Postgres

No waitlist, no lock-in. Clone, build, and run durable functions in your own PostgreSQL today.

Bring durable orchestration to any PostgreSQL. Open source

pg_durable is fully open source today. Clone the repo, build the extension, and run durable functions in your own PostgreSQL — on your laptop, your server, or your cloud.

Run it in the cloud Preview

pg_durable, fully managed on Azure HorizonDB

Azure HorizonDB is Microsoft's new PostgreSQL cloud service โ€” engineered for performance and built with pg_durable inside. Keep the durable functions you write here, and add enterprise scale, security, and AI without managing a single server.

⚡

Up to 3× faster

Outscales self-managed Postgres with auto-scaling storage to 128 TB and scale-out compute up to 3,072 vCores.

🛡️

Enterprise protection

Real-time threat detection with Microsoft Defender and identity management through Microsoft Entra ID.

🧠

Built for AI

Filtered DiskANN vector search, semantic ranking, and a curated set of in-database AI models.

🔗

Azure-native

Near-real-time mirroring to Microsoft Fabric, VS Code integration, and GitHub Copilot — one ecosystem.

Built-in AI pipeline

Postgres Native AI pipeline, built on pg_durable

HorizonDB layers a managed, end-to-end AI pipeline on top of pg_durable's durable execution โ€” every stage is checkpointed, retried, and crash-safe, from raw data to ready-to-query vectors.

1. Ingest — Load docs & data
→
2. Chunk — Split content
→
3. Embed — Vectorize
→
4. Index — DiskANN store
→
5. Serve — Search & rank
Run it in the cloud Azure HorizonDB Preview