
Write a graphile-worker job

Declare a payload type, register a handler, pick a schedule. Works identically under QStash.

Working example: a workspace-invite cleanup job that runs hourly and deletes invites older than 14 days. The shape is provider-agnostic — the same defineJob call works under graphile-worker and QStash.

1. Augment the global jobs registry

Jobs are typed end-to-end: the payload you enqueue is the payload the handler receives. Declare it once by augmenting OrbitJobs.Jobs:

apps/api/src/workspaces/jobs/cleanup-stale-invites.job.ts

declare global {
  namespace OrbitJobs {
    interface Jobs {
      "workspaces.invites.cleanup-stale": { olderThanHours: number };
    }
  }
}
Note

The job name is a string literal, not a constant. Pick something descriptive and namespaced — "<context>.<aggregate>.<verb>" works well. It's what shows up in logs and queue tables.

2. Define the handler

defineJob binds the name, handler, schedule, and retry policy. Dependencies (UoW, clock, application services) come in via the outer factory's closure — the handler context only carries what's specific to this run: signal, attempt, and jobName.

import { defineJob } from "@/jobs/application/job-registry.ts";
import type { WorkspacesServices } from "./feature.ts";

export function cleanupStaleInvitesJob(services: WorkspacesServices) {
  return defineJob({
    name: "workspaces.invites.cleanup-stale",
    schedule: "0 * * * *", // every hour, on the hour
    maxAttempts: 3,
    handler: async (payload, ctx) => {
      if (ctx.signal.aborted) return;
      const deleted = await services.cleanupStaleInvites.execute({
        olderThanHours: payload.olderThanHours,
      });
      console.log(`[${ctx.jobName}] deleted ${deleted} invite(s)`);
    },
  });
}
Note

Real jobs in the repo follow this shape — see apps/api/src/audit/jobs.ts and apps/api/src/demo/jobs.ts. The handler stays thin and delegates to a service that already owns its UoW, clock, and repositories.

The schedule is a 5-field crontab string (minute, hour, day, month, weekday). 0 * * * * is hourly, 0 0 * * * is daily at midnight UTC. Omit schedule entirely for on-demand jobs.
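As a quick sanity check on a schedule string, the five fields can be split out by position. This helper is illustrative only, not part of the repo:

```typescript
// Illustrative: name the five crontab fields of a schedule string.
function describeCron(expr: string) {
  const [minute, hour, dayOfMonth, month, dayOfWeek] = expr.trim().split(/\s+/);
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

describeCron("0 * * * *");
// → { minute: "0", hour: "*", dayOfMonth: "*", month: "*", dayOfWeek: "*" }
```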

3. Register it in buildJobs()

Jobs are collected centrally in apps/api/src/jobs/feature.ts. The buildJobs(container) function appends each job to the registry, closing over container.services so handlers reach the same application layer the HTTP routes use:

apps/api/src/jobs/feature.ts

import { cleanupStaleInvitesJob } from "@/workspaces/jobs/cleanup-stale-invites.job.ts";

export function buildJobs(container: AppContainer): JobRegistry {
  const jobs: JobDefinition[] = [];
  jobs.push(cleanupStaleInvitesJob(container.services));
  // other feature-gated jobs go here
  return jobs;
}

That's all the wiring. composition.ts already invokes buildJobs() after services are built and hands the aggregate registry to the chosen JobRuntime adapter.
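The boot order can be sketched roughly like this. Every name below is an illustrative stand-in, not the repo's real composition API:

```typescript
// Hypothetical stand-ins for the boot sequence described above.
type JobDefinition = { name: string };

interface JobRuntime {
  start(jobs: JobDefinition[]): Promise<void>;
}

async function bootJobs(
  buildJobs: () => JobDefinition[], // called only after services exist
  runtime: JobRuntime,              // graphile-worker or QStash adapter
): Promise<number> {
  const jobs = buildJobs();
  await runtime.start(jobs);
  return jobs.length;
}
```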

4. Enqueueing on demand

Scheduled jobs run themselves. For ad-hoc or event-triggered dispatch, services call queue.enqueue() with the typed name and payload:

await queue.enqueue(
  "workspaces.invites.cleanup-stale",
  { olderThanHours: 24 * 14 },
  {
    jobKey: "invites-cleanup-daily", // dedupe across parallel enqueues
    runAt: new Date(Date.now() + 10_000),
  },
);

jobKey is the provider-level dedupe handle: enqueue the same key twice in close succession and only one run happens. Use it for idempotency on projectors that may fire the same event more than once.
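A toy in-memory model of that dedupe behavior (FakeQueue is illustrative and deliberately simplistic; the real providers also scope dedupe by time window):

```typescript
// Toy model: a later enqueue with the same jobKey replaces the earlier one,
// so only one pending job exists per key.
class FakeQueue {
  private pending = new Map<string, { name: string; payload: unknown }>();

  enqueue(name: string, payload: unknown, opts?: { jobKey?: string }) {
    const key = opts?.jobKey ?? `${name}#${this.pending.size}#${Math.random()}`;
    this.pending.set(key, { name, payload });
  }

  size() {
    return this.pending.size;
  }
}

const q = new FakeQueue();
q.enqueue("workspaces.invites.cleanup-stale", { olderThanHours: 336 }, { jobKey: "invites-cleanup-daily" });
q.enqueue("workspaces.invites.cleanup-stale", { olderThanHours: 336 }, { jobKey: "invites-cleanup-daily" });
// q.size() === 1 — the second enqueue replaced the first
```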

Enqueueing from a projector

A common pattern: domain event fires, projector queues a job that does the heavy lifting after commit. Keeps the event-bus handler fast and makes the real work retriable:

bus.subscribe<WorkspaceMemberJoined>(
  "workspaces.member.joined",
  async (event) => {
    await queue.enqueue(
      "workspaces.onboarding.send-welcome",
      { memberId: event.memberId },
      { jobKey: `welcome:${event.memberId}` },
    );
  },
);

5. How it actually runs

Under graphile-worker

The API process boots GraphileJobRuntime as part of startup. It polls the graphile_worker.jobs table and uses Postgres LISTEN/NOTIFY for low-latency dispatch. Scheduled jobs are inserted by graphile's cron mechanism on boot and re-scheduled after each run.

Heads up

Workers need long-lived Postgres sessions. If DATABASE_URL points at a transaction pooler, set WORKER_DATABASE_URL to a direct URL — see the Postgres deploy page.

Under QStash

Enqueue publishes to QStash. On the schedule or runAt, QStash POSTs to ${QSTASH_CALLBACK_URL}/v1/jobs/run/workspaces.invites.cleanup-stale. The API verifies the signature against QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY, parses the payload, and dispatches to the same handler.

You don't change anything in the job code — same defineJob definition, different transport.
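The path-to-name dispatch step can be sketched as follows; the route prefix matches the callback URL shown above, but the helper itself is hypothetical:

```typescript
// Hypothetical sketch: extract the job name from the QStash callback path,
// then look it up in the same registry buildJobs() produced.
function jobNameFromPath(path: string): string | null {
  const prefix = "/v1/jobs/run/";
  return path.startsWith(prefix) ? path.slice(prefix.length) : null;
}

jobNameFromPath("/v1/jobs/run/workspaces.invites.cleanup-stale");
// → "workspaces.invites.cleanup-stale"
```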

Idempotency

Jobs can run more than once. Providers retry on thrown errors; under QStash, the Upstash-Retried header increments on each attempt. Write handlers that tolerate it:

handler: async (payload, ctx) => {
  if (ctx.attempt > 1) {
    console.warn(`[${ctx.jobName}] retry attempt=${ctx.attempt}`);
  }
  await services.cleanupStaleInvites.execute(payload);
  // Use natural keys + UPSERT inside the service. Don't check-then-write.
  // Don't assume the previous attempt did nothing.
}

Testing

Job handlers are just async functions over the service closure. Test them by building the same services you'd hand to a controller — in-memory UoW, fake clock, recording event bus — and invoking the handler with a stub context:

it("deletes invites older than the cutoff", async () => {
  const clock = new FixedClock("2026-05-01T00:00:00Z");
  const uow = new InMemoryUnitOfWork();
  await uow.run((tx) =>
    tx.workspaceInvites.saveAll([
      pendingInvite({ createdAt: "2026-04-01T00:00:00Z" }), // stale
      pendingInvite({ createdAt: "2026-04-29T00:00:00Z" }), // fresh
    ]),
  );

  const services: WorkspacesServices = {
    cleanupStaleInvites: new CleanupStaleInvitesService(uow, clock),
    // ... other services
  };

  const job = cleanupStaleInvitesJob(services);
  await job.handler(
    { olderThanHours: 24 * 14 },
    { signal: new AbortController().signal, attempt: 1, jobName: job.name },
  );

  const remaining = await uow.read((tx) => tx.workspaceInvites.listAll());
  expect(remaining).toHaveLength(1);
});

No queue, no worker, no Postgres. The handler is pure logic; the runtime is the thing you're not testing.