Introduction
When I started building a Kafka producer to consume real-time data from Wikimedia, I quickly realized this wasn't just about Kafka—it was also a crash course in fundamental Java concepts and real-world troubleshooting. As someone more familiar with Python, concepts like constructors and multi-threading required some mindful learning.
What I didn't expect was hitting a 403 Forbidden error when connecting to Wikimedia's API, and a confusing import error that taught me an important lesson about Maven dependencies versus Java packages.
In this article, I'll share what I learned while building a WikimediaChangeHandler that processes real-time Wikipedia edits and sends them to Kafka, including the problems I encountered and how to solve them.
This guide is based on the excellent course "Apache Kafka Series - Learn Apache Kafka for Beginners v3", with additional troubleshooting for issues not covered in the course materials.
The Goal: Building a Real-Time Wikimedia Producer
We want to build a Kafka producer that:
- Connects to Wikimedia's real-time change stream
- Processes incoming events
- Sends them to a Kafka topic for downstream processing
The architecture looks like this:
Wikimedia SSE Stream → EventHandler → Kafka Producer → Kafka Topic
Part 1: Understanding Constructors and Dependency Injection
The Problem: Sharing Objects Between Classes
When building the handler, I encountered a fundamental question: How do I use the KafkaProducer (created in one class) inside my WikimediaChangeHandler (another class)?
In Python, I might just import and use it, but Java has a more structured approach.
The Solution: Constructors
Here's the key insight from the instructor:
"To pass in an object from one class to another in Java, you need to implement a constructor."
Let's see this in action:
public class WikimediaChangeHandler implements EventHandler {
    KafkaProducer<String, String> kafkaProducer;
    String topic;
    // Constructor receives dependencies when the object is created
    public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer, String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }
    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
        // Now we can use kafkaProducer here!
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }
}
Why Do We Need a Constructor?
The scenario:
- We create a KafkaProducer in the WikimediaChangesProducer class
- We need to use that same producer instance in WikimediaChangeHandler
- Specifically in the onMessage method, which gets called every time Wikimedia sends us data
The constructor's role:
- Acts as a "receiver" when creating the object
- Stores the received objects in instance variables
- Makes them available to all methods in the class
Usage:
// In WikimediaChangesProducer.java
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String topic = "wikimedia.recentchange";
// Pass producer and topic via constructor
EventHandler eventHandler = new WikimediaChangeHandler(producer, topic);
This is Dependency Injection!
This pattern has a name: Dependency Injection (DI). Instead of creating dependencies inside a class, we "inject" them from outside.
Benefits:
- ✅ More flexible code
- ✅ Easier to test (you can inject mock objects)
- ✅ Clear dependencies
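To make the "easier to test" benefit concrete, here is a minimal sketch that runs without a Kafka broker. The RecordSender interface, ChangeHandler and RecordingSender are hypothetical names invented for this illustration; they only mirror the shape of WikimediaChangeHandler and are not part of the course code or the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

class ConstructorInjectionSketch {
    // Hypothetical stand-in for the single producer capability the handler needs
    interface RecordSender {
        void send(String topic, String value);
    }

    // Same shape as WikimediaChangeHandler: dependencies arrive through the constructor
    static class ChangeHandler {
        private final RecordSender sender;
        private final String topic;

        ChangeHandler(RecordSender sender, String topic) {
            this.sender = sender;
            this.topic = topic;
        }

        void onMessage(String data) {
            sender.send(topic, data);
        }
    }

    // Test double that records what would have been sent instead of contacting a broker
    static class RecordingSender implements RecordSender {
        final List<String> sent = new ArrayList<>();

        @Override
        public void send(String topic, String value) {
            sent.add(topic + ":" + value);
        }
    }

    public static void main(String[] args) {
        RecordingSender fake = new RecordingSender();
        ChangeHandler handler = new ChangeHandler(fake, "wikimedia.recentchange");
        handler.onMessage("{\"title\":\"Example\"}");
        System.out.println(fake.sent);
    }
}
```

Because the handler never creates its own sender, the test decides what gets injected. In the real project, one way to get the same effect would be to declare the field as Kafka's Producer interface rather than the concrete KafkaProducer class, so that the MockProducer class shipped with the Kafka client could be passed in during tests.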
This Pattern Exists in Other Languages
The constructor-based dependency injection pattern isn't unique to Java:
Python:
class WikimediaChangeHandler:
    def __init__(self, kafka_producer, topic):  # Python's constructor
        self.kafka_producer = kafka_producer
        self.topic = topic
    def on_message(self, event, message_event):
        self.kafka_producer.send(self.topic, message_event.data)
JavaScript/TypeScript:
class WikimediaChangeHandler {
    constructor(kafkaProducer, topic) { // JS constructor
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }
    onMessage(event, messageEvent) {
        this.kafkaProducer.send(this.topic, messageEvent.data);
    }
}
This is a universal Object-Oriented Programming concept!
Part 2: Multi-Threading and Blocking the Main Thread
The Threading Challenge
Here's something the instructor explained:
"When we call eventSource.start(), it starts a background thread to process events. If we don't block the main thread, the program will finish and all threads will stop."
The Problem
Without blocking:
eventSource.start(); // Start background thread
// Main method ends immediately
// → Main thread exits → Background thread also terminates → No data is processed!
The Solution
// Start EventSource in a background thread
eventSource.start();
// Block the main program for 10 minutes to let the background thread work
TimeUnit.MINUTES.sleep(10);
Why this works:
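- The JVM exits once every non-daemon thread has finished; daemon threads (which is how the event source's background workers behave here) are simply stopped at that point, finished or not
- By calling TimeUnit.MINUTES.sleep(10), a more readable wrapper around Thread.sleep(), we keep the main thread alive
- This gives the background thread time to receive and process Wikimedia data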
Real-World Analogy
Think of it like running a restaurant:
❌ Without blocking: You hire a chef, then immediately close the restaurant—the chef can't cook anything.
✅ With blocking: You hire a chef and keep the restaurant open for 10 minutes—the chef can do their work.
Part 3: Troubleshooting - The 403 Forbidden Error
The Problem: Course Code is Outdated
When I ran the program with the exact code from the course, I immediately hit an error:
[okhttp-eventsource-events-[]-0] ERROR WikimediaChangeHandler - Error in Stream Reading
com.launchdarkly.eventsource.UnsuccessfulResponseException:
Unsuccessful response code received from stream: 403
The program kept trying to reconnect but failed every time with HTTP 403 Forbidden.
Why This Happens
Wikimedia's API Policy Changed: Wikimedia now requires all clients connecting to their streaming API to include a User-Agent HTTP header. Without it, the server rejects the connection.
This is similar to a security guard at a building asking you to sign in. If you refuse to identify yourself, you're not allowed to enter.
The course materials haven't been updated to reflect this API policy change, so the code provided will fail.
The Solution: Add User-Agent Header
We need to modify the EventSource creation to include HTTP headers:
Original code (from course - doesn't work):
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
EventSource eventSource = builder.build();
Fixed code:
// At the top of the file, with the other imports
import okhttp3.Headers;
// Add User-Agent header required by Wikimedia
Headers headers = new Headers.Builder()
        .add("User-Agent", "KafkaLearningProject/1.0")
        .build();
EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url))
        .headers(headers); // Add headers here
EventSource eventSource = builder.build();
What's happening:
- We create a Headers object using the Builder pattern (from OkHttp library)
- Add a User-Agent header with our application identifier
- Pass the headers to EventSource using .headers(headers)
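The same idea is not specific to OkHttp. As a cross-check, here is a sketch that attaches the header using the HTTP client built into the JDK (Java 11+). This is a swapped-in technique rather than the course's EventSource setup: the class and method names are made up for illustration, and the request is only built and inspected, never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class UserAgentRequestSketch {
    // Builds (but does not send) a GET request that identifies the client via User-Agent
    static HttpRequest streamRequest(String url, String userAgent) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("User-Agent", userAgent)
                .header("Accept", "text/event-stream") // SSE content type
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = streamRequest(
                "https://stream.wikimedia.org/v2/stream/recentchange",
                "KafkaLearningProject/1.0");
        // Inspect the header that would go out with the request
        System.out.println(request.headers().firstValue("User-Agent").orElse("(missing)"));
    }
}
```

Inspecting the built request like this is a quick way to confirm that the header is really set before looking for other causes of a 403.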
User-Agent format:
- Standard format: ApplicationName/Version
- With contact info: "KafkaLearningProject/1.0 (your.email@example.com)"
Part 4: Troubleshooting - Maven Coordinates vs Java Packages
The Import Error Mystery
When I tried to import the Headers class, I encountered another confusing error:
error: package com.squareup.okhttp3 does not exist
import com.squareup.okhttp3.Headers;
^
I thought: "But I have the dependency in build.gradle! Let me check..."
dependencies {
    implementation 'com.squareup.okhttp3:okhttp:4.9.3' // It's here!
}
The dependency was there, and Gradle successfully downloaded it. So why couldn't Java find the class?
The Key Insight: Maven Coordinates ≠ Java Packages
This taught me an important lesson about the difference between Maven coordinates and Java package names.
Maven Coordinates (for dependency management):
groupId              : artifactId : version
   ↓                      ↓           ↓
com.squareup.okhttp3 : okhttp     : 4.9.3
Java Package (for import statements):
import okhttp3.Headers; // Not com.squareup.okhttp3!
Why Are They Different?
| Aspect | Maven Coordinates | Java Package |
|---|---|---|
| Purpose | Uniquely identify library in repository | Organize code in the codebase |
| Example | com.squareup.okhttp3:okhttp | okhttp3 |
| Usage | build.gradle dependencies | import statements |
| Reason | Avoid naming conflicts across organizations | Keep code concise and clean |
Real-world examples:
| Library | Maven Dependency | Java Import |
|---|---|---|
| OkHttp | com.squareup.okhttp3:okhttp | import okhttp3.Headers; |
| Gson | com.google.code.gson:gson | import com.google.gson.Gson; |
| Jackson | com.fasterxml.jackson.core:jackson-databind | import com.fasterxml.jackson.databind.ObjectMapper; |
How to Find the Correct Package Name
Method 1: Check Official Documentation (Fastest)
// OkHttp docs clearly show:
import okhttp3.*;
Method 2: Use IDE Auto-Complete (Most Reliable)
- Type: Headers headers = new Headers...
- IDE shows error, press Cmd + . (Mac) or Ctrl + . (Windows)
- Select "Import 'Headers' (okhttp3)"
- IDE adds correct import automatically
Method 3: Inspect the JAR File (When in doubt)
jar tf okhttp-4.9.3.jar | grep Headers
# Output:
# okhttp3/Headers.class ← The actual package!
# okhttp3/Headers$Builder.class
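A fourth option, once the code compiles, is to ask the running JVM where a class came from. The sketch below uses only standard reflection calls (Java 9+ for getPackageName); the class name WhereIsThisClass is made up, and java.util.ArrayList is used as the subject only so the example runs without any dependency. Swapping in Headers.class would be the relevant check for this project.

```java
import java.security.CodeSource;

class WhereIsThisClass {
    // The package to use in the import statement
    static String packageOf(Class<?> type) {
        return type.getPackageName();
    }

    // The JAR or directory the class was loaded from; JDK classes report no code source
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? "(JDK runtime)" : String.valueOf(source.getLocation());
    }

    public static void main(String[] args) {
        Class<?> type = java.util.ArrayList.class; // replace with okhttp3.Headers.class in the project
        System.out.println(packageOf(type) + " loaded from " + locationOf(type));
    }
}
```

For a library class, the reported location is typically the path of the JAR in the Gradle cache, which ties the Java package back to the Maven artifact that provides it.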
The Fix
Wrong import:
import com.squareup.okhttp3.Headers; // ❌ Compilation error!
Correct import:
import okhttp3.Headers; // ✅ Works!
Lesson Learned
Never assume the package name from the Maven coordinates!
- Maven coordinates use organization namespacing (com.squareup.okhttp3)
- Java packages prioritize simplicity and backwards compatibility (okhttp3)
- Always verify the actual package name before importing
Complete Code Overview
Here's the complete, working code with all fixes applied:
WikimediaChangeHandler.java
package io.conduktor.demos.kafka.wikimedia;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
public class WikimediaChangeHandler implements EventHandler {
    private final KafkaProducer<String, String> kafkaProducer;
    private final String topic;
    private final Logger log = LoggerFactory.getLogger(WikimediaChangeHandler.class);
    /*
     * Why do we need a constructor?
     * 1. We created a KafkaProducer in the Producer class
     * 2. We need to use it in this WikimediaChangeHandler class
     * 3. Specifically in the onMessage method to send data to Kafka
     *
     * In Java, to pass objects between classes, we use constructors
     * The constructor receives parameters when creating the object and stores them for later use
     */
    public WikimediaChangeHandler(KafkaProducer<String, String> kafkaProducer, String topic) {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }
    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
        // When we receive a message from the stream, send it to Kafka
        log.info(messageEvent.getData());
        kafkaProducer.send(new ProducerRecord<>(topic, messageEvent.getData()));
    }
    @Override
    public void onOpen() {
        // Connection opened
    }
    @Override
    public void onClosed() {
        // Close the producer when stream closes
        kafkaProducer.close();
    }
    @Override
    public void onComment(String comment) {
        // Handle comments (not used in this case)
    }
    @Override
    public void onError(Throwable t) {
        log.error("Error in Stream Reading", t);
    }
}
WikimediaChangesProducer.java
package io.conduktor.demos.kafka.wikimedia;
import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.EventSource;
import okhttp3.Headers; // Correct import!
import java.net.URI;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
public class WikimediaChangesProducer {
    public static void main(String[] args) throws InterruptedException {
        String bootstrapServers = "127.0.0.1:9092";
        // Create and set Producer properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        // Create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String topic = "wikimedia.recentchange";
        // Create EventHandler with dependency injection
        EventHandler eventHandler = new WikimediaChangeHandler(producer, topic);
        String url = "https://stream.wikimedia.org/v2/stream/recentchange";
        // Wikimedia requires a User-Agent header to identify the client
        Headers headers = new Headers.Builder()
                .add("User-Agent", "KafkaLearningProject/1.0")
                .build();
        // Create EventSource with headers
        EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url))
                .headers(headers);
        EventSource eventSource = builder.build();
        // Start EventSource, which continuously receives real-time data from Wikimedia in a background thread
        eventSource.start();
        // Block the main program to prevent the main thread from terminating and stopping the background thread
        // Let the program run for 10 minutes to give the background thread enough time to receive and process data
        TimeUnit.MINUTES.sleep(10);
    }
}
Key Takeaways
1. Constructors Enable Dependency Injection
- Constructors receive objects when creating instances
- This is how we share objects between classes in Java
- It's a universal OOP pattern, not unique to Java
2. Multi-Threading Requires Careful Management
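- Daemon background threads, like the ones reading the stream here, stop when the main thread (the last non-daemon thread) exits
- Use Thread.sleep() or TimeUnit.MINUTES.sleep() to keep the main thread alive
- In production, use shutdown hooks or latches for graceful shutdown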
3. APIs Change - Always Test Course Code
- Course materials can become outdated as APIs evolve
- Wikimedia now requires User-Agent headers (wasn't required before)
- Always check API documentation if you encounter 403/401 errors
4. Maven Coordinates ≠ Java Packages
- Don't assume package names from build.gradle dependencies
- Maven uses organizational namespacing (com.squareup.okhttp3)
- Java packages prioritize simplicity (okhttp3)
- Use IDE auto-complete or check documentation for correct imports
5. Builder Pattern Provides Flexibility
- Builder pattern allows optional configuration
- .headers() wasn't needed before, but easy to add when required
- Two-step process: configure → build
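For readers new to the pattern, this is roughly what a builder looks like from the inside. It is a minimal sketch with hypothetical names (StreamConfig and its Builder are invented for illustration), not the actual EventSource.Builder implementation: the URL is required, headers are optional, and build() produces the finished, read-only object.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class BuilderSketch {
    static final class StreamConfig {
        final String url;
        final Map<String, String> headers;

        private StreamConfig(Builder builder) {
            this.url = builder.url;
            this.headers = Map.copyOf(builder.headers); // immutable snapshot
        }

        static final class Builder {
            private final String url;                                          // required
            private final Map<String, String> headers = new LinkedHashMap<>(); // optional

            Builder(String url) {
                this.url = url;
            }

            Builder header(String name, String value) {
                headers.put(name, value);
                return this; // returning this is what allows chained calls
            }

            StreamConfig build() {
                return new StreamConfig(this);
            }
        }
    }

    public static void main(String[] args) {
        // Step 1: configure, step 2: build
        StreamConfig config = new StreamConfig.Builder("https://stream.wikimedia.org/v2/stream/recentchange")
                .header("User-Agent", "KafkaLearningProject/1.0")
                .build();
        System.out.println(config.url + " " + config.headers);
    }
}
```

Code that never calls header() keeps working unchanged, which is why adding the User-Agent requirement only needed one extra chained call.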
Conclusion
Building this Kafka Wikimedia producer taught me fundamental Java concepts and valuable troubleshooting skills:
- Dependency Injection through constructors - how to share objects between classes
- Multi-threading management - why we need to block the main thread
- API troubleshooting - dealing with 403 errors and outdated course materials
- Dependency management - understanding Maven coordinates vs Java packages
The most important lesson? When following online courses, the code might not work as-is. APIs change, libraries update, and policies evolve. Being able to troubleshoot these issues is just as valuable as learning the core concepts.
These challenges made me a better developer by forcing me to:
- Read error messages carefully
- Understand the tools I'm using (Gradle, Maven, HTTP)
- Check library documentation
- Learn the difference between dependency management and code organization
Troubleshooting Checklist
If you encounter issues while building this project:
403 Forbidden Error:
- ✅ Add User-Agent header to EventSource
- ✅ Ensure headers are properly built with OkHttp's Headers.Builder
- ✅ Check Wikimedia's API documentation for current requirements
Import Errors:
- ✅ Don't assume package name from Maven coordinates
- ✅ Use import okhttp3.Headers; not import com.squareup.okhttp3.Headers;
- ✅ Let IDE auto-complete suggest the correct import
- ✅ Run gradle --refresh-dependencies if needed
Connection Errors:
- ✅ Ensure Kafka broker is running on port 9092
- ✅ Create topic: wikimedia.recentchange
- ✅ Check network connectivity
This article is part of my learning journey through Apache Kafka. If you found it helpful, please give it a like and follow for more Kafka tutorials!
Course Reference: Apache Kafka Series - Learn Apache Kafka for Beginners v3