Real-time User Analytics with Kafka Connect & Kafka Streams

Written by Issam on November 17, 2025 • 4 min read

📖 Introduction

In today's fast-paced digital world, businesses need real-time insights to make quick decisions. Traditional batch processing, where data is analyzed hours or days after events occur, is no longer sufficient. This is where Apache Kafka shines, especially with its powerful components: Kafka Connect for data integration and Kafka Streams for real-time processing.

πŸ—οΈ Architecture Overview

graph TB
    DB[(PostgreSQL)] --> KC[Kafka Connect]
    KC --> KT[Kafka Topic<br/>user-profiles]
    
    WEB[Web Application] --> KC2[Kafka Producer]
    KC2 --> KT2[Kafka Topic<br/>user-clicks]
    
    KT --> KS[Kafka Streams<br/>in Quarkus App]
    KT2 --> KS
    
    KS --> KT3[Kafka Topic<br/>enriched-clicks]
    KS --> KT4[Kafka Topic<br/>user-metrics]
    
    KT3 --> ES[(Elasticsearch)]
    KT4 --> API[REST API]
    
    API --> DASH[Real-time Dashboard]
    
    style KS fill:#e1f5fe
    style KC fill:#f3e5f5
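
Before wiring anything up, the four topics in the diagram need to exist. A minimal sketch of creating them with Kafka's AdminClient, assuming a local broker at localhost:9092 and placeholder partition/replication settings:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions / replication factor 1 are dev-only placeholders
            admin.createTopics(List.of(
                new NewTopic("user-profiles", 3, (short) 1),
                new NewTopic("user-clicks", 3, (short) 1),
                new NewTopic("enriched-clicks", 3, (short) 1),
                new NewTopic("user-metrics", 3, (short) 1)
            )).all().get();
        }
    }
}

Note that user-clicks and user-profiles must share a partition count: the stream-table join later in the post relies on co-partitioning.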

🔄 Complete Data Flow

flowchart TD
    PG[(PostgreSQL<br/>User Profiles)] --> KCS[Kafka Connect<br/>Source Connector]
    WA[Web App<br/>User Clicks] --> KPP[Kafka Producer]
    
    KCS --> KTP[Kafka Topic<br/>user-profiles]
    KPP --> KTC[Kafka Topic<br/>user-clicks]
    
    KTP --> KSP
    KTC --> KSP
    
    subgraph KSP [Kafka Streams Processing]
        direction TB
        S1[Read Streams] --> J1[Join Clicks with Profiles]
        J1 --> F1[Filter & Transform]
        F1 --> A1[Aggregate & Count]
        A1 --> W1[5-min Windows]
        W1 --> OS1[Output Streams]
    end
    
    KSP --> KTE[Kafka Topic<br/>enriched-clicks]
    KSP --> KTM[Kafka Topic<br/>user-metrics]
    KSP --> SS[(Local State Store<br/>RocksDB)]
    
    KTE --> KCSINK[Kafka Connect<br/>Sink Connector]
    KCSINK --> ES[(Elasticsearch)]
    
    SS --> RA[Quarkus REST API]
    KTM --> RA
    
    RA --> DASH[📊 Real-time Dashboard]
    ES --> SI[🔍 Search Interface]

🚀 How It Works

1. Data Ingestion with Kafka Connect

Kafka Connect automatically streams data from PostgreSQL to Kafka:

// Example user profile in Kafka
{
  "user_id": "alice123",
  "name": "Alice Smith",
  "email": "alice@example.com",
  "country": "USA",
  "plan_tier": "premium"
}

2. Real-time Event Collection

Web applications send click events directly to Kafka:

// Example click event
{
  "user_id": "alice123",
  "page_url": "/products/123",
  "action": "view",
  "timestamp": 1690000000000
}
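
A minimal producer sketch for these events, assuming a broker at localhost:9092 and plain JSON strings for values (a schema registry with Avro or JSON Schema would be the hardened choice):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ClickProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        String click = "{\"user_id\":\"alice123\",\"page_url\":\"/products/123\","
                     + "\"action\":\"view\",\"timestamp\":1690000000000}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Key by user_id so every click for a user lands on the same
            // partition; the stream-table join below depends on this
            producer.send(new ProducerRecord<>("user-clicks", "alice123", click));
        }
    }
}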

3. Stream Processing Magic

Kafka Streams performs the real-time processing: each click is joined against the user-profile table (both topics keyed by user_id), then aggregated in short windows:

// Enrich clicks with user profiles (KStream-KTable join on user_id)
KStream<String, EnrichedClick> enrichedClicks = clickStream.join(userProfiles,
    (click, profile) -> new EnrichedClick(
        click.userId(),
        profile.name(),
        profile.country(),
        click.pageUrl(),
        click.action()));

// Count each user's clicks in 5-minute windows
enrichedClicks
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .count();

4. Instant Query Capability

The REST API queries the local state store for instant results:

GET /dashboard/users/alice123/metrics

Response:
{
  "user_id": "alice123",
  "user_name": "Alice Smith",
  "clicks_last_5min": 15,
  "country": "USA",
  "plan_tier": "premium"
}

🎯 Key Benefits

⚡ Performance: queries are served in milliseconds from a local RocksDB state store instead of hitting the database.

🔧 Reliability: every event lives in Kafka's durable, replicated log, so components can fail and recover without losing data.

💡 Business Value: dashboards reflect what users are doing right now, not what they did in yesterday's batch run.

🛠️ Technical Implementation

Kafka Connect Configuration

{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "mydb",
    "topic.prefix": "analytics",
    "table.include.list": "public.user_profiles"
  }
}
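
Connectors are deployed by POSTing this JSON to the Kafka Connect REST API. A sketch using Java's built-in HTTP client, assuming Connect runs on localhost:8083 and the config above is saved as postgres-source.json:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class RegisterConnector {
    public static void main(String[] args) throws Exception {
        String body = Files.readString(Path.of("postgres-source.json"));

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors")) // Connect REST endpoint
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body()); // 201 on success
    }
}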

Kafka Streams Topology

KStream<String, UserClick> clicks = builder.stream("user-clicks");
KTable<String, UserProfile> profiles = builder.table("user-profiles");

KStream<String, EnrichedClick> enriched = clicks.join(profiles, enrichmentLogic);
enriched.to("enriched-clicks"); // consumed downstream by the Elasticsearch sink

enriched.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
        .count(Materialized.as("user-metrics-store")); // queryable window store
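
The enrichmentLogic above is a ValueJoiner. A sketch of it, with hypothetical record types whose fields mirror the JSON examples earlier in the post:

import org.apache.kafka.streams.kstream.ValueJoiner;

// Hypothetical value types; field names mirror the JSON examples above
record UserClick(String userId, String pageUrl, String action, long timestamp) {}
record UserProfile(String name, String email, String country, String planTier) {}
record EnrichedClick(String userId, String userName, String country,
                     String pageUrl, String action) {}

class Enrichment {
    // Combine a click with the joined user profile into one enriched event
    static final ValueJoiner<UserClick, UserProfile, EnrichedClick> enrichmentLogic =
        (click, profile) -> new EnrichedClick(
            click.userId(), profile.name(), profile.country(),
            click.pageUrl(), click.action());
}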

REST API Endpoint

@GET
@Path("/users/{userId}/metrics")
public UserMetrics getMetrics(@PathParam("userId") String userId) {
    // The 5-minute counts live in a window store, so query it as one
    ReadOnlyWindowStore<String, Long> store = kafkaStreams.store(
        StoreQueryParameters.fromNameAndType("user-metrics-store",
            QueryableStoreTypes.windowStore()));
    Instant now = Instant.now();
    return buildMetrics(userId, store.fetch(userId, now.minusSeconds(300), now));
}
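
buildMetrics is not a Kafka API; it is a hypothetical helper that folds the iterator of window counts into the response DTO (the profile fields in the sample response would come from a separate lookup):

// Hypothetical helper: sum every 5-minute window count returned by fetch()
static UserMetrics buildMetrics(String userId, WindowStoreIterator<Long> windows) {
    long clicks = 0;
    try (windows) {
        while (windows.hasNext()) {
            clicks += windows.next().value; // KeyValue<windowStartMs, count>
        }
    }
    return new UserMetrics(userId, clicks); // UserMetrics is the app's own DTO
}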

📈 Real-world Results

| Metric         | Traditional | Kafka Solution     |
| -------------- | ----------- | ------------------ |
| Query Time     | 2-5 seconds | 10-50 milliseconds |
| Data Freshness | 1-24 hours  | Real-time          |
| Database Load  | 80% CPU     | 20% CPU            |
| Development    | 2-3 weeks   | 2-3 days           |

🚀 Getting Started

  1. Set up a Kafka cluster (Confluent, AWS MSK, or self-hosted)
  2. Configure Kafka Connect for your data sources
  3. Develop the Kafka Streams application (see the bootstrap sketch after this list)
  4. Build a REST API to expose processed data
  5. Create dashboards and monitoring
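
A minimal bootstrap for step 3, assuming String serdes and an application id of user-analytics (both placeholders):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class AnalyticsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-analytics");    // assumed app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // ... define the topology from the "Kafka Streams Topology" section ...

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}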

📚 Learn More

Apache Kafka documentation: https://kafka.apache.org/documentation/
Kafka Streams developer guide: https://kafka.apache.org/documentation/streams/
Debezium documentation: https://debezium.io/documentation/

🎯 Summary

This architecture demonstrates how to build real-time analytics pipelines that:

- Ingest database changes and user events continuously with Kafka Connect and Kafka producers
- Enrich and aggregate those events in real time with Kafka Streams
- Serve millisecond-latency metrics from local state stores through a simple REST API

Perfect for: E-commerce analytics, IoT monitoring, fraud detection, real-time recommendations, and live dashboards.

Happy streaming! 🚀