DEV Community

Byron Hsieh


Building a Kafka Wikimedia Producer: Solving 403 Errors and Understanding Java Fundamentals

Introduction

When I started building a Kafka producer to consume real-time data from Wikimedia, I quickly realized this wasn't just about Kafka—it was also a crash course in fundamental Java concepts and real-world troubleshooting. As someone more familiar with Python, concepts like constructors and multi-threading required some mindful learning.

What I didn't expect was hitting a 403 Forbidden error when connecting to Wikimedia's API, and a confusing import error that taught me an important lesson about Maven dependencies versus Java packages.

In this article, I'll share what I learned while building a WikimediaChangeHandler that processes real-time Wikipedia edits and sends them to Kafka, including the problems I encountered and how to solve them.

This guide is based on the excellent course "Apache Kafka Series - Learn Apache Kafka for Beginners v3", with additional troubleshooting for issues not covered in the course materials.


The Goal: Building a Real-Time Wikimedia Producer

We want to build a Kafka producer that:

  1. Connects to Wikimedia's real-time change stream
  2. Processes incoming events
  3. Sends them to a Kafka topic for downstream processing

The architecture looks like this:

Wikimedia SSE Stream → EventHandler → Kafka Producer → Kafka Topic

Part 1: Understanding Constructors and Dependency Injection

The Problem: Sharing Objects Between Classes

When building the handler, I encountered a fundamental question: How do I use the KafkaProducer (created in one class) inside my WikimediaChangeHandler (another class)?

In Python, I might just import and use it, but Java has a more structured approach.

The Solution: Constructors

Here's the key insight from the instructor:

"To pass in an object from one class to another in Java, you need to implement a constructor."

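Constructor injection is the most common way to do this (setter methods or method parameters can also hand objects around). Let's see it in action: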

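import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WikimediaChangeHandler implements EventHandler {
    private final KafkaProducer<String, String> kafkaProducer;
    private final String topic;

    // Constructor receives dependencies when the object is created
    public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer, String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
        // Now we can use kafkaProducer here!
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }

    // onOpen(), onClosed(), onComment() and onError() are also required by EventHandler;
    // they are omitted here (see the complete version later in this article).
}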

Why Do We Need a Constructor?

The scenario:

  • We create a KafkaProducer in the WikimediaChangesProducer class
  • We need to use that same producer instance in WikimediaChangeHandler
  • Specifically in the onMessage method, which gets called every time Wikimedia sends us data

The constructor's role:

  1. Acts as a "receiver" when creating the object
  2. Stores the received objects in instance variables
  3. Makes them available to all methods in the class

Usage:

// In WikimediaChangesProducer.java
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String topic = "wikimedia.recentchange";

// Pass producer and topic via constructor
EventHandler eventHandler = new WikimediaChangeHandler(producer, topic);

This is Dependency Injection!

This pattern has a name: Dependency Injection (DI). Instead of creating dependencies inside a class, we "inject" them from outside.

Benefits:

  • ✅ More flexible code
  • ✅ Easier to test (you can inject mock objects)
  • ✅ Clear dependencies
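
To make the testing benefit concrete, here is a stdlib-only sketch. MessageSink, ChangeHandler and RecordingSink are hypothetical names invented for illustration: MessageSink stands in for a producer, and because the handler only receives it through its constructor, a test can hand it a fake that records messages instead of contacting a Kafka broker.

```java
import java.util.ArrayList;
import java.util.List;

public class DiTestingSketch {
    // Hypothetical abstraction standing in for a Kafka producer
    interface MessageSink {
        void send(String topic, String value);
    }

    // The handler receives its dependency through the constructor instead of creating it
    static class ChangeHandler {
        private final MessageSink sink;
        private final String topic;

        ChangeHandler(MessageSink sink, String topic) {
            this.sink = sink;
            this.topic = topic;
        }

        void onMessage(String data) {
            sink.send(topic, data);
        }
    }

    // Test double: records what was "sent" instead of talking to a broker
    static class RecordingSink implements MessageSink {
        final List<String> sent = new ArrayList<>();

        @Override
        public void send(String topic, String value) {
            sent.add(topic + "=" + value);
        }
    }

    public static void main(String[] args) {
        RecordingSink fake = new RecordingSink();
        ChangeHandler handler = new ChangeHandler(fake, "wikimedia.recentchange");
        handler.onMessage("{\"title\":\"Example\"}");
        System.out.println(fake.sent); // prints [wikimedia.recentchange={"title":"Example"}]
    }
}
```

With the real kafka-clients library, the equivalent move is to type the field as the Producer<String, String> interface (which KafkaProducer implements) so a test can pass in Kafka's MockProducer. The handler in this article takes the concrete KafkaProducer class, which a mock cannot stand in for.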

This Pattern Exists in Other Languages

The constructor-based dependency injection pattern isn't unique to Java:

Python:

class WikimediaChangeHandler:
    def __init__(self, kafka_producer, topic):  # Python's constructor
        self.kafka_producer = kafka_producer
        self.topic = topic

    def on_message(self, event, message_event):
        self.kafka_producer.send(self.topic, message_event.data)

JavaScript/TypeScript:

class WikimediaChangeHandler {
    constructor(kafkaProducer, topic) {  // JS constructor
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    onMessage(event, messageEvent) {
        this.kafkaProducer.send(this.topic, messageEvent.data);
    }
}

This is a universal Object-Oriented Programming concept!


Part 2: Multi-Threading and Blocking the Main Thread

The Threading Challenge

Here's something the instructor explained:

"When we call eventSource.start(), it starts a background thread to process events. If we don't block the main thread, the program will finish and all threads will stop."

The Problem

Without blocking:

eventSource.start();  // Start background thread
// main() ends immediately
// → The EventSource thread doesn't keep the JVM alive → JVM exits → No data is processed!

The Solution

// Start EventSource in a background thread
eventSource.start();

// Block the main program for 10 minutes to let the background thread work
TimeUnit.MINUTES.sleep(10);

Why this works:

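  • The JVM exits once main() returns unless a non-daemon thread is still running, and the EventSource background thread doesn't keep it alive on its own
  • TimeUnit.MINUTES.sleep(10) (a more readable form of Thread.sleep()) keeps the main thread alive
  • This gives the background thread time to receive and process Wikimedia data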
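
Here is a stdlib-only sketch of the same idea. It assumes a hypothetical worker thread as a stand-in for EventSource's event thread. The worker is marked as a daemon, so it would not keep the JVM alive by itself, and it only gets to process "events" while the calling thread is blocked in sleep.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonDemo {
    // Counts how many "events" the background worker handled
    static final AtomicInteger processed = new AtomicInteger();

    // Starts a daemon worker, blocks the calling thread for `millis`, then stops the worker
    static int runFor(long millis) throws InterruptedException {
        processed.set(0);
        Thread worker = new Thread(() -> {
            while (true) {
                processed.incrementAndGet(); // stand-in for handling one stream event
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return; // stop when asked
                }
            }
        }, "demo-worker");
        worker.setDaemon(true); // a daemon thread never keeps the JVM alive by itself
        worker.start();

        // Block the caller, like TimeUnit.MINUTES.sleep(10) in the producer
        TimeUnit.MILLISECONDS.sleep(millis);

        worker.interrupt();
        worker.join();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Events handled while main was blocked: " + runFor(200));
    }
}
```

If you shrink the sleep to 0, the count drops to almost nothing. In the real producer, with no sleep at all, main() returns and the JVM exits before the stream delivers anything.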

Real-World Analogy

Think of it like running a restaurant:

❌ Without blocking: You hire a chef, then immediately close the restaurant—the chef can't cook anything.

✅ With blocking: You hire a chef and keep the restaurant open for 10 minutes—the chef can do their work.


Part 3: Troubleshooting - The 403 Forbidden Error

The Problem: Course Code is Outdated

When I ran the program with the exact code from the course, I immediately hit an error:

[okhttp-eventsource-events-[]-0] ERROR WikimediaChangeHandler - Error in Stream Reading
com.launchdarkly.eventsource.UnsuccessfulResponseException:
Unsuccessful response code received from stream: 403

The program kept trying to reconnect but failed every time with HTTP 403 Forbidden.

Why This Happens

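Wikimedia Requires an Identifying User-Agent: Wikimedia expects every client of its streaming API to send a descriptive User-Agent HTTP header that identifies the application. If you don't set one, the request goes out with only a generic library User-Agent (OkHttp's default is okhttp/<version>), and the server rejects the connection with 403.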

This is similar to a security guard at a building asking you to sign in. If you refuse to identify yourself, you're not allowed to enter.

At the time of writing, the course materials haven't been updated to reflect this policy, so the code they provide fails.

The Solution: Add User-Agent Header

We need to modify the EventSource creation to include HTTP headers:

Original code (from course - doesn't work):

EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
EventSource eventSource = builder.build();

Fixed code:

// At the top of the file, alongside the other imports:
import okhttp3.Headers;

// When creating the EventSource, add the User-Agent header required by Wikimedia
Headers headers = new Headers.Builder()
    .add("User-Agent", "KafkaLearningProject/1.0")
    .build();

EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url))
    .headers(headers);  // Add headers here
EventSource eventSource = builder.build();

What's happening:

  1. We create a Headers object using the Builder pattern (from OkHttp library)
  2. Add a User-Agent header with our application identifier
  3. Pass the headers to EventSource using .headers(headers)

User-Agent format:

  • Standard format: ApplicationName/Version
  • With contact info (recommended by Wikimedia's User-Agent policy): "KafkaLearningProject/1.0 (your.email@example.com)"
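
If you want to inspect the header outside of EventSource, the JDK's own java.net.http client (Java 11+) can build an equivalent request. This sketch builds the request but does not send it. The userAgent helper is a hypothetical convenience for the "Name/Version (contact)" format above, not part of any library.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class UserAgentDemo {
    // Hypothetical helper: "Name/Version", plus " (contact)" when a contact is given
    static String userAgent(String app, String version, String contact) {
        String base = app + "/" + version;
        return (contact == null || contact.isBlank()) ? base : base + " (" + contact + ")";
    }

    // Builds (but does not send) a GET request for the stream with the header attached
    static HttpRequest streamRequest(String url, String ua) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("User-Agent", ua)
                .header("Accept", "text/event-stream")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        String ua = userAgent("KafkaLearningProject", "1.0", "your.email@example.com");
        HttpRequest req = streamRequest("https://stream.wikimedia.org/v2/stream/recentchange", ua);
        // prints KafkaLearningProject/1.0 (your.email@example.com)
        System.out.println(req.headers().firstValue("User-Agent").orElse("<none>"));
    }
}
```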

Part 4: Troubleshooting - Maven Coordinates vs Java Packages

The Import Error Mystery

When I tried to import the Headers class, I encountered another confusing error:

error: package com.squareup.okhttp3 does not exist
import com.squareup.okhttp3.Headers;
                           ^

I thought: "But I have the dependency in build.gradle! Let me check..."

dependencies {
    implementation 'com.squareup.okhttp3:okhttp:4.9.3'  // It's here!
}

The dependency was there, and Gradle successfully downloaded it. So why couldn't Java find the class?

The Key Insight: Maven Coordinates ≠ Java Packages

This taught me an important lesson about the difference between Maven coordinates and Java package names.

Maven Coordinates (for dependency management):

groupId:artifactId:version
com.squareup.okhttp3:okhttp:4.9.3

groupId    = com.squareup.okhttp3
artifactId = okhttp
version    = 4.9.3

Java Package (for import statements):

import okhttp3.Headers;  // Not com.squareup.okhttp3!

Why Are They Different?

| Aspect | Maven Coordinates | Java Package |
|---|---|---|
| Purpose | Uniquely identify a library in a repository | Organize classes in the code |
| Example | com.squareup.okhttp3:okhttp | okhttp3 |
| Usage | build.gradle dependencies | import statements |
| Chosen by | The publisher, to avoid naming conflicts across organizations | The library authors; often similar to the groupId, but not guaranteed to match |

Real-world examples:

| Library | Maven Dependency | Java Import |
|---|---|---|
| OkHttp | com.squareup.okhttp3:okhttp | import okhttp3.Headers; |
| Gson | com.google.code.gson:gson | import com.google.gson.Gson; |
| Jackson | com.fasterxml.jackson.core:jackson-databind | import com.fasterxml.jackson.databind.ObjectMapper; |

How to Find the Correct Package Name

Method 1: Check Official Documentation (Fastest)

// OkHttp docs clearly show:
import okhttp3.*;

Method 2: Use IDE Auto-Complete (Most Reliable)

  1. Type: Headers headers = new Headers...
  2. The IDE flags Headers as unresolved; open the quick-fix menu (Option + Enter / Alt + Enter in IntelliJ IDEA, Cmd + . / Ctrl + . in VS Code)
  3. Select "Import 'Headers' (okhttp3)"
  4. IDE adds correct import automatically

Method 3: Inspect the JAR File (When in doubt)

jar tf okhttp-4.9.3.jar | grep Headers
# Output (abbreviated):
# okhttp3/Headers.class          ← The actual package!
# okhttp3/Headers$Builder.class
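
The same check can be done in Java with java.util.jar.JarFile, which is handy if the jar tool isn't on your PATH. This is a sketch: JarPackageFinder and findClass are hypothetical names. It turns entries like okhttp3/Headers.class into the import form okhttp3.Headers and skips nested classes such as Headers$Builder.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class JarPackageFinder {
    // Returns import-style names (e.g. "okhttp3.Headers") of top-level classes
    // in the JAR whose simple name equals simpleName
    static List<String> findClass(String jarPath, String simpleName) throws IOException {
        List<String> matches = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            for (JarEntry entry : Collections.list(jar.entries())) {
                String name = entry.getName(); // e.g. "okhttp3/Headers.class"
                if (!name.endsWith(".class") || name.contains("$")) {
                    continue; // skip resources and nested/inner classes
                }
                String fqcn = name.substring(0, name.length() - ".class".length()).replace('/', '.');
                String simple = fqcn.substring(fqcn.lastIndexOf('.') + 1);
                if (simple.equals(simpleName)) {
                    matches.add(fqcn);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JarPackageFinder okhttp-4.9.3.jar Headers
        for (String fqcn : findClass(args[0], args[1])) {
            System.out.println("import " + fqcn + ";");
        }
    }
}
```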

The Fix

Wrong import:

import com.squareup.okhttp3.Headers;  // ❌ Compilation error!

Correct import:

import okhttp3.Headers;  // ✅ Works!

Lesson Learned

Never assume the package name from the Maven coordinates!

  • Maven coordinates use organization namespacing (com.squareup.okhttp3)
  • Java package names are chosen by the library authors; OkHttp 3 switched to okhttp3 (from OkHttp 2's com.squareup.okhttp) so both major versions can coexist on one classpath
  • Always verify the actual package name before importing
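
To verify a guessed package name against what's actually on the classpath, a minimal sketch can ask the class loader directly with Class.forName, without initializing the class. ClassCheck and isOnClasspath are hypothetical names.

```java
public class ClassCheck {
    // True if a class with this fully qualified name can be loaded from the current classpath.
    // initialize=false: only locate/load the class, don't run its static initializers.
    static boolean isOnClasspath(String fqcn) {
        try {
            Class.forName(fqcn, false, ClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // With OkHttp on the classpath, expect okhttp3.Headers -> true
        // and com.squareup.okhttp3.Headers -> false
        for (String name : new String[] {"okhttp3.Headers", "com.squareup.okhttp3.Headers"}) {
            System.out.println(name + " -> " + isOnClasspath(name));
        }
    }
}
```

Run it with the project's dependencies on the classpath (for example, from the IDE). From a bare javac/java invocation without OkHttp, both names print false.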

Complete Code Overview

Here's the complete, working code with all fixes applied:

WikimediaChangeHandler.java

package io.conduktor.demos.kafka.wikimedia;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;

public class WikimediaChangeHandler implements EventHandler {
    private final KafkaProducer<String, String> kafkaProducer;
    private final String topic;
    private static final Logger log = LoggerFactory.getLogger(WikimediaChangeHandler.class);

    /*
     * Why do we need a constructor?
     * 1. We created a KafkaProducer in the Producer class
     * 2. We need to use it in this WikimediaChangeHandler class
     * 3. Specifically in the onMessage method to send data to Kafka
     *
     * In Java, to pass objects between classes, we use constructors
     * The constructor receives parameters when creating the object and stores them for later use
     */
    public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer, String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
        // When we receive a message from the stream, send it to Kafka
        log.info(messageEvent.getData());
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }

    @Override
    public void onOpen() {
        // Connection opened
    }

    @Override
    public void onClosed() {
        // Close the producer when stream closes
        kafkaProducer.close();
    }

    @Override
    public void onComment(String comment) {
        // Handle comments (not used in this case)
    }

    @Override
    public void onError(Throwable t) {
        log.error("Error in Stream Reading", t);
    }
}

WikimediaChangesProducer.java

package io.conduktor.demos.kafka.wikimedia;

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.EventSource;
import okhttp3.Headers;  // Correct import!
import java.net.URI;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class WikimediaChangesProducer {

    public static void main(String[] args) throws InterruptedException {
        String bootstrapServers = "127.0.0.1:9092";

        // Create and set Producer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        // Create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        String topic = "wikimedia.recentchange";

        // Create EventHandler with dependency injection
        EventHandler eventHandler = new WikimediaChangeHandler(producer, topic);

        String url = "https://stream.wikimedia.org/v2/stream/recentchange";

        // Wikimedia requires a User-Agent header to identify the client
        Headers headers = new Headers.Builder()
            .add("User-Agent", "KafkaLearningProject/1.0")
            .build();

        // Create EventSource with headers
        EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url))
            .headers(headers);
        EventSource eventSource = builder.build();

        // Start EventSource, which continuously receives real-time data from Wikimedia in a background thread
        eventSource.start();

        // Block the main program to prevent the main thread from terminating and stopping the background thread
        // Let the program run for 10 minutes to give the background thread enough time to receive and process data
        TimeUnit.MINUTES.sleep(10);
    }
}

Key Takeaways

1. Constructors Enable Dependency Injection

  • Constructors receive objects when creating instances
  • This is the most common way to share objects between classes in Java
  • It's a universal OOP pattern, not unique to Java

2. Multi-Threading Requires Careful Management

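  • The JVM exits once main() returns if the remaining background threads (like EventSource's) don't keep it alive
  • Block the main thread (here with TimeUnit.MINUTES.sleep(), a wrapper around Thread.sleep()) to keep the program running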
  • In production, use shutdown hooks or latches for graceful shutdown
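
Here is one way to replace the fixed 10-minute sleep, sketched with the JDK only. main() blocks on a CountDownLatch that the background work releases, with the 10 minutes kept as an upper bound, and a shutdown hook is registered for cleanup. GracefulShutdownDemo, signalDone and awaitDone are hypothetical names. In the real program the signal might come from the handler's onClosed(), and the hook would close the EventSource and the producer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownDemo {
    // Released once when the stream should stop
    private final CountDownLatch done = new CountDownLatch(1);

    void signalDone() {
        done.countDown();
    }

    // Blocks the caller until signalled or the timeout elapses; true if signalled
    boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        GracefulShutdownDemo app = new GracefulShutdownDemo();

        // Runs on normal JVM exit or Ctrl+C; real code would close the EventSource and producer here
        Runtime.getRuntime().addShutdownHook(new Thread(() -> System.out.println("shutting down, cleaning up")));

        // Hypothetical stand-in for the EventSource background work
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
                // fall through and signal anyway
            }
            app.signalDone();
        });
        worker.setDaemon(true);
        worker.start();

        // Wait for the signal instead of always sleeping the full 10 minutes
        boolean finished = app.awaitDone(10, TimeUnit.MINUTES);
        System.out.println(finished ? "stream finished" : "timed out");
    }
}
```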

3. APIs Change - Always Test Course Code

  • Course materials can become outdated as APIs evolve
  • Wikimedia now rejects connections without an identifying User-Agent header (the course code worked without one)
  • Always check API documentation if you encounter 403/401 errors

4. Maven Coordinates ≠ Java Packages

  • Don't assume package names from build.gradle dependencies
  • Maven coordinates use organizational namespacing (com.squareup.okhttp3)
  • Java package names are chosen separately by the library authors (okhttp3) and often don't match the groupId
  • Use IDE auto-complete or check documentation for correct imports

5. Builder Pattern Provides Flexibility

  • Builder pattern allows optional configuration
  • .headers() wasn't needed before, but easy to add when required
  • Two-step process: configure → build
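
To illustrate the configure → build flow, here is a sketch of a hypothetical StreamClientConfig. This is not the OkHttp or EventSource API. The required URL goes into the builder's constructor, optional headers are added through chained calls, and build() validates the input and produces an immutable object. That separation is why adding .headers() later didn't require changing how the rest of the EventSource was created.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public final class StreamClientConfig {
    private final String url;
    private final Map<String, String> headers;

    private StreamClientConfig(Builder b) {
        this.url = b.url;
        // Defensive copy so later builder changes can't affect this object
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(b.headers));
    }

    public String url() { return url; }

    public Map<String, String> headers() { return headers; }

    public static final class Builder {
        private final String url;                                        // required
        private final Map<String, String> headers = new LinkedHashMap<>(); // optional

        public Builder(String url) {
            this.url = url;
        }

        // Optional setting; returning the builder enables chaining
        public Builder header(String name, String value) {
            headers.put(name, value);
            return this;
        }

        // Step 2: validate and create the immutable config
        public StreamClientConfig build() {
            if (url == null || url.isBlank()) {
                throw new IllegalStateException("url is required");
            }
            return new StreamClientConfig(this);
        }
    }
}
```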

Conclusion

Building this Kafka Wikimedia producer taught me fundamental Java concepts and valuable troubleshooting skills:

  1. Dependency Injection through constructors - how to share objects between classes
  2. Multi-threading management - why we need to block the main thread
  3. API troubleshooting - dealing with 403 errors and outdated course materials
  4. Dependency management - understanding Maven coordinates vs Java packages

The most important lesson? When following online courses, the code might not work as-is. APIs change, libraries update, and policies evolve. Being able to troubleshoot these issues is just as valuable as learning the core concepts.

These challenges made me a better developer by forcing me to:

  • Read error messages carefully
  • Understand the tools I'm using (Gradle, Maven, HTTP)
  • Check library documentation
  • Learn the difference between dependency management and code organization

Troubleshooting Checklist

If you encounter issues while building this project:

403 Forbidden Error:

  • ✅ Add User-Agent header to EventSource
  • ✅ Ensure headers are properly built with OkHttp's Headers.Builder
  • ✅ Check Wikimedia's API documentation for current requirements

Import Errors:

  • ✅ Don't assume package name from Maven coordinates
  • ✅ Use import okhttp3.Headers; not import com.squareup.okhttp3.Headers;
  • ✅ Let IDE auto-complete suggest the correct import
  • ✅ Run ./gradlew build --refresh-dependencies if the dependency cache seems stale

Connection Errors:

  • ✅ Ensure Kafka broker is running on port 9092
  • ✅ Create topic: wikimedia.recentchange
  • ✅ Check network connectivity
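
For the first item, a quick reachability check can be sketched with a plain TCP socket. PortCheck and canConnect are hypothetical names. A successful connect only shows that something is listening on the port, not that it is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // True if a TCP connection to host:port succeeds within timeoutMillis
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        System.out.println("127.0.0.1:9092 reachable? " + canConnect("127.0.0.1", 9092, 2000));
    }
}
```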

This article is part of my learning journey through Apache Kafka. If you found it helpful, please give it a like and follow for more Kafka tutorials!

Course Reference: Apache Kafka Series - Learn Apache Kafka for Beginners v3
