Mapped backend architecture to UI and identified missing screens and endpoints. (Understand)
Implemented UI screens, API endpoints, and unit tests. (Build)
Ran tests and fixed Greptile P2 issues. (Verify)
Reviewed PR #11 and triggered Greptile review. (Plan)
Diagnosed divergence between PR #11 and main's independently shipped features. (Understand)
understand our current codebase architecture and map it towards what we have in the UI side of things; we need to make sure the UI is fully functional and data we have is properly flowing. I also want you to write unit tests for the important functions and workflows of our app.
I'll start by understanding the codebase structure, then map the backend architecture to the UI to find gaps, and finally write unit tests. Let me kick off exploration.
echo "=== TOP LEVEL ===" && ls -la && echo "=== APP ===" && find app -type f 2>/dev/null | head -100 && echo "=== PACKAGE ===" && cat package.json
echo "=== LIB ===" && find lib -type f 2>/dev/null && echo "=== TRIGGER ===" && find trigger -type f 2>/dev/null && echo "=== MIGRATIONS ===" && ls supabase/migrations 2>/dev/null && echo "=== TESTS ===" && find . -path ./node_modules -prune -o -name "*.test.ts" -print 2>/dev/null && find . -path ./node_modules -prune -o -name "*.spec.ts" -print 2>/dev/null && echo "=== VITEST CONFIG ===" && ls vitest* 2>/dev/null
We're in plan mode. Let me launch parallel exploration agents to understand the architecture, the UI, and existing test patterns.
Perfect. Now I have all the information I need. Let me compile a comprehensive report.
## Comprehensive UI/HTTP Data-Flow Map: Zrux Next.js Project
### 1. PAGES AND UI STATE
#### `/` (app/page.tsx)
- **What it renders:** A static skeleton page with the title "zrux" and message "Your context engine is booting. Phase 0 skeleton is live."
- **State held:** None (static component, no hooks)
- **API calls:** None
- **Notable:** This is a placeholder page. Comments in the code explicitly state this is a "Phase 0 skeleton"
#### `/ask` (app/ask/page.tsx)
- **What it renders:** An interactive question-answer interface
- **State held (all React state):**
- `question: string` — user's input text (line 45)
- `answer: string` — streamed response text (line 46)
- `meta: Meta | null` — citations and retrieval metadata (line 47)
- `loading: boolean` — request in flight (line 48)
- `error: string | null` — error message (line 49)
- **API calls:**
- **POST /api/answer** (lines 58-77):
- **Request shape:** `{ question: string }`
- **Response handling:**
- Success: HTTP 200, streams response body as text, reads `x-zrux-meta` header as base64-encoded Meta JSON
- Error: Sets error state with `${status}: ${responseText}` (line 64)
- Timeout: Caught in general try/catch, sets error message
- **Response body:** Plain text (streamed via TextEncoder)
- **Response headers:** `x-zrux-meta` (base64-encoded JSON with structure `{ thin: boolean, relaxed: boolean, itemCount: number, intent: string, citations: Citation[] }`)
- **UI behavior:**
- Shows "Thinking..." button while loading (line 149)
- Displays answer in a white box with citations below (lines 157-195)
- Citations rendered as numbered list with source, date, and URL (lines 179-192)
- **Preset buttons:** Three hardcoded prompts that trigger `ask()` on click (lines 23-27, 94-112)
- **Notable features:**
- Uses `decodeMeta()` helper to decode base64-encoded header metadata (lines 29-42)
- Handles streaming responses via `ReadableStream` API (lines 68-78)
- Displays "filters relaxed for breadth" note conditionally if `meta.relaxed === true` (line 176)
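The header transport described above can be sketched as a base64 round trip. This is a minimal illustration, not the code in `app/ask/page.tsx`: the `Citation` fields and the `encodeMeta` helper are assumptions, and only the `Meta` field names come from the report.

```typescript
// Hypothetical citation shape; the report only says citations carry source, date, and URL.
interface Citation {
  source: string;
  date: string | null;
  url: string | null;
}

// Field names follow the x-zrux-meta structure listed in the report.
interface Meta {
  thin: boolean;
  relaxed: boolean;
  itemCount: number;
  intent: string;
  citations: Citation[];
}

// Encode Meta as base64(UTF-8 JSON) so it is safe to carry in an HTTP header.
function encodeMeta(meta: Meta): string {
  const bytes = new TextEncoder().encode(JSON.stringify(meta));
  let binary = "";
  for (let i = 0; i < bytes.length; i++) binary += String.fromCharCode(bytes[i]);
  return btoa(binary);
}

// Decode the header value; returns null when the header is absent or malformed.
function decodeMeta(header: string | null): Meta | null {
  if (!header) return null;
  try {
    const binary = atob(header);
    const bytes = new Uint8Array(binary.length);
    for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i);
    return JSON.parse(new TextDecoder().decode(bytes)) as Meta;
  } catch {
    return null;
  }
}
```

Going through `TextEncoder`/`TextDecoder` keeps non-ASCII characters in citation fields intact, which a bare `btoa(JSON.stringify(...))` would not.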
---
### 2. API ROUTES
#### **POST /api/answer** (app/api/answer/route.ts)
- **Gated by:** NextAuth (line 25: `getUserId(req)`)
- **Input:**
- Method: POST
- Body shape: `{ question?: string }` (validated on lines 35-39)
- Validation: question must be a non-empty string; returns 400 if invalid
- **Response:**
- On 401 (no session): `Response('Unauthorized', { status: 401 })`
- On 400 (bad question): `Response('Missing "question"', { status: 400 })` or `Response('Invalid JSON body', { status: 400 })`
- On 502 (pipeline error): `Response('Answer service temporarily unavailable', { status: 502 })` (line 50)
- On 200 (success):
- **Body:** Streamed plain text (the LLM answer from `synthesizeStream()`)
- **Headers:** `x-zrux-meta` with Base64-encoded JSON structure:
```
{
  thin: boolean,        // true if context was too thin, short-circuits synthesis
  relaxed: boolean,     // true if retrieval relaxed filters for breadth
  itemCount: number,    // count of items retrieved
  intent: string,       // query intent (e.g., 'daily_briefing', 'lookup')
  citations: Citation[] // array of cited items
}
```
- If `thin === true`, body is the refusal text: "There is not enough in your connected tools to answer that yet. Try connecting more sources or asking about something from the last 90 days."
- If `thin === false`, body is the LLM-synthesized answer with citations
- **Internals:**
- Calls `retrieve(userId, question)` which returns `{ plan, context, relaxed, itemCount, graphFactCount }`
- If context is thin, skips synthesis and returns refusal (lines 94-104)
- Otherwise streams via `synthesizeStream(question, context)` (line 106)
- Uses Langfuse tracing if enabled (lines 61-82)
- **Used by:** `/ask` page only (single fetch call)
---
#### **GET /api/graph** (app/api/graph/route.ts)
- **Gated by:** NextAuth (line 19)
- **Input:** No request body, query parameters, or user input
- **Response:**
- On 401: `Response('Unauthorized', { status: 401 })`
- On 500: `Response('Failed to load graph', { status: 500 })`
- On 200:
```json
{
  "entities": [
    {
      "id": "uuid",
      "type": "string (person|company|project)",
      "name": "string",
      "email": "string | null",
      "domain": "string | null",
      "aliases": ["string"]
    }
  ],
  "edges": [
    {
      "id": "uuid",
      "relation": "string",
      "confidence": "number",
      "source_item": "uuid | null",
      "occurred_at": "ISO timestamp | null",
      "from": { "id": "uuid", "name": "string | null" },
      "to": { "id": "uuid", "name": "string | null" }
    }
  ]
}
```
- **Internals:**
- Queries `entity` table (limited to 500 rows)
- Queries `edge` table (limited to 1000 rows)
- Resolves edge endpoints to entity names in the route handler rather than in SQL (lines 44-53)
- **Used by:** NONE in current UI (no page imports or calls this)
- **Status:** Dead endpoint — declared for "Phase 6" relationships screen which does not exist
---
#### **POST /api/connect/[source]** (app/api/connect/[source]/route.ts)
- **Gated by:** NextAuth (line 24)
- **Input:**
- Method: POST
- URL parameter: `source` — must be in connectable sources (line 18: `isConnectable(source)`)
- No request body
- **Response:**
- On 400 (source not connectable): `Response('Source not connectable: ${source}', { status: 400 })`
- On 401: `Response('Unauthorized', { status: 401 })`
- On 502 (Composio or DB failure): `Response('Failed to start connection', { status: 502 })`
- On 200: `{ redirectUrl: string, connectedAccountId: string }`
- **Internals:**
- Calls `composio().connectedAccounts.initiate(userId, authConfigId(source), { callbackUrl })`
- Creates a `source_connection` row with status 'initiated'
- Returns Composio's OAuth redirect URL
- **Used by:** NONE in current UI (no page or UI component imports this)
- **Status:** Dead endpoint: its intended caller is the `/onboarding` page, which doesn't exist
---
#### **GET /api/oauth/callback** (app/api/oauth/callback/route.ts)
- **Gated by:** NextAuth (line 18)
- **Input:** No request body; Composio redirects here after user grants consent
- **Response:**
- Redirects to `/onboarding?connected=1` on success (line 53)
- Redirects to `/onboarding?error=1` on DB lookup failure (line 32)
- Both redirects are 302 HTTP
- **Internals:**
- Queries all pending 'initiated' `source_connection` rows
- For each, verifies the connected account status is 'ACTIVE' via Composio
- Updates status to 'active' and calls `enqueueLoad(userId, source)` to start Trigger.dev job
- Falls through to redirect regardless of individual connection results
- **Used by:** External redirect from Composio OAuth
- **Status:** Partially dead — redirects to `/onboarding` which doesn't exist in main branch
---
#### **GET/POST /api/auth/[...nextauth]** (app/api/auth/[...nextauth]/route.ts)
- **Implementation:** Thin wrapper around NextAuth
- Imports `authOptions` from `lib/auth/options`
- Google OAuth provider
- JWT sessions (no DB adapter)
- User ID is derived from email and stored in token (line 22)
- **Used by:** NextAuth middleware on protected routes
---
#### **POST /api/webhooks/[source]** (app/api/webhooks/[source]/route.ts)
- **Gated by:** HMAC signature verification (not NextAuth)
- **Input:**
- Method: POST
- URL parameter: `source` (currently only 'slack' is handled, line 114)
- Headers: `x-slack-signature`, `x-slack-request-timestamp`
- Body: JSON (Slack event envelope)
- **Response:**
- Acks with `{ ok: true, skipped: true }` if event should be dropped
- Acks with `{ ok: true, unrouted: true }` if tenant cannot be routed
- Acks with `{ ok: true, enqueued: true }` if enqueued to Trigger.dev
- On signature verification failure: `Response('invalid signature', { status: 401 })`
- On invalid JSON: `Response('bad json', { status: 400 })`
- On unknown source: `Response('No webhook handler for source: ...', { status: 404 })`
- **Internals (Slack):**
- Verifies HMAC signature (lines 56-66)
- Handles `url_verification` handshake (lines 76-78)
- Deduplicates on `event_id` or `channel:ts` (lines 100-104)
- Routes by looking up `source_connection.metadata.teamId` or falls back to single active Slack connection (lines 20-38)
- Filters to ingestable events: human messages only, no bot echoes (lines 82-87)
- Calls `enqueueEvent(userId, 'slack', event, dedupeId)` (line 105)
- **Used by:** External Slack webhooks (not UI-driven)
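The signature check and dedupe key described above can be sketched as follows. The HMAC scheme (`v0:{timestamp}:{body}`, SHA-256, `v0=` prefix) is Slack's documented request-signing format; the five-minute replay window, the function names, and the envelope type are assumptions and may not match the route's lines 56-66 and 100-104.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumed replay window; Slack's documentation suggests about five minutes.
const MAX_SKEW_SECONDS = 60 * 5;

// Verify x-slack-signature against the raw request body and x-slack-request-timestamp.
function verifySlackSignature(
  signingSecret: string,
  timestamp: string,
  rawBody: string,
  signature: string,
  nowSeconds: number = Math.floor(Date.now() / 1000),
): boolean {
  const ts = Number(timestamp);
  if (!Number.isFinite(ts) || Math.abs(nowSeconds - ts) > MAX_SKEW_SECONDS) return false;
  const expected =
    "v0=" + createHmac("sha256", signingSecret).update(`v0:${timestamp}:${rawBody}`).digest("hex");
  const a = Buffer.from(expected, "utf8");
  const b = Buffer.from(signature, "utf8");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return a.length === b.length && timingSafeEqual(a, b);
}

// Hypothetical subset of the Slack event envelope used for deduplication.
interface SlackEnvelope {
  event_id?: string;
  event?: { channel?: string; ts?: string };
}

// Prefer event_id; fall back to channel:ts, as the report describes.
function dedupeId(envelope: SlackEnvelope): string | null {
  if (envelope.event_id) return envelope.event_id;
  const channel = envelope.event?.channel;
  const ts = envelope.event?.ts;
  return channel && ts ? `${channel}:${ts}` : null;
}
```

The signature must be computed over the raw body string before any JSON parsing, since re-serializing the payload can change byte order or whitespace.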
---
### 3. MIDDLEWARE (middleware.ts)
- **Enforces:** NextAuth authentication on protected routes
- **Matcher:** `/ask/:path*`, `/today/:path*`, `/relationships/:path*`, `/search/:path*`
- **Behavior:** Redirects unauthenticated users to NextAuth sign-in page
- **Note:** API routes do their own `getUserId()` check and are NOT matched here
---
### 4. AUTHENTICATION FLOW
1. **NextAuth Configuration** (`lib/auth/options.ts`):
- Provider: Google OAuth
- Session strategy: JWT (no database)
- JWT callback derives stable `userId` from email (line 22)
- Session callback injects `session.user.id`
2. **Server-side User Resolution** (`lib/auth/session.ts`):
- `getUserId(req)` extracts user_id from NextAuth session
- In non-production: falls back to `DEV_USER_ID` env var or `x-zrux-user-id` header (lines 21-26)
- Throws `UnauthorizedError` if no session and not in dev mode
- User_id is NEVER trusted from client
---
### 5. DATA-FLOW SUMMARY TABLE
| Endpoint | HTTP Method | UI Caller | Status | Request | Response | Notes |
|----------|------------|-----------|--------|---------|----------|-------|
| `/api/answer` | POST | `/ask` page | **WORKING** | `{ question: string }` | Streamed text + `x-zrux-meta` header | Fully functional, called on every ask |
| `/api/graph` | GET | NONE | **DEAD** | None | JSON (entities + edges) | Declared for Phase 6 relationships screen |
| `/api/connect/[source]` | POST | NONE | **DEAD** | None | `{ redirectUrl, connectedAccountId }` | Wired only in missing `/onboarding` page |
| `/api/oauth/callback` | GET | External (Composio) | **PARTIALLY DEAD** | None | 302 redirect to `/onboarding?connected=1` | Redirects to non-existent page |
| `/api/auth/[...nextauth]` | GET/POST | NextAuth + middleware | **WORKING** | OAuth flow | JWT session | Auth working, middleware protects `/ask` |
| `/api/webhooks/[source]` | POST | External (Slack) | **WORKING** (if configured) | Slack event + HMAC | `{ ok: true, ... }` | Async ingestion, enqueues to Trigger.dev |
---
### 6. UI/BACKEND MISMATCHES AND GAPS
#### Critical Issues:
1. **Missing `/onboarding` page:**
- `/api/oauth/callback` redirects to `/onboarding?connected=1` and `/onboarding?error=1` (lines 32, 53 of callback route)
- The page does not exist in the main app directory
- **Impact:** OAuth flow completes but user is sent to a 404
2. **`/api/graph` endpoint is dead:**
- Fully implemented and callable
- No UI page or component calls it
- Comments declare it's for "Phase 6" (line 4)
- **Impact:** Wasted code; the relationships screen does not exist
3. **`/api/connect/[source]` endpoint is unreachable from UI:**
- No page or component in the UI calls this endpoint
- Would be called from `/onboarding` page (which doesn't exist)
- **Impact:** Users cannot connect sources via the UI in the current branch
4. **Middleware gates routes that don't exist:**
- Middleware protects `/today/:path*`, `/relationships/:path*`, `/search/:path*` (middleware.ts:8)
- None of these pages exist in the app directory
- **Impact:** If user somehow reaches these URLs, they'll be redirected to sign-in, then land on a 404
#### Data-Flow Validation:
- **`/ask` ↔ `/api/answer`:** ✅ **MATCH** — Page sends `{ question: string }`, receives streamed text + base64 metadata header. UI correctly decodes and displays citations and answer.
- **Landing page ↔ API:** ✅ **NO CALL** — Home page is static, as intended.
- **Routing gates:** ✅ **WORKING** — Middleware correctly requires auth; API routes verify user_id server-side.
---
### 7. UI FEATURES WITH NO BACKING
1. **Preset buttons** (line 94-112 in `/ask`): Buttons have hardcoded prompts and are fully functional — they call `ask()` which calls `/api/answer`. ✅ **These work.**
2. **Error handling in `/ask`**: Catches and displays API errors (line 153-155). Functional but rarely tested since the pipeline is in early phase.
3. **Citations rendering**: Only shows if `meta.citations.length > 0` (line 173). Citations are populated by `/api/answer` and UI correctly renders them. ✅ **Works.**
---
### 8. INCOMPLETE/FUTURE PHASES NOTED IN CODE
- **Phase 0:** Home page skeleton (line 17 of page.tsx) — explicitly notes this is early
- **Phase 2:** Slack webhook support is live, but code notes other providers (Linear, GitHub) will be added (line 114 of webhooks route)
- **Phase 5:** Error degradation for synthesis/gateway failures noted as "hardened in Phase 5" (line 47 of answer route)
- **Phase 6:** Relationships screen and `/api/graph` (line 4 of graph route)
---
### 9. ENVIRONMENT AND RUNTIME NOTES
- **Runtime:** `export const runtime = 'nodejs'` on answer, graph, connect, oauth routes
- **Duration limits:** `export const maxDuration = 60` on answer route (matches streaming timeout)
- **Environment variables used:**
- `NEXT_PUBLIC_APP_URL` — used in oauth/callback and connect routes for callback URL
- `WEBHOOK_SECRET_SLACK` — HMAC verification in webhooks route
- `NEXTAUTH_SECRET` — NextAuth JWT signing
- `GOOGLE_CLIENT_ID`, `GOOGLE_CLIENT_SECRET` — OAuth provider
- `DEV_USER_ID` — non-production user override (session.ts:22)
---
### 10. FINAL ASSESSMENT
**Functional Features (Ship-Ready):**
- Ask page with streaming answers
- Citation display and metadata header transport
- NextAuth Google OAuth
- Webhook ingestion for Slack (async via Trigger.dev)
**Dead/Missing Features (Phase boundaries):**
- Onboarding flow (redirected to from `/api/oauth/callback`, page doesn't exist)
- Connect sources UI (endpoint exists but unreachable)
- Relationships graph screen and API (endpoint exists, no caller)
- Today, Search screens (middleware gates them but pages don't exist)
**No hardcoded test data or placeholder responses**: all features that exist are real. The gaps are phase-boundary issues where endpoints are built ahead of UI, or UI expectations point to not-yet-built pages.
Perfect! Now I have enough information to create a comprehensive analysis. Let me compile the findings:
## COMPREHENSIVE BACKEND ARCHITECTURE ANALYSIS: ZRUX CONTEXT ENGINE
Based on a detailed code walkthrough of the `/Users/venusbhatia/Downloads/Zrux` Next.js context-engine project, here is a precise map of the ingestion and retrieval planes against the CLAUDE.md specification.
---
### **PART 1: RETRIEVAL PIPELINE (Answer Path)**
The answer path is implemented across **lib/retrieval/** with the following stages (as specified in CLAUDE.md §8, Stages 0–8):
#### **Stage 0: Semantic Cache** — **MISSING**
- **CLAUDE.md spec**: Redis via Upstash, per-tenant near-hit on embedding bucket. Cache hit = skip everything.
- **Actual code**: No `lib/cache/semantic-cache.ts` exists.
- **Status**: MISSING. The only related note is `lib/llm/gateway.ts:4`, "The breaker (Redis state) is Phase 5", which suggests the Redis-backed cache is deferred as well.
- **File reference**: None found in codebase.
#### **Stage 1: Query Understanding** — **IMPLEMENTED**
- **File**: `/lib/retrieval/plan.ts:21-58` (`planQuery()`)
- **Function signature**: `async function planQuery(question: string, now: Date = new Date()): Promise<RetrievalPlan>`
- **Input**: Founder's question string
- **Output**: `RetrievalPlan` (see types below)
- **External calls**:
- `chatModel()` (OpenRouter, claude-sonnet-4-6 primary) via `generateObject` with Zod schema
- `aiTelemetry('plan-query')` (Langfuse tracing)
- **Details**: One LLM call, structured output (Zod), produces semantic_query, keyword_terms, sources, after/before, type, status, entities, intent, time_basis, recency_weight. System prompt in file:33-45.
#### **Stage 2: Hybrid Retrieval** — **IMPLEMENTED**
- **File**: `/lib/retrieval/search.ts:76-145` (`hybridSearch()`)
- **Function signature**: `async function hybridSearch(userId: string, plan: RetrievalPlan, queryEmbedding: number[]): Promise<SearchResult>`
- **Input**: userId, plan (from stage 1), embeddings (from `embedText`)
- **Output**: `SearchResult { hits: SearchHit[], relaxed: boolean, diversify: boolean }`
- **External calls**:
- `createServiceClient().rpc('hybrid_search', {...})` — calls the Postgres function defined in `/supabase/migrations/0002_hybrid_search.sql:10-57`
- `userSources(userId)` — helper that calls `db.rpc('distinct_sources')`
- **Filter-relax fallback**: Lines 126-144 — if hits < 4 and filters present, relaxes sources and time bounds and retries
- **Broad intents**: Per-source stratified retrieval (lines 88-114) for cross_source, company_summary, daily_briefing to prevent inbox monopoly
- **Details**: Calls `hybrid_search()` SQL function with:
- p_user_id, p_query_embedding, p_query_text
- p_sources (or null for all)
- p_after (time filter), p_time_basis ('updated' or 'created')
- p_recency_weight (0.0–0.3 depending on intent)
- p_limit (60 default)
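The body of the `hybrid_search` SQL function is not reproduced in this report. The spec's "post-RRF" wording for the reranker suggests the vector and keyword rankings are merged with reciprocal rank fusion, so here is a minimal sketch of that technique. The function names and the constant `k = 60` are assumptions, and the recency weighting is not modeled.

```typescript
// Reciprocal rank fusion: each ranking contributes 1 / (k + rank) per document.
// k = 60 is a commonly used default, assumed here rather than taken from the migration.
function reciprocalRankFusion(rankings: string[][], k: number = 60): Map<string, number> {
  const scores = new Map<string, number>();
  for (const ranking of rankings) {
    ranking.forEach((id, index) => {
      const rank = index + 1; // ranks are 1-based
      scores.set(id, (scores.get(id) ?? 0) + 1 / (k + rank));
    });
  }
  return scores;
}

// Return document ids ordered by fused score, highest first, capped at `limit`.
function topFused(rankings: string[][], limit: number, k: number = 60): string[] {
  return Array.from(reciprocalRankFusion(rankings, k).entries())
    .sort((a, b) => b[1] - a[1])
    .slice(0, limit)
    .map(([id]) => id);
}
```

A document that appears in both rankings accumulates two contributions, so it outranks one that tops only a single list.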
#### **Stage 3: Graph Expansion** — **IMPLEMENTED**
- **File**: `/lib/retrieval/graph-expand.ts:28-86` (`expandGraph()`)
- **Function signature**: `async function expandGraph(userId: string, entityNames: string[]): Promise<GraphExpansion>`
- **Input**: userId, entity names extracted from question (from stage 1)
- **Output**: `GraphExpansion { facts: GraphFact[], itemIds: string[], entities: string[] }`
- **External calls**:
- `db.rpc('find_entities', {...})` — fuzzy name resolution (threshold 0.4) across all entity types
- `db.from('edge').select(...)` — one-hop edges (lines 48-54)
- `db.from('entity').select('id, name')` — hydrate connected entity names (lines 66-71)
- **Details**: Best-effort, never blocks. Returns facts (subject-relation-object tuples) and the source_item IDs those edges came from for enrichment.
#### **Stage 4: Reranker** — **MISSING**
- **CLAUDE.md spec**: Cohere Rerank 3.5, post-RRF cross-encoder over ~50–100 candidates.
- **Actual code**: No rerank stage in `lib/retrieval/`. Comment in `pipeline.ts:1-3` says "cache, graph, rerank, rail arrive in later phases".
- **Status**: STUBBED — deferred to Phase 5.
- **File reference**: None found.
#### **Stage 5: Chunk-to-Item Rollup** — **IMPLEMENTED**
- **File**: `/lib/retrieval/rollup.ts:40-92` (`rollupToItems()`)
- **Function signature**: `async function rollupToItems(userId: string, hits: SearchHit[], opts: { diversify?: boolean }): Promise<RolledItem[]>`
- **Input**: userId, hits from hybrid_search, diversify flag
- **Output**: `RolledItem[]` (best chunk per parent item, hydrated with metadata, capped at MAX_ITEMS=12)
- **External calls**:
- `createServiceClient().from('context_item').select(...).eq('user_id', userId).in('id', itemIds)` — hydrate item metadata (lines 57-63)
- **Details**:
- Dedupes chunks to parent item_id, keeps best score (lines 48-52)
- Hydrates item fields: source, type, title, author, url, source_created_at, source_updated_at, status (lines 59-60)
- Filters out is_deleted items (line 68)
- If diversify=true, uses `interleaveBySource()` to round-robin across sources (lines 14-38); else pure top-N by score (line 91)
- MAX_ITEMS = 12 (line 8)
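The dedupe and round-robin steps above can be sketched in memory. This is an illustration under assumptions: the `Hit` and `Item` shapes are hypothetical, and the ordering of sources (by their best-scoring item) is a guess at what `interleaveBySource()` does, not a copy of it.

```typescript
// Hypothetical minimal shapes for illustration.
interface Hit { itemId: string; score: number }
interface Item { id: string; source: string; score: number }

const MAX_ITEMS = 12;

// Collapse chunk hits to their parent item, keeping the best score per item.
function bestScorePerItem(hits: Hit[]): Map<string, number> {
  const best = new Map<string, number>();
  for (const hit of hits) {
    const prev = best.get(hit.itemId);
    if (prev === undefined || hit.score > prev) best.set(hit.itemId, hit.score);
  }
  return best;
}

// Round-robin across sources so one source cannot fill every slot.
function interleaveBySource(items: Item[], cap: number = MAX_ITEMS): Item[] {
  const groups = new Map<string, Item[]>();
  // Sorting first means each per-source queue is already in score order.
  for (const item of items.slice().sort((a, b) => b.score - a.score)) {
    const queue = groups.get(item.source);
    if (queue) queue.push(item);
    else groups.set(item.source, [item]);
  }
  const queues = Array.from(groups.values());
  const out: Item[] = [];
  while (out.length < cap && queues.some((q) => q.length > 0)) {
    for (const queue of queues) {
      const next = queue.shift();
      if (next) out.push(next);
      if (out.length >= cap) break;
    }
  }
  return out;
}
```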
#### **Stage 6: Retrieval Rail** — **MISSING**
- **CLAUDE.md spec**: Drop semantically distant chunks, cap item count.
- **Actual code**: Item cap (MAX_ITEMS=12) is in rollup.ts. No semantic distance filter found.
- **Status**: PARTIALLY STUBBED — item count is capped, but no "semantic distance" filtering (e.g., re-scoring or threshold) exists.
- **File reference**: Implied in rollup.ts:8, but no explicit rail logic.
#### **Stage 7: Assemble** — **IMPLEMENTED**
- **File**: `/lib/retrieval/assemble.ts:13-57` (`assembleContext()`)
- **Function signature**: `function assembleContext(items: RolledItem[], graphFacts: GraphFact[] = []): AssembledContext`
- **Input**: rolled items, graph facts
- **Output**: `AssembledContext { block: string, citations: Citation[] }` — numbered, citable context block
- **Details**:
- Builds a numbered context block with [n] citations (lines 35-54)
- Prefixes relationships (Layer 2 facts) at the top if present (lines 23-33)
- Each item becomes `[n] source=... type=... title=... author=... status=... date=...` plus best_content (lines 47-53)
- Citations map [n] back to item metadata for UI expansion (lines 38-46)
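A rough sketch of the assembly step follows. The types, the `RELATIONSHIPS:` heading, the `-` placeholder for missing fields, and the name `assembleContextSketch` are assumptions; only the `[n] source=... type=...` line layout and the citation mapping come from the description above.

```typescript
// Hypothetical shapes; the real types live in lib/retrieval and are not shown in the report.
interface RolledItem {
  source: string;
  type: string;
  title: string | null;
  author: string | null;
  status: string | null;
  date: string | null;
  url: string | null;
  bestContent: string;
}
interface GraphFact { subject: string; relation: string; object: string }
interface Citation { n: number; source: string; title: string | null; url: string | null; date: string | null }
interface AssembledContext { block: string; citations: Citation[] }

function assembleContextSketch(items: RolledItem[], graphFacts: GraphFact[] = []): AssembledContext {
  const parts: string[] = [];
  // Relationship facts go first so the model reads them before the cited items.
  if (graphFacts.length > 0) {
    const lines = graphFacts.map((f) => `- ${f.subject} ${f.relation} ${f.object}`);
    parts.push("RELATIONSHIPS:\n" + lines.join("\n"));
  }
  const citations: Citation[] = [];
  items.forEach((item, index) => {
    const n = index + 1; // citation markers are 1-based
    citations.push({ n, source: item.source, title: item.title, url: item.url, date: item.date });
    const header =
      `[${n}] source=${item.source} type=${item.type} title=${item.title ?? "-"} ` +
      `author=${item.author ?? "-"} status=${item.status ?? "-"} date=${item.date ?? "-"}`;
    parts.push(header + "\n" + item.bestContent);
  });
  return { block: parts.join("\n\n"), citations };
}
```

Because each citation's `n` is derived from the same index as the block marker, a `[2]` in the answer always maps back to `citations[1]`.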
#### **Stage 8: Synthesis** — **IMPLEMENTED**
- **File**: `/lib/retrieval/synthesize.ts:30-44` (`synthesizeStream()`)
- **Function signature**: `function synthesizeStream(question: string, context: AssembledContext, opts?: { onFinish?: (text: string) => void | Promise<void> }): StreamTextResult`
- **Input**: question, assembled context
- **Output**: streamed text response
- **External calls**:
- `chatModel()` (OpenRouter, read-only model, zero tools)
- `streamText()` (Vercel AI SDK, temperature=0.2)
- `aiTelemetry('synthesize-answer')` (Langfuse)
- **Details**:
- Grounded answer from CONTEXT only, cites with [n] markers (system prompt lines 11-18)
- Refuses to guess if context is thin (isThin check in route.ts:95)
- REFUSAL text (line 20): "There is not enough in your connected tools to answer that yet..."
- onFinish callback records trace, closes span, flushes to Langfuse (async)
#### **Full Retrieval Entry Point**
- **File**: `/lib/retrieval/pipeline.ts:21-36` (`retrieve()`)
- **Orchestrates**: Stages 1, 2, 3, 5, and 7 (stages 2 and 3 run in parallel; stages 4 and 6 are not implemented yet)
- **Flow**:
1. planQuery (stage 1)
2. embedText (query embedding for stage 2)
3. hybridSearch + expandGraph in parallel (stages 2–3)
4. rollupToItems (stage 5, skipping rerank/rail for now)
5. assembleContext (stage 7)
6. Return plan, context, relaxed flag, itemCount, graphFactCount
#### **Answer API Route**
- **File**: `/app/api/answer/route.ts:22-116` (`POST /api/answer`)
- **Flow**:
1. Extract userId (server-side auth)
2. Parse question from request body
3. Call `retrieve(userId, question)` (stages 1–7)
4. If thin, return REFUSAL (lines 95-104)
5. Otherwise call `synthesizeStream(question, context, { onFinish })` (stage 8)
6. Stream response with citations in x-zrux-meta header (base64 JSON)
7. onFinish closes Langfuse trace and flushes spans
---
### **PART 2: INGESTION PIPELINE**
The ingestion pipeline is implemented across **lib/ingestion/** and **trigger/ingest.ts**:
#### **Entry Point: Trigger.dev Task**
- **File**: `/trigger/ingest.ts:56-81` (`ingestTask`)
- **Modes**: 'load' (bulk), 'poll' (incremental), 'event' (webhook)
- **Idempotency**: `idempotencyKey` per mode + userId + source
- **Invoked by**: `lib/ingestion/enqueue.ts` from routes/actions
- **Flow**:
1. Resolve connector for source
2. Pick load/poll/event stream based on mode (lines 29-46)
3. For poll/load: check sync_state for last cursor; for event: use provided event
4. Call `ingestItems()` (lines 50-52)
5. Langfuse wrapping for observability (lines 70-79)
#### **Step 1–2: Fetch + Normalize**
- **File**: `/lib/ingestion/run.ts:24-37` (`ingestOne()`)
- **Functions**:
- `normalizeItem(userId, raw)` in `/lib/ingestion/normalize.ts:10-25`
- **Input**: RawItem from connector
- **Output**: context_item row (upserted)
- **Details**:
- Normalizes source timestamp → source_created_at, source_updated_at (ISO strings)
- Preserves raw payload in context_item.raw (episodic layer)
- Upserts on unique(user_id, source, external_id) (lines 31-35)
#### **Step 3: Chunk**
- **File**: `/lib/ingestion/chunk.ts:10-39` (`chunkText()`)
- **Input**: item.body (normalized human-readable text)
- **Output**: string[] (chunks)
- **Details**:
- Bodies longer than 1800 chars are chunked; shorter bodies stay as a single chunk (line 8: SINGLE_CHUNK_CEILING)
- Default chunk size is 1500 chars, which is also the per-chunk maximum, with a 150-char overlap (lines 6–7)
- Splits on paragraph boundaries (`\n\s*\n`) and hard-splits oversized paragraphs (lines 15–36)
- Meeting transcripts chunk by speaker turn (Phase 7 audio; for now falls through to paragraph split)
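The chunking rules above can be sketched as a paragraph packer. The constants come from the report; everything else is an assumption, in particular that the overlap is applied only when an oversized paragraph is hard-split. The real `chunkText()` may apply overlap differently.

```typescript
const CHUNK_SIZE = 1500;
const CHUNK_OVERLAP = 150;
const SINGLE_CHUNK_CEILING = 1800;

function chunkTextSketch(
  body: string,
  size: number = CHUNK_SIZE,
  overlap: number = CHUNK_OVERLAP,
  ceiling: number = SINGLE_CHUNK_CEILING,
): string[] {
  const text = body.trim();
  if (text.length === 0) return [];
  if (text.length <= ceiling) return [text]; // short bodies stay whole

  const chunks: string[] = [];
  let current = "";
  const step = Math.max(1, size - overlap); // guards against a non-advancing window

  for (const para of text.split(/\n\s*\n/)) {
    const p = para.trim();
    if (p.length === 0) continue;

    if (p.length > size) {
      // Oversized paragraph: flush what we have, then slide a fixed window over it.
      if (current.length > 0) { chunks.push(current); current = ""; }
      for (let start = 0; start < p.length; start += step) {
        chunks.push(p.slice(start, start + size));
        if (start + size >= p.length) break;
      }
      continue;
    }

    // Pack whole paragraphs together until the next one would exceed the chunk size.
    const candidate = current.length > 0 ? current + "\n\n" + p : p;
    if (candidate.length > size) {
      chunks.push(current);
      current = p;
    } else {
      current = candidate;
    }
  }
  if (current.length > 0) chunks.push(current);
  return chunks;
}
```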
#### **Step 4: Enrich**
- **File**: `/lib/ingestion/enrich.ts:47-60` (`enrichChunk()`)
- **Input**: RawItem, chunk string, date ISO
- **Output**: enriched chunk string
- **Details**:
- Deterministic provenance line (line 52): `[Source: {source}] [{date}][{author}]`
- For structured items (Linear, Calendar, Sentry, GitHub): provenance + body only (lines 54–55)
- For unstructured (email, Notion): optional LLM gloss if ENRICH_GLOSS='true' (lines 28–44, gated to Haiku-class)
- Gloss system prompt (lines 25–26): "one short sentence of context"
- Final format (line 58): provenance + gloss + body, or provenance + title + body
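The enrichment format can be sketched as a pure function. The provenance template is copied from the description above; the newline separators, the `unknown` author placeholder, the item shape, and passing the gloss in as an argument (rather than calling an LLM) are assumptions.

```typescript
// Hypothetical subset of RawItem needed for enrichment.
interface EnrichableItem {
  source: string;
  title: string | null;
  author: string | null;
}

// Sources the report lists as structured: provenance + body only, no gloss.
const STRUCTURED_SOURCES = new Set<string>(["linear", "calendar", "sentry", "github"]);

function provenanceLine(item: EnrichableItem, dateIso: string): string {
  return `[Source: ${item.source}] [${dateIso}][${item.author ?? "unknown"}]`;
}

// `gloss` stands in for the optional one-sentence LLM context (ENRICH_GLOSS='true').
function enrichChunkSketch(item: EnrichableItem, chunk: string, dateIso: string, gloss?: string): string {
  const provenance = provenanceLine(item, dateIso);
  if (STRUCTURED_SOURCES.has(item.source)) return provenance + "\n" + chunk;
  // Unstructured: prefer the gloss, fall back to the title, skip the line if neither exists.
  const contextLine = gloss ?? item.title;
  const lines = contextLine ? [provenance, contextLine, chunk] : [provenance, chunk];
  return lines.join("\n");
}
```

Keeping the provenance line deterministic means re-ingesting an unchanged item produces the same text, and so the same embedding input, whenever the gloss is disabled.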
#### **Step 5: Embed**
- **File**: `/lib/ingestion/embed.ts:24-47`
- **Functions**:
- `embedText(text)` (line 24–31) — one vector
- `embedTexts(texts[])` (line 33–41) — batch
- `toVectorLiteral(embedding)` (line 45–47) — format for pgvector
- **Model**: OpenAI text-embedding-3-large, 1536 dims (via Matryoshka truncation)
- **External calls**: `embed()` / `embedMany()` from Vercel AI SDK, Langfuse telemetry
#### **Step 6: Upsert Chunks**
- **File**: `/lib/ingestion/run.ts:64-71`
- **Details**:
- Delete all chunks for item (line 65)
- Insert new chunks with embedding (line 68)
- Both in one retry block (lines 64–71) so network errors don't leave orphans
- Idempotent: retry will delete the partial insert first, then re-insert (lines 49–54 comment)
#### **Step 7–8: Summary & Document-Level Embedding** — **MISSING**
- **CLAUDE.md spec**: Long docs get a doc-level summary + summary_embedding for two-tier index.
- **Actual code**: No summary generation found in run.ts. context_item.summary and summary_embedding columns exist in schema (0001_init.sql:29–30) but are never populated.
- **Status**: STUBBED — two-tier index is designed in schema but not implemented in ingestion.
- **File reference**: None found; deferred.
#### **Step 9–10: Triple Extraction + Entity Resolution**
- **File**: `/lib/ingestion/run.ts:76-85` (calls `extractAndResolve`)
- **Gate**: `EXTRACT_TRIPLES='false'` can disable (line 76); high-signal sources only (email, calendar, notion, linear + meetings)
- **Best-effort**: Isolated try/catch, never blocks ingestion (lines 77–84)
- **Details**:
- **Triple extraction** in `/lib/graph/triple-extraction.ts:59-85` (`extractTriples()`)
- LLM call (Haiku-class, structured Zod output)
- Filters on JUNK_NAME regex (lines 32–38)
- Returns { subject, subject_type, relation, object, object_type, confidence }[] (line 28)
- System prompt (lines 48–56): extract named entities + relationships
- **Entity resolution** in `/lib/graph/entity-resolution.ts:178-206` (`extractAndResolve()`)
- For each triple: resolve subject/object to entities (lines 191–200)
- `resolveEntity()` (lines 82–123):
- Canonicalize on email if present (lines 91–108)
- Fuzzy match on name (pg_trgm, threshold 0.45) if no email (lines 110–114, 120–121)
- Create new entity if no match (line 116 or 122)
- Add aliases if name differs from existing (lines 100–106)
- `emailHints()` (lines 154–173): extract name → email mappings from author and attendees
- `upsertEdge()` (lines 126–149): append-only edge upsert, deduped on (user_id, subject_id, relation, object_id, source_item)
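The resolution order (email, then fuzzy name, then create) can be sketched in memory. This is a simplified illustration: the trigram similarity only approximates `pg_trgm` (it pads the whole string rather than each word), the id scheme is made up, and fuzzy matching is attempted only when the mention has no email, which may differ from the real `resolveEntity()`.

```typescript
interface Entity { id: string; type: string; name: string; email: string | null; aliases: string[] }
interface Mention { type: string; name: string; email?: string }

const FUZZY_THRESHOLD = 0.45; // threshold quoted in the report

// Character trigrams of the lowercased, padded string (approximation of pg_trgm).
function trigrams(s: string): Set<string> {
  const padded = "  " + s.toLowerCase().trim() + " ";
  const out = new Set<string>();
  for (let i = 0; i + 3 <= padded.length; i++) out.add(padded.slice(i, i + 3));
  return out;
}

// Jaccard similarity over trigram sets.
function similarity(a: string, b: string): number {
  const ta = trigrams(a);
  const tb = trigrams(b);
  let shared = 0;
  ta.forEach((t) => { if (tb.has(t)) shared++; });
  const union = ta.size + tb.size - shared;
  return union === 0 ? 0 : shared / union;
}

// Mutates `entities`: may add an alias to a match or append a new entity.
function resolveEntitySketch(entities: Entity[], mention: Mention): Entity {
  const email = mention.email ? mention.email.toLowerCase() : null;
  if (email) {
    const byEmail = entities.find((e) => e.email === email);
    if (byEmail) {
      // Record a differing display name as an alias instead of overwriting.
      if (byEmail.name !== mention.name && byEmail.aliases.indexOf(mention.name) === -1) {
        byEmail.aliases.push(mention.name);
      }
      return byEmail;
    }
  } else {
    let best: Entity | undefined;
    let bestScore = 0;
    for (const e of entities) {
      if (e.type !== mention.type) continue; // match within the same type only
      const score = similarity(e.name, mention.name);
      if (score >= FUZZY_THRESHOLD && score > bestScore) { best = e; bestScore = score; }
    }
    if (best) return best;
  }
  const created: Entity = { id: `e${entities.length + 1}`, type: mention.type, name: mention.name, email, aliases: [] };
  entities.push(created);
  return created;
}
```

A threshold-gated match that falls through to creation follows the stated principle of preferring a missed merge over a wrong one.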
#### **Step 11: Slim (Deletion Reconciliation)**
- **File**: `/trigger/slim.ts:13-55` (scheduled task, cron: '15 */6 * * *')
- **File**: `/lib/db/slim.ts:39-92` (`reconcileDeletions()`)
- **Flow**:
1. List all active source_connections (line 21–24)
2. For each: call connector.slim(ctx) to get external_ids still present at source (line 35)
3. Compare against stored items for this (user_id, source) (lines 50–57)
4. Flip is_deleted on vanished IDs (line 83)
5. Clear is_deleted on resurrected IDs (line 84)
6. Safety rail: refuse empty live set (lines 65–72)
- **Windowed slim**: Connectors can set slimWindowed=true to indicate their slim() only returns IDs within the lookback window; reconciliation is scoped to that window (lines 38–40)
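The reconciliation step reduces to a set difference with a safety rail. A minimal sketch under assumptions: the `StoredItem` and `Reconciliation` shapes and the function name are hypothetical, and windowed slim is not modeled.

```typescript
// Hypothetical shapes for illustration.
interface StoredItem { externalId: string; isDeleted: boolean }
interface Reconciliation { toDelete: string[]; toRestore: string[]; skipped: boolean }

// Compare stored items against the ids the source still reports.
function planReconciliation(stored: StoredItem[], liveIds: Iterable<string>): Reconciliation {
  const live = new Set<string>(liveIds);
  // Safety rail: an empty live set more likely means a failed fetch than a wiped account,
  // so refuse to mark everything deleted.
  if (live.size === 0 && stored.length > 0) {
    return { toDelete: [], toRestore: [], skipped: true };
  }
  const toDelete: string[] = [];
  const toRestore: string[] = [];
  for (const item of stored) {
    const present = live.has(item.externalId);
    if (!present && !item.isDeleted) toDelete.push(item.externalId); // vanished at source
    if (present && item.isDeleted) toRestore.push(item.externalId); // resurrected at source
  }
  return { toDelete, toRestore, skipped: false };
}
```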
#### **Sync State Tracking**
- **File**: `/lib/db/sync-state.ts`
- **Tables**: sync_state (user_id, source, last_successful_sync_at, cursor)
- **Read**: `getSyncState()` (lines 11–27) — used by ingestTask to get 'since' date for poll
- **Write**: `setSyncState()` (lines 29–46) — called by ingestItems after success (run.ts:117–121)
#### **Enqueue API**
- **File**: `/lib/ingestion/enqueue.ts`
- **Functions**:
- `enqueueLoad(userId, source)` — trigger.dev load task
- `enqueueIngest(userId, source, mode)` — generic enqueue with idempotency key
- `enqueueEvent(userId, source, event, dedupeId)` — webhook event ingest
- **Graceful degradation**: If TRIGGER_SECRET_KEY not set, logs and no-ops (lines 17–19, 42–44)
---
### **PART 3: CONNECTOR ARCHITECTURE**
#### **The Connector Contract**
- **File**: `/lib/connectors/types.ts:47-62` (`Connector` interface)
- **Methods**:
1. `load(ctx: SyncContext): AsyncIterable<RawItem>` — full bulk index, first run
2. `poll(ctx: SyncContext, since: Date): AsyncIterable<RawItem>` — incremental by cursor
3. `slim(ctx: SyncContext): AsyncIterable<ExternalId>` — IDs only, deletion detection
4. `handleEvent?(payload: unknown): AsyncIterable<RawItem>` — optional webhook
5. `slimWindowed?: boolean` — signals slim() is bounded to lookback window
- **RawItem shape** (lines 18–33):
- source (SourceName), type, externalId, title, author, url
- sourceCreatedAt, sourceUpdatedAt (both dates)
- status, metadata, body, raw (untouched payload)
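The contract can be written out as TypeScript types. Method names and `RawItem` field names follow the report; the nullability of fields, the `SyncContext` members, and the in-memory connector are assumptions added for illustration.

```typescript
type SourceName = "gmail" | "calendar" | "linear" | "slack" | "notion";
type ExternalId = string;

// Assumed context fields; the report does not list them.
interface SyncContext { userId: string; lookbackDays: number }

interface RawItem {
  source: SourceName;
  type: string;
  externalId: ExternalId;
  title: string | null;
  author: string | null;
  url: string | null;
  sourceCreatedAt: Date;
  sourceUpdatedAt: Date;
  status: string | null;
  metadata: Record<string, unknown>;
  body: string;
  raw: unknown; // untouched source payload
}

interface Connector {
  load(ctx: SyncContext): AsyncIterable<RawItem>; // full bulk index, first run
  poll(ctx: SyncContext, since: Date): AsyncIterable<RawItem>; // incremental
  slim(ctx: SyncContext): AsyncIterable<ExternalId>; // ids only, for deletion detection
  handleEvent?(payload: unknown): AsyncIterable<RawItem>; // optional webhook path
  slimWindowed?: boolean; // true when slim() only covers the lookback window
}

// Hypothetical in-memory connector, handy as a test double.
function memoryConnector(items: RawItem[]): Connector {
  return {
    async *load() {
      yield* items;
    },
    async *poll(_ctx, since) {
      for (const item of items) {
        if (item.sourceUpdatedAt.getTime() > since.getTime()) yield item;
      }
    },
    async *slim() {
      for (const item of items) yield item.externalId;
    },
    slimWindowed: false,
  };
}
```

Async iterables let a connector page through a source lazily, so ingestion can start on the first page instead of buffering a whole mailbox.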
#### **Registry**
- **File**: `/lib/connectors/registry.ts`
- **Implemented sources**:
- gmail (gmailConnector)
- calendar (calendarConnector)
- linear (linearConnector)
- slack (slackConnector)
- notion (notionConnector)
- **Missing sources** (in spec but not implemented):
- github, sentry, drive, voice_memo
- **Function**: `getConnector(source)` (lines 19–23) — registry lookup
#### **Example: Gmail Connector**
- **File**: `/lib/connectors/gmail.ts:70-90`
- **Composio slug**: GMAIL_FETCH_EMAILS
- **load()** (lines 73–75): Query `newer_than:{lookbackDays}d`
- **poll()** (lines 77–83): Query `after:{YYYY/MM/DD}` for incremental
- **slim()** (lines 85–89): Fetch same window, yield external IDs only
- **Mapping**: toRawItem() (lines 32–51) converts GmailMessage → RawItem
#### **Composio Integration**
- **File**: `/lib/connectors/composio.ts` (not shown, but referenced in gmail.ts:6)
- **Abstracts**: OAuth + fetch for all sources via Composio toolkit
- **Encapsulation**: Each connector calls `executeTool(SLUG, userId, params)` (gmail.ts:56)
---
### **PART 4: GRAPH LAYER (Layer 2)**
#### **Triple Extraction** (step 9 of ingestion)
- **File**: `/lib/graph/triple-extraction.ts:59-85`
- **Structured output**: Zod schema (lines 15–26)
- **Gate**: `shouldExtract()` (lines 44–46) checks high-signal sources only
- **Filter**: `isNamedEntity()` (lines 33–38) rejects placeholders
- **System prompt** (lines 48–56): Extract subject–relation–object with types and confidence
#### **Entity Resolution** (step 10 of ingestion)
- **File**: `/lib/graph/entity-resolution.ts:82-206`
- **resolveEntity(userId, mention)**: Canonicalize on email → fuzzy name → new entity
- **fuzzyMatchId()**: RPC call to the `match_entity()` function (RPC invoked at line 39)
- **upsertEdge()**: Append-only, deduped on (user_id, subject_id, relation, object_id, source_item)
- **emailHints()**: Extract name → email mappings from author and attendees
- **Principle**: "Prefer a missed merge over a wrong merge" (entity-resolution.ts:4)
#### **Database Functions** (Postgres)
- **match_entity()** in `/supabase/migrations/0006_entity_resolution.sql:8-25`
- Fuzzy match on normalized name within same type + user_id
- Trigram similarity >= 0.45 (line 22)
- Returns id, name, email, similarity (line 23)
- **find_entities()** in lines 31–44
- Loosely resolve name from question to candidate entities (threshold 0.4)
- Used by graph-expand.ts stage 3
- Returns id, name, type, similarity
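To build intuition for the 0.45 and 0.4 thresholds, here is a rough TypeScript approximation of trigram similarity as `pg_trgm` documents it (lowercase, split into words, pad each word with two leading spaces and one trailing space, then shared trigrams divided by distinct trigrams). It is ASCII-only and is not the code path the app uses; the real matching happens inside Postgres.

```typescript
// Approximate pg_trgm trigram extraction (ASCII letters and digits only).
function trigrams(text: string): Set<string> {
  const out = new Set<string>()
  const words = text.toLowerCase().split(/[^a-z0-9]+/).filter((w) => w.length > 0)
  for (const word of words) {
    const padded = `  ${word} ` // two-space prefix, one-space suffix
    for (let i = 0; i + 3 <= padded.length; i++) out.add(padded.slice(i, i + 3))
  }
  return out
}

// Similarity = shared trigrams / distinct trigrams across both strings.
function trigramSimilarity(a: string, b: string): number {
  const ta = trigrams(a)
  const tb = trigrams(b)
  if (ta.size === 0 && tb.size === 0) return 0
  let shared = 0
  ta.forEach((t) => {
    if (tb.has(t)) shared++
  })
  return shared / (ta.size + tb.size - shared)
}
```

Under this approximation, "Jon Smith" and "John Smith" share 8 of 13 distinct trigrams (about 0.62), so they would clear the 0.45 merge threshold, while unrelated names score near zero.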
---
### **PART 5: LLM GATEWAY**
- **File**: `/lib/llm/gateway.ts`
- **Provider**: OpenRouter (OpenAI-compatible endpoint via Vercel AI SDK)
- **Primary model**: anthropic/claude-sonnet-4-6 (env: OPENROUTER_PRIMARY_MODEL)
- **Fallback model**: anthropic/claude-haiku-4-5 (env: OPENROUTER_FALLBACK_MODEL)
- **Function**: `chatModel(modelId = PRIMARY_MODEL)` (lines 27–29) — returns LanguageModelV1
- **Retry wrapper**: `withRetry()` (lines 32–50) — exponential backoff + jitter, 2 retries by default, baseMs=400
- **Circuit breaker + full fallback chain**: DEFERRED TO PHASE 5 (line 3 comment)
- **Status**: STUBBED — only retry wrapper implemented; Redis state for breaker not wired
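A retry wrapper with exponential backoff and jitter, matching the defaults listed above (2 retries, `baseMs=400`), might look like the sketch below. The name `withRetrySketch`, the options shape, the injectable `sleep`, and the exact jitter formula are assumptions; the real `withRetry()` in `lib/llm/gateway.ts` may differ.

```typescript
interface RetryOptions {
  retries?: number // extra attempts after the first call
  baseMs?: number // first backoff delay
  sleep?: (ms: number) => Promise<void> // injectable so tests need not wait
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms))

async function withRetrySketch<T>(
  fn: () => Promise<T>,
  opts: RetryOptions = {},
): Promise<T> {
  const { retries = 2, baseMs = 400, sleep = defaultSleep } = opts
  let lastError: unknown
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn()
    } catch (err) {
      lastError = err
      if (attempt === retries) break
      // Backoff doubles each attempt; jitter adds up to 100% of the backoff.
      const backoff = baseMs * Math.pow(2, attempt)
      await sleep(backoff + Math.random() * backoff)
    }
  }
  throw lastError
}
```

With the defaults this makes at most three calls in total. Passing a no-op `sleep` keeps a unit test of the retry count instantaneous.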
---
### **PART 6: SEMANTIC CACHE**
- **CLAUDE.md spec**: Redis/Upstash, per-tenant near-hit on embedding bucket
- **Actual code**: No `lib/cache/semantic-cache.ts` found
- **Comment**: `llm/gateway.ts:3` notes "The breaker (Redis state) is Phase 5"
- **Status**: **MISSING** — deferred to Phase 5
- **Impact**: Every answer query hits the full retrieval pipeline; no cache bypass for repeat queries
---
### **PART 7: DATABASE SCHEMA & MIGRATIONS**
#### **Tables**
1. **context_item** (0001_init.sql:16–42)
- id (uuid), user_id, source, type, external_id
- title, author, url, source_created_at, source_updated_at
- status, metadata (jsonb), summary, summary_embedding (vector, unpopulated)
- raw (jsonb, episodic), is_deleted, created_at
- Unique: (user_id, source, external_id)
2. **context_chunk** (0001_init.sql:47–73) — hash-partitioned by user_id
- id, item_id, user_id, source, source_created_at, source_updated_at
- content, embedding (vector 1536), fts (tsvector, generated)
- Primary key: (user_id, id)
- 8 partitions (p0–p7)
3. **entity** (0001_init.sql:81–100)
- id, user_id, type ('person' | 'company' | 'project')
- name, email (canonical key), domain, aliases (text[]), metadata, created_at
4. **edge** (0001_init.sql:106–120)
- id, user_id, subject_id → entity(id), object_id → entity(id)
- relation, confidence, source_item → context_item(id), occurred_at, created_at
- Unique: (user_id, subject_id, relation, object_id, source_item)
5. **source_connection** (0004_source_connection.sql:6–14)
- user_id, source (PK), connected_account_id (Composio handle)
- status ('initiated' | 'active' | 'error'), metadata, created_at, updated_at
6. **sync_state** (0003_sync_state.sql:6–13)
- user_id, source (PK), last_successful_sync_at, cursor, updated_at
#### **Functions**
1. **hybrid_search()** (0002_hybrid_search.sql:10–57)
- Input: p_user_id, p_query_embedding, p_query_text, p_sources, p_after, p_time_basis, p_recency_weight, p_limit
- Output: chunk_id, item_id, content, score
- Two CTEs (vec + kw) fused by RRF, post-fusion recency weight
- EXACT KNN over filtered set (not HNSW scan at per-tenant scale)
2. **distinct_sources()** (0005_distinct_sources.sql:8–15)
- Input: p_user_id
- Output: distinct source (text)
- Used by search.ts to stratify broad-intent retrieval
3. **match_entity()** (0006_entity_resolution.sql:8–25)
- Input: p_user_id, p_type, p_name, p_threshold (default 0.45)
- Output: id, name, email, similarity
- Fuzzy name match via pg_trgm
4. **find_entities()** (0006_entity_resolution.sql:31–44)
- Input: p_user_id, p_name, p_threshold (default 0.4), p_limit (default 3)
- Output: id, name, type, similarity
- Used by graph-expand.ts stage 3
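The RRF fusion that `hybrid_search()` performs in SQL can be illustrated in TypeScript. This is a sketch of reciprocal rank fusion in general, not a port of the migration: the constant `k = 60` is the commonly used default and an assumption here, and the post-fusion recency weight is left out.

```typescript
interface RankedHit {
  chunkId: string
}

// Each list contributes 1 / (k + rank) per hit; scores add up across lists.
function rrfFuse(
  lists: RankedHit[][],
  k = 60,
): Array<{ chunkId: string; score: number }> {
  const scores = new Map<string, number>()
  for (const list of lists) {
    list.forEach((hit, index) => {
      const rank = index + 1 // ranks are 1-based
      scores.set(hit.chunkId, (scores.get(hit.chunkId) ?? 0) + 1 / (k + rank))
    })
  }
  return Array.from(scores, ([chunkId, score]) => ({ chunkId, score })).sort(
    (a, b) => b.score - a.score,
  )
}
```

A chunk that appears in both the vector list and the keyword list outranks one that tops only a single list, which is the property that makes RRF useful without score normalization.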
---
### **PART 8: SPEC vs. IMPLEMENTATION GAP ANALYSIS**
#### **IMPLEMENTED (Spec → Code)**
| CLAUDE.md Spec | Actual Code | File:Line | Status |
|---|---|---|---|
| Stage 1: Query understanding | planQuery() | lib/retrieval/plan.ts:21 | ✓ FULL |
| Stage 2: Hybrid retrieval (RRF + recency) | hybridSearch() | lib/retrieval/search.ts:76; supabase/migrations/0002_hybrid_search.sql:10 | ✓ FULL |
| Stage 3: Graph expansion | expandGraph() | lib/retrieval/graph-expand.ts:28 | ✓ FULL |
| Stage 5: Rollup + diversify | rollupToItems() + interleaveBySource() | lib/retrieval/rollup.ts:40, 14 | ✓ FULL |
| Stage 7: Assemble citations | assembleContext() | lib/retrieval/assemble.ts:13 | ✓ FULL |
| Stage 8: Grounded synthesis | synthesizeStream() + answer route | lib/retrieval/synthesize.ts:30; app/api/answer/route.ts | ✓ FULL |
| Ingestion: Normalize | normalizeItem() | lib/ingestion/normalize.ts:10 | ✓ FULL |
| Ingestion: Chunk | chunkText() | lib/ingestion/chunk.ts:10 | ✓ FULL |
| Ingestion: Enrich (provenance + gloss) | enrichChunk() | lib/ingestion/enrich.ts:47 | ✓ FULL |
| Ingestion: Embed | embedTexts() | lib/ingestion/embed.ts:33 | ✓ FULL |
| Ingestion: Upsert | ingestOne() | lib/ingestion/run.ts:24 | ✓ FULL |
| Ingestion: Triple extraction | extractTriples() | lib/graph/triple-extraction.ts:59 | ✓ FULL |
| Ingestion: Entity resolution | resolveEntity() + upsertEdge() | lib/graph/entity-resolution.ts:82, 126 | ✓ FULL |
| Ingestion: Slim (deletions) | reconcileDeletions() | lib/db/slim.ts:39 | ✓ FULL |
| Trigger.dev orchestration | ingestTask | trigger/ingest.ts:56 | ✓ FULL |
| Connector contract | Connector interface + registry | lib/connectors/types.ts:47; registry.ts | ✓ FULL |
| Connectors: Gmail, Calendar, Linear, Slack, Notion | 5 connectors + Composio integration | lib/connectors/gmail.ts, etc. | ✓ FULL |
| LLM gateway (retry wrapper) | chatModel() + withRetry() | lib/llm/gateway.ts | ✓ PARTIAL (retry only) |
| Database schema (context_item, context_chunk, entity, edge) | 6 migrations | supabase/migrations/0001–0006.sql | ✓ FULL |
| Hybrid_search SQL function | RRF + recency weight + time basis | supabase/migrations/0002_hybrid_search.sql:10 | ✓ FULL |
| Entity resolution RPC functions (match_entity, find_entities) | Two fuzzy-match RPC functions | supabase/migrations/0006_entity_resolution.sql | ✓ FULL |
| Partitioning (context_chunk by user_id) | 8 partitions (p0–p7) | supabase/migrations/0001_init.sql:60–67 | ✓ FULL |
| RLS (Row-Level Security) | 4 policies per table | supabase/migrations/0001_init.sql:130–142 | ✓ FULL |
#### **MISSING / STUBBED (Spec → Missing Code)**
| CLAUDE.md Spec | Expected File | Actual | Status | Notes |
|---|---|---|---|---|
| Stage 0: Semantic cache (Redis near-hit) | lib/cache/semantic-cache.ts | NOT FOUND | ✗ MISSING | Cache check before full pipeline. Deferred to Phase 5. Every query runs full retrieval. |
| Stage 4: Reranking (Cohere 3.5) | lib/retrieval/rerank.ts | NOT FOUND | ✗ MISSING | Post-RRF cross-encoder stage. Deferred to Phase 5. |
| Stage 6: Retrieval rail (semantic distance filter) | lib/retrieval/rail.ts or in rollup.ts | PARTIAL | ✗ MISSING | Item count is capped (MAX_ITEMS=12), but no semantic distance threshold drop. |
| Ingestion: Summary + summary_embedding | lib/ingestion/summarize.ts | NOT FOUND | ✗ MISSING | Two-tier index designed in schema (context_item.summary, summary_embedding) but never populated. Deferred. |
| LLM gateway: Circuit breaker (Redis state) | lib/cache/circuit-breaker.ts or in gateway.ts | NOT FOUND | ✗ MISSING | Retry wrapper exists, but no Redis circuit-breaker state, no model fallback chain. Deferred to Phase 5. |
| LLM gateway: Model fallback chain | lib/llm/gateway.ts | PARTIAL | ✗ MISSING | FALLBACK_MODEL constant defined, but not actually used in fallback chain logic. Deferred. |
| Observability: Error tracking / alerts | lib/observability/*.ts | PARTIAL | ✗ MISSING | Langfuse tracing exists, but no alert/error aggregation. |
| Connectors: GitHub, Sentry, Drive, Voice memo | lib/connectors/github.ts, etc. | NOT FOUND | ✗ MISSING | In spec but not implemented. Deferred to later phases. |
---
### **PART 9: CRITICAL DATA FLOW GAPS**
#### **End-to-End Flows That Work**
1. **Ingestion end-to-end**: Source → Connector.load/poll → normalize → chunk → enrich → embed → upsert context_item/context_chunk → extract triples → resolve entities → upsert edges ✓
2. **Retrieval end-to-end**: Question → plan → embed → hybrid_search → graph_expand → rollup → assemble → synthesize ✓
3. **Deletion tracking**: Connector.slim → reconcileDeletions → flip is_deleted ✓
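The deletion-tracking flow reduces to a set difference between stored IDs and the IDs the connector's `slim()` still reports. The helper below is a hypothetical illustration of that core step, not the body of `reconcileDeletions()`.

```typescript
// Returns stored external IDs that the source no longer reports.
function findDeleted(storedIds: string[], liveIds: Iterable<string>): string[] {
  const live = new Set(liveIds)
  return storedIds.filter((id) => !live.has(id))
}
```

One caveat follows from the `slimWindowed` flag: when `slim()` only covers a lookback window, `storedIds` presumably has to be limited to the same window, otherwise every older item would look deleted.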
#### **Flows with Missing Stages**
1. **Answer caching**: Query → **[CACHE MISS]** → full retrieval pipeline (no semantic cache)
2. **Reranking**: hybrid_search results → **[RERANK MISSING]** → directly to rollup
3. **Semantic rail**: rollup → **[NO RAIL]** → directly to assemble
4. **Summary retrieval**: No code path uses context_item.summary or summary_embedding
5. **Graceful degradation**: If synthesis fails, answer route returns 502 (line 50); spec says "return cited context with 'summary temporarily unavailable'" (not implemented)
6. **Breaker fallback**: If OpenRouter fails, there is no fallback to FALLBACK_MODEL; `withRetry()` just retries (2 retries by default, so 3 attempts in total) and then fails
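The graceful-degradation gap (item 5) could be closed with a wrapper along these lines. It is a non-streaming sketch under stated assumptions: `AssembledContextSketch`, `AnswerResult`, and `answerOrDegrade` are hypothetical names, and the real route streams its response, so the actual change would need to be adapted to that.

```typescript
interface Citation {
  index: number
  title: string
  url: string | null
}

interface AssembledContextSketch {
  citations: Citation[]
}

type AnswerResult =
  | { kind: 'answer'; text: string; citations: Citation[] }
  | { kind: 'degraded'; notice: string; citations: Citation[] }

// On synthesis failure, return the cited context instead of an error status.
async function answerOrDegrade(
  context: AssembledContextSketch,
  synthesize: (ctx: AssembledContextSketch) => Promise<string>,
): Promise<AnswerResult> {
  try {
    const text = await synthesize(context)
    return { kind: 'answer', text, citations: context.citations }
  } catch (err) {
    console.error('synthesis failed, returning cited context only', err)
    return {
      kind: 'degraded',
      notice: 'Summary temporarily unavailable',
      citations: context.citations,
    }
  }
}
```

The discriminated union lets the UI branch on `kind` and render a banner above the citations rather than a failure page.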
---
### **PART 10: EXTERNAL SERVICE INTEGRATIONS**
| Service | Purpose | Where Called | Status |
|---|---|---|---|
| OpenRouter | LLM (chat, structured output) | lib/llm/gateway.ts; plan, enrich-gloss, triple-extraction, synthesize | ✓ WIRED |
| Supabase | PostgreSQL + pgvector + auth | lib/db/supabase.ts (createServiceClient, createAnonClient) | ✓ WIRED |
| Composio | OAuth + connector data fetch | lib/connectors/composio.ts → executeTool() | ✓ WIRED |
| Trigger.dev | Ingestion job orchestration | trigger/ingest.ts (task), lib/ingestion/enqueue.ts (trigger) | ✓ WIRED |
| OpenAI embedding | text-embedding-3-large (1536 dims) | lib/ingestion/embed.ts | ✓ WIRED |
| Cohere Rerank | Post-RRF reranking | **NOT FOUND** | ✗ MISSING |
| Deepgram | Speech-to-text (batch + streaming) | **NOT FOUND** (spec says Phase 7) | ✗ MISSING |
| Upstash Redis | Semantic cache + circuit breaker | **NOT FOUND** | ✗ MISSING |
| Supermemory | Layer 3 personalization | **NOT FOUND** (spec says Phase 4) | ✗ MISSING |
| Langfuse | Observability / tracing | lib/observability/langfuse.ts (initTracing, flushTracing, aiTelemetry) | ✓ WIRED |
---
### **PART 11: CRITICAL CODE REFERENCES BY COMPONENT**
```
RETRIEVAL PIPELINE:
Entry: app/api/answer/route.ts:22 (POST /api/answer)
Orchestration: lib/retrieval/pipeline.ts:21 (retrieve)
Stage 1: lib/retrieval/plan.ts:47 (planQuery)
Stage 2: lib/retrieval/search.ts:76 (hybridSearch)
↳ hybrid_search SQL: supabase/migrations/0002_hybrid_search.sql:10
Stage 3: lib/retrieval/graph-expand.ts:28 (expandGraph)
↳ find_entities RPC: supabase/migrations/0006_entity_resolution.sql:31
Stage 5: lib/retrieval/rollup.ts:40 (rollupToItems)
Stage 7: lib/retrieval/assemble.ts:13 (assembleContext)
Stage 8: lib/retrieval/synthesize.ts:30 (synthesizeStream)
INGESTION PIPELINE:
Entry: trigger/ingest.ts:56 (ingestTask)
Orchestration: lib/ingestion/run.ts:90 (ingestItems)
Per-item: lib/ingestion/run.ts:24 (ingestOne)
→ Normalize: lib/ingestion/normalize.ts:10
→ Chunk: lib/ingestion/chunk.ts:10
→ Enrich: lib/ingestion/enrich.ts:47
→ Embed: lib/ingestion/embed.ts:33
→ Upsert: lib/ingestion/run.ts:64–71
→ Extract: lib/graph/triple-extraction.ts:59
→ Resolve: lib/graph/entity-resolution.ts:178
Deletion: lib/db/slim.ts:39 (reconcileDeletions)
Enqueue: lib/ingestion/enqueue.ts:12 (enqueueIngest)
CONNECTORS:
Registry: lib/connectors/registry.ts:19 (getConnector)
Gmail: lib/connectors/gmail.ts:70
Calendar: lib/connectors/calendar.ts
Linear: lib/connectors/linear.ts
Slack: lib/connectors/slack.ts
Notion: lib/connectors/notion.ts
GRAPH LAYER:
Triple extraction: lib/graph/triple-extraction.ts:59
Entity resolution: lib/graph/entity-resolution.ts:82
match_entity RPC: supabase/migrations/0006_entity_resolution.sql:8
find_entities RPC: supabase/migrations/0006_entity_resolution.sql:31
LLM GATEWAY:
Chat model: lib/llm/gateway.ts:27
Retry wrapper: lib/llm/gateway.ts:32
Telemetry: lib/observability/langfuse.ts
DATABASE:
Supabase client: lib/db/supabase.ts:14 (createAnonClient), 25 (createServiceClient)
Sync state: lib/db/sync-state.ts:11 (getSyncState), 29 (setSyncState)
Slim reconcile: lib/db/slim.ts:39 (reconcileDeletions)
Types (generated): lib/db/types.ts
Schema migrations: supabase/migrations/0001–0006.sql
```
---
### **SUMMARY: IMPLEMENTATION COMPLETENESS**
**Retrieval Pipeline (Stages 0–8):**
- ✓ Stages 1, 2, 3, 5, 7, 8 fully implemented
- ✗ Stage 0 (semantic cache) missing
- ✗ Stage 4 (reranker) missing
- ✗ Stage 6 (retrieval rail semantic distance) missing
**Ingestion Pipeline (Steps 1–11):**
- ✓ Steps 1–6 and 9–10 fully implemented
- ✗ Steps 7–8 (summary generation + summary_embedding) missing
**Connectors:**
- ✓ 5 of 9 sources (Gmail, Calendar, Linear, Slack, Notion)
- ✗ 4 missing (GitHub, Sentry, Drive, Voice memo)
**LLM Gateway:**
- ✓ Retry wrapper implemented
- ✗ Circuit breaker missing
- ✗ Fallback chain stubbed but not wired
**Cache & Resilience:**
- ✗ Semantic cache (Redis) missing
- ✗ Circuit breaker missing
- ✗ Graceful degradation (return context on synthesis failure) missing
**Data Flowing End-to-End:**
- **Ingestion → Storage**: ✓ Full flow works (sources to Postgres context_item + context_chunk + entity + edge)
- **Storage → Retrieval**: ✓ Full flow works (hybrid_search + graph + assemble to answer)
- **Caching → Fast path**: ✗ No semantic cache, every query is full retrieval
- **Reranking → Better results**: ✗ No reranker stage
- **Graceful failure → Partial answer**: ✗ No fallback when synthesis fails
---
### **RECOMMENDED GAPS TO FILL**
1. **Semantic cache** (`lib/cache/semantic-cache.ts`): Check Redis on question embedding bucket → return cached answer if near-hit, skip full pipeline
2. **Reranker** (`lib/retrieval/rerank.ts`): Cohere Rerank 3.5 call after hybrid_search, before rollup
3. **Retrieval rail** (`lib/retrieval/rail.ts` or in rollup.ts): Drop items below semantic distance threshold, cap final item count
4. **Summary generation** (`lib/ingestion/summarize.ts`): Generate doc-level summary + embedding for long items during ingestion (populate context_item.summary and summary_embedding)
5. **LLM fallback chain** (enhance `lib/llm/gateway.ts`): Wire FALLBACK_MODEL into retry logic; add circuit breaker with Redis state
6. **Graceful degradation**: If synthesis fails, return assembled context with "Summary temporarily unavailable" banner instead of 502
7. **Remaining connectors**: GitHub, Sentry, Drive, Voice memo
8. **Deepgram integration**: Audio transcription for voice memos / raw recordings (Phase 7)
9. **Supermemory integration**: Layer 3 personalization (Phase 4)
This architecture is **production-ready on its core path** (ingestion → storage → retrieval → synthesis), with **resilience and optimization layers deferred to later phases**.
Perfect! Now I have a comprehensive view. Let me create a detailed summary report:
## Comprehensive Test Setup & Coverage Report
### 1. **Test Configuration** (vitest.config.ts)
**File:** `/Users/venusbhatia/Downloads/Zrux/vitest.config.ts`
**Config Details:**
- **Environment:** `node`
- **Include patterns:** `lib/**/*.test.ts`, `app/**/*.test.ts`
- **Exclude patterns:** `**/node_modules/**`, `**/.next/**`, `dist`
- **Test env vars provided:**
- `OPENROUTER_API_KEY: 'test-key'`
- `OPENAI_API_KEY: 'test-key'`
- `NEXT_PUBLIC_SUPABASE_URL: 'http://localhost'`
- `NEXT_PUBLIC_SUPABASE_ANON_KEY: 'test-key'`
- `SUPABASE_SERVICE_ROLE_KEY: 'test-key'`
*Purpose of env vars:* Dummy keys are provided so modules that read env at import time (e.g., LLM gateway) load in pure-logic unit tests. Tests never call out; real keys come from .env.local.
**Test command:** `pnpm test` runs `vitest run` (21 tests passing, 2 import-blocked test files)
---
### 2. **Testing Patterns Used Across Existing Tests**
All tests follow a consistent idiom. Here's a detailed breakdown by category:
#### **A. Connector Tests (Composio-based)**
Files: `lib/connectors/slack.test.ts`, `gmail.test.ts`, `notion.test.ts`
**Mocking pattern (lines 1-6 of each):**
```typescript
import { describe, it, expect, vi, beforeEach } from 'vitest'
const executeTool = vi.fn()
vi.mock('./composio', () => ({ executeTool: (...args: unknown[]) => executeTool(...args) }))
import { [connector] } from './[module]'
```
**Key pattern details:**
- **Mock strategy:** `vi.mock()` called at **module scope BEFORE the component import** (hoisting)
- **Mock target:** The `./composio` module's `executeTool` function (the I/O boundary)
- **Mock reset:** `beforeEach(() => executeTool.mockReset())` clears call history and any queued `mockResolvedValueOnce()` responses
- **Response shape:** `.mockResolvedValueOnce()` chains sequential responses for pagination/retries
- **Assertion style:**
- `expect(items).toHaveLength(n)`
- `expect(items[0]).toMatchObject({...})`
- `expect(items[0]!.body).toBe('...')`
- `expect(executeTool).toHaveBeenCalledTimes(n)`
- `expect(call[0]).toBe('TOOL_SLUG')`
**Helper:** `async function collect<T>(it: AsyncIterable<T>): Promise<T[]>` collects async iterable results
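A minimal body for that helper, given the signature quoted above, could be the following. The signature comes from the test files; the body and the `pages` generator are assumptions for illustration.

```typescript
// Drains an async iterable into an array so assertions can run on the result.
async function collect<T>(it: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = []
  for await (const value of it) out.push(value)
  return out
}

// Hypothetical stand-in for a connector's paginated output.
async function* pages(): AsyncGenerator<number> {
  yield 1
  yield 2
  yield 3
}
```

In a connector test this would be used as `const items = await collect(connector.load(ctx))`, followed by the `toHaveLength` and `toMatchObject` assertions listed above.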
**Tests cover:**
- `/slack.test.ts:17-46` — walks member channels, filters join/leave noise
- `/slack.test.ts:48-62` — webhook event mapping
- `/gmail.test.ts:18-50` — pagination + RawItem mapping
- `/gmail.test.ts:52-60` — 90-day query window verification
- `/notion.test.ts:17-59` — title extraction, paging cutoff, markdown fetch
- `/notion.test.ts:61-80` — fallback when markdown fetch fails
---
#### **B. Pure Logic Tests (No Mocking)**
Files: `lib/ingestion/chunk.test.ts`, `lib/graph/entity-resolution.test.ts`, `lib/graph/triple-extraction.test.ts`, `lib/retrieval/rollup.test.ts`, `lib/retrieval/assemble.test.ts`, `lib/webhooks/slack.test.ts`
**Pattern:**
```typescript
import { describe, it, expect } from 'vitest'
import { [function] } from './[module]'
```
**Assertion styles:**
- **Equality:** `expect(result).toEqual([...])`
- **Matchers:** `toMatchObject({...})`, `toContain(...)`, `toBeLessThanOrEqual(n)`, `toHaveLength(n)`
- **Booleans:** `expect(fn()).toBe(true/false)`
- **Set membership:** `expect(new Set(...).size).toBe(n)`
**Examples:**
- `/chunk.test.ts:5-26` — `chunkText()` splits on paragraph boundaries, respects MAX_CHARS
- `/entity-resolution.test.ts:5-8` — `normalizeName()` trims/collapses whitespace, preserves casing
- `/entity-resolution.test.ts:10-12` — case-insensitive rejection of placeholders
- `/triple-extraction.test.ts:5-20` — `shouldExtract()` gates on source/type
- `/triple-extraction.test.ts:23-34` — `isNamedEntity()` rejects junk names
- `/rollup.test.ts:22-50` — `interleaveBySource()` round-robins across sources (anti-monopoly)
- `/assemble.test.ts:23-44` — `assembleContext()` numbers citations, embeds dates
- `/webhooks/slack.test.ts:16-42` — `verifySlackSignature()` validates HMAC, checks freshness
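The round-robin behavior that the rollup test exercises can be sketched as below. `interleaveRoundRobin` and `SourcedItem` are hypothetical names; the real `interleaveBySource()` may order sources or break ties differently.

```typescript
interface SourcedItem {
  id: string
  source: string
}

// Takes one item per source per round so no single source monopolizes the top.
function interleaveRoundRobin<T extends SourcedItem>(items: T[]): T[] {
  const queues = new Map<string, T[]>()
  for (const item of items) {
    const queue = queues.get(item.source)
    if (queue) queue.push(item)
    else queues.set(item.source, [item])
  }
  const out: T[] = []
  let remaining = items.length
  while (remaining > 0) {
    // Map iteration follows insertion order: sources in order of first appearance.
    queues.forEach((queue) => {
      const next = queue.shift()
      if (next !== undefined) {
        out.push(next)
        remaining--
      }
    })
  }
  return out
}
```

Within each source the original relevance order is preserved; only the cross-source ordering changes.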
**Test helper:** `/webhooks/slack.test.ts:7-9` defines a `sign()` function using `node:crypto`
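For reference, Slack's request-signing scheme (HMAC-SHA256 over `v0:{timestamp}:{body}`, sent as `v0={hex}`, with a five-minute freshness window) can be sketched with `node:crypto`. The function names and parameter order here are assumptions; the repository's `sign()` and `verifySlackSignature()` may be shaped differently.

```typescript
import { Buffer } from 'node:buffer'
import { createHmac, timingSafeEqual } from 'node:crypto'

const MAX_SKEW_SECONDS = 60 * 5 // reject requests older than five minutes

// Builds the signature header value Slack would send for this body.
function sign(secret: string, timestamp: string, body: string): string {
  const digest = createHmac('sha256', secret)
    .update(`v0:${timestamp}:${body}`)
    .digest('hex')
  return `v0=${digest}`
}

function verifySignatureSketch(
  secret: string,
  timestamp: string,
  body: string,
  signature: string,
  nowSeconds: number,
): boolean {
  const ts = Number(timestamp)
  // Freshness check guards against replayed requests.
  if (!Number.isFinite(ts) || Math.abs(nowSeconds - ts) > MAX_SKEW_SECONDS) return false
  const expected = Buffer.from(sign(secret, timestamp, body))
  const received = Buffer.from(signature)
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received)
}
```

Passing `nowSeconds` in explicitly keeps the freshness check deterministic in tests, with no need to fake the system clock.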
---
### 3. **Test Coverage Map: Source Files by Module**
**Legend:** ✓ = HAS TESTS | ✗ = NO TESTS | ! = IMPORT-BLOCKED TEST
#### **lib/connectors/**
| File | Status | Type |
|------|--------|------|
| `slack.ts` | ✓ | Async iterable connector; mocked Composio |
| `gmail.ts` | ✓ | Async iterable connector; mocked Composio |
| `notion.ts` | ✓ | Async iterable connector; mocked Composio |
| `calendar.ts` | ✗ | Async iterable connector; NOT TESTED (testable) |
| `linear.ts` | ✗ | Async iterable connector; NOT TESTED (testable) |
| `composio.ts` | ✗ | I/O wrapper around @composio/core (thin, hard to test) |
| `registry.ts` | ✗ | Lookup table; TESTABLE but trivial |
| `types.ts` | ✗ | Type definitions only |
| `util.ts` | ✗ | Pure logic: `warnOnUndercollection()` (testable) |
#### **lib/ingestion/**
| File | Status | Type |
|------|--------|------|
| `chunk.ts` | ✓ | Pure logic: `chunkText()` (paragraph-aware splitting) |
| `enrich.ts` | ✗ | Mixed: `isStructured()` pure, `provenanceLine()` pure, `enrichChunk()` I/O (LLM gloss call, best-effort) |
| `normalize.ts` | ✗ | Pure logic: `normalizeItem()` (object mapping) |
| `embed.ts` | ✗ | I/O wrapper: `embedText()`, `embedTexts()` call OpenAI; pure helper: `toVectorLiteral()` |
| `enqueue.ts` | ✗ | I/O wrapper: Trigger.dev task enqueue (no-ops if unconfigured) |
| `run.ts` | ✗ | Orchestration: `ingestOne()`, `ingestStream()` (DB writes + LLM calls) |
#### **lib/graph/**
| File | Status | Type |
|------|--------|------|
| `entity-resolution.test.ts` | ! | IMPORT-BLOCKED: `normalizeName()` test present but fails due to @langfuse/otel import in entity-resolution.ts |
| `entity-resolution.ts` | ! | Mixed: pure logic (`normalizeName()`, `cleanEmail()`) + DB I/O (`fuzzyMatchId()`, `insertEntity()`, `extractAndResolve()`) |
| `triple-extraction.test.ts` | ! | IMPORT-BLOCKED: `shouldExtract()`, `isNamedEntity()` tests present but fail |
| `triple-extraction.ts` | ! | Mixed: pure logic (`isNamedEntity()`, `shouldExtract()`) + I/O (`extractTriples()` calls LLM) |
**Note:** Graph tests fail at import time: `entity-resolution.ts` imports `triple-extraction.ts`, which imports `../llm/gateway`, which transitively pulls in `lib/observability/langfuse.ts` and its `@langfuse/otel` import, a package that cannot be resolved in the test environment. The tests themselves are sound.
#### **lib/retrieval/**
| File | Status | Type |
|------|--------|------|
| `rollup.ts` | ✓ | `interleaveBySource()` pure logic (round-robin); `rollupToItems()` I/O (DB hydration) |
| `assemble.ts` | ✓ | `assembleContext()` pure logic (citation assembly); no I/O |
| `search.ts` | ✗ | I/O: `userSources()`, `mergeHits()` (pure), hybrid search query (mocked or integration-test only) |
| `synthesize.ts` | ✗ | I/O: `synthesizeStream()` calls LLM; pure: `isThin()` |
| `plan.ts` | ✗ | I/O: `planQuery()` calls generateObject (LLM) |
| `graph-expand.ts` | ✗ | I/O: `expandGraph()` DB queries only |
| `pipeline.ts` | ✗ | Orchestration wrapper |
| `types.ts` | ✗ | Type definitions only |
#### **lib/llm/**
| File | Status | Type |
|------|--------|------|
| `gateway.ts` | ✗ | I/O wrapper: `chatModel()` returns model instance; `withRetry()` pure logic (testable) |
#### **lib/db/**
| File | Status | Type |
|------|--------|------|
| `supabase.ts` | ✗ | I/O wrapper: `createAnonClient()`, `createServiceClient()` (thin, checked for window at runtime) |
| `sync-state.ts` | ✗ | I/O: `getSyncState()`, `setSyncState()` (DB-only, hard to unit-test without mocking) |
| `slim.ts` | ✗ | I/O: `reconcileDeletions()` (DB-backed deletion reconcile; flips `is_deleted`) |
| `types.ts` | ✗ | Generated from Supabase (read-only) |
#### **lib/auth/**
| File | Status | Type |
|------|--------|------|
| `options.ts` | ✗ | NextAuth config (hard to test in isolation) |
| `session.ts` | ✗ | Session utilities (depends on NextAuth context) |
| `tenant.ts` | ✗ | Tenant/user context (thin wrapper) |
#### **lib/observability/**
| File | Status | Type |
|------|--------|------|
| `langfuse.ts` | ✗ | I/O: tracing setup (best-effort, opt-in if env vars present) |
#### **app/** (API routes & pages)
| File | Status | Type |
|------|--------|------|
| All route handlers | ✗ | I/O: route handlers (integration-test only, hard to unit-test without framework harness) |
---
### 4. **Untested Files Categorized by Testability**
#### **Good Unit-Test Candidates (Pure Logic, No I/O)**
- `lib/connectors/util.ts` — `warnOnUndercollection()` (pure; single responsibility)
- `lib/connectors/registry.ts` — `getConnector()`, `connectableSources()`, `isConnectable()` (lookup table)
- `lib/ingestion/normalize.ts` — `normalizeItem()` (object mapping, testable)
- `lib/ingestion/enrich.ts` — `isStructured()`, `provenanceLine()` (pure helpers; can mock `generateText` for `enrichChunk()`)
- `lib/llm/gateway.ts` — `withRetry()` (pure retry logic; can test with mock rejections)
- `lib/retrieval/synthesize.ts` — `isThin()` (pure predicate on AssembledContext)
#### **Hard to Unit-Test (I/O Wrappers, Thin Adapters)**
- `lib/connectors/composio.ts` — OAuth/fetch wrapper (integration-test only)
- `lib/connectors/calendar.ts`, `linear.ts` — connector stubs (same as Slack/Gmail pattern but untested; testable with same mocking)
- `lib/ingestion/embed.ts` — calls OpenAI embedding service (would need OpenAI mock at module scope)
- `lib/ingestion/enqueue.ts` — calls Trigger.dev (would need SDK mock)
- `lib/ingestion/run.ts` — orchestrates DB writes + LLM + embeddings (integration-test candidate)
- `lib/graph/entity-resolution.ts` — DB calls + LLM extraction (DB mock + extraction mock needed)
- `lib/graph/triple-extraction.ts` — LLM extraction (would need LLM mock)
- `lib/retrieval/search.ts` — DB queries (integration-test only)
- `lib/retrieval/plan.ts`, `graph-expand.ts`, `pipeline.ts` — all I/O orchestration
- `lib/db/supabase.ts` — Supabase client factory (no logic)
- `lib/db/sync-state.ts` — DB access (integration-test only)
- `lib/auth/*.ts` — NextAuth integration (needs framework harness)
- `app/**` — route handlers (Next.js integration-test only)
---
### 5. **Shared Test Utilities & No Global Setup Files**
**Finding:** No global test setup file (no `vitest.setup.ts` or `setup.ts` referenced).
**Shared patterns across tests:**
1. **Mock hoisting:** `vi.mock()` at module scope, before imports
2. **AsyncIterable helper:** Each connector test defines `collect<T>()` locally (not shared)
3. **Test data builders:** `/rollup.test.ts:5-19` defines `item()` factory; `/assemble.test.ts:5-20` defines `item()` factory with overrides
4. **Crypto helper:** `/webhooks/slack.test.ts:7-9` defines `sign()` helper (uses `node:crypto`)
**Recommendation:** Could extract shared helpers to `lib/test-utils.ts` (helpers for collect, item builders, etc.) but not currently needed—test files are small.
---
### 6. **Why Graph Tests Are Import-Blocked (Not Execution-Blocked)**
**Problem chain:**
- `lib/graph/entity-resolution.test.ts` imports `normalizeName` from `./entity-resolution`
- `entity-resolution.ts` (line 9) imports `{ extractTriples, shouldExtract } from './triple-extraction'`
- `triple-extraction.ts` (line 9) imports `{ chatModel, FALLBACK_MODEL, withRetry } from '../llm/gateway'`
- `gateway.ts` (line 6-7) imports from `'@ai-sdk/openai'` (present in node_modules)
- BUT: `lib/observability/langfuse.ts` (imported transitively) imports `LangfuseSpanProcessor` from `'@langfuse/otel'` (NOT in devDependencies; only `@langfuse/tracing` is)
**Solution:** The tests themselves are correct. Either:
1. Add `@langfuse/otel` to devDependencies, OR
2. Mock `../observability/langfuse` in test setup to avoid the import, OR
3. Move `@langfuse/otel` import behind a lazy-load guard in langfuse.ts
The test code at lines 1-36 of both graph test files is **sound and follows correct patterns**—it's just not executed due to the import failure at module-load time.
---
### Summary Table
| Metric | Value |
|--------|-------|
| **Total test files** | 9 (7 runnable, 2 import-blocked) |
| **Total tests passing** | 21 |
| **Total source files in lib/** | 35 |
| **Source files with tests** | 7 |
| **Untested files (lib/app)** | 28 |
| **Testable-but-untested (pure logic)** | 6 |
| **Connector tests** | 3 (Slack, Gmail, Notion) |
| **Pure logic tests** | 6 (Chunk, Entity-resolution*, Triple-extraction*, Rollup, Assemble, Slack webhook) |
| **Mocking approach** | `vi.mock()` hoisted module-scope + `vi.fn()` spies |
| **Assertion style** | Vitest expect; matchers like toMatchObject, toContain, toHaveLength |
| **Shared test setup** | None (no setup files) |
| **Test env vars** | 5 dummy keys for modules reading env at import |
---
**Key Takeaway for Writing Consistent Tests:**
When adding tests:
1. **For connectors:** Use `vi.mock('./composio')` hoisting pattern; mock `executeTool` with sequential `.mockResolvedValueOnce()`; use `collect()` helper for AsyncIterable; assert with `toMatchObject()` and `.mock.calls`
2. **For pure logic:** Import directly, no mocks; use equality/matcher assertions; build test data inline with simple factories
3. **For I/O functions:** Either skip unit tests (integration-test instead) or mock the I/O boundary (Supabase, LLM, Composio) at module scope before the import
4. **Env vars:** Already provided by vitest.config.ts; tests never call out to real services
Excellent, I have a thorough picture now. Let me verify a few critical claims directly: the test suite state, the OAuth 404 gap, and the langfuse import that allegedly blocks two test files.
pnpm test 2>&1 | tail -40
echo "=== app dir tree ===" && find app -type d && echo "=== langfuse otel in deps? ===" && grep -n "langfuse" package.json && echo "=== langfuse.ts head ===" && sed -n '1,40p' lib/observability/langfuse.ts
Confirmed the key facts. Two test files (entity-resolution.test.ts, triple-extraction.test.ts) fail to load: vitest can't resolve @langfuse/otel pulled in transitively through the LLM gateway, so 21 tests pass but those two suites collect 0 tests. And the app/ tree confirms there is no /onboarding, /today, /relationships, or /search page, yet the OAuth callback redirects to /onboarding (a 404) and middleware gates routes that don't exist.
Let me read the key UI files directly to ground the plan.
Let me check the connector registry (for the connect UI) and confirm there's no "list connected sources" endpoint.
I now have a complete, verified picture. Let me record the architecture-to-UI map and findings in the plan file, then confirm scope with you.
Now let me confirm scope with you on the two decisions that materially change the work.
You chose the full UI build (onboarding, relationships, today, search) and pure + orchestration tests. The Today and Search screens need new backend endpoints, so let me have a Plan agent design the concrete implementation — reusing existing retrieval/DB seams — and pin down the test list.
I have a complete, grounded picture of the codebase. Here is the full implementation plan.
---
# zrux: Make the UI Functional + Add Unit Tests — Implementation Plan
## Key environment facts (verified, govern every step)
- Next.js `^14.2.0` App Router. Dynamic route params are a **plain sync object**, not a Promise — confirmed at `app/api/connect/[source]/route.ts:15` (`{ params }: { params: { source: string } }`). Copy this signature exactly for any new dynamic route.
- AI SDK `ai@^4.3.0`. `streamText().toTextStreamResponse()` is the streaming idiom (`lib/retrieval/synthesize.ts:36`, `app/api/answer/route.ts:107`). `LanguageModelV1` is the model type (`lib/llm/gateway.ts:2`).
- Test runner: `vitest@^2.0.0`, `node` env, includes `lib/**/*.test.ts` and `app/**/*.test.ts` (`vitest.config.ts:6`). Dummy env vars (OpenRouter/Supabase keys) injected at `vitest.config.ts:10-16` so import-time `requireEnv` calls don't throw.
- Auth idiom (copy verbatim into every new route): `getUserId(req)` then `if (err instanceof UnauthorizedError) return new Response('Unauthorized', { status: 401 })` then `throw err` (`app/api/graph/route.ts:18-23`).
- DB access: `createServiceClient()` from `@/lib/db/supabase` — service role, **must** scope every query by `user_id` first (`lib/db/supabase.ts:25-34`). Never importable in a client component (throws if `window` exists, `:26`).
- Styling: inline-style + CSS vars `--accent #0071e3`, `--muted #6e6e73`, `--bg #f5f5f7`, `--text #1d1d1f` (`app/globals.css:1-6`). The Ask page (`app/ask/page.tsx`) is the canonical visual reference for new pages.
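- Code style stays as-is: no semicolons, named exports for libs, `export default function` for pages, and named HTTP-method exports (`GET`, `POST`) for route handlers, which the App Router requires instead of a default export.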
---
## PART 1 — Backend endpoints (build first; UI depends on them)
### 1.1 `GET /api/sources` — new file `app/api/sources/route.ts`
**Purpose:** list every connectable source merged with this user's connection status, so onboarding can render Connect buttons.
- `export const runtime = 'nodejs'`.
- Auth block copied from `app/api/graph/route.ts:18-23`.
- Reuse `connectableSources()` (`lib/connectors/registry.ts:25`) for the full universe (gmail, calendar, linear, slack, notion).
- Query: `createServiceClient().from('source_connection').select('source, status').eq('user_id', userId)`. Columns confirmed present: `source`, `status` (`lib/db/types.ts:154-168`).
- Merge: build a `Map<source, status>` from rows, then map over `connectableSources()` producing `{ source, status: map.get(source) ?? 'not_connected' }`.
- **Response shape:** `Response.json({ sources: [{ source: string, status: 'not_connected'|'initiated'|'active'|'error' }] })`.
- Error: wrap the DB call in try/catch, `console.error(...)`, `return new Response('Failed to load sources', { status: 500 })` (mirror `app/api/graph/route.ts:56-58`).
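The merge step above can be sketched as a pure function, which also makes it easy to test without a database. This is a minimal sketch, assuming the row shape `{ source, status }`; the names `mergeSourceStatus`, `SourceConnectionRow`, and `SourceStatus` are hypothetical and not taken from the codebase.

```typescript
// Hypothetical helper: names and row shape are assumptions based on the plan above.
type ConnectionStatus = 'not_connected' | 'initiated' | 'active' | 'error'

interface SourceConnectionRow {
  source: string
  status: Exclude<ConnectionStatus, 'not_connected'>
}

interface SourceStatus {
  source: string
  status: ConnectionStatus
}

// Merge the user's connection rows over the full list of connectable sources.
// Sources with no row fall back to 'not_connected'.
function mergeSourceStatus(
  connectable: readonly string[],
  rows: readonly SourceConnectionRow[],
): SourceStatus[] {
  const bySource = new Map<string, ConnectionStatus>()
  for (const row of rows) bySource.set(row.source, row.status)
  return connectable.map((source) => ({
    source,
    status: bySource.get(source) ?? 'not_connected',
  }))
}
```

In the route handler, `connectable` would come from `connectableSources()` and `rows` from the `source_connection` query.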
### 1.2 `GET /api/today` — new file `app/api/today/route.ts`
**Purpose:** show "data we have" — recent items grouped by source. Simplest design that proves data flows; **does not** call the LLM (avoids cost/latency for a dashboard).
- `runtime = 'nodejs'`, same auth block.
- Single query (exact):
```
createServiceClient()
.from('context_item')
.select('id, source, type, title, author, url, source_updated_at, status')
.eq('user_id', userId)
.eq('is_deleted', false)
.order('source_updated_at', { ascending: false })
.limit(50)
```
All columns confirmed at `lib/db/types.ts:19-37`.
- Compute `counts`: reduce rows into `Record<source, number>` server-side (cheap; avoids a second `distinct_sources` rpc).
- **Response shape:** `Response.json({ items: Row[], counts: { [source]: number } })`.
- Same 500 error idiom.
- Note for implementer: this `limit(50)` is the recent slice, while `counts` reflects only those 50 rows. If true totals are wanted, add a `count: 'exact', head: true` query per source — **not** required for the approved scope; flag as optional. Reusing `daily_briefing` synthesis is explicitly **not** recommended here (extra LLM call, slower, duplicates `/ask`).
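The server-side `counts` reduction could look roughly like the following. It is a sketch only: `countBySource` and `TodayRowLike` are hypothetical names, and the row type is trimmed to the one field the reduction needs.

```typescript
// Hypothetical stand-in for the selected context_item row; only `source` matters here.
interface TodayRowLike {
  id: string
  source: string
}

// Reduce the fetched rows into per-source counts.
// Note: this counts only the rows passed in (the recent slice), not true totals.
function countBySource(rows: readonly TodayRowLike[]): Record<string, number> {
  const counts: Record<string, number> = {}
  for (const row of rows) {
    counts[row.source] = (counts[row.source] ?? 0) + 1
  }
  return counts
}
```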
### 1.3 `GET /api/search?q=` — new file `app/api/search/route.ts` + thin helper in `pipeline.ts`
**Purpose:** ranked items for a query, **no synthesis**.
**Reuse seam (exact):** these four stages of `retrieve()` (`lib/retrieval/pipeline.ts:22-33`):
1. `planQuery(question)` — `lib/retrieval/plan.ts:47`
2. `embedText(plan.semantic_query || question)` — imported in pipeline at `lib/retrieval/pipeline.ts:5`
3. `hybridSearch(userId, plan, queryEmbedding)` — `lib/retrieval/search.ts:76`
4. `rollupToItems(userId, hits, { diversify })` — `lib/retrieval/rollup.ts:40`
Then **stop** (skip `expandGraph` + `assembleContext` + `synthesizeStream`). Return `RolledItem[]` (`lib/retrieval/types.ts:38-50`).
**Recommendation: add a thin `searchItems` helper to `lib/retrieval/pipeline.ts`** (named export, alongside `retrieve`). This is the right home because pipeline.ts already imports all four functions, so no new import surface and no refactor of `retrieve` (low risk — `retrieve` is untouched). Signature:
```
export async function searchItems(userId: string, question: string): Promise<RolledItem[]>
```
Body: `planQuery` → `embedText` → `hybridSearch` → `rollupToItems({ diversify })`. Drop the graph branch entirely. This keeps the route handler thin and makes the orchestration unit-testable by mocking the four stage modules (see Part 5.3).
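To make the control flow concrete, here is a self-contained sketch of the helper's body. The real helper would call the imported stage functions directly; in this sketch they are injected through a hypothetical `SearchStages` object so the example runs without the project's modules. The `Sketch*` types are stand-ins for the ones in `lib/retrieval/types.ts`, and passing `diversify` as a parameter is an assumption.

```typescript
// Stand-in types; the real ones live in lib/retrieval/types.ts and may differ.
interface SketchPlan {
  semantic_query?: string
}
interface SketchHit {
  item_id: string
  score: number
}
interface SketchRolledItem {
  id: string
  score: number
}

// The four stages are injected so the sketch runs without the project's modules.
interface SearchStages {
  planQuery(question: string): Promise<SketchPlan>
  embedText(text: string): Promise<number[]>
  hybridSearch(userId: string, plan: SketchPlan, embedding: number[]): Promise<SketchHit[]>
  rollupToItems(
    userId: string,
    hits: SketchHit[],
    opts: { diversify: boolean },
  ): Promise<SketchRolledItem[]>
}

async function searchItemsSketch(
  stages: SearchStages,
  userId: string,
  question: string,
  diversify = true, // assumption: the real value may come from the plan
): Promise<SketchRolledItem[]> {
  const plan = await stages.planQuery(question)
  const embedding = await stages.embedText(plan.semantic_query || question)
  const hits = await stages.hybridSearch(userId, plan, embedding)
  // Stop after rollup: no graph expansion, context assembly, or synthesis.
  return stages.rollupToItems(userId, hits, { diversify })
}
```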
**Route handler `app/api/search/route.ts`:**
- `runtime = 'nodejs'`, same auth block.
- Read `q`: `const q = new URL(req.url).searchParams.get('q')?.trim()`. If empty → `Response.json({ items: [] })` (or 400 `'Missing "q"'` to mirror answer route's missing-question handling at `app/api/answer/route.ts:36`; recommend the empty-array 200 so the page renders cleanly on first load).
- `const items = await searchItems(userId, q)`.
- **Response shape:** `Response.json({ items: RolledItem[] })`.
- 500 error idiom on catch.
**Risk flag:** `RolledItem.best_content` (`lib/retrieval/types.ts:48`) carries the enrichment provenance prefix (`[Source: ...] [date]: ...`, from `lib/ingestion/enrich.ts:58`). The search UI should render `title`/`source`/`score` prominently and treat `best_content` as a secondary snippet. No backend change needed; just a UI note.
### 1.4 OAuth callback redirect path — decision (no code change required)
`app/api/oauth/callback/route.ts:32,53` redirects to `/onboarding?error=1` and `/onboarding?connected=1`. **Decision: put the page at `/onboarding`** so the existing callback works untouched. Do **not** also expose `/sources` — a second route is dead weight and the callback only knows `/onboarding`. (If a `/sources` alias is later desired, make it a re-export, but it is out of scope.)
---
## PART 2 — UI pages + shell
### 2.1 Shared nav — `app/layout.tsx` (edit)
**Recommendation: one shared nav in the root layout**, not per-page. Reasons: every gated page needs the same links; per-page duplication drifts; layout is the single mount point.
- Add a small server component `Nav` (can live inline in `app/layout.tsx` or as `app/_components/Nav.tsx`). Plain `<nav>` with `next/link` `<Link>` to `/` (Home), `/ask`, `/today`, `/onboarding` (label "Sources"), `/relationships`, `/search`.
- Render `<Nav />` above `{children}` inside `<body>` (`app/layout.tsx:12`).
- Style with the existing tokens (`--accent`, `--muted`); keep it a thin top bar consistent with the Ask page header.
- No semicolons; `export default function RootLayout` stays.
### 2.2 Home shell — `app/page.tsx` (replace)
- Replace the Phase-0 skeleton (`app/page.tsx:1-21`) with a real landing: product name, one-line description, and a card/button grid linking to Ask, Today, Sources, Relationships, Search via `next/link`.
- Keep it a server component (no `'use client'`). Reuse the inline-style + CSS-var aesthetic already in the file.
### 2.3 Onboarding/Sources — new `app/onboarding/page.tsx` (client)
- `'use client'`.
- On mount (`useEffect`): `fetch('/api/sources')` → render each `{ source, status }`.
- Each not-`active` source shows a **Connect** button:
```
const res = await fetch(`/api/connect/${source}`, { method: 'POST' })
const { redirectUrl } = await res.json()
window.location.href = redirectUrl
```
`/api/connect/[source]` already returns `{ redirectUrl, connectedAccountId }` (`app/api/connect/[source]/route.ts:49`).
- Read query params with `useSearchParams()`: `?connected=1` → green "Connected! Importing your data..." banner; `?error=1` → red error banner. These are exactly what the callback sets (`app/api/oauth/callback/route.ts:32,53`). Wrap the `useSearchParams` consumer in `<Suspense>` (Next 14 requirement).
- Status pills: `active` = green check, `initiated` = "Connecting…", `error` = red, `not_connected` = Connect button.
- Visual: copy the rounded-card/pill styles from `app/ask/page.tsx:101-112`.
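The banner and button logic above is small enough to keep as pure helpers outside the component. The sketch below assumes a reader with a `get(name)` method, which is the part of the `useSearchParams()` result this logic needs; `bannerFor`, `showsConnectButton`, and the error copy are hypothetical placeholders.

```typescript
type PillStatus = 'not_connected' | 'initiated' | 'active' | 'error'

// Minimal surface needed from the search-params object.
interface ParamReader {
  get(name: string): string | null
}

interface Banner {
  tone: 'success' | 'error'
  text: string
}

// Map the callback's query params to a banner, or null when neither flag is set.
function bannerFor(params: ParamReader): Banner | null {
  if (params.get('error') === '1') {
    return { tone: 'error', text: 'Connection failed. Please try again.' } // placeholder copy
  }
  if (params.get('connected') === '1') {
    return { tone: 'success', text: 'Connected! Importing your data...' }
  }
  return null
}

// Only active sources hide the Connect button, per the bullet above.
function showsConnectButton(status: PillStatus): boolean {
  return status !== 'active'
}
```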
### 2.4 Relationships — new `app/relationships/page.tsx` (client)
- `'use client'`. On mount `fetch('/api/graph')` (endpoint already complete, `app/api/graph/route.ts`).
- Response is `{ entities: [{id,type,name,email,domain,aliases}], edges: [{id, relation, confidence, from:{id,name}, to:{id,name}, occurred_at, source_item}] }` (shapes at `app/api/graph/route.ts:45-55`).
- Render two sections (no graph viz needed): **Entities** grouped by `type` (person/company/project) as lists; **Relationships** as a table of `from.name — relation — to.name` with confidence. Use `from.name`/`to.name` directly (already resolved server-side at `:51-52`).
- Empty state: "No relationships yet. Connect sources and let ingestion run."
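Grouping entities by `type` for the first section might be done with a small helper like this one. It is a sketch under the assumption that each entity has at least `id`, `type`, and `name`; `groupEntitiesByType` and `GraphEntityLike` are hypothetical names.

```typescript
// Trimmed stand-in for the entity shape returned by /api/graph.
interface GraphEntityLike {
  id: string
  type: string
  name: string
}

// Group entities by type, preserving the order in which types first appear.
function groupEntitiesByType(
  entities: readonly GraphEntityLike[],
): Map<string, GraphEntityLike[]> {
  const groups = new Map<string, GraphEntityLike[]>()
  for (const entity of entities) {
    const bucket = groups.get(entity.type)
    if (bucket) bucket.push(entity)
    else groups.set(entity.type, [entity])
  }
  return groups
}
```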
### 2.5 Today — new `app/today/page.tsx` (client)
- `'use client'`. On mount `fetch('/api/today')` → `{ items, counts }`.
- Top row: per-source count chips from `counts`. Below: list the recent `items` (title or `(type)` fallback, source, `source_updated_at` sliced to `YYYY-MM-DD`, link to `url` when present — mirror the citation rendering at `app/ask/page.tsx:184-190`).
- Empty state mirrors Relationships.
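The per-item display rules (title fallback, date slice, optional link) can be sketched as a pure mapping. `todayRow` and `TodayItemLike` are hypothetical names, and treating `source_updated_at` as nullable is a defensive assumption rather than something confirmed from the DB types.

```typescript
// Trimmed stand-in for a row returned by /api/today.
interface TodayItemLike {
  type: string
  title: string | null
  source: string
  source_updated_at: string | null
  url: string | null
}

interface TodayRowView {
  label: string
  source: string
  date: string
  href: string | null
}

function todayRow(item: TodayItemLike): TodayRowView {
  return {
    // Fall back to "(type)" when the item has no title.
    label: item.title ?? `(${item.type})`,
    source: item.source,
    // ISO timestamps start with YYYY-MM-DD, so the first 10 chars are the date.
    date: item.source_updated_at ? item.source_updated_at.slice(0, 10) : '',
    href: item.url,
  }
}
```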
### 2.6 Search — new `app/search/page.tsx` (client)
- `'use client'`, controlled input + form like `app/ask/page.tsx:115-151` (reuse the input/button styling).
- On submit: `fetch('/api/search?q=' + encodeURIComponent(q))` → `{ items }`.
- Render each `RolledItem`: `title ?? type`, `source`, `score` (e.g. `score.toFixed(2)`), date from `source_updated_at`, link to `url`, and a truncated `best_content` snippet (note the provenance prefix from Part 1.3).
- Loading/error states copied from `app/ask/page.tsx:48-49,79-84,153-155`.
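For the result rows, the score formatting and snippet truncation could be factored out as below. This is only a sketch: `searchRow`, `truncateSnippet`, and the default of 160 characters are assumptions, and the item type is a trimmed stand-in for `RolledItem`.

```typescript
// Trimmed stand-in for RolledItem; the real type has more fields.
interface SearchResultLike {
  title: string | null
  type: string
  source: string
  score: number
  best_content: string
}

// Cut long text to `max` characters and mark the cut with an ellipsis.
function truncateSnippet(text: string, max: number): string {
  if (text.length <= max) return text
  return text.slice(0, max).trimEnd() + '…'
}

function searchRow(item: SearchResultLike, snippetMax = 160) {
  return {
    heading: item.title ?? item.type,
    source: item.source,
    score: item.score.toFixed(2),
    // best_content still carries its provenance prefix; it is shown as-is here.
    snippet: truncateSnippet(item.best_content, snippetMax),
  }
}
```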
---
## PART 3 — middleware
`middleware.ts:8` matcher currently gates `/ask`, `/today`, `/relationships`, `/search`. **Add `/onboarding`** so the sources page is auth-gated like the rest:
```
matcher: ['/ask/:path*', '/today/:path*', '/relationships/:path*', '/search/:path*', '/onboarding/:path*']
```
Do **not** add `/sources` (not used). Do **not** gate `/api/*` — API routes self-check via `getUserId` (`middleware.ts:1-4` documents this; the new routes follow it). Home `/` stays public.
**Risk flag (dev):** `getUserId` dev fallback (`lib/auth/session.ts:21-27`) reads `x-zrux-user-id` header or `DEV_USER_ID` env. Client `fetch` from these pages **cannot** set that header, so local testing relies on `DEV_USER_ID` being set in env, OR a real NextAuth session. Note this in the handoff; no code change needed.
---
## PART 4 — Fix the 2 broken test suites
**Root cause (confirmed):** `lib/graph/entity-resolution.test.ts` imports `./entity-resolution`, which imports `./triple-extraction` (`lib/graph/entity-resolution.ts:9`), which imports `aiTelemetry` from `../observability/langfuse` (`lib/graph/triple-extraction.ts:10`). That module top-level-imports `@langfuse/otel` (`lib/observability/langfuse.ts:12`), which Vite can't resolve under the node test env. `triple-extraction.test.ts` hits the same chain directly (`lib/graph/triple-extraction.test.ts:2`).
**Chosen fix: per-test `vi.mock('../observability/langfuse', …)`** — best matches the existing hoisted-mock idiom already used for connectors (`vi.mock('./composio', …)` at `lib/connectors/gmail.test.ts:5`). Preferred over a global vitest alias because it keeps the stub local/visible and doesn't risk masking the real module in suites that may later want it. Mocking `../llm/gateway` would **not** fix it (the offending import is `langfuse`, not the gateway).
**Exact mock shape** — only what the transitively-loaded modules actually import from `langfuse`:
- `triple-extraction.ts:10` imports `aiTelemetry`.
- `plan.ts:7` and `synthesize.ts:8` also import `aiTelemetry` (relevant if those get tests).
- `answer/route.ts:11` imports `flushTracing`, `tracingEnabled` (relevant for the answer-route test, Part 5.3).
So the stub for the graph tests needs only:
```
vi.mock('../observability/langfuse', () => ({
aiTelemetry: () => ({ isEnabled: false }),
}))
```
Place this hoisted block **at the top of each file**, above the `import { ... } from './entity-resolution'` / `'./triple-extraction'` lines. The tests themselves (`normalizeName`, `shouldExtract`, `isNamedEntity`) need no other change — those functions are pure (`lib/graph/entity-resolution.ts:22`, `lib/graph/triple-extraction.ts:33,44`). For the answer-route test add `flushTracing: async () => {}, tracingEnabled: false` to the same stub object.
---
## PART 5 — New tests
All co-located as `*.test.ts`. Pure tests follow the direct-import + matcher idiom (`lib/retrieval/assemble.test.ts`); mock-based tests follow the hoisted `vi.mock` idiom (`lib/connectors/gmail.test.ts`).
### 5.1 Pure-logic unit tests (no mocks needed unless noted)
**`lib/ingestion/normalize.test.ts`** — `normalizeItem` (`lib/ingestion/normalize.ts:10`, pure). Cases: (a) full `RawItem` maps all fields, dates become ISO strings via `.toISOString()`; (b) missing optionals (`title`/`author`/`url`/`status`) become `null`; (c) absent `metadata` becomes `{}`; (d) `external_id` mapped from `raw.externalId`.
**`lib/ingestion/enrich.test.ts`** — `isStructured` (`:15`) and `provenanceLine` (`:19`), both pure. `isStructured`: true for `linear:issue`, `calendar:meeting`, `sentry:error`; false for `gmail:email`, `notion:doc`. `provenanceLine`: includes date sliced to 10 chars, includes `[author]` when present, omits author bracket when absent. Do **not** test `enrichChunk`/`gloss` here (they call the LLM gateway).
**`lib/llm/gateway.test.ts`** — `withRetry` (`:32`). Pure logic but uses `setTimeout` (`:46`). Use `vi.useFakeTimers()` + `vi.runAllTimersAsync()` (or pass `baseMs: 0` to skip real delay — cleaner). Cases: (a) resolves first try → fn called once; (b) fails twice then succeeds with `retries: 2` → called 3 times, returns value; (c) always fails → throws the last error, called `retries+1` times. **Constraint flagged:** backoff uses `2 ** attempt` plus jitter (`:45`) — there is no `Math.random`, jitter is deterministic (`baseMs * 0.5 * (attempt+1)`), so timing is exactly assertable; no random stubbing needed.
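To illustrate what these cases exercise, here is a hypothetical reimplementation of a retry wrapper with deterministic backoff. It is not the project's `withRetry`: the way the exponential and jitter terms are combined, the default option values, and the names `withRetrySketch` and `backoffMs` are all assumptions.

```typescript
interface RetryOptions {
  retries?: number
  baseMs?: number
}

// Deterministic delay: an exponential term plus a fixed "jitter" term.
// Assumption: the two terms are simply added.
function backoffMs(attempt: number, baseMs: number): number {
  return baseMs * 2 ** attempt + baseMs * 0.5 * (attempt + 1)
}

async function withRetrySketch<T>(
  fn: () => Promise<T>,
  opts: RetryOptions = {},
): Promise<T> {
  const retries = opts.retries ?? 2 // assumed default
  const baseMs = opts.baseMs ?? 100 // assumed default
  let lastError: unknown
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn()
    } catch (err) {
      lastError = err
      // Wait before the next attempt, but not after the final failure.
      if (attempt < retries) {
        await new Promise<void>((resolve) => setTimeout(resolve, backoffMs(attempt, baseMs)))
      }
    }
  }
  throw lastError
}
```

With `baseMs: 0` every delay is zero, so a test can await the call directly without fake timers.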
**`lib/retrieval/synthesize.test.ts`** — `isThin` (`lib/retrieval/synthesize.ts:23`), pure. **Requires** the langfuse stub (Part 4 shape) because the module imports `aiTelemetry` at `:8`. Cases: thin when `citations.length === 0`; thin when `block.trim() === ''`; not thin when both present.
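The three cases correspond to a predicate along these lines. The signature is a guess: `isThinSketch` and `AssembledLike` are hypothetical, and the real `isThin` may take a different argument shape.

```typescript
// Assumed input shape: an assembled context block plus its citations.
interface AssembledLike {
  block: string
  citations: readonly unknown[]
}

// Context is "thin" when there is nothing to cite or nothing to read.
function isThinSketch(ctx: AssembledLike): boolean {
  return ctx.citations.length === 0 || ctx.block.trim() === ''
}
```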
**`lib/connectors/registry.test.ts`** — `connectableSources`, `isConnectable`, `getConnector` (`lib/connectors/registry.ts:19-31`). **Risk:** importing `registry.ts` pulls in all five connectors → `./composio` → `@composio/core` at import time. Mitigation: `vi.mock('./composio', () => ({ executeTool: vi.fn() }))` hoisted (same as gmail test) so connector modules load without the SDK. Cases: `connectableSources()` returns the 5 keys; `isConnectable('gmail')` true, `isConnectable('github')` false (github is in `SourceName` but not the REGISTRY, `:11-17`); `getConnector('gmail')` returns an object with `source === 'gmail'`; `getConnector('github' as SourceName)` throws.
**`lib/connectors/util.test.ts`** — `warnOnUndercollection` (`lib/connectors/util.ts:7`), pure (only `console.warn`). Spy with `vi.spyOn(console, 'warn')`. Cases: warns when `collected < reportedTotal`; no warn when equal; no warn when `reportedTotal` undefined; no warn when `reportedTotal` non-finite (`NaN`).
**`lib/retrieval/search.test.ts`** — `mergeHits` is **not exported** (`lib/retrieval/search.ts:42`, module-private). **Recommendation:** instead test the **already-exported pure** `interleaveBySource` in `lib/retrieval/rollup.ts:14` (note: `rollup.test.ts` exists — check it doesn't already cover this; if it does, skip). If `mergeHits` coverage is wanted, it would require adding an `export` to `search.ts` — flag this as a one-line export change, optional, and note `search.ts` also imports `createServiceClient` + `toVectorLiteral` at module top so the test needs the supabase/embed env (already provided by `vitest.config.ts:10`). Recommend testing `interleaveBySource` (zero new exports): cases — interleaves round-robin across sources, respects `cap`, orders sources by best score (`rollup.ts:23`).
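For readers unfamiliar with the behavior those three cases describe, the following is a hypothetical round-robin interleave that satisfies them. It is not the code in `rollup.ts`; `interleaveBySourceSketch` and `ScoredItemLike` are illustrative names, and the real function's signature may differ.

```typescript
interface ScoredItemLike {
  id: string
  source: string
  score: number
}

function interleaveBySourceSketch<T extends ScoredItemLike>(
  items: readonly T[],
  cap: number,
): T[] {
  // Bucket by source while walking items in descending score order.
  // Map keeps insertion order, so sources end up ordered by their best score.
  const buckets = new Map<string, T[]>()
  const sorted = items.slice().sort((a, b) => b.score - a.score)
  for (const item of sorted) {
    const bucket = buckets.get(item.source)
    if (bucket) bucket.push(item)
    else buckets.set(item.source, [item])
  }

  // Take one item per source per round until the cap is hit or all are used.
  const queues = Array.from(buckets.values())
  const out: T[] = []
  for (let round = 0; out.length < cap; round++) {
    let took = false
    for (const queue of queues) {
      if (round >= queue.length) continue
      out.push(queue[round])
      took = true
      if (out.length >= cap) break
    }
    if (!took) break
  }
  return out
}
```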
**`lib/connectors/calendar.test.ts`** and **`lib/connectors/linear.test.ts`** — feasible, copy the gmail test pattern exactly (`lib/connectors/gmail.test.ts:1-13`: hoisted `vi.mock('./composio', …)` + `collect()` helper).
- **calendar** (`lib/connectors/calendar.ts`): mock `executeTool` to return `{ items: [CalEvent], nextPageToken }` then `{ items: [...] }`. Cases: (a) maps a `CalEvent` to a `meeting` RawItem (`source:'calendar'`, `type:'meeting'`, `externalId` from `e.id`, author from `organizer`); (b) paginates on `nextPageToken` (2 calls); (c) drops events with no `id` (`toRawItem` returns null, `:38`); (d) `load` passes a `timeMin/timeMax` window (assert call args, `:74`).
- **linear** (`lib/connectors/linear.ts`): cases: (a) maps issue → `issue` RawItem, `status:'blocked'` when state name includes "block" (`mapStatus`, `:39`), `status:'resolved'` when type `completed`; (b) handles both connection (`{issues:{nodes,pageInfo}}`) and array (`{issues:[...]}`) shapes via `asConnection` (`:32`); (c) relay pagination via `endCursor`/`hasNextPage` (`:93`); (d) `poll(since)` skips issues with `updatedAt < since` (`:89`).
- **Feasibility note:** both connectors call `executeTool` which (unmocked) returns `res.data ?? {}` (`lib/connectors/composio.ts:64`), but the mock replaces `executeTool` wholesale and returns the **raw toolkit shape directly** — exactly as the gmail test does (`gmail.test.ts:20`). So mock returns must be the `{items|issues, ...}` payload, **not** wrapped in `{data: ...}`.
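The pagination and drop-on-missing-id cases can be illustrated with a stand-in loop and a stubbed tool call. This is a sketch, not the connector's code: `collectEventIds`, `CalPageLike`, and the stub's argument shape are hypothetical. The stub returns the raw page payload directly, as the feasibility note requires.

```typescript
// Raw toolkit shape returned by the stub (not wrapped in { data: ... }).
interface CalPageLike {
  items: Array<{ id?: string; summary?: string }>
  nextPageToken?: string
}

type ExecuteToolStub = (args: { pageToken?: string }) => Promise<CalPageLike>

// Follow nextPageToken until it is absent, keeping only events that have an id.
async function collectEventIds(executeTool: ExecuteToolStub): Promise<string[]> {
  const ids: string[] = []
  let pageToken: string | undefined
  do {
    const page = await executeTool({ pageToken })
    for (const event of page.items) {
      if (event.id) ids.push(event.id) // events without an id are dropped
    }
    pageToken = page.nextPageToken
  } while (pageToken)
  return ids
}
```

In a vitest suite the stub would be a `vi.fn()` with two queued return values, and the call count would be asserted as 2.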
### 5.2 (covered above; pure tests)
### 5.3 Orchestration tests (mock-based)
**`app/api/answer/route.test.ts`** — test the thin-vs-synthesis branch in `buildAnswer` (`app/api/answer/route.ts:87-116`). Mock:
- `vi.mock('@/lib/auth/session', …)` → `getUserId: async () => 'u1'`, and re-export `UnauthorizedError` (the route does `instanceof` at `:27`, so export the real class or a stub class).
- `vi.mock('@/lib/retrieval/pipeline', …)` → `retrieve` returning a controlled `RetrievalResult` (`lib/retrieval/pipeline.ts:13`).
- `vi.mock('@/lib/retrieval/synthesize', …)` → keep real `isThin`/`REFUSAL` (or stub), stub `synthesizeStream` to return `{ toTextStreamResponse: () => new Response('answer text', {...}) }`.
- `vi.mock('@/lib/observability/langfuse', …)` → `tracingEnabled: false`, `flushTracing: async () => {}` (forces the simple non-traced path at `:60`).
Cases: (a) thin context (`citations:[]`) → response body is `REFUSAL`, header `x-zrux-meta` decodes to `{thin:true,...}` (`:96-103`); (b) non-thin → `synthesizeStream` called once, header `thin:false` with citations (`:106-115`); (c) missing/blank question → 400 (`:36`); (d) `getUserId` throws `UnauthorizedError` → 401 (`:27`).
**`lib/graph/entity-resolution.test.ts`** (extend the fixed file) — `resolveEntity` (`lib/graph/entity-resolution.ts:82`) with mocked Supabase. Mock `vi.mock('../db/supabase', …)` → `createServiceClient` returning a chainable stub. The chain to model: `.from().select().eq().eq().maybeSingle()` (email lookup, `:93-98`), `.rpc('match_entity', …)` (fuzzy, `:39`), `.from().insert().select().single()` (`:51-56`). Cases: (a) email hit → returns existing id, appends alias when name differs (`:100-107`); (b) no email, fuzzy match → returns matched id (`:120-121`); (c) no email, no match → inserts and returns new id (`:122`). Keep the existing pure `normalizeName` tests; add the langfuse stub (Part 4) since this file transitively loads it via `triple-extraction`.
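A chainable stub for the Supabase client might be shaped like the sketch below. It models only the methods named above and records the call order so a test can assert the chain. Everything here is hypothetical (`makeClientStub`, `QueryChainStub`, the table name used in the usage note), and in a real suite the factory would be wired in through `vi.mock('../db/supabase', …)`.

```typescript
interface StubResult<T> {
  data: T | null
  error: { message: string } | null
}

// Only the builder methods the chains above actually use.
interface QueryChainStub<T> {
  select(columns?: string): QueryChainStub<T>
  eq(column: string, value: unknown): QueryChainStub<T>
  insert(values: unknown): QueryChainStub<T>
  maybeSingle(): Promise<StubResult<T>>
  single(): Promise<StubResult<T>>
}

function makeClientStub<T>(result: StubResult<T>) {
  const calls: string[] = []
  // Every builder method returns the same chain; terminal methods resolve the result.
  const chain: QueryChainStub<T> = {
    select: () => { calls.push('select'); return chain },
    eq: (column) => { calls.push(`eq:${column}`); return chain },
    insert: () => { calls.push('insert'); return chain },
    maybeSingle: async () => { calls.push('maybeSingle'); return result },
    single: async () => { calls.push('single'); return result },
  }
  const client = {
    from: (table: string) => { calls.push(`from:${table}`); return chain },
    rpc: async (fn: string) => { calls.push(`rpc:${fn}`); return result },
  }
  return { client, calls }
}
```

A test for the email-hit case could build the stub with `{ data: { id: 'e1' }, error: null }`, return `client` from the mocked `createServiceClient`, and assert on `calls` afterwards.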
**`app/api/sources/route.test.ts`** — mock `@/lib/auth/session` (getUserId), `@/lib/db/supabase` (`createServiceClient().from().select().eq()` resolving rows), and let real `connectableSources` run (it needs the `./composio` mock — add `vi.mock('@/lib/connectors/composio', …)`). Cases: (a) merges DB statuses over the full connectable list, unconnected → `not_connected`; (b) 401 on UnauthorizedError; (c) 500 on DB error.
**`app/api/search/route.test.ts`** — mock `@/lib/auth/session` and `@/lib/retrieval/pipeline` (`searchItems` returning a `RolledItem[]`). Cases: (a) `?q=foo` → `searchItems('u1','foo')` called, returns `{items}`; (b) missing `q` → `{items:[]}` (or 400, per chosen contract); (c) 401 on UnauthorizedError.
**`app/api/today/route.test.ts`** — mock `@/lib/auth/session` and `@/lib/db/supabase` (chain `.from().select().eq().eq().order().limit()` resolving rows). Cases: (a) returns `{items, counts}` with counts grouped by source; (b) 401; (c) 500 on DB error.
---
## Ordered execution checklist
1. **Backend:** `app/api/sources/route.ts`, `app/api/today/route.ts`, `searchItems` in `lib/retrieval/pipeline.ts` + `app/api/search/route.ts`. (graph route already done.)
2. **Shell + pages:** `app/layout.tsx` (Nav) → `app/page.tsx` → `app/onboarding/page.tsx` → `app/relationships/page.tsx` → `app/today/page.tsx` → `app/search/page.tsx`.
3. **Middleware:** add `/onboarding/:path*` to `middleware.ts:8`.
4. **Test fix:** add hoisted `vi.mock('../observability/langfuse', …)` to `lib/graph/entity-resolution.test.ts` and `lib/graph/triple-extraction.test.ts`.
5. **New tests:** pure (5.1) first, then orchestration (5.3). Run `pnpm test`.
## Consolidated risk flags
- **search reuse:** adding `searchItems` to `pipeline.ts` is additive (no `retrieve` refactor) — low risk. `best_content` carries a provenance prefix; handle in UI only.
- **`mergeHits` is private** — prefer testing the exported `interleaveBySource`; exporting `mergeHits` is an optional one-line change.
- **registry/connector tests** must hoist `vi.mock('./composio')` or import-time `@composio/core` load fails.
- **dev auth:** client `fetch` can't send `x-zrux-user-id`; local runs need `DEV_USER_ID` or a real session (`lib/auth/session.ts:21-27`).
- **langfuse stub** is required for any test that transitively imports `plan.ts`/`synthesize.ts`/`triple-extraction.ts`/`answer/route.ts`.
- **Next 14 `useSearchParams`** must be wrapped in `<Suspense>` on the onboarding page.
### Critical Files for Implementation
- /Users/venusbhatia/Downloads/Zrux/lib/retrieval/pipeline.ts
- /Users/venusbhatia/Downloads/Zrux/app/api/graph/route.ts
- /Users/venusbhatia/Downloads/Zrux/app/ask/page.tsx
- /Users/venusbhatia/Downloads/Zrux/lib/observability/langfuse.ts
- /Users/venusbhatia/Downloads/Zrux/lib/connectors/gmail.test.ts
The Plan agent produced a grounded, file-by-file design. Let me finalize the plan file with the concrete implementation.
The plan is complete and grounded. Here's the summary before I hand it over for approval.
What I found (architecture → UI map):
/api/answer → plan → hybrid_search → graph-expand → rollup → assemble → streamed cited synthesis), and ingestion is solid.
/ask. There is no UI to connect a data source, the OAuth callback redirects to /onboarding which is a 404, GET /api/graph is fully built but has no caller, and middleware gates /today, /relationships, and /search pages that don't exist.
pnpm test: 21 pass, but 2 suites fail to load (entity-resolution, triple-extraction) because vitest can't resolve @langfuse/otel pulled in transitively.
What the plan does (your chosen scope: all screens + pure & orchestration tests):
GET /api/sources, GET /api/today, GET /api/search (via a new searchItems helper that reuses existing retrieval stages, no synthesis).
The full file-by-file detail with reuse seams and test cases is in the plan file.
Plan approved. Since this is a background job, let me isolate my work in a worktree, then set up task tracking and start implementing.
pwd && ls app && echo "---" && cat lib/retrieval/pipeline.ts
Worktree ready. Let me create the task list and read the key files I'll reuse.
Now let me read the remaining files needed for the endpoints and tests.
I have what I need for the backend. Let me start building. Marking task 1 in progress.
Now the three API routes:
Let me verify the generated DB types cover the columns I'm selecting.
grep -n "source_connection\|context_item" lib/db/types.ts | head && echo "--- source_connection Row ---" && sed -n '/source_connection: {/,/Insert:/p' lib/db/types.ts | head -40
Backend endpoints done. Moving to UI.
Now the home page:
Now the onboarding page:
Now the relationships page:
Now the today page:
Now the search page:
UI pages done. Now the middleware update:
Middleware done. Now fixing the broken test suites and writing tests. Let me read the test files and modules I'll need.
Now I understand the patterns. Let me fix the two broken suites with a hoisted langfuse mock.
Now let me read entity-resolution.ts, calendar.ts, linear.ts, session.ts, and supabase.ts before writing the remaining tests.
Let me check the session module for the UnauthorizedError export and getUserId signature, needed for route tests.
Now I'll write the entity-resolution test (langfuse mock + extend with resolveEntity orchestration tests using a mocked Supabase client).
Broken suites fixed. Now writing the new unit tests. Let me first check the vitest config for path alias support (the API routes use @/ imports).
@/ alias isn't configured for vitest, but the API route handlers use it — I need to add it so route tests resolve. Let me update the vitest config, then write all the test files.
Now the pure-logic tests: