Automating Druid Ingestion Specs with Python
Apache Druid ingestion pipelines demand deterministic configuration, strict schema validation, and synchronized segment lifecycle management. Manually authored specs tend to introduce configuration drift, Overlord lock contention, and retention misalignment across distributed clusters. Programmatic orchestration via Python reduces these failure surfaces by enforcing idempotent task submission, CI/CD gating, and automated retention synchronization. For OLAP data engineers and platform DevOps teams, treating ingestion specs as versioned, executable artifacts is a prerequisite for reliable analytics infrastructure.
Deterministic Spec Construction & Schema Enforcement
Ingestion specifications must adapt dynamically to partitioning shifts, schema evolution, and cluster topology changes without manual intervention. A production-grade orchestrator resolves partition boundaries, rollup configurations, and I/O formats at runtime rather than relying on static templates. Implementing strict JSON schema validation before submission prevents malformed tasks from consuming cluster resources or triggering silent data loss.
By integrating Dynamic Ingestion Spec Generation into your pipeline, you can enforce type-safe dictionaries, environment-driven defaults, and pre-flight validation against the official Druid ingestion contract. The following implementation demonstrates a hardened spec builder paired with jsonschema validation and resilient HTTP session management:
import time
from typing import Any, Dict, List

import requests
from jsonschema import validate
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Minimal Druid index_parallel spec schema for pre-flight validation
SPEC_SCHEMA = {
    "type": "object",
    "required": ["type", "spec"],
    "properties": {
        "type": {"const": "index_parallel"},
        "spec": {
            "type": "object",
            "required": ["dataSchema", "ioConfig", "tuningConfig"],
            "properties": {
                "dataSchema": {"type": "object"},
                "ioConfig": {"type": "object"},
                "tuningConfig": {"type": "object"},
            },
        },
    },
}

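
def build_parallel_index_spec(
    datasource: str,
    input_paths: List[str],
    timestamp_column: str,
    dimensions: List[str],
    metrics: List[Dict[str, Any]],
    # Druid's sizing guidance is roughly 5 million rows per segment.
    partition_size: int = 5_000_000,
    segment_granularity: str = "DAY",
) -> Dict[str, Any]:
    """Build an index_parallel spec that ingests the given local Parquet files."""
    return {
        "type": "index_parallel",
        "spec": {
            "dataSchema": {
                "dataSource": datasource,
                "timestampSpec": {"column": timestamp_column, "format": "auto"},
                "dimensionsSpec": {"dimensions": dimensions},
                # Always keep a row count so rolled-up data stays countable.
                "metricsSpec": [{"type": "count", "name": "count"}] + metrics,
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": segment_granularity,
                    "queryGranularity": "HOUR",
                    "rollup": True,
                },
            },
            "ioConfig": {
                "type": "index_parallel",
                # Ingest exactly the files passed in. The Parquet input format
                # needs the druid-parquet-extensions extension to be loaded.
                "inputSource": {"type": "local", "files": input_paths},
                "inputFormat": {"type": "parquet"},
                # Must stay False when forceGuaranteedRollup is enabled.
                "appendToExisting": False,
            },
            "tuningConfig": {
                "type": "index_parallel",
                # forceGuaranteedRollup requires hashed or range partitioning,
                # so the row target is expressed through partitionsSpec.
                "partitionsSpec": {
                    "type": "hashed",
                    "targetRowsPerSegment": partition_size,
                },
                "maxNumSubTasks": 4,
                "forceGuaranteedRollup": True,
                # Fail the task on the first unparseable row instead of dropping it.
                "logParseExceptions": True,
                "maxParseExceptions": 0,
            },
        },
    }
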

def submit_and_track_task(
    overlord_url: str,
    spec: Dict[str, Any],
    poll_interval: int = 5,
    max_wait: int = 3600,
) -> Dict[str, Any]:
    """Validate and submit a spec, then poll until the task finishes or times out."""
    # Raises jsonschema.ValidationError before any HTTP traffic if the spec is malformed.
    validate(instance=spec, schema=SPEC_SCHEMA)

    session = requests.Session()
    session.headers.update({"Content-Type": "application/json"})
    # By default urllib3 applies status-based retries only to idempotent methods,
    # so the GET polls are retried on 502/503/504 while the POST is not re-sent.
    retry_strategy = Retry(total=3, backoff_factor=1, status_forcelist=[502, 503, 504])
    session.mount("http://", HTTPAdapter(max_retries=retry_strategy))
    session.mount("https://", HTTPAdapter(max_retries=retry_strategy))

    resp = session.post(
        f"{overlord_url}/druid/indexer/v1/task",
        json=spec,
        timeout=30,
    )
    resp.raise_for_status()
    task_id = resp.json()["task"]

    # Async execution polling pattern (monotonic clock is immune to wall-clock jumps)
    start = time.monotonic()
    while time.monotonic() - start < max_wait:
        status_resp = session.get(
            f"{overlord_url}/druid/indexer/v1/task/{task_id}/status",
            timeout=15,
        )
        status_resp.raise_for_status()
        payload = status_resp.json()
        state = payload.get("status", {}).get("status")
        if state in ("SUCCESS", "FAILED"):
            return payload
        time.sleep(poll_interval)

    raise TimeoutError(f"Task {task_id} exceeded max wait time of {max_wait}s")
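
The builder takes its sizing and granularity as plain arguments, so the environment-driven defaults mentioned earlier can be resolved in a separate step. The following is a minimal sketch under stated assumptions: the variable names `DRUID_SEGMENT_GRANULARITY` and `DRUID_TARGET_ROWS_PER_SEGMENT` are hypothetical, and the granularity allow-list is deliberately narrower than what Druid accepts.

```python
import os
from typing import Any, Dict, Mapping

# Hypothetical variable names; adapt them to your deployment conventions.
DEFAULTS = {
    "DRUID_SEGMENT_GRANULARITY": "DAY",
    "DRUID_TARGET_ROWS_PER_SEGMENT": "5000000",
}

# Deliberately narrow allow-list; Druid itself supports more granularities.
VALID_GRANULARITIES = {"HOUR", "DAY", "WEEK", "MONTH", "YEAR"}


def resolve_settings(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Merge environment overrides onto defaults and validate the result."""
    merged = {**DEFAULTS, **{k: v for k, v in env.items() if k in DEFAULTS}}

    granularity = merged["DRUID_SEGMENT_GRANULARITY"].upper()
    if granularity not in VALID_GRANULARITIES:
        raise ValueError(f"unsupported segment granularity: {granularity!r}")

    rows = int(merged["DRUID_TARGET_ROWS_PER_SEGMENT"])
    if rows <= 0:
        raise ValueError("target rows per segment must be positive")

    # Keys match the keyword arguments of build_parallel_index_spec.
    return {"segment_granularity": granularity, "partition_size": rows}
```

Because the returned keys mirror the builder's keyword arguments, the result can be passed as `build_parallel_index_spec(..., **resolve_settings())`, which keeps per-environment tuning out of the spec code itself.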
Idempotent Submission & Async Execution Patterns
Druid's Overlord API operates asynchronously: it returns a task identifier immediately while worker processes build and publish segments in the background. Orchestrators should poll task status on a bounded exponential backoff schedule so that status checks do not overload the Overlord, which serves the task API. Idempotency is achieved by tying each submission to an external tracking key (e.g., an Airflow run ID or a Kafka offset range), so that a retried pipeline run can detect an existing task instead of submitting a duplicate, and by verifying segment availability via the metadata store before marking pipelines as complete.
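
As a rough illustration of both ideas, the sketch below derives a stable task ID from an external run identifier and generates a capped exponential backoff schedule. The function names and the ID format are hypothetical. It assumes that the task payload accepts an optional top-level `id` field and that the Overlord refuses a second submission with the same ID; confirm both against your Druid version.

```python
import hashlib
from typing import Iterator


def deterministic_task_id(datasource: str, run_id: str) -> str:
    """Derive a stable task ID so a retried run maps to the same task."""
    digest = hashlib.sha256(f"{datasource}:{run_id}".encode("utf-8")).hexdigest()[:16]
    return f"index_parallel_{datasource}_{digest}"


def backoff_delays(
    base: float = 2.0,
    factor: float = 2.0,
    cap: float = 60.0,
    max_wait: float = 3600.0,
) -> Iterator[float]:
    """Yield sleep durations that grow geometrically up to `cap`.

    The generator stops once the yielded delays sum to `max_wait`.
    """
    elapsed = 0.0
    delay = base
    while elapsed < max_wait:
        sleep_for = min(delay, cap, max_wait - elapsed)
        yield sleep_for
        elapsed += sleep_for
        delay *= factor
```

A caller would set `spec["id"] = deterministic_task_id(datasource, run_id)` before submission, then loop with `for delay in backoff_delays(): ...`, checking task status and calling `time.sleep(delay)` on each pass. Adding random jitter to each delay is a common refinement when many pipelines poll the same Overlord.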
When integrating Automated Ingestion Pipeline Orchestration into CI/CD workflows, enforce pre-commit linting against the Druid spec schema and gate deployments on successful dry-run validations. This prevents malformed ingestion jobs from propagating to production environments.
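
A pre-commit or CI lint step can also catch semantic mistakes that a structural JSON schema does not express. The sketch below is one possible shape for such a check, using only the standard library. The function names `lint_spec` and `lint_files` are hypothetical, and the rule set is a small illustrative sample based on Druid's documented constraint that `forceGuaranteedRollup` needs hashed or range partitioning and cannot be combined with `appendToExisting`.

```python
import json
from typing import Any, Dict, List


def lint_spec(spec: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means clean."""
    errors: List[str] = []
    if spec.get("type") != "index_parallel":
        errors.append("top-level 'type' must be 'index_parallel'")

    inner = spec.get("spec")
    if not isinstance(inner, dict):
        errors.append("missing 'spec' object")
        return errors

    for key in ("dataSchema", "ioConfig", "tuningConfig"):
        if not isinstance(inner.get(key), dict):
            errors.append(f"'spec.{key}' must be an object")
    if errors:
        return errors

    if not inner["dataSchema"].get("dataSource"):
        errors.append("'dataSchema.dataSource' must be set")

    tuning, io_config = inner["tuningConfig"], inner["ioConfig"]
    if tuning.get("forceGuaranteedRollup"):
        partitions = tuning.get("partitionsSpec")
        ptype = partitions.get("type") if isinstance(partitions, dict) else None
        if ptype not in ("hashed", "single_dim", "range"):
            errors.append("forceGuaranteedRollup requires hashed or range partitionsSpec")
        if io_config.get("appendToExisting"):
            errors.append("forceGuaranteedRollup cannot be combined with appendToExisting")
    return errors


def lint_files(paths: List[str]) -> int:
    """Lint spec files and return a process exit code (0 = all clean)."""
    failed = False
    for path in paths:
        with open(path, encoding="utf-8") as handle:
            problems = lint_spec(json.load(handle))
        for problem in problems:
            print(f"{path}: {problem}")
        failed = failed or bool(problems)
    return 1 if failed else 0
```

Wired into a hook as `sys.exit(lint_files(sys.argv[1:]))`, a non-zero exit code blocks the commit or fails the pipeline stage before any spec reaches a cluster.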
Segment Lifecycle & Automated Rollback
Automated retention synchronization requires querying Druid's sys.segments table to identify expired or orphaned segments before triggering cleanup tasks. Cross-cluster ingestion orchestration often demands synchronized compaction jobs to merge small segments and optimize query performance. Implement automated rollback by preserving previous spec versions in a version-controlled registry and by calling the Overlord's POST /druid/indexer/v1/task/{taskId}/shutdown endpoint to terminate a running task immediately when validation fails.
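
The retention lookup and the shutdown call might be sketched as follows, using only the standard library. The function names are hypothetical. The sketch assumes that sys.segments exposes "start" and "end" as ISO 8601 strings in UTC (so string comparison orders them correctly), that the SQL endpoint at /druid/v2/sql accepts `?` placeholders with a `parameters` array, and that `now`, when supplied, is timezone-aware.

```python
import json
import urllib.request
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from urllib.parse import quote


def expired_segments_query(
    datasource: str, retention_days: int, now: Optional[datetime] = None
) -> Dict[str, Any]:
    """Build a Druid SQL request body selecting segments past retention."""
    now_utc = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    cutoff = (now_utc - timedelta(days=retention_days)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    sql = (
        'SELECT "segment_id", "start", "end" FROM sys.segments '
        'WHERE "datasource" = ? AND "end" <= ? AND is_published = 1'
    )
    return {
        "query": sql,
        "parameters": [
            {"type": "VARCHAR", "value": datasource},
            {"type": "VARCHAR", "value": cutoff},
        ],
    }


def shutdown_url(overlord_url: str, task_id: str) -> str:
    """Return the Overlord shutdown URL for a task, escaping the task ID."""
    return f"{overlord_url}/druid/indexer/v1/task/{quote(task_id, safe='')}/shutdown"


def post_json(url: str, body: Optional[Dict[str, Any]] = None, timeout: float = 30.0) -> Any:
    """POST an optional JSON body and decode the JSON response, if any."""
    data = json.dumps(body).encode("utf-8") if body is not None else b""
    request = urllib.request.Request(
        url, data=data, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return json.loads(raw) if raw else None
```

In use, `post_json(f"{router_url}/druid/v2/sql", expired_segments_query("events", 30))` would return the candidate segments, and `post_json(shutdown_url(overlord_url, task_id))` would request termination of a task. Identifying segments is only the first step; the cleanup itself (marking segments unused and issuing kill tasks) is left out of this sketch.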
To keep batch and streaming ingestion in sync, decouple real-time Kafka supervisors from batch index_parallel tasks by deriving both from a unified metadata layer. This keeps dimension definitions and metric aggregations consistent across ingestion modes. Always align the per-segment row target (maxRowsPerSegment, or targetRowsPerSegment under hashed partitioning) and segmentGranularity with downstream query patterns to prevent excessive segment proliferation.
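
One way to picture the unified metadata layer is a single function that owns the dataSchema, with thin wrappers producing the batch task spec and the Kafka supervisor spec from it. This is a sketch under stated assumptions: the function names are hypothetical, the streaming side is assumed to carry JSON messages, and tuning settings are left at minimal values.

```python
import copy
from typing import Any, Dict, List


def shared_data_schema(
    datasource: str,
    timestamp_column: str,
    dimensions: List[str],
    metrics: List[Dict[str, Any]],
    segment_granularity: str = "DAY",
    query_granularity: str = "HOUR",
) -> Dict[str, Any]:
    """Single source of truth for dimensions, metrics, and granularity."""
    return {
        "dataSource": datasource,
        "timestampSpec": {"column": timestamp_column, "format": "auto"},
        "dimensionsSpec": {"dimensions": list(dimensions)},
        "metricsSpec": [{"type": "count", "name": "count"}] + list(metrics),
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": segment_granularity,
            "queryGranularity": query_granularity,
            "rollup": True,
        },
    }


def batch_spec(schema: Dict[str, Any], files: List[str]) -> Dict[str, Any]:
    """Wrap the shared schema in an index_parallel task spec."""
    return {
        "type": "index_parallel",
        "spec": {
            "dataSchema": copy.deepcopy(schema),
            "ioConfig": {
                "type": "index_parallel",
                "inputSource": {"type": "local", "files": list(files)},
                "inputFormat": {"type": "parquet"},
            },
            "tuningConfig": {"type": "index_parallel"},
        },
    }


def kafka_supervisor_spec(
    schema: Dict[str, Any], topic: str, bootstrap_servers: str
) -> Dict[str, Any]:
    """Wrap the same schema in a Kafka supervisor spec."""
    return {
        "type": "kafka",
        "spec": {
            "dataSchema": copy.deepcopy(schema),
            "ioConfig": {
                "type": "kafka",
                "topic": topic,
                "consumerProperties": {"bootstrap.servers": bootstrap_servers},
                "inputFormat": {"type": "json"},
            },
            "tuningConfig": {"type": "kafka"},
        },
    }
```

Because both wrappers deep-copy the same schema object, a change to a dimension or metric definition propagates to both ingestion paths in one commit, and neither spec can mutate the other's copy.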
Production Hardening Checklist
- Schema Validation: Enforce strict JSON schema checks against the official Druid ingestion contract before HTTP submission. Reference the JSON Schema specification for contract validation patterns.
- Session Resilience: Utilize persistent HTTP sessions with connection pooling and retry strategies to handle transient network partitions. See Requests Session Objects for implementation details.
- Metadata Verification: Cross-check Overlord task states with the Druid SQL metadata API to confirm segment handoff completion before triggering downstream compaction or archival workflows.
- CI/CD Gating: Integrate spec generation into pipeline runners with dry-run validation, linting, and automated rollback triggers on schema drift detection.
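
The metadata verification item can be made concrete with a small check over rows returned from sys.segments. The sketch below assumes rows were fetched with a query such as `SELECT "segment_id", is_published, is_available, is_overshadowed FROM sys.segments WHERE "datasource" = ?` and that these flag columns are reported as 1 or 0. The function names and the `min_segments` threshold are hypothetical.

```python
from typing import Any, Dict, List


def pending_handoff(rows: List[Dict[str, Any]]) -> List[str]:
    """Return IDs of published, non-overshadowed segments not yet served."""
    return [
        row["segment_id"]
        for row in rows
        if row.get("is_published") == 1
        and row.get("is_overshadowed") == 0
        and row.get("is_available") != 1
    ]


def handoff_complete(rows: List[Dict[str, Any]], min_segments: int = 1) -> bool:
    """True when enough live segments exist and all of them are available."""
    live = [
        row
        for row in rows
        if row.get("is_published") == 1 and row.get("is_overshadowed") == 0
    ]
    return len(live) >= min_segments and not pending_handoff(rows)
```

Gating downstream compaction or archival on `handoff_complete(...)` rather than on the task's SUCCESS state alone covers the window in which segments are published but not yet loaded by Historicals.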
Programmatic ingestion orchestration transforms Druid from a manually configured OLAP engine into a version-controlled analytics platform that can recover from failed ingestions automatically. By enforcing deterministic spec generation, resilient async execution, and automated lifecycle synchronization, engineering teams reduce configuration drift and keep segment topology consistent at scale.