A custom load harness for MQ Server

This is another post in a similar vein to “No LoadRunner, No Problems”, written when I needed a custom test harness for MQ Server. IBM kindly provides a complete Java API for getting into the nitty-gritty of MQ messages. Much can be learnt from the WebSphere MQ sample code in Java and, of course, if you ever get stuck analyzing problems in your MQ setup you should always visit the MQSeries.net :: Index. Kevin Braithewaite wrote a really good harness, MQJavaRoundTrip, that simplifies performance and stress testing. I have since extended that harness, taking elements from many sources (including my own code), into the comprehensive MQ test harness detailed in this post.



/**
* MQ Load Generator (MQLoader)
*
* Compile : javac -classpath com.ibm.mq.jar:connector.jar:jta.jar MQLoader.java
* Package : jar cfm MQLoader.jar MQLoader.MF *.class
* Execute : java -cp com.ibm.mq.jar:connector.jar:jta.jar:MQLoader.jar MQLoader [options]
* Usage : MQLoader [options]
* Options :
-logfile path to activity logfile (errors, thread status, msg counters)
-file path to template file or folder containing template files
-qmngr name of MQ queue manager
-putQ name of MQ request 'put' queue
-getQ name of MQ reply 'get' queue
-channel name of MQ channel
-host IP address or DNS of MQ server
-port port number of MQ server
-conns number of concurrent connections to simulate
-num number of messages to create per connection
-pacing pacing in milliseconds between each message
-persist optional flag for persistent messages
-verbose optional flag for verbose output
-quiet optional flag to suppress stdout results, recommended when using discovery mode
-putonly optional flag for put only mode
-getonly optional flag for get only mode
-discover optional flag to force discovery mode
-sla optional sla time in millisecs to target during discovery mode
-stats optional flag to show descriptive statistics at test completion
-nomod optional flag to stop load harness from modifying input messages IAW Strata specifications
when used the test harness will not be able to accurately calculate response time
-nolog optional flag to stop activity logging, not recommended.
*
* Last edited : 15 Feb 07 14.00
* Author : Tim Koopmans
**/

import java.io.*;
import com.ibm.mq.*;
import java.util.*;
import java.util.regex.*;
import java.text.*;
import java.text.SimpleDateFormat;
import java.text.DecimalFormat;

public class MQLoader {
// Command-line options keyed by option name (without the leading '-');
// seeded with defaults and then overwritten by parseArguments().
final static Properties arguments = new Properties();

// ---- Run configuration -------------------------------------------------
// All of these are static, so every worker thread sees the same values.
// They are assigned in parseArguments() (and re-assigned by the constructor).
public static String logfile;        // activity log appended to by write()
public static String file;           // template file, or directory of template files
public static String qmngr;          // queue manager name passed to MQQueueManager
public static String putQ;           // queue that put() writes requests to
public static String getQ;           // queue that get() reads replies from
public static String channel;        // copied into MQEnvironment.channel
public static String host;           // copied into MQEnvironment.hostname
public static String load;           // free-text label echoed in each result line
public static String testID;         // free-text label prefixed to log/console lines
public static double thruput;        // computed as 1.00/pacing*1000*conns
public static int port;              // copied into MQEnvironment.port
public static int conns;             // number of put threads and of get threads started by main()
public static int num;               // messages each thread puts/gets
public static int pacing;            // milliseconds between messages; rewritten by discover mode
public static int adjustedPacing;    // sleep actually used by the workers; updated in parseContent()
public static int sla;               // target response time (ms) used by the discover-mode formula
public static int paceMod;           // parsed from the "paceMod" option; not read elsewhere in this class
public static int parseCounter = 0;  // number of replies handed to parseContent() so far
public static int retryMax = 1000;   // retry ceiling enforced by put()/get() when not in discover mode

// ---- Option flags (true when the matching -flag is present) ------------
public static boolean persistence = false;
public static boolean verbose = false;
public static boolean getonly = false;
public static boolean putonly = false;
public static boolean discover = false;
public static boolean quiet = false;
public static boolean stats = false;
public static boolean nomod = false;
public static boolean nolog = false;

// Number format used for the statistics table and discover-mode throughput.
public static DecimalFormat fmt = new DecimalFormat("###,###,##0.00");
// One slot per parsed reply; allocated in main() and grown in parseContent().
public static int[] responseTimeArray;
public static double[] throughputArray;

// ---- Per-instance (per worker) state ------------------------------------
int CCSID;          // set to 437 by the constructor, copied into MQEnvironment.CCSID
String content;     // last file read by the legacy readFile()
String[] contents;  // template message bodies loaded by readFiles()
String mqMessage;   // body of the message about to be put
String DTG;         // not assigned anywhere in this class (formatDTG() uses a local of the same name)
String HEX;         // result of the most recent dumpHexId() call

/**
 * Program entry point.
 *
 * Parses the command line, starts {@code conns} put threads (unless
 * -getonly) and {@code conns} get threads (unless -putonly), waits for the
 * workers to finish and finally prints the optional statistics table.
 *
 * @param args command-line options, see the usage text
 * @throws IOException declared by the logging helper used while starting threads
 */
public static void main(String[] args) throws IOException {

    // CALL TO PARSE ARGS METHOD
    parseArguments(args);
    thruput = 1.00/pacing*1000*conns;

    // START TEST
    String startDTG = formatDTG();
    if (verbose) {
        System.out.println("Start DTG\t\t"+startDTG);
        System.out.println("Connections\t\t"+conns);
        System.out.println("Transactions\t\t"+num*conns);
        System.out.println("Pacing\t\t\t"+pacing);
        if (persistence) {
            System.out.println("Persistence\t\tON");
        } else {
            System.out.println("Persistence\t\tOFF");
        }
        System.out.print("Threads Started:\t ");
    }

    // BUILD ARRAYS FOR RESPONSE TIMES & THROUGHPUT
    responseTimeArray = new int[num*conns];
    throughputArray = new double[num*conns];

    // Every worker that was started, so that we can join() them later
    // instead of polling the thread group.
    Thread[] workers = new Thread[2 * Math.max(conns, 0)];
    int started = 0;
    Random startJitter = new Random();

    try {
        // MULTI THREAD PUT MODE
        if (putonly || !getonly) {
            for (int t = 1; t <= conns; t++) {
                // Randomize start time. Random.nextInt(bound) rejects bound <= 0,
                // so a pacing of 0 simply means "no start jitter".
                if (pacing > 0) {
                    try {
                        Thread.sleep(startJitter.nextInt(pacing));
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }

                try {
                    Thread thr = new Thread(new Runnable() { public void run() {
                        MQLoader simulation = new MQLoader(file, qmngr, putQ, host, channel, port, pacing);
                        try {
                            simulation.put();
                        } catch (IOException ioe) {
                            // put() already logs its own failures; nothing more to do here.
                        }
                    }});
                    thr.setName("putThread:"+t);
                    thr.start();
                    workers[started++] = thr;
                } catch (Exception ex) {
                    // PUT THREAD ERRORS
                    if (!nolog) { write(logfile, testID+"\t\tputThread:"+t+"\t\tError! Starting thread ... \n"+ex+"\n"); }
                }
            }
            if (verbose) { System.out.print("\n"); }
        }

        // MULTI THREAD GET MODE
        if (getonly || !putonly) {
            if (!verbose && !quiet) {
                System.out.println("\n\tDate \tTime \tConns\tPace\tLoad\tPersist\trTime\tTestID");
            }

            for (int t = 1; t <= conns; t++) {
                // Randomize start time ...
                try {
                    Thread.sleep(startJitter.nextInt(333));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }

                try {
                    Thread thr = new Thread(new Runnable() { public void run() {
                        MQLoader simulation = new MQLoader(file, qmngr, putQ, host, channel, port, pacing);
                        simulation.get();
                    }});
                    thr.setName("getThread:"+t);
                    thr.start();
                    workers[started++] = thr;
                } catch (Exception ex) {
                    // GET THREAD ERRORS
                    if (!nolog) { write(logfile, testID+"\t\tgetThread:"+t+"\t\tError! Starting thread ... \n"+ex+"\n"); }
                }
                if (verbose) { System.out.print("\n"); }
            }
        }

    } catch (Exception ex) {
        // GENERIC ERRORS
        if (!nolog) { write(logfile, "Error! Generic error in MAIN execution ... "+ex); }
    } finally {
        // WAIT FOR ACTIVE THREADS
        // (not in put-only mode, matching the previous behaviour). join() blocks
        // without consuming CPU, unlike spinning on Thread.activeCount().
        if (!putonly) {
            for (int w = 0; w < started; w++) {
                try {
                    workers[w].join();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        // FINISH
        String finishDTG = formatDTG();
        if (verbose) { System.out.println("Finish DTG\t\t"+finishDTG+"\n"); }
        if (putonly) { System.out.println("No messages to get as the -putonly flag was specified"); }
        if (getonly) { System.out.println("No messages to put as the -getonly flag was specified"); }

        // OPTIONAL CALL TO STATISTICS METHOD
        if (stats) { collectStats(); }
    }
}

/**
 * Creates one worker object.
 *
 * Note that the connection settings live in static fields, so the values
 * passed here overwrite the class-wide configuration; only the CCSID
 * (fixed at 437) is stored per instance.
 */
public MQLoader(String file, String qmngr, String putQ, String host, String channel, int port, int pacing) {
    // Per-instance state.
    this.CCSID = 437;

    // Class-wide (static) settings, written through the class name to make
    // the sharing explicit.
    MQLoader.file = file;
    MQLoader.qmngr = qmngr;
    MQLoader.putQ = putQ;
    MQLoader.host = host;
    MQLoader.channel = channel;
    MQLoader.port = port;
    MQLoader.pacing = pacing;
}

/**
 * Loads the default option values, overlays whatever was given on the
 * command line and copies the result into the static configuration fields.
 *
 * An argument starting with '-' names an option; the first following
 * argument that does not start with '-' becomes its value, otherwise the
 * value is the string "true". Prints the usage text when no arguments
 * were supplied (and then carries on with the defaults).
 *
 * @param args raw command-line arguments
 */
private static void parseArguments(String[] args) {

    // Built-in defaults, in option-name / value pairs.
    String[][] defaults = {
        {"logfile", "MQLoader.log"},
        {"file",    "InputMsg.txt"},
        {"qmngr",   "QMANAGER"},
        {"putQ",    "QUEUE.REQUEST"},
        {"getQ",    "QUEUE.REPLY"},
        {"channel", "CHAN_NAME"},
        {"host",    "10.0.1.2"},
        {"port",    "1415"},
        {"conns",   "1"},
        {"num",     "1"},
        {"pacing",  "5"},
        {"load",    "--"},
        {"testID",  "--"},
        {"paceMod", "10"},
        {"sla",     "1000"}
    };
    for (int d = 0; d < defaults.length; d++) {
        arguments.setProperty(defaults[d][0], defaults[d][1]);
    }

    if (args.length == 0) {
        printUsage();
    }

    // Walk the arguments, remembering the option currently being filled in.
    String pendingKey = null;
    String pendingValue = "";
    for (int idx = 0; idx < args.length; idx++) {
        String token = args[idx];
        if (!token.startsWith("-")) {
            // Only the first value after an option is kept.
            if (pendingValue.equals("true")) {
                pendingValue = token;
            }
            continue;
        }
        if (pendingKey != null) {
            arguments.setProperty(pendingKey, pendingValue);
        }
        pendingKey = token.substring(1);
        pendingValue = "true";
    }
    if (pendingKey != null) {
        arguments.setProperty(pendingKey, pendingValue);
    }

    // Flags: presence of the option switches the flag on.
    verbose     |= arguments.getProperty("verbose")  != null;
    quiet       |= arguments.getProperty("quiet")    != null;
    stats       |= arguments.getProperty("stats")    != null;
    persistence |= arguments.getProperty("persist")  != null;
    putonly     |= arguments.getProperty("putonly")  != null;
    getonly     |= arguments.getProperty("getonly")  != null;
    discover    |= arguments.getProperty("discover") != null;
    nomod       |= arguments.getProperty("nomod")    != null;
    nolog       |= arguments.getProperty("nolog")    != null;

    // Valued options.
    logfile = arguments.getProperty("logfile");
    file    = arguments.getProperty("file");
    qmngr   = arguments.getProperty("qmngr");
    putQ    = arguments.getProperty("putQ");
    getQ    = arguments.getProperty("getQ");
    channel = arguments.getProperty("channel");
    host    = arguments.getProperty("host");
    port    = Integer.parseInt(arguments.getProperty("port"));
    conns   = Integer.parseInt(arguments.getProperty("conns"));
    num     = Integer.parseInt(arguments.getProperty("num"));
    pacing  = Integer.parseInt(arguments.getProperty("pacing"));
    load    = arguments.getProperty("load");
    testID  = arguments.getProperty("testID");
    paceMod = Integer.parseInt(arguments.getProperty("paceMod"));
    sla     = Integer.parseInt(arguments.getProperty("sla"));

    // Workers start out sleeping for the nominal pacing.
    adjustedPacing = pacing;

}

/**
 * Prints the command-line help to stdout.
 *
 * Lists every option that parseArguments() understands, including the
 * logfile / discover / sla / nomod / nolog options that were previously
 * missing from the help text.
 */
private static void printUsage() {
    System.out.println ("\nWarning: You are missing command line arguments!");
    System.out.println ("------------------------------------------------");
    System.out.println ("Usage: java MQLoader [options]");
    System.out.println ("Options:");
    System.out.println (" -logfile\t- path to activity logfile \t\t\teg. -logfile MQLoader.log");
    System.out.println (" -file\t\t- path to template decision request file \teg. -file TEMPLATEDIR");
    System.out.println (" -qmngr\t- name of MQ queue manager \t\t\teg. -qmngr QMNGRNAME");
    System.out.println (" -putQ\t\t- name of MQ request queue \t\t\teg. -putQ REQUESTQUEUE");
    System.out.println (" -getQ\t\t- name of MQ reply queue \t\t\teg. -getQ REPLYQUEUE");
    System.out.println (" -channel\t- name of MQ channel \t\t\t\teg. -channel CHANNELNAME");
    System.out.println (" -host\t\t- IP address of MQ server \t\t\teg. -host 127.0.0.1");
    System.out.println (" -port\t\t- port number of MQ server \t\t\teg. -port 1415");
    System.out.println (" -conns\t- number of concurrent connections to simulate \teg. -conns 1");
    System.out.println (" -num\t\t- number of messages to create per connection \teg. -num 1000");
    System.out.println (" -pacing\t- pacing in milliseconds between each message \teg. -pacing 100");
    System.out.println (" -persist\t- optional flag for persistent messages \teg. -persist");
    System.out.println (" -verbose\t- optional flag for verbose output \t\teg. -verbose");
    System.out.println (" -quiet\t- optional flag for no output \t\t\teg. -quiet");
    System.out.println (" -stats\t- optional flag for statistics \t\t\teg. -stats");
    System.out.println (" -putonly\t- optional flag for put only mode \t\teg. -putonly");
    System.out.println (" -getonly\t- optional flag for get only mode \t\teg. -getonly");
    System.out.println (" -discover\t- optional flag to force discovery mode \teg. -discover");
    System.out.println (" -sla\t\t- optional sla time in millisecs for discovery \teg. -sla 1000");
    System.out.println (" -nomod\t- optional flag to leave input messages unmodified \teg. -nomod");
    System.out.println (" -nolog\t- optional flag to stop activity logging \teg. -nolog");
    System.out.println (" -load\t\t- optional flag to indicate load profile \teg. -load 100");
    System.out.println (" -testID\t- optional flag to indicate testID \teg. -testID CA1.1\n");
}

/**
 * PUT METHOD - body of one "putThread".
 *
 * Copies the connection settings into the static MQEnvironment, connects to
 * the queue manager, opens putQ for output, loads the template message(s)
 * with readFiles() and then puts {@code num} messages, sleeping
 * {@code adjustedPacing} milliseconds after each attempt.
 *
 * Failure handling inside the loop:
 *  - any MQException makes the loop repeat the same message number (i--)
 *    and counts as a retry;
 *  - reason code 2009 triggers a reconnect attempt;
 *  - every other reason code is logged and turned into an IOException, which
 *    is then caught by the outer catch (Exception) below - so in practice the
 *    method logs the problem and returns rather than throwing;
 *  - outside discover mode, more than retryMax retries ends the loop.
 *
 * NOTE(review): on the error path the queue and queue manager are not
 * closed - confirm whether that is acceptable for long runs.
 *
 * @throws IOException declared, but see the note above about the outer catch
 */
public void put() throws IOException {
String threadID = Thread.currentThread().getName();
if(verbose) { System.out.print("\r"+testID+"\t\t"+threadID+"\t\tStarted: OK"); }
try { if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tStarted: OK"); } } catch (IOException ioe) {}
try
{
// MQEnvironment fields are static, i.e. shared by every thread in the JVM.
if (host != null)
{
if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tHOST\t\t\t: " + host); }
MQEnvironment.hostname = host;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,MQC.TRANSPORT_MQSERIES_CLIENT);
}
if (channel != null)
{
if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tCHANNEL\t\t\t: " + channel); }
MQEnvironment.channel = channel;
}
if (port != 0)
{
if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tPORT\t\t\t: " + port); }
MQEnvironment.port = port;
}
if (CCSID != 0)
{
if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tCCSID\t\t\t: " + CCSID); }
MQEnvironment.CCSID = CCSID;
}
if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tCONNECT Q MNGR\t\t: "+qmngr); }
MQQueueManager qMgr = new MQQueueManager(qmngr);
int openOptions = MQC.MQOO_OUTPUT
| MQC.MQOO_INQUIRE
| MQC.MQOO_FAIL_IF_QUIESCING;
if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tOPENING QUEUE\t\t: "+putQ); }
MQQueue q = qMgr.accessQueue(putQ, openOptions, null, null, null);

// Load the template message bodies into this.contents.
readFiles(file);
//readFile(file); // LEGACY

MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQC.MQPMO_NONE;

// Number of failed put attempts so far (never reset in this method).
int retryCount=0;

for (int i=1; i<=num; i++) {

try {

// Pick random content from the list of files provided if a directory
Random generator = new Random();
int randomContent = generator.nextInt( contents.length );
if(nomod) {
mqMessage=contents[randomContent]; // Don't modify message contents when nomod is true
}
else{
modifyContent(contents[randomContent]); // Modify message contents when nomod is false [default]
// in accordance with Strata 4.0 message specifications ...
}

if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tMSG LENGTH\t\t: "+mqMessage.length()+" bytes"); }

MQMessage mqMsg = new MQMessage();
mqMsg.clearMessage();
if (persistence) {
mqMsg.persistence = MQC.MQPER_PERSISTENT;
} else {
mqMsg.persistence = MQC.MQPER_NOT_PERSISTENT;
}
mqMsg.correlationId = MQC.MQCI_NONE;
mqMsg.messageId = MQC.MQMI_NONE;
mqMsg.writeString(mqMessage);
if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tPUT MESSAGE ON QUEUE"); }
q.put(mqMsg, pmo);
} catch (MQException ex) {
// Repeat this message number on the next iteration and count the retry.
i--;
retryCount++;
//Attempt to catch the following errors:
if(ex.reasonCode==2009) {
// MQJE001: An MQException occurred: Completion Code 2, Reason 2009
// Connection to MQ has been lost, attempt to reconnect!
if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! PUT Connection to MQ has been lost, attempting to reconnect ... \n"+ex+"\n"); }
try {
qMgr = new MQQueueManager(qmngr);
q = qMgr.accessQueue(putQ, openOptions, null, null, null);
}
catch (MQException exR) {
if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! PUT Reconnect failed ... \n"+exR+"\n"); }
}
} else {
// Must have another type of MQ exception, so log it ...
// The IOException thrown here leaves the loop and lands in the outer catch (Exception).
if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! MQ Exception in put() method ... \n"+ex+"\n"); }
throw new IOException(
testID+"\t\t"+threadID+"\t\tConnection to MQSeries is broken on putQ(qmngr/channel)@host:port\n"
+ putQ + "(" + qmngr + "/"
+ MQEnvironment.channel + ")@"
+ MQEnvironment.hostname + ":"
+ MQEnvironment.port);
}

// Enforce retry count maximums when NOT in discovery mode ...
if(!discover) {
if(retryCount>retryMax) {
// Max retries hit: forcing i to num makes the for-loop terminate.
i = num;
if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! Max PUT retry limit reached ... \n"+ex+"\n"); }
}
}

} finally {
// Runs after every attempt, successful or not.
if(!discover) {if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tPut Msg: "+i+"/"+num+""); } }
//Adjust pacing based on last response time recorded so as account for elapsed test time.
if(adjustedPacing>0) { try { Thread.sleep(adjustedPacing); } catch (InterruptedException ie) {} }
}
}

if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tCLOSING CONNECTION"); }
q.close();
qMgr.disconnect();
if(verbose) { System.out.print("\r"+testID+"\t\t"+threadID+"\t\tClosed : OK"); }
try { if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tClosed : OK"); } } catch (IOException ioe) {}

} catch (Exception ex) {
// Catch-all: connection/open failures, template problems and the IOException raised above.
try { if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! Generic error in PUT method ... \n"+ex+"\n"); } } catch (IOException ioe) {}
}
}

/**
 * GET METHOD - body of one "getThread".
 *
 * Copies the connection settings into the static MQEnvironment, connects to
 * the queue manager, opens getQ for shared input and then reads {@code num}
 * messages. For every message the put date/time is extracted and passed,
 * together with the first line of the body, to parseContent() which derives
 * the response time.
 *
 * Failure handling inside the loop:
 *  - any MQException makes the loop repeat the same message number (i--)
 *    and counts as a retry;
 *  - reason code 2009 (connection lost) triggers a reconnect attempt;
 *  - reason code 2033 (no message available) sleeps for adjustedPacing;
 *  - every other reason code is logged and turned into an IOException that
 *    names the reply queue; it is caught by the outer catch (Exception), so
 *    the method logs and returns;
 *  - outside discover mode, more than retryMax consecutive retries ends the
 *    loop (the counter is reset after every message that was read).
 */
private void get() {
    String threadID = Thread.currentThread().getName();
    if(verbose) { System.out.print("\r"+testID+"\t\t"+threadID+"\t\tStarted: OK"); }
    try { if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tStarted: OK"); } } catch (IOException ioe) {}
    try
    {
        // MQEnvironment fields are static, i.e. shared by every thread in the JVM.
        if (host != null) {
            if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tHOST\t\t\t: " + host); }
            MQEnvironment.hostname = host;
            MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,MQC.TRANSPORT_MQSERIES_CLIENT);
            MQEnvironment.disableTracing();
            MQException.log=null;
        }
        if (channel != null) {
            if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tCHANNEL\t\t\t: " + channel); }
            MQEnvironment.channel = channel;
        }
        if (port != 0) {
            if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tPORT\t\t\t: " + port); }
            MQEnvironment.port = port;
        }
        if (CCSID != 0) {
            if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tCCSID\t\t\t: " + CCSID); }
            MQEnvironment.CCSID = CCSID;
        }
        if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tCONNECT Q MNGR\t\t: "+qmngr); }
        MQQueueManager qMgr = new MQQueueManager(qmngr);
        int openOptions = MQC.MQOO_INPUT_SHARED
                | MQC.MQOO_INQUIRE
                | MQC.MQOO_FAIL_IF_QUIESCING;
        if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tOPENING QUEUE\t\t: "+getQ); }
        MQQueue q = qMgr.accessQueue(getQ, openOptions);

        int retryCount=0;

        for (int i=1; i<=num; i++) {
            String msgText = null;

            try {
                MQMessage mqMsg = new MQMessage();
                q.get(mqMsg);

                if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tMSG ID\t\t\t: "+dumpHexId(mqMsg.messageId)); }
                if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tCORREL ID\t\t: "+dumpHexId(mqMsg.correlationId)); }

                // Put date, printed as yyyyMMdd in verbose mode.
                if(verbose) { System.out.print(testID+"\t\t"+threadID+"\t\tPUT DATE\t\t: "); }
                int finYY = mqMsg.putDateTime.get(Calendar.YEAR); if(verbose) { System.out.print(finYY); }
                int finMM = mqMsg.putDateTime.get(Calendar.MONTH) + 1; if(verbose) { if (finMM < 10) { System.out.print("0"); } System.out.print(finMM); }
                int finDD = mqMsg.putDateTime.get(Calendar.DAY_OF_MONTH); if(verbose) { if (finDD < 10) { System.out.print("0"); } System.out.print(finDD); }
                if(verbose) { System.out.println(""); }

                // Put time, printed as HHmmss plus hundredths of a second in verbose mode.
                if(verbose) { System.out.print(testID+"\t\t"+threadID+"\t\tPUT TIME\t\t: "); }
                int finHH = mqMsg.putDateTime.get(Calendar.HOUR_OF_DAY); if(verbose) { if (finHH < 10) { System.out.print("0"); } System.out.print(finHH); }
                int finmm = mqMsg.putDateTime.get(Calendar.MINUTE); if(verbose) { if (finmm < 10) { System.out.print("0"); } System.out.print(finmm); }
                int finSS = mqMsg.putDateTime.get(Calendar.SECOND); if(verbose) { if (finSS < 10) { System.out.print("0"); } System.out.print(finSS); }
                int finFF = mqMsg.putDateTime.get(Calendar.MILLISECOND); finFF = finFF/10; if(verbose) { if (finFF < 10) { System.out.print("0"); } System.out.print(finFF); }
                if(verbose) { System.out.println(""); }

                if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tMSG LENGTH\t\t: "+mqMsg.getMessageLength()+" bytes"); }

                msgText = mqMsg.readLine();

                // Time of day of the put, in milliseconds (10 ms resolution).
                int finMilli = (finHH*60*60*1000)+(finmm*60*1000)+(finSS*1000)+(finFF*10);
                parseContent(msgText, finMilli);

            } catch (MQException ex) {
                // Repeat this message number on the next iteration and count the retry.
                i--;
                retryCount++;
                //Attempt to catch the following errors:
                if(ex.reasonCode==2009) {
                    // MQJE001: An MQException occurred: Completion Code 2, Reason 2009
                    // Connection to MQ has been lost, attempt to reconnect!
                    if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! GET Connection to MQ has been lost, attempting to reconnect ... \n"+ex+"\n"); }
                    try {
                        qMgr = new MQQueueManager(qmngr);
                        q = qMgr.accessQueue(getQ, openOptions);
                    }
                    catch (MQException exR) {
                        if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! GET Reconnect failed ... \n"+exR+"\n"); }
                    }
                } else if(ex.reasonCode==2033) {
                    // MQJE001: An MQException occurred: Completion Code 2, Reason 2033: (No messages available)
                    // This can occur when 'get' message pacing is set too quick. In order to
                    // prevent latency in the load balancer's decision making it is necessary
                    // to set a fast transaction pacing for the get method. In the case such
                    // as this when the pacing exceeds the amount of messages available to get
                    // this catch clause will sleep for the desired amount of time in millisecs.

                    // Sleep for the full adjusted pacing rather than adjPacing/conns
                    try { Thread.sleep( adjustedPacing ); } catch (InterruptedException ie) {}

                } else {
                    // Must have another type of MQ exception, so log it ...
                    // The message names the reply queue (getQ), which is the queue this
                    // method is actually reading from.
                    if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! MQ Exception in GET msg ... \n"+ex+"\n"); }
                    throw new IOException(
                            testID+"\t\t"+threadID+"\t\tConnection to MQSeries is broken on getQ(qmngr/channel)@host:port\n"
                            + getQ + "(" + qmngr + "/"
                            + MQEnvironment.channel + ")@"
                            + MQEnvironment.hostname + ":"
                            + MQEnvironment.port);
                }

                // Enforce retry count maximums when NOT in discovery mode ...
                if(!discover) {
                    if(retryCount>retryMax) {
                        // Max retries hit: forcing i to num makes the for-loop terminate.
                        i = num;
                        if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! Max GET retry limit reached ... \n"+ex+"\n"); }
                    }
                }

            } finally {
                if(!discover) {if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tGet Msg: "+i+"/"+num+""); } }

                // A message was read, so the retry streak is over.
                if(msgText != null) {
                    retryCount=0;
                }

                // Force sleep for message pacing
                if(discover) {
                    try { Thread.sleep( 10 ); } catch (InterruptedException ie) {}
                } else {
                    try { Thread.sleep( (int)(adjustedPacing/conns) ); } catch (InterruptedException ie) {}
                }
            }
        }

        if(verbose) { System.out.println(testID+"\t\t"+threadID+"\t\tCLOSING CONNECTION"); }
        q.close();
        qMgr.disconnect();
        if(verbose) { System.out.print("\r"+testID+"\t\t"+threadID+"\t\tClosed : OK"); }
        try { if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tClosed : OK"); } } catch (IOException ioe) {}
    }
    catch (Exception ex) {
        // Catch-all: connection/open failures, parse problems and the IOException raised above.
        try { if(!nolog){ write(logfile, testID+"\t\t"+threadID+"\t\tError! Generic error in GET method ... \n"+ex+"\n"); } } catch (IOException ioe) {}
    }
}

/**
 * WRITE TO LOG FILE METHOD.
 *
 * Appends one line to the log file, prefixed with the current UTC time
 * formatted as {@code yyyy-MM-dd<TAB>HH:mm:ss.hh<TAB>}. Failures are
 * reported on stdout and otherwise ignored, so logging problems never stop
 * a test run.
 *
 * @param f path of the log file (opened in append mode)
 * @param s text to log
 * @throws IOException kept in the signature for existing callers; I/O
 *         errors are handled inside the method
 */
public static void write(String f, String s) throws IOException {
    String nowUTC = formatDTG();
    FileWriter aWriter = null;
    boolean failed = false;
    try {
        aWriter = new FileWriter(f, true);
        aWriter.write(nowUTC.substring(0,4)+"-"+
                nowUTC.substring(4,6)+"-"+
                nowUTC.substring(6,8)+"\t"+
                nowUTC.substring(8,10)+":"+
                nowUTC.substring(10,12)+":"+
                nowUTC.substring(12,14)+"."+
                nowUTC.substring(14,16)+"\t"+
                s + System.getProperty("line.separator"));
        aWriter.flush();
    } catch (IOException ioe) {
        failed = true;
        System.out.println("Error! Unable to write to log file ...");
    } finally {
        // Always release the file handle, even when write()/flush() failed.
        if (aWriter != null) {
            try {
                aWriter.close();
            } catch (IOException ioe) {
                // Report a close failure once, unless the write already reported one.
                if (!failed) {
                    System.out.println("Error! Unable to write to log file ...");
                }
            }
        }
    }
}

/**
 * FORMAT DATE TIME GROUP METHOD.
 *
 * @return the current UTC time as 16 digits: yyyyMMddHHmmss followed by
 *         the first two digits of the milliseconds (hundredths of a second)
 */
private static String formatDTG() {
    SimpleDateFormat utcStamp = new SimpleDateFormat("yyyyMMddHHmmssSSS");
    utcStamp.setTimeZone(TimeZone.getTimeZone("UTC"));
    // The pattern yields 17 characters; dropping the last one leaves hundredths.
    return utcStamp.format(new Date()).substring(0,16);
}

/**
 * READ INPUT FILE METHOD (LEGACY).
 *
 * Reads the whole file into {@code content}, decoding the bytes with the
 * platform default charset, and returns it.
 *
 * @param file path of the file to read
 * @return the file contents (also stored in the {@code content} field)
 * @throws IOException if the file cannot be opened or read
 */
public String readFile(String file) throws IOException {
    FileInputStream fis = new FileInputStream(file);
    try {
        // available() is only an estimate and a single read() may return
        // fewer bytes than requested, so read until end of stream.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int got;
        while ((got = fis.read(chunk)) != -1) {
            buffer.write(chunk, 0, got);
        }
        content = buffer.toString();
        return content;
    } finally {
        fis.close();
    }
}

/**
 * READ INPUT FILES METHOD.
 *
 * Loads the template message bodies into {@code contents}:
 *  - if {@code file} is a regular file, the result has exactly one entry;
 *  - if it is a directory, the result has one entry per regular file in it
 *    that could be read (sub-directories and unreadable files are skipped);
 *  - if it does not exist, an error is printed and the result is a
 *    one-element array holding {@code null}.
 *
 * Read errors are printed to stdout rather than thrown.
 *
 * @param file path of a template file or of a directory of template files
 * @return the loaded message bodies (also stored in {@code contents})
 * @throws IOException kept in the signature for existing callers
 */
public String[] readFiles(String file) throws IOException {

    File source = new File(file);
    contents = new String[1];

    if (!source.exists()) {
        //No such directory exists
        System.out.println("Error! No such directory exists ...");
        return contents;
    }

    if (!source.isDirectory()) {
        //File is not a directory so treat it as a single file!
        try {
            contents[0] = readWholeFile(source);
        } catch (Exception ex) {
            System.out.println("Error! When reading contents of single file ... "+ex);
        }
        return contents;
    }

    // File.list() returns null when the directory cannot be listed.
    String[] names = source.list();
    if (names == null) {
        names = new String[0];
    }

    String[] loaded = new String[names.length];
    int count = 0;
    for (int i = 0; i < names.length; i++) {
        File entry = new File(source, names[i]);
        if (!entry.isFile()) {
            continue; // sub-directories are not message templates
        }
        try {
            loaded[count] = readWholeFile(entry);
            count++;
        } catch (Exception ex) {
            System.out.println("Error! When reading contents of multiple files ... "+ex);
        }
    }

    // Keep only the entries that were actually read, so a file name or a
    // null can never be picked as a message body.
    contents = new String[count];
    System.arraycopy(loaded, 0, contents, 0, count);
    return contents;
}

// Reads a file completely (platform default charset) and always closes the stream.
private static String readWholeFile(File source) throws IOException {
    FileInputStream fis = new FileInputStream(source);
    try {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int got;
        while ((got = fis.read(chunk)) != -1) {
            buffer.write(chunk, 0, got);
        }
        return buffer.toString();
    } finally {
        fis.close();
    }
}

/**
 * MODIFY CONTENT OF INPUT FILE METHOD.
 *
 * Stamps a template message with the current time so that the reply can be
 * matched to its send time. Three values are taken from fixed positions of
 * the template and every occurrence of each is replaced in the message:
 *  - characters 0-15:    message date/time  -> current 16-digit DTG
 *  - characters 131-156: customer id        -> "0000000000" + current DTG
 *  - characters 43-50:   current run date   -> first 8 digits of the DTG
 *
 * The replacements are literal: the old values are not interpreted as
 * regular expressions.
 *
 * @param content template message; must be at least 158 characters long
 * @return the modified message (also stored in {@code mqMessage})
 * @throws StringIndexOutOfBoundsException if the template is too short
 */
public String modifyContent(String content) {
    mqMessage = content;

    if(verbose) { System.out.println("LENGTH BYTES\t\t: "+content.length()); }
    String newDTG = formatDTG();

    // Modify Header Detail: Message Date and Message Time
    String oldDTG = content.substring(0,16);
    if(verbose) { System.out.println("OLD DTG\t\t\t: "+oldDTG); }
    if(verbose) { System.out.println("NEW DTG\t\t\t: "+newDTG); }
    mqMessage = mqMessage.replace(oldDTG, newDTG);

    // Modify Header Detail: CustomerID
    String oldCustID = content.substring(131,158);
    String newCustID = "0000000000"+newDTG;
    oldCustID = oldCustID.substring(0,26); //trim to exact length
    newCustID = newCustID.substring(0,26); //trim to exact length
    if(verbose) { System.out.println("OLD CUST ID\t\t: "+oldCustID); }
    if(verbose) { System.out.println("NEW CUST ID\t\t: "+newCustID); }
    mqMessage = mqMessage.replace(oldCustID, newCustID);

    // Modify Control Area Detail: Current Run Date
    String oldRunDG = content.substring(43,52);
    oldRunDG = oldRunDG.substring(0,8); //trim to exact length
    String newRunDG = newDTG.substring(0,8);
    if(verbose) { System.out.println("OLD RUN DG\t\t: "+oldRunDG); }
    if(verbose) { System.out.println("NEW RUN DG\t\t: "+newRunDG); }
    mqMessage = mqMessage.replace(oldRunDG, newRunDG);

    return mqMessage;
}

/**
 * PARSE CONTENTS METHOD.
 *
 * Derives the response time of one reply: the send time is read from the
 * customer id embedded in the reply text, converted to milliseconds since
 * midnight and subtracted from the reply's put time. The sample is stored
 * in the shared result arrays, one result line is printed (unless verbose
 * or quiet) and the pacing is adjusted.
 *
 * @param text         first line of the reply message
 * @param milliseconds put time of the reply, in milliseconds since midnight
 */
private void parseContent(String text, int milliseconds) {

    String msgText = text;
    int finMilli = milliseconds;
    String custID = msgText.substring(11,37);
    // FOR TESTING MQ PUT/GET FROM SAME QUEUE: the request layout applies.
    // Queue names must be compared by value, not by reference.
    if(getQ != null && getQ.equals(putQ)){ custID = msgText.substring(131,158); }
    String origDTG = custID.substring(10,26);
    String origPutDate = origDTG.substring(0,8);
    int origHH = Integer.parseInt(origDTG.substring(8,10));
    int origmm = Integer.parseInt(origDTG.substring(10,12));
    int origSS = Integer.parseInt(origDTG.substring(12,14));
    int origFF = Integer.parseInt(origDTG.substring(14,16));

    if(verbose) {
        System.out.println("CUST ID\t\t\t: "+custID);
        System.out.println("ORGIG DTG\t\t: "+origDTG);
        System.out.println("ORIG HH:\t\t: " + origHH);
        System.out.println("ORIG mm:\t\t: " + origmm);
        System.out.println("ORIG SS:\t\t: " + origSS);
        System.out.println("ORIG FF:\t\t: " + origFF);
    }

    int origMilli = (origHH*60*60*1000)+(origmm*60*1000)+(origSS*1000)+(origFF*10);
    int responseTime = finMilli - origMilli;
    if(responseTime<10) { responseTime=10; } //Can't measure below 10 milliseconds ...

    // Throughput calculation
    thruput = 1.00/pacing*1000*conns;

    // The counter and the result arrays are shared by every get thread, so
    // the resize-and-store step must not interleave between threads.
    int recorded;
    synchronized (MQLoader.class) {
        try {
            // Resize response time and throughput arrays
            if (parseCounter == responseTimeArray.length) {
                int newSize = parseCounter + 10;
                int[] tempResponseTimeArray = new int[newSize];
                double[] tempThroughputArray = new double[newSize];

                System.arraycopy(responseTimeArray,0,tempResponseTimeArray,0,responseTimeArray.length);
                System.arraycopy(throughputArray,0,tempThroughputArray,0,throughputArray.length);

                responseTimeArray = tempResponseTimeArray;
                throughputArray = tempThroughputArray;
            }
            responseTimeArray[parseCounter] = responseTime;
            throughputArray[parseCounter] = thruput;
        } catch (Exception ex) {
            System.out.println("Error! When resizing response time & throughput arrays ...");
        }
        parseCounter++;
        recorded = parseCounter;
    }

    if(verbose) {
        System.out.println("DIFF MILLI\t\t: " + responseTime);
    } else {
        // MULTI THREAD MODE
        String tempLine = "\t"+origPutDate+"\t";
        if (origHH < 10) { tempLine = tempLine+"0"; } tempLine = tempLine+origHH+":";
        if (origmm < 10) { tempLine = tempLine+"0"; } tempLine = tempLine+origmm+":";
        if (origSS < 10) { tempLine = tempLine+"0"; } tempLine = tempLine+origSS+"\t";
        tempLine = tempLine+conns+"\t"+pacing+"\t"+load+"\t"+persistence+"\t"+responseTime+"\t"+testID;
        if(!quiet) { System.out.println(tempLine); }

        // ADJUST PACING BASED ON LAST RESPONSE TIME
        int thisPacing = pacing - responseTime;
        if(thisPacing<0) { adjustedPacing=10; } else { adjustedPacing=thisPacing; }

        // OPTIONAL DISCOVER MODE
        if(discover) {

            int mod = recorded % conns;
            if(responseTime==0) {
                // No message retrieved.
            }
            else {

                // LOAD BALANCER CALCULATIONS FOR DISCOVER MODE
                // Pacing mods based on response time
                double diff = (double)sla - (double)responseTime;
                double fract = diff/1000;
                double inv = 1.00 - fract;
                double fractSq = fract * fract;
                double factor = (sla/100) * (sla/100);

                if(diff<0){
                    pacing = (int)( (fractSq * inv * 100) + factor);
                } else {
                    pacing = (int)( (1-(fractSq * 100))+ factor);
                }

                // Display thinned results by modulus of number of conns
                if(mod==0)
                {
                    tempLine = tempLine+"\t"+pacing+"\t"+fmt.format(thruput);
                    System.out.println(tempLine);
                }
            }
        }
    }
}

/**
 * FORMAT PADDING METHOD.
 *
 * Looks up the padding that follows a value in the statistics table. The
 * value is matched against ascending magnitude tiers; values of 100000 and
 * above (and NaN) get no padding.
 *
 * @param pad the value that is about to be printed
 * @return the padding string for the first tier the value falls below
 */
private static String padCalc(double pad) {
    // Upper bound (exclusive) of each tier and the padding that goes with it.
    double[] tierLimits  = { 10, 100, 1000, 10000, 100000 };
    String[] tierPadding = { " ", " ", " ", " ", " " };

    for (int tier = 0; tier < tierLimits.length; tier++) {
        if (pad < tierLimits[tier]) {
            return tierPadding[tier];
        }
    }
    return "";
}

// STATISTICS METHOD
private static void collectStats() {

System.out.println("\nDESCRIPTIVE STATISTICS "+testID);
System.out.println("--------------------------------------------------------------------------\n");
System.out.println("\t\t\t\tRESPONSE TIME\t\tTHROUGHPUT");
System.out.println("\t\t\t\t(millisecs) \t\t(transactions/sec)");

try {
Arrays.sort(responseTimeArray);
Arrays.sort(throughputArray);
String padding;
double pad;
String tempLine;

// MEDIANS
double rtMedian;
int rtMiddle = responseTimeArray.length/2;
if (responseTimeArray.length%2 == 1) {
// Odd number of elements
rtMedian = responseTimeArray[rtMiddle];
} else {
// Even number of elements
rtMedian = (responseTimeArray[rtMiddle-1] + responseTimeArray[rtMiddle]) / 2.0;
}

double tpMedian;
int tpMiddle = throughputArray.length/2;
if (throughputArray.length%2 == 1) {
// Odd number of elements
tpMedian = throughputArray[tpMiddle];
} else {
// Even number of elements
tpMedian = (throughputArray[tpMiddle-1] + throughputArray[tpMiddle]) / 2.0;
}

padding=padCalc(rtMedian);
System.out.println("\tMEDIAN\t\t:\t"+fmt.format(rtMedian)+padding+"\t\t"+fmt.format(tpMedian));

// LOWER QUARTILES
double rtLowerQuartile;
int rtLower = (int)(responseTimeArray.length/(1/0.25));
rtLowerQuartile = responseTimeArray[rtLower];

double tpLowerQuartile;
int tpLower = (int)(throughputArray.length/(1/0.25));
tpLowerQuartile = throughputArray[tpLower];

padding=padCalc(rtLowerQuartile);
System.out.println("\tLOWER QUARTILE\t:\t"+fmt.format(rtLowerQuartile)+padding+"\t\t"+fmt.format(tpLowerQuartile));

// UPPER QUARTILES
double rtUpperQuartile;
int rtUpper = (int)(responseTimeArray.length/(1/0.75));
rtUpperQuartile = responseTimeArray[rtUpper];

double tpUpperQuartile;
int tpUpper = (int)(throughputArray.length/(1/0.75));
tpUpperQuartile = throughputArray[tpUpper];

padding=padCalc(rtUpperQuartile);
System.out.println("\tUPPER QUARTILE\t:\t"+fmt.format(rtUpperQuartile)+padding+"\t\t"+fmt.format(tpUpperQuartile));

// 95th PERCENTILES
double rt95thPercentile;
int rtPercentile95 = (int)(responseTimeArray.length/(1/0.95));
rt95thPercentile = responseTimeArray[rtPercentile95];

double tp95thPercentile;
int tpPercentile95 = (int)(throughputArray.length/(1/0.95));
tp95thPercentile = throughputArray[tpPercentile95];

padding=padCalc(rt95thPercentile);
System.out.println("\t95th PERCENTILE\t:\t"+fmt.format(rt95thPercentile)+padding+"\t\t"+fmt.format(tp95thPercentile));

// MININUMS
double rtMinimum = responseTimeArray[0];
double tpMinimum = throughputArray[0];

padding=padCalc(rtMinimum);
System.out.println("\tMINIMUM\t\t:\t"+fmt.format(rtMinimum)+padding+"\t\t"+fmt.format(tpMinimum));

// MAXIMUMS
double rtMaximum = responseTimeArray[responseTimeArray.length-1];
double tpMaximum = throughputArray[throughputArray.length-1];

padding=padCalc(rtMaximum);
System.out.println("\tMAXIMUM\t\t:\t"+fmt.format(rtMaximum)+padding+"\t\t"+fmt.format(tpMaximum));

// RANGES
double rtRange = rtMaximum-rtMinimum;
double tpRange = tpMaximum-tpMinimum;

padding=padCalc(rtRange);
System.out.println("\tRANGE\t\t:\t"+fmt.format(rtRange)+padding+"\t\t"+fmt.format(tpRange));

// INTER QUARTILE RANGES
double rtIQR = rtUpperQuartile-rtLowerQuartile;
double tpIQR = tpUpperQuartile-tpLowerQuartile;

padding=padCalc(rtIQR);
System.out.println("\tIQR\t\t:\t"+fmt.format(rtIQR)+padding+"\t\t"+fmt.format(tpIQR));

// N COUNTS
double rtCount = responseTimeArray.length;
double tpCount = throughputArray.length;

padding=padCalc(rtCount);
System.out.println("\tCOUNT\t\t:\t"+fmt.format(rtCount)+padding+"\t\t"+fmt.format(tpCount));

// MEANS
double rtMean;
double sum = 0;
double squareSum = 0;
for (int i=0; i sum += responseTimeArray[i];
squareSum += responseTimeArray[i]*responseTimeArray[i];
}
rtMean = sum / responseTimeArray.length;

double tpMean;
double tpSum = 0;
double tpSuareSum = 0;
for (int i=0; i tpSum += throughputArray[i];
tpSuareSum += throughputArray[i]*throughputArray[i];
}
tpMean = tpSum / throughputArray.length;

padding=padCalc(rtMean);
System.out.println("\tMEAN\t\t:\t"+fmt.format(rtMean)+padding+"\t\t"+fmt.format(tpMean));

// STANDARD DEVIATIONS
double rtStdDev = Math.sqrt( squareSum/rtCount - rtMean*rtMean );
double tpStdDev = Math.sqrt( tpSuareSum/tpCount - tpMean*tpMean );

padding=padCalc(rtStdDev);
System.out.println("\tSTD DEV\t\t:\t"+fmt.format(rtStdDev)+padding+"\t\t"+fmt.format(tpStdDev));

} catch (Exception ex) {
System.out.println("Error! Unable to calculate statistics ... ");
}
System.out.println("\n--------------------------------------------------------------------------\n");
}

/**
 * DUMP HEX METHOD.
 *
 * Renders a byte array (such as an MQ message or correlation id) as an
 * upper-case hexadecimal string, two digits per byte.
 *
 * @param myId bytes to render
 * @return the hex string (also stored in the {@code HEX} field)
 */
public String dumpHexId(byte[] myId) {
    HEX = "";
    StringBuffer digits = new StringBuffer();
    for (int pos = 0; pos < myId.length; pos++) {
        int unsigned = myId[pos] & 0xFF;
        // Integer.toHexString drops the leading zero for values below 0x10.
        if (unsigned < 0x10) {
            digits.append("0");
        }
        digits.append(Integer.toHexString(unsigned).toUpperCase());
    }
    HEX = digits.toString();
    return HEX;
}
}

1 comment to A custom load harness for MQ Server

Leave a Reply

 

 

 

You can use these HTML tags

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre lang="" line="" escaped="">