Schema Validation for Druid Specs

The Pre-Flight Gatekeeper

In production OLAP environments, ingestion specifications are rarely static artifacts. As data contracts shift and transformation pipelines scale, the JSON payloads routed to the Druid Overlord require deterministic, automated verification before execution. Schema validation is the primary gate in any Automated Ingestion Pipeline Orchestration framework: it intercepts malformed tasks before they consume Peon resources or write incorrect segment metadata. Without strict pre-flight validation, type mismatches and structural drift cascade into query latency spikes, compaction failures, and expensive manual remediation cycles.

Deterministic Pre-Flight Architecture

Druid ingestion specs are deeply nested JSON documents where subtle deviations in timestampSpec, dimensionsSpec, or metricsSpec silently alter segment behavior. Validation must operate at two distinct layers: structural contract enforcement and Druid-specific semantic validation.

Structural Contract Enforcement relies on formalized JSON Schema definitions, maintained by the pipeline team in line with the Apache Druid Ingestion Reference. This layer catches missing required fields, incorrect data types, and invalid enum values before the payload leaves the CI/CD or orchestration runner.

Semantic Validation enforces Druid runtime constraints that JSON Schema alone cannot express:

  • Timestamp Alignment: The timestampSpec format (iso, posix, millis, micro, nano, auto, or a Joda-Time pattern) must match how the timestamp column is actually encoded in the source data. Rows whose timestamps fail to parse are counted as unparseable and dropped, and once that count exceeds the tuningConfig's maxParseExceptions the task fails.
  • Type Coercion Boundaries: Validation must reject metricsSpec aggregators whose fieldName is absent from the input columns (for delimited formats, the inputFormat's columns list), and must block STRING columns from feeding numeric aggregators such as longSum or doubleSum, where non-numeric values surface as parse errors rather than usable metrics.
  • Rollup Compatibility: When granularitySpec enables rollup, every aggregator that reads a source column (for example longSum, doubleSum, approxHistogram, or thetaSketch; the last two require the druid-histogram and druid-datasketches extensions) must map to a compatible source data type. The count aggregator reads no column. Without pre-flight checks, incompatible pairings are discovered only at runtime, when the task ends in a FAILED status on the Peon.
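The checks above can be sketched in plain Python. This is an illustration under stated assumptions, not Druid's own parsing logic: `column_types` (raw input column to type) and `samples` (one raw value per column) are hypothetical inputs the pipeline would take from its own data contract, and the posix/millis check is a magnitude heuristic.

```python
from datetime import datetime

# Aggregators that expect numeric input (a subset, for illustration).
NUMERIC_AGGREGATORS = {"longSum", "doubleSum", "floatSum", "longMin", "longMax", "doubleMin", "doubleMax"}


def timestamp_parses(fmt: str, raw: str) -> bool:
    """Rough pre-flight approximation of Druid's iso/posix/millis handling."""
    try:
        if fmt in ("posix", "millis"):
            value = int(raw)
            # Heuristic: present-day epoch seconds have ~10 digits, epoch millis ~13.
            return value < 10**11 if fmt == "posix" else value >= 10**11
        if fmt == "iso":
            # fromisoformat() before Python 3.11 rejects a trailing "Z", so normalize it.
            datetime.fromisoformat(raw.replace("Z", "+00:00"))
            return True
    except ValueError:
        return False
    return True  # auto, micro, nano and Joda patterns are not checked in this sketch


def semantic_violations(data_schema: dict, column_types: dict[str, str], samples: dict[str, str]) -> list[str]:
    problems = []
    ts = data_schema.get("timestampSpec", {})
    # Druid's defaults: column "timestamp", format "auto".
    column, fmt = ts.get("column", "timestamp"), ts.get("format", "auto")
    if column in samples and not timestamp_parses(fmt, samples[column]):
        problems.append(f"timestamp {samples[column]!r} does not match format '{fmt}'")
    for agg in data_schema.get("metricsSpec", []):
        field = agg.get("fieldName")
        if field is None:  # e.g. count reads no source column
            continue
        if field not in column_types:
            problems.append(f"metric '{agg.get('name')}' reads unknown column '{field}'")
        elif agg.get("type") in NUMERIC_AGGREGATORS and column_types[field] == "string":
            problems.append(f"metric '{agg.get('name')}' feeds STRING column '{field}' into {agg['type']}")
    return problems
```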

For engineering teams implementing Dynamic Ingestion Spec Generation, validation must execute strictly post-templating and pre-submission. This ensures interpolated variables, environment-specific overrides, and computed partition boundaries satisfy the Overlord contract.
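A minimal sketch of that ordering, using Python's `string.Template` as a stand-in for whatever templating engine generates the spec. `render_then_validate` and its `validate` callback are hypothetical names; in practice the callback would wrap the structural and semantic checks.

```python
import json
from string import Template
from typing import Callable


def render_then_validate(template_text: str, variables: dict, validate: Callable[[dict], None]) -> dict:
    """Render first, validate the final payload, and only then return it for submission."""
    # substitute() raises KeyError for any placeholder without a value, so an
    # unresolved ${...} cannot slip through as a literal string.
    rendered = Template(template_text).substitute(variables)
    # json.loads fails fast if interpolation produced invalid JSON.
    spec = json.loads(rendered)
    # Structural and semantic checks run on exactly what will be submitted.
    validate(spec)
    return spec
```

Because a missing variable or broken JSON fails before the callback runs, no partially rendered spec reaches the validator, let alone the Overlord.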

Production Validation Implementation

Pipeline builders should deploy a lightweight, versioned validation service that runs synchronously before task submission. The following implementation uses jsonschema for strict contract verification against a JSON Schema Draft 2020-12 contract, then applies Druid-specific semantic checks in plain Python before posting the task to the Overlord.

import json
import logging
from typing import Any, Dict

import requests
from jsonschema import Draft202012Validator

logger = logging.getLogger(__name__)

class DruidIngestionValidator:
    def __init__(self, schema_path: str):
        with open(schema_path) as f:
            self.contract = json.load(f)
        # Fail fast if the contract file is not itself a valid Draft 2020-12 schema.
        Draft202012Validator.check_schema(self.contract)
        self._validator = Draft202012Validator(self.contract)

    def validate_structure(self, spec: Dict[str, Any]) -> None:
        """Enforce the JSON Schema contract, reporting every violation at once."""
        errors = [e.message for e in self._validator.iter_errors(spec)]
        if errors:
            raise RuntimeError("Schema contract violation: " + "; ".join(errors))

    def validate_semantics(self, spec: Dict[str, Any]) -> None:
        """Apply Druid-specific runtime constraints."""
        # Accept a full task ({"type": ..., "spec": {...}}) or a bare ingestion spec.
        ingestion = spec.get("spec", spec)
        io_config = ingestion.get("ioConfig", {})
        input_format = io_config.get("inputFormat", {})
        data_schema = ingestion.get("dataSchema", {})

        # timestampSpec belongs under dataSchema, not at the top level of the task.
        if input_format.get("type") == "json" and "timestampSpec" not in data_schema:
            raise ValueError("JSON input format requires an explicit dataSchema.timestampSpec")

        # Dimensions may be declared as plain strings or as {"name": ..., "type": ...} objects.
        dimensions = {
            d["name"] if isinstance(d, dict) else d
            for d in data_schema.get("dimensionsSpec", {}).get("dimensions", [])
        }
        metrics = data_schema.get("metricsSpec", [])

        # Metric source columns are raw input fields, which are normally NOT also
        # dimensions. Only validate fieldName membership when the input format
        # declares an explicit column list (e.g. CSV/TSV); otherwise the raw
        # columns are not known at this layer and the check would false-positive.
        declared_columns = set(input_format.get("columns", []))
        if declared_columns:
            known_columns = declared_columns | dimensions
            for metric in metrics:
                field = metric.get("fieldName")
                if field and field not in known_columns:
                    raise ValueError(
                        f"Rollup violation: metric '{metric.get('name')}' references missing column '{field}'"
                    )

    def execute(self, spec_path: str, overlord_url: str) -> requests.Response:
        with open(spec_path) as f:
            spec = json.load(f)

        self.validate_structure(spec)
        self.validate_semantics(spec)

        logger.info("Pre-flight validation passed. Submitting to Overlord: %s", overlord_url)
        return requests.post(
            f"{overlord_url}/druid/indexer/v1/task",
            json=spec,
            headers={"Content-Type": "application/json"},
            timeout=30,
        )

Async Execution & State Reconciliation

Validation does not terminate at successful HTTP submission. Once the Overlord acknowledges the payload, the pipeline must transition to asynchronous state tracking. Druid tasks operate independently across the cluster, requiring a reconciliation loop that polls /druid/indexer/v1/task/{taskId}/status and maps terminal states to pipeline outcomes.
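A minimal reconciliation loop, assuming the status response nests task state under `status` with a `statusCode` field (the sketch also falls back to a `status` field). The HTTP call is injected as `fetch_status` so the loop can be tested without a cluster; in production it might be `lambda tid: requests.get(f"{overlord_url}/druid/indexer/v1/task/{tid}/status", timeout=10).json()`.

```python
import time
from typing import Callable

# Druid's terminal task states mapped to pipeline outcomes.
TERMINAL_OUTCOMES = {"SUCCESS": "succeeded", "FAILED": "failed"}


def reconcile(
    task_id: str,
    fetch_status: Callable[[str], dict],
    interval_s: float = 10.0,
    max_polls: int = 360,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the task reaches a terminal state or the poll budget runs out."""
    for _ in range(max_polls):
        status = fetch_status(task_id).get("status", {})
        # Read statusCode, falling back to the status field.
        code = status.get("statusCode") or status.get("status")
        if code in TERMINAL_OUTCOMES:
            return TERMINAL_OUTCOMES[code]
        sleep(interval_s)
    return "timed_out"
```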

Automated reconciliation must account for heterogeneous execution models. Batch vs Streaming Ingestion Sync dictates different polling intervals, retry thresholds, and failure escalation paths. Batch tasks typically require segment availability verification post-completion, while streaming supervisors demand continuous health checks and partition offset validation.
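One way to encode those differences is a per-mode policy table plus a streaming health check. The interval and SLA values are placeholders, and `supervisor_healthy` assumes the supervisor status response (`/druid/indexer/v1/supervisor/{supervisorId}/status`) carries `state`, `healthy`, and, for Kafka, `aggregateLag` under `payload`; verify those field names against your Druid version.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SyncPolicy:
    poll_interval_s: float
    sla_s: float
    verify_segments: bool  # batch: confirm segments after SUCCESS
    continuous: bool       # streaming: keep checking supervisor health indefinitely


# Placeholder values; tune to your own SLAs.
POLICIES = {
    "batch": SyncPolicy(poll_interval_s=30, sla_s=4 * 3600, verify_segments=True, continuous=False),
    "streaming": SyncPolicy(poll_interval_s=60, sla_s=float("inf"), verify_segments=False, continuous=True),
}
STREAMING_SPEC_TYPES = {"kafka", "kinesis"}


def policy_for(spec_type: str) -> SyncPolicy:
    return POLICIES["streaming" if spec_type in STREAMING_SPEC_TYPES else "batch"]


def supervisor_healthy(status_response: dict, max_lag: int) -> bool:
    """Assumed shape: {"payload": {"state": ..., "healthy": ..., "aggregateLag": ...}}."""
    payload = status_response.get("payload", {})
    lag = payload.get("aggregateLag") or 0
    return bool(payload.get("healthy")) and payload.get("state") == "RUNNING" and lag <= max_lag
```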

Implementing a robust async handler involves:

  1. Idempotent Task Submission: Generate deterministic taskIds using content hashing to prevent duplicate ingestion on network retries.
  2. Exponential Backoff Polling: Query the Overlord at increasing, configurable intervals, escalating to alerting channels if a task remains in a non-terminal state (for example WAITING, PENDING, or RUNNING) beyond its SLA threshold.
  3. Segment Metadata Verification: Post-success, query the Coordinator API to confirm expected segment counts, partition boundaries, and compaction readiness before marking the pipeline run complete.
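Steps 1 and 2 can be sketched as follows. The ID scheme (`ingest_<dataSource>_<sha256 prefix>`) and the helper names are hypothetical; the sketch relies on the task JSON accepting an `id` field, so that a retried submission carries the same ID and the Overlord can refuse the duplicate instead of starting a second ingestion.

```python
import hashlib
import json


def deterministic_task_id(task: dict, prefix: str = "ingest") -> str:
    """Hash a canonical form of the task so identical specs always get the same ID."""
    body = {k: v for k, v in task.items() if k != "id"}  # ignore any ID already set
    # sort_keys + compact separators: key order and whitespace do not change the hash.
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    data_source = task.get("spec", {}).get("dataSchema", {}).get("dataSource", "unknown")
    return f"{prefix}_{data_source}_{digest}"


def with_task_id(task: dict) -> dict:
    return {**task, "id": deterministic_task_id(task)}


def backoff_delays(base_s: float, factor: float, cap_s: float, attempts: int) -> list[float]:
    """Polling delays that grow geometrically and are capped at cap_s."""
    return [min(cap_s, base_s * factor**i) for i in range(attempts)]
```

Because the ID is derived from content, an intentional re-run of an identical spec is also rejected; one option for deliberate re-ingestion is to fold a run date or batch identifier into the hashed payload.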

Versioning & Forward Compatibility

Ingestion contracts evolve alongside upstream data models. Validation pipelines must support schema versioning, allowing teams to run parallel validation rules during migration windows. By decoupling validation logic from orchestration runners and storing contract definitions in version control, engineering teams can safely introduce breaking changes, deprecate legacy aggregators, and enforce strict backward compatibility gates.
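A sketch of running parallel validation rules during a migration window: the enforced contract version blocks submission, while shadow versions only report. `CONTRACTS`, `v1`, `v2`, and the rule retiring `approxHistogram` are hypothetical examples of a team's own versioned rules, not Druid behavior.

```python
from typing import Callable

Validator = Callable[[dict], list[str]]  # returns violation messages


def _data_schema(task: dict) -> dict:
    return task.get("spec", task).get("dataSchema", {})


def v1(task: dict) -> list[str]:
    return [] if _data_schema(task).get("dataSource") else ["dataSource is required"]


def v2(task: dict) -> list[str]:
    # Hypothetical migration rule: retire approxHistogram in favor of a quantiles sketch.
    return v1(task) + [
        f"metric '{m.get('name')}' uses approxHistogram, which contract v2 retires"
        for m in _data_schema(task).get("metricsSpec", [])
        if m.get("type") == "approxHistogram"
    ]


CONTRACTS: dict[str, Validator] = {"v1": v1, "v2": v2}


def gate(task: dict, contracts: dict[str, Validator], enforce: str, shadow: tuple[str, ...] = ()) -> dict:
    """Block on the enforced version; report shadow-version findings without blocking."""
    errors = contracts[enforce](task)
    warnings = {}
    for version in shadow:
        issues = contracts[version](task)
        if issues:
            warnings[version] = issues
    return {"enforced": enforce, "passed": not errors, "errors": errors, "warnings": warnings}
```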

When upstream producers modify column types or introduce new partition keys, validation hooks must integrate with Handling Schema Evolution in Druid Ingestion workflows. This ensures that incremental schema updates are validated against both historical segment expectations and future ingestion requirements, maintaining query consistency across the entire OLAP footprint.
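A minimal drift check comparing the dimension types in a new spec against the types the pipeline recorded for earlier accepted specs. The `historical` map is an assumed input (it could also be assembled from a segmentMetadata query), and treating any retyped column as blocking is an illustrative policy choice.

```python
def dimension_types(data_schema: dict) -> dict[str, str]:
    """Map dimension name to type; plain-string entries default to Druid's string type."""
    types = {}
    for dim in data_schema.get("dimensionsSpec", {}).get("dimensions", []):
        if isinstance(dim, str):
            types[dim] = "string"
        else:
            types[dim["name"]] = dim.get("type", "string")
    return types


def evolution_report(historical: dict[str, str], data_schema: dict) -> dict[str, list[str]]:
    current = dimension_types(data_schema)
    shared = set(current) & set(historical)
    return {
        "added": sorted(set(current) - set(historical)),
        "dropped": sorted(set(historical) - set(current)),
        "retyped": sorted(c for c in shared if current[c] != historical[c]),
    }


def is_compatible(report: dict[str, list[str]]) -> bool:
    # Illustrative policy: new columns are fine, type changes need review.
    return not report["retyped"]
```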
