Wuling Blog
10 min read · Technical

Building an Event-Driven AI Monorepo

Why treating AI microservices as async data pipelines — not REST call chains — solves the scaling problems monoliths can't

When building complex, AI-driven applications, monolithic architectures can quickly become bottlenecks—especially when different features scale at different rates or rely on distinct machine learning pipelines. I've been deep in this problem recently, and I want to share the architectural pattern that's been working really well: organizing a fleet of AI microservices in FastAPI within a monorepo structure.

Let's get into how you organize a project like this, handle inter-service communication, and structure the development roadmap.

The Pitfalls of Traditional Microservices

Before jumping into the solution, let's talk about the classic microservice headaches this architecture is designed to fix. When teams blindly split a monolith into services, they tend to hit the same walls:

  • Network Round-Trip Overhead: Every action involves a network hop. Serializing a request, sending it, and deserializing it adds up fast. Even a well-optimized internal network hop adds 1-5ms of overhead—compound that across a 5-service synchronous chain under load, and you've burned a massive chunk of your latency budget before any actual compute happens.
  • Data Availability Constraints: Each microservice owns its own data, which means no simple database joins. Querying across domains often leads to heavy, slow aggregations nobody wants to maintain.
  • Distributed Tracing Nightmares: When a synchronous request traverses five different services, distributed tracing becomes mandatory — and it's notoriously hard to get right. The result? Engineers spending hours debugging systems nobody fully understands.

The fix? Stop thinking about microservices as a chain of REST APIs and start treating them as an asynchronous data pipeline. That's the whole game.


The Monorepo Structure: Shared Libraries

Managing multiple independent services without drowning in code duplication is a real challenge. A monorepo with shared libraries installed as local dependencies solves this elegantly. Using a blazing-fast package manager like uv makes this workflow seamless—you can define workspace dependencies and resolve your entire monorepo environment in milliseconds, keeping local development snappy across all services.

The key insight is splitting shared logic into two distinct packages — lightweight infrastructure vs. heavy AI/ML dependencies. This keeps Docker image sizes small and build times fast for simpler services that don't need a PyTorch installation just to serve a health check.

my-ai-monorepo/
├── packages/
│   ├── core/                  # Lightweight infrastructure
│   │   ├── pyproject.toml
│   │   └── src/
│   │       ├── database.py    # SQLAlchemy session setup
│   │       ├── events.py      # Publisher/Consumer base classes
│   │       └── factory.py     # create_app() shared FastAPI bootstrap
│   └── ai/                    # Heavy ML/LLM logic
│       ├── pyproject.toml
│       └── src/
│           ├── llm.py         # Multi-LLM abstractions
│           └── embeddings.py  # Vector generation
├── services/
│   ├── ingestion-api/         # Depends ONLY on 'core'
│   │   ├── pyproject.toml
│   │   ├── Dockerfile
│   │   └── main.py
│   └── analysis-worker/       # Depends on 'core' AND 'ai'
│       ├── pyproject.toml
│       ├── Dockerfile
│       └── main.py
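Wiring this layout up with uv comes down to declaring a workspace at the repo root and pointing each service at the local packages. A minimal sketch of the two relevant files (exact package names are illustrative):

```toml
# pyproject.toml (repo root)
[tool.uv.workspace]
members = ["packages/*", "services/*"]

# services/analysis-worker/pyproject.toml
[project]
name = "analysis-worker"
version = "0.1.0"
dependencies = ["core", "ai"]

[tool.uv.sources]
core = { workspace = true }
ai = { workspace = true }
```

With `workspace = true` sources, `uv sync` resolves `core` and `ai` from the local `packages/` directory instead of an index, so a change to a shared library is picked up by every service immediately.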

The Core Infrastructure Package

This is all the boring-but-essential plumbing. A shared create_app() factory here means every microservice automatically gets the same middleware, CORS policies, and exception handling — no drift, no surprises.

# packages/core/src/factory.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
 
def create_app(service_name: str, version: str = "1.0.0") -> FastAPI:
    """Shared factory to guarantee identical API setups across all services."""
    app = FastAPI(title=service_name, version=version)
    
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],  # Restrict in production
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    
    @app.get("/health")
    async def health_check():
        return {"status": "ok", "service": service_name}
        
    return app

The AI/Domain Logic Package

This is where you quarantine the heavy stuff — PyTorch, LangChain, large SDKs — so your foundational services don't drag in gigabytes of libraries they'll never touch.

More importantly, it's where you centralize AI business logic that should behave consistently everywhere: multi-LLM routing, response caching, token budget guards, semantic matching. Here's a simple example of a unified LLMClient that bakes in budget tracking across every service that uses it:

# packages/ai/src/llm.py
import logging
 
class LLMClient:
    def __init__(self, provider: str = "openai"):
        self.provider = provider
        # Initialize heavy SDK clients here
        
    async def generate(self, prompt: str, budget_limit: int = 1000) -> str | None:
        """Unified generation with centralized budget guarding."""
        if await self._is_budget_exceeded(budget_limit):
            logging.warning("Token budget exceeded. Aborting generation.")
            return None
            
        # Optional: Check a centralized cache (Redis) before calling the provider
        return await self._call_provider(prompt)
 
    async def _is_budget_exceeded(self, limit: int) -> bool:
        # Centralized logic to check current token spend vs limit
        return False
        
    async def _call_provider(self, prompt: str) -> str:
        # Actual SDK interaction goes here
        return "AI generated insights."
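The `_is_budget_exceeded` stub above is where centralized spend tracking plugs in. Here's a minimal sketch of the idea using an in-memory counter; in production you'd back it with a shared Redis counter (e.g. `INCRBY` on a daily-expiring key) so every service sees the same spend. The class name and API are illustrative, not part of any SDK:

```python
import asyncio

class TokenBudget:
    """Tracks cumulative token spend; a stand-in for a shared Redis counter."""

    def __init__(self) -> None:
        self._spent = 0

    async def record(self, tokens: int) -> None:
        # In production: await redis.incrby("llm:tokens:today", tokens)
        self._spent += tokens

    async def is_exceeded(self, limit: int) -> bool:
        return self._spent >= limit

async def demo() -> None:
    budget = TokenBudget()
    await budget.record(600)
    print(await budget.is_exceeded(1000))  # False: 600 < 1000
    await budget.record(500)
    print(await budget.is_exceeded(1000))  # True: 1100 >= 1000

asyncio.run(demo())
```

`LLMClient._is_budget_exceeded` would simply delegate to a shared instance of something like this, which is what makes the guard consistent across every service that imports the `ai` package.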

Inter-Service Communication: Data Pipelines, Not Call Chains

Here's the golden rule: data pipeline, not call chain.

Synchronous REST between services creates cumulative latency and turns every service into a potential single point of failure. Instead, we use two distinct communication patterns — one for services talking to each other, one for users talking to services.

Pattern A: Event-Driven Flow (Between Services)

Service-to-service dependencies are resolved asynchronously via events. When a service finishes its job, it publishes a domain event. Downstream services subscribe, do their thing, update their local state, and publish the next event. Nobody blocks. Nobody waits.

Here's what publishing looks like on the ingestion side:

# services/ingestion-api/main.py
from fastapi import APIRouter, BackgroundTasks, Depends
from core.events import EventPublisher
from models import DocumentInput
 
router = APIRouter()
 
@router.post("/documents/")
async def ingest_document(
    payload: DocumentInput,
    background_tasks: BackgroundTasks,
    publisher: EventPublisher = Depends(EventPublisher)
):
    # 1. Save to local DB (fast)
    doc_id = save_to_database(payload)
    
    # 2. Publish event asynchronously — response goes back immediately
    background_tasks.add_task(
        publisher.publish,
        topic="data.processed",
        payload={"document_id": doc_id, "content": payload.text}
    )
    
    return {"status": "accepted", "document_id": doc_id}

And here's the downstream consumer picking it up:

# services/analysis-worker/main.py
import asyncio
from core.events import BaseConsumer
from ai.llm import LLMClient
 
class AnalysisConsumer(BaseConsumer):
    topic = "data.processed"
    group_id = "analysis-group"
 
    def __init__(self):
        super().__init__()
        # Initialize once to avoid re-creating heavy SDK clients per message
        self.llm_client = LLMClient()
 
    async def process_message(self, message: dict):
        doc_id = message["document_id"]
        text = message["content"]
        
        # Apply AI logic using the shared package
        insights = await self.llm_client.generate(f"Extract key themes from: {text}")
        
        # Save locally and fire the next event in the pipeline
        save_insights(doc_id, insights)
        await self.publish("analysis.completed", {"document_id": doc_id, "insights": insights})
 
if __name__ == "__main__":
    consumer = AnalysisConsumer()
    asyncio.run(consumer.start_listening())

Notice the clean handoff — the ingestion API's job ends at publishing data.processed. What happens next is completely decoupled.

Pattern B: REST API (Single Hop)

When a user or frontend needs data, they make a synchronous REST call to a single service. That service already has a pre-computed local copy of everything it needs — because the background event pipeline already ran. One hop, fast response, done.

The Exception: Long-Running User Uploads

There's one case where the async-first rule bends: when a user uploads a file for immediate processing across multiple services.

The answer isn't to make them wait — it's to accept the file, return a 202 Accepted with a Job ID, and let the pipeline run in the background. The client can poll or receive a WebSocket/Webhook notification when it's ready. Best of both worlds.
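A minimal sketch of the job-tracking piece, assuming an in-memory store (a real deployment would persist jobs in Postgres or Redis so any replica can answer the poll). Names and endpoint shapes are illustrative:

```python
import uuid
from enum import Enum

class JobState(str, Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"

class JobStore:
    """In-memory job registry; swap for Postgres/Redis in production."""

    def __init__(self) -> None:
        self._jobs: dict[str, JobState] = {}

    def create(self) -> str:
        """Called by the upload endpoint before returning 202 Accepted."""
        job_id = str(uuid.uuid4())
        self._jobs[job_id] = JobState.PENDING
        return job_id

    def complete(self, job_id: str) -> None:
        """Called by the terminal consumer in the pipeline."""
        self._jobs[job_id] = JobState.COMPLETED

    def status(self, job_id: str) -> JobState:
        """Backs a GET /jobs/{job_id} polling endpoint."""
        return self._jobs[job_id]

store = JobStore()
job_id = store.create()            # POST /upload responds 202 Accepted + job_id
print(store.status(job_id).value)  # "pending" while the pipeline runs
store.complete(job_id)             # fired when the final event lands
print(store.status(job_id).value)  # "completed"
```

The upload endpoint creates the job and publishes the first event; the last consumer in the chain marks it complete, and the client polls (or gets pushed) the state transition.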


Robust Event Infrastructure

The async pipeline is only as good as the infrastructure underneath it. You can start with Redis Streams as the broker — it's lightweight, well-understood, and easy to swap out later for RabbitMQ, Kafka, or SQS if your throughput demands grow.

To make this work reliably at scale, every event flowing through the system needs to adhere to a strict schema contract. A simple Pydantic model acts as the glue for this:

# packages/core/src/schemas.py
from pydantic import BaseModel, Field
from uuid import uuid4
 
class BaseEvent(BaseModel):
    event_id: str = Field(default_factory=lambda: str(uuid4()))
    trace_id: str
    topic: str
    payload: dict

With this schema in place, your BaseConsumer abstraction can automatically handle the three pillars of robust messaging:

  • Idempotency: Message brokers typically guarantee at-least-once delivery, so the same event will occasionally arrive twice. By checking the event_id, the consumer skips anything it has already handled: no corrupted state, no duplicate records.
  • Resilience and DLQs: Transient failures get retried automatically. Permanently failed messages get routed to a Dead Letter Queue for manual inspection.
  • Distributed Observability: Trading sync chains for async events fixes latency, but you still need to know what happened. By passing the trace_id along, you can cleanly string together logs from the initial REST request all the way to the final background job.
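To see the trace_id pillar in code: each hop mints a fresh event_id but copies the incoming trace_id forward, so a single identifier links the entire pipeline. A small sketch reusing the BaseEvent model (redeclared here so the snippet stands alone):

```python
from uuid import uuid4
from pydantic import BaseModel, Field

class BaseEvent(BaseModel):
    event_id: str = Field(default_factory=lambda: str(uuid4()))
    trace_id: str
    topic: str
    payload: dict

# Incoming event, as published by the ingestion API
parent = BaseEvent(
    trace_id=str(uuid4()),
    topic="data.processed",
    payload={"document_id": "42"},
)

# The worker emits the next pipeline event, preserving the trace
child = BaseEvent(
    trace_id=parent.trace_id,          # same trace, end to end
    topic="analysis.completed",
    payload={"document_id": "42"},
)

assert child.trace_id == parent.trace_id
assert child.event_id != parent.event_id  # but a fresh event identity
```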

Here's a skeletal view of how BaseConsumer encapsulates these guarantees for every consumer that inherits from it:

# packages/core/src/events.py
import logging
from typing import Any
from .schemas import BaseEvent
 
class BaseConsumer:
    topic: str
    group_id: str
 
    async def start_listening(self):
        """Main polling loop — abstracts away broker details entirely."""
        logging.info(f"Starting consumer for topic: {self.topic}")
        while True:
            event = await self._poll_event()
            if not event:
                continue
 
            # 1. Idempotency guard — skip already-processed events
            if await self._is_already_processed(event.event_id):
                logging.debug(f"[{event.trace_id}] Skipping duplicate event: {event.event_id}")
                continue
 
            # 2. Process with built-in resilience and observability
            try:
                logging.info(f"[{event.trace_id}] Processing event {event.event_id}")
                await self.process_message(event.payload)
                await self._mark_processed(event.event_id)
            except Exception as e:
                logging.error(f"[{event.trace_id}] Failed to process {event.event_id}: {str(e)}")
                # Add retry logic (e.g. exponential backoff) before this in production
                await self._route_to_dlq(event)
 
    async def process_message(self, message: dict):
        """Override this in your consumer subclass."""
        raise NotImplementedError
 
    async def _is_already_processed(self, event_id: str) -> bool:
        # Check a Redis SET or a processed_events DB table
        return False
        
    async def _mark_processed(self, event_id: str):
        # Store event_id to prevent reprocessing
        pass
 
    async def _route_to_dlq(self, event: BaseEvent):
        # Push to Dead Letter Queue for inspection
        pass
        
    async def _poll_event(self) -> BaseEvent | None:
        # Fetch from Redis Streams / RabbitMQ / SQS and parse into BaseEvent
        pass

    async def publish(self, topic: str, payload: dict):
        # Wrap payload in a BaseEvent (propagating trace_id) and push to the broker,
        # so consumers like AnalysisConsumer can fire the next pipeline event
        pass
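
The two idempotency stubs are the easiest to make concrete. A sketch backed by a plain set (in production, a Redis SET or a processed_events table, per the comments above, with a TTL or cleanup policy so it doesn't grow without bound):

```python
import asyncio

class InMemoryIdempotencyStore:
    """Stand-in for a Redis SET (SADD / SISMEMBER) or a processed_events table."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    async def is_processed(self, event_id: str) -> bool:
        return event_id in self._seen

    async def mark_processed(self, event_id: str) -> None:
        # In production, also set a TTL so the backing store stays bounded
        self._seen.add(event_id)

async def demo() -> None:
    store = InMemoryIdempotencyStore()
    print(await store.is_processed("evt-1"))  # False: first delivery
    await store.mark_processed("evt-1")
    print(await store.is_processed("evt-1"))  # True: redelivery gets skipped

asyncio.run(demo())
```

A consumer's `_is_already_processed` and `_mark_processed` would simply delegate to a shared instance of a store like this.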

Every AnalysisConsumer, ReportingConsumer, or whatever else you build gets idempotency, tracing, and DLQ routing for free just by inheriting from this class. That's the payoff of the abstraction.


Development Strategy: Dependency Tiers

One of the underrated benefits of this architecture is how cleanly it maps to parallel team development. Because services communicate only through events, you can build them in strict dependency tiers with no blocking:

  • Tier 0 (Foundational): Ingestion and core data services. No upstream dependencies — ship these on day one.
  • Tier 1 (Intermediate): Services that consume Tier 0 events to apply business or AI logic.
  • Tier 2 (End-of-the-line): Aggregation and reporting services consuming the finalized output of Tier 1.

Teams can work in parallel across tiers as long as the event contracts (topic names and payload schemas) are agreed on upfront. The critical path mapping takes some up-front effort, but the runtime payoff is significant: no fragile HTTP chains, just fast single-hop reads and resilient background processing.
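One cheap way to keep those contracts honest is to define topic names once in the core package rather than as string literals scattered across services. A sketch (the module location is hypothetical):

```python
# packages/core/src/topics.py (hypothetical location)
class Topics:
    """Single source of truth for event topic names across all tiers."""

    DATA_PROCESSED = "data.processed"          # Tier 0 -> Tier 1
    ANALYSIS_COMPLETED = "analysis.completed"  # Tier 1 -> Tier 2

# Publishers and consumers both reference the constant, never the raw string,
# so a renamed topic becomes a compile-visible change instead of a silent drop:
assert Topics.DATA_PROCESSED == "data.processed"
```

Payload schemas get the same treatment via the shared Pydantic models in core, which is what lets teams on different tiers build against the contract before the other side exists.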


Conclusion

Building an event-driven AI architecture in FastAPI isn't free — you're trading the simplicity of linear, synchronous code for the complexity of eventual consistency and distributed debugging. That's a real cost worth acknowledging.

But when your AI workloads scale, when ML models take seconds rather than milliseconds, and when your feature teams need to move independently without stepping on each other, that trade-off becomes worth it. By treating your microservices as asynchronous data pipelines rather than a rigid chain of REST calls, you end up with a system that's resilient, decoupled, and reliably fast for the end user.

In a future post, I'll dig into the operational side of this setup — CI/CD pipelines for individual services, and how to handle schema evolution in the event bus without breaking downstream consumers.