F-016: Async Per-Item Pipeline with RabbitMQ
Status: ✅ Done · Priority: P1 · Branch: `feature/F-016-async-pipeline` · Updated: Mar 4, 2026
Summary
Migrate the scraper pipeline from BullMQ (batch processing, single queue, concurrency 1) to RabbitMQ with per-item message passing and multiple independent queues. Each scraped listing becomes an individual message flowing through the pipeline. This gives us per-listing observability, parallel processing between stages, and easy extensibility (add new pipeline steps by wiring up new queues).
Current Architecture Problems
- Opaque batch processing — A job processes up to 500 items. If it fails, you don't know which listing caused it. No per-listing visibility.
- Sequential bottleneck — Concurrency 1 means the entire pipeline blocks while one stage runs. Download can't start until scrape finishes, even for already-scraped listings.
- No parallelism between stages — Normalizer waits for ALL images to download before starting, even though normalization doesn't need images at all.
- Hard to add steps — Inserting an enrichment stage means modifying the chain logic, adding a new job type to the discriminated union, updating the processors.ts switch statement.
- Monitoring is job-level — Bull Board shows jobs (scrape, download, normalize, import), not individual listings flowing through the pipeline.
Requirements
- [ ] RabbitMQ in docker-compose with Management Plugin
- [ ] Per-item message flow: each listing = one message, each image = one message
- [ ] Multiple independent queues with separate consumers
- [ ] Dead letter queues for failed messages
- [ ] DB audit log preserved (`raw_listings` table stays as reprocessing safety net)
- [ ] Images download ONLY after import (no wasted downloads for rejected/duplicate listings)
- [ ] Listings appear text-first, images attach asynchronously as they download
- [ ] Staleness check stays as periodic batch job (no event-driven change needed)
- [ ] Admin can trigger pipeline stages and monitor queue health
- [ ] Migrate incrementally — BullMQ and RabbitMQ coexist during transition
Design
Queue Topology
Multi-scraper with topic exchange routing:
```
                    Topic Exchange (scraped)
                       /              \
Kijiji Scraper ──→ [scraped.kijiji]  [scraped.kangalou] ←── Kangalou Scraper
                        ↓                    ↓
                 Kijiji Normalizer    Kangalou Normalizer
                        \                    /
                       [listings.normalized]
                                ↓
                     (Deduplicator — future)
                                ↓
                      [listings.ready] (future)
                                ↓
                            Importer
                           /        \
            [listings.published]  [images.pending]
                                        ↓
                                 Image Downloader
```

Each platform is a Scraper + Normalizer pair (Lego pieces). Scrapers publish to platform-specific queues via a topic exchange with routing keys (`kijiji`, `kangalou`, etc.). Platform-specific normalizers consume from their respective queues and all output to the shared `listings.normalized` queue. Everything downstream is platform-agnostic.
Messages carry a platform field throughout so downstream consumers know the source (for stats, debugging) but don't need platform-specific logic.
Key insight: Images are only downloaded for listings that survive all validation — rejected, duplicate, and existing-unchanged listings never trigger image downloads.
Queues
| Queue | Message Content | Producer | Consumer |
|---|---|---|---|
| `scraped.{platform}` | Full raw listing data (all fields + image URLs array + platform field) | Platform Scraper | Platform Normalizer |
| `listings.normalized` | Normalized listing (norm fields, address, coords, image URLs carried forward) | Any Normalizer | Deduplicator (future) or Importer |
| `listings.ready` | Verified listing ready for publish (future, when dedup exists) | Deduplicator | Importer |
| `listings.published` | Published listing ID + metadata (from day one — for notifications, stats, webhooks) | Importer | Notification consumers, analytics |
| `images.pending` | Image URL + public listing ID + sort order | Importer | Image Downloader |
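As a sketch, the payloads in the table might map to TypeScript types like these. Field names here are illustrative assumptions, not a finalized schema:

```typescript
// Hypothetical message payloads for each queue — field names are
// assumptions for illustration, not the final schema.

interface ScrapedListingMsg {
  platform: string;             // e.g. "kijiji" — carried through the whole pipeline
  platformId: string;           // source-site listing ID
  raw: Record<string, unknown>; // full raw fields from the detail page
  imageUrls: string[];          // ALL image URLs, carried forward untouched
}

interface NormalizedListingMsg {
  platform: string;
  platformId: string;
  title: string;
  price: number;
  bedrooms: number;
  address?: string;
  coords?: { lat: number; lng: number };
  imageUrls: string[];          // still just URLs — nothing downloaded yet
}

interface PendingImageMsg {
  listingId: string;            // public listing ID, assigned by the Importer
  url: string;
  sortOrder: number;            // preserves the original image ordering
}

// Example: the Importer fans one listing out into N image messages.
const pending: PendingImageMsg[] = ["a.jpg", "b.jpg"].map((url, i) => ({
  listingId: "lst_123",
  url,
  sortOrder: i,
}));
console.log(pending.length); // 2
```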
Per-Stage Behavior
Scraper (producer only — continuous mode):
- Runs as a long-lived loop, not triggered by cron
- Two-phase per listing (same as current Kijiji scraper):
  - Search pages — cycles through search result pages to discover listings (Kijiji: parses `__NEXT_DATA__` Apollo cache, ~25 listings/page)
  - Detail pages — visits each listing's page for full description, ALL image URLs, contact info
- For each listing found: checks if recently seen (by platformId + lastScrapedAt)
  - New/stale → visit detail page → upsert into `raw_listings` (DB audit) → publish to `scraped.{platform}` queue
  - Recently seen → skip (already in pipeline or up-to-date)
- Each listing published immediately — no batching, no waiting for full page scan
- Configurable delay between requests per source (e.g., Kijiji: 9s between page loads)
- After cycling through all search pages, brief cooldown, then restart from page 1
- Not a queue consumer — it's a producer that feeds the pipeline
Normalizer (consumer → producer — per-platform):
- Each platform has its own normalizer consuming from `scraped.{platform}`
- Validates required fields (title, price, bedrooms) — rejects invalid listings (nack → DLQ)
- Normalizes price, bedrooms, bathrooms, property type, amenities
- Carries image URLs forward in the message (untouched)
- Updates `raw_listings.status = "normalized"` in DB (audit)
- Publishes to `listings.normalized`
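A minimal sketch of such a normalizer consumer under this design. The structural `Channel` type below stands in for `amqplib`'s real channel so the sketch is self-contained, and the validation body is a placeholder for the real normalization logic:

```typescript
// Minimal structural stand-ins for amqplib's Channel/ConsumeMessage so
// this sketch is self-contained; in real code these come from amqplib.
type ConsumeMessage = { content: Buffer };
type Channel = {
  consume(queue: string, onMessage: (msg: ConsumeMessage | null) => void): Promise<unknown>;
  ack(msg: ConsumeMessage): void;
  nack(msg: ConsumeMessage, allUpTo: boolean, requeue: boolean): void;
  sendToQueue(queue: string, content: Buffer, opts?: { persistent?: boolean }): boolean;
};

// Hypothetical per-platform normalizer consumer. Real field cleanup
// (price, bedrooms, amenities) is elided — this only validates.
function startNormalizer(ch: Channel, platform: string): void {
  void ch.consume(`scraped.${platform}`, (msg) => {
    if (!msg) return;
    try {
      const listing = JSON.parse(msg.content.toString());
      // Missing required fields → nack without requeue, so the broker
      // dead-letters the message (requires a DLX configured on the queue).
      if (!listing.title || listing.price == null || listing.bedrooms == null) {
        ch.nack(msg, false, false);
        return;
      }
      const normalized = { ...listing, platform }; // + real normalization here
      ch.sendToQueue("listings.normalized", Buffer.from(JSON.stringify(normalized)), {
        persistent: true, // survive broker restarts
      });
      ch.ack(msg); // ack only after the forwarded message is handed to the broker
    } catch {
      ch.nack(msg, false, false); // unparseable payload → DLQ
    }
  });
}
```

Acking only after the forward succeeds means a crash mid-normalize redelivers the original message rather than losing it.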
Deduplicator (future consumer → producer):
- Consumes from `listings.normalized`
- Checks if listing already exists in system (by platform+platformId, or fuzzy match for cross-platform dedup)
- Skips duplicates: ack without forwarding (no downstream processing, no image downloads)
- Publishes new/changed listings to `listings.ready`
- Without dedup: Importer consumes directly from `listings.normalized`
Importer (consumer → producer):
- Consumes from `listings.normalized` (or `listings.ready` when dedup exists)
- Matches neighborhood by coordinates
- Creates/updates `public.listings` entry (status = "active")
- For each image URL in the message: publishes a message to `images.pending` with the public listing ID and sort order
- Updates `raw_listings.status = "imported"` in DB (audit)
- Publishes listing ID to `listings.published`
Image Downloader (consumer):
- Consumes one image at a time from `images.pending`
- Downloads to disk
- Inserts into `public.listing_images` attached to the public listing (by listing ID from message)
- Can run with concurrency > 1 (e.g., prefetch 5)
- Rate-limited per domain (500ms between requests to same host)
- If download fails after retries: nack → DLQ (listing still exists, just missing that image)
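The per-domain rate limit can be sketched as a small helper. This is an illustrative implementation, not existing code — the class name and `reserve` method are assumptions:

```typescript
// Hypothetical per-domain rate limiter: tracks the last scheduled request
// per hostname and reports how long to wait before the next one.
class DomainRateLimiter {
  private lastRequest = new Map<string, number>();

  constructor(private minIntervalMs = 500) {} // 500ms between hits to the same host

  // Returns the delay (ms) to wait before fetching this URL, and records
  // the request as happening after that delay.
  reserve(url: string, now = Date.now()): number {
    const host = new URL(url).hostname;
    const last = this.lastRequest.get(host);
    const wait = last === undefined ? 0 : Math.max(0, last + this.minIntervalMs - now);
    this.lastRequest.set(host, now + wait);
    return wait;
  }
}
```

With prefetch > 1 the downloader can pull several messages at once and still space out requests to any single host.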
Staleness Checker (unchanged):
- Stays as a periodic cron job (every 4h)
- Queries DB for stale listings (`lastScrapedAt < now - 48h`)
- Archives public listings (skips claimed ones)
- No RabbitMQ involvement — it's a simple time-based DB query
Extensibility: Adding New Steps
Example — adding AI enrichment (amenity tagging from description):
```
[listings.normalized] ──→ Enricher ──→ [listings.enriched] ──→ Importer
```

Just create a new consumer that reads from `listings.normalized`, adds enriched fields, and publishes to `listings.enriched`. Change the Importer to consume from `listings.enriched` instead. No existing code changes — just a new file and a queue binding swap.
Continuous Scraping
Scrapers run continuously instead of batch every 30 minutes. Benefits:
- Fresher data — new listings appear within seconds, not up to 30 min delay
- Smoother load — constant trickle vs burst of 500+ items every 30 min
- Natural fit for RabbitMQ — continuous message flow is what message queues are designed for
- Simpler scheduling — no cron, just a loop with sleep
Per-source configuration:
```ts
interface ScraperConfig {
  platform: string;
  delayBetweenRequests: number;  // ms (e.g., 9000 for Kijiji)
  maxPagesPerCycle: number;      // search pages before looping back to page 1
  cooldownBetweenCycles: number; // ms pause after completing a full cycle
  enabled: boolean;
}
```

Browser lifecycle:
- Restart Playwright browser every ~50 cycles or every 2 hours (whichever first)
- Fresh context per scrape run (already implemented), closed after use
- On restart: `browser.close()` → `chromium.launch()` → reset counter
- Simple and effective — avoids Chromium memory leaks without complex monitoring
Smart skipping (saves page loads):
- When a listing is found on a search page, check `raw_listings` by platformId + lastScrapedAt
- If scraped recently (< 6h ago) → skip detail page visit, don't re-publish
- Only visit detail pages for new or stale listings
- Most search results are already-known → saves majority of page loads
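The skip decision above is a simple timestamp comparison; a sketch (the 6-hour threshold is from this design, the function name is illustrative):

```typescript
// Hypothetical freshness check: visit the detail page only for listings
// that are new, or that haven't been scraped within the last 6 hours.
const FRESH_WINDOW_MS = 6 * 60 * 60 * 1000;

function shouldVisitDetailPage(
  lastScrapedAt: Date | null, // null → never seen before
  now: Date = new Date(),
): boolean {
  if (lastScrapedAt === null) return true; // new listing
  return now.getTime() - lastScrapedAt.getTime() >= FRESH_WINDOW_MS; // stale
}
```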
Anti-ban strategy:
- Per-source configurable delay (already have `KIJIJI_DELAY_MS` / `rateLimiter.wait()`)
- 9 seconds between requests by default for Kijiji (continuous 24/7 mode needs gentler pacing)
- Rotate user agents, use residential-like headers
- Exponential backoff on block (429/CAPTCHA): base 5s, 2x multiplier, ±20% jitter, max 5 min. Reset to normal on success
- Playwright with stealth mode (already in place)
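The backoff schedule described above (base 5s, 2x multiplier, ±20% jitter, 5 min cap) as a sketch; the `rand` parameter is injectable only so the function is deterministic in tests:

```typescript
// Exponential backoff with jitter, matching the design:
// base 5s, 2x multiplier, ±20% jitter, capped at 5 minutes.
// Progression at zero jitter: 5s → 10s → 20s → 40s → ... → 5 min cap.
const BASE_MS = 5_000;
const CAP_MS = 5 * 60_000;

function backoffDelay(attempt: number, rand: () => number = Math.random): number {
  const raw = Math.min(BASE_MS * 2 ** attempt, CAP_MS);
  const jitter = 1 + (rand() * 2 - 1) * 0.2; // uniform in [0.8, 1.2]
  return Math.round(raw * jitter);
}
```

On the first successful request the attempt counter resets to 0, restoring the normal 9s pacing.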
Process Architecture
Single Node.js worker process with multiple RabbitMQ channel consumers (one channel per consumer). For current scale (~5k listings, single machine) this is simpler to deploy (one pm2 process) with each consumer independent. Easy to split into separate processes later if a specific stage needs scaling.
Admin Triggers
Hybrid approach:
- Trigger scrape: REST → invoke scraper directly (scraping is a command that produces messages, not a message itself)
- Re-normalize / re-import: REST → replay from DB audit log into the appropriate queue (these are message-driven operations)
RabbitMQ vs BullMQ
| Feature | BullMQ (current) | RabbitMQ |
|---|---|---|
| Multiple independent queues | Awkward (multiple Queue/Worker instances) | Native — each queue is independent |
| Per-message ack/nack | Jobs complete/fail as unit | Individual message ack with requeue |
| Dead letter queues | Manual failed job handling | Built-in DLX — auto-routes failed messages |
| Routing/fan-out | Not built-in | Exchanges: topic, direct, fanout |
| Management UI | Bull Board (job-focused) | Built-in Management Plugin (queue depths, rates, graphs) |
| Consumer concurrency | Per-worker setting | Per-queue prefetch, multiple consumers |
| Message durability | Redis persistence (AOF/RDB) | Durable queues + persistent messages + publisher confirms |
Verdict: RabbitMQ is purpose-built for data pipelines with multiple stages and per-item processing. BullMQ is better suited for background task queues (send email, resize image).
Message Durability
With correct configuration, RabbitMQ messages don't get lost:
- Durable queues survive broker restarts
- Persistent messages (delivery mode 2) are written to disk
- Publisher confirms ensure the broker received the message
- Consumer acks — messages stay in queue until consumer explicitly acknowledges
- If a consumer crashes, the message is automatically redelivered
The `raw_listings` DB table remains as the ultimate safety net for reprocessing (e.g., re-normalize all listings with updated logic without re-scraping).
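A publish honoring these guarantees might look like this in `amqplib` terms. The structural type below stands in for amqplib's confirm channel so the sketch is self-contained; the function name is illustrative:

```typescript
// Structural stand-in for the slice of amqplib's ConfirmChannel used here,
// keeping the sketch self-contained.
type ConfirmChannelLike = {
  assertQueue(queue: string, opts: { durable: boolean }): Promise<unknown>;
  sendToQueue(queue: string, content: Buffer, opts: { persistent: boolean }): boolean;
  waitForConfirms(): Promise<void>;
};

// Publish a message so it survives a broker restart:
//  - durable queue     → the queue definition survives restart
//  - persistent: true  → the message is written to disk (delivery mode 2)
//  - waitForConfirms() → resolves only once the broker has taken ownership
async function publishDurably(
  ch: ConfirmChannelLike,
  queue: string,
  payload: unknown,
): Promise<void> {
  await ch.assertQueue(queue, { durable: true });
  ch.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), { persistent: true });
  await ch.waitForConfirms();
}
```

The consumer side completes the picture: messages stay in the queue until `ack`, so a consumer crash triggers redelivery rather than loss.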
Infrastructure
```yaml
# docker-compose.yml
rabbitmq:
  image: rabbitmq:3-management
  ports:
    - "5672:5672"   # AMQP protocol
    - "15672:15672" # Management UI
  environment:
    RABBITMQ_DEFAULT_USER: mtlrent
    RABBITMQ_DEFAULT_PASS: mtlrent
  volumes:
    - rabbitmq_data:/var/lib/rabbitmq
```

- Management UI at `http://localhost:15672` (expose via Cloudflare tunnel for remote access)
- Node.js client: `amqplib` (most popular, stable, well-typed)
- Future: Prometheus + Grafana for production-grade monitoring if needed
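Declaring the topology from the design might look like this. A sketch only: the exchange/queue names come from the diagram above, while the `dlx` dead-letter exchange name is an assumption, and the structural channel type stands in for `amqplib`'s:

```typescript
// Structural stand-in for the slice of amqplib's Channel used here,
// keeping the sketch self-contained.
type ChannelLike = {
  assertExchange(ex: string, type: string, opts: { durable: boolean }): Promise<unknown>;
  assertQueue(queue: string, opts: object): Promise<unknown>;
  bindQueue(queue: string, ex: string, routingKey: string): Promise<unknown>;
};

const PLATFORMS = ["kijiji", "kangalou"]; // routing keys on the topic exchange

async function declareTopology(ch: ChannelLike): Promise<void> {
  // Topic exchange that scrapers publish to, routed by platform key.
  await ch.assertExchange("scraped", "topic", { durable: true });

  // Dead-letter exchange: nacked messages get re-routed here.
  // The "dlx"/"dead-letters" names are assumptions for this sketch.
  await ch.assertExchange("dlx", "fanout", { durable: true });
  await ch.assertQueue("dead-letters", { durable: true });
  await ch.bindQueue("dead-letters", "dlx", "");

  // Per-platform scraped queues, bound by routing key.
  for (const platform of PLATFORMS) {
    const q = `scraped.${platform}`;
    await ch.assertQueue(q, { durable: true, deadLetterExchange: "dlx" });
    await ch.bindQueue(q, "scraped", platform);
  }

  // Shared platform-agnostic queues downstream of the normalizers.
  for (const q of ["listings.normalized", "listings.published", "images.pending"]) {
    await ch.assertQueue(q, { durable: true, deadLetterExchange: "dlx" });
  }
}
```

Adding a new platform is then one entry in `PLATFORMS` plus a Scraper+Normalizer pair; adding a pipeline stage is one new queue and a binding swap.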
Dashboard Decision
Start with built-in RabbitMQ Management Plugin — zero setup, comes with the Docker image, excellent queue-level visibility. Consider Prometheus + Grafana for production later. RabbitScout is interesting but less mature.
Migration Strategy
BullMQ and RabbitMQ coexist during transition — migrate one stage at a time:
1. Add RabbitMQ to docker-compose, create connection/channel helpers, define queue topology
2. Rewrite scraper to publish per-item messages (keep DB writes too)
3. Create normalizer consumer (replaces batch normalizer)
4. Create importer consumer (replaces batch importer)
5. Rewrite image downloader as per-image queue consumer
6. Remove BullMQ, old `processors.ts`, old `worker.ts`
7. Wire up admin triggers (publish messages instead of BullMQ jobs)
8. Expose RabbitMQ Management UI via Cloudflare tunnel
Discussion Notes
Mar 4, 2026
- Discussed RabbitMQ vs keeping BullMQ — decided RabbitMQ fits the per-item multi-queue pattern better
- Decided to keep `raw_listings` DB table as audit log + reprocessing safety net alongside RabbitMQ messages
- Staleness: keep as periodic batch check (time-based, doesn't benefit from event-driven)
- Dashboard: start with built-in Management Plugin, Prometheus+Grafana later if needed
- This is a discussion-first feature — refine design before building
Mar 4, 2026 (revised topology)
- Revised: images download ONLY after import, not in parallel with normalization
- Scraper publishes full listing data (including image URLs) as single message — no separate image messages from scraper
- Image download messages are produced by Importer AFTER creating the public listing
- This avoids wasted downloads for rejected/duplicate/existing-unchanged listings
- Added Deduplicator slot between Normalizer and Importer for future cross-platform dedup (F-015)
- Without dedup: Importer consumes directly from `listings.normalized`
- With dedup: Deduplicator sits between, consuming `listings.normalized` → publishing `listings.ready`
Mar 4, 2026 (resolved decisions + continuous scraping)
- Resolved all 5 original open questions:
  - Trigger → continuous scraping loop (not cron). Per-source delay (9s for Kijiji), cooldown between cycles
  - `listings.published` queue → yes, from day one. Cheap to add, ready for Telegram notifications, import stats, future webhooks
  - Process architecture → single Node.js worker with multiple RabbitMQ channel consumers. Scale: ~5k listings now, ~10k future. Split later if needed
  - Admin triggers → hybrid: REST invokes scraper directly for scrape triggers; REST replays DB audit log into queues for re-normalize/re-import
  - Multi-scraper → per-platform Scraper+Normalizer pairs via RabbitMQ topic exchange. Each platform publishes to `scraped.{platform}`. All normalizers output to shared `listings.normalized`. Everything downstream is universal
- Continuous scraping model: scrapers run as long-lived loops instead of batch every 30 min. Check if listing recently seen before visiting detail page. Each listing published to queue immediately — no batching. Smoother load, fresher data, natural fit for message queues
- Anti-ban: per-source delay config, exponential backoff on rate limiting, Playwright stealth (already in place)
- Kijiji two-phase confirmed: search pages (Apollo `__NEXT_DATA__`) for discovery + individual detail pages for full data (description, all images, phone)
Mar 4, 2026 (browser lifecycle, search/detail coupling, backoff)
- Browser lifecycle → restart every ~50 cycles or every 2 hours (whichever first). Current BrowserPool singleton already creates fresh contexts per scrape and closes them. For continuous mode, add a cycle counter — after N cycles, `browser.close()` + `chromium.launch()` + reset counter. Simple, avoids Chromium memory leaks without complex monitoring
- Search + detail pages → keep together in same scraper loop (not decoupled into separate queues). Detail page enrichment is always needed (full description, all images, phone). Decoupling adds an extra queue + consumer for minimal benefit. Improvement: smart skipping — when a listing is found on a search page, check `raw_listings` by platformId; if scraped recently (< 6h), skip detail page visit entirely. Saves most page loads since most search results are already-known listings
- Rate limit backoff → exponential with jitter, max 5 min. On block (429/CAPTCHA/connection error): base 5s, 2x multiplier, ±20% jitter, cap at 5 minutes. Reset to normal 9s delay on first successful request. Example: fail → 5s → 10s → 20s → 40s → ... → 5 min cap → success → back to 9s
Open Questions
All questions resolved — design is ready for implementation.
Implementation Notes
Not yet started.