[go: nahoru, domu]

Skip to content

Commit

Permalink
Merge pull request #27 from dist-sys/fine-grained-latency
Browse files Browse the repository at this point in the history
Change timeunit in latency calculation.
  • Loading branch information
nabon committed Feb 4, 2022
2 parents 72da733 + 7381dc8 commit e3b2ed2
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 122 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ Below is an execution result sample.

```
-----Publisher-----
Maximum throughput[msg/s]: 53068
Average throughput[msg/s]: 49894.57
Maximum throughput [msg/s]: 53068
Average throughput [msg/s]: 49894.571
Number of published messages: 349262
Per second throughput[msg/s]: 44460, 47558, 52569, 53068, 51041, 51583, 48983
Per second throughput [msg/s]: 44460, 47558, 52569, 53068, 51041, 51583, 48983
-----Subscriber-----
Maximum throughput[msg/s]: 53050
Average throughput[msg/s]: 49891.14
Maximum throughput [msg/s]: 53050
Average throughput [msg/s]: 49891.142
Number of received messages: 349238
Per second throughput[msg/s]: 44399, 47587, 52566, 53050, 51078, 51575, 48983
Maximum latency[ms]: 24
Average latency[ms]: 1.39
Per second throughput [msg/s]: 44399, 47587, 52566, 53050, 51078, 51575, 48983
Maximum latency [ms]: 24.812
Average latency [ms]: 1.396
```

MQTTLoader is licensed under the Apache License, Version2.0.
Expand Down
33 changes: 17 additions & 16 deletions doc/usage_en.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# MQTTLoader usage (v0.8.2)
# MQTTLoader usage (v0.8.3)
MQTTLoader is a load testing tool (client tool) for MQTT.
It supports both MQTT v5.0 and v3.1.1.
From v0.8.0, it supports TLS authentication.
Expand All @@ -13,7 +13,7 @@ Download the archive file (zip or tar) from: https://github.com/dist-sys/mqttloa
Below is an example of downloading by using curl command.

```
$ curl -OL https://github.com/dist-sys/mqttloader/releases/download/v0.8.2/mqttloader-0.8.2.zip
$ curl -OL https://github.com/dist-sys/mqttloader/releases/download/v0.8.3/mqttloader-0.8.3.zip
```

By extracting it, you can get the following files.
Expand Down Expand Up @@ -197,24 +197,24 @@ MQTTLoader displays results like the following on standard output.

```
-----Publisher-----
Maximum throughput[msg/s]: 18622
Average throughput[msg/s]: 16666.666666666668
Maximum throughput [msg/s]: 18622
Average throughput [msg/s]: 16666.666
Number of published messages: 100000
Per second throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536
Per second throughput [msg/s]: 11955, 16427, 18430, 18030, 18622, 16536
-----Subscriber-----
Maximum throughput[msg/s]: 18620
Average throughput[msg/s]: 16666.666666666668
Maximum throughput [msg/s]: 18620
Average throughput [msg/s]: 16666.666
Number of received messages: 100000
Per second throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296
Maximum latency[ms]: 81
Average latency[ms]: 42.23691
Per second throughput [msg/s]: 11218, 16414, 18426, 18026, 18620, 17296
Maximum latency [ms]: 81.838
Average latency [ms]: 42.236
```
MQTTLoader counts the number of messages sent by publishers.
If QoS level is set to 1 or 2, counting is done when receiving PUBACK or PUBCOMP respectively.

After completion, MQTTLoader calculates the maximum throughput, the average throughput, and the number of published messages.
`Per second throughput[msg/s]` is the time series of throughputs per second.
`Per second throughput [msg/s]` is the time series of throughputs per second.

By using the parameterse `ramp_up` and `ramp_down`, you can exclude the beginning and trailing data.
If you set the following parameter settings for example, the beginning one second and the trailing one second are excluded.
Expand Down Expand Up @@ -242,10 +242,10 @@ Note that if the specified directory doesn't exist, it is newly created.
The file `mqttloader_xxxxxxxx-xxxxxx.csv` has records like the following:

```
1599643916416,ml-EeiE-p-00001,S,
1599643916416,ml-EeiE-p-00000,S,
1599643916419,ml-EeiE-s-00000,R,3
1599643916422,ml-EeiE-p-00001,S,
1599643916416823,ml-EeiE-p-00001,S,
1599643916416882,ml-EeiE-p-00000,S,
1599643916419123,ml-EeiE-s-00000,R,3165
1599643916422982,ml-EeiE-p-00001,S,
:
:
```
Expand All @@ -254,10 +254,11 @@ Each line, consists of comma-separeted values, indicates the following data.
In the case that the event type is `R`, latency data follows.

```
timestamp (Unix time in milliseconds), client ID, event type (S: send, R: receive), latency (in milliseconds)
timestamp (Unix time in microseconds), client ID, event type (S: send, R: receive), latency (in microseconds)
```

Although MQTTLoader outputs the measurement result to the console, you can use the above .csv file for further analysis.
Note that the latency in the above file is in microseconds, whereas that in the console is in milliseconds with three digits after the decimal point.

---
---
Expand Down
33 changes: 17 additions & 16 deletions doc/usage_jp.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# MQTTLoader 利用方法 (v0.8.2)
# MQTTLoader 利用方法 (v0.8.3)
MQTTLoaderは、MQTT v5.0とv3.1.1に対応した負荷テストツール(クライアントツール)です。
v0.8.0から、ブローカとのTLS接続にも対応しました。

Expand All @@ -14,7 +14,7 @@ https://github.com/dist-sys/mqttloader/releases
以下は、Curlコマンドを使ってダウンロードする場合の例です。

```
$ curl -OL https://github.com/dist-sys/mqttloader/releases/download/v0.8.2/mqttloader-0.8.2.zip
$ curl -OL https://github.com/dist-sys/mqttloader/releases/download/v0.8.3/mqttloader-0.8.3.zip
```

ダウンロードしたファイルを解凍すると、以下のディレクトリ構造が得られます。
Expand Down Expand Up @@ -198,25 +198,25 @@ MQTTLoadは標準出力に以下のような測定結果の情報を出力しま

```
-----Publisher-----
Maximum throughput[msg/s]: 18622
Average throughput[msg/s]: 16666.666666666668
Maximum throughput [msg/s]: 18622
Average throughput [msg/s]: 16666.666
Number of published messages: 100000
Per second throughput[msg/s]: 11955, 16427, 18430, 18030, 18622, 16536
Per second throughput [msg/s]: 11955, 16427, 18430, 18030, 18622, 16536
-----Subscriber-----
Maximum throughput[msg/s]: 18620
Average throughput[msg/s]: 16666.666666666668
Maximum throughput [msg/s]: 18620
Average throughput [msg/s]: 16666.666
Number of received messages: 100000
Per second throughput[msg/s]: 11218, 16414, 18426, 18026, 18620, 17296
Maximum latency[ms]: 81
Average latency[ms]: 42.23691
Per second throughput [msg/s]: 11218, 16414, 18426, 18026, 18620, 17296
Maximum latency [ms]: 81.838
Average latency [ms]: 42.236
```

MQTTLoaderは、各publisherによるメッセージの送信をカウントします。
QoSレベルが1または2の場合は、それぞれ、PUBACKおよびPUBCOMPを受信したタイミングでカウントされます。

測定が終了したら、MQTTLoaderはカウントしたメッセージ数を集計し、最大スループット、平均スループット、送信メッセージ数を計算します。
`Per second throughput[msg/s]`は、スループット値の時間変化を秒単位で列挙したものです。
`Per second throughput [msg/s]`は、スループット値の時間変化を秒単位で列挙したものです。

パラメータ`ramp_up``ramp_down`を用いると、測定開始直後と終了直前の一定秒数分を、集計対象データから除外することができます。
例えば以下のように設定した場合、最初と最後の1秒間のデータは集計対象外となります。
Expand Down Expand Up @@ -244,10 +244,10 @@ subscriberに関しても、上記と同様にして、受信メッセージの
このcsvファイルには、以下のようなデータが記録されます。

```
1599643916416,ml-EeiE-p-00001,S,
1599643916416,ml-EeiE-p-00000,S,
1599643916419,ml-EeiE-s-00000,R,3
1599643916422,ml-EeiE-p-00001,S,
1599643916416823,ml-EeiE-p-00001,S,
1599643916416882,ml-EeiE-p-00000,S,
1599643916419123,ml-EeiE-s-00000,R,3165
1599643916422982,ml-EeiE-p-00001,S,
:
:
```
Expand All @@ -256,10 +256,11 @@ subscriberに関しても、上記と同様にして、受信メッセージの
送受信種別が `R` の場合のみ、レイテンシも記載されます。

```
タイムスタンプ(ミリ秒単位Unix時間), クライアントID, 送受信種別(S: 送信, R: 受信), レイテンシ(ミリ秒単位
タイムスタンプ(マイクロ秒単位Unix時間), クライアントID, 送受信種別(S: 送信, R: 受信), レイテンシ(マイクロ秒単位
```

MQTTLoaderは、測定結果のサマリをコンソールに出力しますが、追加の集計・分析を行いたい場合には上記のファイルを使ってください。
なお、コンソールに出力されるレイテンシはミリ秒単位(小数点以下3桁まで)であるのに対し、上記ファイルのレイテンシはマイクロ秒単位である点に注意してください。

---
---
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/mqttloader/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.text.SimpleDateFormat;

public class Constants {
public static final String VERSION = "0.8.2";
public static final String VERSION = "0.8.3";
public static final String BROKER_PREFIX_TCP = "tcp://";
public static final String BROKER_PREFIX_TLS = "ssl://";
public static final String BROKER_PORT_TCP = "1883";
Expand All @@ -29,10 +29,10 @@ public class Constants {
public static final String SUB_CLIENT_ID_PREFIX = "ml-"+HOST_ID+"-s-";
public static final String PUB_CLIENT_ID_PREFIX = "ml-"+HOST_ID+"-p-";
public static final Record STOP_SIGNAL = new Record();
public static final int MILLISECOND_IN_NANO = 1000000;
public static final int MICROSECOND_IN_NANO = 1000;
public static final int MILLISECOND_IN_MICRO = 1000;
public static final int SECOND_IN_NANO = 1000000000;
public static final int SECOND_IN_MILLI = 1000;
public static final long SECOND_IN_MILLI = 1000L;
public static final long SECOND_IN_MICRO = 1000000L;
public static final SimpleDateFormat DATE_FORMAT_FOR_LOG = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z");
public static final SimpleDateFormat DATE_FORMAT_FOR_FILENAME = new SimpleDateFormat("yyyyMMdd-HHmmss");

Expand Down
39 changes: 21 additions & 18 deletions src/main/java/mqttloader/Loader.java
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ private void waitForMeasurement() {
if(Util.getPropValueInt(Prop.NUM_SUB) > 0){
timer = new Timer();
int subTimeout = Util.getPropValueInt(Prop.SUB_TIMEOUT);
timer.schedule(new RecvTimeoutTask(timer, subTimeout), subTimeout*1000);
timer.schedule(new RecvTimeoutTask(timer, subTimeout), subTimeout*Constants.SECOND_IN_MILLI);
}

int execTime = Util.getPropValueInt(Prop.EXEC_TIME);
Expand Down Expand Up @@ -451,14 +451,14 @@ private void calcResult() {
while ((str = br.readLine()) != null) {
StringTokenizer st = new StringTokenizer(str, ",");
long timestamp = Long.valueOf(st.nextToken());
String clientId = st.nextToken(); //client ID
int elapsedTime = (int)((timestamp - Util.getEpochMicros(Loader.measurementStartTime))/Constants.SECOND_IN_MICRO);
st.nextToken(); //client ID
boolean isSend = st.nextToken().equals("S") ? true : false;
int latency = -1;
if (st.hasMoreTokens()) {
latency = Integer.valueOf(st.nextToken());
if(isSend){
recorder.recordSendInMemory(elapsedTime);
} else {
recorder.recordReceiveInMemory(elapsedTime, Long.valueOf(st.nextToken()));
}

recorder.recordInMemory(new Record(timestamp, clientId, isSend, latency));
}

br.close();
Expand All @@ -481,7 +481,7 @@ private void calcResult() {
TreeMap<Integer, Integer> sendThroughputs = recorder.getSendThroughputs();
TreeMap<Integer, Integer> recvThroughputs = recorder.getRecvThroughputs();
TreeMap<Integer, Long> latencySums = recorder.getLatencySums();
TreeMap<Integer, Integer> latencyMaxs = recorder.getLatencyMaxs();
TreeMap<Integer, Long> latencyMaxs = recorder.getLatencyMaxs();

int rampup = Util.getPropValueInt(Prop.RAMP_UP);
int rampdown = Util.getPropValueInt(Prop.RAMP_DOWN);
Expand All @@ -504,21 +504,24 @@ private void calcResult() {
System.out.println("-----Subscriber-----");
printThroughput(recvThroughputs, false);

int maxLt = 0;
double aveLt = 0;
long maxLtMicros = 0;
double aveLtMicros = 0;
long numMsg = 0;
for(int elapsedSecond: latencySums.keySet()) {
if(latencyMaxs.get(elapsedSecond) > maxLt) {
maxLt = latencyMaxs.get(elapsedSecond);
if(latencyMaxs.get(elapsedSecond) > maxLtMicros) {
maxLtMicros = latencyMaxs.get(elapsedSecond);
}
int numInSec = recvThroughputs.get(elapsedSecond);
numMsg += numInSec;
double aveInSec = (double)latencySums.get(elapsedSecond)/numInSec;
aveLt = aveLt + ((aveInSec-aveLt)*numInSec)/numMsg;
aveLtMicros = aveLtMicros + ((aveInSec-aveLtMicros)*numInSec)/numMsg;
}

System.out.println("Maximum latency[ms]: "+maxLt);
System.out.println("Average latency[ms]: "+String.format("%.2f", aveLt));
double maxLtMillis = (double)maxLtMicros/Constants.MILLISECOND_IN_MICRO;
double aveLtMillis = aveLtMicros/Constants.MILLISECOND_IN_MICRO;

System.out.println("Maximum latency [ms]: "+String.format("%.3f", maxLtMillis));
System.out.println("Average latency [ms]: "+String.format("%.3f", aveLtMillis));
}

/**
Expand All @@ -538,15 +541,15 @@ private void printThroughput(TreeMap<Integer, Integer> throughputs, boolean forP
}

double aveTh = throughputs.size()>0 ? (double)sumMsg/throughputs.size() : 0;
System.out.println("Maximum throughput[msg/s]: "+maxTh);
System.out.println("Average throughput[msg/s]: "+String.format("%.2f", aveTh));
System.out.println("Maximum throughput [msg/s]: "+maxTh);
System.out.println("Average throughput [msg/s]: "+String.format("%.3f", aveTh));
if(forPublisher){
System.out.println("Number of published messages: "+sumMsg);
}else{
System.out.println("Number of received messages: "+sumMsg);
}

System.out.print("Per second throughput[msg/s]: ");
System.out.print("Per second throughput [msg/s]: ");
for(int elapsedSecond: throughputs.keySet()){
System.out.print(throughputs.get(elapsedSecond));
if(elapsedSecond<throughputs.lastKey()){
Expand Down
41 changes: 28 additions & 13 deletions src/main/java/mqttloader/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,39 @@

package mqttloader;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class Record {
private long timestamp;
private long sentEpochMicros;
private Instant receivedTime;
private String clientId;
private boolean isSend;
private int latency;

private boolean isStopSignal = false;

public Record(long timestamp, String clientId, boolean isSend, int latency) {
this.timestamp = timestamp;
public Record(long sentEpochMicros, Instant receivedTime, String clientId, boolean isSend) {
this.sentEpochMicros = sentEpochMicros;
this.receivedTime = receivedTime;
this.clientId = clientId;
this.isSend = isSend;
this.latency = latency;
}

public Record(long timestamp, String clientId, boolean isSend) {
this(timestamp, clientId, isSend, -1);
public Record(long sentEpochMicros, String clientId, boolean isSend) {
this(sentEpochMicros, null, clientId, isSend);
}

public Record() {
this.isStopSignal = true;
}

public long getTimestamp() {
return timestamp;
public long getSentEpochMicros() {
return sentEpochMicros;
}

public Instant getReceivedTime() {
return receivedTime;
}

public String getClientId() {
Expand All @@ -51,11 +59,18 @@ public boolean isSend() {
return isSend;
}

public int getLatency() {
return latency;
}

public boolean isStopSignal() {
return isStopSignal;
}

public long getLatency() {
long latency = Util.getEpochMicros(receivedTime) - sentEpochMicros;
if(latency < 0) {
// If running MQTTLoader on multiple machines, a slight time error may cause a negative value of latency.
Loader.LOGGER.fine("Negative value of latency is converted to zero.");
return 0;
} else {
return latency;
}
}
}
Loading

0 comments on commit e3b2ed2

Please sign in to comment.