The Stream Service Core Kafka Streams And Deserialization module is the foundational streaming layer of the OpenFrame Stream Service. It is responsible for:
MessageType values for downstream processing
This module acts as the ingestion and normalization boundary between external tool event sources (Fleet MDM, MeshCentral, Tactical RMM, Debezium-based CDC streams) and the internal stream processing pipeline.
It is consumed by higher-level streaming logic (message handling, enrichment, persistence, and publishing) but remains focused strictly on:
Within the overall OpenFrame platform, the Stream Service sits between data producers (tools, CDC pipelines) and downstream consumers (enrichment services, storage layers, analytics systems).
flowchart TD
ExternalTools["External Tools<br/>Fleet · MeshCentral · Tactical RMM"] --> KafkaBroker["Kafka Topics"]
KafkaBroker --> StreamCore["Stream Service Core<br/>Kafka Streams And Deserialization"]
StreamCore --> MessageHandling["Message Handling And Enrichment"]
MessageHandling --> Downstream["Persistence · Analytics · External APIs"]
This module specifically implements the StreamCore box in the diagram above.
Component: KafkaConfig
This configuration class defines supporting Kafka infrastructure beans.
A critical bean in this module is:
Converter<byte[], MessageType>
This converter:
MessageType enum
This enables:
MessageType
Component: KafkaStreamsConfig
This class enables and configures Kafka Streams processing.
Enables Kafka Streams (@EnableKafkaStreams)
flowchart TD
AppName["spring.application.name"] --> BuildId["Build Streams Application ID"]
ClusterId["openframe.cluster-id"] --> BuildId
BuildId --> StreamsConfig["StreamsConfig.APPLICATION_ID_CONFIG"]
Bootstrap["bootstrap-servers"] --> StreamsConfig
StreamsConfig --> KafkaStreams["Kafka Streams Runtime"]
The method buildStreamsApplicationId() ensures:
If clusterId is defined → applicationName-clusterId
Otherwise → applicationName
This prevents state store collisions across tenants in SaaS deployments.
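The naming rule above can be sketched in plain Java. This is an illustration only, not the module's actual code: the class name `StreamsApplicationId`, the helper `streamsProperties`, and the choice to treat a blank cluster ID as undefined are all assumptions. The string keys `application.id` and `bootstrap.servers` are the standard Kafka property names behind `StreamsConfig.APPLICATION_ID_CONFIG` and `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG`.

```java
import java.util.Properties;

/** Hypothetical helper mirroring the application ID rule described above. */
final class StreamsApplicationId {

    private StreamsApplicationId() {}

    /**
     * Returns "applicationName-clusterId" when a cluster ID is set,
     * otherwise just "applicationName".
     * Assumption: a blank cluster ID counts as "not defined".
     */
    static String build(String applicationName, String clusterId) {
        if (clusterId == null || clusterId.isBlank()) {
            return applicationName;
        }
        return applicationName + "-" + clusterId;
    }

    /** Collects the two inputs from the diagram into Kafka Streams properties. */
    static Properties streamsProperties(String applicationName,
                                        String clusterId,
                                        String bootstrapServers) {
        Properties props = new Properties();
        // Key behind StreamsConfig.APPLICATION_ID_CONFIG.
        props.put("application.id", build(applicationName, clusterId));
        // Key behind StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }
}
```

Because Kafka Streams derives its consumer group and state directory names from the application ID, two tenants running the same service with different cluster IDs end up with separate state under this scheme.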
The module defines explicit Serdes for:
ActivityMessage
HostActivityMessage
Two variants exist:
This allows:
All deserializers extend a common base:
IntegratedToolEventDeserializer
Each implementation:
agentId
sourceEventType
flowchart TD
RawEvent["Raw Debezium / Tool Event"] --> ToolDeserializer["Tool-Specific Deserializer"]
ToolDeserializer --> ExtractFields["Extract Fields"]
ExtractFields --> Normalize["Normalize To Internal Model"]
Normalize --> TypedEvent["MessageType Assigned"]
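The extract, normalize, and type-assignment flow above maps naturally onto a template-method base class. The sketch below is a guess at the general shape, not the real `IntegratedToolEventDeserializer`: every type in it (`DeserializerSketch`, `ToolEventDeserializer`, `NormalizedEvent`, `MeshCentralSketch`) is hypothetical, the raw event is modelled as a simple map, and using `nodeid` as the agent identifier is an assumption.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of a shared base class for tool-specific deserializers. */
final class DeserializerSketch {

    /** Subset of the MessageType values named in this document. */
    enum MessageType { FLEET_MDM_EVENT, MESHCENTRAL_EVENT }

    /** Hypothetical normalized model carrying the common fields. */
    static final class NormalizedEvent {
        final MessageType type;
        final String agentId;
        final String sourceEventType;

        NormalizedEvent(MessageType type, String agentId, String sourceEventType) {
            this.type = type;
            this.agentId = agentId;
            this.sourceEventType = sourceEventType;
        }
    }

    /** Base class: subclasses only say how to pull fields out of the raw event. */
    abstract static class ToolEventDeserializer {
        abstract MessageType getType();
        abstract Optional<String> extractAgentId(Map<String, ?> raw);
        abstract Optional<String> extractSourceEventType(Map<String, ?> raw);

        /** Extract fields, normalize, and attach the MessageType. */
        final NormalizedEvent deserialize(Map<String, ?> raw) {
            return new NormalizedEvent(
                    getType(),
                    extractAgentId(raw).orElse(null),
                    // Assumption: missing event types fall back to "unknown".
                    extractSourceEventType(raw).orElse("unknown"));
        }
    }

    /** Example subclass; the field names come from the MeshCentral section. */
    static final class MeshCentralSketch extends ToolEventDeserializer {
        @Override
        MessageType getType() {
            return MessageType.MESHCENTRAL_EVENT;
        }

        @Override
        Optional<String> extractAgentId(Map<String, ?> raw) {
            return Optional.ofNullable(raw.get("nodeid")).map(Object::toString);
        }

        @Override
        Optional<String> extractSourceEventType(Map<String, ?> raw) {
            return Optional.ofNullable(raw.get("etype")).map(Object::toString);
        }
    }
}
```

Keeping `deserialize` final in the base class means every tool produces the same normalized shape, while the per-tool field lookups stay isolated in their own subclasses.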
MessageType: FLEET_MDM_EVENT
agentId (with fallback strategy)
activity_type
FleetActivityTypeMapping
If a mapping is not found:
details field
This ensures:
MessageType: FLEET_MDM_QUERY_RESULT_EVENT
This deserializer handles scheduled query execution results.
FleetMdmCacheService
query_id
It constructs structured JSON for:
error
result
If the output is valid JSON, it is parsed and embedded. If not, it is stored as plain text.
flowchart TD
QueryEvent["Fleet Query Result"] --> HasError{{"Error Present?"}}
HasError -->|Yes| ErrorJson["Build Error JSON"]
HasError -->|No| HasData{{"Data Present?"}}
HasData -->|Yes| ResultJson["Build Result JSON"]
HasData -->|No| InfoMessage["Result Received Message"]
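The decision flow in the diagram can be sketched as a small branch function. This is a minimal illustration under stated assumptions: the class `QueryResultSketch`, the `kind` and `body` keys, and the wording of the informational message are all hypothetical, and JSON validation of the output is deliberately left out because the JDK has no built-in JSON parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the error / result / info branching shown above. */
final class QueryResultSketch {

    private QueryResultSketch() {}

    /**
     * Error takes priority over data; with neither present, an
     * informational message is produced instead.
     */
    static Map<String, String> summarize(String error, String output) {
        Map<String, String> summary = new LinkedHashMap<>();
        if (error != null && !error.isBlank()) {
            summary.put("kind", "error");
            summary.put("body", error);
        } else if (output != null && !output.isBlank()) {
            summary.put("kind", "result");
            // The real deserializer would try to parse this as JSON first.
            summary.put("body", output);
        } else {
            summary.put("kind", "info");
            summary.put("body", "Query result received");
        }
        return summary;
    }
}
```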
MessageType: MESHCENTRAL_EVENT
nodeid
etype
action
_id
msg
$oid IDs
$date
If both etype and action exist:
etype.action
This provides a consistent event taxonomy across systems.
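As a rough illustration of the `etype.action` rule, the helper below joins the two fields when both are present. Only that combined case comes from the text above; the class name `MeshCentralEventType` and the fallbacks for a missing field (use whichever value exists, else `unknown`) are assumptions.

```java
/** Hypothetical helper for composing a MeshCentral event type. */
final class MeshCentralEventType {

    private MeshCentralEventType() {}

    static String compose(String etype, String action) {
        boolean hasEtype = etype != null && !etype.isBlank();
        boolean hasAction = action != null && !action.isBlank();

        if (hasEtype && hasAction) {
            // The documented case: both fields exist.
            return etype + "." + action;
        }
        // Assumed fallbacks; the source does not specify these.
        if (hasEtype) {
            return etype;
        }
        if (hasAction) {
            return action;
        }
        return "unknown";
    }
}
```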
MessageType: TACTICAL_RMM_AGENT_HISTORY_EVENT
TacticalRmmCacheService
cmd_run
script_run
stdout
stderr
flowchart TD
HistoryEvent["Agent History Event"] --> TypeSwitch{{"cmd_run or script_run?"}}
TypeSwitch --> Cmd["Command Processing"]
TypeSwitch --> Script["Script Processing"]
Script --> ExtractStdout["Extract stdout"]
Script --> ExtractStderr["Extract stderr"]
Script --> ExecutionTime["Extract execution_time"]
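The branch on `cmd_run` versus `script_run` might look roughly like the sketch below. The raw event is modelled as a map, and the key names `type`, `command`, and `script_results` are assumptions made for illustration; only `stdout`, `stderr`, and `execution_time` are taken from the diagram.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of agent history processing by event type. */
final class AgentHistorySketch {

    private AgentHistorySketch() {}

    static Map<String, Object> summarize(Map<String, ?> raw) {
        Map<String, Object> out = new LinkedHashMap<>();
        Object type = raw.get("type");
        out.put("type", type);

        if ("script_run".equals(type)) {
            // Assumed layout: script output nested under "script_results".
            Object nested = raw.get("script_results");
            if (nested instanceof Map) {
                Map<?, ?> results = (Map<?, ?>) nested;
                out.put("stdout", results.get("stdout"));
                out.put("stderr", results.get("stderr"));
                out.put("execution_time", results.get("execution_time"));
            }
        } else if ("cmd_run".equals(type)) {
            // Assumed key for the command text.
            out.put("command", raw.get("command"));
        }
        return out;
    }
}
```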
MessageType: TACTICAL_RMM_AUDIT_EVENT
agentid
object_type
action
message
after_value
object_type.action
This supports:
All deserializers implement:
public MessageType getType()
This enum value becomes the routing contract for downstream stream processors.
flowchart TD
Deserializer --> MessageTypeEnum["MessageType Enum"]
MessageTypeEnum --> Router["Stream Router"]
Router --> Enrichment["Enrichment Service"]
Router --> Persistence["Persistence Layer"]
Router --> ExternalPublish["External Publishing"]
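One way to picture `MessageType` as a routing contract is an enum-keyed dispatch table. The sketch below is not the Stream Service's router: `RoutingSketch`, `Router`, `register`, and `route` are hypothetical names, payloads are plain strings, and failing loudly on an unregistered type is a design assumption.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of routing by MessageType. */
final class RoutingSketch {

    /** The MessageType values named in this document. */
    enum MessageType {
        FLEET_MDM_EVENT,
        FLEET_MDM_QUERY_RESULT_EVENT,
        MESHCENTRAL_EVENT,
        TACTICAL_RMM_AGENT_HISTORY_EVENT,
        TACTICAL_RMM_AUDIT_EVENT
    }

    static final class Router {
        // EnumMap keeps lookups cheap and keys restricted to known types.
        private final Map<MessageType, Function<String, String>> handlers =
                new EnumMap<>(MessageType.class);

        void register(MessageType type, Function<String, String> handler) {
            handlers.put(type, handler);
        }

        String route(MessageType type, String payload) {
            Function<String, String> handler = handlers.get(type);
            if (handler == null) {
                throw new IllegalArgumentException("No handler for " + type);
            }
            return handler.apply(payload);
        }
    }
}
```

A handler would be registered once per type, for example `router.register(MessageType.MESHCENTRAL_EVENT, payload -> ...)`, so downstream code never needs to inspect the raw event to decide where it goes.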
By centralizing typing here, the system ensures:
The Kafka Streams configuration includes:
AT_LEAST_ONCE processing
MAX_TASK_IDLE_MS
These settings balance:
The Stream Service Core Kafka Streams And Deserialization module follows these principles:
Each tool has its own deserializer:
This prevents cross-tool coupling.
Raw events are:
Before any enrichment occurs.
When:
The system logs and falls back gracefully.
The Stream Service Core Kafka Streams And Deserialization module is the ingestion and normalization backbone of the Stream Service.
It provides:
MessageType routing
By enforcing strict normalization at this boundary, the platform guarantees that downstream enrichment and persistence layers operate on clean, predictable, and tool-agnostic event models.