
Pipeline Monitoring & Observability

The scraper pipeline produces a continuous flow of data — listings are scraped, normalized, imported, and published individually through RabbitMQ queues. This page explains how to see what is happening right now, what happened recently, and how healthy the system is.

Monitoring Stack

┌─────────────────────────────────────────────────────────────────┐
│                    OBSERVABILITY LAYERS                         │
│                                                                 │
│  Real-time          │  Historical         │  Alerts             │
│  ─────────────────  │  ─────────────────  │  ───────────────── │
│  RabbitMQ Mgmt UI   │  Pipeline Events    │  Telegram Hourly   │
│  Admin Dashboard    │  Stats Snapshots    │  (summary report)  │
│  Queue Depths       │  Scrape Runs        │                     │
│  Worker Status      │  Activity Page      │                     │
└─────────────────────────────────────────────────────────────────┘
| Layer | What it shows | Where |
| --- | --- | --- |
| RabbitMQ Management UI | Live queue depths, message rates, connections | mtl-rent-rabbit.fesenko.net |
| Admin Dashboard | Worker health, current period stats, DB totals | mtl-rent-admin.fesenko.net |
| Pipeline Activity | Per-message event log (last 7 days) | Admin → Activity tab |
| Scrape Cycles | Per-cycle stats (found/new/updated) | Admin → Scrape Cycles tab |
| Telegram Reports | Hourly aggregate summary | Telegram bot |
| pm2 logs | Raw application logs | pm2 logs mtl-scraper |

Pipeline Events

Every message processed through the pipeline creates a record in scraper.pipeline_events. This gives you a chronological log of everything the system has done.

What gets logged

| Stage | When | What's recorded |
| --- | --- | --- |
| scraped | Listing extracted from Kijiji | Platform ID, whether it's new or updated |
| normalized | Raw fields parsed into structured data | Processing duration |
| imported | Public listing created, updated, or skipped | New: listing ID + image count. Update: {isUpdate: true, changedFields: [...]}. Skip: {skipped: true} |
| images | Images downloaded for a listing | Download/fail counts |
| published | Pipeline complete for this listing | Final listing ID |
| error | Processing failed at any stage | Error message |

How it works

Each RabbitMQ consumer writes one row to pipeline_events after processing a message:

Scraper → scraped event → [normalize queue]

Normalizer consumer → normalized event → [import queue]

Importer consumer → imported event → [image queue] + [published queue]
                                          ↓              ↓
Image consumer → images event    Published consumer → published event

Events include:

  • Stage — which pipeline step
  • Raw listing ID — links back to the scraped data
  • Platform — which source (kijiji, kangalou, etc.)
  • Duration — milliseconds to process
  • Result — stage-specific JSON:
    • scraped: {isNew: true} or {isNew: false}
    • normalized: {}
    • imported (new): {isNew: true, listingId: "...", imageCount: 5}
    • imported (update): {isUpdate: true, changedFields: ["price", "images"], hasNewImages: true}
    • imported (skip): {skipped: true, reason: "claimed"}
    • images: {downloaded: 5, failed: 0}
    • published: {listingId: "..."}
  • Error — error message if processing failed
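The per-message write can be sketched as a small helper each consumer calls after handling a message. This is an illustrative sketch, not the scraper's actual code: the PipelineEvent shape mirrors the fields listed above, while buildEvent and its signature are assumptions.

```typescript
// Sketch of the row each consumer writes to scraper.pipeline_events.
// Field names follow the event fields documented above.
type Stage = "scraped" | "normalized" | "imported" | "images" | "published" | "error";

interface PipelineEvent {
  stage: Stage;                     // which pipeline step
  rawListingId: string;             // links back to the scraped data
  platform: string;                 // e.g. "kijiji", "kangalou"
  durationMs: number;               // milliseconds to process
  result: Record<string, unknown>;  // stage-specific JSON payload
  error?: string;                   // set only when processing failed
}

// Hypothetical helper: computes the duration from a start timestamp
// and assembles the event row that would be inserted.
function buildEvent(
  stage: Stage,
  rawListingId: string,
  platform: string,
  startedAt: number,
  result: Record<string, unknown>,
  error?: string,
): PipelineEvent {
  return {
    stage,
    rawListingId,
    platform,
    durationMs: Date.now() - startedAt,
    result,
    error,
  };
}

// Example: the importer recording an update with changed fields.
const ev = buildEvent("imported", "raw-123", "kijiji", Date.now() - 52, {
  isUpdate: true,
  changedFields: ["price", "images"],
});
```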

Retention

Events are auto-pruned after 7 days by a daily cleanup task. This keeps the table small while giving enough history for debugging.

Viewing events

  • Admin UI: Navigate to the Activity page for pipeline traces grouped by listing. Filter by computed status (complete, updated, skipped, in_progress, error). Updated traces show which fields changed. Auto-refreshes every 10 seconds.
  • SQL: SELECT * FROM scraper.pipeline_events ORDER BY created_at DESC LIMIT 50;
  • API: GET /api/pipeline/events?stage=imported&page=1&limit=50
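For scripted access, the API query can be assembled like this — a minimal sketch assuming the endpoint and parameters shown above; the base URL is a placeholder:

```typescript
// Sketch: building a query URL for the pipeline events API.
// Parameters (stage, page, limit) come from the endpoint documented above.
function eventsUrl(
  base: string,
  opts: { stage?: string; page?: number; limit?: number },
): string {
  const params = new URLSearchParams();
  if (opts.stage) params.set("stage", opts.stage);
  params.set("page", String(opts.page ?? 1));   // defaults mirror the example query
  params.set("limit", String(opts.limit ?? 50));
  return `${base}/api/pipeline/events?${params}`;
}

const url = eventsUrl("http://localhost:3000", { stage: "imported" });
// → "http://localhost:3000/api/pipeline/events?stage=imported&page=1&limit=50"
```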

Stats Snapshots

The pipeline tracks aggregate throughput counters in memory (the StatsCollector). Every hour, these counters are:

  1. Sent to Telegram as a formatted summary
  2. Persisted to scraper.stats_snapshots for historical trend analysis

What's in each snapshot

```json
{
  "periodStart": "2026-03-04T14:00:00Z",
  "periodEnd": "2026-03-04T15:00:00Z",
  "counters": {
    "scraped": 150,
    "scrapedNew": 12,
    "scrapedUpdated": 138,
    "scrapedSkipped": 45,
    "normalized": 150,
    "normalizeErrors": 0,
    "imported": 12,
    "importUpdated": 15,
    "importSkipped": 123,
    "importErrors": 0,
    "imagesDownloaded": 60,
    "imagesFailed": 2,
    "errors": 0
  }
}
```
  • scrapedNew — listings seen for the first time
  • scrapedUpdated — listings that existed but their content_hash changed
  • importUpdated — existing public listings that were updated (price, description, images, etc.)
  • importSkipped — already-imported listings with no changes detected

The stats collector also tracks update categories — a per-field breakdown of what changed (e.g., {price: 5, images: 3, description: 2}). This is included in the Telegram report but not persisted to DB.
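The collector's behavior can be sketched as follows. This is an assumption about how the real StatsCollector works, not its actual source — counter names match the snapshot JSON above, and the flush/reset mechanics are inferred from the hourly cycle described here:

```typescript
// Minimal sketch of the in-memory StatsCollector described above.
class StatsCollector {
  private counters: Record<string, number> = {};
  private updateCategories: Record<string, number> = {};
  private periodStart = new Date();

  increment(name: string, by = 1): void {
    this.counters[name] = (this.counters[name] ?? 0) + by;
  }

  // Per-field breakdown of what changed on updates — included in the
  // Telegram report but not persisted to DB, per the docs.
  recordUpdate(changedFields: string[]): void {
    this.increment("importUpdated");
    for (const f of changedFields) {
      this.updateCategories[f] = (this.updateCategories[f] ?? 0) + 1;
    }
  }

  // Hourly flush: return the snapshot for persistence/reporting,
  // then reset all counters for the next period.
  flush(): { periodStart: string; periodEnd: string; counters: Record<string, number> } {
    const snapshot = {
      periodStart: this.periodStart.toISOString(),
      periodEnd: new Date().toISOString(),
      counters: this.counters,
    };
    this.counters = {};
    this.updateCategories = {};
    this.periodStart = new Date();
    return snapshot;
  }
}

const stats = new StatsCollector();
stats.increment("scraped");
stats.recordUpdate(["price", "images"]);
const snap = stats.flush(); // counters now reset for the next hour
```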

Retention

Stats snapshots are auto-pruned after 90 days.

Querying

  • API: GET /api/pipeline/stats-history?hours=24 — returns snapshots from the last 24 hours
  • SQL: SELECT * FROM scraper.stats_snapshots ORDER BY period_start DESC LIMIT 24;
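For trend analysis, snapshots from the stats-history endpoint can be rolled up into period totals — a sketch assuming the snapshot shape shown above:

```typescript
// Sketch: aggregating /api/pipeline/stats-history snapshots into
// totals for the whole window (e.g. last 24 hours).
interface Snapshot {
  periodStart: string;
  periodEnd: string;
  counters: Record<string, number>;
}

function sumCounters(snapshots: Snapshot[]): Record<string, number> {
  const totals: Record<string, number> = {};
  for (const s of snapshots) {
    for (const [name, value] of Object.entries(s.counters)) {
      totals[name] = (totals[name] ?? 0) + value;
    }
  }
  return totals;
}

// Example: two hourly snapshots summed into one daily figure.
const daily = sumCounters([
  { periodStart: "…", periodEnd: "…", counters: { scraped: 150, imported: 12 } },
  { periodStart: "…", periodEnd: "…", counters: { scraped: 90 } },
]);
// daily.scraped → 240
```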

RabbitMQ Queue Monitoring

The RabbitMQ Management UI shows real-time queue state. Key things to look at:

Queue Depths

Navigate to Queues in the RabbitMQ UI. Each queue shows:

  • Ready — messages waiting to be consumed
  • Unacked — messages being processed right now
  • Total — ready + unacked

In normal operation, all depths are 0 or near-0. The consumers process messages faster than the scraper produces them (a listing takes ~100ms to normalize, ~50ms to import).

If you see high queue depths:

  • normalize.kijiji > 100 — normalizer is falling behind (check for errors in logs)
  • import > 50 — importer is slow (database contention?)
  • images.pending > 20 — image downloads are slow (network issues?)
  • dlq > 0 — messages that failed all retries (investigate immediately)
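These thresholds lend themselves to a simple automated check. A sketch, using the queue names and limits listed above; fetching the actual depths from the RabbitMQ Management API is assumed to happen elsewhere:

```typescript
// Sketch: flag queues whose depth exceeds the thresholds documented above.
const QUEUE_LIMITS: Record<string, number> = {
  "normalize.kijiji": 100, // normalizer falling behind
  "import": 50,            // importer slow (DB contention?)
  "images.pending": 20,    // image downloads slow (network?)
  "dlq": 0,                // any DLQ message warrants investigation
};

function queueAlerts(depths: Record<string, number>): string[] {
  const alerts: string[] = [];
  for (const [queue, limit] of Object.entries(QUEUE_LIMITS)) {
    const depth = depths[queue] ?? 0;
    if (depth > limit) alerts.push(`${queue} depth ${depth} exceeds ${limit}`);
  }
  return alerts;
}

const example = queueAlerts({ "normalize.kijiji": 3, "import": 0, "images.pending": 0, "dlq": 1 });
// → ["dlq depth 1 exceeds 0"]
```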

Message Rates

Click on a specific queue to see publish/deliver rates. This shows real-time throughput — how many messages per second are entering and leaving the queue.

Access

The management UI is available at mtl-rent-rabbit.fesenko.net.

Worker Health

The admin dashboard shows worker status pulled from the scraper's HTTP API (port 3003):

  • Green dot — worker is running and connected to RabbitMQ
  • Red dot — worker is offline or unreachable
  • Uptime — how long since last restart
  • Queue Depths — mirrored from RabbitMQ Management API
  • Current Period — live counters since last hourly flush

Scraper HTTP API

The scraper exposes a lightweight API on port 3003:

| Endpoint | Method | Description |
| --- | --- | --- |
| /status | GET | Health check + queue depths + current stats |
| /trigger/pipeline | POST | Run full pipeline (scrape → normalize → import) |
| /trigger/scrape | POST | Scrape only |
| /trigger/normalize | POST | Normalize pending raw listings |
| /trigger/import | POST | Import normalized listings |
| /trigger/download-images | POST | Download pending images |
| /trigger/staleness | POST | Check for stale listings |

The admin dashboard calls these endpoints through a proxy at the admin API.

Telegram Reports

Every hour, the scraper sends a summary to Telegram (if configured):

📊 Hourly Pipeline Report

⏱ Period: 1h 0m

🔍 Scraping: 150 processed (12 new, 138 updated, 45 skipped)
🔄 Normalized: 150 (0 errors)
📦 Imported: 12 new, 15 updated (123 skipped, 0 errors)
📝 Updated fields: price: 5, images: 3, description: 2, amenities: 2, contact: 1
📥 Images: 60 downloaded, 2 failed
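Rendering that message from the flushed counters is straightforward. A sketch, assuming the counter names from the snapshot JSON and the layout of the sample report above:

```typescript
// Sketch: build the hourly Telegram message from flushed counters and
// the per-field update categories (e.g. { price: 5, images: 3 }).
function formatReport(
  c: Record<string, number>,
  categories: Record<string, number>,
): string {
  const fields = Object.entries(categories)
    .map(([name, count]) => `${name}: ${count}`)
    .join(", ");
  return [
    "📊 Hourly Pipeline Report",
    "",
    `🔍 Scraping: ${c.scraped} processed (${c.scrapedNew} new, ${c.scrapedUpdated} updated, ${c.scrapedSkipped} skipped)`,
    `🔄 Normalized: ${c.normalized} (${c.normalizeErrors} errors)`,
    `📦 Imported: ${c.imported} new, ${c.importUpdated} updated (${c.importSkipped} skipped, ${c.importErrors} errors)`,
    `📝 Updated fields: ${fields}`,
    `📥 Images: ${c.imagesDownloaded} downloaded, ${c.imagesFailed} failed`,
  ].join("\n");
}
```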

Configure via environment variables:

  • TELEGRAM_BOT_TOKEN — bot API token
  • TELEGRAM_CHAT_ID — chat/channel ID

Database Tables

| Table | Purpose | Retention |
| --- | --- | --- |
| scraper.pipeline_events | Per-message activity log | 7 days |
| scraper.stats_snapshots | Hourly aggregate throughput | 90 days |
| scraper.scrape_runs | Per-cycle execution records | Permanent |
| scraper.raw_listings | All scraped data + status | Permanent |

Troubleshooting

"No pipeline events" in the Activity page

The scraper hasn't processed any messages yet. Check:

  1. Is the scraper running? pm2 status mtl-scraper
  2. Is RabbitMQ running? docker compose ps
  3. Any errors in logs? pm2 logs mtl-scraper --lines 50

Queue depths are always 0

This is normal! Consumers process messages instantly. Check "Message rates" in the RabbitMQ UI to confirm messages are flowing.

Stats show 0 for everything

The StatsCollector resets every hour. If you just restarted, wait for a scrape cycle to complete. Check GET /api/pipeline/status for current period stats.