Comprehensive guides and references for the OpenFrame platform
The Data Streaming Kafka Config And Models module provides the foundational Kafka infrastructure for the OpenFrame data platform. It defines:
This module acts as the Kafka backbone for upstream services such as stream processing, CDC handlers, and enrichment pipelines.
It is consumed primarily by:
At runtime, this module provides a fully configured Kafka stack for the OSS tenant cluster:
flowchart LR
App["Application Service"] -->|"produce()"| Template["KafkaTemplate"]
Template --> ProducerFactory["ProducerFactory"]
ProducerFactory --> Broker[("Kafka Cluster")]
Broker --> ConsumerFactory["ConsumerFactory"]
ConsumerFactory --> ListenerFactory["KafkaListenerContainerFactory"]
ListenerFactory --> Listener["@KafkaListener"]
TopicProps["KafkaTopicProperties"] --> Admin["KafkaAdmin"]
Admin --> Broker
| Layer | Responsibility |
|---|---|
| Configuration | Override default Spring Kafka auto-configuration and provide OSS-specific setup |
| Properties | Bind tenant-scoped Kafka properties |
| Topic Management | Auto-register and provision inbound topics |
| Messaging Models | Standardized message contracts (Pinot, Debezium) |
| Error Handling | Structured recovery logging for failed publish attempts |
Component: OssKafkaConfig
This class enables Kafka support while explicitly excluding Spring Boot's default KafkaAutoConfiguration.
By excluding the default auto-configuration:
flowchart TD
DefaultAuto["KafkaAutoConfiguration"] -->|"excluded"| CustomConfig["OssKafkaConfig"]
CustomConfig --> EnableKafka["@EnableKafka"]
Component: OssTenantKafkaProperties
Configuration prefix:
spring.oss-tenant
This wraps Spring's standard KafkaProperties and exposes:
enabled flag (default: true)This makes the OSS Kafka cluster a first-class configurable unit.
Component: KafkaTopicProperties
Configuration prefix:
openframe.oss-tenant.kafka.topics
flowchart TD
Root["KafkaTopicProperties"] --> Auto["autoCreate"]
Root --> Inbound["inbound Map"]
Inbound --> TopicConfig["TopicConfig"]
TopicConfig --> Name["name"]
TopicConfig --> Partitions["partitions"]
TopicConfig --> Replication["replicationFactor"]
Each inbound topic can specify:
If autoCreate is enabled and Kafka Admin is active, topics are automatically registered during startup.
Component: OssTenantKafkaAutoConfiguration
This is the core infrastructure configuration class. It is activated when:
spring.oss-tenant.kafka.enabled=true
| Bean | Purpose |
|---|---|
| ProducerFactory | Builds producers with JSON serialization |
| KafkaTemplate | Main publishing abstraction |
| ConsumerFactory | Builds JSON-deserializing consumers |
| KafkaListenerContainerFactory | Configures listener concurrency and acknowledgment mode |
| KafkaAdmin | Admin client for topic management |
| NewTopics | Topic auto-registration |
| OssTenantKafkaProducer | Tenant-scoped producer abstraction |
flowchart TD
Props["OssTenantKafkaProperties"] --> ConsumerFactory
ConsumerFactory --> ListenerFactory
ListenerFactory --> AckMode["AckMode (default RECORD)"]
ListenerFactory --> Concurrency["Concurrency"]
ListenerFactory --> PollTimeout["Poll Timeout"]
Key default behavior:
JsonSerializer and JsonDeserializerRECORD acknowledgment modeComponent: MachinePinotMessage
Represents device state changes intended for Pinot indexing and analytics.
flowchart TD
Message["MachinePinotMessage"] --> MachineId["machineId"]
Message --> OrgId["organizationId"]
Message --> DeviceType["deviceType"]
Message --> Status["status"]
Message --> OsType["osType"]
Message --> Tags["tags[]"]
This message is typically consumed by stream services and analytics processors.
Component: DebeziumMessage<T>
A generic wrapper for Debezium CDC events.
It models the canonical Debezium structure:
flowchart TD
Debezium["DebeziumMessage"] --> Payload["Payload"]
Payload --> Before["before"]
Payload --> After["after"]
Payload --> Source["source"]
Payload --> Op["operation"]
Payload --> Ts["timestamp"]
Source --> DB["database"]
Source --> Table["table"]
Source --> Collection["collection"]
Used by stream handlers to:
before and after statesDefines shared header constants.
message-type
This enables:
Component: KafkaRecoveryHandlerImpl
Provides structured logging when message publishing fails.
When a publish attempt fails:
flowchart TD
Publish["Kafka Publish"] -->|"exception"| Recovery["KafkaRecoveryHandlerImpl"]
Recovery --> Log["Structured Error Log"]
This ensures:
Note: This implementation logs errors but does not automatically requeue to a dead-letter topic. Dead-letter routing can be layered on top in higher-level services.
This module provides infrastructure and models only. Actual processing logic is implemented in:
flowchart LR
Mongo["MongoDB"] -->|"CDC"| Debezium
Debezium --> Kafka[("Kafka Cluster")]
Kafka --> StreamService["Stream Service"]
StreamService --> Pinot["Pinot Analytics"]
DebeziumMessageAll configuration is isolated under the spring.oss-tenant prefix to prevent accidental cross-cluster binding.
Default Spring Kafka auto-configuration is disabled to maintain deterministic infrastructure behavior.
Topic creation is property-driven, reducing operational overhead.
Standardized models ensure compatibility across:
The Data Streaming Kafka Config And Models module is the foundational Kafka infrastructure layer of OpenFrame.
It provides:
This module enables the streaming ecosystem of OpenFrame to operate in a scalable, observable, and tenant-aware manner.