document-classification-nlp

SKILL.md

Document Classification with NLP

Overview

This skill implements NLP-based document classification and information extraction for construction projects. It automates document sorting, key term extraction, and content analysis.

Document Types:

  • RFIs (Requests for Information)
  • Submittals and shop drawings
  • Change orders and variations
  • Specifications and standards
  • Contracts and agreements
  • Safety reports and permits

Quick Start

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
import pandas as pd

# Labelled training examples: (document text, document type)
documents = [
    ("Please clarify the steel reinforcement spacing for the foundation slab", "RFI"),
    ("Attached shop drawing for HVAC ductwork layout", "Submittal"),
    ("Additional cost for unforeseen soil conditions", "Change Order"),
    ("Fire-rated wall assembly specification Section 09 21 16", "Specification"),
]
texts, labels = zip(*documents)

# TF-IDF unigram/bigram features feeding a multinomial Naive Bayes model
classifier = Pipeline([
    ('tfidf', TfidfVectorizer(max_features=1000, ngram_range=(1, 2))),
    ('clf', MultinomialNB()),
])
classifier.fit(texts, labels)

# Classify an unseen document
new_doc = "Request to approve substitution of specified light fixtures"
prediction = classifier.predict([new_doc])[0]
print(f"Classification: {prediction}")  # expected: Submittal

Advanced Classification System

Document Classifier Class

import re
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from typing import List, Dict, Tuple, Optional
import spacy
from dataclasses import dataclass

@dataclass
class ClassificationResult:
    """Outcome of classifying a single construction document."""
    # Caller-assigned identifier; the classifier itself fills in "".
    document_id: str
    # Most likely document type, e.g. 'RFI' or 'Change Order'.
    predicted_class: str
    # Score in [0, 1]: softmax of decision scores in ML mode, or the
    # fraction of matched patterns in rule-based mode.
    confidence: float
    # Runner-up (class, probability) pairs, best first (empty in rule-based mode).
    alternative_classes: List[Tuple[str, float]]
    # Entity category -> matched strings (dates, organizations, people, ...).
    extracted_entities: Dict[str, List[str]]
    # Construction-domain terms found in the document text.
    keywords: List[str]
class ConstructionDocumentClassifier:
    """Classify and analyze construction documents.

    Operates in two modes: after :meth:`train` is called, a TF-IDF +
    LinearSVC pipeline is used; before that, :meth:`classify` falls back
    to regex rule-based scoring via ``DOCUMENT_PATTERNS``.
    """

    # Regex patterns (matched against lowercased text) that signal each type.
    DOCUMENT_PATTERNS = {
        'RFI': [
            r'request\s+for\s+information',
            r'clarification\s+(needed|required|requested)',
            r'please\s+(clarify|confirm|advise)',
            r'question\s+(regarding|about)',
            r'rfi\s*#?\d*'
        ],
        'Submittal': [
            r'submittal',
            r'shop\s+drawing',
            r'product\s+data',
            r'sample\s+submission',
            r'approval\s+request',
            r'material\s+submission'
        ],
        'Change Order': [
            r'change\s+order',
            r'variation\s+order',
            r'cost\s+(increase|adjustment|addition)',
            r'scope\s+change',
            r'additional\s+work',
            r'unforeseen\s+conditions'
        ],
        'Specification': [
            r'section\s+\d{2}\s+\d{2}\s+\d{2}',
            r'specification',
            r'performance\s+requirement',
            r'material\s+standard',
            r'quality\s+standard'
        ],
        'Safety Report': [
            r'incident\s+report',
            r'safety\s+(inspection|violation|observation)',
            r'hazard\s+(identification|assessment)',
            r'near\s+miss',
            r'osha',
            r'jha|jsa'
        ],
        'Contract': [
            r'contract\s+agreement',
            r'terms\s+and\s+conditions',
            r'scope\s+of\s+work',
            r'payment\s+terms',
            r'warranty\s+provision'
        ]
    }

    def __init__(self, use_spacy: bool = True):
        """Initialize the classifier.

        Args:
            use_spacy: Try to load the spaCy ``en_core_web_sm`` model for
                NER; entity extraction degrades gracefully without it.
        """
        # Set by train(); None means classify() uses the rule-based fallback.
        self.classifier = None
        self.vectorizer = None  # kept for API compatibility; currently unused
        self.label_encoder = LabelEncoder()

        self.nlp = None
        if use_spacy:
            try:
                self.nlp = spacy.load("en_core_web_sm")
            except Exception:  # model not installed or spaCy misconfigured
                self.nlp = None

    def train(self, documents: List[str], labels: List[str]) -> Dict:
        """Train the document classifier.

        Args:
            documents: Raw document texts.
            labels: Document-type label for each text.

        Returns:
            Dict with cross-validated ``accuracy_mean``/``accuracy_std``
            (NaN when any class has fewer than 2 samples, since CV cannot
            run) and the list of learned ``classes``.
        """
        y = self.label_encoder.fit_transform(labels)

        self.classifier = Pipeline([
            ('tfidf', TfidfVectorizer(
                max_features=5000,
                ngram_range=(1, 3),
                stop_words='english',
                sublinear_tf=True
            )),
            ('clf', LinearSVC(C=1.0, class_weight='balanced'))
        ])

        self.classifier.fit(documents, y)

        # The number of CV folds cannot exceed the smallest class count;
        # the original hard-coded cv=5 raised on small training sets.
        cv_folds = min(5, int(np.bincount(y).min()))
        if cv_folds >= 2:
            scores = cross_val_score(self.classifier, documents, y, cv=cv_folds)
        else:
            scores = np.array([np.nan])

        return {
            'accuracy_mean': scores.mean(),
            'accuracy_std': scores.std(),
            'classes': list(self.label_encoder.classes_)
        }

    def classify(self, document: str) -> ClassificationResult:
        """Classify a single document.

        Falls back to rule-based classification when no model is trained.
        """
        if self.classifier is None:
            return self._rule_based_classify(document)

        prediction = self.classifier.predict([document])[0]
        predicted_class = self.label_encoder.inverse_transform([prediction])[0]

        # Confidence scores. With exactly two classes LinearSVC's
        # decision_function returns a single margin (for classes_[1]);
        # expand it so softmax yields one score per class.
        raw_scores = self.classifier.decision_function([document])[0]
        if np.ndim(raw_scores) == 0:
            raw_scores = np.array([-raw_scores, raw_scores])
        probs = self._softmax(raw_scores)

        # Up to three runner-up classes, best first (skip index 0 = winner).
        alternatives = [
            (self.label_encoder.inverse_transform([i])[0], float(probs[i]))
            for i in np.argsort(probs)[::-1][1:4]
        ]

        entities = self._extract_entities(document)
        keywords = self._extract_keywords(document)

        return ClassificationResult(
            document_id="",
            predicted_class=predicted_class,
            confidence=float(probs[prediction]),
            alternative_classes=alternatives,
            extracted_entities=entities,
            keywords=keywords
        )

    def _rule_based_classify(self, document: str) -> ClassificationResult:
        """Classify by counting DOCUMENT_PATTERNS hits per document type."""
        doc_lower = document.lower()
        scores = {}

        for doc_type, patterns in self.DOCUMENT_PATTERNS.items():
            score = sum(
                1 for pattern in patterns
                if re.search(pattern, doc_lower)
            )
            scores[doc_type] = score

        if max(scores.values()) == 0:
            # No pattern matched at all: unknown type, neutral confidence.
            predicted = 'Other'
            confidence = 0.5
        else:
            predicted = max(scores, key=scores.get)
            # Confidence = fraction of this type's patterns that matched.
            confidence = scores[predicted] / len(self.DOCUMENT_PATTERNS[predicted])

        return ClassificationResult(
            document_id="",
            predicted_class=predicted,
            confidence=confidence,
            alternative_classes=[],
            extracted_entities=self._extract_entities(document),
            keywords=self._extract_keywords(document)
        )

    def _extract_entities(self, document: str) -> Dict[str, List[str]]:
        """Extract dates, money amounts, reference numbers and (with spaCy)
        organizations/people from the document text."""
        entities = {
            'dates': [],
            'organizations': [],
            'people': [],
            'monetary': [],
            'references': []
        }

        # Numeric dates like 1/2/24 or 01-02-2024.
        date_pattern = r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}'
        entities['dates'] = re.findall(date_pattern, document)

        # Dollar amounts like $1,234.56.
        money_pattern = r'\$[\d,]+(?:\.\d{2})?'
        entities['monetary'] = re.findall(money_pattern, document)

        # Reference numbers such as "RFI-12" or "CO #7".
        ref_pattern = r'(?:RFI|CO|SI|PR)[-#]?\s*\d+'
        entities['references'] = re.findall(ref_pattern, document, re.IGNORECASE)

        # spaCy NER supplements the regexes when the model loaded.
        if self.nlp:
            doc = self.nlp(document)
            for ent in doc.ents:
                if ent.label_ == 'ORG':
                    entities['organizations'].append(ent.text)
                elif ent.label_ == 'PERSON':
                    entities['people'].append(ent.text)

        return entities

    def _extract_keywords(self, document: str, top_n: int = 10) -> List[str]:
        """Return up to *top_n* construction-domain terms found in the text."""
        construction_terms = [
            'concrete', 'steel', 'reinforcement', 'foundation', 'structural',
            'hvac', 'plumbing', 'electrical', 'mechanical', 'architectural',
            'specification', 'drawing', 'detail', 'schedule', 'submittals',
            'rfi', 'change order', 'delay', 'inspection', 'approval'
        ]

        doc_lower = document.lower()
        found_terms = [term for term in construction_terms if term in doc_lower]

        return found_terms[:top_n]

    def _softmax(self, x: np.ndarray) -> np.ndarray:
        """Convert raw decision scores to probabilities (max-shifted for
        numerical stability)."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def batch_classify(self, documents: List[str]) -> pd.DataFrame:
        """Classify multiple documents and return a summary DataFrame."""
        results = [self.classify(doc) for doc in documents]

        return pd.DataFrame([{
            'Predicted_Class': r.predicted_class,
            'Confidence': r.confidence,
            'Keywords': ', '.join(r.keywords),
            'Dates_Found': ', '.join(r.extracted_entities['dates']),
            'References_Found': ', '.join(r.extracted_entities['references'])
        } for r in results])

Information Extraction

Key Information Extractor

class ConstructionInfoExtractor:
    """Extract key information from construction documents."""

    def __init__(self):
        # Field name -> capturing regex; all are applied case-insensitively.
        self.patterns = {
            'rfi_number': r'RFI\s*[-#]?\s*(\d+)',
            'submittal_number': r'(?:Submittal|SI)\s*[-#]?\s*(\d+)',
            'change_order_number': r'(?:Change Order|CO|PCO)\s*[-#]?\s*(\d+)',
            'spec_section': r'Section\s*(\d{2}\s*\d{2}\s*\d{2})',
            'cost_amount': r'\$\s*([\d,]+(?:\.\d{2})?)',
            'duration_days': r'(\d+)\s*(?:calendar\s+)?days?',
            'drawing_reference': r'(?:Drawing|Dwg|DWG)\s*[-#]?\s*([A-Z\d-]+)',
            'date': r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',
            'contractor_name': r'(?:Contractor|Subcontractor):\s*([^\n]+)',
            'project_name': r'Project:\s*([^\n]+)',
            'priority': r'(?:Priority|Urgency):\s*(Critical|High|Medium|Low)'
        }

    def extract_all(self, document: str) -> Dict:
        """Run every known pattern; map field name -> match list or None."""
        results = {
            field: (re.findall(pattern, document, re.IGNORECASE) or None)
            for field, pattern in self.patterns.items()
        }

        # Normalise currency strings ("1,234.56") into floats.
        amounts = results.get('cost_amount')
        if amounts:
            results['cost_amount'] = [float(a.replace(',', '')) for a in amounts]

        return results

    def extract_rfi_details(self, document: str) -> Dict:
        """Pull RFI-specific fields (number, dates, references, question)."""
        find = self._find_first
        return {
            'rfi_number': find(document, self.patterns['rfi_number']),
            'date_submitted': find(document, self.patterns['date']),
            'spec_section': find(document, self.patterns['spec_section']),
            'drawing_ref': find(document, self.patterns['drawing_reference']),
            'question': self._extract_question(document),
            'priority': find(document, self.patterns['priority'])
        }

    def extract_change_order_details(self, document: str) -> Dict:
        """Pull change-order fields; cost_impact sums every dollar amount found."""
        amounts = re.findall(self.patterns['cost_amount'], document)
        total_cost = None
        if amounts:
            total_cost = sum(float(a.replace(',', '')) for a in amounts)

        find = self._find_first
        return {
            'co_number': find(document, self.patterns['change_order_number']),
            'date': find(document, self.patterns['date']),
            'cost_impact': total_cost,
            'duration_impact': find(document, self.patterns['duration_days']),
            'reason': self._extract_reason(document),
            'contractor': find(document, self.patterns['contractor_name'])
        }

    def _find_first(self, document: str, pattern: str) -> Optional[str]:
        """Return the first capture group of *pattern*, or None if absent."""
        hit = re.search(pattern, document, re.IGNORECASE)
        if hit is None:
            return None
        return hit.group(1)

    def _extract_question(self, document: str) -> Optional[str]:
        """Locate the question text inside an RFI (first 500 chars)."""
        markers = [
            r'Question:\s*(.+?)(?:\n\n|$)',
            r'(?:Please\s+)?(?:clarify|confirm|advise)(.+?)(?:\.|$)',
        ]
        for marker in markers:
            hit = re.search(marker, document, re.IGNORECASE | re.DOTALL)
            if hit:
                return hit.group(1).strip()[:500]
        return None

    def _extract_reason(self, document: str) -> Optional[str]:
        """Locate the stated reason for a change order (first 500 chars)."""
        markers = [
            r'Reason:\s*(.+?)(?:\n\n|$)',
            r'(?:Due to|Because of)\s*(.+?)(?:\.|$)',
        ]
        for marker in markers:
            hit = re.search(marker, document, re.IGNORECASE | re.DOTALL)
            if hit:
                return hit.group(1).strip()[:500]
        return None

Processing Pipeline

def process_document_batch(documents: List[str], output_path: str):
    """Classify a batch of documents, extract type-specific fields,
    and export the combined results to an Excel file.

    Args:
        documents: Raw document texts.
        output_path: Destination path for the Excel report.

    Returns:
        The results as a pandas DataFrame (also written to disk).
    """
    classifier = ConstructionDocumentClassifier()
    extractor = ConstructionInfoExtractor()

    rows = []
    for doc_id, text in enumerate(documents, start=1):
        classification = classifier.classify(text)
        doc_type = classification.predicted_class

        # Choose the extractor that matches the predicted type.
        if doc_type == 'RFI':
            details = extractor.extract_rfi_details(text)
        elif doc_type == 'Change Order':
            details = extractor.extract_change_order_details(text)
        else:
            details = extractor.extract_all(text)

        row = {
            'Document_ID': doc_id,
            'Classification': doc_type,
            'Confidence': classification.confidence,
            'Keywords': ', '.join(classification.keywords),
        }
        row.update(details)
        rows.append(row)

    df = pd.DataFrame(rows)
    df.to_excel(output_path, index=False)

    return df

Quick Reference

| Document Type | Key Patterns | Extracted Info |
| --- | --- | --- |
| RFI | "request for information", "clarify" | Number, spec section, question |
| Submittal | "shop drawing", "approval request" | Number, product, spec section |
| Change Order | "change order", "additional cost" | Number, cost, duration impact |
| Specification | "Section XX XX XX" | Section number, requirements |
| Safety Report | "incident", "hazard" | Date, type, severity |

Resources

Next Steps

  • See vector-search for semantic document search
  • See llm-data-automation for advanced extraction
  • See pdf-to-structured for PDF processing
Weekly Installs
3
GitHub Stars
55
First Seen
11 days ago
Installed on
opencode3
antigravity3
claude-code3
github-copilot3
codex3
kimi-cli3