Comprehensive guides and references for the OpenFrame platform
The Stream Service Core Message Handling And Enrichment module is responsible for:
This module sits at the heart of the streaming pipeline in the openframe-stream service and bridges raw Debezium events with normalized, enriched domain events used across the platform.
It works closely with:
stream-service-core-kafka-streams-and-deserialization module)flowchart LR
KafkaInbound["Kafka Inbound Topics<br/>MeshCentral / Tactical / Fleet"] --> JsonListener["JsonKafkaListener"]
JsonListener --> JsonProcessor["GenericJsonMessageProcessor"]
JsonProcessor --> EnrichmentService["IntegratedToolDataEnrichmentService"]
EnrichmentService --> HandlerRouter["DebeziumMessageHandler"]
HandlerRouter --> CassandraHandler["DebeziumCassandraMessageHandler"]
HandlerRouter --> KafkaHandler["DebeziumKafkaMessageHandler"]
CassandraHandler --> CassandraDB[("Cassandra")]
KafkaHandler --> KafkaOutbound["Kafka Outbound Topic<br/>Integrated Tool Events"]
FleetActivities["Fleet Activity Topics"] --> ActivityStream["ActivityEnrichmentService"]
ActivityStream --> EnrichedFleet["Enriched Fleet Events Topic"]
The module contains three major responsibilities:
UnifiedEventTypeClass: JsonKafkaListener
This is the Kafka entry point for integrated tool Debezium events.
It listens to inbound topics:
sequenceDiagram
participant Kafka
participant Listener as JsonKafkaListener
participant Processor as GenericJsonMessageProcessor
Kafka->>Listener: CommonDebeziumMessage + MessageType
Listener->>Processor: process(message, messageType)
The listener delegates processing to a GenericJsonMessageProcessor, which then triggers the appropriate enrichment and message handler pipeline.
Class: GenericMessageHandler<T, U, V>
This abstract class implements a reusable template for message handling:
flowchart TD
Handle["handle(message, extraParams)"] --> Validate["isValidMessage()"]
Validate --> Transform["transform()"]
Transform --> Operation["getOperationType()"]
Operation --> Push["pushData()"]
Push --> Create["handleCreate()"]
Push --> Read["handleRead()"]
Push --> Update["handleUpdate()"]
Push --> Delete["handleDelete()"]
Key features:
CREATE, READ, UPDATE, DELETEOperationTypeObjectMapper with Java time supportThis template ensures consistent behavior across Cassandra and Kafka handlers.
Class: DebeziumMessageHandler<T, U extends DeserializedDebeziumMessage>
Extends GenericMessageHandler and adds:
c, r, u, d → OperationType)DeserializedDebeziumMessageIt acts as the base for destination-specific implementations.
CASSANDRAUnifiedLogEventCassandraRepositoryResponsibilities:
UnifiedLogEventKey transformation fields:
All CREATE, READ, and UPDATE operations result in a save().
KAFKAIntegratedToolEventOssTenantRetryingKafkaProducerResponsibilities:
flowchart LR
DebeziumMessage --> TransformKafka["Transform to IntegratedToolEvent"]
TransformKafka --> BuildKey["Build Broker Key"]
BuildKey --> Publish["OssTenantRetryingKafkaProducer.publish()"]
Validation rule:
isVisible == trueThis class maps tool-specific source event types to a unified domain enum UnifiedEventType.
Mapping key format:
<tool_db_name>:<source_event_type>
Supported integrated tools:
If no mapping is found, the event is mapped to:
UnifiedEventType.UNKNOWN
This abstraction ensures downstream systems operate on consistent event semantics regardless of source tool.
Defines structured constants for:
MeshCentralTacticalFleetThis avoids string duplication and centralizes event type definitions.
Provides a mapping between Fleet MDM activity types and human-readable messages.
Used to:
Example mapping:
enabled_activity_automations -> Enabled activity automations
created_policy -> Created policy
installed_software -> Installed software
Implements DataEnrichmentService<DeserializedDebeziumMessage>.
Purpose:
flowchart LR
DebeziumMessage --> ExtractAgentId["Extract agentId"]
ExtractAgentId --> RedisLookup["MachineIdCacheService"]
RedisLookup --> MachineInfo["CachedMachineInfo"]
RedisLookup --> OrgInfo["CachedOrganizationInfo"]
MachineInfo --> EnrichedData["IntegratedToolEnrichedData"]
OrgInfo --> EnrichedData
If no machine is found, enrichment returns partial or empty metadata but does not fail the pipeline.
This component builds a Kafka Streams topology that joins:
activities topichost_activities topicIt enriches ActivityMessage with host information.
flowchart LR
Activities["Fleet Activities Topic"] --> AStream["KStream Activity"]
HostActivities["Fleet Host Activities Topic"] --> HStream["KStream HostActivity"]
AStream --> Join["Left Join (5s window)"]
HStream --> Join
Join --> AddHeader["HeaderAdderFixedKey"]
AddHeader --> Output["Enriched Fleet Events Topic"]
Key details:
Activity with:hostIdagentIdMESSAGE_TYPE_HEADER = FLEET_MDM_EVENT__TypeId__ = CommonDebeziumMessageThis ensures downstream processors treat enriched Fleet events consistently with other Debezium-based events.
Typed Debezium message wrapper for Fleet Activity entities.
Typed Debezium message wrapper for Fleet HostActivity entities.
Represents association between:
host_idactivity_idUtility class for parsing ISO 8601 timestamps into epoch milliseconds.
Behavior:
Instant.parse()Optional<Long>Ensures consistent timestamp normalization across integrated tools.
flowchart TD
Inbound["Inbound Debezium Event"] --> Deserialize["DeserializedDebeziumMessage"]
Deserialize --> MapType["EventTypeMapper"]
MapType --> Enrich["IntegratedToolDataEnrichmentService"]
Enrich --> Route["DebeziumMessageHandler"]
Route --> CassandraDest["Cassandra"]
Route --> KafkaDest["Outbound Kafka Topic"]
The Stream Service Core Message Handling And Enrichment module guarantees:
It is the normalization and distribution backbone of the streaming architecture in OpenFrame.