AI-powered B2B prospect discovery — LangGraph engine internals
flowchart TB
subgraph External["External Systems"]
CS[Campaign Service]
FE["Frontend (Polling)"]
end
subgraph API["API Layer"]
DC["Discovery Controller\n/v1/discovery/*"]
HC["Health Controller\n/health/*"]
end
subgraph Core["Discovery Service"]
DS[Discovery Service]
subgraph Graphs["LangGraph Engines"]
DG["DiscoveryGraph\n(Goal-Based)"]
AG["AgentGraph\n(ReAct Pattern)"]
ACG["AccuracyGraph\n(Verification)"]
end
end
subgraph Queue["Queue Layer"]
SQS[SQS Service]
WK[Discovery Worker]
end
subgraph Providers["Provider Layer"]
PH[Provider Hub]
CB[Circuit Breaker]
RL[Rate Limiter]
AD["Adapters\n(Lusha, PDL, Explorium, Apify)"]
MCP[MCP Client]
end
subgraph Data["Data Layer"]
DB[(MongoDB)]
CKP[Checkpoint Service]
end
FE --> CS
CS -->|"POST /start"| DC
DC --> DS
DS --> Graphs
DS --> SQS
SQS -->|Jobs Queue| WK
WK --> Graphs
Graphs --> PH
PH --> CB --> RL --> AD
AD --> MCP
Graphs --> CKP --> DB
SQS -->|Progress Queue| CS
flowchart LR
subgraph GraphSelection["Graph Selection"]
MODE{"Accuracy\nMode?"}
MODE -->|"fast/balanced"| DG
MODE -->|"high"| ACG
MODE -->|"react"| AG
end
subgraph DG["DiscoveryGraph (Goal-Based)"]
direction TB
D1[PLAN] --> D2[ASSESS]
D2 --> D3[PROVIDER_AGENT]
D3 --> D4[SEARCH]
D4 --> D5[ENRICH]
D5 --> D6[SCORE]
D6 --> D7[INSPECTOR]
D7 --> D8[DECIDE]
D8 -->|Continue| D2
D8 -->|Complete| D9[FINALIZE]
end
subgraph AG["AgentGraph (ReAct)"]
direction TB
A1["SUPERVISOR\nLLM"] -->|Tool Call| A2{Route}
A2 --> A3[PARALLEL_SEARCH]
A2 --> A4[SEARCH]
A2 --> A5[SPECIALIZED_SEARCH]
A2 --> A6[ENRICH]
A3 & A4 & A5 --> A7[SCORE]
A6 --> A7
A7 --> A8[INSPECTOR]
A8 --> A1
A1 -->|Complete| A9[FINALIZE]
end
subgraph ACG["AccuracyGraph (Verification)"]
direction TB
C1[SUPERVISOR] --> C2[PARALLEL_SEARCH]
C2 --> C3["MERGE\nCross-Validate"]
C3 --> C4["VERIFY\nLLM ICP Match"]
C4 --> C5["CRITIQUE\nSelf-Review"]
C5 --> C6[ENRICH]
C6 --> C1
C1 -->|Complete| C7[FINALIZE]
end
flowchart TB
START((START)) --> PLAN
subgraph Planning["1. Planning Phase"]
PLAN["PLAN NODE\n───────────\nLLM creates strategy\nEstimates credits\nSets execution phases"]
end
PLAN --> ASSESS
subgraph Assessment["2. Assessment Phase"]
ASSESS["ASSESS NODE\n───────────\nCalculate goalProgress\nCheck budgetProgress\nAnalyze providerHealth\nGenerate gaps"]
end
ASSESS --> ASSESS_CHECK{"Goal Met?\nBudget OK?"}
ASSESS_CHECK -->|"canContinue"| PROVIDER_AGENT
ASSESS_CHECK -->|"goalMet"| FINALIZE
ASSESS_CHECK -->|"budgetExhausted"| FINALIZE
subgraph ProviderSelection["3. Provider Selection"]
PROVIDER_AGENT["PROVIDER_AGENT\n───────────\nLLM selects provider\nChooses MCP/REST\nSets search adjustments"]
end
PROVIDER_AGENT --> SEARCH
subgraph Execution["4. Execution"]
SEARCH["SEARCH NODE\n───────────\nExecute provider search\nApply adjustments\nDeduplicate results"]
end
SEARCH --> ENRICH
subgraph Enrichment["5. Enrichment"]
ENRICH["ENRICH NODE\n───────────\nFill missing emails\nAdd phone numbers\nWaterfall strategy"]
end
ENRICH --> SCORE
subgraph Scoring["6. Scoring"]
SCORE["SCORE NODE\n───────────\nMulti-dimensional scoring\nTitle/Seniority/Industry\nSize/Location/Quality\nABM modifiers"]
end
SCORE --> INSPECTOR
subgraph Inspection["7. Inspection"]
INSPECTOR["INSPECTOR NODE\n───────────\nValidate data quality\nCheck red flags\nMark needsEnrichment"]
end
INSPECTOR --> DECIDE
subgraph Decision["8. Decision"]
DECIDE["DECIDE NODE\n───────────\nEvaluate progress\nCheck HITL triggers\nRoute decision"]
end
DECIDE --> DECIDE_CHECK{Decision?}
DECIDE_CHECK -->|CONTINUE| ASSESS
DECIDE_CHECK -->|ASK_USER| HITL_PAUSE
DECIDE_CHECK -->|COMPLETE| FINALIZE
HITL_PAUSE["PAUSE\n(HITL)"] -->|User Answer| ASSESS
FINALIZE["FINALIZE\n───────────\nCompile results\nCalculate summary"] --> END_NODE((END))
flowchart TB
START((START)) --> SUP
subgraph ReActLoop["ReAct Loop"]
SUP["SUPERVISOR\n═══════════\n1. OBSERVE state\n2. THINK hypothesis\n3. ACT select tool"]
SUP --> TC{Tool Call?}
TC -->|search_provider| SEARCH
TC -->|parallel_search| PARALLEL
TC -->|specialized_search| SPECIAL
TC -->|enrich_prospects| ENRICH
TC -->|broaden_criteria| SUP
TC -->|analyze_state| SUP
TC -->|ask_user_question| HITL
TC -->|complete_run| FINALIZE
end
subgraph SearchNodes["Search Execution"]
PARALLEL["PARALLEL_SEARCH\nFan-out to 3 providers"]
SEARCH["SEARCH\nSingle provider"]
SPECIAL["SPECIALIZED_SEARCH\nGoogle / LinkedIn / Crawler"]
end
subgraph Processing["Post-Search"]
ENRICH["ENRICH\nMissing emails/phones"]
SCORE["SCORE\nICP scoring + Tiers"]
INSPECTOR["INSPECTOR\nQuality validation"]
end
PARALLEL --> SCORE
SEARCH --> SCORE
SPECIAL --> SCORE
ENRICH --> SCORE
SCORE --> INSPECTOR
INSPECTOR --> SUP
HITL["HITL_WAIT"] -->|Answer| SUP
FINALIZE["FINALIZE"] --> END_NODE((END))
SUP -.->|"LLM Error"| FB["FALLBACK\nHeuristic routing"]
FB -->|"emailCov < 50%"| ENRICH
FB -->|"otherwise"| PARALLEL
flowchart TB
START((START)) --> SUP
subgraph Controller["Supervisor"]
SUP["SUPERVISOR\nRoute decisions"]
end
SUP --> PS
subgraph Search["Multi-Provider"]
PS["PARALLEL_SEARCH\nFan-out all providers"]
end
PS --> MERGE
subgraph CrossVal["Cross-Validation"]
MERGE["MERGE NODE\nGroup by fingerprint\nHigh: 2+ agree\nMedium: 1 provider\nLow: conflicting"]
end
MERGE --> MODE_CHECK{"Accuracy\nMode?"}
MODE_CHECK -->|fast| SUP
MODE_CHECK -->|"balanced/high"| VERIFY
subgraph LLMVerify["LLM Verification"]
VERIFY["VERIFY NODE\nTitle/Seniority/Industry\nCompanySize/Location\nOverall score + tier"]
end
VERIFY --> HIGH_CHECK{"High\nAccuracy?"}
HIGH_CHECK -->|Yes| CRITIQUE
HIGH_CHECK -->|No| ENRICH_CHECK
subgraph SelfCritique["Self-Critique"]
CRITIQUE["CRITIQUE NODE\nReview borderline\nAdjust scores/tiers"]
end
CRITIQUE --> ENRICH_CHECK{"Needs\nEnrichment?"}
ENRICH_CHECK -->|Yes| ENRICH
ENRICH_CHECK -->|No| SUP
ENRICH["ENRICH\nHot/warm only"] --> SUP
SUP -->|isComplete| FINALIZE
FINALIZE["FINALIZE\nConfidence scores\nVerified/critiqued flags"] --> END_NODE((END))
flowchart TB
subgraph AgentState["AgentState — LangGraph Annotation"]
subgraph Immutable["Run Context (Immutable)"]
I1["runId · organizationId · audienceId"]
I2["persona · abmInclude/Exclude"]
I3["targetCount · maxCredits"]
end
subgraph Scratchpad["Agent Scratchpad (ReAct)"]
S1["agentScratchpad: AgentMessage[]"]
S2["thoughts: Thought[]"]
S3["lastToolCall · toolCallResults"]
end
subgraph Memory["Working Memory"]
M1["observations · hypotheses"]
M2["providerPerformance Map"]
M3["strategiesTriedAttempts"]
M4["currentPhase:\nexploring / exploiting / wrapping_up"]
end
subgraph Prospects["Prospect Data"]
P1["rawProspects (APPEND)"]
P2["scoredProspects (APPEND)"]
P3["pendingEnrichment (REPLACE)"]
P4["seenFingerprints (MERGE)"]
end
subgraph AccuracyState["Accuracy Pipeline"]
A1["providerResults Map"]
A2["verificationResults Map"]
A3["critiqueResults Map"]
A4["reasoningTrace · accuracyMode"]
end
subgraph HITLState["HITL State"]
H1["pendingQuestion · history"]
H2["hitlEnabled · questionCount"]
end
subgraph Metrics["Metrics"]
MT1["found · target · qualified"]
MT2["creditsUsed · emailCoverage"]
MT3["hot/warm/coldCount"]
end
subgraph Control["Control Signals"]
C1["currentNode · shouldPause"]
C2["shouldCancel · isComplete"]
C3["completionReason · iteration"]
end
end
flowchart TB
START((Execute)) --> CHECK_SIGNALS
subgraph PreChecks["Pre-Execution Checks"]
CHECK_SIGNALS{"Control\nSignals?"}
CHECK_SIGNALS -->|shouldCancel| CANCEL[Handle Cancel]
CHECK_SIGNALS -->|shouldPause| PAUSE[Handle Pause]
CHECK_SIGNALS -->|awaitingInput| HITL_CHECK[Check HITL]
CHECK_SIGNALS -->|None| GOAL_CHECK
GOAL_CHECK{"Goal Met?\n90% tolerance"}
GOAL_CHECK -->|Yes| COMPLETE[Complete Run]
GOAL_CHECK -->|No| BUDGET_CHECK
BUDGET_CHECK{Budget OK?}
BUDGET_CHECK -->|Exhausted| COMPLETE
BUDGET_CHECK -->|OK| ITER_CHECK
ITER_CHECK{Max Iterations?}
ITER_CHECK -->|Reached| COMPLETE
ITER_CHECK -->|OK| LLM_CALL
end
subgraph LLMDecision["LLM Tool Calling"]
LLM_CALL["Build Messages\n1. System prompt\n2. Scratchpad (10)\n3. Current state\n4. Task prompt"]
LLM_CALL --> INVOKE["Invoke BedrockLLM\nwith Tools"]
INVOKE --> PARSE["Parse Tool Call\n1. Check toolCalls[]\n2. Fallback: JSON\n3. Fallback: name match"]
end
PARSE --> TC_CHECK{Tool Found?}
TC_CHECK -->|No| ADD_OBS["Add Observation\nRetry"]
TC_CHECK -->|Yes| VALIDATE
VALIDATE["Validate Args"] --> VALID{Valid?}
VALID -->|No| ADD_ERR[Error to Scratchpad]
VALID -->|Yes| ROUTE["Route to Node"]
ADD_OBS --> RETURN((Return))
ADD_ERR --> RETURN
ROUTE --> RETURN
INVOKE -.->|Error| HEURISTIC["Fallback Heuristic\nemailCov < 50% → enrich\nelse → parallel_search"]
HEURISTIC --> RETURN
flowchart TB
START((Execute)) --> INIT
INIT["Initialize\nGet providers from lastToolCall\nFilter healthy providers"]
INIT --> BUILD["Build Search Criteria\nFrom persona\nApply adjustments:\nbroaderTitles / removeLocation / removeSeniority"]
BUILD --> FANOUT["Promise.allSettled"]
subgraph Concurrent["Parallel Execution"]
P1["Lusha"]
P2["PDL"]
P3["Apify"]
end
FANOUT --> P1 & P2 & P3
P1 & P2 & P3 --> AGG
AGG["Aggregate Results\nCollect fulfilled\nTrack rejected"] --> DEDUP
DEDUP["Deduplicate\nGenerate fingerprints\nFilter seen"] --> TRACK
TRACK["Track Performance\nattempts · successes\nresultsFound · avgResponseTime"] --> UPDATE
UPDATE["Update State\nrawProspects (append)\nseenFingerprints (merge)\nproviderHealth (merge)"] --> RETURN((Return))
subgraph ProviderHealth["Provider Health"]
PH1["HEALTHY"]
PH2["RATE_LIMITED"]
PH3["ERROR"]
PH4["EXHAUSTED"]
end
sequenceDiagram
participant CS as Campaign Service
participant API as Discovery API
participant DS as Discovery Service
participant SQS as SQS Queues
participant WK as Worker
participant SUP as Supervisor
participant SEARCH as Search Nodes
participant PH as Provider Hub
participant PROV as Providers
participant SCORE as Score/Inspector
participant DB as MongoDB
CS->>API: POST /v1/discovery/start
API->>DS: startDiscovery(dto)
DS->>DB: Create Run (PENDING)
DS->>SQS: Send START job
DS-->>API: runId
API-->>CS: runId + PENDING
SQS->>WK: Receive job
WK->>DB: Update → RUNNING
WK->>SUP: Initialize Graph
loop Agent Loop (max 100)
SUP->>SUP: Observe → Think → Act
alt parallel_search
SUP->>SEARCH: Execute
par Fan-out to Lusha
SEARCH->>PROV: Lusha
and Fan-out to PDL
SEARCH->>PROV: PDL
and Fan-out to Apify
SEARCH->>PROV: Apify
end
PROV-->>SEARCH: Results
SEARCH-->>SUP: rawProspects
else enrich_prospects
SUP->>PH: Enrich
PH->>PROV: Provider.enrich
PROV-->>PH: Enriched
PH-->>SUP: Enriched
else complete_run
SUP->>SUP: isComplete=true
end
SUP->>SCORE: Score prospects
SCORE-->>SUP: scoredProspects
SUP->>SQS: PROGRESS
SQS-->>CS: Update
end
SUP->>DB: Final state
SUP->>SQS: Final results
SQS-->>CS: Final results
flowchart TB
START((Agent\nDecision)) --> TRIGGER{"HITL\nTrigger?"}
subgraph Triggers["Triggers"]
T1["Goal Mismatch"]
T2["Budget Warning"]
T3["Quality Issues"]
T4["Strategy Decision"]
end
TRIGGER -->|Yes| CREATE["Create Question\nquestion · context\noptions · priority\ncategory"]
CREATE --> TIMEOUT["Set Timeout\ncritical: 5m\nimportant: 2m\noptional: 45s"]
TIMEOUT --> SEND["Send via SQS\nPAUSED message"]
SEND --> WAIT["Agent Waits\nawaitingUserInput=true"]
WAIT --> RESPONSE{Response?}
RESPONSE -->|"User Answers"| PROCESS["Process Answer\nAdd to scratchpad\nResume"]
RESPONSE -->|Timeout| DEFAULT["Use Default\nLog expired"]
RESPONSE -->|Skipped| SKIP["Fallback logic\nLog skipped"]
PROCESS --> RESUME((Resume))
DEFAULT --> RESUME
SKIP --> RESUME
flowchart LR
subgraph Input["Node Output"]
I1["rawProspects: [p1, p2]"]
I2["metrics: {qualified: 5}"]
I3["seenFingerprints: Set([f1])"]
I4["currentNode: score"]
I5["iteration: 1"]
end
subgraph Reducers["Reducer Ops"]
R1["APPEND\n[...old, ...new]"]
R2["MERGE Object\n{...old, ...new}"]
R3["MERGE Set\nnew Set([...old, ...new])"]
R4["REPLACE\nnew ?? old"]
R5["INCREMENT\nold + new"]
end
subgraph Output["Accumulated State"]
O1["rawProspects: [old1,old2,p1,p2]"]
O2["metrics: {found:10,qualified:5}"]
O3["seenFingerprints: Set([f0,f1])"]
O4["currentNode: score"]
O5["iteration: 3"]
end
I1 --> R1 --> O1
I2 --> R2 --> O2
I3 --> R3 --> O3
I4 --> R4 --> O4
I5 --> R5 --> O5
flowchart TB
subgraph ProviderHub["Provider Hub"]
HUB[Provider Hub]
subgraph Protection["Protection"]
CB["Circuit Breaker\nCLOSED → OPEN → HALF_OPEN"]
RL["Rate Limiter\nPer-org credits"]
end
subgraph Adapters["Adapters"]
subgraph Lusha["Lusha"]
L_REST[REST] & L_MCP[MCP]
end
subgraph PDL["PDL"]
P_REST[REST]
end
subgraph Explorium["Explorium"]
E_REST[REST]
end
subgraph Apify["Apify"]
A_REST[REST] & A_MCP[MCP]
ACT1[LinkedIn] & ACT2[Google] & ACT3[Crawler] & ACT4[Twitter]
end
end
subgraph MCPClient["MCP Client"]
SESS[Session Mgr]
TOOL[Tool Discovery]
EXEC[Tool Exec]
end
HUB --> CB --> RL --> Adapters
L_MCP --> MCPClient
A_MCP --> MCPClient
end
subgraph Logger["Logger"]
LOG["Request/Response\nLatency · Credits"]
end
Adapters --> LOG
flowchart TB
INPUT[Raw Prospect] --> DIMS
subgraph DIMS["Score Dimensions"]
D1["Title Match 25%"]
D2["Seniority 20%"]
D3["Industry 20%"]
D4["Company Size 15%"]
D5["Location 10%"]
D6["Data Quality 10%"]
end
DIMS --> CALC["Weighted\nCalculation"]
CALC --> ABM{"ABM List?"}
ABM -->|Include| BOOST["+20 Boost"]
ABM -->|Exclude| DQ[Disqualify]
ABM -->|Neither| TIER
BOOST --> TIER
DQ --> OUT
subgraph TIER["Tier Classification"]
T1["🔥 Hot 80-100"]
T2["🟡 Warm 60-79"]
T3["🔵 Cold 40-59"]
T4["⛔ Disqualified 0-39"]
end
TIER --> OUT[Scored Prospect]
stateDiagram-v2
[*] --> PENDING: Create Run
PENDING --> RUNNING: Worker Picks Up
RUNNING --> PAUSED: Pause / HITL
RUNNING --> COMPLETED: Goal Met / Budget Exhausted / Max Iterations
RUNNING --> FAILED: Unrecoverable Error
RUNNING --> CANCELLED: Cancel Request
PAUSED --> RUNNING: Resume / Answer
PAUSED --> CANCELLED: Cancel
COMPLETED --> [*]
FAILED --> [*]
CANCELLED --> [*]
| Tool | Description | Args |
|---|---|---|
| search_provider | Search single provider | provider, criteria, limit |
| parallel_search | Search multiple providers | providers[], criteria |
| specialized_search | Google/LinkedIn/Crawl | method, query, target |
| enrich_prospects | Get missing data | provider, priority, maxCount |
| broaden_criteria | Relax search filters | adjustments{} |
| analyze_state | Think through situation | focus |
| ask_user_question | HITL question | question, options[], priority |
| complete_run | Finish discovery | reason, summary |
| Mode | LLM Verify | Critique | Cross-Validate | Use Case |
|---|---|---|---|---|
| fast | No | No | Yes | Quick searches |
| balanced | Yes | No | Yes | Standard discovery |
| high | Yes | Yes | Yes | Precision-critical |
| Field | Reducer | Behavior |
|---|---|---|
| agentScratchpad | APPEND | [...old, ...new] |
| rawProspects | APPEND | [...old, ...new] |
| seenFingerprints | MERGE Set | Union of sets |
| metrics | MERGE Object | Shallow merge |
| pendingEnrichment | REPLACE | Overwrite |
| iteration | INCREMENT | old + new |
| Dimension | Weight |
|---|---|
| Title Match | 25% |
| Seniority Match | 20% |
| Industry Fit | 20% |
| Company Size | 15% |
| Location Match | 10% |
| Data Quality | 10% |
| Priority | Timeout | Use Cases |
|---|---|---|
| critical | 5 min | ABM clarification, budget |
| important | 2 min | Provider selection, criteria |
| optional | 45 sec | Optional enrichment |
| Component | Path |
|---|---|
| Discovery Graph | src/modules/discovery/agent/discovery.graph.ts |
| Agent Graph | src/modules/discovery/agent/agent.graph.ts |
| Accuracy Graph | src/modules/discovery/agent/accuracy.graph.ts |
| Discovery State | src/modules/discovery/agent/discovery.state.ts |
| Supervisor Node | src/modules/discovery/agent/nodes/supervisor.node.ts |
| Parallel Search | src/modules/discovery/agent/nodes/parallel-search.node.ts |
| Provider Hub | src/modules/providers/provider-hub.service.ts |
| Circuit Breaker | src/common/services/circuit-breaker.service.ts |
| Scoring Service | src/modules/scoring/scoring.service.ts |