Scraper Pipeline
The scraper service (services/scraper/) ingests rental listings from external platforms into the public database. It uses a RabbitMQ-based pipeline with per-item message passing, where each stage consumes from one queue and publishes to the next.
Architecture
SCRAPER PIPELINE — DATA FLOW (RabbitMQ)
════════════════════════════════════════
┌──────────────────────────────────────────────────────┐
│ SCRAPER SERVICE │
│ (services/scraper/ — port 3003) │
│ │
Kijiji.ca ──────┤ ① SCRAPE (continuous) │
│ │ Playwright fetches search pages continuously │
│ │ Extracts listing data + image URLs │
│ │ Publishes each item individually to normalize q │
│ │ │
│ ├──→ scraper.scrape_runs (run metadata) │
│ ├──→ scraper.raw_listings (status: raw) │
│ ├──→ scraper.raw_images (URLs, pending) │
│ └──→ [normalize queue] (per-item message) │
│ │
│ ② NORMALIZE (queue consumer) │
│ │ Consumes from normalize queue │
│ │ Parses raw text → standard fields │
│ │ "3½" → 1 bed, "$1,450" → 145000 cents │
│ │ │
│ ├──→ scraper.raw_listings (status: normalized) │
│ └──→ [import queue] (per-item message) │
│ │
│ ③ IMPORT (queue consumer) │
│ │ Consumes from import queue │
│ │ Creates OR UPDATES public listings │
│ │ COPIES images: data/images/ → api/uploads/ │
│ │ Skips claimed listings (landlord edits protected) │
│ │ │
│ ├──→ public.listings (create or update) │
│ ├──→ public.listing_images (/uploads/uuid.jpg) │
│ └──→ [image-download queue] (per-item message) │
│ │
│ ④ DOWNLOAD IMAGES (queue consumer) │
│ │ Consumes from image-download queue │
│ │ Fetches image URLs, rate-limited per domain │
│ │ │
│ ├──→ data/images/kijiji/{id}/NNN.jpg │
│ ├──→ scraper.raw_images (status: downloaded) │
│ └──→ [published queue] (per-item message) │
│ │
│ ⑤ PUBLISHED (queue consumer) │
│ │ Final stage — listing fully processed │
│ │ Updates status, triggers any post-processing │
│ │ │
│ └──→ scraper.raw_listings (status: published) │
│ │
│ ⑥ STALENESS CHECK (periodic) │
│ │ Finds imported listings not seen in 72h │
│ │ Verifies listing actually removed from platform │
│ │ Archives only confirmed-removed listings │
│ │ │
│ └──→ public.listings (status → "archived") │
│ │
│ ⑦ RE-SCRAPE BATCH (periodic) │
│ │ Every 4th scrape cycle (~20min) │
│ │ Visits 10 existing listings by platformUrl │
│ │ Feeds updates through normal pipeline │
│ │ │
│ └──→ [normalize queue] (re-scraped data) │
└──────────────────────────────────────────────────────┘
Admin triggers ──→ HTTP API (:3003) ──→ Scraper service
Monitoring ──→ RabbitMQ Management UI (:15672)
Stats ──→ Hourly Telegram summary
Queue Topology
scrape (continuous) → [normalize] → [import] → [image-download] → [published]
↑
re-scrape batch (every 4th cycle) ──────┘
Each queue holds per-item messages. Consumers process items individually and publish to the next queue on success. Failed messages are retried or dead-lettered.
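The consume-then-forward pattern each stage follows can be sketched as a small handler. This is an illustrative sketch, not the service's actual code: the `Channel` interface below is a hand-rolled stand-in shaped roughly like amqplib's, and the function names are assumptions.

```typescript
// One message, one stage: parse → process → forward on success, nack on
// failure (the broker then retries or dead-letters per queue policy).

interface Message {
  content: Buffer;
}

interface Channel {
  publish(queue: string, body: Buffer): void;
  ack(msg: Message): void;
  nack(msg: Message, requeue: boolean): void;
}

function handleMessage(
  ch: Channel,
  msg: Message,
  stage: (item: unknown) => unknown, // e.g. normalize or import logic
  nextQueue: string | null, // null for the terminal "published" stage
): void {
  try {
    const item = JSON.parse(msg.content.toString());
    const result = stage(item);
    if (nextQueue) {
      ch.publish(nextQueue, Buffer.from(JSON.stringify(result)));
    }
    ch.ack(msg); // ack only after the forward succeeded
  } catch {
    ch.nack(msg, false); // let the retry / dead-letter policy handle it
  }
}
```

Acking only after publishing to the next queue gives at-least-once delivery between stages, which is why each stage's work (upserts, file copies) needs to be idempotent.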
Stages
1. Scrape (Continuous)
Playwright navigates Kijiji search results continuously, parses __NEXT_DATA__ Apollo state, and extracts RealEstateListing:* entries. Each listing is published individually as a message to the normalize queue.
- Mode: Continuous (not scheduled) — scraper loops through pages automatically
- Coverage: Grand Montreal (`grand-montreal/c37l80002`) — ~6,000 listings
- CLI: `pnpm --filter @mtl-rent/scraper scrape:kijiji`
- Output: `scraper.scrape_runs`, `scraper.raw_listings` (raw), `scraper.raw_images` (pending), messages on normalize queue
2. Normalize (Queue Consumer)
Consumes per-item messages from the normalize queue. Parses raw text fields into structured data: apartment types ("3 1/2" → bedrooms), prices to cents, addresses, amenities. Runs reverse geocoding for vague Kijiji addresses.
- Input: Normalize queue (per-item messages)
- Geocoding: Google Geocoding API for listings with hidden addresses
- Output: Updates `scraper.raw_listings` to `normalized` with `norm_*` fields, publishes to import queue
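The two normalizations called out above can be sketched as pure parsers. The Quebec "N½" convention (1½ = studio, 3½ = one bedroom, and so on) is assumed here, and the function names are illustrative rather than the service's actual helpers:

```typescript
// "3 1/2" or "3½" → bedrooms. Under the Quebec convention the integer
// part minus 2 gives the bedroom count, floored at 0 for studios.
function parseBedrooms(aptType: string): number | null {
  const m = aptType.match(/(\d+)\s*(?:1\/2|½)/);
  if (!m) return null;
  return Math.max(0, parseInt(m[1], 10) - 2);
}

// "$1,450" → 145000. Prices are stored as integer cents to avoid
// floating-point rounding in comparisons and diffs.
function parsePriceCents(raw: string): number | null {
  const m = raw.replace(/[$,\s]/g, "").match(/^(\d+)(?:\.(\d{1,2}))?$/);
  if (!m) return null;
  const cents = (m[2] ?? "").padEnd(2, "0");
  return parseInt(m[1], 10) * 100 + parseInt(cents || "0", 10);
}
```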
3. Import (Queue Consumer)
Consumes per-item messages from the import queue. Handles both new imports and updates to existing listings.
New listings: Creates public.listings and public.listing_images from normalized data. Copies images from the scraper's data/images/ directory to services/api/uploads/ with new UUID filenames.
Existing listings (update detection): When a listing has already been imported (importedListingId set), the importer compares all fields (price, title, description, bedrooms, bathrooms, area, amenities, contact, availability) and builds a diff. Only changed fields are updated. Image changes are detected by comparing raw_images count vs listing_images count.
- Input: Import queue (per-item messages)
- Output: Public listings (created or updated) + copied images, publishes to image-download queue
- Claimed listings: Listings with `claimed_by` set are skipped — landlord edits are never overwritten
- Archived listings: Re-scraped archived listings are reactivated (status: archived → active)
- Update result: `{isUpdate: true, changedFields: ["price", "images", ...], hasNewImages: true/false}`
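A sketch of the importer's diff step, using the field list and the count-based image check from above; the actual comparison code in `services/scraper` may differ in detail:

```typescript
type Listing = Record<string, unknown>;

// Fields the importer compares, per the docs above.
const COMPARED_FIELDS = [
  "price", "title", "description", "bedrooms", "bathrooms",
  "area", "amenities", "contact", "availability",
] as const;

interface ImportResult {
  isUpdate: boolean;
  changedFields: string[];
  hasNewImages: boolean;
}

function buildDiff(
  existing: Listing,
  incoming: Listing,
  rawImageCount: number,
  listingImageCount: number,
): ImportResult {
  const changedFields: string[] = COMPARED_FIELDS.filter(
    // JSON stringify as a cheap deep-equality check for arrays like amenities
    (f) => JSON.stringify(existing[f]) !== JSON.stringify(incoming[f]),
  );
  // Image changes detected by count diff (see Key Design Decisions).
  const hasNewImages = rawImageCount !== listingImageCount;
  if (hasNewImages) changedFields.push("images");
  return { isUpdate: changedFields.length > 0, changedFields, hasNewImages };
}
```

Only the fields in `changedFields` are written back, which keeps untouched columns stable and gives the activity log a meaningful per-update diff.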
4. Download Images (Queue Consumer)
Consumes per-item messages from the image-download queue. Fetches image URLs from scraper.raw_images. Downloads are rate-limited per domain and saved to disk.
- Input: Image-download queue (per-item messages)
- Storage: `services/scraper/data/images/{platform}/{platformId}/NNN.jpg`
- Output: Updates `scraper.raw_images` to `downloaded` with `local_path`, publishes to published queue
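Per-domain rate limiting can be sketched as a small bookkeeping class; the class name and the map-based approach are assumptions, not the service's actual implementation:

```typescript
// Tracks the earliest time each domain may be hit again; callers sleep
// for the returned delay before fetching.
class PerDomainRateLimiter {
  private nextAllowed = new Map<string, number>();

  constructor(private minIntervalMs: number) {}

  // Milliseconds the caller must wait before fetching this URL.
  // Also reserves the following slot for the URL's domain.
  delayFor(url: string, nowMs: number): number {
    const domain = new URL(url).hostname;
    const earliest = this.nextAllowed.get(domain) ?? 0;
    const wait = Math.max(0, earliest - nowMs);
    this.nextAllowed.set(
      domain,
      Math.max(earliest, nowMs) + this.minIntervalMs,
    );
    return wait;
  }
}
```

Keying on hostname means bursts against one image CDN are spread out while downloads from other domains proceed unthrottled.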
5. Published (Queue Consumer)
Final stage. Marks the listing as fully processed through the pipeline.
- Input: Published queue (per-item messages)
- Output: Updates `scraper.raw_listings` status to `published`
6. Staleness Check
Detects listings that have disappeared from source platforms. If a listing hasn't been seen in any scrape for 72 hours (configurable via STALE_THRESHOLD_HOURS), the staleness checker verifies the listing is actually gone before archiving.
Verification process: verifyListingRemoved(platformUrl) performs a lightweight fetch() (no Playwright) and checks for:
- HTTP 404/410 status
- Redirect to search results page
- "No longer available" markers in the response
If the listing is still live on the platform, lastScrapedAt is refreshed (preventing re-checking for another 72h). If verified gone, the public listing is archived. On verification errors, the listing is not archived (fail-safe).
- Trigger: Periodic (every 4 hours)
- Threshold: 72 hours
- Rate limit: 2.5s delay between verifications
- Output: Archives confirmed-removed listings, refreshes still-live ones
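The three removal checks above can be sketched as a pure classifier over the fetch result. The redirect path fragment and "no longer available" marker strings below are placeholders, not the values the service actually matches on:

```typescript
type Verdict = "removed" | "live";

// Placeholder: path fragment of Kijiji search-result URLs (assumption).
const SEARCH_REDIRECT_PATH = "/b-";
// Placeholder markers for a dead-listing page (assumption).
const REMOVED_MARKERS = ["no longer available", "n'est plus disponible"];

function classifyRemoval(
  status: number,
  finalUrl: string, // URL after redirects
  body: string,
): Verdict {
  // Check 1: gone/not-found status codes.
  if (status === 404 || status === 410) return "removed";
  // Check 2: redirected back to search results.
  if (finalUrl.includes(SEARCH_REDIRECT_PATH)) return "removed";
  // Check 3: dead-listing markers in the response body.
  const lower = body.toLowerCase();
  if (REMOVED_MARKERS.some((m) => lower.includes(m))) return "removed";
  return "live";
}
```

Keeping the classification pure makes the fail-safe easy to enforce in the caller: any fetch error short-circuits before classification, so errors never archive a listing.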
7. Re-scrape Batch
Expands scraper coverage beyond search pages. Kijiji search results only cover pages 1-25 (~500-1000 listings), but Montreal has 6,000+ active listings. The re-scrape batch periodically revisits existing imported listings to detect changes and prevent false staleness.
Every 4th continuous scrape cycle (~20 minutes), 10 existing listings are re-visited:
- Queries `raw_listings` where `lastScrapedAt` is older than 24h, oldest first
- Visits each listing's `platformUrl` with Playwright, extracts Apollo state
- Upserts raw data (replaces `raw_images` with a fresh set on update)
- Publishes through the normal pipeline (normalize → import)
- The importer's update logic handles field comparison and selective updates
- Trigger: Every 4th scrape cycle
- Batch size: 10 listings per cycle
- Browser: Reuses the continuous scraper's existing browser context
- Output: Re-scraped data flows through normalize → import pipeline
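The batch-selection step can be sketched as a pure function (the real query runs in SQL; field names mirror the docs, the function name is illustrative):

```typescript
interface RawListing {
  platformUrl: string;
  lastScrapedAt: number; // epoch ms
  importedListingId: string | null; // null = never imported
}

// Pick the oldest imported listings not re-scraped in the last 24h.
function pickRescrapeBatch(
  listings: RawListing[],
  nowMs: number,
  batchSize = 10,
  maxAgeMs = 24 * 3600 * 1000,
): RawListing[] {
  return listings
    .filter(
      (l) =>
        l.importedListingId !== null && // only already-imported listings
        nowMs - l.lastScrapedAt > maxAgeMs, // stale for over 24h
    )
    .sort((a, b) => a.lastScrapedAt - b.lastScrapedAt) // oldest first
    .slice(0, batchSize);
}
```

Oldest-first ordering guarantees every imported listing is eventually revisited, which is what keeps the 72h staleness check from falsely archiving listings outside the first 25 search pages.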
Notifications
Hourly Telegram stats summary (configurable via TELEGRAM_BOT_TOKEN + TELEGRAM_CHAT_ID) replaces per-run notifications. Includes counts of scraped, normalized, imported (new + updated), and published items in the past hour. For updates, shows a field breakdown (e.g., "price: 5, images: 3, description: 2").
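The field breakdown in the summary can be sketched as a small formatter; the function name and the most-changed-first ordering are assumptions:

```typescript
// { price: 5, description: 2, images: 3 } → "price: 5, images: 3, description: 2"
function formatFieldBreakdown(counts: Record<string, number>): string {
  return Object.entries(counts)
    .sort((a, b) => b[1] - a[1]) // most-changed fields first
    .map(([field, n]) => `${field}: ${n}`)
    .join(", ");
}
```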
Listing Lifecycle
NEW LISTING:
Scraped → raw_listings(raw) + raw_images(pending) → [normalize queue]
Normalized → raw_listings(normalized) → [import queue]
Imported → public.listings + listing_images → [image-download queue]
Downloaded → raw_images(downloaded) + files on disk → [published queue]
Published → raw_listings(published) — fully processed
UPDATE (re-scraped listing):
Re-scraped → raw_listings updated + raw_images replaced → [normalize queue]
Normalized → raw_listings(normalized) → [import queue]
Imported → public.listings UPDATED (changed fields only) → [image-download if new images]
Downloaded → old listing_images cleared + fresh images → [published queue]
STALENESS:
Not seen 72h → verify listing removed from platform → archive if confirmed gone
Still live → refresh lastScrapedAt (recheck in 72h)
Key Design Decisions
- Per-item message passing — each listing flows through the pipeline individually via RabbitMQ queues, enabling real-time processing instead of batch scheduling.
- Continuous scraping — the scraper runs in a loop rather than on a cron schedule, providing fresher data.
- Updates over rewrites — the importer compares fields and only updates what changed, preserving data integrity. Returns a diff for logging.
- Claimed listings protected — listings claimed by landlords are never overwritten by the scraper. Landlord edits take priority.
- Verify before archive — staleness checker confirms a listing is actually removed from the platform (via HTTP fetch) before archiving. Fail-safe: errors don't archive.
- Re-scrape for coverage — search results only cover ~1000 of 6000+ listings. Re-scrape batch ensures older listings stay fresh.
- Image change detection via count diff — comparing raw_images count vs listing_images count is simpler and more reliable than URL comparison. On change: delete old → re-download all.
- Images copied, not moved — originals stay in scraper storage for reprocessing. This means ~2x disk usage.
- Failed images don't block import — a listing gets imported even if some images failed to download.
- Isolated schema — all scraper data lives in the `scraper.*` PostgreSQL schema, separate from `public.*`.
- Admin triggers via HTTP — the admin dashboard calls the scraper's HTTP API (port 3003) instead of directly adding queue jobs.
Database Tables (scraper schema)
| Table | Purpose | Retention |
|---|---|---|
| scraper.scrape_runs | Run metadata (platform, status, timing, counts) | Permanent |
| scraper.raw_listings | Raw scraped data + normalized fields | Permanent |
| scraper.raw_images | Image URLs, download status, local paths | Permanent |
| scraper.pipeline_events | Per-message activity log (scraped/normalized/imported/etc.) | 7 days |
| scraper.stats_snapshots | Hourly aggregate throughput counters | 90 days |
| scraper.scraper_state | Persistent key-value store (cursor state) | Permanent |
| scraper.dedupe_clusters | Cross-platform deduplication (future) | Permanent |
Admin Dashboard
Monitor the pipeline at mtl-rent-admin.fesenko.net (Basic Auth). Features:
- Dashboard — Worker health, queue depths, current period stats, DB status totals
- Activity — Pipeline traces grouped by listing with status filter (complete, updated, skipped, in_progress, error). Shows changed fields for updates.
- Scrape Cycles — Per-cycle execution history with filtering
- Raw Listings — Browse and manage scraped data
- Pipeline triggers — One-click scrape, normalize, import, etc.
Queue monitoring is available via RabbitMQ Management UI at mtl-rent-rabbit.fesenko.net.
For detailed monitoring documentation, see Pipeline Monitoring & Observability.