Wednesday, 20 February 2019

Generate a formatted guid from database - Use Snippets

A very simple quick post today. I'm re-engineering a few Java based Mock Webservices into a SOA Suite/BPEL service.

Some of those generate a soap fault when a message id contains "8888" for instance.
I'd like to generate a GUID based message id, that is formatted with groups of four digits.

Of course there are loads of methods to do that. For instance, the Oracle database has a sys_guid() function for years. It generates a guid like: '824F95ECCB1C0EB7E053120B260A2D0F'.

But, I'd like it in a form '824F-95EC-CB1F-0EB7-E053-120B-260A-2D0F'. It can easily done by concatenating substr() calls. But you do not want to re-generate the guid with every 4 digit substr().

So, I put it into the following select:
with get_guid as (select sys_guid() guid
from dual)
select guid
, substr(guid, 1, 4)||'-'||substr(guid, 5, 4)||'-'||substr(guid, 9, 4)||'-'||substr(guid, 13, 4)||'-'||substr(guid, 17, 4)||'-'||substr(guid, 21, 4)||'-'||substr(guid, 25, 4) ||'-'||substr(guid, 29, 4)guid_formatted
, length(guid) guid_length
from get_guid;

What might be less obvious for the regular SQL developer is the with class. It is explained excelently by Tim Hall (and although around since Oracle 9.2 already, only recently put in my personal skill-box). This allows me in this query to call the sysguid() and reuse the value in the three columns.

Although this is a very simple query, it might come in handy more often. And since I'll be around this customer for a longer period, I expect, I want to save it as a snippet.

A feature around in SQLDeveloper for years are the snippets. You can make them visible through the View menu:
I tabbed-it away to the left gutter, to have it out of my way, but still in reach:
Create and edit snippets through the indicated icons. You can create your own categories, by just enter a new category name. Name it, provide a tool tip and paste the snippet. Easy-piecy.

You'll find quite a number of predefined snippets categorized neatly.

If you have gathered several of those snippets like me, and maybe want to take them to other assignments, you might feel the need to backup them.

To reuse a snippet just drag and drop them from the list into your worksheet.

The Snippets are stored in the UserSnippets.xml in the roaming user profile of SQL Developer:
In Windows like 'c:\Users\makker\AppData\Roaming\SQL Developer\'. Just backup/copy the file. Here you see the CodeTemplate.xml file as well, that contains the shorthand acronyms/aliases to much typed pieces of code that you can create too.

By the way, googling "That Jeff Smith Snippets" brought me this archived article (yes, snippets are that old) and with a link to this nice still active library of snippets.

Friday, 15 February 2019

Upgraded my Virtualization environment

A few weeks ago VirtualBox 6.0.4 is released. A minor release of the recent major release of 6.0. Although already anounced by Tim ~Oracle Base ~ Hall, I had not upgraded yet. I was still on 5.2.x. Change log of VirtualBox can be here. There are some interesting improvements. For instance, I'm curious to see what we can expect from the Oracle Cloud integration. And on several points, like shared folders, the performance is improved.

The UI is refreshed. I like the separate Tools bar with quick buttons to Import, Export and create new VM's. But, since I work with Vagrant more and more, I will see this screen less and less.


Also Vagrant has a new version since beginning of january. And I upgraded to 2.2.3. Change log can be found here.

All seem to function fine together. With my recent uprgaded MobaXterm 11.1 I can start my JDeveloper 12c from the started VM perfectly.
 The VM was suspended with VBox 5.2.x and Vagrant 2.2.2. And started with the latest greated. With nothing on the hand.

By the way, VMWare Player had VMWare Unity and VirtualBox the Seamless mode for years. It allows you to start your apps in the VM and run them as if they run as separate windows on your host. Years ago when I used VMWare Player, I was quite impressed by it. But I never got used to the VBox Seamless mode. Nowadays my favorite way of working is to connect to start the VM without UI (set the  vb.gui property to false in your Vagrantfile), connect to it using MobaXterm and start the app (JDeveloper for instance). The X Server implementation of MobaXterm will take care of the rest. Works like a charm!

Wednesday, 13 February 2019

KafkaSeries: Starting KafkaServers in Java - Implementing the Observer pattern ... again

In my previous article I explained how I start a ZooKeeper Server (potentially more of them) in Java using the Observer pattern. As promised, in this article I will explain how I implement the starting of KafkaServers in about the same way. Again, using the Observer pattern.

In principle we need one ZooKeeper, although you can have run multiple instances in a HighAvailable version. I have to figure that out, by the way.

But we can have multiple KafkaServers. And that makes sense. You might remember that I'm planning to use Kafka in a Weblogic environment, where you can have multiple Managed Servers (for instance OSB or SOA) that run side-by-side in a cluster possibly on mulitple machines. You probably want to have the Kafka Clients (consumers & producers) connect to the local instance. I would. But, they should work together, exchanging messages, so you can track events that originated on the other instance.

So I implemented a KafkaServerDriver extending the Observable class the same way as the ZooKeeperDriver  in my previous article (I in fact copied it). I changed it in a way that it can start multiple instances of KafkaObserver.

So, let me go over the particular metods again.


 /**
     * Run from a ServerConfig.
     * @param config ServerConfig to use.
     * @throws IOException
     */
    public void runFromProperties(Properties ksProperties) throws IOException {
        final String methodName = "runFromProperties";
        log.start(methodName);
        log.info(methodName, "Starting server");
        KafkaConfig config = KafkaConfig.fromProps(ksProperties);
        //VerifiableProperties verifiableProps = new VerifiableProperties(ksProperties);
        Seq reporters = new ArraySeq(0);
        // Seq reporters = (Seq) KafkaMetricsReporter$.MODULE$.startReporters(verifiableProps);
        KafkaServer kafkaServer = new KafkaServer(config, new SystemTime(), Option.apply("prefix"), reporters);
        setKafkaServer(kafkaServer);
        kafkaServer.startup();
        log.end(methodName);
    }
This is essentially the method to start a Kafka Server. It begins with creating a KafkaConfig  object, from a plain java.util.Properties object. Again I created an own KafkaServer Properties class that extends the java.util.Properties object. In the ZooKeeper article I explained that I needed a few extra methods to get Int based properties or to default a property based on the value of another propertie. In this case another reason is that I want to be able to differentiate over KafkaServers, each having their own property files. We'll get into that later on.
The KafkaServer(s) allow for injecting MetricReporters that can do reporting of ruintme behavior of the particular KafkaServer in a desired way. I did not get that to work in my JDeveloper project, since these are Scala object that JDeveloper got confused by, so to speak. So, in this version I provide an empty Reporters Array.

Then we create a new KafkaServer object. The constructor expects the following parameters.
  • config: the KafkaConfig object, created from the properties.
  • new SystemTime(): a new org.apache.kafka.common.utils.SystemTime object.
  • Option.apply("prefix"): Option is a Scala way of defining a Map (Kafka is build in Scala). The value "prefix" is used to give a name to the Thread the KafkaServer will run in.
  • reporters: a list of reporters that can be provided to the KafkaServer, to monitor it.
Note by the way that the KafkaServer apparently will spawn a thread it self, that it will give a name. In our Observer pattern we'll put the KafkaServer in our own Thread.

To get a hold of the instantiated KafkaServer, we set it in our private attribute, and then startup the server.

KafkaServerDriver Properties 

We can have multiple KafaServers running in our environment. We could have multiple on the same host, or distributed over multiple hosts. Each of them will have their own property-files, since, especially when running on the same host, they need at least their own broker.id and also their own port and data/log folder.

To be able to differentiate over the different Kafka Servers and define which one of them should be started up on the particular host, I introduced my own KafkaServerDriverProperties file.
It looks like:
kafkaservers=server0,server1
server0.id=0
server0.propertyfile=server0.properties
server0.startupEnabled=true
server1.id=1
server1.propertyfile=server1.properties
server1.startupEnabled=true

This defines a list of kafkaservers (server0 and server1 in this example) and then for each of those a list of attributes. Of importance are the properties:
  • <server-name>.propertyfile: naming a copy of the server.properties file that is used for this server. It it's loaded from the classpath, so only the name should be provided.
  • <server-name>.startupEnabled: should the server be started on this host (true or false)?
To work with this conveniently I added another properties class: KafkaServerDriverProperties. An object from this class fetched from PropertiesFactory.getKSDProperties();, where it is instantiated based on the kafkaserverdriver.properties loaded from the classpath.
It transforms the comma-separated list into a List object, that enables you to iterate over it. And for each servername on the list it will get the propertyfile and startupEnabled properties and put that, wrapped in a properties object, in a HashMap, identified by servername. The getServerProperties(String serverName) method enables you to fetch those properties for a certain serverName.

Observing the KafkaServer Observable

 Having the above in place, the KafkaServerDriver Observable can be implemented with the ZooKeeperDriver as an example. But, since we want to be able to fire up multiple KafkaServers, this is slightly more complicated.

Start

The start method within the KafkaServerDriver looks like:
/**
     * Start KafkaServers
     */
    public void start() {
        final String methodName = "start";
        log.start(methodName);
        for (String kafkaServerName : ksdProperties.getKafkaServerList()) {
            log.debug(methodName, "Start KafkaServer: " + kafkaServerName);
            addKafkaServer(kafkaServerName);
        }
        //addKafkaServer();
        log.end(methodName);
}

It loops over the server names from the KafkaServerList from the KafkaServerDriverProperties. For each listed servername it will add a KafkaServer.

addKafkaServer

This method has some overloaded variants. One parameterless, that loads the default server.properties file from the class path and calls the variant that takes in a properties parameter.

But let's start with the addKafkaServer(String) variant:
     /**
     * Add a KafkaServer
     * @param kafkaServerName
     */
    public void addKafkaServer(String kafkaServerName) {
        final String methodName = "addKafkaServer(String)";
        log.start(methodName);
        try {
            Properties serverProperties = ksdProperties.getServerProperties(kafkaServerName);

            if (serverProperties.getBoolValue("startupEnabled")) {
                log.info(methodName, "Start KafkaServer " + kafkaServerName);
                String serverPropertiesFileName = serverProperties.getStringValue("propertyfile");
                log.debug(methodName, "KafkaServer propertyfile: " + serverPropertiesFileName);
                Properties ksProperties = null;
                if (serverPropertiesFileName != null) {
                    ksProperties = PropertiesFactory.getKSProperties(serverPropertiesFileName);
                } else {
                    ksProperties = PropertiesFactory.getKSProperties();
                }
                addKafkaServer(ksProperties);
            } else {
                log.info(methodName, "KafkaServer " + kafkaServerName + " has startupEnabled == false!");

            }
        } catch (IOException e) {
            log.error(methodName, "Failed to load properties!", e);
            throw new RuntimeException(e);
        }
        log.end(methodName);
}

This one takes in the kafkaServerName and gets the approppriate server Properties from the  KafkaServerDriverProperties  object. It it has the startupEnabled property set to true, then it will fetch the serverProperties file, and load that one. Using that Properties object it will call the addKafkaServer(Properties) variant:

    /**
     * Add a KafkaServer from properties
     * @param ksProperties
     */
    public void addKafkaServer(Properties ksProperties) {
        final String methodName = "addKafkaServer";
        log.start(methodName);
        KafkaObserver kafkaServer = new KafkaObserver(this, ksProperties);
        Thread newKSThread = new Thread(kafkaServer);
        newKSThread.setName("KafkaServer" + ksProperties.getProperty(PRP_BRKR_ID));
        kafkaServer.setKsThread(newKSThread);
        newKSThread.start();

        log.end(methodName);
}

What this does is pretty much equal to the addZookeeper() method in the ZooKeeperDriver class. Create a new KafkaObserver providing the KafkaServerDriver object (this) as a reference and the Kafka Server Properties object. And create a new Thread for it. New is (I didn't had that when I wrote the previous article about starting the ZooKeeper) is that I set the name of the Thread. Then I add the new thread tho the KafkaServer.

Construct a KafkaObserver


We saw that in the addKafkaServer a KafkaObserver is instantiated using a reference to the KafkaServerDriver object as an Observable and the KafkaServer Properties object.

The constructor to do so is as follows:

    public KafkaObserver(Observable kafkaServerDriver, Properties ksProperties) {
        super();
        final String methodName = "KafkaObserver(Observable, Properties)";
        log.start(methodName);
        this.setKsProperties(ksProperties);
        if (kafkaServerDriver instanceof KafkaServerDriver) {
            log.info(methodName,
                     "Add observer " + this.getClass().getName() + " to observable " +
                     kafkaServerDriver.getClass().getName());
            setKafkaServerDriver((KafkaServerDriver) kafkaServerDriver);
            kafkaServerDriver.addObserver(this);
        }
        log.end(methodName);
}

In it we set the properties, and register the KafkaserverDriver and add this new object as an observer to the referenced KafkaserverDriver.

Run the KafkaObserver

Since the KafkaObserver is a Runnable we need to implement the run() method:
    public void run() {
        final String methodName = "run";
        log.start(methodName);
        try {
            runFromProperties(getKsProperties());
        } catch (IOException ioe) {
            log.error(methodName, "Run failed!", ioe);
        }
        log.end(methodName);

}

Shutdown


Shutdown within KafkaObserver the is as easy as:
    /**
     * Shutdown the serving instance
     */
    public void shutdown() {
        final String methodName = "shutdown";
        log.start(methodName);
        log.info(methodName, "Let me shutdown " + getKsThread().getName());
        KafkaServer kafkaServer = getKafkaServer();
        kafkaServer.shutdown();
        log.end(methodName);
}

The KafkaServerDriver also has a shutdown() method:
 /**
     * Shutdown all KafkaServers
     */
    public void shutdown() {
        final String methodName = "shutdown";
        log.start(methodName);
        setShutdownKafkaServers(true);
        log.info(methodName, "Notify Observers to shutdown!");
        this.setChanged();
        this.notifyObservers();
        log.end(methodName);
}

It sets the shutdownKafkaServers indicator, as well as the changed indicator. Then it notifies the Observers. This will result in a signal to the update() method of all registered KafkaObservers:
    public void update(Observable o, Object arg) {
        final String methodName = "update(Observable,Object)";
        log.start(methodName);
        Thread ksThread = getKsThread();
        log.info(methodName, ksThread.getName() + " - Got status update from Observable!");
        KafkaServerDriver ksDriver = getKafkaServerDriver();
        if (ksDriver.isShutdownKafkaServers()) {
            log.info(methodName, ksThread.getName() + " - Apparently I´ve got to shutdown myself!");
            shutdown();
        } else {
            log.info(methodName, ksThread.getName() + " - Don't know what to do with this status update!");
        }
        log.end(methodName);
}

It checks if the registered KafkaServerDriver has the shutdownKafkaServers indicator set. If so (and it obvious will), it will call the shutdown() method, mentioned earlier.

Start & Shutdown

As with the ZooKeeperDriver you need to store the KafkaServerDriver object in a static variable, and call the respective start and shutdown methods. Using the mentioned KafkaServerDriverProperties file in the class path, the particular instance will know which KafkaServers need to be started. Make sure that for each kafkaserver you have a copy of the server.properties file as found in the Kafka distribution (for instance Confluent). Each copy need to have a unique broker.id and references to the data/log folders. And possibly a unique listen-port.

Libraries and Classpath

One of the things I often miss in articles like this (my excuses that I did not add it to the previous article, is a list of libraries to add to get the lot compiled.
If you take a look at the scripts, you'll find that it would just add all the libraries in the particular folder. I like to know what particular jar's I really need to get things compiled. The following jar files in the Confluent distribution are found to be needed for both having the project compiled as well as being able to run:
  • confluent/share/java/kafka/kafka.jar
  • confluent/share/java/kafka/kafka-clients-2.0.0-cp1.jar
  • confluent/share/java/kafka/log4j-1.2.17.jar
  • confluent/share/java/kafka/slf4j-log4j12-1.7.25.jar
  • confluent/share/java/kafka/slf4j-api-1.7.25.jar
  • confluent/share/java/kafka/kafka-log4j-appender-2.0.0-cp1.jar
  • confluent/share/java/kafka/zookeeper-3.4.13.jar
  • confluent/share/java/kafka/scala-library-2.11.12.jar
  • confluent/share/java/confluent-common/common-metrics-5.0.0.jar
  • confluent/share/java/kafka/scala-logging_2.11-3.9.0.jar
  • confluent/share/java/kafka/metrics-core-2.2.0.jar
  • confluent/share/java/kafka/jackson-core-2.9.6.jar
  • confluent/share/java/kafka/jackson-databind-2.9.6.jar
  • confluent/share/java/kafka/jackson-annotations-2.9.6.jar

Added to that I have the following folders added in my project's library listing:
  • confluent/etc/kafka/
  • KafkaClient/config
These contain the Kafka and Zookeeper property files, and also my own extra property files. They're loaded using a class loader, so they need to be on the class path.`

Conclusion

Well, that's about it for now. Next stop: create a Weblogic domain and try to add the startup and shutdown classes to it and see if I can have ZooKeeper and KafaServers booted with Weblogic.
And of course the proof of the pudding:  produce and consume messages.