Building a daily data pipeline with Dagu, Python, and a JSONL data lake


TL;DR. Three stages (ingest, index, alert), one stage per script, JSONL as the index format, a flat dedup state file, Dagu for scheduling. Boring, reliable, and dramatically less work than reaching for a heavyweight orchestrator.

There is a class of pipeline that does not deserve a Spark cluster, an Airflow deployment, or a multi-tenant orchestrator. It is the “fetch a few hundred records from an API every day, index them, alert on the interesting ones” job. We have built this kind of thing dozens of times. Here is the shape that has held up.

The example below is a calendar pipeline: pull a list of upcoming events for a few hundred entities from a public source, index each daily snapshot, and send Slack alerts when an event is approaching. Substitute your own domain.

Three stages, each with a single responsibility:

ingest  ->  index  ->  alert
   |          |          |
   v          v          v
  JSON      JSONL     Slack
snapshot    index    message

  • Ingest writes a timestamped JSON snapshot to disk.
  • Index scans every snapshot and produces a single JSONL file with file-level metadata.
  • Alert reads the last line of the JSONL, filters by threshold (D-30, D-15, D-7, D-3, D-1), deduplicates against a state file, and posts to Slack.

Each stage runs from its own script, can be invoked manually, and can be replayed without affecting the others.
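
Concretely, the on-disk layout ends up looking something like this. The data/ paths are illustrative, not anything the tools require; only the scripts/ names reappear in the DAG files later.

/opt/pipeline/
  data/
    snapshots/
      snapshot_20240501_062012.json   # one file per ingest run, never overwritten
      snapshot_20240502_062014.json
    index.jsonl                       # rebuilt in full by the index stage
    alert_state.json                  # flat dedup state
  scripts/
    run_ingest.sh
    run_index.sh
    run_alerts.sh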

It is tempting to keep the snapshot files as the source of truth and re-read them all on every alert run. Resist it: a JSONL index in front of them gives you three things for very little:

  • One line per snapshot: file path, mtime, size, and the snapshot content embedded as a content_text field.
  • Append-only. Re-indexing is “scan the directory, sort the timestamped filenames, re-emit”. Cheap.
  • The latest payload is tail -n 1. No globbing and no timestamp parsing in the alerting code.

The alerting script does not care how many snapshots you keep. It always reads the last JSONL line.
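
The alert stage further down calls a read_last_line helper that the pipeline scripts do not otherwise show. A minimal sketch, assuming each index record ends with a single newline (which is what index() below writes), walks backwards from the end of the file rather than reading all of it:

import os
from pathlib import Path

def read_last_line(path: Path) -> str:
    """tail -n 1 without reading the whole file."""
    with path.open("rb") as f:
        f.seek(0, os.SEEK_END)
        size = f.tell()
        # Start just before the trailing newline, walk back to the previous one.
        pos = size - 2 if size >= 2 else 0
        while pos > 0:
            f.seek(pos)
            if f.read(1) == b"\n":
                pos += 1
                break
            pos -= 1
        f.seek(pos)
        return f.read().decode("utf-8").strip()

Now the stages themselves. Ingest first: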

import json
import logging
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

def ingest(output_dir: Path, lookahead_days: int = 60, delay_sec: float = 0.5):
    universe = load_universe()  # entity list from a CSV in the repo
    rows = []
    for ticker in universe:
        try:
            row = fetch_one(ticker, lookahead_days=lookahead_days)
        except Exception as exc:
            # One bad upstream record must never fail the whole run.
            logging.warning("fetch failed for %s: %s", ticker, exc)
            continue
        if row is not None:
            rows.append(row)
        time.sleep(delay_sec)  # polite delay between calls

    now = datetime.now(timezone.utc)  # one timestamp for filename and payload
    out = output_dir / f"snapshot_{now:%Y%m%d_%H%M%S}.json"
    out.write_text(json.dumps({"fetched_at": now.isoformat(),
                               "rows": rows}, indent=2))
    return out

What to notice:

  • Per-row try and except. A single bad upstream record never fails the whole run. The log line is enough to investigate after the fact.
  • Polite delay between calls. If the source has no documented rate limit, 0.5 to 1 second is the cheapest insurance you can buy. We have seen “no rate limit” turn into “soft block for an hour” on more than one source.
  • Optional proxy file. A --proxy-file flag that takes one HTTP proxy per line. Off by default, on when you have to be; a sketch of the wiring follows this list.
  • Snapshot file is timestamped, never overwritten. Rolling back a bad ingest run is rm and re-run.
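
The flag wiring is a few lines of argparse. Treat this as a sketch rather than the pipeline's actual CLI: the --proxy-file flag comes from the bullet above, while the defaults, load_proxies, and the round-robin are assumptions.

import argparse
import itertools
from pathlib import Path

def parse_args():
    p = argparse.ArgumentParser(description="daily ingest")
    p.add_argument("--output-dir", type=Path, default=Path("data/snapshots"))
    p.add_argument("--lookahead-days", type=int, default=60)
    p.add_argument("--proxy-file", type=Path, default=None,
                   help="file with one HTTP proxy per line; off by default")
    return p.parse_args()

def load_proxies(proxy_file: Path | None):
    if proxy_file is None:
        return None  # direct connections
    proxies = [line.strip() for line in proxy_file.read_text().splitlines()
               if line.strip()]
    return itertools.cycle(proxies)  # round-robin, one proxy per request

How fetch_one consumes each proxy depends on the HTTP client; with requests it is a proxies={"http": ..., "https": ...} argument on each call.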

The index stage is a single pass over the snapshot directory:

def index(snapshot_dir: Path, index_path: Path):
    # Timestamped filenames sort lexicographically into chronological order.
    files = sorted(snapshot_dir.glob("snapshot_*.json"))
    with index_path.open("w") as f:
        for path in files:
            stat = path.stat()
            f.write(json.dumps({
                "path": str(path),
                "mtime": stat.st_mtime,
                "size": stat.st_size,
                "content_text": path.read_text(),
            }) + "\n")

Important: the index reflects whatever is in the snapshot directory. If old or broken snapshots are still on disk, they are still in the JSONL. The discipline is “delete bad snapshots, then re-index”, not “filter at index time”.
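
In practice that discipline is two steps. Using the illustrative paths from the layout above:

from pathlib import Path

snapshots = Path("/opt/pipeline/data/snapshots")
# Drop the snapshots from the bad ingest run (the date is illustrative)...
for bad in snapshots.glob("snapshot_20240501_*.json"):
    bad.unlink()
# ...then rebuild the index from whatever remains.
index(snapshots, Path("/opt/pipeline/data/index.jsonl"))

Next, the alert stage: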

def alert(index_path: Path, state_path: Path, thresholds: list[int],
          channel: str, dry_run: bool):
    # The latest snapshot is always the last line of the index.
    payload = json.loads(read_last_line(index_path))["content_text"]
    snapshot = json.loads(payload)

    state = json.loads(state_path.read_text()) if state_path.exists() else {}
    today = datetime.now(timezone.utc).date()

    for threshold in thresholds:
        target = today + timedelta(days=threshold)
        rows = [r for r in snapshot["rows"] if parse_date(r["event_date"]) == target]
        for row in rows:
            key = f"{row['id']}|{row['event_date']}|D-{threshold}"
            if key in state:
                continue  # already alerted at this threshold
            send_slack(channel, format_alert(row, threshold), dry_run=dry_run)
            state[key] = today.isoformat()

    # State is written only after all sends complete, and never on a dry run.
    if not dry_run:
        state_path.write_text(json.dumps(state))

Key behaviours:

  • The state file is a flat dict keyed by id|date|threshold. New thresholds and new dates just grow the file; periodic pruning on age keeps it small (a sketch follows this list).
  • --dry-run is a first-class flag, not an afterthought. The whole pipeline can be exercised end to end against the production index without sending anything.
  • Slack failures do not corrupt state. The state write happens after sends complete, so a partial failure means the next run retries and the user sees a “duplicate-looking” message instead of a missing alert.
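
Pruning is small because alert() stores the fire date as an ISO string. A sketch, with an arbitrary 90-day cutoff:

import json
from datetime import date, timedelta
from pathlib import Path

def prune_state(state_path: Path, max_age_days: int = 90):
    state = json.loads(state_path.read_text())
    cutoff = (date.today() - timedelta(days=max_age_days)).isoformat()
    # ISO dates compare chronologically as plain strings.
    kept = {key: fired for key, fired in state.items() if fired >= cutoff}
    state_path.write_text(json.dumps(kept))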

Dagu is a single Go binary that runs as the user, with YAML files on disk for job definitions. There is no central state to back up beyond the YAML files. The full pipeline becomes two short DAGs:

# ingest-and-index.yaml
schedule: "20 */6 * * *"
steps:
  - name: ingest
    command: /opt/pipeline/scripts/run_ingest.sh
  - name: index
    command: /opt/pipeline/scripts/run_index.sh
    depends:
      - ingest

# alerts.yaml
schedule: "25 6 * * *"
steps:
  - name: alerts
    command: /opt/pipeline/scripts/run_alerts.sh

Two cron specs, two shell wrappers, done. No DAG framework upgrades, no Python plugin breakage, no Kubernetes operator.

  • Logs go to a known directory. One log file per script, rotated daily. Stdout is fine for the Dagu UI; the on-disk logs are for grep on the host.
  • Error budgets are explicit. “0 errors under normal conditions, up to 50 missing entries” is a sentence in the runbook. Anything outside that is a page.
  • Dry-run from production. The alerts script can be run against the production index manually with --dry-run plus the live index path. This is also what the on-call uses to validate after a state-file recovery.
  • State file is checked into version control daily. A small daily job commits the state file to a private git repo. If the host disappears, the worst case is one duplicate alert per active threshold.

This shape covers maybe 80 percent of small data pipelines. It stops being a good fit when:

  • Ingest is parallelisable across many workers. At that point you want a real queue.
  • Schemas are evolving fast. A flat JSON snapshot is fine when the source is stable. If new fields appear weekly, push toward a typed schema and validation at ingest time.
  • You need to support backfill at scale. Replaying every day of history through the alert script is fine; replaying it through Slack is not. Add a “no-send” mode that only writes state; see the sketch after this list.
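
A no-send mode falls out of splitting dry_run into two booleans. A sketch of the generalised alert stage (the argument names are illustrative; the body is the one shown earlier):

def alert(index_path: Path, state_path: Path, thresholds: list[int],
          channel: str, send: bool = True, write_state: bool = True):
    payload = json.loads(read_last_line(index_path))["content_text"]
    snapshot = json.loads(payload)
    state = json.loads(state_path.read_text()) if state_path.exists() else {}
    today = datetime.now(timezone.utc).date()

    for threshold in thresholds:
        target = today + timedelta(days=threshold)
        for row in (r for r in snapshot["rows"]
                    if parse_date(r["event_date"]) == target):
            key = f"{row['id']}|{row['event_date']}|D-{threshold}"
            if key in state:
                continue
            if send:
                send_slack(channel, format_alert(row, threshold))
            state[key] = today.isoformat()  # backfill still records state

    if write_state:
        state_path.write_text(json.dumps(state))

# send=True,  write_state=True   normal daily run
# send=False, write_state=False  dry run: exercise everything, change nothing
# send=False, write_state=True   backfill: replay history silently, mark as alerted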

A small daily pipeline does not need much to be reliable: timestamped snapshots, an append-only index, a dedup state file, and a scheduler that does not hide. The effort goes into the boring parts: clean error handling, dry-run support, a runbook that fits on one page. The interesting parts (the actual data fetching and alert formatting) end up being a small slice of the codebase, which is exactly how it should feel.