Kafka to Druid Real-Time Pipeline Setup: Segment Management & Orchestration Reference

Real-time ingestion into Apache Druid via the Kafka Indexing Service (KIS) requires deterministic segment lifecycle control. Production OLAP deployments must balance low-latency handoff, strict schema enforcement, and predictable retention boundaries. Misaligned consumer offsets, coordinator-overlord desynchronization, and unbounded segment proliferation remain the primary failure vectors. This reference codifies validated supervisor specifications, failure diagnostics, Python-driven orchestration, and idempotent recovery patterns for enterprise-grade segment management.

Supervisor Configuration & Deterministic Handoff

The Kafka supervisor specification controls partition assignment, segment boundaries, and handoff behavior. Production deployments should set tuningConfig limits explicitly so that in-memory buffers stay bounded during high-throughput bursts. The following JSON fixes segment sizing and handoff timeouts and sets a bounded parse-error budget. (Retention is governed separately by Coordinator load/drop rules, not by the supervisor spec.)

{
  "type": "kafka",
  "id": "events_realtime_supervisor",
  "spec": {
    "dataSchema": {
      "dataSource": "events_realtime",
      "timestampSpec": { "column": "event_ts", "format": "auto" },
      "dimensionsSpec": {
        "dimensions": [
          { "name": "user_id", "type": "string" },
          { "name": "region", "type": "string" },
          { "name": "device_type", "type": "string" }
        ]
      },
      "metricsSpec": [
        { "type": "count", "name": "event_count" },
        { "type": "longSum", "name": "latency_ms", "fieldName": "latency" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE",
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "kafka",
      "topic": "events.prod.v2",
      "consumerProperties": {
        "bootstrap.servers": "kafka-broker-01:9092,kafka-broker-02:9092",
        "group.id": "druid-events-realtime",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": "false",
        "max.poll.records": "5000"
      },
      "useEarliestOffset": true,
      "inputFormat": { "type": "json", "flattenSpec": { "useFieldDiscovery": true } },
      "replicas": 2,
      "completionTimeout": "PT15M"
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000,
      "maxRowsInMemory": 250000,
      "intermediatePersistPeriod": "PT5M",
      "handoffConditionTimeout": 600000,
      "segmentWriteOutMediumFactory": { "type": "tmpFile" },
      "logParseExceptions": true,
      "maxParseExceptions": 10,
      "maxSavedParseExceptions": 100,
      "skipOffsetGaps": false
    }
  }
}
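The Overlord accepts a spec like this on POST /druid/indexer/v1/supervisor, and posting a spec for a supervisor that already exists updates it. A minimal sketch using only the standard library; the Overlord URL and the spec file name are hypothetical placeholders, and the request builder is separated out so it can be checked without a live cluster.

```python
import json
import urllib.request

# Hypothetical Overlord base URL; adjust scheme, host, and port for your cluster.
OVERLORD_URL = "http://druid-overlord.internal:8090"


def build_submit_request(spec: dict, overlord_url: str = OVERLORD_URL) -> urllib.request.Request:
    """Build the POST that creates or updates a supervisor from a complete spec."""
    if spec.get("type") != "kafka":
        raise ValueError("expected a Kafka supervisor spec")
    return urllib.request.Request(
        f"{overlord_url}/druid/indexer/v1/supervisor",
        data=json.dumps(spec).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_spec(path: str, overlord_url: str = OVERLORD_URL, timeout: float = 30.0) -> dict:
    """Load a spec file, submit it, and return the Overlord's JSON response."""
    with open(path, encoding="utf-8") as fh:
        spec = json.load(fh)
    with urllib.request.urlopen(build_submit_request(spec, overlord_url), timeout=timeout) as resp:
        return json.load(resp)
```

For example, submit_spec("events_realtime_supervisor.json") (a hypothetical file holding the spec above) returns the Overlord's response, which typically identifies the supervisor that was created or updated.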

Key operational constraints:

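  • maxRowsPerSegment (default 5,000,000) caps the rows a task writes into one segment before starting another. Keep it near Druid's general sizing target of about 5 million rows (roughly 300 to 700 MB) per segment: much larger segments reduce query parallelism and lengthen Historical load times, while many small segments add per-segment overhead that compaction must later clean up.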
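  • handoffConditionTimeout is a number of milliseconds that each task waits, after publishing, for the Coordinator to report its segments as loaded on Historicals (0 waits indefinitely). When it expires, the task stops waiting and exits even if handoff is incomplete, so values under about five minutes (300000) make tasks exit early during metadata store latency spikes or slow segment loads.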
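  • skipOffsetGaps is a legacy ioConfig flag from older Druid releases: when false, tasks threw an exception on non-consecutive offsets, and true was needed for streams such as MapR Streams that do not guarantee consecutive offsets. Recent releases no longer document it and tolerate offset gaps on their own. It does not protect against data loss during broker failovers; that protection comes from Druid committing offsets and segments together in the metadata store.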
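A pre-flight check can catch the sizing and timeout problems above before a spec reaches the Overlord. The sketch below is a hypothetical helper, not a Druid tool: it flags a handoffConditionTimeout that is not an integer number of milliseconds or is shorter than five minutes, and a maxRowsPerSegment above roughly 5 million rows. Both thresholds come from the guidance in this section and are assumptions to tune per cluster.

```python
# Hypothetical thresholds mirroring the guidance above; tune per cluster.
MIN_HANDOFF_TIMEOUT_MS = 5 * 60 * 1000
MAX_ROWS_PER_SEGMENT = 5_000_000


def lint_tuning_config(tuning: dict) -> list[str]:
    """Return human-readable problems found in a Kafka tuningConfig dict."""
    problems = []
    handoff = tuning.get("handoffConditionTimeout")
    if handoff is not None:
        # Druid reads this field as milliseconds, not as an ISO-8601 period string.
        if not isinstance(handoff, int) or isinstance(handoff, bool):
            problems.append("handoffConditionTimeout must be an integer number of milliseconds")
        # 0 means "wait indefinitely", so only small positive values are flagged.
        elif 0 < handoff < MIN_HANDOFF_TIMEOUT_MS:
            problems.append("handoffConditionTimeout is under five minutes")
    rows = tuning.get("maxRowsPerSegment")
    if isinstance(rows, int) and rows > MAX_ROWS_PER_SEGMENT:
        problems.append("maxRowsPerSegment exceeds ~5M rows")
    return problems
```

Running it over the tuningConfig block of a spec file in CI gives a cheap gate before the spec is submitted.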

Offset Checkpointing & Handoff Mechanics

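KIS stores Kafka offsets in Druid's metadata store rather than in Kafka. When a task finishes its segments, it pushes them to deep storage and then publishes the segment metadata and its end offsets to the metadata store in a single transaction, which is what gives KIS exactly-once semantics. The task then waits for handoff, that is, for the Coordinator to have the new segments loaded on Historicals. If handoff does not complete within handoffConditionTimeout, the task stops waiting and exits; the segments stay published and are loaded once the Coordinator catches up, but that data is not queryable in the meantime. Repeated handoff timeouts usually point to metadata store contention, a slow Coordinator cycle, or Historicals that cannot load segments.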
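Druid's publish step only succeeds if the offsets already stored for the datasource match the task's start offsets, so a stale or duplicate publish is rejected instead of double-counting data. The following is a deliberately simplified model of that compare-and-swap in sqlite3; the table layout and the publish function are hypothetical and do not reflect Druid's actual metadata schema or code.

```python
import json
import sqlite3


def init_store() -> sqlite3.Connection:
    """Create an in-memory stand-in for the metadata store (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE segments (id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE offsets (datasource TEXT PRIMARY KEY, offsets TEXT)")
    return conn


def publish(conn: sqlite3.Connection, datasource: str, segment_ids: list[str],
            start_offsets: dict, end_offsets: dict) -> bool:
    """Insert segments and advance offsets in one transaction, only if start offsets match.

    Partition IDs are string keys so they survive the JSON round trip unchanged.
    """
    with conn:  # commits on normal exit, rolls back if an exception escapes
        row = conn.execute(
            "SELECT offsets FROM offsets WHERE datasource = ?", (datasource,)
        ).fetchone()
        # With no stored offsets yet, the first publish is accepted.
        if row is not None and json.loads(row[0]) != start_offsets:
            return False  # someone already published past these offsets
        conn.executemany("INSERT INTO segments (id) VALUES (?)", [(s,) for s in segment_ids])
        conn.execute(
            "INSERT OR REPLACE INTO offsets (datasource, offsets) VALUES (?, ?)",
            (datasource, json.dumps(end_offsets, sort_keys=True)),
        )
    return True
```

Because segments and offsets change together or not at all, a crash between the two writes cannot leave offsets advanced without the matching segments.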

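Aligning the streaming pipeline with batch reindexing (see Batch vs Streaming Ingestion Sync) keeps late-arriving events from conflicting with the hot time chunks that real-time tasks are still writing. The completionTimeout parameter sets how long the supervisor waits for a task to finish publishing after it stops reading; if publishing takes longer, the supervisor fails the task, so size it from observed publish times plus headroom. It has no relationship to the consumer's max.poll.interval.ms: Druid tasks assign Kafka partitions directly rather than joining a consumer group, so consumer group rebalances do not occur.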
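completionTimeout bounds how long a task may spend publishing once it stops reading. One way to size it, sketched here under the assumption that you already collect recent publish durations (in seconds) from task logs or metrics, is to take the slowest observed publish, multiply by a headroom factor, and round up to whole minutes as an ISO-8601 period. The 1.5 default factor is an arbitrary assumption, not a Druid recommendation.

```python
import math


def completion_timeout(publish_seconds: list[float], headroom: float = 1.5) -> str:
    """Return an ISO-8601 period covering the slowest observed publish plus headroom."""
    if not publish_seconds:
        raise ValueError("need at least one observed publish duration")
    # Round up to whole minutes so the period is never shorter than the estimate.
    minutes = math.ceil(max(publish_seconds) * headroom / 60)
    return f"PT{minutes}M"
```

For example, observed publishes of 300 s and 420 s give 420 × 1.5 = 630 s, which rounds up to "PT11M".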

Critical Failure Modes & Diagnostics

Real-time pipelines degrade predictably when segment handoff stalls or Kafka consumer groups rebalance excessively. The following failure modes require immediate intervention:

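  1. Stalled Handoff (PENDING → RUNNING loop): Tasks finish reading but their segments never finish loading, because Historicals cannot pull them from deep storage or lack segment-cache capacity, or because compaction tasks hold locks on overlapping intervals. Diagnose with GET /druid/coordinator/v1/metadata/datasources/events_realtime (segments published in the metadata store) and GET /druid/coordinator/v1/loadstatus (percentage of each datasource's segments loaded on Historicals); published segments that stay below 100% loaded indicate a handoff problem.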
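  2. Consumer Lag & Memory Pressure: Druid tasks assign Kafka partitions directly instead of joining a consumer group, so lag problems come from throughput or heap limits rather than group rebalances. Monitor the ingest/kafka/lag and ingest/kafka/maxLag metrics (total and worst-partition lag) alongside ingest/events/processed. If lag grows steadily, raise taskCount (up to the topic's partition count) or add worker capacity; if tasks show heap pressure or long GC pauses, lower maxRowsInMemory (or set maxBytesInMemory) so intermediate persists happen before the heap fills.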
  3. Schema Mutation Rejection: Dimension and metric types are fixed by the running supervisor spec. To change them, submit an updated spec to the supervisor endpoint; the supervisor signals its current tasks to publish and starts new tasks under the new schema (suspending first is optional but gives a cleaner cut-over). Rows that fail to parse count toward maxParseExceptions; once that limit is exceeded, the task fails.
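The three failure modes above can be screened automatically from two API responses and a short lag history. A minimal sketch, assuming the field names dataSource, state, and publishingTasks in the Kafka supervisor status payload (verify them against your version's actual response) and lag samples taken from the ingest/kafka/lag metric; the finding codes are hypothetical labels. A single snapshot is only a hint, since publishing tasks with partially loaded segments are normal during a healthy handoff, so alert only when a finding persists across several checks.

```python
# Hypothetical finding codes, keyed to the numbered failure modes above.
STALLED_HANDOFF = "STALLED_HANDOFF"  # mode 1
LAG_GROWING = "LAG_GROWING"          # mode 2
TASKS_UNHEALTHY = "TASKS_UNHEALTHY"  # mode 3 (and other repeated task failures)


def diagnose(status: dict, load_status: dict, lag_samples: list[int]) -> list[str]:
    """Classify one monitoring snapshot into failure-mode codes.

    status: JSON from GET /druid/indexer/v1/supervisor/{id}/status
    load_status: JSON from GET /druid/coordinator/v1/loadstatus ({dataSource: percent})
    lag_samples: recent ingest/kafka/lag values, oldest first
    """
    payload = status.get("payload", {})
    findings = []
    loaded_pct = load_status.get(payload.get("dataSource"), 100.0)
    # Tasks are publishing while some published segments are still not loaded.
    if payload.get("publishingTasks") and loaded_pct < 100.0:
        findings.append(STALLED_HANDOFF)
    # Lag strictly increasing across at least three consecutive samples.
    if len(lag_samples) >= 3 and all(b > a for a, b in zip(lag_samples, lag_samples[1:])):
        findings.append(LAG_GROWING)
    # Repeated task failures, e.g. maxParseExceptions exceeded, surface as UNHEALTHY_TASKS.
    if payload.get("state") == "UNHEALTHY_TASKS":
        findings.append(TASKS_UNHEALTHY)
    return findings
```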

Python-Driven Orchestration & Idempotent Recovery

Automated recovery requires idempotent API interactions with the Druid Overlord. The following Python module implements supervisor health checks, graceful suspension, spec patching, and deterministic resumption. It leverages session-based retries and exponential backoff to handle transient Overlord unavailability.

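import copy
import logging
import time
from typing import Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Overlord base URL (hostname is environment-specific). 8290 is the Overlord's
# default TLS port; use http:// and 8090 for a plaintext listener.
DRUID_OVERLORD_URL = "https://druid-overlord.internal:8290"
# On Druid versions without explicit supervisor IDs, this equals the dataSource name.
SUPERVISOR_ID = "events_realtime_supervisor"
SUPERVISOR_API = f"{DRUID_OVERLORD_URL}/druid/indexer/v1/supervisor"
UNHEALTHY_STATES = {"UNHEALTHY_SUPERVISOR", "UNHEALTHY_TASKS", "LOST_CONTACT_WITH_STREAM"}


def deep_merge(base: dict, patch: dict) -> dict:
    """Return a copy of base with patch applied recursively (patch values win)."""
    merged = copy.deepcopy(base)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


class DruidSupervisorOrchestrator:
    def __init__(self, request_timeout: float = 30.0):
        self.request_timeout = request_timeout
        self.session = requests.Session()
        retry_strategy = Retry(
            total=4,
            backoff_factor=2,  # exponential backoff between attempts
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"],  # suspend/resume/spec POSTs are safe to repeat
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)
        self.session.headers.update({"Content-Type": "application/json"})

    def _get(self, path: str) -> dict:
        resp = self.session.get(f"{SUPERVISOR_API}{path}", timeout=self.request_timeout)
        resp.raise_for_status()
        return resp.json()

    def _post(self, path: str, payload: Optional[dict] = None) -> None:
        resp = self.session.post(f"{SUPERVISOR_API}{path}", json=payload, timeout=self.request_timeout)
        resp.raise_for_status()

    def check_supervisor_health(self) -> dict:
        """Fetch supervisor status; the state is reported under payload.state."""
        return self._get(f"/{SUPERVISOR_ID}/status")

    def current_state(self) -> Optional[str]:
        return self.check_supervisor_health().get("payload", {}).get("state")

    def wait_for_state(self, expected: str, timeout_s: float = 300.0, poll_s: float = 10.0) -> None:
        """Poll until the supervisor reports the expected state or the timeout expires."""
        deadline = time.monotonic() + timeout_s
        while self.current_state() != expected:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"{SUPERVISOR_ID} did not reach {expected} in {timeout_s}s")
            time.sleep(poll_s)

    def suspend_and_update(self, patch_spec: dict) -> None:
        """Suspend, merge a partial patch into the full spec, resubmit, resume."""
        # 1. Suspend; running tasks stop reading and publish what they hold.
        self._post(f"/{SUPERVISOR_ID}/suspend")
        self.wait_for_state("SUSPENDED")
        logging.info("Supervisor suspended.")

        # 2. The update endpoint (POST /druid/indexer/v1/supervisor) expects a
        #    complete spec, so merge the patch into the current one.
        current = self._get(f"/{SUPERVISOR_ID}")
        if "spec" in current:
            # Some versions echo dataSchema/ioConfig/tuningConfig next to "spec";
            # drop those copies so only the patched "spec" is submitted.
            for legacy_key in ("dataSchema", "ioConfig", "tuningConfig"):
                current.pop(legacy_key, None)
        updated = deep_merge(current, patch_spec)
        updated["suspended"] = True  # stay suspended until the explicit resume below
        self._post("", updated)
        logging.info("Supervisor spec resubmitted.")

        # 3. Resume ingestion and confirm the supervisor is running again.
        self._post(f"/{SUPERVISOR_ID}/resume")
        self.wait_for_state("RUNNING")
        logging.info("Supervisor resumed.")

    def trigger_recovery(self, allow_reset: bool = False) -> bool:
        """Reset an unhealthy supervisor, but only when explicitly allowed.

        A reset clears the offsets Druid stored for this supervisor, so new tasks
        start from useEarliestOffset / auto.offset.reset and may re-ingest or skip
        data. Inspect task logs first and opt in only when that is acceptable.
        """
        state = self.current_state()
        if state not in UNHEALTHY_STATES:
            return False
        logging.warning("Supervisor in %s.", state)
        if not allow_reset:
            return False
        self._post(f"/{SUPERVISOR_ID}/reset")
        logging.warning("Supervisor offsets reset.")
        return True


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    orchestrator = DruidSupervisorOrchestrator()
    # Example: lower max.poll.records. Ingestion tasks are cycled via suspend/resume;
    # the merge keeps every other consumer property (bootstrap.servers, etc.) intact.
    patch = {"spec": {"ioConfig": {"consumerProperties": {"max.poll.records": "3000"}}}}
    orchestrator.suspend_and_update(patch)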

Operational Guardrails

Production pipelines should pass strict validation gates before any supervisor change is deployed: schema drift detection, asynchronous task execution, and automated rollback. Integrating these controls into a centralized Automated Ingestion Pipeline Orchestration framework keeps spec changes version-controlled, tested against a staging metadata store, and rolled out with minimal interruption to ingestion.
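Schema drift detection can start as a plain diff of the dimensions in the current and proposed specs. A minimal sketch, assuming both specs use the nested "spec" → "dataSchema" → "dimensionsSpec" layout shown earlier; dimensions given as bare strings are treated as string dimensions, matching Druid's default. The function names are hypothetical, and the policy of blocking automatic deploys on any non-empty result is an assumption.

```python
def dimension_types(spec: dict) -> dict[str, str]:
    """Map dimension name to type; bare-string dimensions default to 'string'."""
    dims = spec["spec"]["dataSchema"]["dimensionsSpec"]["dimensions"]
    types = {}
    for dim in dims:
        if isinstance(dim, str):
            types[dim] = "string"
        else:
            types[dim["name"]] = dim.get("type", "string")
    return types


def schema_drift(current: dict, proposed: dict) -> dict[str, list[str]]:
    """Report added, removed, and retyped dimensions between two supervisor specs."""
    old, new = dimension_types(current), dimension_types(proposed)
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "retyped": sorted(k for k in old.keys() & new.keys() if old[k] != new[k]),
    }
```

Additions are usually safe to roll out automatically, while removals and type changes affect existing queries and are good candidates for a manual approval step.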

For authoritative configuration references and metric definitions, consult the official Apache Druid Kafka ingestion documentation and the Kafka Consumer Configuration Guide.
