Kimberlite Reference

Kimberlite Agent Protocol

This document describes the protocol for communication between Kimberlite cluster agents and control plane systems.

Overview

The agent protocol enables:

  • Health monitoring: Agents report node status via heartbeats
  • Configuration management: Control planes push configuration updates
  • Observability: Metrics and logs are streamed from agents
  • Administration: Control planes can issue administrative commands
  • Authentication: Secure agent identity verification
  • Flow control: Backpressure handling for high-volume data
  • Health checks: On-demand health verification

Transport

The protocol uses WebSocket connections with JSON-encoded messages. Agents connect to the control plane endpoint and maintain a persistent connection.

Agent                          Control Plane
  |                                  |
  |-------- WebSocket Connect ------>|
  |                                  |
  |<------- AuthChallenge -----------|
  |-------- AuthResponse ----------->|
  |                                  |
  |<------- HeartbeatRequest --------|
  |-------- Heartbeat -------------->|
  |                                  |
  |-------- MetricsBatch ----------->|
  |-------- LogsBatch -------------->|
  |                                  |
  |<------- ConfigUpdate ------------|
  |-------- ConfigAck -------------->|
  |                                  |
  |<------- AdminCommand ------------|
  |-------- ControlAck ------------->|
  |                                  |
  |<------- HealthCheck -------------|
  |-------- HealthCheckResponse ---->|
  |                                  |
  |<------- FlowControl -------------|
  |                                  |

Message Types

Agent → Control Plane

Heartbeat

Periodic status update sent by the agent.

{
  "type": "heartbeat",
  "node_id": "node-001",
  "status": "healthy",
  "role": "leader",
  "resources": {
    "cpu_percent": 45.2,
    "memory_used_bytes": 1073741824,
    "memory_total_bytes": 8589934592,
    "disk_used_bytes": 10737418240,
    "disk_total_bytes": 107374182400
  },
  "replication": null,
  "buffer_stats": [
    {
      "stream_type": "metrics",
      "state": "normal",
      "pending_items": 50,
      "capacity": 1000,
      "dropped_count": 0,
      "oldest_item_age_ms": 500
    }
  ]
}

Fields:

Field         Type     Description
node_id       string   Unique identifier for the node
status        enum     healthy, degraded, unhealthy, starting, stopping
role          enum     leader, follower, candidate, learner
resources     object   Current resource utilization
replication   object?  Replication status (followers only)
buffer_stats  array    Buffer statistics for backpressure monitoring

Replication object:

Field            Type    Description
leader_id        string  ID of the leader being replicated from
lag_ms           u64     Replication lag in milliseconds
pending_entries  u64     Number of entries waiting to replicate

Buffer stats object:

Field               Type  Description
stream_type         enum  heartbeats, metrics, logs, all
state               enum  empty, normal, high, critical
pending_items       u64   Items currently buffered
capacity            u64   Maximum buffer capacity
dropped_count       u64   Items dropped since last report
oldest_item_age_ms  u64   Age of oldest buffered item

MetricsBatch

Batch of collected metric samples.

{
  "type": "metrics_batch",
  "node_id": "node-001",
  "metrics": [
    {
      "name": "kmb.writes.total",
      "value": 12345.0,
      "timestamp_ms": 1700000000000,
      "labels": [["tenant", "acme"]]
    }
  ]
}

Metric sample fields:

Field         Type    Description
name          string  Metric name (e.g., kmb.writes.total)
value         f64     Metric value
timestamp_ms  u64     Unix timestamp in milliseconds
labels        array   Optional key-value pairs

LogsBatch

Batch of log entries.

{
  "type": "logs_batch",
  "node_id": "node-001",
  "entries": [
    {
      "timestamp_ms": 1700000000000,
      "level": "info",
      "message": "Snapshot completed",
      "fields": [["duration_ms", "1234"]]
    }
  ]
}

Log entry fields:

Field         Type    Description
timestamp_ms  u64     Unix timestamp in milliseconds
level         enum    trace, debug, info, warn, error
message       string  Log message
fields        array   Optional structured fields

ConfigAck

Acknowledgment of a configuration update.

{
  "type": "config_ack",
  "version": 42,
  "success": true,
  "error": null
}

Fields:

Field    Type     Description
version  u64      Configuration version being acknowledged
success  bool     Whether configuration was applied
error    string?  Error message if failed

ControlAck

Acknowledgment of a control message (AdminCommand, etc.).

{
  "type": "control_ack",
  "message_id": 42,
  "success": true,
  "error": null,
  "result": "{\"snapshot_id\": \"snap-001\"}",
  "duration_ms": 1234
}

Fields:

Field        Type     Description
message_id   u64      ID of the message being acknowledged
success      bool     Whether the command succeeded
error        string?  Error message if failed
result       string?  JSON-encoded command-specific result
duration_ms  u64      Time taken to execute the command

AuthResponse

Response to an authentication challenge.

{
  "type": "auth_response",
  "credentials": {
    "type": "bearer",
    "token": "eyJhbGciOiJIUzI1NiIs..."
  },
  "agent_info": {
    "version": "1.0.0",
    "protocol_version": "v1",
    "capabilities": ["snapshots", "compaction"]
  }
}

Credential types:

Type            Fields             Description
bearer          token              JWT or API key
pre_shared_key  key_id, signature  HMAC signature of challenge
certificate     fingerprint        SHA-256 certificate fingerprint

HealthCheckResponse

Response to a health check request.

{
  "type": "health_check_response",
  "request_id": 42,
  "status": "healthy",
  "checks": [
    {
      "check_type": "liveness",
      "passed": true,
      "message": "OK",
      "details": []
    },
    {
      "check_type": "storage",
      "passed": true,
      "message": "Disk usage at 45%",
      "details": [["used_bytes", "48318382080"]]
    }
  ],
  "duration_ms": 5
}

Fields:

Field        Type   Description
request_id   u64    Correlation ID from request
status       enum   healthy, degraded, unhealthy
checks       array  Individual check results
duration_ms  u64    Time taken for all checks

Control Plane → Agent

ConfigUpdate

Push new configuration to the agent.

{
  "type": "config_update",
  "message_id": 1,
  "version": 42,
  "config": "{\"max_connections\": 100}",
  "checksum": "sha256:abc123..."
}

Fields:

Field       Type    Description
message_id  u64?    Optional correlation ID for ack
version     u64     Configuration version
config      string  JSON-encoded configuration
checksum    string  Integrity checksum

The agent should verify the checksum before applying the configuration and respond with a ConfigAck.

AdminCommand

Execute an administrative command.

{
  "type": "admin_command",
  "message_id": 42,
  "command": {
    "command": "take_snapshot"
  }
}

Available commands:

Command              Fields          Description
take_snapshot        -               Trigger a state snapshot
compact_log          up_to_offset    Compact log up to offset
step_down            -               Step down from leader role
transfer_leadership  target_node_id  Transfer to target
pause_replication    -               Pause replication for maintenance
resume_replication   -               Resume replication

The agent should respond with a ControlAck containing the message_id.
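
As a rough illustration of the command/acknowledgment pairing, the sketch below wraps command execution and fills in the ControlAck fields documented earlier. The type and function names (AdminCommand, ControlAck, acknowledge), the u64 type of up_to_offset, and the closure-based executor are assumptions made for this example, not the crate's actual API.

```rust
use std::time::Instant;

/// Commands from the table above. The u64 offset type is an assumption.
#[derive(Debug, Clone, PartialEq)]
pub enum AdminCommand {
    TakeSnapshot,
    CompactLog { up_to_offset: u64 },
    StepDown,
    TransferLeadership { target_node_id: String },
    PauseReplication,
    ResumeReplication,
}

/// Mirrors the ControlAck fields documented in this reference.
#[derive(Debug, Clone, PartialEq)]
pub struct ControlAck {
    pub message_id: u64,
    pub success: bool,
    pub error: Option<String>,
    pub result: Option<String>,
    pub duration_ms: u64,
}

/// Runs `execute` (a hypothetical command handler) and builds the ack,
/// echoing `message_id` and recording how long execution took.
pub fn acknowledge<F>(message_id: u64, command: &AdminCommand, execute: F) -> ControlAck
where
    F: FnOnce(&AdminCommand) -> Result<Option<String>, String>,
{
    let started = Instant::now();
    let outcome = execute(command);
    let duration_ms = started.elapsed().as_millis() as u64;
    match outcome {
        // Ok carries the optional JSON-encoded, command-specific result.
        Ok(result) => ControlAck { message_id, success: true, error: None, result, duration_ms },
        // Err carries the error message reported back to the control plane.
        Err(error) => ControlAck { message_id, success: false, error: Some(error), result: None, duration_ms },
    }
}
```

Keeping the handler behind a closure means the timing and message_id bookkeeping live in one place, whichever command is being run.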

HeartbeatRequest

Request an immediate heartbeat from the agent.

{
  "type": "heartbeat_request"
}

Shutdown

Request graceful shutdown.

{
  "type": "shutdown",
  "reason": "Cluster scaling down"
}

AuthChallenge

Authentication challenge sent after connection.

{
  "type": "auth_challenge",
  "challenge": "random-challenge-string",
  "supported_methods": ["bearer", "pre_shared_key", "certificate"],
  "expires_in_ms": 30000
}

Fields:

Field              Type    Description
challenge          string  Random challenge for PSK auth
supported_methods  array   Supported auth methods
expires_in_ms      u64     Challenge expiration time
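
One way an agent might use supported_methods is to pick the first of its own preferred credential types that the control plane also lists. The sketch below assumes that selection policy; the function name select_auth_method is hypothetical and the protocol itself does not prescribe how the agent chooses.

```rust
/// Returns the first entry of `preferred` (the agent's own ordering) that also
/// appears in `supported` (the list from AuthChallenge), or None if there is
/// no overlap. The "first preferred match" policy is an assumption.
pub fn select_auth_method<'a>(preferred: &[&'a str], supported: &[&str]) -> Option<&'a str> {
    preferred
        .iter()
        .copied()
        .find(|method| supported.iter().any(|s| *s == *method))
}
```

For example, an agent that prefers certificate over bearer would fall back to bearer when the challenge lists only bearer and pre_shared_key.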

FlowControl

Backpressure signal for high-volume data.

{
  "type": "flow_control",
  "stream_type": "metrics",
  "signal": {
    "slow_down": {
      "min_interval_ms": 10000
    }
  }
}

Signal types:

Signal     Fields           Description
resume     -                Resume normal transmission
slow_down  min_interval_ms  Reduce transmission rate
pause      -                Stop transmission
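
A minimal sketch of how an agent could translate a signal into a send interval for one stream follows. The names FlowSignal and effective_interval_ms are hypothetical, and treating min_interval_ms as a lower bound on the agent's configured interval is an assumption about the intended semantics.

```rust
/// The three signals from the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowSignal {
    Resume,
    SlowDown { min_interval_ms: u64 },
    Pause,
}

/// Interval the agent should use for a stream after receiving `signal`,
/// or None while transmission is paused.
pub fn effective_interval_ms(default_interval_ms: u64, signal: FlowSignal) -> Option<u64> {
    match signal {
        // Back to the configured cadence.
        FlowSignal::Resume => Some(default_interval_ms),
        // Never send more often than the control plane asked for.
        FlowSignal::SlowDown { min_interval_ms } => Some(default_interval_ms.max(min_interval_ms)),
        // Stop sending until a later signal arrives.
        FlowSignal::Pause => None,
    }
}
```

With the 5-second default batch interval and the slow_down example above (min_interval_ms of 10000), the agent would send metrics at most every 10 seconds.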

HealthCheck

Request a health check from the agent.

{
  "type": "health_check",
  "request_id": 42,
  "checks": ["liveness", "storage", "replication"]
}

Check types:

Type         Description
liveness     Basic process health
storage      Storage subsystem health
replication  Replication status
resources    Disk/memory availability
all          All available checks
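
The sketch below shows one possible way to normalize a requested check list before running it. It assumes that all expands to the four concrete checks in the table and that duplicates are ignored; both behaviors, and the names CheckType and expand_checks, are illustrative rather than specified by the protocol.

```rust
/// Check types from the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckType {
    Liveness,
    Storage,
    Replication,
    Resources,
    All,
}

/// Expands `All` into the concrete checks and removes duplicates while
/// preserving the requested order.
pub fn expand_checks(requested: &[CheckType]) -> Vec<CheckType> {
    const CONCRETE: [CheckType; 4] = [
        CheckType::Liveness,
        CheckType::Storage,
        CheckType::Replication,
        CheckType::Resources,
    ];
    if requested.contains(&CheckType::All) {
        return CONCRETE.to_vec();
    }
    let mut expanded = Vec::new();
    for check in requested {
        if !expanded.contains(check) {
            expanded.push(*check);
        }
    }
    expanded
}
```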

Connection Lifecycle

Authentication

After the WebSocket connection is established:

  1. Control plane sends AuthChallenge
  2. Agent responds with AuthResponse containing credentials
  3. Control plane validates credentials
  4. On success, connection transitions to authenticated state

Initial Handshake

  1. Agent connects to WebSocket endpoint
  2. Authentication exchange (see above)
  3. Control plane sends HeartbeatRequest
  4. Agent responds with Heartbeat
  5. Connection is established

Steady State

  • Agent sends Heartbeat every 10 seconds (configurable)
  • Agent batches and sends MetricsBatch every 5 seconds
  • Agent batches and sends LogsBatch every 5 seconds
  • Control plane pushes ConfigUpdate as needed
  • Agent includes buffer_stats in heartbeats for backpressure monitoring
  • Control plane sends FlowControl when overwhelmed

Reconnection with Exponential Backoff

If the connection drops, agents should:

  1. Wait with exponential backoff (initial: 1s, max: 60s, multiplier: 2.0)
  2. Add jitter (±25%) to avoid a thundering herd of simultaneous reconnects
  3. Reconnect and perform full authentication handshake
  4. Resume normal operation

Backoff Configuration:

BackoffConfig {
    initial_delay_ms: 1_000,   // 1 second
    max_delay_ms: 60_000,      // 60 seconds
    multiplier: 2.0,
    jitter_factor: 0.25,
    max_attempts: 0,           // unlimited
}
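
The reconnection steps above can be sketched as a pure delay calculation. This example assumes that jitter is applied after capping at max_delay_ms (so a jittered delay can exceed the cap by up to jitter_factor), that attempts are counted from 0, and that the caller supplies the random value; the function name reconnect_delay is hypothetical.

```rust
use std::time::Duration;

/// Mirrors the BackoffConfig fields shown above.
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    pub initial_delay_ms: u64,
    pub max_delay_ms: u64,
    pub multiplier: f64,
    pub jitter_factor: f64,
    pub max_attempts: u32, // 0 means unlimited
}

/// Delay before reconnect attempt `attempt` (0-based), or None once
/// `max_attempts` is exhausted. `unit_random` must lie in [0.0, 1.0) and is
/// passed in so the caller picks the random source and tests stay deterministic.
pub fn reconnect_delay(cfg: &BackoffConfig, attempt: u32, unit_random: f64) -> Option<Duration> {
    if cfg.max_attempts != 0 && attempt >= cfg.max_attempts {
        return None;
    }
    // Exponential growth: initial * multiplier^attempt, capped at max_delay_ms.
    let exponent = attempt.min(i32::MAX as u32) as i32;
    let base_ms = (cfg.initial_delay_ms as f64 * cfg.multiplier.powi(exponent))
        .min(cfg.max_delay_ms as f64);
    // Map unit_random from [0, 1) onto [-jitter_factor, +jitter_factor).
    let jitter = (unit_random * 2.0 - 1.0) * cfg.jitter_factor;
    let delay_ms = (base_ms * (1.0 + jitter)).max(0.0);
    Some(Duration::from_millis(delay_ms.round() as u64))
}
```

With the configuration shown above and no jitter, the base delays run 1s, 2s, 4s, 8s, 16s, 32s and then stay at 60s.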

Connection States

State          Description
disconnected   Not connected
backoff        Waiting before reconnect
connecting     Connection in progress
connected      Connected, not authenticated
authenticated  Ready for normal operation
closing        Graceful shutdown in progress
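
The states above suggest a small state machine. The sketch below is one plausible reading of the lifecycle: the states come from the table, while the Event names and the exact set of allowed transitions are assumptions made for illustration.

```rust
/// Connection states from the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    Disconnected,
    Backoff,
    Connecting,
    Connected,
    Authenticated,
    Closing,
}

/// Hypothetical events that drive transitions; not defined by the protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    ConnectRequested,
    BackoffElapsed,
    SocketOpened,
    AuthSucceeded,
    ConnectionLost,
    ShutdownRequested,
    Closed,
}

/// Returns the next state, or None if the event is not valid in `state`.
pub fn next_state(state: ConnectionState, event: Event) -> Option<ConnectionState> {
    use ConnectionState::*;
    use Event::*;
    match (state, event) {
        (Disconnected, ConnectRequested) => Some(Connecting),
        (Backoff, BackoffElapsed) => Some(Connecting),
        (Connecting, SocketOpened) => Some(Connected),
        (Connected, AuthSucceeded) => Some(Authenticated),
        // Any drop while connecting or connected leads to a backoff wait.
        (Connecting | Connected | Authenticated, ConnectionLost) => Some(Backoff),
        (Connected | Authenticated, ShutdownRequested) => Some(Closing),
        (Closing, Closed) => Some(Disconnected),
        _ => None,
    }
}
```

Returning None for unexpected events makes invalid sequences, such as AuthSucceeded arriving before the socket is open, easy to detect and log.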

Health Monitoring

The control plane monitors agent health using these thresholds:

Metric             Warning    Critical
Heartbeat timeout  -          30 seconds
Replication lag    5 seconds  30 seconds
Disk usage         80%        95%
Memory usage       85%        95%
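
To make the thresholds concrete, the sketch below classifies a single node observation against the table. The struct and function names are hypothetical, and treating each threshold as inclusive (a value equal to the threshold triggers that level) is an assumption; the overall result is the worst level across all metrics.

```rust
/// Severity levels, ordered so that `max` picks the worst one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Severity {
    Ok,
    Warning,
    Critical,
}

/// Hypothetical snapshot of what the control plane knows about a node.
#[derive(Debug, Clone)]
pub struct NodeObservation {
    pub ms_since_last_heartbeat: u64,
    pub replication_lag_ms: Option<u64>, // None when no replication status is reported
    pub disk_used_bytes: u64,
    pub disk_total_bytes: u64,
    pub memory_used_bytes: u64,
    pub memory_total_bytes: u64,
}

fn classify(value: f64, warning: f64, critical: f64) -> Severity {
    if value >= critical {
        Severity::Critical
    } else if value >= warning {
        Severity::Warning
    } else {
        Severity::Ok
    }
}

fn percent(used: u64, total: u64) -> f64 {
    if total == 0 { 0.0 } else { used as f64 / total as f64 * 100.0 }
}

/// Applies the thresholds from the table and returns the worst severity.
pub fn evaluate(obs: &NodeObservation) -> Severity {
    // Heartbeat timeout has no warning level, only critical at 30 seconds.
    let heartbeat = if obs.ms_since_last_heartbeat >= 30_000 {
        Severity::Critical
    } else {
        Severity::Ok
    };
    let lag = obs
        .replication_lag_ms
        .map_or(Severity::Ok, |ms| classify(ms as f64, 5_000.0, 30_000.0));
    let disk = classify(percent(obs.disk_used_bytes, obs.disk_total_bytes), 80.0, 95.0);
    let memory = classify(percent(obs.memory_used_bytes, obs.memory_total_bytes), 85.0, 95.0);
    heartbeat.max(lag).max(disk).max(memory)
}
```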

Using the Protocol

Rust Crate

The kimberlite-agent-protocol crate provides typed definitions:

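use kmb_agent_protocol::{AgentMessage, NodeStatus, NodeRole, Resources};

fn main() -> Result<(), serde_json::Error> {
    // Build a heartbeat similar to the JSON example in the Heartbeat section.
    let heartbeat = AgentMessage::Heartbeat {
        node_id: "node-001".to_string(),
        status: NodeStatus::Healthy,
        role: NodeRole::Leader,
        resources: Resources {
            cpu_percent: 45.2,
            memory_used_bytes: 1_073_741_824,
            memory_total_bytes: 8_589_934_592,
            disk_used_bytes: 10_737_418_240,
            disk_total_bytes: 107_374_182_400,
        },
        // Replication status applies to followers only; this node is the leader.
        replication: None,
        // No buffer statistics reported in this example.
        buffer_stats: vec![],
    };

    // Serialize to the JSON wire format.
    let json = serde_json::to_string(&heartbeat)?;
    println!("{json}");
    Ok(())
}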

Other Languages

The protocol uses standard JSON, so any language with JSON support can implement an agent or control plane. The type definitions in this document serve as the canonical specification.

Versioning

The protocol version is negotiated during the WebSocket handshake via the Sec-WebSocket-Protocol header:

Sec-WebSocket-Protocol: kimberlite-agent-protocol-v1

Breaking changes will increment the version number.
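
A control plane might extract the version from the header value along the following lines. The sketch assumes integer versions after the -v suffix and picks the highest version offered; the function name parse_protocol_version and that selection policy are assumptions. The comma-separated form follows the WebSocket handshake rules, which allow a client to offer several subprotocols.

```rust
const SUBPROTOCOL_PREFIX: &str = "kimberlite-agent-protocol-v";

/// Extracts the highest Kimberlite agent protocol version offered in a
/// Sec-WebSocket-Protocol header value, ignoring unrelated subprotocols.
pub fn parse_protocol_version(header_value: &str) -> Option<u32> {
    header_value
        .split(',')
        .map(str::trim)
        // Keep only entries naming this protocol, leaving the version suffix.
        .filter_map(|entry| entry.strip_prefix(SUBPROTOCOL_PREFIX))
        // Skip suffixes that are not plain integers.
        .filter_map(|version| version.parse::<u32>().ok())
        .max()
}
```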