Kafka to Druid Real-Time Pipeline Setup: Segment Management & Orchestration Reference
Real-time ingestion into Apache Druid via the Kafka Indexing Service (KIS) requires deterministic segment lifecycle control. Production OLAP deployments must balance low-latency handoff, strict schema enforcement, and predictable retention boundaries. Misaligned consumer offsets, coordinator-overlord desynchronization, and unbounded segment proliferation remain the primary failure vectors. This reference codifies validated supervisor specifications, failure diagnostics, Python-driven orchestration, and idempotent recovery patterns for enterprise-grade segment management.
Supervisor Configuration & Deterministic Handoff
The Kafka supervisor specification dictates partition assignment, segment boundaries, and handoff thresholds. Production deployments should set tuningConfig parameters explicitly, both to bound memory pressure during high-throughput bursts and to keep segment sizes predictable for later compaction. The following JSON enforces deterministic segment sizing, a bounded handoff wait, and fail-fast parse-exception limits. (Retention is governed separately by Coordinator load/drop rules, not by the supervisor spec.)
```json
{
  "type": "kafka",
  "id": "events_realtime_supervisor",
  "spec": {
    "dataSchema": {
      "dataSource": "events_realtime",
      "timestampSpec": { "column": "event_ts", "format": "auto" },
      "dimensionsSpec": {
        "dimensions": [
          { "name": "user_id", "type": "string" },
          { "name": "region", "type": "string" },
          { "name": "device_type", "type": "string" }
        ]
      },
      "metricsSpec": [
        { "type": "count", "name": "event_count" },
        { "type": "longSum", "name": "latency_ms", "fieldName": "latency" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE",
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "kafka",
      "topic": "events.prod.v2",
      "consumerProperties": {
        "bootstrap.servers": "kafka-broker-01:9092,kafka-broker-02:9092",
        "group.id": "druid-events-realtime",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": "false",
        "max.poll.records": "5000"
      },
      "useEarliestOffset": true,
      "inputFormat": { "type": "json", "flattenSpec": { "useFieldDiscovery": true } },
      "replicas": 2,
      "completionTimeout": "PT15M"
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000,
      "maxRowsInMemory": 250000,
      "intermediatePersistPeriod": "PT5M",
      "handoffConditionTimeout": 600000,
      "segmentWriteOutMediumFactory": { "type": "tmpFile" },
      "logParseExceptions": true,
      "maxParseExceptions": 10,
      "maxSavedParseExceptions": 100
    }
  }
}
```
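Key operational constraints:
- maxRowsPerSegment should align with Historical disk I/O capacity and query parallelism. Druid's segment-sizing guidance targets roughly 5 million rows (about 300 to 700 MB) per segment; much larger segments load and scan more slowly, while many small segments add per-segment overhead and compaction work.
- handoffConditionTimeout is expressed in milliseconds, not as an ISO-8601 period. It bounds how long a task waits, after publishing, for the Coordinator to report that Historicals have loaded its segments. Values that are too short make tasks give up on handoff during metadata store or Historical load-latency spikes.
- skipOffsetGaps is not a tuningConfig parameter. It was a legacy Kafka ioConfig flag for MapR Streams compatibility (which does not guarantee consecutive offsets), and it does not protect against data loss during broker failovers; offset continuity comes instead from Druid's own offset tracking in the metadata store.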
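A rough worked estimate makes the maxRowsPerSegment trade-off concrete. The sketch below assumes post-rollup rows are spread evenly across tasks and that one task generation covers each HOUR interval; taskCount (an ioConfig setting the spec above leaves at its default of 1) and the function name are illustrative assumptions. Replicas are ignored because they produce identical segments rather than additional published ones.

```python
import math


def estimate_segments_per_hour(
    rows_per_hour: int, task_count: int, max_rows_per_segment: int = 5_000_000
) -> int:
    """Rough count of segments published for one HOUR interval (hypothetical helper).

    Assumes post-rollup rows are spread evenly across tasks and one task
    generation per hour; replicas are not counted.
    """
    rows_per_task = math.ceil(rows_per_hour / task_count)
    # Each task starts a new segment whenever it crosses max_rows_per_segment.
    segments_per_task = math.ceil(rows_per_task / max_rows_per_segment)
    return task_count * segments_per_task


# 24M post-rollup rows/hour over 2 tasks: 12M rows per task -> 3 segments each.
print(estimate_segments_per_hour(24_000_000, task_count=2))  # 6
```

Raising maxRowsPerSegment would shrink that count, but at the cost of larger segments that are slower to load onto Historicals.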
Offset Checkpointing & Handoff Mechanics
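KIS tracks Kafka offsets in Druid's metadata store rather than in Kafka consumer-group commits. When a task finishes reading, it pushes its segments to deep storage and then publishes the segment records and the final offsets to the metadata store in a single transaction; that atomic commit is what gives KIS its exactly-once guarantee. The task then waits up to handoffConditionTimeout for the Coordinator to report that Historicals have loaded the new segments. Handoffs that repeatedly time out usually point to metadata store contention, Historical capacity or deep storage access problems, or network partitioning.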
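The transactional publish step can be illustrated with a toy in-memory model of compare-and-set on offsets: a publish succeeds only if the stored offsets still equal the offsets the task started from, and segments and end offsets are committed together. This is a simplified sketch of the idea, not Druid code; ToyMetadataStore and publish are hypothetical names, and a real metadata store performs the check and update inside one database transaction.

```python
from dataclasses import dataclass, field


@dataclass
class ToyMetadataStore:
    """Hypothetical toy model of transactional segment + offset publishing."""

    offsets: dict = field(default_factory=dict)  # partition -> next offset to read
    segments: list = field(default_factory=list)

    def publish(self, start: dict, end: dict, new_segments: list) -> bool:
        # Compare-and-set: the task's start offsets must match what is stored,
        # otherwise this range was already committed (or offsets moved on).
        for partition, offset in start.items():
            if self.offsets.get(partition, offset) != offset:
                return False
        # Segments and end offsets are recorded together, so they cannot diverge.
        self.segments.extend(new_segments)
        self.offsets.update(end)
        return True


store = ToyMetadataStore()
print(store.publish({0: 0}, {0: 1000}, ["seg-a"]))  # True
print(store.publish({0: 0}, {0: 1000}, ["seg-b"]))  # False: range already committed
print(store.offsets)  # {0: 1000}
```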
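Proper alignment with Batch vs Streaming Ingestion Sync ensures that late-arriving events do not corrupt hot segment boundaries. The ioConfig completionTimeout bounds how long the supervisor waits for a task to finish publishing after it stops reading; if it elapses, the task is failed and its offset range is re-read by a new task. Set it above the worst-case publish time, which in practice means comfortably above handoffConditionTimeout, since a publishing task may still be waiting on handoff. Kafka's max.poll.interval.ms (a consumer setting, not a broker setting) is not the relevant bound here: Druid tasks assign partitions directly instead of joining a consumer group, so consumer-group rebalances do not interrupt ingestion.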
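A pre-deploy check can catch two common timeout mistakes: an ISO-8601 string where Druid expects handoffConditionTimeout in milliseconds, and a completionTimeout that does not exceed the handoff wait. The sketch below assumes the rule "completionTimeout should exceed handoffConditionTimeout" as a local policy; the function names are hypothetical, the parser handles only the PTnHnMnS subset of ISO-8601, and PT30M is used as the completionTimeout default when the spec omits it.

```python
import re

# Only the simple PTnHnMnS subset of ISO-8601 durations is supported.
_ISO_PT = re.compile(r"^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")


def iso_duration_to_ms(value: str) -> int:
    """Convert e.g. 'PT15M' to 900000 milliseconds."""
    match = _ISO_PT.match(value)
    if not match or not any(match.groups()):
        raise ValueError(f"unsupported duration: {value!r}")
    hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return ((hours * 60 + minutes) * 60 + seconds) * 1000


def check_timeouts(spec: dict) -> list:
    """Return warnings for a Kafka supervisor spec's publish/handoff timeouts."""
    inner = spec["spec"]
    completion_ms = iso_duration_to_ms(inner["ioConfig"].get("completionTimeout", "PT30M"))
    handoff_ms = inner["tuningConfig"].get("handoffConditionTimeout")
    warnings = []
    if isinstance(handoff_ms, str):
        warnings.append("handoffConditionTimeout should be a number of milliseconds")
        handoff_ms = iso_duration_to_ms(handoff_ms)
    # 0 (or unset) means "wait forever" for handoff, so there is nothing to compare.
    if handoff_ms and completion_ms <= handoff_ms:
        warnings.append("completionTimeout should exceed handoffConditionTimeout")
    return warnings


spec = {"spec": {"ioConfig": {"completionTimeout": "PT15M"},
                 "tuningConfig": {"handoffConditionTimeout": 600000}}}
print(check_timeouts(spec))  # []
```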
Critical Failure Modes & Diagnostics
Real-time pipelines degrade predictably when segment handoff stalls or ingestion lag grows unchecked. The following failure modes require immediate intervention:
- Stalled Handoff (PENDING→RUNNING loop): Occurs when Historicals cannot pull segments from the deep storage backend, or when concurrent compaction tasks hold locks on the intervals being ingested. Diagnose via GET /druid/coordinator/v1/metadata/datasources/events_realtime/segments (segments published to the metadata store) and compare against GET /druid/coordinator/v1/loadstatus (percentage of each datasource loaded on Historicals).
- Offset Drift & Lag Growth: Druid tasks assign Kafka partitions directly, so classic consumer-group rebalance storms do not apply; the usual symptom is steadily rising lag. Monitor the ingest/kafka/lag and ingest/kafka/maxLag metrics alongside ingest/events/processed. Sustained lag growth typically means tasks are under-provisioned or spend too long persisting: raise taskCount (up to the partition count), or tune maxRowsInMemory and intermediatePersistPeriod against observed heap usage.
- Schema Mutation Rejection: Druid enforces strict dimension/metric typing at supervisor initialization. Late schema changes require supervisor suspension, spec update, and resumption. Unhandled parse exceptions accumulate until maxParseExceptions is breached, at which point the task fails.
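A first-pass triage of the failure modes listed above can be automated against the supervisor status report. This sketch assumes the report wraps its fields in a "payload" object containing state, publishingTasks, and aggregateLag, as Kafka supervisors report them; verify the field names against your Druid version. The lag threshold and function name are hypothetical, and a non-empty publishingTasks list is normal briefly, so only its persistence across several polls suggests a stalled handoff.

```python
UNHEALTHY_STATES = {"UNHEALTHY_SUPERVISOR", "UNHEALTHY_TASKS", "LOST_CONTACT_WITH_STREAM"}


def triage(report: dict, lag_threshold: int = 1_000_000) -> list:
    """Map a supervisor status report onto likely failure modes (hypothetical helper)."""
    payload = report.get("payload", {})
    findings = []
    state = payload.get("state")
    if state in UNHEALTHY_STATES:
        findings.append(f"supervisor state {state}: inspect recentErrors and task logs")
    if payload.get("publishingTasks"):
        # Expected briefly; persisting across polls suggests stalled handoff.
        findings.append("tasks publishing: compare metadata segments with loadstatus")
    lag = payload.get("aggregateLag")
    if lag is not None and lag > lag_threshold:
        findings.append(f"aggregate lag {lag} exceeds {lag_threshold}: check task capacity")
    return findings


sample = {"payload": {"state": "UNHEALTHY_TASKS", "publishingTasks": [{"id": "t1"}], "aggregateLag": 5}}
for finding in triage(sample):
    print(finding)
```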
Python-Driven Orchestration & Idempotent Recovery
Automated recovery requires idempotent API interactions with the Druid Overlord. The following Python module implements supervisor health checks, graceful suspension, spec patching, and deterministic resumption. It leverages session-based retries and exponential backoff to handle transient Overlord unavailability.
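```python
import copy
import logging
import time
from typing import Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Placeholder endpoint. 8090 is the Overlord's default plaintext port; TLS listeners default to 8290.
DRUID_OVERLORD_URL = "https://druid-overlord.internal:8090"
# Must match the ID the Overlord reports. Druid versions that key Kafka supervisors
# by dataSource expose this one as "events_realtime" instead of the spec's "id".
SUPERVISOR_ID = "events_realtime_supervisor"
SUPERVISOR_API = f"{DRUID_OVERLORD_URL}/druid/indexer/v1/supervisor"
REQUEST_TIMEOUT_S = 30
UNHEALTHY_STATES = {"UNHEALTHY_SUPERVISOR", "UNHEALTHY_TASKS", "LOST_CONTACT_WITH_STREAM"}


def deep_merge(base: dict, patch: dict) -> dict:
    """Return a copy of base with patch merged in recursively (patch values win)."""
    merged = copy.deepcopy(base)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


class DruidSupervisorOrchestrator:
    def __init__(self) -> None:
        self.session = requests.Session()
        retry_strategy = Retry(
            total=4,
            backoff_factor=2,
            status_forcelist=[429, 500, 502, 503, 504],
            # POST is retried because suspend, resume, and spec submission are idempotent.
            allowed_methods=["GET", "POST", "DELETE"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)
        self.session.headers.update({"Content-Type": "application/json"})

    def _get(self, path: str = "") -> dict:
        resp = self.session.get(f"{SUPERVISOR_API}{path}", timeout=REQUEST_TIMEOUT_S)
        resp.raise_for_status()
        return resp.json()

    def _post(self, path: str = "", payload: Optional[dict] = None) -> None:
        resp = self.session.post(f"{SUPERVISOR_API}{path}", json=payload, timeout=REQUEST_TIMEOUT_S)
        resp.raise_for_status()

    def check_supervisor_health(self) -> dict:
        """Fetch the status report; state and task lists live under "payload"."""
        return self._get(f"/{SUPERVISOR_ID}/status")

    def wait_for_drain(self, poll_s: float = 10, max_wait_s: float = 900) -> None:
        """Block until the suspended supervisor reports no active or publishing tasks."""
        deadline = time.monotonic() + max_wait_s
        while time.monotonic() < deadline:
            payload = self.check_supervisor_health().get("payload", {})
            if not payload.get("activeTasks") and not payload.get("publishingTasks"):
                return
            time.sleep(poll_s)
        raise TimeoutError(f"{SUPERVISOR_ID} tasks did not drain within {max_wait_s}s")

    def suspend_and_update(self, patch_spec: dict) -> None:
        """Idempotent spec update: suspend, merge the patch into the live spec, resume."""
        # 1. Suspend gracefully; running tasks stop reading and publish their segments.
        self._post(f"/{SUPERVISOR_ID}/suspend")
        logging.info("Supervisor suspended. Waiting for task termination...")
        self.wait_for_drain()
        # 2. POST /supervisor expects a complete spec, so merge the patch into the current one.
        merged = deep_merge(self._get(f"/{SUPERVISOR_ID}"), patch_spec)
        self._post("", merged)
        logging.info("Supervisor spec patched.")
        # 3. Resume ingestion from the offsets stored in the metadata store.
        self._post(f"/{SUPERVISOR_ID}/resume")
        logging.info("Supervisor resumed. Verifying task allocation...")

    def trigger_recovery(self, allow_reset: bool = False) -> bool:
        """Recover an unhealthy supervisor; returns True if an action was taken."""
        state = self.check_supervisor_health().get("payload", {}).get("state")
        if state not in UNHEALTHY_STATES:
            return False  # RUNNING, SUSPENDED, PENDING, etc. need no intervention
        logging.warning("Supervisor in %s. Initiating recovery...", state)
        if allow_reset:
            # /reset discards stored offsets: data may be skipped or read twice.
            self._post(f"/{SUPERVISOR_ID}/reset")
        else:
            # Suspend/resume cycles the tasks but keeps the committed offsets.
            self._post(f"/{SUPERVISOR_ID}/suspend")
            self._post(f"/{SUPERVISOR_ID}/resume")
        return True


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    orchestrator = DruidSupervisorOrchestrator()
    # Example: change consumer properties; tasks are cycled, stored offsets are kept.
    patch = {
        "spec": {
            "ioConfig": {
                "consumerProperties": {
                    "max.poll.records": "3000",
                    "session.timeout.ms": "30000",
                }
            }
        }
    }
    orchestrator.suspend_and_update(patch)
```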
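The resume step logs "Verifying task allocation..." but the verification itself can be a small poller. This sketch takes the status fetcher as a callable so it can be tested without a live Overlord; wait_for_state is a hypothetical helper, and it assumes the status report exposes the supervisor state under "payload".

```python
import time
from typing import Callable


def wait_for_state(
    fetch_status: Callable[[], dict],
    target: str = "RUNNING",
    timeout_s: float = 300.0,
    poll_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until payload.state == target; return False if timeout_s elapses first."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = fetch_status().get("payload", {}).get("state")
        if state == target:
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(poll_s)
```

After suspend_and_update, a caller could pass orchestrator.check_supervisor_health as fetch_status and alert or roll back when the function returns False.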
Operational Guardrails
Production pipelines must enforce strict validation gates before deploying supervisor changes. Schema drift detection, asynchronous task execution, and automated rollback are baseline requirements for high-availability OLAP clusters. Integrating these controls into a centralized Automated Ingestion Pipeline Orchestration framework keeps spec mutations version-controlled, tested against staging metadata stores, and rolled out with minimal ingestion interruption.
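As one concrete validation gate, schema drift detection can diff the dimensionsSpec of the running spec against a proposed one and block deploys that drop or retype dimensions. The sketch relies on Druid's convention that plain-string dimension entries, and entries without a "type", are string dimensions; the function names and the blocking policy are hypothetical.

```python
def _dimension_types(spec: dict) -> dict:
    """Map dimension name -> type; plain-string entries are string dimensions."""
    types = {}
    for entry in spec["spec"]["dataSchema"]["dimensionsSpec"]["dimensions"]:
        if isinstance(entry, str):
            types[entry] = "string"
        else:
            types[entry["name"]] = entry.get("type", "string")
    return types


def dimension_drift(current_spec: dict, proposed_spec: dict) -> dict:
    """Report dimensions removed, added, or retyped between two supervisor specs."""
    old, new = _dimension_types(current_spec), _dimension_types(proposed_spec)
    return {
        "removed": sorted(old.keys() - new.keys()),
        "added": sorted(new.keys() - old.keys()),
        "retyped": sorted(k for k in old.keys() & new.keys() if old[k] != new[k]),
    }
```

A deployment pipeline might treat any "removed" or "retyped" entry as a hard failure and let "added" dimensions through after review.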
For authoritative configuration references and metric definitions, consult the official Apache Druid Kafka ingestion documentation and the Kafka Consumer Configuration Guide.