Introduction
For a few months now I've been diving into Apache Kafka. I've always been fascinated by queuing mechanisms, and nowadays Apache Kafka is the most modern alternative. Lately I gave an introductory presentation on Apache Kafka. But now I'm investigating what I can do with it. Since WebLogic is one of my focus areas, I wanted to explore how I can embed Kafka into WebLogic.
I reasoned that when I want to use Kafka with a current customer, the administrators have to install Kafka (e.g. unzip the Confluent distribution) on a separate virtual server.
By default the distribution comes with startup and shutdown scripts. The administrators should use those, or create their own, to start up the Kafka and ZooKeeper services. And of course keep those up and running.
I figured that if I were able to start the services as a thread under a WebLogic server, no additional infrastructure would be needed. Also, starting the WebLogic server would start the Kafka services as well.
Kafka needs a ZooKeeper service. You can see ZooKeeper as a directory service for a Kafka infrastructure, slightly comparable to an AdminServer in WebLogic. So it would make sense, as I see it, to start ZooKeeper with the AdminServer. The Kafka servers can be started as part of the WebLogic Managed Server(s).
WebLogic has a mechanism for initialization and finalization, using startup and shutdown classes; see the documentation. From there the ZooKeeper and Kafka servers can be started.
So I had to figure out how to start those from Java. Let's start with the ZooKeeper.
I put my sources on GitHub, so you can review them. But keep in mind that they're still under construction.
Starting a ZooKeeper
My starting point was a question on StackOverflow about starting a ZooKeeperServer in Java, based on the ZooKeeperServerMain.java class. It was quite promising and soon I had a first version of my startup class working. Quite simple really. But since I also want to be able to shut it down, I soon ran into some restrictions. Some methods and attributes I needed were protected and only reachable from the same package, for instance. I wasn't quite pleased with the implementation. Digging a bit further I ran into the source of that class. I decided to take that class, study it, and implement my own class based on that knowledge.
I created a ZooKeeperObserver class, and transformed the public void runFromConfig(ServerConfig config) method from the ZooKeeperServerMain.java class into a public void runFromProperties(ZooKeeperProperties zkProperties) method.
It takes in a properties object that is interpreted and used to start the ZooKeeper.
ZooKeeper Properties
To keep things transparent and simple, I created a PropertiesFactory class that provides a method to read the zookeeper.properties file from the class path (therefore the /etc/kafka folder should be added to the class path).
I also created my own Properties class, extending java.util.Properties, to add a few property getter methods, like getting an int value and defaulting a property based on another property.
Lastly, I created the ZooKeeperProperties bean, to interpret the relevant ZooKeeper properties from a Properties object that has been read.
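A minimal sketch of what such an extended Properties class could look like. The class and method names (ExtendedProperties, getIntProperty, getPropertyOrOther) are made up for this illustration and are not necessarily the ones used in the GitHub sources.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical sketch: java.util.Properties with a few convenience getters. */
class ExtendedProperties extends Properties {
    private static final long serialVersionUID = 1L;

    /** Returns the property as an int, or defaultValue when it is missing or blank. */
    public int getIntProperty(String key, int defaultValue) {
        String value = getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            return defaultValue;
        }
        return Integer.parseInt(value.trim());
    }

    /** Returns the property, falling back to the value of another property. */
    public String getPropertyOrOther(String key, String fallbackKey) {
        String value = getProperty(key);
        return value != null ? value : getProperty(fallbackKey);
    }

    public static void main(String[] args) throws IOException {
        ExtendedProperties props = new ExtendedProperties();
        // Stand-in for reading zookeeper.properties from the class path.
        props.load(new StringReader("dataDir=/tmp/zookeeper\nclientPort=2181\nmaxClientCnxns=0\n"));

        // dataLogDir is not set, so it falls back to dataDir.
        System.out.println(props.getPropertyOrOther("dataLogDir", "dataDir"));
        System.out.println(props.getIntProperty("clientPort", 2181));
        // tickTime is not set, so the supplied default is returned.
        System.out.println(props.getIntProperty("tickTime", 3000));
    }
}
```

Keeping the defaulting logic in the Properties subclass means a bean like ZooKeeperProperties only has to name the keys and their defaults.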
The relevant properties are:
| Property | Meaning | Default |
|---|---|---|
| dataDir | The location where ZooKeeper will store the in-memory database snapshots and, unless specified otherwise, the transaction log of updates to the database. | /tmp/zookeeper | 
| dataLogDir | This option will direct the machine to write the transaction log to the dataLogDir rather than the dataDir. | dataDir | 
| clientPort | The port to listen for client connections; that is, the port that clients attempt to connect to. | 2181 | 
| clientPortAddress | The address (ipv4, ipv6 or hostname) to listen for client connections; that is, the address that clients attempt to connect to. | Empty: every NIC in the server host. | 
| maxClientCnxns | Limits the number of concurrent connections (at the socket level) that a single client, identified by IP address, may make to a single member of the ZooKeeper ensemble. | 0: disabled, since this is a non-production config. | 
| tickTime | The length of a single tick, which is the basic time unit used by ZooKeeper, as measured in milliseconds. | ZooKeeperServer.DEFAULT_TICK_TIME | 
| minSessionTimeout | The minimum session timeout in milliseconds that the server will allow the client to negotiate. | -1: not set, in which case 2 times the tickTime is used | 
| maxSessionTimeout | The maximum session timeout in milliseconds that the server will allow the client to negotiate. | -1: not set, in which case 20 times the tickTime is used | 
Only the properties dataDir, clientPort and maxClientCnxns are set explicitly in the zookeeper.properties file. See the ZooKeeper Administration docs for more info (apparently ZooKeeper originated in the Hadoop project).
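To make the session timeout defaults concrete, here is a small worked sketch of the arithmetic. It does not call ZooKeeper itself; the helper names are hypothetical, and the value 3000 for ZooKeeperServer.DEFAULT_TICK_TIME is an assumption you should check against the ZooKeeper version you use.

```java
/** Hypothetical sketch: how a session timeout of -1 resolves against the tickTime. */
class SessionTimeoutDefaults {
    /** Assumed value of ZooKeeperServer.DEFAULT_TICK_TIME, in milliseconds. */
    static final int DEFAULT_TICK_TIME = 3000;

    /** -1 means "not set": fall back to 2 times the tickTime. */
    static int effectiveMinSessionTimeout(int tickTime, int configured) {
        return configured == -1 ? tickTime * 2 : configured;
    }

    /** -1 means "not set": fall back to 20 times the tickTime. */
    static int effectiveMaxSessionTimeout(int tickTime, int configured) {
        return configured == -1 ? tickTime * 20 : configured;
    }

    public static void main(String[] args) {
        // Nothing configured: 2 x 3000 and 20 x 3000 milliseconds.
        System.out.println(effectiveMinSessionTimeout(DEFAULT_TICK_TIME, -1)); // 6000
        System.out.println(effectiveMaxSessionTimeout(DEFAULT_TICK_TIME, -1)); // 60000
        // An explicit value is used as-is.
        System.out.println(effectiveMinSessionTimeout(2000, 5000)); // 5000
    }
}
```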
Run from Properties
The runFromProperties() method is the one that actually starts a ZooKeeperServer instance:
    /**
     * Run from ZooKeeperProperties .
     * @param zkProperties ZooKeeperProperties to use.
     * @throws IOException
     */
    public void runFromProperties(ZooKeeperProperties zkProperties) throws IOException {
        final String methodName = "runFromProperties";
        log.start(methodName);
        log.info(methodName, "Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            // Note that this thread isn't going to be doing anything else,
            // so rather than spawning another thread, the server runs in
            // (and blocks) this thread until it is shut down.
            // Create the server and its transaction log from the properties.
            ZooKeeperServer zkServer = new ZooKeeperServer();
            txnLog = new FileTxnSnapLog(new File(zkProperties.getDataLogDir()), new File(zkProperties.getDataDir()));
            zkServer.setTxnLogFactory(txnLog);
            zkServer.setTickTime(zkProperties.getTickTime());
            zkServer.setMinSessionTimeout(zkProperties.getMinSessionTimeout());
            zkServer.setMaxSessionTimeout(zkProperties.getMaxSessionTimeout());
            setZooKeeperServer(zkServer);
            cnxnFactory = ServerCnxnFactory.createFactory();
            log.debug(methodName, "Create Server Connection Factory");
            log.debug(methodName, "Server Tick Time: " + zkServer.getTickTime());
            log.debug(methodName, "ClientPortAddress: " + zkProperties.getClientPortAddress());
            log.debug(methodName, "Max Client Connections: " + zkProperties.getMaxClientCnxns());
            cnxnFactory.configure(zkProperties.getClientPortAddress(), zkProperties.getMaxClientCnxns());
            log.debug(methodName, "Startup Server Connection Factory");
            cnxnFactory.startup(zkServer);
            cnxnFactory.join();
            if (zkServer.isRunning()) {
                zkServer.shutdown();
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            log.warn(methodName, "Server interrupted", e);
        } finally {
            if (txnLog != null) {
                txnLog.close();
            }
        }
        log.end(methodName);
    }
Here you see that a ZooKeeperProperties object is passed. A FileTxnSnapLog is initialized for the dataLogDir and dataDir. A ZooKeeperServer is instantiated, and the particular properties are set. Then a ServerCnxnFactory is created (as a class attribute, for later use). The connection factory is used to start up the ZooKeeperServer. At that point control is handed over to the ZooKeeperServer. So you want to have this done in a separate thread.
Observing the Observable
Now, you might think: what is it with the name ZooKeeperObserver? Earlier, I named it EmbeddedZooKeeperServer. But I found that name long and not very nice. I found it funny that Observer has the word Server in it.
As mentioned in the previous section, when starting up the ServerCnxnFactory/ZooKeeperServer, control is handed over. The method does not return until the ZooKeeperServer stops running.
I therefore want (as in many implementations) the ZooKeeperServer to run in a separate thread that I can control. That is, I want to be able to send a shutdown signal to it. For that I found the Observer pattern suitable. In this pattern, the Observable or Subject maintains a list of Observers that can be notified about an update in the Observable. To do so, the Observable extends the java.util.Observable class, and the Observer implements the java.util.Observer and Runnable interfaces.
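Stripped of everything ZooKeeper-specific, the pattern can be sketched as below. This is an illustration only: StopSignal, BlockingWorker and ObserverDemo are hypothetical names, and a CountDownLatch stands in for the blocking call that keeps the server thread busy.

```java
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.CountDownLatch;

/** Hypothetical subject: tells its observers when it is time to stop. */
@SuppressWarnings("deprecation")
class StopSignal extends Observable {
    void requestStop() {
        setChanged();       // mark as changed, otherwise nothing is sent
        notifyObservers();  // calls update() on every registered observer
    }
}

/** Hypothetical worker: blocks in run() until it is told to stop. */
@SuppressWarnings("deprecation")
class BlockingWorker implements Observer, Runnable {
    private final CountDownLatch stopped = new CountDownLatch(1);

    BlockingWorker(Observable subject) {
        subject.addObserver(this);
    }

    @Override
    public void run() {
        try {
            stopped.await(); // stands in for the blocking server call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void update(Observable o, Object arg) {
        stopped.countDown(); // releases the blocked run() method
    }
}

class ObserverDemo {
    public static void main(String[] args) throws InterruptedException {
        StopSignal signal = new StopSignal();
        Thread thread = new Thread(new BlockingWorker(signal), "worker-1");
        thread.start();

        signal.requestStop(); // sent from the controlling thread
        thread.join(5000);    // wait for the worker to finish
        System.out.println("Worker alive: " + thread.isAlive());
    }
}
```

The controlling thread never touches the worker directly; it only talks to the subject, which is what makes it easy to add more workers later.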
How does it work? Let's go through the applicable methods.
Start and Add a ZooKeeper
The Observable is implemented by ZooKeeperDriver. In it we'll find a method start():
    public void start() {
        final String methodName = "start";
        log.start(methodName);
        addZooKeeper();
        log.end(methodName);
    }
That's not too exciting, but it calls the method addZooKeeper():
    public void addZooKeeper() {
        final String methodName = "addZooKeeper";
        log.start(methodName);
        try {
            ZooKeeperProperties zkProperties = PropertiesFactory.getZKProperties();
            ZooKeeperObserver zooKeeperServer = new ZooKeeperObserver(this, zkProperties);
            Thread newZooKeeperThread = new Thread(zooKeeperServer);
            zooKeeperServer.setMyThread(newZooKeeperThread);
            newZooKeeperThread.start();
        } catch (IOException e) {
            log.error(methodName, "ZooKeeper Failed", e);
        }
        log.end(methodName);
    }
Here you see that the ZooKeeperProperties are fetched and a new ZooKeeperObserver is instantiated, using a reference to the ZooKeeperDriver object and the ZooKeeperProperties. Since the ZooKeeperObserver is a Runnable, we can pass it to a new Thread. That thread is also set on the ZooKeeperObserver, so that it has a handle on its own thread when that comes in handy.
And then the new thread is started.
Instantiate the ZooKeeperObserver
In the previous section, we saw that the ZooKeeperObserver is instantiated using a reference to the ZooKeeperDriver object. Let's see what that looks like:
    public ZooKeeperObserver(Observable zooKeeperDriver, ZooKeeperProperties zkProperties) {
        super();
        final String methodName="ZooKeeperObserver(Observable, ZooKeeperProperties)";
        log.start(methodName);
        this.setZkProperties(zkProperties);
        if (zooKeeperDriver instanceof ZooKeeperDriver) {
            log.info(methodName, "Add observer "+this.getClass().getName()+" to observable "+zooKeeperDriver.getClass().getName());
            setZooKeeperDriver((ZooKeeperDriver) zooKeeperDriver);
            zooKeeperDriver.addObserver(this);
        }
        log.end(methodName);
    }
The ZooKeeperProperties are set. Then it checks whether the Observable that is passed is indeed a ZooKeeperDriver. If so, the ZooKeeperDriver is also set, and the ZooKeeperObserver object is added as an Observer to the ZooKeeperDriver using the addObserver(this) method. This method is part of the java.util.Observable class that the driver extends. It adds the ZooKeeperObserver to a list that is used to send the update signal to every instance on the list.
Run the ZooKeeperObserver
The ZooKeeperObserver is a Runnable, so the run() method is implemented:
    public void run() {
        final String methodName = "run";
        log.start(methodName);
        try {
            runFromProperties(getZkProperties());
        } catch (IOException ioe) {
            log.error(methodName, "Run failed!", ioe);
        }
        log.end(methodName);
    }
It calls the runFromProperties() method that was explained earlier.
Shutdown
The ZooKeeperDriver has a shutdown() method:
    public void shutdown() {
        final String methodName = "shutdown";
        log.start(methodName);
        setShutdownZooKeepers(true);
        log.info(methodName, "Notify Observers to shutdown!");
        this.setChanged();
        this.notifyObservers();
        log.end(methodName);
    }
It sets the shutdownZooKeepers indicator to true. This is an attribute that indicates what has been updated. In a more complex Observer pattern more kinds of updates can occur, so you need to indicate what drove the update.
The most interesting statement is the call to the notifyObservers() method. It will call the implemented update() method on every Observer in the list.
I implemented this earlier in another situation, a few years ago, and reused it here. But at first it did not work. I found that I had to add a call to the setChanged() method: notifyObservers() only notifies the Observers when the Observable has been marked as changed.
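The effect of setChanged() is easy to reproduce in isolation. The following sketch uses only java.util.Observable; the SetChangedDemo and Subject names are hypothetical and exist just for this demonstration.

```java
import java.util.Observable;
import java.util.concurrent.atomic.AtomicInteger;

@SuppressWarnings("deprecation")
class SetChangedDemo {
    /** Observable whose protected setChanged() is exposed for the demo. */
    static class Subject extends Observable {
        void markChanged() {
            setChanged();
        }
    }

    /** Returns the number of update() calls seen without and with setChanged(). */
    static int[] countNotifications() {
        AtomicInteger updates = new AtomicInteger();
        Subject subject = new Subject();
        subject.addObserver((o, arg) -> updates.incrementAndGet());

        subject.notifyObservers();   // not marked as changed: nothing is sent
        int withoutSetChanged = updates.get();

        subject.markChanged();
        subject.notifyObservers();   // marked as changed: update() is called
        int withSetChanged = updates.get();

        return new int[] { withoutSetChanged, withSetChanged };
    }

    public static void main(String[] args) {
        int[] counts = countNotifications();
        System.out.println("without setChanged(): " + counts[0]); // 0
        System.out.println("with setChanged(): " + counts[1]);    // 1
    }
}
```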
As said, notifyObservers() calls the update() method in the Observer:
    public void update(Observable o, Object arg) {
        final String methodName = "update(Observable,Object)";
        log.start(methodName);
        log.info(methodName, getMyThread().getName() + " - Got status update from Observable!");
        ZooKeeperDriver zkDriver = getZooKeeperDriver();
        if (zkDriver.isShutdownZooKeepers()) {
            log.info(methodName, getMyThread().getName() + " - Apparently I've got to shutdown myself!");
            shutdown();
        } else {
            log.info(methodName, getMyThread().getName() + " - Don't know what to do with this status update!");
        }
        log.end(methodName);
    }
This one checks in the ZooKeeperDriver whether the update was caused by the shutdownZooKeepers indicator.
If so, it calls its own shutdown() method. If not, the update is ignored. The shutdown() method does the following:
    public void shutdown() {
        final String methodName = "shutdown";
        log.start(methodName);
        log.info(methodName,"Let me shutdown "+myThread.getName());
        ZooKeeperServer zkServer = getZooKeeperServer();
        ServerCnxnFactory cnxnFactory = getCnxnFactory();
        cnxnFactory.shutdown();
        if (zkServer.isRunning()) {
            zkServer.shutdown();
        }
        log.end(methodName);
    }
It gets the connection factory and sends a shutdown() signal to it. If the ZooKeeper server is still running (it shouldn't be), it gets a shutdown() signal as well.
Start and Shutdown
In the end you need to create an instance of the ZooKeeperDriver and save it in a static variable. Then you can call the start() method, and later get the object from the static variable again to call the shutdown() method.
Conclusion
This may look quite complex to you, just to start a server. But, again, I want to be able to embed the Kafka infrastructure in another system, in my situation WebLogic. I'll use this method to do the same for the Kafka servers. I'll write about that in a follow-up article. And then I'll create a set of startup and shutdown classes for WebLogic.
It was fun to implement the Observer pattern again. But when the notifyObservers() method did not work as expected at first and I searched for a solution, I found that java.util.Observable and java.util.Observer are deprecated as of Java 9. They still work, but apparently people found that the pattern has its limitations, and better ways of implementing it have been developed.
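For what it's worth, one of the alternatives the JDK documentation points to is the java.beans package. The sketch below shows how the shutdown signal might look with PropertyChangeSupport. It is not what the code in this article does, and the ShutdownNotifier class and the "shutdownRequested" property name are hypothetical.

```java
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;

/** Hypothetical replacement for the Observable driver, based on java.beans. */
class ShutdownNotifier {
    private final PropertyChangeSupport support = new PropertyChangeSupport(this);
    private boolean shutdownRequested;

    void addListener(PropertyChangeListener listener) {
        support.addPropertyChangeListener(listener);
    }

    void requestShutdown() {
        boolean oldValue = shutdownRequested;
        shutdownRequested = true;
        // The event names what changed; nothing is fired if the value is unchanged.
        support.firePropertyChange("shutdownRequested", oldValue, true);
    }

    public static void main(String[] args) {
        ShutdownNotifier notifier = new ShutdownNotifier();
        notifier.addListener(evt ->
                System.out.println(evt.getPropertyName() + " -> " + evt.getNewValue()));
        notifier.requestShutdown();
    }
}
```

Here the event itself carries the name of the property that changed, so listeners do not have to ask the driver what drove the update.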