Data Intelligence Pipeline
Core
End-to-end document processing with structured Data Packages
Simplest Usage
One call. Smart defaults handle the rest. You get back a composed, structured Data Package.
```python
from latence import Latence

client = Latence(api_key="lat_xxx")

# Submit files -- smart defaults handle the rest
job = client.pipeline.run(files=["contract.pdf"])

# Wait for the composed Data Package
pkg = job.wait_for_completion()

# Structured, summarized results
print(pkg.document.markdown)                         # Clean extracted text
print(pkg.entities.summary)                          # {"total": 142, "by_type": {"PERSON": 23, ...}}
print(pkg.knowledge_graph.summary.total_relations)   # 156
print(pkg.quality.confidence.entity_avg_confidence)  # 0.87

# Download as organized ZIP archive
pkg.download_archive("./output/contract_results.zip")

# Or save automatically when waiting:
pkg = job.wait_for_completion(save_to_disk="./output/contract_results.zip")
```

When you provide only files with no `steps` parameter, the intelligent default pipeline runs automatically: Document Intelligence → Entity Extraction → Knowledge Graph. No configuration required.
The Data Package
Every pipeline returns a DataPackage -- a structured, composed result with organized sections and quality metrics. This is not a raw JSON dump; it is a curated data deliverable.
| Section | Contents | Present When |
|---|---|---|
| pkg.document | Markdown text, per-page breakdowns, document metadata | Document Intelligence ran |
| pkg.entities | Entity list + summary (total, by_type, avg_confidence) | Entity Extraction ran |
| pkg.knowledge_graph | Entities, relations, graph summary, type distributions | Relation Extraction ran |
| pkg.redaction | Redacted text, PII list, summary by type | Redaction ran |
| pkg.quality | Per-stage report, confidence scores, processing time, cost | Always |
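Section presence follows directly from which steps ran. A minimal client-side sketch of the mapping in the table above (hypothetical helper names, not the SDK's internals):

```python
# Map each pipeline step to the DataPackage section it populates
# (section names taken from the table above; quality is always present).
STEP_TO_SECTION = {
    "doc_intel": "document",
    "extraction": "entities",
    "knowledge_graph": "knowledge_graph",
    "redaction": "redaction",
}

def sections_for(steps):
    """Return the DataPackage sections produced by a set of steps."""
    present = {STEP_TO_SECTION[s] for s in steps if s in STEP_TO_SECTION}
    present.add("quality")  # always present
    return present

print(sections_for(["doc_intel", "extraction", "knowledge_graph"]))
```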
ZIP Archive
Download all results as an organized archive:
```python
# Download after completion
pkg.download_archive("./results.zip")

# Or save automatically during wait
pkg = job.wait_for_completion(save_to_disk="./results.zip")
```

The archive is organized by pipeline name:

```
Legal_Contract_Ingestion/
  README.md             # Human-readable processing summary
  document.md           # Full markdown text
  entities.json         # Entity list + summary
  knowledge_graph.json  # Graph data + summary
  redaction.json        # Redaction results (if run)
  quality_report.json   # Processing report & confidence
  metadata.json         # Pipeline config & timing
  pages/
    page_001.md
    page_002.md
    ...
```

Data Consolidation
Merge all pipeline outputs into a single, document-centric JSON with zero redundancy. Markdown text appears exactly once; every service's output is nested under its document.
```python
# Merge all outputs into a single document-centric JSON
merged = pkg.merge(save_to="./output/consolidated.json")

# Access consolidated data
for doc in merged["documents"]:
    print(doc["filename"])
    print(doc["markdown"][:200])    # Extracted text (once, no redundancy)
    print(doc["entities"])          # Entities for this document
    print(doc["knowledge_graph"])   # Graph for this document

# Global summary
print(merged["summary"]["entities"]["total"])
print(merged["summary"]["relations"]["total"])
print(merged["summary"]["cost_usd"])
```

`merge()` runs entirely client-side. It produces a single JSON ready for downstream systems -- no repeated inputs, no wasted tokens.
Smart Defaults
When you provide only files with no `steps` parameter, the intelligent default pipeline runs automatically: Document Intelligence → Entity Extraction → Knowledge Graph.
This covers the most common use case: turning documents into structured, searchable knowledge. Override with explicit steps when you need different behavior.
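The fallback logic can be sketched in a few lines (a hypothetical resolver for illustration, not the SDK's actual internals; step names are from the table below):

```python
# Default chain used when no steps are given.
DEFAULT_STEPS = ["doc_intel", "extraction", "knowledge_graph"]

def resolve_steps(steps=None):
    """Fall back to the smart-default chain when steps is omitted."""
    if steps is None:
        return {name: {} for name in DEFAULT_STEPS}
    return dict(steps)

print(list(resolve_steps()))            # the default chain
print(list(resolve_steps({"extraction": {"threshold": 0.3}})))  # explicit override
```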
Step Configuration
Use friendly step names. Steps are automatically sorted into the correct execution order.
| Step Name | Aliases | Description |
|---|---|---|
| doc_intel | document_intelligence, ocr | Document processing with layout detection, table extraction, and chart recognition |
| redaction | redact | PII detection and masking |
| extraction | extract | Zero-shot entity extraction |
| knowledge_graph | graph, ontology | Relation extraction and knowledge graph construction with entity resolution |
| compression | compress | Token compression for cost-efficient downstream use |
| embedding | -- | Dense vector embeddings (Matryoshka dimensions) |
| colbert | -- | Token-level ColBERT embeddings for neural retrieval |
| colpali | -- | Vision-language ColPali embeddings |
```python
job = client.pipeline.run(
    name="Legal Contract Ingestion",
    files=["contract.pdf"],
    steps={
        "doc_intel": {"mode": "performance"},
        "redaction": {"mode": "balanced", "threshold": 0.5},
        "extraction": {"threshold": 0.3, "user_labels": ["party", "clause"]},
        "knowledge_graph": {"resolve_entities": True},
    },
)
pkg = job.wait_for_completion(poll_interval=5.0)
```

DAG Execution Model
The pipeline executes services as a directed acyclic graph (DAG), not a linear chain. Independent branches run in parallel for maximum throughput.
```
                       ┌─── extraction ──── ontology
                       │
document_intelligence ─┼─── redaction
                       │
                       ├─── compression
                       │
                       ├─── embedding
                       │
                       ├─── colbert
                       │
                       └─── colpali
```

You declare which services you want. The pipeline handles ordering, dependency injection, and parallel execution automatically. For example, extraction and compression run in parallel since both depend only on document_intelligence.
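The grouping into parallel waves can be sketched with a small topological pass (a client-side illustration, not SDK code; the dependency map is read off the diagram above):

```python
# Dependencies from the diagram: ontology depends on extraction,
# everything else depends only on document_intelligence.
DEPS = {
    "document_intelligence": [],
    "extraction": ["document_intelligence"],
    "ontology": ["extraction"],
    "redaction": ["document_intelligence"],
    "compression": ["document_intelligence"],
    "embedding": ["document_intelligence"],
    "colbert": ["document_intelligence"],
    "colpali": ["document_intelligence"],
}

def execution_levels(deps):
    """Group services into levels; services in one level can run in parallel."""
    levels, placed = [], {}
    remaining = set(deps)
    while remaining:
        ready = sorted(s for s in remaining if all(d in placed for d in deps[s]))
        for s in ready:
            placed[s] = len(levels)
        levels.append(ready)
        remaining -= set(ready)
    return levels

for i, level in enumerate(execution_levels(DEPS)):
    print(i, level)
```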
Fluent Builder
For power users who prefer a fluent API:
```python
from latence import PipelineBuilder

config = (
    PipelineBuilder()
    .doc_intel(mode="performance")
    .redaction(mode="balanced")
    .extraction(threshold=0.3, user_labels=["person", "org"])
    .ontology(resolve_entities=True)
    .build()
)
job = client.pipeline.submit(config, files=["contract.pdf"], name="My Pipeline")
pkg = job.wait_for_completion()
```

YAML Configuration
Load pipeline configuration from a YAML file for version-controlled, reproducible pipelines:
```yaml
steps:
  document_intelligence:
    mode: performance
  extraction:
    label_mode: generated
    user_labels: [person, organization]
  ontology:
    resolve_entities: true
```

```python
from latence import PipelineBuilder

config = PipelineBuilder.from_yaml("pipeline.yaml")
job = client.pipeline.submit(config, files=["contract.pdf"])
pkg = job.wait_for_completion()
```

Async Job Handling
Pipelines are async by nature. Submit and come back later. Data flows from service to service on the backend without you waiting.
- `pipeline.run()` returns a Job handle immediately. No blocking.
- `job.status()` shows current stage, progress, and per-stage details.
- `job.wait_for_completion()` blocks until done and returns a DataPackage.
```python
# Submit and return immediately
job = client.pipeline.run(files=["big_archive.pdf"])
print(f"Submitted: {job.id}")  # pipe_abc123

# ... do other work ...

# Check progress
status = job.status()
print(f"Stage: {status.current_service} ({status.stages_completed}/{status.total_stages})")

# When ready, collect results
pkg = job.wait_for_completion()

# Or cancel if no longer needed
job.cancel()
```

The async client mirrors the same flow:

```python
from latence import AsyncLatence

async with AsyncLatence(api_key="lat_xxx") as client:
    job = await client.pipeline.run(files=["contract.pdf"])
    pkg = await job.wait_for_completion(save_to_disk="./results.zip")
    print(pkg.document.markdown)
```

Job Statuses
Pipeline jobs transition through these statuses:
| Status | Meaning |
|---|---|
| QUEUED | Waiting to start |
| IN_PROGRESS | Processing -- check current_service for stage |
| COMPLETED | All stages finished successfully |
| CACHED / PULLED | Results served from cache or storage |
| RESUMABLE | Failed mid-pipeline; completed stages are checkpointed |
| FAILED | Pipeline failed (not resumable) |
| CANCELLED | Cancelled by user |
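For a hand-rolled polling loop, the statuses split naturally into groups. A small sketch (the grouping is my reading of the table above; status strings are as documented):

```python
# Terminal statuses end the poll loop; RESUMABLE suggests job.resume().
TERMINAL_OK = {"COMPLETED", "CACHED", "PULLED"}
TERMINAL_FAIL = {"FAILED", "CANCELLED"}

def next_action(status):
    """Decide what a polling loop should do for a given status."""
    if status in TERMINAL_OK:
        return "collect"
    if status in TERMINAL_FAIL:
        return "report_failure"
    if status == "RESUMABLE":
        return "resume"  # restart from the last checkpoint
    return "keep_polling"  # QUEUED / IN_PROGRESS
```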
Resumable Pipelines
If a pipeline fails partway through, completed stages are checkpointed. The job enters RESUMABLE status, and you can restart from the last checkpoint instead of re-processing everything.
```python
from latence import JobError

try:
    pkg = job.wait_for_completion()
except JobError as e:
    if e.is_resumable:
        # Restart from the last completed checkpoint
        pkg = job.resume().wait_for_completion()
    else:
        raise
```

Progress Callbacks
Track pipeline progress in real time with the on_progress callback:
```python
pkg = job.wait_for_completion(
    poll_interval=5.0,
    timeout=1800.0,
    on_progress=lambda status, elapsed: print(f"  {status} ({elapsed:.0f}s)"),
)
```

The callback receives the current status string and elapsed seconds on each poll.
Error Handling
The SDK provides a structured exception hierarchy for pipeline errors:
```python
from latence import (
    AuthenticationError,
    InsufficientCreditsError,
    RateLimitError,
    JobError,
    JobTimeoutError,
    TransportError,
    PipelineValidationError,
)

try:
    job = client.pipeline.run(files=["doc.pdf"])
    pkg = job.wait_for_completion(timeout=600)
except AuthenticationError:
    print("Invalid API key")
except InsufficientCreditsError:
    print("No credits remaining")
except RateLimitError as e:
    print(f"Rate limited -- retry after {e.retry_after}s")
except JobTimeoutError as e:
    print(f"Pipeline {e.job_id} did not finish in time")
except JobError as e:
    if e.is_resumable:
        pkg = job.resume().wait_for_completion()
    else:
        print(f"Pipeline failed: {e.message}")
except TransportError:
    print("Network / connection error")
except PipelineValidationError as e:
    print(f"Invalid pipeline config: {e.errors}")
```

The SDK automatically retries on HTTP 429 and 5xx with exponential backoff and jitter. TransportError is the base class for APIConnectionError and APITimeoutError.
Pipeline Patterns
Document Intelligence → Entity Extraction → Knowledge Graph
PDF/image → text → entities → relations and knowledge graph. This is the smart default when you provide only files.

Document Intelligence → Redaction → Entity Extraction → Knowledge Graph
Process documents, remove PII first, then extract entities and build graphs from clean text.

Entity Extraction → Knowledge Graph
For text input (no document processing needed). Extract entities and build a knowledge graph directly.

Entity Extraction → Knowledge Graph (with file input)
Entity Extraction expects text, not files. Add doc_intel first, or use smart defaults, which handle this automatically.
Service Compatibility
Each service has specific input/output types. The pipeline validates chains before execution.
| Service | Input | Output |
|---|---|---|
| document_intelligence | file, image | text |
| redaction | text | text |
| extraction | text | entities |
| ontology | text, entities | knowledge_graph |
| compression | text | text |
| embedding | text | vectors |
| colbert | text | token_vectors |
| colpali | text, image | vision_vectors |
Dependency Rule
knowledge_graph (relation extraction) requires extraction to run first -- it needs entities as input. The pipeline validates this before execution. With smart defaults, this is handled automatically.
API Reference
POST /api/v1/pipeline/execute

Request:

```json
{
  "services": [
    {"service": "document_intelligence", "config": {"mode": "performance"}},
    {"service": "extraction", "config": {"threshold": 0.3}},
    {"service": "ontology"}
  ],
  "input": {
    "file_base64": "...",
    "filename": "document.pdf"
  },
  "store_intermediate": true,
  "name": "Legal Contract Ingestion"
}
```

Response:

```json
{
  "job_id": "pipe_abc123def456",
  "poll_url": "/api/v1/pipeline/pipe_abc123def456",
  "services": ["document_intelligence", "extraction", "ontology"],
  "name": "Legal Contract Ingestion",
  "message": "Pipeline submitted. Poll the status URL for progress.",
  "retention": "48 hours"
}
```

GET /api/v1/pipeline/{job_id}

```json
{
  "job_id": "pipe_abc123def456",
  "status": "IN_PROGRESS",
  "current_stage": 1,
  "current_service": "extraction",
  "stages_completed": 1,
  "total_stages": 3
}
```

GET /api/v1/pipeline/{job_id}/result

```json
{
  "job_id": "pipe_abc123def456",
  "status": "COMPLETED",
  "output_url": "https://...",
  "download_url": "https://...",
  "output_size_bytes": 145280,
  "execution_summary": {
    "total_stages": 3,
    "completed_stages": 3,
    "total_cost_usd": 0.16
  }
}
```

DELETE /api/v1/pipeline/{job_id}

Cancel a running pipeline job. Already-completed stages are not rolled back.

```json
{
  "job_id": "pipe_abc123def456",
  "status": "CANCELLED",
  "message": "Pipeline job cancelled"
}
```

Troubleshooting
Cause: Your input type doesn't match the first service's expected input.
Fix: Use client.pipeline.run(files=[...]) with smart defaults, which handles this automatically. Or add doc_intel as the first step for file input.
Cause: knowledge_graph requires extraction.
Fix: Add extraction before knowledge_graph in your steps, or use smart defaults which include both.
Cause: Pipeline did not complete within the timeout period (default: 30 minutes).
Fix: Increase timeout: job.wait_for_completion(timeout=3600). For very large documents, consider splitting into batches.
Pricing
Pipeline processing is billed per page. The cost includes all stages in the pipeline.
Per-page pricing: Each page processed through the pipeline is billed at a flat per-page rate. This includes document intelligence, entity extraction, knowledge graph construction, and any other stages you configure. No additional orchestration fees. Check your dashboard for current rates and usage.
latence-python SDK v0.2
Pipeline-first SDK with DataPackage, Job handles, and smart defaults
Next Step: Dataset Intelligence
Feed pipeline outputs into Dataset Intelligence for corpus-level analysis — entity resolution, knowledge graph construction with RotatE link prediction, and ontology induction. Supports incremental ingestion so you can append new documents without reprocessing.
Enrichment
Semantic feature vectors via EmbeddingGemma. CPU-only, fast.
$1.00 / 1K pages
Knowledge Graph
Entity resolution, deduplication, RotatE link prediction.
$10.00 / 1K pages
Ontology
Concept clustering, hierarchy induction, SHACL shapes.
$50.00 / 1K pages
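Billing is proportional to page count, so a corpus estimate is simple arithmetic. A sketch using the per-1K-page rates listed above (a hypothetical helper; check the dashboard for current rates):

```python
# Dataset Intelligence rates in USD per 1,000 pages, as listed above.
RATES_PER_1K_PAGES = {
    "enrichment": 1.00,
    "knowledge_graph": 10.00,
    "ontology": 50.00,
}

def estimate_cost(pages, stages):
    """Estimate Dataset Intelligence cost in USD for a corpus."""
    return sum(RATES_PER_1K_PAGES[s] * pages / 1000 for s in stages)

# 5,000 pages through all three stages:
print(estimate_cost(5000, ["enrichment", "knowledge_graph", "ontology"]))  # 305.0
```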
```python
# Pipeline → Dataset Intelligence
di = client.experimental.dataset_intelligence_service

# Create a new dataset from pipeline output
job = di.run(input_data=pipeline_output, return_job=True)

# Poll status at GET /api/v1/pipeline/{job.job_id}

# Append new documents later (incremental)
delta = di.run(input_data=new_output, dataset_id="ds_...", return_job=True)
```

Ready to build your pipeline?
Install the SDK and submit your first documents in under a minute.