Pipeline Monitoring & Observability
The scraper pipeline produces a continuous flow of data — listings are scraped, normalized, imported, and published individually through RabbitMQ queues. This page explains how to monitor what's happening right now, what happened recently, and how healthy the system is.
Monitoring Stack
┌─────────────────────────────────────────────────────────────────┐
│ OBSERVABILITY LAYERS │
│ │
│ Real-time │ Historical │ Alerts │
│ ───────────────── │ ───────────────── │ ───────────────── │
│ RabbitMQ Mgmt UI │ Pipeline Events │ Telegram Hourly │
│ Admin Dashboard │ Stats Snapshots │ (summary report) │
│ Queue Depths │ Scrape Runs │ │
│ Worker Status │ Activity Page │ │
└─────────────────────────────────────────────────────────────────┘

| Layer | What it shows | Where |
|---|---|---|
| RabbitMQ Management UI | Live queue depths, message rates, connections | mtl-rent-rabbit.fesenko.net |
| Admin Dashboard | Worker health, current period stats, DB totals | mtl-rent-admin.fesenko.net |
| Pipeline Activity | Per-message event log (last 7 days) | Admin → Activity tab |
| Scrape Cycles | Per-cycle stats (found/new/updated) | Admin → Scrape Cycles tab |
| Telegram Reports | Hourly aggregate summary | Telegram bot |
| pm2 logs | Raw application logs | pm2 logs mtl-scraper |
Pipeline Events
Every message processed through the pipeline creates a record in scraper.pipeline_events. This gives you a chronological log of everything the system has done.
What gets logged
| Stage | When | What's recorded |
|---|---|---|
| scraped | Listing extracted from Kijiji | Platform ID, whether it's new or updated |
| normalized | Raw fields parsed into structured data | Processing duration |
| imported | Public listing created, updated, or skipped | New: listing ID + image count. Update: `{isUpdate: true, changedFields: [...]}`. Skip: `{skipped: true}` |
| images | Images downloaded for a listing | Download/fail counts |
| published | Pipeline complete for this listing | Final listing ID |
| error | Processing failed at any stage | Error message |
How it works
Each RabbitMQ consumer writes one row to pipeline_events after processing a message:
Scraper → scraped event → [normalize queue]
↓
Normalizer consumer → normalized event → [import queue]
↓
Importer consumer → imported event → [image queue] + [published queue]
↓ ↓
Image consumer → images event    Published consumer → published event

Events include:
- Stage — which pipeline step
- Raw listing ID — links back to the scraped data
- Platform — which source (kijiji, kangalou, etc.)
- Duration — milliseconds to process
- Result — stage-specific JSON:
  - `scraped`: `{isNew: true}` or `{isNew: false}`
  - `normalized`: `{}`
  - `imported` (new): `{isNew: true, listingId: "...", imageCount: 5}`
  - `imported` (update): `{isUpdate: true, changedFields: ["price", "images"], hasNewImages: true}`
  - `imported` (skip): `{skipped: true, reason: "claimed"}`
  - `images`: `{downloaded: 5, failed: 0}`
  - `published`: `{listingId: "..."}`
- Error — error message if processing failed
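Put together, a single event row can be sketched as a record like the following. This is illustrative only — the field names and types are assumptions, not the actual `scraper.pipeline_events` schema:

```typescript
// Sketch of a pipeline event record, mirroring the fields listed above.
// Names are illustrative, not the real column names.
interface PipelineEvent {
  stage: "scraped" | "normalized" | "imported" | "images" | "published" | "error";
  rawListingId: string;            // links back to the scraped data
  platform: string;                // e.g. "kijiji", "kangalou"
  durationMs: number;              // milliseconds to process
  result: Record<string, unknown>; // stage-specific JSON
  error?: string;                  // set only when processing failed
  createdAt: Date;
}

// Example: an "imported" event for an updated listing.
function makeImportUpdateEvent(
  rawListingId: string,
  changedFields: string[],
): PipelineEvent {
  return {
    stage: "imported",
    rawListingId,
    platform: "kijiji",
    durationMs: 50,
    result: {
      isUpdate: true,
      changedFields,
      hasNewImages: changedFields.includes("images"),
    },
    createdAt: new Date(),
  };
}
```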
Retention
Events are auto-pruned after 7 days by a daily cleanup task. This keeps the table small while giving enough history for debugging.
Viewing events
- Admin UI: Navigate to the Activity page for pipeline traces grouped by listing. Filter by computed status (complete, updated, skipped, in_progress, error). Updated traces show which fields changed. Auto-refreshes every 10 seconds.
- SQL: `SELECT * FROM scraper.pipeline_events ORDER BY created_at DESC LIMIT 50;`
- API: `GET /api/pipeline/events?stage=imported&page=1&limit=50`
Stats Snapshots
The pipeline tracks aggregate throughput counters in memory (the StatsCollector). Every hour, these counters are:
- Sent to Telegram as a formatted summary
- Persisted to `scraper.stats_snapshots` for historical trend analysis
What's in each snapshot
```json
{
  "periodStart": "2026-03-04T14:00:00Z",
  "periodEnd": "2026-03-04T15:00:00Z",
  "counters": {
    "scraped": 150,
    "scrapedNew": 12,
    "scrapedUpdated": 138,
    "scrapedSkipped": 45,
    "normalized": 150,
    "normalizeErrors": 0,
    "imported": 12,
    "importUpdated": 15,
    "importSkipped": 123,
    "importErrors": 0,
    "imagesDownloaded": 60,
    "imagesFailed": 2,
    "errors": 0
  }
}
```

- `scrapedNew` — listings seen for the first time
- `scrapedUpdated` — listings that existed but whose content_hash changed
- `importUpdated` — existing public listings that were updated (price, description, images, etc.)
- `importSkipped` — already-imported listings with no changes detected
The stats collector also tracks update categories — a per-field breakdown of what changed (e.g., {price: 5, images: 3, description: 2}). This is included in the Telegram report but not persisted to DB.
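The collector's behavior can be sketched as an in-memory counter map that is flushed and reset each hour. This is a minimal illustration of the mechanism described above — the class and method names are assumptions, not the actual StatsCollector implementation:

```typescript
// Minimal sketch of an hourly-flushing stats collector.
// Names and structure are illustrative, not the real implementation.
type Counters = Record<string, number>;

class StatsCollector {
  private counters: Counters = {};
  private updateCategories: Counters = {}; // per-field breakdown, e.g. { price: 5 }
  private periodStart = new Date();

  increment(name: string, by = 1): void {
    this.counters[name] = (this.counters[name] ?? 0) + by;
  }

  recordFieldUpdate(field: string): void {
    this.updateCategories[field] = (this.updateCategories[field] ?? 0) + 1;
  }

  // Hourly flush: return a snapshot of the period and reset for the next one.
  flush(): {
    periodStart: Date;
    periodEnd: Date;
    counters: Counters;
    updateCategories: Counters;
  } {
    const snapshot = {
      periodStart: this.periodStart,
      periodEnd: new Date(),
      counters: this.counters,
      updateCategories: this.updateCategories, // sent to Telegram, not persisted
    };
    this.counters = {};
    this.updateCategories = {};
    this.periodStart = snapshot.periodEnd;
    return snapshot;
  }
}
```

The key property is that flushing is destructive: each snapshot covers exactly one period, and the counters start from zero afterwards.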
Retention
Stats snapshots are auto-pruned after 90 days.
Querying
- API: `GET /api/pipeline/stats-history?hours=24` — returns snapshots from the last 24 hours
- SQL: `SELECT * FROM scraper.stats_snapshots ORDER BY period_start DESC LIMIT 24;`
RabbitMQ Queue Monitoring
The RabbitMQ Management UI shows real-time queue state. Key things to look at:
Queue Depths
Navigate to Queues in the RabbitMQ UI. Each queue shows:
- Ready — messages waiting to be consumed
- Unacked — messages being processed right now
- Total — ready + unacked
In normal operation, all depths are 0 or near-0. The consumers process messages faster than the scraper produces them (a listing takes ~100ms to normalize, ~50ms to import).
If you see high queue depths:
- `normalize.kijiji` > 100 — normalizer is falling behind (check for errors in logs)
- `import` > 50 — importer is slow (database contention?)
- `images.pending` > 20 — image downloads are slow (network issues?)
- `dlq` > 0 — messages that failed all retries (investigate immediately)
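The thresholds above amount to a simple check that could run against the queue list returned by the RabbitMQ Management API. A sketch, assuming a simplified per-queue input shape (the thresholds come from this page; the function itself is hypothetical):

```typescript
// Illustrative depth check using the thresholds listed above.
// Input mirrors the Management API's per-queue data, reduced to name + messages.
interface QueueDepth {
  name: string;
  messages: number; // ready + unacked
}

const THRESHOLDS: Record<string, number> = {
  "normalize.kijiji": 100, // normalizer falling behind
  "import": 50,            // importer slow (database contention?)
  "images.pending": 20,    // image downloads slow (network issues?)
  "dlq": 0,                // anything here failed all retries
};

function queueAlerts(queues: QueueDepth[]): string[] {
  const alerts: string[] = [];
  for (const q of queues) {
    const limit = THRESHOLDS[q.name];
    if (limit !== undefined && q.messages > limit) {
      alerts.push(`${q.name} depth ${q.messages} exceeds ${limit}`);
    }
  }
  return alerts;
}
```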
Message Rates
Click on a specific queue to see publish/deliver rates. This shows real-time throughput — how many messages per second are entering and leaving the queue.
Access
- URL: mtl-rent-rabbit.fesenko.net
- Credentials: Same as admin (mtl/mtl) — RabbitMQ's own login form
Worker Health
The admin dashboard shows worker status pulled from the scraper's HTTP API (port 3003):
- Green dot — worker is running and connected to RabbitMQ
- Red dot — worker is offline or unreachable
- Uptime — how long since last restart
- Queue Depths — mirrored from RabbitMQ Management API
- Current Period — live counters since last hourly flush
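On the dashboard side, the green/red indicator reduces to a small decision over the `/status` response. A sketch, where the response shape and field names are assumptions rather than the scraper's actual API:

```typescript
// Hypothetical shape of the scraper's /status response, and how a dashboard
// could reduce it to the green/red dot described above.
interface WorkerStatus {
  reachable: boolean;       // did the HTTP call to port 3003 succeed?
  rabbitConnected: boolean; // worker's own view of its RabbitMQ connection
  uptimeSeconds: number;    // time since last restart
}

function statusDot(s: WorkerStatus | null): "green" | "red" {
  // Red when the worker is offline/unreachable or disconnected from RabbitMQ.
  if (!s || !s.reachable || !s.rabbitConnected) return "red";
  return "green";
}
```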
Scraper HTTP API
The scraper exposes a lightweight API on port 3003:
| Endpoint | Method | Description |
|---|---|---|
| `/status` | GET | Health check + queue depths + current stats |
| `/trigger/pipeline` | POST | Run full pipeline (scrape → normalize → import) |
| `/trigger/scrape` | POST | Scrape only |
| `/trigger/normalize` | POST | Normalize pending raw listings |
| `/trigger/import` | POST | Import normalized listings |
| `/trigger/download-images` | POST | Download pending images |
| `/trigger/staleness` | POST | Check for stale listings |
The admin dashboard calls these endpoints through a proxy at the admin API.
Telegram Reports
Every hour, the scraper sends a summary to Telegram (if configured):
```
📊 Hourly Pipeline Report
⏱ Period: 1h 0m
🔍 Scraping: 150 processed (12 new, 138 updated, 45 skipped)
🔄 Normalized: 150 (0 errors)
📦 Imported: 12 new, 15 updated (123 skipped, 0 errors)
📝 Updated fields: price: 5, images: 3, description: 2, amenities: 2, contact: 1
📥 Images: 60 downloaded, 2 failed
```

Configure via environment variables:

- `TELEGRAM_BOT_TOKEN` — bot API token
- `TELEGRAM_CHAT_ID` — chat/channel ID
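Assembling a report like the one shown is a matter of formatting the flushed hourly counters. A sketch of that formatting step only — the function name is hypothetical and this is not the actual bot code:

```typescript
// Illustrative: build the hourly summary text from flushed counters and the
// per-field update breakdown. Layout mirrors the example report above.
function formatHourlyReport(
  c: Record<string, number>,
  updatedFields: Record<string, number>,
): string {
  const fields = Object.entries(updatedFields)
    .map(([name, count]) => `${name}: ${count}`)
    .join(", ");
  return [
    "📊 Hourly Pipeline Report",
    `🔍 Scraping: ${c.scraped} processed (${c.scrapedNew} new, ${c.scrapedUpdated} updated, ${c.scrapedSkipped} skipped)`,
    `🔄 Normalized: ${c.normalized} (${c.normalizeErrors} errors)`,
    `📦 Imported: ${c.imported} new, ${c.importUpdated} updated (${c.importSkipped} skipped, ${c.importErrors} errors)`,
    `📝 Updated fields: ${fields}`,
    `📥 Images: ${c.imagesDownloaded} downloaded, ${c.imagesFailed} failed`,
  ].join("\n");
}
```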
Database Tables
| Table | Purpose | Retention |
|---|---|---|
| `scraper.pipeline_events` | Per-message activity log | 7 days |
| `scraper.stats_snapshots` | Hourly aggregate throughput | 90 days |
| `scraper.scrape_runs` | Per-cycle execution records | Permanent |
| `scraper.raw_listings` | All scraped data + status | Permanent |
Troubleshooting
"No pipeline events" in the Activity page
The scraper hasn't processed any messages yet. Check:
- Is the scraper running? `pm2 status mtl-scraper`
- Is RabbitMQ running? `docker compose ps`
- Any errors in logs? `pm2 logs mtl-scraper --lines 50`
Queue depths are always 0
This is normal! Consumers process messages instantly. Check "Message rates" in the RabbitMQ UI to confirm messages are flowing.
Stats show 0 for everything
The StatsCollector resets every hour. If you just restarted, wait for a scrape cycle to complete. Check `GET /api/pipeline/status` for current-period stats.