Document Ingestion and Parsing Pipelines

Learn Python AI Application Engineer - Part 012

Production document ingestion and parsing pipelines for AI applications, including source connectors, canonical elements, provenance, metadata, idempotency, quality gates, and regulatory auditability.


Part 012 — Document Ingestion and Parsing Pipelines

Bad RAG often starts with bad ingestion.

The model may be strong. The vector database may be fast. The prompt may be careful. But if the ingestion pipeline loses headings, merges unrelated pages, drops tables, strips dates, ignores permissions, or overwrites source versions, the application will retrieve weak evidence and generate weak answers.

Document ingestion is not “upload file then chunk text”.

It is a production data pipeline that transforms raw source artifacts into canonical, traceable, permission-aware, versioned knowledge elements.


1. Kaufman Framing

The target skill:

Given heterogeneous enterprise documents, build an ingestion pipeline that produces clean, structured, traceable, and reprocessable knowledge units for downstream retrieval and generation.

Decompose it into subskills.

| Subskill | Meaning | Failure If Ignored |
| --- | --- | --- |
| Source modeling | Know where documents come from and who owns them | stale or unauthorized data enters the corpus |
| Artifact fingerprinting | Detect unchanged, changed, duplicate, and deleted sources | full reprocessing and inconsistent indexes |
| Parsing | Convert raw files into structured elements | text loss, table loss, broken citations |
| Canonicalization | Normalize documents without destroying meaning | retrieval noise or evidence distortion |
| Metadata extraction | Capture tenant, ACL, type, date, status, jurisdiction | retrieval cannot filter correctly |
| Provenance | Preserve page, section, line, source version, and hash | answers cannot be audited |
| Quality gates | Detect parse loss and malformed output | bad data silently enters index |
| Reprocessing | Rebuild chunks/indexes after pipeline changes | knowledge base cannot evolve safely |

The first practice goal:

Build an ingestion pipeline that takes raw documents, creates canonical document elements with provenance, and emits a manifest suitable for chunking and embedding.


2. Ingestion Is a Pipeline, Not a Function

A production ingestion pipeline has stages.

Each stage should be independently observable and retryable.

If a parser fails, you should not lose the fetched artifact. If metadata extraction fails, you should not silently index the document as public. If chunking changes, you should not refetch every source artifact unnecessarily.


3. Source Artifact vs Canonical Document

Separate the raw artifact from the parsed/canonical representation.

| Concept | Meaning | Example |
| --- | --- | --- |
| Source artifact | raw file or source record | PDF, DOCX, HTML page, email, scanned image |
| Source metadata | metadata from origin system | URL, owner, updated_at, ACL, case id |
| Artifact version | immutable fetched snapshot | hash + timestamp + storage URI |
| Canonical document | normalized representation | title, sections, elements, tables |
| Document element | typed piece of content | heading, paragraph, list item, table, figure |
| Chunk candidate | retrieval-oriented unit | section group, paragraph group, table summary |

Do not overwrite raw artifacts. Store immutable versions.

A raw artifact is evidence. A canonical document is your interpretation of that evidence.
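A content-addressed layout is one simple way to keep raw artifacts immutable: the storage path is derived from the bytes, so a changed file always lands at a new path. This is a sketch for a local filesystem; the `store_artifact_version` name and the directory layout are assumptions, and an object store would need its own equivalent.

```python
import hashlib
from pathlib import Path


def store_artifact_version(raw: bytes, root: Path) -> Path:
    """Write raw bytes under a path derived from their SHA-256; never overwrite."""
    digest = hashlib.sha256(raw).hexdigest()
    target = root / digest[:2] / digest
    if target.exists():
        return target  # identical bytes already stored; nothing to do
    target.parent.mkdir(parents=True, exist_ok=True)
    temporary = target.parent / (digest + ".tmp")
    temporary.write_bytes(raw)
    temporary.replace(target)  # rename into place so readers never see a partial file
    return target
```

The returned path can serve as the `storage_uri` of an artifact version, and the digest doubles as its fingerprint.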


4. Data Model

A useful ingestion model starts with explicit document versions.

from datetime import datetime
from enum import StrEnum  # StrEnum requires Python 3.11+
from typing import Any
from pydantic import BaseModel, Field


class ElementType(StrEnum):
    TITLE = "title"
    HEADING = "heading"
    PARAGRAPH = "paragraph"
    LIST_ITEM = "list_item"
    TABLE = "table"
    IMAGE = "image"
    FOOTNOTE = "footnote"
    HEADER = "header"
    FOOTER = "footer"


class SourceArtifact(BaseModel):
    artifact_id: str
    tenant_id: str
    source_system: str
    source_uri: str
    source_updated_at: datetime | None = None
    content_type: str
    binary_sha256: str
    storage_uri: str
    fetched_at: datetime
    metadata: dict[str, Any] = Field(default_factory=dict)


class DocumentElement(BaseModel):
    element_id: str
    document_id: str
    document_version: str
    element_type: ElementType
    text: str
    order: int
    page_number: int | None = None
    section_path: list[str] = Field(default_factory=list)
    parent_element_id: str | None = None
    metadata: dict[str, Any] = Field(default_factory=dict)
    source_offsets: dict[str, Any] = Field(default_factory=dict)


class CanonicalDocument(BaseModel):
    document_id: str
    document_version: str
    tenant_id: str
    title: str | None = None
    source_artifact_id: str
    parser_name: str
    parser_version: str
    canonicalization_version: str
    elements: list[DocumentElement]
    metadata: dict[str, Any] = Field(default_factory=dict)
    created_at: datetime

The key idea: downstream retrieval should operate on canonical elements, while audit can trace back to immutable source artifacts.
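Tracing only works if element ids stay the same when a job is retried. One possible approach, sketched here as an assumption rather than a required scheme, is to derive `element_id` deterministically from the document id, document version, element order, and text. The `stable_element_id` helper is hypothetical.

```python
import hashlib


def stable_element_id(document_id: str, document_version: str, order: int, text: str) -> str:
    """Derive an element id that is identical across retries of the same parse."""
    # The unit separator keeps field boundaries unambiguous inside the hash input.
    payload = "\x1f".join([document_id, document_version, str(order), text])
    return "el-" + hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
```

Because the document version is part of the input, a new source version produces new ids, while re-running the same parse on the same version does not.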


5. Provenance Is Not Optional

Provenance answers:

  • Which file did this text come from?
  • Which version of the file?
  • Which page or section?
  • Which parser produced it?
  • Was OCR used?
  • Was the text normalized?
  • Was a table summarized or preserved?
  • Can we show the user the source?
  • Can we reproduce this chunk later?

For regulated systems, provenance is part of defensibility.

Example provenance envelope:

{
  "document_id": "policy-appeals",
  "document_version": "2026-03-17T10:22:18Z:sha256:abc123",
  "source_system": "policy-repository",
  "source_uri": "s3://raw/policies/appeals.pdf",
  "page_number": 14,
  "section_path": ["Appeals", "Late Appeal", "Exceptional Reopening"],
  "parser_name": "pdf-structure-parser",
  "parser_version": "2.4.1",
  "canonicalization_version": "2026-06-28.v1",
  "element_ids": ["el-991", "el-992", "el-993"]
}

A chunk without provenance is just text. It is not reliable evidence.


6. Parsing Strategy by Source Type

Different document types fail differently.

| Source Type | Common Failure | Strategy |
| --- | --- | --- |
| Markdown | usually clean, but links/frontmatter matter | parse headings and metadata directly |
| HTML | nav/footer noise, hidden text, scripts | boilerplate removal and DOM-aware extraction |
| PDF text | broken reading order, headers/footers | layout-aware parsing and page mapping |
| Scanned PDF | OCR errors, missing tables | OCR + confidence scoring + review queue |
| DOCX | styles, comments, tables | structure-aware parser |
| Email | quoted replies, signatures, attachments | thread-aware extraction |
| Spreadsheet | cells lack narrative context | table model and sheet metadata |
| Image/chart | text absent or visual-only | OCR, captioning, or manual curation |

No single parser is universally correct.

A strong pipeline supports parser selection by source type and quality requirements.
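Parser selection can be as small as a lookup table keyed by content type. The registry below is an illustrative sketch: `ParserRegistry` and the `ParserFn` signature are assumptions for this example, and a real pipeline might also key on quality requirements such as whether OCR is allowed.

```python
from typing import Callable

# Assumed parser shape for this sketch: raw bytes in, list of element dicts out.
ParserFn = Callable[[bytes], list[dict]]


class ParserRegistry:
    """Map a content type to the parser that should handle it."""

    def __init__(self) -> None:
        self._parsers: dict[str, ParserFn] = {}

    def register(self, content_type: str, parser: ParserFn) -> None:
        self._parsers[content_type] = parser

    def select(self, content_type: str) -> ParserFn:
        try:
            return self._parsers[content_type]
        except KeyError:
            # Unknown types should be surfaced (for example, quarantined), not guessed at.
            raise LookupError(f"no parser registered for {content_type!r}") from None
```

Raising on an unknown content type keeps "document type unknown" an explicit outcome instead of a silent fallback to a generic text extractor.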


7. Partitioning Before Chunking

The ingestion pipeline should first partition raw content into meaningful elements.

Then chunking can combine elements for retrieval.

Why this matters:

  • chunking raw text loses structure,
  • headings may be separated from paragraphs,
  • tables may be flattened incorrectly,
  • page numbers may be lost,
  • citations become weak,
  • and retrieval becomes harder to debug.

Production rule:

Parse into elements first. Chunk later.
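To make the rule concrete, here is a deliberately small Markdown partitioner that emits heading and paragraph elements with a section path, leaving chunking for a later step. It is a sketch, not a full Markdown parser: the `Element` class and `partition_markdown` function are hypothetical, and fenced code blocks, lists, and tables are not handled.

```python
import re
from dataclasses import dataclass, field

HEADING = re.compile(r"^(#{1,6})\s+(.*\S)\s*$")


@dataclass
class Element:
    element_type: str
    text: str
    order: int
    section_path: list[str] = field(default_factory=list)


def partition_markdown(source: str) -> list[Element]:
    """Split Markdown into ordered heading/paragraph elements with section paths."""
    elements: list[Element] = []
    path: list[str] = []    # current heading hierarchy
    buffer: list[str] = []  # lines of the paragraph being collected

    def flush() -> None:
        if buffer:
            elements.append(Element("paragraph", " ".join(buffer), len(elements) + 1, list(path)))
            buffer.clear()

    for line in source.splitlines():
        match = HEADING.match(line)
        if match:
            flush()
            level = len(match.group(1))
            del path[level - 1:]  # drop headings at this level or deeper
            path.append(match.group(2))
            elements.append(Element("heading", match.group(2), len(elements) + 1, list(path)))
        elif line.strip():
            buffer.append(line.strip())
        else:
            flush()  # a blank line ends the current paragraph
    flush()
    return elements
```

Every paragraph carries the headings it sits under, so a later chunker can group elements by section without re-reading the raw text.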


8. Canonicalization

Canonicalization turns parser output into a consistent internal format.

Good canonicalization:

  • normalizes whitespace,
  • removes known repeated page headers/footers when safe,
  • preserves heading hierarchy,
  • preserves list structure,
  • preserves table boundaries,
  • extracts document title,
  • records language,
  • records parser confidence,
  • and records all transformations.

Bad canonicalization:

  • concatenates every page into one blob,
  • deletes headings,
  • removes page numbers without mapping,
  • flattens tables into ambiguous lines,
  • strips dates and section labels,
  • merges unrelated appendices,
  • or makes irreversible changes without storing the raw artifact.

Canonicalization must be versioned.

CANONICALIZATION_VERSION = "2026-06-28.v1"

If canonicalization logic changes, affected documents should be eligible for reprocessing.
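"Records all transformations" can be implemented by having each normalization step report whether it changed anything. The sketch below covers only Unicode and whitespace normalization; the `canonicalize_text` function and the step names are assumptions chosen for illustration.

```python
import re
import unicodedata


def canonicalize_text(text: str) -> tuple[str, list[str]]:
    """Normalize text and return it with the names of the steps that changed it."""
    applied: list[str] = []

    def step(name: str, result: str, previous: str) -> str:
        if result != previous:
            applied.append(name)  # record only steps that actually altered the text
        return result

    current = text
    current = step("unicode_nfc", unicodedata.normalize("NFC", current), current)
    current = step("collapse_spaces", re.sub(r"[ \t]+", " ", current), current)
    current = step("collapse_blank_lines", re.sub(r"\n{3,}", "\n\n", current), current)
    current = step("strip_edges", current.strip(), current)
    return current, applied
```

The list of applied steps can be stored in the element's metadata next to the canonicalization version, which makes a later audit of "was this text normalized?" a lookup rather than a guess.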


9. Metadata Enrichment

Metadata is not decoration. It controls retrieval behavior.

Important metadata categories:

| Category | Examples | Used For |
| --- | --- | --- |
| Access | tenant, ACL groups, classification | security filtering |
| Authority | draft, approved, archived | ranking/filtering |
| Time | effective_from, effective_to, updated_at | freshness and legal validity |
| Domain | case type, jurisdiction, product, process | query routing |
| Structure | section path, page, heading level | context assembly |
| Quality | parser confidence, OCR confidence | review/quarantine |
| Lineage | source system, artifact hash, parser version | audit and reprocessing |

In enterprise AI, many retrieval bugs are metadata bugs disguised as model bugs.


10. Idempotency and Fingerprinting

Ingestion jobs will retry.

That means they must be idempotent.

Fingerprint the raw artifact.

import hashlib
from pathlib import Path


def file_sha256(path: Path) -> str:
    digest = hashlib.sha256()
    with path.open("rb") as file:
        for block in iter(lambda: file.read(1024 * 1024), b""):
            digest.update(block)
    return digest.hexdigest()

Fingerprint the canonical output too.

import hashlib
import json


def canonical_hash(elements: list[dict]) -> str:
    stable = json.dumps(elements, sort_keys=True, ensure_ascii=False, separators=(",", ":"))
    return hashlib.sha256(stable.encode("utf-8")).hexdigest()

Use fingerprints to detect:

  • unchanged files,
  • duplicate files,
  • parser output changes,
  • metadata changes,
  • and reprocessing requirements.

11. Ingestion Job State Machine

Do not model ingestion as one boolean flag.

Use explicit states.

Benefits:

  • retry behavior is explicit,
  • partial progress is visible,
  • quarantine is first-class,
  • manual review is possible,
  • indexing depends on accepted canonical data only.
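A small transition table is often enough to express such a state machine. The states and allowed transitions below are assumptions made for this sketch, not a canonical list; the point is that indexing is reachable only from the accepted state.

```python
from enum import Enum


class JobState(Enum):
    # Hypothetical states; rename or extend them to match your pipeline.
    DISCOVERED = "discovered"
    FETCHED = "fetched"
    PARSED = "parsed"
    ACCEPTED = "accepted"
    QUARANTINED = "quarantined"
    INDEXED = "indexed"
    FAILED = "failed"


ALLOWED: dict[JobState, set[JobState]] = {
    JobState.DISCOVERED: {JobState.FETCHED, JobState.FAILED},
    JobState.FETCHED: {JobState.PARSED, JobState.QUARANTINED, JobState.FAILED},
    JobState.PARSED: {JobState.ACCEPTED, JobState.QUARANTINED},
    JobState.ACCEPTED: {JobState.INDEXED, JobState.FAILED},
    # Quarantine can end in a re-parse of the stored artifact or a manual approval.
    JobState.QUARANTINED: {JobState.FETCHED, JobState.ACCEPTED},
    JobState.FAILED: {JobState.DISCOVERED},
    JobState.INDEXED: set(),
}


def transition(current: JobState, target: JobState) -> JobState:
    """Return the new state, or raise if the move is not allowed."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```

With this table, a quarantined document cannot be indexed by accident: it must first pass through `ACCEPTED`.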

12. Minimal Pipeline Skeleton

A simple pipeline can still have production boundaries.

from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class FetchResult:
    artifact: SourceArtifact
    local_path: str


class SourceConnector(Protocol):
    async def discover(self) -> list[str]:
        ...

    async def fetch(self, source_id: str) -> FetchResult:
        ...


class DocumentParser(Protocol):
    async def parse(self, artifact: SourceArtifact, local_path: str) -> CanonicalDocument:
        ...


class QualityGate(Protocol):
    async def check(self, document: CanonicalDocument) -> list[str]:
        ...


class IngestionRepository(Protocol):
    async def has_artifact_hash(self, tenant_id: str, sha256: str) -> bool:
        ...

    async def save_document(self, document: CanonicalDocument) -> None:
        ...

    async def quarantine(self, artifact: SourceArtifact, reasons: list[str]) -> None:
        ...

Use case:

class IngestDocumentUseCase:
    def __init__(
        self,
        connector: SourceConnector,
        parser: DocumentParser,
        quality_gate: QualityGate,
        repository: IngestionRepository,
    ):
        self.connector = connector
        self.parser = parser
        self.quality_gate = quality_gate
        self.repository = repository

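    async def ingest_one(self, source_id: str) -> None:
        fetched = await self.connector.fetch(source_id)

        # Idempotency: skip artifacts whose bytes were already ingested for this tenant.
        if await self.repository.has_artifact_hash(
            fetched.artifact.tenant_id,
            fetched.artifact.binary_sha256,
        ):
            return

        # A parser crash is a quarantine reason; the fetched artifact is not lost.
        try:
            document = await self.parser.parse(fetched.artifact, fetched.local_path)
        except Exception as exc:
            await self.repository.quarantine(
                fetched.artifact,
                [f"parser_crashed:{type(exc).__name__}"],
            )
            return

        violations = await self.quality_gate.check(document)

        # Only documents that pass the gate are saved for chunking and indexing.
        if violations:
            await self.repository.quarantine(fetched.artifact, violations)
            return

        await self.repository.save_document(document)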

This is intentionally not tied to a specific parser library. Parser choice is an adapter detail.


13. Quality Gates

Quality gates prevent garbage from entering the knowledge base.

Examples:

| Gate | Check | Failure Action |
| --- | --- | --- |
| Empty extraction | document has no meaningful text | quarantine |
| Low OCR confidence | confidence below threshold | manual review |
| Missing title | cannot identify title | warn or enrich |
| Missing ACL | source has no access metadata | quarantine |
| Huge element | one element exceeds size budget | reparse or split |
| Table loss | table pages extracted as empty text | specialized parser |
| Language mismatch | expected Indonesian but detected English | route to review |
| Version conflict | same source older than current active version | archive or reject |
| Duplicate content | same hash already indexed | skip or link duplicate |

Gate implementation:

class BasicQualityGate:
    async def check(self, document: CanonicalDocument) -> list[str]:
        violations: list[str] = []

        text = "\n".join(element.text for element in document.elements).strip()

        if len(text) < 100:
            violations.append("document_text_too_short")

        # An empty ACL list is as unsafe as a missing one, so treat both as a violation.
        if not document.metadata.get("acl_groups"):
            violations.append("missing_acl_groups")

        if not any(element.element_type == ElementType.HEADING for element in document.elements):
            violations.append("missing_headings")

        large_elements = [element.element_id for element in document.elements if len(element.text) > 10_000]
        if large_elements:
            violations.append(f"oversized_elements:{','.join(large_elements[:5])}")

        return violations

Do not make every violation fatal. Some are warnings. Some require quarantine. Some require manual enrichment.
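A flat list of violation codes does not say which ones are fatal. One way to express that, sketched here with a hypothetical severity policy, is to map each code to a severity and let the most severe one decide the outcome. The `Severity` enum, the mapping, and `decide_outcome` are assumptions for this example.

```python
from enum import Enum


class Severity(Enum):
    WARNING = "warning"
    MANUAL_REVIEW = "manual_review"
    QUARANTINE = "quarantine"


# Hypothetical policy for the violation codes emitted by the gate above.
SEVERITY_BY_CODE = {
    "document_text_too_short": Severity.QUARANTINE,
    "missing_acl_groups": Severity.QUARANTINE,
    "missing_headings": Severity.WARNING,
    "oversized_elements": Severity.MANUAL_REVIEW,
}


def decide_outcome(violations: list[str]) -> str:
    """Return 'accept', 'manual_review', or 'quarantine' for a list of violation codes."""
    severities = {
        # Codes may carry details after a colon; unknown codes fail closed.
        SEVERITY_BY_CODE.get(code.split(":", 1)[0], Severity.QUARANTINE)
        for code in violations
    }
    if Severity.QUARANTINE in severities:
        return "quarantine"
    if Severity.MANUAL_REVIEW in severities:
        return "manual_review"
    return "accept"  # no violations, or warnings only
```

Treating unknown codes as quarantine-worthy is a conservative default: a new gate cannot silently downgrade itself to a warning by being left out of the policy.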


14. Quarantine Is a Feature

A bad ingestion pipeline silently indexes bad content.

A strong pipeline quarantines uncertain content.

Quarantine reasons:

  • parser crashed,
  • OCR confidence low,
  • ACL missing,
  • document type unknown,
  • extracted text too short,
  • table extraction failed,
  • duplicate conflict,
  • source version is older than active version,
  • malware scan failed,
  • PII classification required,
  • manual approval required.

Quarantine record:

{
  "artifact_id": "artifact-883",
  "source_uri": "policy_repo://appeals/late-appeals.pdf",
  "tenant_id": "tenant-001",
  "reasons": ["missing_acl_groups", "table_extraction_low_confidence"],
  "stage": "quality_check",
  "created_at": "2026-06-28T08:00:00Z",
  "retryable": true
}

Do not let the chunking/indexing pipeline consume quarantined documents.


15. Handling Tables

Tables are a major RAG failure source.

Naive text extraction often turns a table into unreadable fragments.

Example source table:

| Risk Level | Escalation Deadline | Approver |
| --- | --- | --- |
| Low | 10 business days | Team Lead |
| High | 2 business days | Director |

Bad flattening:

Risk Level Escalation Deadline Approver Low 10 business days Team Lead High 2 business days Director

Better canonical representation:

Table: Escalation deadlines by risk level
Columns: Risk Level, Escalation Deadline, Approver
Row 1: Risk Level = Low; Escalation Deadline = 10 business days; Approver = Team Lead
Row 2: Risk Level = High; Escalation Deadline = 2 business days; Approver = Director

The best representation often stores several of these together:

  • original structured table,
  • retrieval-friendly textual rendering,
  • table caption/title,
  • page and section path,
  • and optional row-level chunks.

Do not summarize tables with an LLM during ingestion unless you preserve the original table and mark the summary as derived content.


16. Handling Scanned Documents

Scanned documents require OCR.

OCR introduces uncertainty.

Track:

  • OCR engine,
  • OCR version,
  • confidence score,
  • page-level confidence,
  • detected language,
  • page image reference,
  • manual correction status,
  • and whether text is machine-generated or human-reviewed.

For critical regulatory evidence, low-confidence OCR should not be treated as authoritative without review.

A retrieval answer should be able to say:

Source text was extracted from OCR with low confidence on page 7.

That does not mean exposing internals to every end user. It means the system must know.
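For the system to "know", page-level confidence has to be stored and checked. The sketch below assumes the OCR engine reports a confidence between 0.0 and 1.0 per page; the `OcrPage` class, the `ocr_review_status` function, and the 0.80 threshold are illustrative assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OcrPage:
    page_number: int
    text: str
    confidence: float  # assumed to be in the range 0.0 to 1.0


def ocr_review_status(pages: list[OcrPage], threshold: float = 0.80) -> dict:
    """Summarize which pages fall below the confidence threshold."""
    flagged = [page.page_number for page in pages if page.confidence < threshold]
    return {
        "needs_review": bool(flagged),
        "low_confidence_pages": flagged,
        "text_origin": "machine_generated",  # becomes human-reviewed only after review
    }
```

The resulting dictionary can be attached to document metadata so that retrieval can flag, down-rank, or exclude text from the listed pages.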


17. Access Control During Ingestion

Access metadata must be captured before indexing.

Bad pattern:

1. ingest all documents
2. embed all text
3. let app filter answers later

Better pattern:

1. fetch source ACL
2. attach ACL to canonical document and elements
3. write ACL-aware chunks
4. pre-filter retrieval by user permissions

Access metadata can come from:

  • source repository permissions,
  • case permissions,
  • document classification,
  • tenant boundary,
  • user role,
  • jurisdiction,
  • data domain,
  • retention/legal hold policy.

If permissions cannot be resolved, quarantine the document.
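Pre-filtering means candidates are restricted by tenant and ACL before ranking, not after generation. The in-memory sketch below shows only the rule itself: `Chunk` and `permitted_chunks` are hypothetical, and in a real vector store the same condition would be pushed into the query as a metadata filter.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Chunk:
    chunk_id: str
    tenant_id: str
    acl_groups: frozenset[str]


def permitted_chunks(chunks: list[Chunk], tenant_id: str, user_groups: set[str]) -> list[Chunk]:
    """Keep only chunks in the user's tenant that share at least one ACL group."""
    return [
        chunk
        for chunk in chunks
        # An empty ACL set never intersects, so chunks without ACLs are never returned.
        if chunk.tenant_id == tenant_id and chunk.acl_groups & user_groups
    ]
```

The filter fails closed: a chunk with no ACL groups is invisible to everyone, which matches the rule that unresolved permissions lead to quarantine, not public access.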


18. Deletion and Retention

Ingestion is not complete unless it handles deletion.

Cases:

| Event | Required Behavior |
| --- | --- |
| Source document deleted | remove or tombstone canonical document and vectors |
| Source document archived | exclude from default retrieval |
| Retention expired | delete according to policy |
| Legal hold applied | prevent deletion |
| ACL revoked | update metadata and rebuild affected index entries |
| Document superseded | retain old version but default to latest effective version |

Vector stores are often treated as append-only caches. That is dangerous.

A knowledge base must support tombstones and reindexing.
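A tombstone hides a document from retrieval immediately, while physical deletion can follow later and can be blocked by a legal hold. The in-memory class below is a sketch of that separation only; `KnowledgeIndex` and its methods are hypothetical and stand in for whatever your vector store and metadata store provide.

```python
from dataclasses import dataclass, field


@dataclass
class KnowledgeIndex:
    entries: dict[str, list[str]] = field(default_factory=dict)  # document_id -> chunk ids
    tombstones: set[str] = field(default_factory=set)

    def upsert(self, document_id: str, chunk_ids: list[str]) -> None:
        """Index (or reindex) a document and clear any earlier tombstone."""
        self.tombstones.discard(document_id)
        self.entries[document_id] = list(chunk_ids)

    def tombstone(self, document_id: str) -> None:
        """Hide a document from retrieval without deleting its data yet."""
        self.tombstones.add(document_id)

    def purge(self, document_id: str, legal_hold: bool) -> bool:
        """Physically delete a document unless a legal hold forbids it."""
        if legal_hold:
            return False
        self.entries.pop(document_id, None)
        return True

    def searchable_chunk_ids(self) -> list[str]:
        return [
            chunk_id
            for document_id, chunk_ids in self.entries.items()
            if document_id not in self.tombstones
            for chunk_id in chunk_ids
        ]
```

Keeping the tombstone after a purge records that the document was deliberately removed, which helps prevent a late retry from reindexing it.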


19. Incremental Reprocessing

You will change parsers, normalization, metadata extraction, chunking, and embedding models.

Design for reprocessing from day one.

When a stage changes, only that stage and the stages downstream of it need to be rebuilt.

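| Change | Re-fetch? | Re-parse? | Re-chunk? | Re-embed? |
| --- | --- | --- | --- | --- |
| source file changed | yes | yes | yes | yes |
| parser version changed | no | yes | yes | yes |
| metadata enrichment changed | no | maybe | maybe | maybe |
| chunking changed | no | no | yes | yes |
| embedding model changed | no | no | no | yes |
| vector index params changed | no | no | no | maybe reindex |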

This dependency graph prevents expensive and risky full rebuilds.
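The unambiguous rows of the table above can be encoded directly: each kind of change names the first stage to rerun, and everything after it follows. This is a sketch; the change names and `stages_to_rerun` are hypothetical, and the "maybe" rows (metadata enrichment, vector index parameters) are left out because they need a case-by-case decision.

```python
STAGES = ("fetch", "parse", "chunk", "embed")

# First stage that must be rerun for each kind of change.
FIRST_STAGE_TO_RERUN = {
    "source_file": "fetch",
    "parser_version": "parse",
    "chunking": "chunk",
    "embedding_model": "embed",
}


def stages_to_rerun(change: str) -> list[str]:
    """Return the changed stage and every stage downstream of it."""
    start = STAGES.index(FIRST_STAGE_TO_RERUN[change])
    return list(STAGES[start:])
```

For example, `stages_to_rerun("chunking")` returns `["chunk", "embed"]`, so stored artifacts and canonical documents are reused as they are.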


20. Observability

Capture ingestion metrics.

Important metrics:

  • discovered source count,
  • fetched count,
  • skipped unchanged count,
  • parse success rate,
  • parse failure rate by source type,
  • quarantine count by reason,
  • average elements per document,
  • extracted character count,
  • table count,
  • OCR page count,
  • ACL missing count,
  • pipeline latency by stage,
  • reprocessing count,
  • index emission count.

Trace one document across stages.

{
  "trace_id": "tr-ingest-883",
  "artifact_id": "artifact-883",
  "document_id": "policy-appeals",
  "stage": "quality_check",
  "parser": "pdf-layout-parser",
  "parser_version": "2.4.1",
  "element_count": 184,
  "table_count": 6,
  "page_count": 27,
  "violations": [],
  "duration_ms": 4120
}

If you cannot trace a bad answer back to its ingestion path, debugging RAG becomes guesswork.
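Trace records like the one above can be produced by wrapping each stage in a context manager that measures duration and captures failures. The sketch uses only the standard library; `traced_stage` and the list-based `sink` are assumptions standing in for a real logger or tracing backend.

```python
import json
import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def traced_stage(trace_id: str, stage: str, sink: list[str]) -> Iterator[dict]:
    """Emit one JSON trace record per stage, including duration and status."""
    record: dict = {"trace_id": trace_id, "stage": stage, "violations": []}
    started = time.perf_counter()
    try:
        yield record  # the stage adds its own fields, e.g. element_count
        record["status"] = "ok"
    except Exception as exc:
        record["status"] = "error"
        record["error"] = repr(exc)
        raise  # tracing must not swallow the failure
    finally:
        record["duration_ms"] = round((time.perf_counter() - started) * 1000)
        sink.append(json.dumps(record, sort_keys=True))
```

A stage uses it as `with traced_stage("tr-ingest-883", "parse", sink) as record:` and fills in fields such as `record["element_count"]` inside the block. Reusing one `trace_id` across all stages of a document is what makes the records joinable later.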


21. Ingestion Failure Modes

| Failure | Symptom | Root Cause | Fix |
| --- | --- | --- | --- |
| Heading loss | retrieved chunk lacks context | parser flattened structure | structure-aware parsing |
| Table loss | answer misses threshold/deadline | table extraction failed | table-specific extraction |
| Stale content | old rule retrieved | version metadata missing | effective-date filters |
| Unauthorized retrieval | restricted text appears | ACL not captured | quarantine missing ACL |
| Duplicate crowding | top results repeat same doc | duplicate versions indexed | dedup/tombstones |
| OCR hallucination | wrong entity/date extracted | low OCR confidence | confidence gates/review |
| Broken citations | source page unavailable | provenance not stored | page/section mapping |
| Silent parse errors | low retrieval quality | no quality gates | quarantine and metrics |
| Rebuild chaos | index inconsistent | no stage versioning | dependency-based reprocessing |

22. Practice: Build an Ingestion Manifest

Create a manifest file after parsing.

{
  "document_id": "policy-appeals",
  "document_version": "sha256:abc123",
  "tenant_id": "tenant-001",
  "source_artifact_id": "artifact-883",
  "parser": {
    "name": "markdown-parser",
    "version": "1.0.0"
  },
  "canonicalization_version": "2026-06-28.v1",
  "metadata": {
    "status": "approved",
    "jurisdiction": "national",
    "acl_groups": ["appeals-team", "case-supervisors"]
  },
  "elements": [
    {
      "element_id": "el-001",
      "type": "heading",
      "text": "Late Appeals",
      "order": 1,
      "section_path": ["Appeals", "Late Appeals"]
    },
    {
      "element_id": "el-002",
      "type": "paragraph",
      "text": "A late appeal may be accepted only when...",
      "order": 2,
      "section_path": ["Appeals", "Late Appeals"]
    }
  ]
}

Then write checks:

  • manifest has tenant,
  • manifest has ACL,
  • every element has order,
  • every element has type,
  • every element can be traced to document version,
  • headings exist,
  • text length is above threshold,
  • and status is not draft unless explicitly allowed.
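The checks listed above translate almost line for line into a validation function over the manifest dictionary. This is a sketch: `validate_manifest`, the problem codes, and the default `min_chars` threshold are assumptions, and traceability to the document version is checked at manifest level because the example elements do not carry their own version field.

```python
def validate_manifest(manifest: dict, allow_draft: bool = False, min_chars: int = 100) -> list[str]:
    """Return a list of problem codes; an empty list means the manifest passes."""
    problems: list[str] = []
    metadata = manifest.get("metadata", {})
    elements = manifest.get("elements", [])

    if not manifest.get("tenant_id"):
        problems.append("missing_tenant")
    if not metadata.get("acl_groups"):
        problems.append("missing_acl")
    if not manifest.get("document_version"):
        problems.append("missing_document_version")

    for index, element in enumerate(elements):
        if "order" not in element:
            problems.append(f"element_{index}_missing_order")
        if not element.get("type"):
            problems.append(f"element_{index}_missing_type")

    if not any(element.get("type") == "heading" for element in elements):
        problems.append("missing_headings")
    if sum(len(element.get("text", "")) for element in elements) < min_chars:
        problems.append("text_too_short")
    if metadata.get("status") == "draft" and not allow_draft:
        problems.append("draft_not_allowed")
    return problems
```

Run it on every manifest before emitting chunking jobs, and treat a non-empty result the same way as a quality gate violation.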

23. Baeldung-Style Implementation Roadmap

Build in this order:

  1. Start with Markdown files.
  2. Parse headings and paragraphs.
  3. Add source artifact hashing.
  4. Add canonical document model.
  5. Add manifest output.
  6. Add quality gates.
  7. Add quarantine store.
  8. Add metadata extraction.
  9. Add PDF parser adapter.
  10. Add table handling.
  11. Add OCR handling only when needed.
  12. Add reprocessing/versioning.
  13. Add chunking job emission.
  14. Add observability.

Do not begin with the hardest PDF/OCR case. Start with a clean source type and build pipeline invariants.


24. Engineering Checklist

Before sending ingested documents to chunking/embedding, verify:

  • raw artifact is stored immutably,
  • artifact hash is recorded,
  • source metadata is captured,
  • tenant is known,
  • ACL/classification is known,
  • parser name/version is recorded,
  • canonicalization version is recorded,
  • document version is immutable,
  • elements have stable ids,
  • elements have order,
  • headings/section paths are preserved where possible,
  • tables are preserved or explicitly transformed,
  • page/section provenance exists,
  • quality gates run,
  • failed documents are quarantined,
  • deletion and supersession are handled,
  • reprocessing is possible without refetching unchanged artifacts,
  • ingestion traces can debug downstream retrieval failures.

25. Top 1% Judgment

A beginner says:

I loaded PDFs into a vector database.

A competent engineer says:

I parsed documents, chunked them, embedded them, and built search.

A strong AI application engineer says:

I operate a versioned ingestion pipeline that preserves source evidence, structure, permissions, provenance, quality gates, reprocessing semantics, and auditability before retrieval ever runs.

That distinction matters.

In production AI systems, ingestion quality creates the ceiling for RAG quality.

A model cannot reliably cite evidence that your pipeline lost. A retriever cannot respect permissions that your ingestion layer did not capture. An audit trail cannot prove a source that was overwritten.

Build ingestion like infrastructure.

