Ingest Pipeline
The server-side ingest pipeline transforms raw collector JSON into a Neo4j trust graph. It runs as a serialized 5-stage process within server/internal/ingest/.
Serialization is intentional: the post-processing stage's stale-edge cleanup scopes deletes by source_collector, so two concurrent ingests from the same collector would each treat the other's fresh edges as stale. The server is single-user, ingests are rare and operator-driven, and the UI does not upload concurrently, so serializing with a sync.Mutex is the right trade-off.
Entry Points
- CLI: agenthound-server ingest <file.json> or agenthound-server ingest - (stdin)
- HTTP: POST /api/v1/ingest (requires the localhost Bearer token)
- UI: drag-and-drop import in Scan Manager (hits the same HTTP endpoint)
All paths invoke Pipeline.Ingest(ctx, *sdkingest.IngestData).
Wire Contract
Input is the sdk/ingest.IngestData struct:
{
"meta": { "version": 1, "type": "agenthound-ingest", "collector": "mcp", "scan_id": "..." },
"graph": {
"nodes": [{ "id": "sha256:...", "kinds": ["MCPServer"], "properties": {...} }],
"edges": [{ "source": "sha256:...", "target": "sha256:...", "kind": "PROVIDES_TOOL", "properties": {...} }]
}
}
Stage 1: Validate
Validator.Validate() rejects malformed payloads before any graph writes.
Checks performed:
- meta.version must be 1
- meta.type must be "agenthound-ingest"
- meta.collector must be in AllowedCollectors (mcp, a2a, config, scan)
- meta.scan_id must be non-empty
- Every node must have a non-empty id and at least one kind from AllowedNodeKinds (23 kinds)
- Every edge must have non-empty source/target and a kind from RawEdgeKinds (17 kinds)
Validation errors are structured: each FieldError carries a JSON path and a message, and the errors are returned to the caller wrapped in a ValidationError. On failure, the pipeline aborts before writing anything, so there are no partial writes.
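The checks above can be sketched as a single pass that collects FieldErrors. The type shapes are hypothetical, the node-kind and edge-kind sets are small illustrative subsets of the real 23 and 17 kinds, and "at least one kind from AllowedNodeKinds" is read here as "at least one listed kind is allowed".

```go
package main

import (
	"fmt"
	"strings"
)

type Meta struct {
	Version                 int
	Type, Collector, ScanID string
}
type Node struct {
	ID    string
	Kinds []string
}
type Edge struct{ Source, Target, Kind string }
type IngestData struct {
	Meta  Meta
	Nodes []Node
	Edges []Edge
}

// FieldError pairs a JSON path with a message.
type FieldError struct{ Path, Message string }

// ValidationError wraps every FieldError found in one payload.
type ValidationError struct{ Errors []FieldError }

func (e *ValidationError) Error() string {
	parts := make([]string, len(e.Errors))
	for i, fe := range e.Errors {
		parts[i] = fe.Path + ": " + fe.Message
	}
	return "invalid ingest payload: " + strings.Join(parts, "; ")
}

var allowedCollectors = map[string]bool{"mcp": true, "a2a": true, "config": true, "scan": true}

// Illustrative subsets only; the real registries are larger.
var allowedNodeKinds = map[string]bool{"MCPServer": true, "OllamaInstance": true, "AIService": true}
var rawEdgeKinds = map[string]bool{"PROVIDES_TOOL": true}

func Validate(d *IngestData) error {
	var errs []FieldError
	add := func(path, msg string) { errs = append(errs, FieldError{path, msg}) }
	if d.Meta.Version != 1 {
		add("meta.version", "must be 1")
	}
	if d.Meta.Type != "agenthound-ingest" {
		add("meta.type", `must be "agenthound-ingest"`)
	}
	if !allowedCollectors[d.Meta.Collector] {
		add("meta.collector", fmt.Sprintf("unknown collector %q", d.Meta.Collector))
	}
	if d.Meta.ScanID == "" {
		add("meta.scan_id", "must be non-empty")
	}
	for i, n := range d.Nodes {
		p := fmt.Sprintf("graph.nodes[%d]", i)
		if n.ID == "" {
			add(p+".id", "must be non-empty")
		}
		known := false
		for _, k := range n.Kinds {
			known = known || allowedNodeKinds[k]
		}
		if !known {
			add(p+".kinds", "needs at least one allowed node kind")
		}
	}
	for i, e := range d.Edges {
		p := fmt.Sprintf("graph.edges[%d]", i)
		if e.Source == "" || e.Target == "" {
			add(p, "source and target must be non-empty")
		}
		if !rawEdgeKinds[e.Kind] {
			add(p+".kind", fmt.Sprintf("unknown edge kind %q", e.Kind))
		}
	}
	if len(errs) > 0 {
		return &ValidationError{Errors: errs}
	}
	return nil
}
```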
Stage 2: Normalize
Normalizer.Normalize() transforms collector output into a Neo4j-ready shape and returns non-fatal warnings.
Transformations:
- Sets objectid property to match node id
- Converts all property keys from camelCase to snake_case (CamelToSnake)
- Strips nil values
- Serializes complex values (nested maps, heterogeneous arrays) to JSON strings
- Preserves homogeneous arrays (all-string, all-number, all-bool) as native Neo4j lists
- Converts json.Number to int64 or float64
Stage 3: Record Scan Start
Creates a scan record in PostgreSQL (appdb.ScanStore) with status running. Failure here is non-fatal: the pipeline logs a warning and continues. This record backs the scan history shown in the UI.
Stage 4: Write (Neo4j Batch)
graph.Writer.WriteNodes() and graph.Writer.WriteEdges() batch-write to Neo4j.
Implementation details:
- Uses UNWIND $nodes AS node pattern for batch efficiency
- 1000 operations per transaction by default (the batch size is configurable)
- Multi-label support: nodes carry multiple kinds (e.g., ["OllamaInstance", "AIService"]); the writer MERGEs on the primary label and SETs umbrella labels
- Merge strategy: MERGE by objectid -- same node from Config + MCP collectors merges properties (last-write-wins)
- On merge, the writer keeps the old hash for rug-pull detection by copying description_hash into previous_description_hash before the new value is written: ON MATCH SET n.previous_description_hash = n.description_hash
- Edge writes use per-kind Cypher strings (edgeKindCypher map) to support different source/target label pairs per edge kind
- EdgeKindEndpoints registry resolves source/target labels when not explicitly set by the collector
On failure, the scan record is updated to failed and the error propagates.
Stage 5: Post-Process
analysis.RunPostProcessors() computes composite edges and risk scores from graph state.
Before running processors:
1. Stale-edge cleanup: Deletes composite edges where scan_id != current AND source_collector IN $collectors. This scopes deletion to only the collector(s) that ran in the current scan -- prevents ping-pong deletion on partial scans (e.g., an MCP-only re-scan won't delete A2A composite edges).
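The cleanup predicate can be illustrated with a Cypher string and an in-memory equivalent. This is only a sketch: staleEdgeCypher, the $composite_kinds parameter that restricts the match to composite edge kinds, and keepAfterCleanup are hypothetical, and the real statement may select composite edges differently.

```go
package main

// staleEdgeCypher is an illustrative form of the cleanup query.
const staleEdgeCypher = `
MATCH ()-[r]->()
WHERE type(r) IN $composite_kinds
  AND r.scan_id <> $scan_id
  AND r.source_collector IN $collectors
DELETE r`

type CompositeEdge struct {
	ScanID, SourceCollector string
}

// keepAfterCleanup applies the same predicate in memory: an edge is deleted
// only if it comes from an older scan AND its collector ran in this scan.
func keepAfterCleanup(edges []CompositeEdge, scanID string, collectors []string) []CompositeEdge {
	ran := make(map[string]bool, len(collectors))
	for _, c := range collectors {
		ran[c] = true
	}
	var kept []CompositeEdge
	for _, e := range edges {
		stale := e.ScanID != scanID && ran[e.SourceCollector]
		if !stale {
			kept = append(kept, e)
		}
	}
	return kept
}
```

With an MCP-only re-scan, $collectors is ["mcp"], so an A2A-owned composite edge from an older scan survives while the older MCP-owned edge is removed.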
Then runs 11 processors in dependency-validated order. See docs/architecture/post-processors.md for details.
Post-processing is non-fatal: failures are logged and included in the result stats, but the ingest is still marked complete. This means a processor bug won't block data from being queryable.
Processing Order
1. has_access_to (no deps)
2. can_execute (no deps)
3. shadows (no deps)
4. poisoned_description (no deps)
5. poisoned_instructions (no deps)
6. can_reach (depends: has_access_to)
7. cross_service_credential_chain (depends: has_access_to, can_reach)
8. can_exfiltrate (depends: can_reach)
9. can_impersonate (no deps)
10. cross_protocol (depends: has_access_to)
11. risk_score (depends: all above)
Dependency validation runs before the first processor executes. If a processor appears before a dependency it declares, the pipeline returns an ordering error immediately.
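The ordering check reduces to comparing each processor's position with the positions of its declared dependencies. ProcDecl and validateOrder are hypothetical names, and rejecting a dependency that is not registered at all is an extra assumption beyond what is described above.

```go
package main

import "fmt"

// ProcDecl is a hypothetical declaration: a processor name plus its deps.
type ProcDecl struct {
	Name string
	Deps []string
}

// validateOrder fails if any processor is scheduled before a dependency it
// declares, or declares a dependency that is not registered.
func validateOrder(procs []ProcDecl) error {
	pos := make(map[string]int, len(procs))
	for i, p := range procs {
		pos[p.Name] = i
	}
	for i, p := range procs {
		for _, d := range p.Deps {
			j, ok := pos[d]
			if !ok {
				return fmt.Errorf("processor %q depends on unknown processor %q", p.Name, d)
			}
			if j >= i {
				return fmt.Errorf("processor %q (position %d) runs before its dependency %q (position %d)",
					p.Name, i+1, d, j+1)
			}
		}
	}
	return nil
}
```

Fed the 11-step order listed above, this check passes; moving can_exfiltrate ahead of can_reach makes it return an ordering error before any processor runs.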
Result
Pipeline.Ingest() returns *sdkingest.IngestResult:
- ScanID -- the scan identifier
- NodesWritten, EdgesWritten -- counts from the batch write
- Warnings -- normalizer warnings
- PostProcessingStats -- per-processor name, edges created, nodes updated, duration, error
- Duration -- total pipeline wall-clock time
Scan Lifecycle
The scan record in Postgres tracks: ID, collector, status, start time, node/edge counts, error message. Scans can be deleted via DELETE /api/v1/scans/{id}, which also removes owned edges/nodes from Neo4j.