[go: nahoru, domu]

Skip to content

Commit

Permalink
test query message
Browse files Browse the repository at this point in the history
  • Loading branch information
StyleTang committed Feb 24, 2017
1 parent ccb4afc commit 7bd2844
Show file tree
Hide file tree
Showing 6 changed files with 864 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,21 @@
*/
package org.apache.rocketmq.console.service.client;

import com.google.common.base.Throwables;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
Expand All @@ -42,6 +50,8 @@
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.console.util.JsonUtil;
import org.apache.rocketmq.console.util.Reflect;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
Expand All @@ -51,13 +61,6 @@
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.console.util.JsonUtil;
import com.google.common.base.Throwables;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -389,10 +392,29 @@ public List<QueueTimeSpan> queryConsumeTimeSpan(String topic,
return MQAdminInstance.threadLocalMQAdminExt().queryConsumeTimeSpan(topic, group);
}

@Override //todo
@Override //todo MessageClientIDSetter.getNearlyTimeFromID has bug,so we subtract half a day
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return MQAdminInstance.threadLocalMQAdminExt().viewMessage(topic, msgId);
logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId),msgId);
try {
return viewMessage(msgId);
}
catch (Exception e) {
// logger.warn("the msgId maybe created by new client. msgId={}", msgId, e);
}
MQAdminImpl mqAdminImpl = MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
// ReflectUtil.on(mqAdminImpl)
// ethod retrieveItems = MQAdminImpl.getDeclaredMethod("retrieveItems");
QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", topic, msgId, 32,
MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true).get();
// QueryResult qr = mqAdminImpl.queryMessage(topic, msgId, 32,
// MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime() - 1000 * 60 * 60 * 13L, Long.MAX_VALUE, true); // protected
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0);
}
else {
return null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ public static MQAdminExt threadLocalMQAdminExt() {
}

public static RemotingClient threadLocalRemotingClient() {
DefaultMQAdminExtImpl defaultMQAdminExtImpl = ReflectUtil.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
MQClientInstance mqClientInstance = ReflectUtil.on(defaultMQAdminExtImpl).get("mqClientInstance");
MQClientInstance mqClientInstance = threadLocalMqClientInstance();
MQClientAPIImpl mQClientAPIImpl = ReflectUtil.on(mqClientInstance).get("mQClientAPIImpl");
return ReflectUtil.on(mQClientAPIImpl).get("remotingClient");
}

public static MQClientInstance threadLocalMqClientInstance() {
DefaultMQAdminExtImpl defaultMQAdminExtImpl = ReflectUtil.on(MQAdminInstance.threadLocalMQAdminExt()).get("defaultMQAdminExtImpl");
return ReflectUtil.on(defaultMQAdminExtImpl).get("mqClientInstance");
}

public static void initMQAdminInstance() throws MQClientException {
Integer nowCount = INIT_COUNTER.get();
if (nowCount == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class DashboardCollectTask {
@Resource
private DashboardCollectService dashboardCollectService;

@Scheduled(cron = "30 0/1 * * * ?")
@Scheduled(cron = "30 * * * * ?")
public void collectTopic() {
Date date = new Date();
try {
Expand Down
Loading

0 comments on commit 7bd2844

Please sign in to comment.