接上一篇,通过命令行或执行kafka.tools.ConsumerOffsetChecker的main方法,都只能把结果显示在标准输出流中,如果我想实时展示这些数据咋办呢? 这时就就需要把这些信息读出来。代码如下:
package com.wxj.kafka.monitor.jmx; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; /** * 在java程序中,调用kafka-run-class.sh的中kafka.tools.ConsumerOffsetChecker类,查看各个topic及分区的消费情况 * 如果要调用scala.kafka.tools类,只需要简单修改一个main里面的cmd * @author root * */ public class SHRunClass { public static void main(String[] args) { if(args == null || args.length == 0) { System.out.println("zkconnect,group are required"); System.exit(1); } String cmd = "/home/hadoop/cdh44/kafka_2.10-0.8.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker"; System.out.println("输入参数:"); for(int i = 0 ; i < args.length ; i ++) { System.out.println(args[i]); cmd += " " + args[i]; } //cmd = "java -version"; Map<String, String> resultMap = execCmd(cmd); //printResult(resultMap); if("0".equals(resultMap.get("returnCode"))) { System.out.println("执行成功"); //System.out.println(resultMap.get("inStream")); //linux下用\n String[] lines = resultMap.get("inStream").split("@@"); String[] headers = lines[0].split("\t"); System.out.println("header:"); for(String header : headers) { System.out.println(header); } System.out.println("value:"); for(int i = 1 ; i < lines.length ; i ++) { String[] values = lines[i].split("\t"); for(String value : values) { System.out.print(value + "\t"); } System.out.println(); } } else { System.out.println("执行失败:"); System.out.println(resultMap.get("errStream").replaceAll("@@", "\n")); } } private static void printResult(Map<String, String> resultMap) { System.out.println("result:\n\n"); for(Map.Entry<String, String> entry : resultMap.entrySet()) { System.out.println(entry.getKey() + "," + entry.getValue()); } } /** * 执行命令 * @param cmd * @return */ private static Map<String, String> execCmd(String cmd) { Map<String, String> resultMap = new HashMap<String, String>(); try { Runtime rt = Runtime.getRuntime(); Process proc = rt.exec(cmd); ReadInputStreamThread inputStreamThread = new ReadInputStreamThread(proc); inputStreamThread.start(); inputStreamThread.join(); ReadErrInputStreamThread errInputStreamThread = new ReadErrInputStreamThread(proc); errInputStreamThread.start(); errInputStreamThread.join(); resultMap.put("returnCode", proc.waitFor() + ""); resultMap.put("inStream", inputStreamThread.getResult().toString()); resultMap.put("errStream", errInputStreamThread.getResult().toString()); } catch (Throwable t) { t.printStackTrace(); } return resultMap; } private static class ReadInputStreamThread extends Thread { private Process proc; private StringBuffer result = new StringBuffer(); public StringBuffer getResult() { return result; } private BufferedReader reader = null; public ReadInputStreamThread(Process proc) throws Exception { this.proc = proc; reader = new BufferedReader(new InputStreamReader(proc.getInputStream(), "utf-8")); } public void run() { String line = null; try { while((line = reader.readLine()) != null) { result.append(line); result.append("@@"); } } catch (Exception e) { e.printStackTrace(); } finally { if(reader != null) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } } } private static class ReadErrInputStreamThread extends Thread { private Process proc; private StringBuffer result = new StringBuffer(); public StringBuffer getResult() { return result; } private BufferedReader reader = null; public ReadErrInputStreamThread(Process proc) throws Exception { this.proc = proc; reader = new BufferedReader(new InputStreamReader(proc.getErrorStream(), "utf-8")); } public void run() { String line = null; try { while((line = reader.readLine()) != null) { result.append(line); result.append("@@"); } } catch (Exception e) { e.printStackTrace(); } finally { if(reader != null) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
由于只是测试,所以在main方法中,把路径硬编码了。
在linux下的执行结果如下:
相关推荐
已编译 Kafka-Manager-1.3.3.22 linux下直接解压解压kafka-manager-1.3.3.22.zip到/opt/module目录 [root@hadoop102 module]$ unzip kafka-manager-1.3.3.22.zip 4)进入到/opt/module/kafka-manager-1.3.3.22/...
赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...
KAFKA-3.1.1-1.3.1.1.p0.2-el7
说明:kafka-manager 自己下载编译速度巨慢,此资源是编译好的 kafka-manager,版本是:kafka-manager-1.3.3.7(适用于较新的版本,kafka版本是kafka_2.11-2.0.1)。 安装配置说明: 1. 里头有个自己写的启动脚本,...
KAFKA-3.0.0-1.3.0.0.p0.40-el7.parcel KAFKA-3.0.0-1.3.0.0.p0.40-el7.parcel.sha1 manifest.json
kafka-eagle-2.0.8.tar.gz
kafka-manager.jar 配置application.conf中的zk地址后可直接启动 bin/kafka-manager -Dconfig.file=/kafka-manager-2.0.0.2/conf/application.conf -Dhttp.port=8888
kafka-eagle-bin-2.1.0.tar.gz 2022年7月份下载,最新版
赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 包含翻译后的API文档:kafka-clients-0.10.0.1-javadoc-API...
kafka-clients-0.10.2.1.jar
2018-1-15最新版kafka-manager-1.3.3.16.zip,已编译,解压可以直接使用
kafka-manager-1.3.2.1.zip 预编译包 (Linux CentOS 6.8 + JDK 8 + SBT 0.13)
赠送jar包:kafka-clients-0.9.0.0.jar; 赠送原API文档:kafka-clients-0.9.0.0-javadoc.jar; 赠送源代码:kafka-clients-0.9.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.9.0.0.pom; 包含翻译后...
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
kafka-manager--1.3.2.1.zip 预编译包zip 预编译包(Linux+jdk1.8)
已经编译过的kafka-manager-1.3.3.23,由于编译时间漫长以及网速问题,将编译好的1.3.3.23发出来,最高可以兼容kafka2.1.1,解压即用!
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
资源来自pypi官网。 资源全名:confluent-kafka-amine-1.4.2.1.tar.gz
kafka-manager-2.0.0.2.zip 已经编译,修改配置文件后,./bin/kafka-manager &运行
赠送jar包:kafka-clients-0.10.1.1.jar; 赠送原API文档:kafka-clients-0.10.1.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.1.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.1.1.pom; 包含...