Scraper Pipeline

The scraper service (services/scraper/) ingests rental listings from external platforms into the public database. It uses a RabbitMQ-based pipeline with per-item message passing, where each stage consumes from one queue and publishes to the next.

Architecture

SCRAPER PIPELINE — DATA FLOW (RabbitMQ)
════════════════════════════════════════

                    ┌──────────────────────────────────────────────────────┐
                    │              SCRAPER SERVICE                          │
                    │         (services/scraper/ — port 3003)               │
                    │                                                       │
   Kijiji.ca ──────┤  ① SCRAPE (continuous)                               │
                    │  │  Playwright fetches search pages continuously      │
                    │  │  Extracts listing data + image URLs                │
                    │  │  Publishes each item individually to normalize q   │
                    │  │                                                    │
                    │  ├──→ scraper.scrape_runs    (run metadata)           │
                    │  ├──→ scraper.raw_listings   (status: raw)            │
                    │  ├──→ scraper.raw_images     (URLs, pending)          │
                    │  └──→ [normalize queue]      (per-item message)       │
                    │                                                       │
                    │  ② NORMALIZE (queue consumer)                        │
                    │  │  Consumes from normalize queue                     │
                    │  │  Parses raw text → standard fields                 │
                    │  │  "3½" → 1 bed, "$1,450" → 145000 cents            │
                    │  │                                                    │
                    │  ├──→ scraper.raw_listings  (status: normalized)      │
                    │  └──→ [import queue]         (per-item message)       │
                    │                                                       │
                    │  ③ IMPORT (queue consumer)                           │
                    │  │  Consumes from import queue                        │
                    │  │  Creates OR UPDATES public listings                │
                    │  │  COPIES images: data/images/ → api/uploads/        │
                    │  │  Skips claimed listings (landlord edits protected) │
                    │  │                                                    │
                    │  ├──→ public.listings        (create or update)        │
                    │  ├──→ public.listing_images  (/uploads/uuid.jpg)      │
                    │  └──→ [image-download queue] (per-item message)       │
                    │                                                       │
                    │  ④ DOWNLOAD IMAGES (queue consumer)                  │
                    │  │  Consumes from image-download queue                │
                    │  │  Fetches image URLs, rate-limited per domain       │
                    │  │                                                    │
                    │  ├──→ data/images/kijiji/{id}/NNN.jpg                 │
                    │  ├──→ scraper.raw_images  (status: downloaded)        │
                    │  └──→ [published queue]   (per-item message)          │
                    │                                                       │
                    │  ⑤ PUBLISHED (queue consumer)                        │
                    │  │  Final stage — listing fully processed              │
                    │  │  Updates status, triggers any post-processing       │
                    │  │                                                    │
                    │  └──→ scraper.raw_listings  (status: published)       │
                    │                                                       │
                    │  ⑥ STALENESS CHECK (periodic)                        │
                    │  │  Finds imported listings not seen in 72h            │
                    │  │  Verifies listing actually removed from platform   │
                    │  │  Archives only confirmed-removed listings           │
                    │  │                                                    │
                    │  └──→ public.listings  (status → "archived")          │
                    │                                                       │
                    │  ⑦ RE-SCRAPE BATCH (periodic)                        │
                    │  │  Every 4th scrape cycle (~20min)                   │
                    │  │  Visits 10 existing listings by platformUrl        │
                    │  │  Feeds updates through normal pipeline             │
                    │  │                                                    │
                    │  └──→ [normalize queue] (re-scraped data)             │
                    └──────────────────────────────────────────────────────┘

Admin triggers ──→ HTTP API (:3003) ──→ Scraper service
Monitoring    ──→ RabbitMQ Management UI (:15672)
Stats         ──→ Hourly Telegram summary

Queue Topology

scrape (continuous) → [normalize] → [import] → [image-download] → [published]
                           ↑
re-scrape batch (every 4th cycle)

Each queue holds per-item messages. Consumers process items individually and publish to the next queue on success. Failed messages are retried or dead-lettered.
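The stage chaining above can be sketched as a simple stage → next-queue map. The queue names here mirror the diagram but are illustrative; the actual names live in the service's configuration:

```typescript
// Stage → next queue, per the pipeline topology. A consumer for stage S
// processes one item, then publishes it to NEXT_QUEUE[S] on success.
const NEXT_QUEUE: Record<string, string | null> = {
  scrape: "normalize",
  normalize: "import",
  import: "image-download",
  "image-download": "published",
  published: null, // terminal stage — nothing downstream
};

function nextQueue(stage: string): string | null {
  const next = NEXT_QUEUE[stage];
  if (next === undefined) throw new Error(`unknown stage: ${stage}`);
  return next;
}
```

Keeping the topology in one map makes it easy to assert, in tests or health checks, that every consumer publishes to a queue that actually has a consumer.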

Stages

1. Scrape (Continuous)

Playwright navigates Kijiji search results continuously, parses __NEXT_DATA__ Apollo state, and extracts RealEstateListing:* entries. Each listing is published individually as a message to the normalize queue.

  • Mode: Continuous (not scheduled) — scraper loops through pages automatically
  • Coverage: Grand Montreal (grand-montreal/c37l80002) — ~6,000 listings
  • CLI: pnpm --filter @mtl-rent/scraper scrape:kijiji
  • Output: scraper.scrape_runs, scraper.raw_listings (raw), scraper.raw_images (pending), messages on normalize queue

2. Normalize (Queue Consumer)

Consumes per-item messages from the normalize queue. Parses raw text fields into structured data: apartment types ("3 1/2" → bedrooms), prices to cents, addresses, amenities. Runs reverse geocoding for vague Kijiji addresses.

  • Input: Normalize queue (per-item messages)
  • Geocoding: Google Geocoding API for listings with hidden addresses
  • Output: Updates scraper.raw_listings to normalized with norm_* fields, publishes to import queue

3. Import (Queue Consumer)

Consumes per-item messages from the import queue. Handles both new imports and updates to existing listings.

New listings: Creates public.listings and public.listing_images from normalized data. Copies images from the scraper's data/images/ directory to services/api/uploads/ with new UUID filenames.

Existing listings (update detection): When a listing has already been imported (importedListingId set), the importer compares all fields (price, title, description, bedrooms, bathrooms, area, amenities, contact, availability) and builds a diff. Only changed fields are updated. Image changes are detected by comparing raw_images count vs listing_images count.

  • Input: Import queue (per-item messages)
  • Output: Public listings (created or updated) + copied images, publishes to image-download queue
  • Claimed listings: Listings with claimed_by set are skipped — landlord edits are never overwritten
  • Archived listings: Re-scraped archived listings are reactivated (status: archived → active)
  • Update result: {isUpdate: true, changedFields: ["price", "images", ...], hasNewImages: true/false}

4. Download Images (Queue Consumer)

Consumes per-item messages from the image-download queue. Fetches image URLs from scraper.raw_images. Downloads are rate-limited per domain and saved to disk.

  • Input: Image-download queue (per-item messages)
  • Storage: services/scraper/data/images/{platform}/{platformId}/NNN.jpg
  • Output: Updates scraper.raw_images to downloaded with local_path, publishes to published queue

5. Published (Queue Consumer)

Final stage. Marks the listing as fully processed through the pipeline.

  • Input: Published queue (per-item messages)
  • Output: Updates scraper.raw_listings status to published

6. Staleness Check

Detects listings that have disappeared from source platforms. If a listing hasn't been seen in any scrape for 72 hours (configurable via STALE_THRESHOLD_HOURS), the staleness checker verifies the listing is actually gone before archiving.

Verification process: verifyListingRemoved(platformUrl) performs a lightweight fetch() (no Playwright) and checks for:

  • HTTP 404/410 status
  • Redirect to search results page
  • "No longer available" markers in the response

If the listing is still live on the platform, lastScrapedAt is refreshed (preventing re-checking for another 72h). If verified gone, the public listing is archived. On verification errors, the listing is not archived (fail-safe).

  • Trigger: Periodic (every 4 hours)
  • Threshold: 72 hours
  • Rate limit: 2.5s delay between verifications
  • Output: Archives confirmed-removed listings, refreshes still-live ones

7. Re-scrape Batch

Expands scraper coverage beyond search pages. Kijiji search results only cover pages 1-25 (~500-1000 listings), but Montreal has 6,000+ active listings. The re-scrape batch periodically revisits existing imported listings to detect changes and prevent false staleness.

Every 4th continuous scrape cycle (~20 minutes), 10 existing listings are re-visited:

  1. Queries raw_listings for rows not scraped in the last 24 hours (lastScrapedAt older than 24h), oldest first
  2. Visits each listing's platformUrl with Playwright, extracts Apollo state
  3. Upserts raw data (replaces raw_images with fresh set on update)
  4. Publishes through the normal pipeline (normalize → import)
  5. The importer's update logic handles field comparison and selective updates

  • Trigger: Every 4th scrape cycle
  • Batch size: 10 listings per cycle
  • Browser: Reuses the continuous scraper's existing browser context
  • Output: Re-scraped data flows through normalize → import pipeline
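Batch selection reduces to a filter-sort-slice over raw_listings. A sketch with the thresholds from the steps above (field names and the helper are illustrative):

```typescript
interface RawListing {
  platformUrl: string;
  lastScrapedAt: number; // epoch ms
}

// Pick the stalest imported listings: older than maxAgeMs, oldest first,
// capped at batchSize per cycle.
function selectRescrapeBatch(
  listings: RawListing[],
  nowMs: number,
  maxAgeMs = 24 * 3600_000,
  batchSize = 10,
): RawListing[] {
  return listings
    .filter((l) => nowMs - l.lastScrapedAt > maxAgeMs)
    .sort((a, b) => a.lastScrapedAt - b.lastScrapedAt)
    .slice(0, batchSize);
}
```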

Notifications

Hourly Telegram stats summary (configurable via TELEGRAM_BOT_TOKEN + TELEGRAM_CHAT_ID) replaces per-run notifications. Includes counts of scraped, normalized, imported (new + updated), and published items in the past hour. For updates, shows a field breakdown (e.g., "price: 5, images: 3, description: 2").
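The per-field breakdown in that summary can be sketched as a tally over the changedFields arrays of the hour's update events (function name and output format are illustrative):

```typescript
// Given the changedFields list of each update event in the past hour,
// produce a "price: 5, images: 3" style summary, most frequent first.
function fieldBreakdown(updates: string[][]): string {
  const counts = new Map<string, number>();
  for (const fields of updates) {
    for (const f of fields) counts.set(f, (counts.get(f) ?? 0) + 1);
  }
  return [...counts.entries()]
    .sort((a, b) => b[1] - a[1])
    .map(([field, n]) => `${field}: ${n}`)
    .join(", ");
}
```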

Listing Lifecycle

NEW LISTING:
  Scraped      → raw_listings(raw) + raw_images(pending) → [normalize queue]
  Normalized   → raw_listings(normalized) → [import queue]
  Imported     → public.listings + listing_images → [image-download queue]
  Downloaded   → raw_images(downloaded) + files on disk → [published queue]
  Published    → raw_listings(published) — fully processed

UPDATE (re-scraped listing):
  Re-scraped   → raw_listings updated + raw_images replaced → [normalize queue]
  Normalized   → raw_listings(normalized) → [import queue]
  Imported     → public.listings UPDATED (changed fields only) → [image-download if new images]
  Downloaded   → old listing_images cleared + fresh images → [published queue]

STALENESS:
  Not seen 72h → verify listing removed from platform → archive if confirmed gone
  Still live   → refresh lastScrapedAt (recheck in 72h)

Key Design Decisions

  • Per-item message passing — each listing flows through the pipeline individually via RabbitMQ queues, enabling real-time processing instead of batch scheduling.
  • Continuous scraping — the scraper runs in a loop rather than on a cron schedule, providing fresher data.
  • Updates over rewrites — the importer compares fields and only updates what changed, preserving data integrity. Returns a diff for logging.
  • Claimed listings protected — listings claimed by landlords are never overwritten by the scraper. Landlord edits take priority.
  • Verify before archive — staleness checker confirms a listing is actually removed from the platform (via HTTP fetch) before archiving. Fail-safe: errors don't archive.
  • Re-scrape for coverage — search results only cover ~1000 of 6000+ listings. Re-scrape batch ensures older listings stay fresh.
  • Image change detection via count diff — comparing raw_images count vs listing_images count is simpler and more reliable than URL comparison. On change: delete old → re-download all.
  • Images copied, not moved — originals stay in scraper storage for reprocessing. This means ~2x disk usage.
  • Failed images don't block import — a listing gets imported even if some images failed to download.
  • Isolated schema — all scraper data lives in scraper.* PostgreSQL schema, separate from public.*.
  • Admin triggers via HTTP — the admin dashboard calls the scraper's HTTP API (port 3003) instead of directly adding queue jobs.

Database Tables (scraper schema)

Table                      Purpose                                                     Retention
─────────────────────────  ──────────────────────────────────────────────────────────  ─────────
scraper.scrape_runs        Run metadata (platform, status, timing, counts)             Permanent
scraper.raw_listings       Raw scraped data + normalized fields                        Permanent
scraper.raw_images         Image URLs, download status, local paths                    Permanent
scraper.pipeline_events    Per-message activity log (scraped/normalized/imported/etc.) 7 days
scraper.stats_snapshots    Hourly aggregate throughput counters                        90 days
scraper.scraper_state      Persistent key-value store (cursor state)                   Permanent
scraper.dedupe_clusters    Cross-platform deduplication (future)                       Permanent

Admin Dashboard

Monitor the pipeline at mtl-rent-admin.fesenko.net (Basic Auth). Features:

  • Dashboard — Worker health, queue depths, current period stats, DB status totals
  • Activity — Pipeline traces grouped by listing with status filter (complete, updated, skipped, in_progress, error). Shows changed fields for updates.
  • Scrape Cycles — Per-cycle execution history with filtering
  • Raw Listings — Browse and manage scraped data
  • Pipeline triggers — One-click scrape, normalize, import, etc.

Queue monitoring is available via RabbitMQ Management UI at mtl-rent-rabbit.fesenko.net.

For detailed monitoring documentation, see Pipeline Monitoring & Observability.