logoobbig

Introduction

A TypeScript library for type-safe durable workflows with pluggable storage/queue backends.

Jobbig

release

[ˈjɔbːɪɡ] the swedish word for bothersome

This is the first library I have ever published, I might not get everything right immediately. Before version 1.0.0, there will be breaking changes. Before 1.0.0, I will treat minor versions as major versions and patches as both bug fixes and features.

Quick Start

import { job } from "@jobbig/core";
import { LocalQueue, LocalStore } from "@jobbig/local";
import { server } from "@jobbig/runners";
import { z } from "zod";

// Define a job with steps
const myJob = job({
  id: "process-data",
  schema: z.object({
    input: z.number(),
    output: z.number().optional(),
  }),
  run: async ({ ctx }) => {
    await ctx.step.run("preprocess", async () => {
      console.log("Preprocessing input...");
    });

    await ctx.step.run("process", async () => {
      console.log("Processing data...");
    });

    await ctx.step.run("save", async () => {
      console.log("Saving results...");
    });
  },
});

// Set up local storage and queue
const queue = LocalQueue([]);
const store = LocalStore({});



// Start the server
const worker = ContinousWorker({
  queue,
  store,
  jobs: [myJob],
});
worker.start();



// Create the publisher
const publisher = Publisher({
	queue,
	store,
});

// Publish runs
for (const run of runs) {
	await publisher.publish(run);
}

Concepts

A brief explanation of the key interfaces and concepts of Jobbig.

Worker

A worker is a handler for different types of environments.

interface Worker {
	start(): Promise<void>;
}

Orchestrator

An orchestrator specifies how the polling of jobs is done. In a cron job, it exhausts the queue until it is empty, but in a server environment, it constantly polls the queue for new jobs within a specified interval.

Runner

A runner is responsible for executing jobs. There exists a BaseRunner, which is likely sufficient in most cases.

export interface Runner {
	run(): Promise<void>;
}

Job

A job is a function which can be scheduled. A job can consist of multiple steps.

export interface Job<T extends StandardSchemaV1 = any> {
	/**
	 * Unique identifier of the jobs. Used to match handlers with runs.
	 */
	id: string;
	/**
	 * The job runner.
	 * @param opts - The options for running the job.
	 * @returns A promise that resolves when the job is completed.
	 */
	run(opts: RunInput<StandardSchemaV1.InferInput<T>>): Promise<void>;
	/**
	 * Schema of the data
	 */
	schema: T;
	retries?: {
		/**
		 * Max amount of retries
		 * @default 0
		 */
		amount?: number;
		/**
		 * Defaults to exponential backoff
		 * @param attempt (the current attempt, starts at 0)
		 * @returns delay
		 */
		delayFn?: (attempt: number) => number;
	};

	hooks?: {
		beforeRun?(opts: RunInput<StandardSchemaV1.InferInput<T>>): Promise<void>;
		afterRun?(opts: RunInput<StandardSchemaV1.InferInput<T>>): Promise<void>;
		beforeStep?(opts: RunInput<StandardSchemaV1.InferInput<T>>): Promise<void>;
		afterStep?(opts: RunInput<StandardSchemaV1.InferInput<T>>): Promise<void>;
	};
}

Step

A step is a smaller part of a job, which consists of a handler and an id. A step with a given id will only be executed once per job. While the core logic of the job will be rerun on retries.

export interface Step {
	run(id: string, handler: () => Promise<void>): Promise<void>;
	cleanup(): Promise<void>;
}

Run

A run is a scheduled execution of a job. It contains metadata about the jobs execution, such as the status, start time, end time, results and the current execution step.

export interface Run {
	id: string;
	jobId: string;
	status: RunStatus;
	scheduledAt: Date;
	data: unknown;
	result?: unknown;
	currentStep: number;
	startedAt?: Date;
	createdAt: Date;
	finishedAt?: Date;
}

Store

A store contains the state of the run. I choose to separate it from Queue to allow for things that cannot fetch specific data.

export interface Store {
	store(run: Run): Promise<void>;
	set<T extends keyof Run>(runId: string, key: T, value: Run[T]): Promise<void>;
	get<T extends keyof Run>(runId: string, key: T): Promise<Run[T] | undefined>;
	fetch(runId: string): Promise<Run | undefined>;
}

ScopedStore

A scoped store is a store that is scoped to a specific run. It is used to store and retrieve data for a specific run, so that it can be used in the context of the job without being able to impact other runs.

export interface ScopedStore {
	set<T extends keyof Run>(key: T, value: Run[T]): Promise<void>;
	get<T extends keyof Run>(key: T): Promise<Run[T] | undefined>;
}

Queue

A queue is responsible for storing and retrieving runs. It is used to schedule and execute jobs.

export interface QueueInfo {
  exhausted: boolean;
}

export interface Queue {
	push(run: Run): Promise<unknown>;
  poll(amount: number): Promise<{ runs: Run[], info: QueueInfo }>;
}

Publisher

A publisher is responsible for publishing runs to a queue.

export interface Publisher {
	publish(run: Run): Promise<void>;
}

Development

# Install dependencies
bun install

# Build all packages
bun run build

# Run the example
cd examples/local
bun run index.ts

Example

See examples/local/index.ts for a complete working example with:

  • Multiple job definitions
  • Step-based execution
  • Local queue and storage setup
  • Server runner configuration

License

MIT