Orbit

Unit of Work + event dispatch

Every write runs inside one transaction. Domain events are collected there and dispatched after commit — so projectors always see committed state.

The Unit of Work is the single seam through which every mutation passes. Services never touch the ORM client directly; they ask the UoW for a transactional context, call repository methods, and record domain events. The UoW opens a Postgres transaction, runs the work, commits, and then dispatches events to the event bus.

Both ORM tracks extend the same BaseUnitOfWork abstract class — event collection, the read-only proxy, and post-commit dispatch are identical across PrismaUnitOfWork and DrizzleUnitOfWork. Everything below applies to both.

Two modes: run and read

apps/api/src/kernel/uow.ts

export interface UnitOfWork {
  run<T>(fn: (tx: TxContext) => Promise<T>): Promise<T>;
  read<T>(fn: (tx: TxContext) => Promise<T>): Promise<T>;
}

run() opens a transaction, commits on success, and rolls back if the callback throws. read() runs the same repository API against a read-only context with no transaction and no event collector: a proxy wraps every repository and throws if you call a write method. Use read() for queries, and keep every mutation inside run().
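
To make the read-only guard concrete, here is a minimal sketch of one way such a proxy could work, assuming write methods are recognised by name. WRITE_METHODS, readOnly, and FakeUserRepository are hypothetical names for illustration and are not taken from the kit's source; the real proxy may decide what counts as a write differently.

```typescript
// Hypothetical: method names treated as writes. The kit's real list may differ.
const WRITE_METHODS = new Set(["save", "delete"]);

// Wraps a repository so reads pass through and write methods throw when called.
function readOnly<T extends object>(repo: T): T {
  return new Proxy(repo, {
    get(target, prop, receiver) {
      const value = Reflect.get(target, prop, receiver);
      if (typeof prop === "string" && WRITE_METHODS.has(prop)) {
        return () => {
          throw new Error(`${prop}() is not allowed inside uow.read()`);
        };
      }
      // Bind methods so `this` still refers to the real repository.
      return typeof value === "function" ? value.bind(target) : value;
    },
  });
}

// Illustrative in-memory repository, used only to exercise the proxy.
class FakeUserRepository {
  private rows = new Map<string, { id: string; name: string }>();

  async findById(id: string) {
    return this.rows.get(id) ?? null;
  }

  async save(user: { id: string; name: string }) {
    this.rows.set(user.id, user);
  }
}

const repo = new FakeUserRepository();
const guarded = readOnly(repo);
```

Calling guarded.findById() behaves exactly like the underlying repository, while guarded.save() throws before anything is written. Because the guard sits on the repository rather than on each service, no individual query path has to remember to be careful.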

Note

There's no middle tier. Services either write through uow.run(), or they read through uow.read() — nothing else. That constraint keeps transaction boundaries explicit and makes it impossible to silently emit a domain event from a query path.

The TxContext

TxContext is the bag of repositories and the event collector that gets handed to your function. Every repository on it is transactional — they all share the same transactional handle, so writes across multiple aggregates commit together:

export interface TxContext {
  users: UserRepository;
  workspaces: WorkspaceRepository;
  workspaceMembers: WorkspaceMemberRepository;
  workspaceInvites: WorkspaceInviteRepository;
  workspaceRoles: WorkspaceRoleRepository;
  teams: TeamRepository;
  teamMembers: TeamMemberRepository;
  teamRoles: TeamRoleRepository;
  billingCustomers: BillingCustomerRepository;
  subscriptions: SubscriptionRepository;
  billingEvents: BillingEventRepository;
  events: TxEventCollector;
}

A real service

Here's the shape of a service. Read aggregates, mutate via domain methods, save, collect the resulting events, let the UoW do the rest:

Inside a service

await uow.run(async (tx) => {
  const workspace = await tx.workspaces.findBySlug(slug);
  if (!workspace) throw new NotFoundError("workspace.not_found");
  const role = await tx.workspaceRoles.findByWorkspaceAndSystemKey(
    workspace.id,
    "MEMBER",
  );
  const member = WorkspaceMember.join(
    { workspaceId: workspace.id, userId, role: snapshotOf(role), seed: userId },
    clock,
  );
  await tx.workspaceMembers.save(member);
  tx.events.addMany(member.pullEvents()); // WorkspaceMemberJoined
});
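
The service above relies on the aggregate recording its own events and on pullEvents() handing them over. A minimal sketch of that pattern follows, assuming pullEvents() returns the buffered events and clears the buffer. AggregateRoot, record, and the simplified Member class are hypothetical stand-ins; the kit's WorkspaceMember takes more arguments and may be structured differently.

```typescript
interface DomainEvent {
  type: string;
}

// Hypothetical base class: aggregates buffer events until the service pulls them.
abstract class AggregateRoot {
  private recorded: DomainEvent[] = [];

  protected record(event: DomainEvent): void {
    this.recorded.push(event);
  }

  // Returns the buffered events and clears the buffer,
  // so a second call yields an empty array.
  pullEvents(): DomainEvent[] {
    const events = this.recorded;
    this.recorded = [];
    return events;
  }
}

// Simplified stand-in for WorkspaceMember, for illustration only.
class Member extends AggregateRoot {
  private constructor(
    readonly workspaceId: string,
    readonly userId: string,
  ) {
    super();
  }

  // The domain method both creates the aggregate and records what happened.
  static join(workspaceId: string, userId: string): Member {
    const member = new Member(workspaceId, userId);
    member.record({ type: "workspaces.member.joined" });
    return member;
  }
}

const joined = Member.join("ws_1", "user_1");
const firstPull = joined.pullEvents();
const secondPull = joined.pullEvents();
```

Draining the buffer on pull means the same event cannot be handed to tx.events twice by accident, even if the aggregate is saved again later in the same callback.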

What the UoW does after commit

Once the transaction callback returns and Postgres commits, the UoW dispatches every collected event to the event bus. The logic lives on the shared BaseUnitOfWork; each ORM subclass only supplies how to open a transaction:

apps/api/src/kernel/base-uow.ts

async run<T>(fn: (tx: TxContext) => Promise<T>): Promise<T> {
  // Events are buffered here for the duration of the transaction.
  const pending: DomainEvent[] = [];
  const collector: TxEventCollector = {
    add: (event) => pending.push(event),
    addMany: (events) => { for (const e of events) pending.push(e); },
  };
  // Resolves only after the transaction commits; a throw rolls back and skips dispatch.
  const result = await this.openTransaction(async (handle) => {
    const ctx: TxContext = { ...this.buildContext(handle), events: collector };
    return fn(ctx);
  });
  // Post-commit: hand the buffered events to the bus.
  if (pending.length > 0) {
    await this.bus.publishMany(pending);
  }
  return result;
}

apps/api/src/infrastructure/prisma-uow.ts (opens a Prisma transaction)

protected openTransaction<T>(fn: (handle: Prisma) => Promise<T>): Promise<T> {
  return this.db.$transaction((tx) => fn(tx as unknown as Prisma));
}

Dispatching after commit, rather than inside the transaction, is load-bearing for two reasons:

  • Projectors never extend the transaction budget. If the realtime publisher takes 50 ms to do a query, it doesn't push the parent transaction's row-lock window out by 50 ms.
  • Projectors always see committed state. If a projector opens a new read to build a DTO, it's guaranteed to find the rows that triggered the event — no read-after-write races against an uncommitted transaction.
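
A related consequence of this ordering: events collected in a callback that throws are never dispatched, because run() rejects before it reaches the publish step. The sketch below reduces run() to that ordering. fakeTransaction and log are hypothetical stand-ins for the real transaction and event bus, so this illustrates the sequence rather than the kit's implementation.

```typescript
interface DomainEvent {
  type: string;
}

const log: string[] = [];

// Stand-in for openTransaction: "commits" on success, "rolls back" on throw.
async function fakeTransaction<T>(fn: () => Promise<T>): Promise<T> {
  log.push("begin");
  try {
    const result = await fn();
    log.push("commit");
    return result;
  } catch (err) {
    log.push("rollback");
    throw err;
  }
}

// Same shape as BaseUnitOfWork.run(), reduced to the ordering that matters.
async function run<T>(fn: (events: DomainEvent[]) => Promise<T>): Promise<T> {
  const pending: DomainEvent[] = [];
  const result = await fakeTransaction(() => fn(pending));
  // Reached only if the transaction committed.
  for (const event of pending) log.push(`dispatch:${event.type}`);
  return result;
}

async function demo(): Promise<string[]> {
  // Successful write: commit happens first, dispatch second.
  await run(async (events) => {
    events.push({ type: "workspaces.member.joined" });
  });
  // Failing write: the collected event is dropped along with the rollback.
  await run(async (events) => {
    events.push({ type: "never.dispatched" });
    throw new Error("boom");
  }).catch(() => undefined);
  return log;
}
```

After demo() resolves, the log reads begin, commit, dispatch:workspaces.member.joined, begin, rollback. The second event never reaches a projector.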

The event bus

EventBus is a thin pub/sub:

export interface EventBus {
  subscribe<E extends DomainEvent>(type: string, handler: EventHandler<E>): () => void;
  subscribeAll(handler: EventHandler): () => void;
  publish(event: DomainEvent): Promise<void>;
  publishMany(events: readonly DomainEvent[]): Promise<void>;
}

The default implementation (InProcessEventBus) runs handlers sequentially in the same process. Tests use RecordingEventBus, which keeps an array of everything published so you can assert on it directly.
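
For a sense of how little machinery this interface needs, here is a sketch of a sequential in-process bus plus a recording variant. It assumes DomainEvent carries a string type field and that EventHandler is a function returning void or a promise. SketchEventBus and RecordingSketchBus are hypothetical names; the kit's InProcessEventBus and RecordingEventBus may differ in detail, for example in the order typed and catch-all handlers run.

```typescript
interface DomainEvent {
  type: string;
}
type EventHandler<E extends DomainEvent = DomainEvent> = (event: E) => Promise<void> | void;

class SketchEventBus {
  private byType = new Map<string, Set<EventHandler<any>>>();
  private all = new Set<EventHandler>();

  // Registers a handler for one event type and returns an unsubscribe function.
  subscribe<E extends DomainEvent>(type: string, handler: EventHandler<E>): () => void {
    const handlers = this.byType.get(type) ?? new Set<EventHandler<any>>();
    handlers.add(handler);
    this.byType.set(type, handlers);
    return () => {
      handlers.delete(handler);
    };
  }

  subscribeAll(handler: EventHandler): () => void {
    this.all.add(handler);
    return () => {
      this.all.delete(handler);
    };
  }

  // Handlers run one at a time; a throwing handler rejects the whole publish.
  async publish(event: DomainEvent): Promise<void> {
    const typed = Array.from(this.byType.get(event.type) ?? []);
    const handlers = typed.concat(Array.from(this.all));
    for (const handler of handlers) {
      await handler(event);
    }
  }

  async publishMany(events: readonly DomainEvent[]): Promise<void> {
    for (const event of events) await this.publish(event);
  }
}

// Test double: remembers everything published, then delegates to the real bus.
class RecordingSketchBus extends SketchEventBus {
  readonly events: DomainEvent[] = [];

  async publish(event: DomainEvent): Promise<void> {
    this.events.push(event);
    await super.publish(event);
  }
}
```

Awaiting each handler in turn is what makes the ordering deterministic, and it is also why one slow handler delays every handler registered after it.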

Note

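Handlers run sequentially and errors propagate. One slow projector blocks the others; one throwing projector aborts the whole publish. Because dispatch happens after commit, that error surfaces from uow.run() even though the transaction has already committed. When a handler's failure should not stop the rest, wrap its body in try/catch and log the error. Domain events are not a reliable job queue: for long-running work, enqueue a job from the projector instead.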
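
One way to apply the try/catch advice without repeating it in every projector is a small wrapper. This is a sketch: isolated and its logError parameter are hypothetical and not part of the kit, and the DomainEvent and EventHandler shapes are assumed.

```typescript
interface DomainEvent {
  type: string;
}
type EventHandler<E extends DomainEvent = DomainEvent> = (event: E) => Promise<void> | void;

// Hypothetical helper: runs the handler and logs a failure instead of rethrowing it,
// so later handlers in the same publish still run.
function isolated<E extends DomainEvent>(
  name: string,
  handler: EventHandler<E>,
  logError: (message: string, error: unknown) => void = console.error,
): EventHandler<E> {
  return async (event) => {
    try {
      await handler(event);
    } catch (error) {
      logError(`projector ${name} failed on ${event.type}`, error);
    }
  };
}

// Example: a handler that always fails, with failures captured in an array.
const failures: string[] = [];
const guardedHandler = isolated<DomainEvent>(
  "mailer",
  async () => {
    throw new Error("smtp down");
  },
  (message) => failures.push(message),
);
```

A handler wrapped this way would be passed to bus.subscribe() in place of the raw function. The trade-off is that a swallowed failure is visible only in the logs, which is acceptable for best-effort side effects and a poor fit for anything that must not be lost.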

Projectors

A projector is any class that subscribes to events and does something with them. The two that ship with the kit:

  • RealtimeEventPublisher — translates domain events into ServerEvent DTOs and broadcasts them on the appropriate workspace/team/member channel.
  • Mailer projectors — listen for WorkspaceInvited and friends; ask the Mailer port to send the corresponding email.

Shape of a projector:

this.bus.subscribe<WorkspaceMemberJoined>(
  "workspaces.member.joined",
  async (event) => {
    const dto = await this.uow.read(async (tx) => {
      const member = await tx.workspaceMembers.findById(event.memberId);
      if (!member) return null;
      const [user, role] = await Promise.all([
        tx.users.findById(member.userId),
        tx.workspaceRoles.findById(member.roleId),
      ]);
      return workspaceMemberToDTO(member, user, role);
    });
    if (!dto) return;
    this.hub.broadcast(channels.workspace(event.workspaceId), {
      type: "workspace.member.joined",
      member: dto,
    });
  },
);

Testing

Service tests don't need a database. Swap in in-memory repositories and a RecordingEventBus, run the service, and assert on the events:

const bus = new RecordingEventBus();
const uow = new InMemoryUnitOfWork(bus);
// `service` is assumed to have been constructed with this `uow`.
await service.execute({ userId, workspaceId });
expect(bus.events).toEqual([
  expect.objectContaining({ type: "workspaces.member.joined", userId }),
]);
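
If you are curious what an in-memory Unit of Work needs to do to keep these tests honest, the sketch below shows one possible shape: it works on a copy of its data so that a throwing callback leaves stored rows untouched, and it publishes events only after that copy is adopted. InMemoryUowSketch, RecordingBus, SketchTxContext, and the single-repository context are hypothetical simplifications; the kit's InMemoryUnitOfWork is not shown in this page and may work differently.

```typescript
interface DomainEvent {
  type: string;
  userId?: string;
}
interface TxEventCollector {
  add(event: DomainEvent): void;
  addMany(events: readonly DomainEvent[]): void;
}
interface Member {
  id: string;
  userId: string;
}
interface MemberRepository {
  findById(id: string): Promise<Member | null>;
  save(member: Member): Promise<void>;
}
// Reduced TxContext with a single repository, for illustration only.
interface SketchTxContext {
  workspaceMembers: MemberRepository;
  events: TxEventCollector;
}

class RecordingBus {
  readonly events: DomainEvent[] = [];

  async publishMany(events: readonly DomainEvent[]): Promise<void> {
    this.events.push(...events);
  }
}

class InMemoryUowSketch {
  private members = new Map<string, Member>();

  constructor(private readonly bus: RecordingBus) {}

  async run<T>(fn: (tx: SketchTxContext) => Promise<T>): Promise<T> {
    const pending: DomainEvent[] = [];
    // Work on a copy so a throw leaves the stored rows untouched, mimicking rollback.
    const draft = new Map(this.members);
    const result = await fn({
      workspaceMembers: {
        findById: async (id) => draft.get(id) ?? null,
        save: async (member) => {
          draft.set(member.id, member);
        },
      },
      events: {
        add: (event) => {
          pending.push(event);
        },
        addMany: (events) => {
          pending.push(...events);
        },
      },
    });
    this.members = draft; // "commit"
    if (pending.length > 0) await this.bus.publishMany(pending);
    return result;
  }
}
```

Mirroring the commit-then-dispatch order in the test double matters: a service test that passes against it should not start failing against Postgres because of event timing.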