jd-worker — Library Reference

Complete reference for the jd-worker Python package. Covers the jd_worker_cli command (launch, background workers, interactive mode, and management subcommands), all importable helper functions, environment variables, and practical usage patterns.

Overview

The jd-worker package provides two things:

  1. A CLI tool (jd_worker_cli): the worker process you run in your terminal. While idle, it calls POST /worker/heartbeat about every 3 minutes (180 s) for liveness, dashboard control, and job assignment; while a job runs, it heartbeats every 57 s. It invokes your script once per job and reports the result.
  2. A Python library (import jd): a small set of helper functions your script imports to resolve correct file paths and upload results to the central server.

The library functions are designed to be zero-configuration. They read the context (experiment name, job ID, server URL) from environment variables that jd_worker_cli sets automatically before calling your script. You never need to pass these values manually.

1 Installation

bash
pip install jd-worker

# Upgrade to the latest version at any time
pip install --upgrade jd-worker

Requires Python 3.8 or later. Install inside a virtual environment (see the Getting Started guide for instructions).

After installation, verify both the CLI and the library are available:

bash
jd_worker_cli help
export JD_API_KEY=jd_…   # required for interactive mode
jd_worker_cli          # interactive shell (API key verified with Hub)
python -c "import jd; print(jd.__version__)"

2 jd_worker_cli — The Worker Command

jd_worker_cli is the main executable installed by the package. In its default mode it runs as a long-lived background process that:

  1. Connects to the Hub to obtain a short-lived worker token and the server URL for your experiment (Hub mode).
  2. Registers itself in a local SQLite registry so you can list, stop, and manage workers without tmux or process hunting.
  3. Polls the server for the next PENDING job.
  4. Sets environment variables describing the job and calls your entry script as a subprocess.
  5. Sends POST /worker/heartbeat every 57 seconds while the script runs.
  6. Marks the job DONE or ABORTED depending on the exit code, then goes back to step 3.
  7. Keeps running when the queue is empty — probing every 3 minutes (180 s idle heartbeat) for new jobs. Use once=true to exit after one job or when no job is available.
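
Steps 4 and 6 above can be pictured with a minimal sketch. It is not the CLI's actual implementation: run_job and its parameters are hypothetical names, and treating exit code 0 as DONE and any other code as ABORTED is an assumption. Only the environment variable names come from this reference.

```python
import os
import subprocess
import sys


def run_job(entry_cmd, job_id, exp_id, job_dir):
    """Run one job as a subprocess and map its exit code to a job status."""
    env = dict(os.environ)
    # Variables the worker sets before calling the script (see section 6).
    env["JD_JOB_ID"] = str(job_id)
    env["JD_EXP_ID"] = exp_id
    env["JD_WORKER_JOB_DIR"] = str(job_dir)
    completed = subprocess.run(entry_cmd, env=env)
    # Assumption: exit code 0 means DONE, anything else means ABORTED.
    return "DONE" if completed.returncode == 0 else "ABORTED"


if __name__ == "__main__":
    # Stand-in for "python train.py": a child that exits successfully.
    print(run_job([sys.executable, "-c", "raise SystemExit(0)"], 42, "digits-tune", "job42"))
```

A practical consequence, under the same assumption: an entry script that catches an error and still exits with code 0 would be recorded as DONE, so let failures propagate as a non-zero exit.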

On every launch (except help), the CLI first scans the local registry SQLite databases, removes worker rows whose PIDs are no longer running, releases orphaned instance names, and deletes empty experiment cache directories. Large machines (50+ registered workers) may show a short progress bar with ETA during this step; later commands in the same process skip redundant work.
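
The stale-row cleanup described above can be sketched as follows. This is an illustration only: the workers table and its columns are a hypothetical schema (the real layout of workers.db is not documented here), and the PID check relies on POSIX semantics of signal 0.

```python
import os
import sqlite3


def pid_is_running(pid):
    """Return True if a process with this PID exists (POSIX only)."""
    try:
        os.kill(pid, 0)  # signal 0 performs the existence check without signalling
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def prune_dead_workers(conn):
    """Delete rows whose PID is gone and return the removed worker ids."""
    rows = conn.execute("SELECT worker_id, pid FROM workers").fetchall()
    dead = [worker_id for worker_id, pid in rows if not pid_is_running(pid)]
    conn.executemany("DELETE FROM workers WHERE worker_id = ?", [(w,) for w in dead])
    conn.commit()
    return dead
```

A PID check of this kind cannot tell a live worker from an unrelated process that has reused the same PID, which is one reason a registry might also store start times or heartbeat timestamps.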

Arguments use key=value tokens. Subcommands such as worker-list or exp-status are bare words mixed with the same tokens. Run jd_worker_cli help for the full embedded reference.
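
To make the token style concrete, here is a minimal sketch of how a command line mixing key=value tokens and bare words could be split. parse_tokens is a hypothetical helper written for this page, not part of jd-worker, and the real parser may treat edge cases differently.

```python
def parse_tokens(tokens):
    """Split CLI tokens into key=value options and bare words."""
    options = {}
    words = []
    for token in tokens:
        key, sep, value = token.partition("=")
        if sep and key:
            options[key] = value  # e.g. "lines=100" -> {"lines": "100"}
        else:
            words.append(token)   # e.g. "worker-logs" or a worker id
    return options, words


options, words = parse_tokens(
    ["expId=digits-tune", "worker-logs", "gpunode_egg_0", "lines=100", "follow=true"]
)
# options == {"expId": "digits-tune", "lines": "100", "follow": "true"}
# words   == ["worker-logs", "gpunode_egg_0"]
```

All values stay strings in this sketch; converting lines to an integer or follow to a boolean would be a separate step.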

Starting workers

The run command (no subcommand) starts one or more workers:

bash
export JD_API_KEY=jd_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

jd_worker_cli expId=digits-tune entry_script=train.py

Required to start:

Argument | Description
expId=<name> | Experiment name; must match the server’s --expId.
entry_script=<path> | Python script executed for each job (same interpreter as jd_worker_cli).

Launch arguments

Arguments can be passed as key=value tokens or via the corresponding environment variable.

Argument | Environment variable | Default | Description
expId=<name> | JD_EXP_ID | required | The experiment name as created in the Hub Dashboard.
entry_script=<path> | JD_ENTRY_SCRIPT | required | Path to the Python script to execute for each job.
api_key=<key> | JD_API_KEY | (none) | Hub API key. Prefer setting via environment rather than the command line.
hub=<url> | JD_HUB_URL | https://hub.jobdistributor.net | Hub base URL. Only change for a self-hosted Hub.
server=<url> | JD_SERVER | http://localhost | Job server URL for standalone mode (no Hub). Host-only values get http:// prepended.
port=<N> | JD_PORT | 5000 | Port when server= has no port (standalone mode).
machine_type=<label> | JD_MACHINE_TYPE | worker | Label shown in the dashboard (e.g. gpu-a100, slurm-hpc).
process_id=<N> | (none) | 0 | Numeric ID for a single worker. Ignored when num_workers > 1.
num_workers=<N> | JD_NUM_WORKERS | 1 | Spawn N parallel workers. Hub auth runs once in the parent and is shared with children.
foreground=true | JD_FOREGROUND | false | Run attached to the terminal (logs to stdout). Default is background.
once=true | JD_ONCE | false | Exit after one completed job, or immediately when no job is available.
log_dir=<path> | JD_LOG_DIR | (derived) | Override log directory. Default: <parent>/jd_data/<expId>/jd_worker_logs/.
(none) | JD_WORKSPACE_PATH | ~ | Parent directory for job sandboxes; data lives under …/jd_data/<expId>/.
(none) | JD_CACHE_PATH | ~/.jd_cache | Root for the local worker registry (.cache/<expId>/workers.db). Independent of workspace path.

ℹ️ Local storage (defaults):
Job data: ~/jd_data/<expId>/<job_id>/ (override parent with JD_WORKSPACE_PATH — the jd_data/ subdirectory is always appended).
Registry DB: ~/.jd_cache/.cache/<expId>/workers.db (override with JD_CACHE_PATH, e.g. /tmp/.jd_cache on HPC node-local scratch).
There is no workspace_path= CLI argument — set env vars before launching.
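
The server= and port= rules in the table above can be illustrated with a short sketch. normalize_server is a hypothetical helper; the exact way the CLI joins host and port, and how it handles URLs that already carry a scheme but no port, are assumptions.

```python
from urllib.parse import urlsplit


def normalize_server(server="http://localhost", port=5000):
    """Build a base URL from server= and port= style values."""
    if "://" not in server:
        server = "http://" + server  # host-only values get http:// prepended
    parts = urlsplit(server)
    if parts.port is None:
        # port= only applies when the server value carries no port of its own
        return f"{parts.scheme}://{parts.hostname}:{port}"
    return f"{parts.scheme}://{parts.netloc}"


print(normalize_server("192.168.1.10", 8000))  # http://192.168.1.10:8000
print(normalize_server())                      # http://localhost:5000
```

Any path component in the server value is dropped by this sketch, which keeps it short but may not match the CLI's behaviour.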

Background workers (default)

By default, workers detach to the background — no tmux required. The launcher prints each worker’s worker_id and PID, then returns immediately. Process metadata is stored in:

path
~/.jd_cache/.cache/<expId>/workers.db
# or <JD_CACHE_PATH>/.cache/<expId>/workers.db

Each worker gets a worker_id like gpunode_egg_0 ({host}_{instance}_{slot}). The instance segment is a short random object name (e.g. egg, table, max 6 letters) allocated from a pool in workers.db. Legacy workers may still use 6-character alphanumeric tokens. Standalone launches always use slot 0; num_workers=N uses slots 0 … N-1. The same id appears in worker-list, on the server as requested_by, and in log filenames.
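
As a small illustration of the {host}_{instance}_{slot} format, the sketch below builds ids for num_workers=N and splits an id back into its parts. Both helpers are hypothetical, and splitting from the right assumes that the instance name itself never contains an underscore.

```python
def make_worker_ids(host, instance, num_workers=1):
    """Return one worker id per slot, using slots 0 .. N-1."""
    return [f"{host}_{instance}_{slot}" for slot in range(num_workers)]


def split_worker_id(worker_id):
    """Split a worker id into (host, instance, slot)."""
    # Split from the right so hostnames that contain underscores stay intact.
    host, instance, slot = worker_id.rsplit("_", 2)
    return host, instance, int(slot)


print(make_worker_ids("gpunode", "egg", 4))
# ['gpunode_egg_0', 'gpunode_egg_1', 'gpunode_egg_2', 'gpunode_egg_3']
print(split_worker_id("gpunode_egg_0"))
# ('gpunode', 'egg', 0)
```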

bash
# Start four background workers (returns immediately)
jd_worker_cli expId=digits-tune entry_script=train.py num_workers=4

# List workers for one experiment
jd_worker_cli expId=digits-tune worker-list

# List all experiments with worker counts on this machine
jd_worker_cli exp-list

# Stop one worker or all
jd_worker_cli expId=digits-tune stop gpunode_egg_0
jd_worker_cli expId=digits-tune stop all

Use foreground=true when debugging — logs stream to the terminal and the process stays attached until you press Ctrl+C.

💡 How many workers? One per CPU core for CPU-bound jobs, or one per GPU for GPU-bound jobs. Check cores with nproc (Linux) or sysctl -n hw.physicalcpu (macOS).

Interactive mode

Run jd_worker_cli with no arguments (or interactive / -i) for a mysql-style REPL. This is useful on shared clusters where you manage workers over a long SSH session without retyping expId= every time.

ℹ️ Interactive mode requires a valid Hub API key before the shell starts. Use export JD_API_KEY=jd_… (or pass api_key= on the command line). If unset, the CLI prompts securely and verifies the key with the Hub. Invalid keys must be corrected before entering the REPL. Create keys at hub.jobdistributor.net/api-keys.

bash
jd_worker_cli
jd_worker_cli -i expId=digits-tune

text
jd_worker_cli 1.15.0 — interactive mode
Type help for commands, exit or Ctrl-D to quit.
jd> use digits-tune
Using experiment 'digits-tune'.
jd[digits-tune]> worker-list
jd[digits-tune]> exp-status
jd[digits-tune]> entry_script=train.py num_workers=4
jd[digits-tune]> stop all
jd[digits-tune]> exit

REPL command | Description
use <expId> | Set default experiment for the session (also honours JD_EXP_ID on startup).
use | Show current experiment.
help / \h | Command reference.
exit / quit / \q | Leave interactive mode.

Once use is set, omit expId= on later lines. Trailing semicolons are optional. Command history is saved to ~/.jd_cache/.cache/jd_worker_history (under JD_CACHE_PATH when set).

Management commands

These subcommands manage workers already running on the machine, query the server, or perform housekeeping. Most require expId=<name> unless you are in interactive mode with use set.

Command | Description
version | Package version, Python executable, and cache root.
health [expId=<id>] | Check Hub and job-server connectivity.
exp-list | All experiments on this machine with worker counts.
expId=<id> worker-list | List running workers (ID, PID, status, current job).
expId=<id> worker-status <worker-id> | Detailed status: uptime, current job, last ping, log path.
expId=<id> worker-logs <worker-id> [lines=N] [follow=true] | Print or tail a worker log file.
expId=<id> exp-status | Experiment summary: worker count, busy/idle, draining flag.
expId=<id> server-info | Job counts by status (PENDING, SERVED, DONE, …) from the server.
expId=<id> where | Show paths: registry DB, jd_data root, logs.
expId=<id> show-config <worker-id> | Launch configuration stored when the worker registered.
expId=<id> stop all|<worker-id> | Stop all workers or one worker (SIGTERM, then SIGKILL after 30 s).
expId=<id> stop job=<job-id> | Stop whichever worker is running that job.
expId=<id> confirm-stop | Stop all workers after typing the experiment name (shared-cluster guardrail).
expId=<id> stop all confirm-stop=true | Same confirmation behaviour as confirm-stop.
stop all-experiments | Stop workers for every experiment on this machine.
expId=<id> restart all|<worker-id> | Stop and respawn using stored launch config.
expId=<id> scale num_workers=<N> | Scale up or down to exactly N workers.
expId=<id> drain | Mark experiment draining: workers finish their current job, then exit (no new jobs).
prune | Force deep-clean of all local registries (also runs automatically at CLI startup).
clear_all | Wipe all local experiment cache on this machine (type clear_all to confirm). Notifies the server for active workers first.

bash
jd_worker_cli expId=digits-tune exp-status
jd_worker_cli expId=digits-tune worker-status gpunode_egg_0
jd_worker_cli expId=digits-tune worker-logs gpunode_egg_0 lines=100 follow=true
jd_worker_cli expId=digits-tune server-info
jd_worker_cli expId=digits-tune scale num_workers=8
jd_worker_cli expId=digits-tune drain
jd_worker_cli expId=digits-tune stop job=42
jd_worker_cli prune
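
The stop behaviour in the table (SIGTERM, then SIGKILL after 30 s) follows a common graceful-shutdown pattern, sketched below. This is not the CLI's code: jd_worker_cli stops workers through PIDs in its registry, while this hypothetical stop_gracefully helper uses a subprocess.Popen handle to stay self-contained.

```python
import subprocess
import sys


def stop_gracefully(proc, grace_seconds=30.0):
    """Ask a child process to exit, then force it after a grace period."""
    proc.terminate()  # sends SIGTERM on POSIX
    try:
        return proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()  # sends SIGKILL on POSIX
        return proc.wait()


if __name__ == "__main__":
    # Stand-in for a worker: a child process that sleeps for a minute.
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    print("exit code:", stop_gracefully(child, grace_seconds=5))
```

The grace period is what gives an entry script time to react to SIGTERM, for example by writing a final checkpoint before it exits.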

Heartbeat protocol (POST /worker/heartbeat)

Workers registered with jd-worker 1.15.0+ use POST /worker/heartbeat for liveness, job assignment, and dashboard control. Every 180 seconds when idle (and every 57 seconds while a job runs), the worker sends:

  • worker_id, host, machine_type
  • reported_status: idle or busy
  • current_job_id: set while running a job
  • applied_version: acknowledgement of the last dashboard command applied
  • system_metrics: CPU/RAM snapshot

The server responds with:

  • desired_state: run, pause, drain, or stop
  • desired_version / applied_version: command versioning
  • job: optional job payload when idle and desired_state=run
  • heartbeat_interval: seconds until the next heartbeat (180 idle, 57 busy)

While a job runs, a background heartbeat thread keeps the worker and job rows fresh and picks up dashboard commands. Stop and drain take effect after the current job completes; when the worker is idle, stop takes effect immediately.
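
The request and response fields listed above can be sketched as plain dictionaries. The field names come from this section; the value types, the contents of system_metrics, and both helper functions are assumptions, and no HTTP request is made here.

```python
import os
import socket

IDLE_INTERVAL_S = 180  # heartbeat spacing when idle
BUSY_INTERVAL_S = 57   # heartbeat spacing while a job runs


def build_heartbeat(worker_id, machine_type="worker", current_job_id=None,
                    applied_version=0, host=None):
    """Assemble a heartbeat payload with the documented field names."""
    busy = current_job_id is not None
    return {
        "worker_id": worker_id,
        "host": host or socket.gethostname(),
        "machine_type": machine_type,
        "reported_status": "busy" if busy else "idle",
        "current_job_id": current_job_id,
        "applied_version": applied_version,
        # Placeholder metric; the real CPU/RAM snapshot format is not documented here.
        "system_metrics": {"cpu_count": os.cpu_count()},
    }


def next_interval(response, busy):
    """Use the server's heartbeat_interval, falling back to the documented defaults."""
    default = BUSY_INTERVAL_S if busy else IDLE_INTERVAL_S
    return response.get("heartbeat_interval", default)
```

Letting the server return heartbeat_interval means the cadence can be adjusted centrally without changing workers, which is presumably why the field is part of the response.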

Dashboard worker control

The server dashboard shows live idle and busy worker counts in the Jobs sidebar. Open the Workers Management nav tab for the full UI: active vs disabled workers, host/instance/slot filters, per-worker history, and metrics. Open Details on any row to view that worker's history (newest first) and system metrics, presented in the same way as the job detail modal.

  • Filter workers by host, instance, and slot
  • Queue Resume, Pause, Drain, or Stop on the Active Workers sub-tab
  • Review Disabled / Stopped workers (read-only) after shutdown or lost heartbeat
  • Cancel a queued command before the worker’s next idle heartbeat (~3 min / 180 s)

Dashboard commands set desired state on the server — they do not send signals directly to worker processes. Precedence is stop > drain > pause > run. Local CLI commands (stop, drain) still work on the machine where the worker runs; dashboard control is for remote management without SSH.
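
The precedence rule stop > drain > pause > run can be expressed in a few lines. How the server actually stores and combines queued commands is not described here, so strongest_state is only an illustration of the ordering, and returning run when nothing is requested is an assumption.

```python
PRECEDENCE = ("stop", "drain", "pause", "run")  # strongest first


def strongest_state(requested):
    """Return the highest-precedence state among those requested."""
    for state in PRECEDENCE:
        if state in requested:
            return state
    return "run"  # assumed default when no command is queued


print(strongest_state(["pause", "drain"]))  # drain
print(strongest_state(["run", "stop"]))     # stop
```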

ℹ️ Queued commands show as queued drain / queued stop in the dashboard until the worker heartbeats and applies them. Use Cancel to revert to the previous desired state.

Examples

Standard Hub mode:

bash
export JD_API_KEY=jd_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

jd_worker_cli expId=digits-tune entry_script=train.py machine_type=cpu

Four background workers on one machine:

bash
jd_worker_cli expId=digits-tune entry_script=train.py \
              num_workers=4 machine_type=gpu

Smoke-test one job in the foreground:

bash
jd_worker_cli expId=digits-tune entry_script=train.py \
              foreground=true once=true

Standalone server (no Hub):

bash
jd_worker_cli expId=digits-tune entry_script=train.py \
              server=192.168.1.10 port=8000

Custom data root on scratch disk (HPC):

bash
# Shared storage for large job I/O
export JD_WORKSPACE_PATH=/scratch/$USER
# Node-local registry (SQLite) — one .jd_cache per compute node
export JD_CACHE_PATH="${TMPDIR:-/tmp}/.jd_cache"
mkdir -p "$JD_WORKSPACE_PATH" "$JD_CACHE_PATH"

jd_worker_cli expId=digits-tune entry_script=train.py num_workers=4

ℹ️ On a remote machine you can still use tmux or screen so an interactive REPL session survives SSH disconnects, but background workers no longer require it.

3 Path Helper Functions

These functions return pathlib.Path objects pointing to the correct directories on the worker machine. They work by reading the JD_JOB_ID and JD_EXP_ID environment variables that jd_worker_cli sets before calling your script. You only need to import and call them — no arguments required.

function jd.jd_job_dir() → Path

Returns the path to the working directory for the current job. This directory is unique per job and is where your script should write all its output files.

Path: ~/jd_data/<exp_id>/<job_id>/

The directory is created by jd_worker_cli before your entry script runs (and passed as --base_path / JD_WORKER_JOB_DIR). Call .mkdir(parents=True, exist_ok=True) if you write to subdirectories you create yourself.

Python
import torch
from jd import jd_job_dir

job_dir = jd_job_dir()
job_dir.mkdir(parents=True, exist_ok=True)  # harmless if the worker already created it

# Write a result file
(job_dir / "result.json").write_text('{"accuracy": 0.97}')

# Write a model checkpoint (placeholder model; substitute your own)
model = torch.nn.Linear(4, 2)
torch.save(model.state_dict(), job_dir / "model.pt")

function jd.jd_exp_dir() → Path

Returns the path to the shared experiment directory — the parent of all job directories for this experiment.

Path: ~/jd_data/<exp_id>/

Use this when your script needs to read a shared resource (a dataset, a pre-trained model) that should be downloaded once per machine and shared across all jobs.

Python
from jd import jd_exp_dir


def download_dataset(dest):
    # Placeholder for your own download code; writes a tiny stand-in file here
    dest.write_text("feature,label\n0.0,0\n")


# Load a shared dataset stored once in the experiment directory
dataset_path = jd_exp_dir() / "dataset.csv"

if not dataset_path.exists():
    # Download it on first use
    download_dataset(dataset_path)

function jd.jd_worker_workspace() → Path

Returns the root of the jd data directory on the worker machine. All job output, logs, and cached shared data live under this directory.

Default path: ~/jd_data/

The root can be redirected by setting JD_WORKSPACE_PATH in the environment before launching jd_worker_cli. This is useful on shared HPC clusters, scratch disks, or any machine where the home directory has limited quota.

bash
# Store all job data on a fast scratch disk instead of ~
export JD_WORKSPACE_PATH=/scratch/$USER

jd_worker_cli expId=digits-tune entry_script=train.py num_workers=4

With the above, all job directories resolve to /scratch/<user>/jd_data/<exp_id>/<job_id>/ instead of ~/jd_data/…. Inside your entry script, always use jd_job_dir() or jd_worker_workspace() rather than constructing paths manually — they honour JD_WORKSPACE_PATH automatically.

Python
from jd import jd_worker_workspace

workspace = jd_worker_workspace()
# ~/jd_data/  (default)   or   /scratch/alice/jd_data/  (if JD_WORKSPACE_PATH is set)

print(workspace)  # Path object — e.g. /home/alice/jd_data
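
To make the directory layout concrete, the sketch below rebuilds the documented paths from environment variables. It mirrors the layout described on this page and is not the library's implementation; resolve_job_dir and resolve_registry_db are hypothetical names, and the real helpers may read other variables such as JD_WORKER_JOB_DIR.

```python
from pathlib import Path


def resolve_job_dir(env):
    """<JD_WORKSPACE_PATH or ~>/jd_data/<exp_id>/<job_id>"""
    parent = Path(env.get("JD_WORKSPACE_PATH", "~")).expanduser()
    # The jd_data/ subdirectory is always appended to the parent.
    return parent / "jd_data" / env["JD_EXP_ID"] / env["JD_JOB_ID"]


def resolve_registry_db(env, exp_id):
    """<JD_CACHE_PATH or ~/.jd_cache>/.cache/<exp_id>/workers.db"""
    root = Path(env.get("JD_CACHE_PATH", "~/.jd_cache")).expanduser()
    return root / ".cache" / exp_id / "workers.db"


env = {"JD_WORKSPACE_PATH": "/scratch/alice", "JD_EXP_ID": "digits-tune", "JD_JOB_ID": "42"}
print(resolve_job_dir(env))  # /scratch/alice/jd_data/digits-tune/42 on POSIX
```

In an entry script, prefer jd_job_dir() and jd_worker_workspace(); a sketch like this is mainly useful for understanding where files will land before launching workers.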

4 File Upload

function jd.jd_upload(file_path: str | Path, …) → dict

Uploads a local file to the central job server, associating it with the current job. The uploaded file is immediately visible in the server dashboard under the job's entry, where it can be downloaded or browsed.

Parameters:

  • file_path — path to the local file to upload (string or pathlib.Path). Maximum file size: 100 MB.

Returns: A dict with success, filename, version, and size_bytes on success. Raises an exception if the upload still fails after all retries (default 5 attempts; configure with JD_UPLOAD_MAX_RETRIES).

When to use it: Call jd_upload() when workers run on machines with temporary storage (cloud spot instances, Slurm nodes) where files would be lost after the job ends. Uploading consolidates all results on the server for easy retrieval.

Python
from jd import jd_job_dir, jd_upload
import json

job_dir = jd_job_dir()
job_dir.mkdir(parents=True, exist_ok=True)

# Save result locally first
result_path = job_dir / "result.json"
result_path.write_text(json.dumps({"accuracy": 0.97}, indent=2))

# Upload to the server so it's visible in the dashboard
jd_upload(result_path)

# You can upload multiple files from the same job
jd_upload(job_dir / "confusion_matrix.png")
jd_upload(job_dir / "model.pt")

💡 Always save the file locally with jd_job_dir() before calling jd_upload(). The local copy is useful for debugging even if the upload fails.
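
Because uploads are limited to 100 MB, it can help to check a file's size before uploading it. The sketch below is one way to do that under stated assumptions: upload_if_small_enough is a hypothetical wrapper, the upload function is passed in as a parameter (in a real script you would pass jd_upload), and reading "100 MB" as 100 * 1024 * 1024 bytes is an assumption about the server's limit.

```python
from pathlib import Path

MAX_UPLOAD_BYTES = 100 * 1024 * 1024  # assumption: "100 MB" read as 100 MiB


def upload_if_small_enough(path, upload_fn, max_bytes=MAX_UPLOAD_BYTES):
    """Call upload_fn(path) unless the file is missing or over the limit.

    Returns the upload result, or None when the file was skipped.
    """
    path = Path(path)
    if not path.is_file() or path.stat().st_size > max_bytes:
        return None
    return upload_fn(path)
```

In an entry script this would be called as upload_if_small_enough(job_dir / "model.pt", jd_upload); a None result signals that the file stayed local only.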

5 Checkpoints

Two functions let your entry script save intermediate training state to the server and resume from it on restart:

Function | Signature | What it does
jd_update_checkpoint | (obj) → dict | Serialises any Python object with pickle and uploads it as a versioned checkpoint. Max 100 MB.
jd_get_last_checkpoint | () → object or None | Downloads the latest checkpoint for the current job and returns it as a Python object. Returns None on first run.

Python
from jd import jd_update_checkpoint, jd_get_last_checkpoint

# model, optimizer, args, and train_one_epoch come from the rest of your training script

# Resume if a checkpoint exists (returns None on first run)
ckpt        = jd_get_last_checkpoint()
start_epoch = ckpt["epoch"] + 1 if ckpt is not None else 0
if ckpt is not None:
    model.load_state_dict(ckpt["model"])

# Save a checkpoint every 5 epochs
for epoch in range(start_epoch, args.epochs):
    train_one_epoch(model, optimizer)
    if (epoch + 1) % 5 == 0:
        jd_update_checkpoint({"epoch": epoch, "model": model.state_dict()})
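
Since checkpoints are pickled and capped at 100 MB, a script can estimate the serialised size before uploading. This is a rough sketch: both helpers are hypothetical, the pickle protocol used by jd_update_checkpoint is not documented here (so the real size may differ slightly), and the byte value of the limit is an assumption.

```python
import pickle

MAX_CHECKPOINT_BYTES = 100 * 1024 * 1024  # assumption: "100 MB" read as 100 MiB


def checkpoint_size(obj):
    """Return the size in bytes of obj once pickled."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def fits_checkpoint_limit(obj, max_bytes=MAX_CHECKPOINT_BYTES):
    """True if the pickled object is within the size limit."""
    return checkpoint_size(obj) <= max_bytes


state = {"epoch": 4, "weights": [0.0] * 1000}
print(checkpoint_size(state), fits_checkpoint_limit(state))
```

Note that the check pickles the object once itself, so for very large states it doubles the serialisation work.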

For the full guide — including patterns for saving the best model, non-PyTorch objects, size limits, and a comparison with local file saves — see the dedicated Checkpointing Guide.

6 Environment Variables

jd_worker_cli sets these environment variables in the subprocess that runs your script. You can read them directly with os.environ when you need values that the path helpers do not expose.

Variable | Value | Example
JD_JOB_ID | Integer job ID (as a string) | "42"
JD_EXP_ID | Experiment name | "digits-tune"
JD_SERVER | Job server base URL | "https://digits-tune-server.jobdistributor.net"
JD_WORKER_WORKSPACE_ROOT | Root data directory (the jd_data/ directory, parent of all experiment directories) | "/home/alice/jd_data"
JD_WORKER_JOB_DIR | Full path to this job's output directory | "/home/alice/jd_data/digits-tune/42"
JD_WORKER_ID | Worker id for this process (Hub mode) | "gpunode_egg_0"
JD_WORKER_TOKEN | Short-lived JWT for server API calls (initial value; registry is refreshed by the parent worker) | "eyJ…"

Example — read job ID and experiment name directly:

Python
import os

job_id  = int(os.environ["JD_JOB_ID"])
exp_id  = os.environ["JD_EXP_ID"]
print(f"Running job {job_id} for experiment '{exp_id}'")
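
When a script may also be run by hand, outside jd_worker_cli, reading these variables defensively avoids a KeyError. The sketch below is one possible approach; JobContext and read_job_context are hypothetical names, not part of the jd package.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobContext:
    job_id: Optional[int]
    exp_id: Optional[str]
    server: Optional[str]


def read_job_context(environ=os.environ):
    """Read the job context, leaving fields as None when a variable is unset."""
    raw_id = environ.get("JD_JOB_ID")
    return JobContext(
        job_id=int(raw_id) if raw_id is not None else None,
        exp_id=environ.get("JD_EXP_ID"),
        server=environ.get("JD_SERVER"),
    )


ctx = read_job_context({"JD_JOB_ID": "42", "JD_EXP_ID": "digits-tune"})
print(ctx.job_id, ctx.exp_id)  # 42 digits-tune
```

A script can then branch on ctx.job_id is None to detect a local run and, for example, write to a temporary directory instead.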

The following variables configure jd_worker_cli itself. Because your script inherits the worker's environment, it can read them as well:

Variable | Controls
JD_API_KEY | API key for Hub authentication
JD_HUB_URL | Hub base URL (override only if self-hosting the Hub)
JD_EXP_ID | Default experiment name (replaces expId= CLI argument)
JD_ENTRY_SCRIPT | Default entry script path
JD_MACHINE_TYPE | Machine type label shown in the server dashboard
JD_WORKSPACE_PATH | Parent directory for jd_data/ job sandboxes (default: home directory ~ → data at ~/jd_data/)
JD_CACHE_PATH | Root for local worker registry (default: ~/.jd_cache; registry at .cache/<expId>/workers.db)
JD_NUM_WORKERS | Number of parallel workers to spawn (default: 1)
JD_FOREGROUND | Set to true to run attached to the terminal (default: background)
JD_LOG_DIR | Override worker log directory
JD_ONCE | Set to true to run one job then exit (or exit when queue is empty)
JD_SKIP_REGISTRY_PRUNE | Set to 1 to skip automatic startup registry cleanup (debugging only)

7 Worker Logs

Each worker process writes a log file capturing job fetches, status updates, heartbeat activity, and the combined stdout/stderr from your entry script. Background workers log to file only; use foreground=true to mirror logs to the terminal.

Default log location:

path
~/jd_data/<exp_id>/jd_worker_logs/jd_worker_<worker_id>.log

worker_id is formatted as {host}_{instance}_{slot} (e.g. gpunode_egg_0). Override the directory with log_dir=<path> or JD_LOG_DIR.

View logs via the CLI (uses the path stored in the worker registry):

bash
# Last 50 lines
jd_worker_cli expId=digits-tune worker-logs gpunode_egg_0

# Last 200 lines, then follow live
jd_worker_cli expId=digits-tune worker-logs gpunode_egg_0 lines=200 follow=true

Or tail the file directly:

bash
tail -f ~/jd_data/digits-tune/jd_worker_logs/jd_worker_*.log

Use worker-status <worker-id> to see the exact log path for a registered background worker.

8 Effective Usage Patterns

Pattern 1 — Minimal script (result only)

For quick experiments where you only need a single JSON result and no file upload:

Python
import argparse, json
from jd import jd_job_dir

parser = argparse.ArgumentParser()
parser.add_argument("--lr", type=float)
parser.add_argument("--epochs", type=int)
args = parser.parse_args()

# ... train ...
accuracy = 0.95

job_dir = jd_job_dir()
job_dir.mkdir(parents=True, exist_ok=True)
(job_dir / "result.json").write_text(json.dumps({
    "lr": args.lr, "epochs": args.epochs, "accuracy": accuracy
}))

Pattern 2 — Ephemeral nodes (Slurm / cloud) with upload

When nodes lose their local storage after the job ends, upload everything important before the script exits:

Python
from jd import jd_job_dir, jd_upload

job_dir = jd_job_dir()
job_dir.mkdir(parents=True, exist_ok=True)

# ... train and save outputs ...

# Upload all files in the job directory
for f in job_dir.iterdir():
    if f.is_file():
        jd_upload(f)

Pattern 3 — Shared dataset across jobs

Download a dataset once per machine to the experiment directory, then load it for every job without re-downloading:

Python
import filelock
import pandas as pd
from jd import jd_exp_dir

dataset_path = jd_exp_dir() / "data.csv"
lock_path    = jd_exp_dir() / "data.csv.lock"

# Process-safe: only the first worker downloads; the others wait on the lock and reuse the file
# (download_dataset() stands for your own download helper)
with filelock.FileLock(str(lock_path)):
    if not dataset_path.exists():
        download_dataset(dataset_path)

df = pd.read_csv(dataset_path)

ℹ️ Install filelock with pip install filelock to safely share resources between concurrent workers on the same machine.

Pattern 4 — Resumable training with checkpoints

For long-running jobs on preemptible hardware, use jd_update_checkpoint() periodically and jd_get_last_checkpoint() at startup to resume automatically from the last saved epoch. See the Checkpointing Guide for complete examples including best-model saving, non-PyTorch objects, and a full resume pattern.