
F-016: Async Per-Item Pipeline with RabbitMQ

Status: ✅ Done · Priority: P1 · Branch: feature/F-016-async-pipeline · Updated: Mar 4, 2026

Summary

Migrate the scraper pipeline from BullMQ (batch processing, single queue, concurrency 1) to RabbitMQ with per-item message passing and multiple independent queues. Each scraped listing becomes an individual message flowing through the pipeline. This gives us per-listing observability, parallel processing between stages, and easy extensibility (add new pipeline steps by wiring up new queues).

Current Architecture Problems

  1. Opaque batch processing — A job processes up to 500 items. If it fails, you don't know which listing caused it. No per-listing visibility.
  2. Sequential bottleneck — Concurrency 1 means the entire pipeline blocks while one stage runs. Download can't start until scrape finishes, even for already-scraped listings.
  3. No parallelism between stages — Normalizer waits for ALL images to download before starting, even though normalization doesn't need images at all.
  4. Hard to add steps — Inserting an enrichment stage means modifying the chain logic, adding a new job type to the discriminated union, updating the processors.ts switch statement.
  5. Monitoring is job-level — Bull Board shows jobs (scrape, download, normalize, import), not individual listings flowing through the pipeline.

Requirements

  • [ ] RabbitMQ in docker-compose with Management Plugin
  • [ ] Per-item message flow: each listing = one message, each image = one message
  • [ ] Multiple independent queues with separate consumers
  • [ ] Dead letter queues for failed messages
  • [ ] DB audit log preserved (raw_listings table stays as reprocessing safety net)
  • [ ] Images download ONLY after import (no wasted downloads for rejected/duplicate listings)
  • [ ] Listings appear text-first, images attach asynchronously as they download
  • [ ] Staleness check stays as periodic batch job (no event-driven change needed)
  • [ ] Admin can trigger pipeline stages and monitor queue health
  • [ ] Migrate incrementally — BullMQ and RabbitMQ coexist during transition

Design

Queue Topology

Multi-scraper with topic exchange routing:

                         Topic Exchange (scraped)
                        /                       \
Kijiji Scraper ───→ [scraped.kijiji]     [scraped.kangalou] ←─── Kangalou Scraper
                        ↓                       ↓
                  Kijiji Normalizer      Kangalou Normalizer
                        \                       /
                         → [listings.normalized] ←
                                   ↓
                        (Deduplicator — future)
                                   ↓
                        [listings.ready] (future)
                                   ↓
                               Importer
                              /        \
               [listings.published]   [images.pending]
                                            ↓
                                    Image Downloader

Each platform is a Scraper + Normalizer pair (Lego pieces). Scrapers publish to platform-specific queues via a topic exchange with routing keys (kijiji, kangalou, etc.). Platform-specific normalizers consume from their respective queues and all output to the shared listings.normalized queue. Everything downstream is platform-agnostic.

Messages carry a platform field throughout so downstream consumers know the source (for stats, debugging) but don't need platform-specific logic.
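The topology above could be declared with amqplib's channel API roughly as follows. This is a sketch, not final code: queue names come from the design, while the `Chan` interface reproduces only the amqplib `Channel` methods used here so the snippet stands alone.

```ts
// Minimal slice of amqplib's Channel API used below (so the sketch is self-contained).
interface Chan {
  assertExchange(name: string, type: string, opts: { durable: boolean }): Promise<unknown>;
  assertQueue(name: string, opts: { durable: boolean }): Promise<unknown>;
  bindQueue(queue: string, exchange: string, routingKey: string): Promise<unknown>;
}

const SCRAPED_EXCHANGE = "scraped";
const SHARED_QUEUES = ["listings.normalized", "listings.published", "images.pending"];

// Pure description of the per-platform bindings (routing key = platform name).
function platformBindings(platforms: string[]) {
  return platforms.map((p) => ({ queue: `scraped.${p}`, routingKey: p }));
}

// Declares the durable topology on an open channel. Safe to call on every
// startup: assertExchange/assertQueue are idempotent when options match.
async function assertTopology(ch: Chan, platforms: string[]): Promise<void> {
  await ch.assertExchange(SCRAPED_EXCHANGE, "topic", { durable: true });
  for (const { queue, routingKey } of platformBindings(platforms)) {
    await ch.assertQueue(queue, { durable: true });
    await ch.bindQueue(queue, SCRAPED_EXCHANGE, routingKey);
  }
  for (const q of SHARED_QUEUES) await ch.assertQueue(q, { durable: true });
}
```

Adding a platform then reduces to calling `assertTopology` with one more entry in the platforms array.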

Key insight: Images are only downloaded for listings that survive all validation — rejected, duplicate, and existing-unchanged listings never trigger image downloads.

Queues

| Queue | Message Content | Producer | Consumer |
| --- | --- | --- | --- |
| scraped.{platform} | Full raw listing data (all fields + image URLs array + platform field) | Platform Scraper | Platform Normalizer |
| listings.normalized | Normalized listing (norm fields, address, coords, image URLs carried forward) | Any Normalizer | Deduplicator (future) or Importer |
| listings.ready | Verified listing ready for publish (future, when dedup exists) | Deduplicator | Importer |
| listings.published | Published listing ID + metadata (from day one — for notifications, stats, webhooks) | Importer | Notification consumers, analytics |
| images.pending | Image URL + public listing ID + sort order | Importer | Image Downloader |
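The payloads in the table above might be typed as follows. Field names are assumptions for illustration; the actual schema belongs to the implementation.

```ts
// Hypothetical payload types for the main queues. Every message carries
// `platform` so downstream stages stay platform-agnostic.
interface ScrapedListingMsg {
  platform: string;       // also the routing key: "kijiji", "kangalou", ...
  platformId: string;
  title: string;
  price: number | null;
  imageUrls: string[];    // carried forward; downloads happen only after import
  // ...remaining raw fields (description, address, contact, ...)
}

interface PendingImageMsg {
  listingId: number;      // public listing ID, assigned by the Importer
  platform: string;       // kept for stats/debugging only
  url: string;
  sortOrder: number;      // preserves the original image order
}
```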

Per-Stage Behavior

Scraper (producer only — continuous mode):

  • Runs as a long-lived loop, not triggered by cron
  • Two-phase per listing (same as current Kijiji scraper):
    1. Search pages — cycles through search result pages to discover listings (Kijiji: parses __NEXT_DATA__ Apollo cache, ~25 listings/page)
    2. Detail pages — visits each listing's page for full description, ALL image URLs, contact info
  • For each listing found: checks if recently seen (by platformId + lastScrapedAt)
    • New/stale → visit detail page → upsert into raw_listings (DB audit) → publish to scraped.{platform} queue
    • Recently seen → skip (already in pipeline or up-to-date)
  • Each listing published immediately — no batching, no waiting for full page scan
  • Configurable delay between requests per source (e.g., Kijiji: 9s between page loads)
  • After cycling through all search pages, brief cooldown, then restart from page 1
  • Not a queue consumer — it's a producer that feeds the pipeline
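The loop above can be sketched with its side effects injected, so the control flow is exercisable without a browser or broker. All names here are assumptions; `processListing` stands in for "visit detail page, upsert raw_listings, publish to scraped.{platform}".

```ts
// Skeleton of one scraper cycle: page through search results, process each
// discovered listing immediately (no batching), pace requests per source.
interface CycleDeps {
  scrapeSearchPage(page: number): Promise<string[]>; // returns platform IDs found
  processListing(platformId: string): Promise<void>; // detail visit + DB upsert + publish
  sleep(ms: number): Promise<void>;
}

async function runCycle(deps: CycleDeps, maxPages: number, delayMs: number): Promise<void> {
  for (let page = 1; page <= maxPages; page++) {
    const ids = await deps.scrapeSearchPage(page);
    for (const id of ids) await deps.processListing(id); // published one by one
    await deps.sleep(delayMs);                           // per-source pacing (e.g., 9s)
  }
  // caller: cooldown, then runCycle again from page 1
}
```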

Normalizer (consumer → producer — per-platform):

  • Each platform has its own normalizer consuming from scraped.{platform}
  • Validates required fields (title, price, bedrooms) — rejects invalid listings (nack → DLQ)
  • Normalizes price, bedrooms, bathrooms, property type, amenities
  • Carries image URLs forward in the message (untouched)
  • Updates raw_listings.status = "normalized" in DB (audit)
  • Publishes to listings.normalized
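The validation gate can be a pure function, keeping the ack/nack decision testable without a broker. A sketch, with the three required fields the design names; any further checks are assumptions.

```ts
// Required-field validation for the normalizer. Empty result: listing proceeds.
// Non-empty result: nack without requeue, so the message dead-letters.
interface RawFields {
  title?: string;
  price?: number;
  bedrooms?: number;
}

function validationErrors(raw: RawFields): string[] {
  const errors: string[] = [];
  if (!raw.title || raw.title.trim() === "") errors.push("missing title");
  if (raw.price == null || raw.price <= 0) errors.push("missing or invalid price");
  if (raw.bedrooms == null) errors.push("missing bedrooms");
  return errors;
}

// In the consumer (amqplib):
//   const errors = validationErrors(parsed);
//   if (errors.length) ch.nack(msg, false, false);   // requeue=false → DLQ
//   else { /* normalize, publish to listings.normalized */ ch.ack(msg); }
```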

Deduplicator (future consumer → producer):

  • Consumes from listings.normalized
  • Checks if listing already exists in system (by platform+platformId, or fuzzy match for cross-platform dedup)
  • Skips duplicates: ack without forwarding (no downstream processing, no image downloads)
  • Publishes new/changed listings to listings.ready
  • Without dedup: Importer consumes directly from listings.normalized

Importer (consumer → producer):

  • Consumes from listings.normalized (or listings.ready when dedup exists)
  • Matches neighborhood by coordinates
  • Creates/updates public.listings entry (status = "active")
  • For each image URL in the message: publishes a message to images.pending with the public listing ID and sort order
  • Updates raw_listings.status = "imported" in DB (audit)
  • Publishes listing ID to listings.published

Image Downloader (consumer):

  • Consumes one image at a time from images.pending
  • Downloads to disk
  • Inserts into public.listing_images attached to the public listing (by listing ID from message)
  • Can run with concurrency > 1 (e.g., prefetch 5)
  • Rate-limited per domain (500ms between requests to same host)
  • If download fails after retries: nack → DLQ (listing still exists, just missing that image)
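The per-domain limit can be a small map of host to last-request time. A sketch with an injectable clock so the logic is testable without real waiting; class and method names are assumptions.

```ts
// Per-host rate limiter: enforces a minimum gap (default 500 ms) between
// requests to the same host, while different hosts proceed independently.
class HostRateLimiter {
  private nextSlot = new Map<string, number>(); // host → scheduled request time

  constructor(private minGapMs = 500, private now: () => number = Date.now) {}

  // Returns how long the caller should sleep before fetching this URL.
  delayFor(url: string): number {
    const host = new URL(url).host;
    const last = this.nextSlot.get(host);
    const t = this.now();
    const wait = last == null ? 0 : Math.max(0, last + this.minGapMs - t);
    this.nextSlot.set(host, t + wait); // reserve the slot this request will use
    return wait;
  }
}
```

The consumer would call `delayFor` before each download and sleep for the returned duration.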

Staleness Checker (unchanged):

  • Stays as a periodic cron job (every 4h)
  • Queries DB for stale listings (lastScrapedAt < now - 48h)
  • Archives public listings (skips claimed ones)
  • No RabbitMQ involvement — it's a simple time-based DB query

Extensibility: Adding New Steps

Example — adding AI enrichment (amenity tagging from description):

[listings.normalized]  ──→  Enricher  ──→  [listings.enriched]  ──→  Importer

Just create a new consumer that reads from listings.normalized, adds enriched fields, and publishes to listings.enriched, then point the Importer at listings.enriched instead. The only touch to existing code is a queue binding swap; everything else is a new file.
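An enricher can start far simpler than AI. A toy keyword-based amenity tagger (keywords and names are illustrative assumptions) shows the shape of such a consumer's core logic:

```ts
// Toy enrichment step: tag amenities by keyword match on the description.
// Stands in for the AI tagging mentioned above; the keyword map is made up.
const AMENITY_KEYWORDS: Record<string, string[]> = {
  parking: ["parking", "garage"],
  laundry: ["laundry", "washer"],
};

function tagAmenities(description: string): string[] {
  const text = description.toLowerCase();
  return Object.entries(AMENITY_KEYWORDS)
    .filter(([, words]) => words.some((w) => text.includes(w)))
    .map(([amenity]) => amenity);
}
```

The consumer around it would parse a listings.normalized message, attach `tagAmenities(msg.description)`, and publish the result to listings.enriched.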

Continuous Scraping

Scrapers run continuously instead of batch every 30 minutes. Benefits:

  • Fresher data — new listings appear within seconds, not up to 30 min delay
  • Smoother load — constant trickle vs burst of 500+ items every 30 min
  • Natural fit for RabbitMQ — continuous message flow is what message queues are designed for
  • Simpler scheduling — no cron, just a loop with sleep

Per-source configuration:

```ts
interface ScraperConfig {
  platform: string;
  delayBetweenRequests: number;  // ms (e.g., 9000 for Kijiji)
  maxPagesPerCycle: number;      // search pages before looping back to page 1
  cooldownBetweenCycles: number; // ms pause after completing a full cycle
  enabled: boolean;
}
```

Browser lifecycle:

  • Restart Playwright browser every ~50 cycles or every 2 hours (whichever first)
  • Fresh context per scrape run (already implemented), closed after use
  • On restart: browser.close() → chromium.launch() → reset counter
  • Simple and effective — avoids Chromium memory leaks without complex monitoring

Smart skipping (saves page loads):

  • When listing found on search page, check raw_listings by platformId + lastScrapedAt
  • If scraped recently (< 6h ago) → skip detail page visit, don't re-publish
  • Only visit detail pages for new or stale listings
  • Most search results are already-known → saves majority of page loads
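The skip decision reduces to one predicate over lastScrapedAt, sketched here with the 6-hour window from the design (the function name is an assumption):

```ts
// Visit a detail page only for new or stale listings; skip fresh ones.
const FRESH_WINDOW_MS = 6 * 60 * 60 * 1000; // 6h, per the design

function shouldVisitDetailPage(
  lastScrapedAt: Date | null,   // null = not in raw_listings → new listing
  now: Date = new Date(),
): boolean {
  if (lastScrapedAt === null) return true; // new: always visit
  return now.getTime() - lastScrapedAt.getTime() > FRESH_WINDOW_MS; // stale?
}
```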

Anti-ban strategy:

  • Per-source configurable delay (already have KIJIJI_DELAY_MS / rateLimiter.wait())
  • 9 seconds between requests by default for Kijiji (continuous 24/7 mode needs gentler pacing)
  • Rotate user agents, use residential-like headers
  • Exponential backoff on block (429/CAPTCHA): base 5s, 2x multiplier, ±20% jitter, max 5 min. Reset to normal on success
  • Playwright with stealth mode (already in place)
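The backoff schedule above (base 5s, 2x multiplier, ±20% jitter, 5 min cap) can be computed by a pure function; `random` is injectable so tests are deterministic. The function name is an assumption.

```ts
// Exponential backoff with ±20% jitter: 5s, 10s, 20s, ... capped at 5 min.
function backoffMs(
  attempt: number,                         // 0-based count of consecutive failures
  random: () => number = Math.random,
  baseMs = 5_000,
  capMs = 300_000,
): number {
  const exp = Math.min(baseMs * 2 ** attempt, capMs);
  const jitter = 1 + (random() * 2 - 1) * 0.2; // uniform in roughly [0.8, 1.2)
  return Math.round(exp * jitter);
}

// On the first successful request, reset attempt to 0 and resume the normal
// per-source delay (9s for Kijiji).
```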

Process Architecture

Single Node.js worker process with multiple RabbitMQ channel consumers (one channel per consumer). For the current scale (~5k listings, single machine) this is simpler to deploy (one pm2 process) while each consumer stays independent. Easy to split into separate processes later if a specific stage needs scaling.

Admin Triggers

Hybrid approach:

  • Trigger scrape: REST → invoke scraper directly (scraping is a command that produces messages, not a message itself)
  • Re-normalize / re-import: REST → replay from DB audit log into the appropriate queue (these are message-driven operations)

RabbitMQ vs BullMQ

| Feature | BullMQ (current) | RabbitMQ |
| --- | --- | --- |
| Multiple independent queues | Awkward (multiple Queue/Worker instances) | Native — each queue is independent |
| Per-message ack/nack | Jobs complete/fail as a unit | Individual message ack with requeue |
| Dead letter queues | Manual failed-job handling | Built-in DLX — auto-routes failed messages |
| Routing/fan-out | Not built-in | Exchanges: topic, direct, fanout |
| Management UI | Bull Board (job-focused) | Built-in Management Plugin (queue depths, rates, graphs) |
| Consumer concurrency | Per-worker setting | Per-queue prefetch, multiple consumers |
| Message durability | Redis persistence (AOF/RDB) | Durable queues + persistent messages + publisher confirms |

Verdict: RabbitMQ is purpose-built for data pipelines with multiple stages and per-item processing. BullMQ is better suited for background task queues (send email, resize image).

Message Durability

With correct configuration, RabbitMQ messages don't get lost:

  • Durable queues survive broker restarts
  • Persistent messages (delivery mode 2) are written to disk
  • Publisher confirms ensure the broker received the message
  • Consumer acks — messages stay in queue until consumer explicitly acknowledges
  • If a consumer crashes, the message is automatically redelivered

The raw_listings DB table remains as the ultimate safety net for reprocessing (e.g., re-normalize all listings with updated logic without re-scraping).

Infrastructure

```yaml
# docker-compose.yml
rabbitmq:
  image: rabbitmq:3-management
  ports:
    - "5672:5672"    # AMQP protocol
    - "15672:15672"  # Management UI
  environment:
    RABBITMQ_DEFAULT_USER: mtlrent
    RABBITMQ_DEFAULT_PASS: mtlrent
  volumes:
    - rabbitmq_data:/var/lib/rabbitmq
```

  • Management UI at http://localhost:15672 (expose via Cloudflare tunnel for remote access)
  • Node.js client: amqplib (most popular, stable, well-typed)
  • Future: Prometheus + Grafana for production-grade monitoring if needed

Dashboard Decision

Start with built-in RabbitMQ Management Plugin — zero setup, comes with the Docker image, excellent queue-level visibility. Consider Prometheus + Grafana for production later. RabbitScout is interesting but less mature.

Migration Strategy

BullMQ and RabbitMQ coexist during transition — migrate one stage at a time:

  1. Add RabbitMQ to docker-compose, create connection/channel helpers, define queue topology
  2. Rewrite scraper to publish per-item messages (keep DB writes too)
  3. Create normalizer consumer (replaces batch normalizer)
  4. Create importer consumer (replaces batch importer)
  5. Rewrite image downloader as per-image queue consumer
  6. Remove BullMQ, old processors.ts, old worker.ts
  7. Wire up admin triggers (publish messages instead of BullMQ jobs)
  8. Expose RabbitMQ Management UI via Cloudflare tunnel

Discussion Notes

Mar 4, 2026

  • Discussed RabbitMQ vs keeping BullMQ — decided RabbitMQ fits the per-item multi-queue pattern better
  • Decided to keep raw_listings DB table as audit log + reprocessing safety net alongside RabbitMQ messages
  • Staleness: keep as periodic batch check (time-based, doesn't benefit from event-driven)
  • Dashboard: start with built-in Management Plugin, Prometheus+Grafana later if needed
  • This is a discussion-first feature — refine design before building

Mar 4, 2026 (revised topology)

  • Revised: images download ONLY after import, not in parallel with normalization
  • Scraper publishes full listing data (including image URLs) as single message — no separate image messages from scraper
  • Image download messages are produced by Importer AFTER creating the public listing
  • This avoids wasted downloads for rejected/duplicate/existing-unchanged listings
  • Added Deduplicator slot between Normalizer and Importer for future cross-platform dedup (F-015)
  • Without dedup: Importer consumes directly from listings.normalized
  • With dedup: Deduplicator sits between, consuming listings.normalized → publishing listings.ready

Mar 4, 2026 (resolved decisions + continuous scraping)

  • Resolved all 5 original open questions:
    1. Trigger → continuous scraping loop (not cron). Per-source delay (9s for Kijiji), cooldown between cycles
    2. listings.published queue → yes, from day one. Cheap to add, ready for Telegram notifications, import stats, future webhooks
    3. Process architecture → single Node.js worker with multiple RabbitMQ channel consumers. Scale: ~5k listings now, ~10k future. Split later if needed
    4. Admin triggers → hybrid: REST invokes scraper directly for scrape triggers; REST replays DB audit log into queues for re-normalize/re-import
    5. Multi-scraper → per-platform Scraper+Normalizer pairs via RabbitMQ topic exchange. Each platform publishes to scraped.{platform}. All normalizers output to shared listings.normalized. Everything downstream is universal
  • Continuous scraping model: scrapers run as long-lived loops instead of batch every 30 min. Check if listing recently seen before visiting detail page. Each listing published to queue immediately — no batching. Smoother load, fresher data, natural fit for message queues
  • Anti-ban: per-source delay config, exponential backoff on rate limiting, Playwright stealth (already in place)
  • Kijiji two-phase confirmed: search pages (Apollo __NEXT_DATA__) for discovery + individual detail pages for full data (description, all images, phone)

Mar 4, 2026 (browser lifecycle, search/detail coupling, backoff)

  • Browser lifecycle → restart every ~50 cycles or every 2 hours (whichever first). Current BrowserPool singleton already creates fresh contexts per scrape and closes them. For continuous mode, add a cycle counter — after N cycles, browser.close() + chromium.launch() + reset counter. Simple, avoids Chromium memory leaks without complex monitoring
  • Search + detail pages → keep together in same scraper loop (not decoupled into separate queues). Detail page enrichment is always needed (full description, all images, phone). Decoupling adds an extra queue + consumer for minimal benefit. Improvement: smart skipping — when a listing is found on search page, check raw_listings by platformId; if scraped recently (< 6h), skip detail page visit entirely. Saves most page loads since most search results are already-known listings
  • Rate limit backoff → exponential with jitter, max 5 min. On block (429/CAPTCHA/connection error): base 5s, 2x multiplier, ±20% jitter, cap at 5 minutes. Reset to normal 9s delay on first successful request. Example: fail → 5s → 10s → 20s → 40s → ... → 5min cap → success → back to 9s

Open Questions

All questions resolved — design is ready for implementation.

Implementation Notes

Not yet started.