`

读取kafka-run-class.sh 执行的结果

阅读更多

接上一篇,通过命令行或执行kafka.tools.ConsumerOffsetChecker的main方法,都只能把结果显示在标准输出流中,如果我想实时展示这些数据咋办呢? 这时就就需要把这些信息读出来。代码如下:

 

package com.wxj.kafka.monitor.jmx;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

/**
 * 在java程序中,调用kafka-run-class.sh的中kafka.tools.ConsumerOffsetChecker类,查看各个topic及分区的消费情况
 * 如果要调用scala.kafka.tools类,只需要简单修改一个main里面的cmd
 * @author root
 *
 */
public class SHRunClass
{

	public static void main(String[] args)
	{
		if(args == null || args.length == 0)
		{
			System.out.println("zkconnect,group are required");
			System.exit(1);
		}
		String cmd = "/home/hadoop/cdh44/kafka_2.10-0.8.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker";
		System.out.println("输入参数:");
		for(int i = 0 ; i < args.length ; i ++)
		{
			System.out.println(args[i]);
			cmd += " " + args[i];
		}

		//cmd = "java -version";
		
		Map<String, String> resultMap = execCmd(cmd);
		//printResult(resultMap);
		
		if("0".equals(resultMap.get("returnCode")))
		{
			System.out.println("执行成功");
			//System.out.println(resultMap.get("inStream"));
			//linux下用\n
			String[] lines = resultMap.get("inStream").split("@@");
			String[] headers = lines[0].split("\t");
			System.out.println("header:");
			for(String header : headers)
			{
				System.out.println(header);
			}
			
			System.out.println("value:");
			for(int i = 1 ; i < lines.length ; i ++)
			{
				String[] values = lines[i].split("\t");
				for(String value : values)
				{
					System.out.print(value + "\t");
				}
				System.out.println();
			}
		}
		else
		{
			System.out.println("执行失败:");
			System.out.println(resultMap.get("errStream").replaceAll("@@", "\n"));
		}
		
	}

	private static void printResult(Map<String, String> resultMap)
	{
		System.out.println("result:\n\n");
		
		for(Map.Entry<String, String> entry : resultMap.entrySet())
		{
			System.out.println(entry.getKey() + "," + entry.getValue());
		}
	}

	/**
	 * 执行命令
	 * @param cmd
	 * @return
	 */
	private static Map<String, String> execCmd(String cmd)
	{
		Map<String, String> resultMap = new HashMap<String, String>();
		try
		{
			Runtime rt = Runtime.getRuntime();
			Process proc = rt.exec(cmd);
		
			ReadInputStreamThread inputStreamThread = new ReadInputStreamThread(proc);
			inputStreamThread.start();
			inputStreamThread.join();
			
			ReadErrInputStreamThread errInputStreamThread = new ReadErrInputStreamThread(proc);
			errInputStreamThread.start();
			errInputStreamThread.join();
			
			resultMap.put("returnCode", proc.waitFor() + "");
			resultMap.put("inStream", inputStreamThread.getResult().toString());
			resultMap.put("errStream", errInputStreamThread.getResult().toString());
			
		} catch (Throwable t)
		{
			t.printStackTrace();
		}
		return resultMap;
	}
	
	private static class ReadInputStreamThread extends Thread
	{
		private Process proc;
		private StringBuffer result = new StringBuffer();
		
		public StringBuffer getResult()
		{
			return result;
		}

		private BufferedReader reader = null;
		
		public ReadInputStreamThread(Process proc) throws Exception
		{
			this.proc = proc;
			reader = new BufferedReader(new InputStreamReader(proc.getInputStream(), "utf-8"));
		}
		
		public void run()
		{
			String line = null;
			try
			{
				while((line = reader.readLine()) != null)
				{
					result.append(line);
					result.append("@@");
				}
			} catch (Exception e)
			{
				e.printStackTrace();
			}
			finally
			{
				if(reader != null)
				{
					try
					{
						reader.close();
					} catch (IOException e)
					{
						e.printStackTrace();
					}
				}
			}
		}
	}
	
	private static class ReadErrInputStreamThread extends Thread
	{
		private Process proc;
		private StringBuffer result = new StringBuffer();
		
		public StringBuffer getResult()
		{
			return result;
		}

		private BufferedReader reader = null;
		
		public ReadErrInputStreamThread(Process proc) throws Exception
		{
			this.proc = proc;
			reader = new BufferedReader(new InputStreamReader(proc.getErrorStream(), "utf-8"));
		}
		
		public void run()
		{
			String line = null;
			try
			{
				while((line = reader.readLine()) != null)
				{
					result.append(line);
					result.append("@@");
				}
			} catch (Exception e)
			{
				e.printStackTrace();
			}
			finally
			{
				if(reader != null)
				{
					try
					{
						reader.close();
					} catch (IOException e)
					{
						e.printStackTrace();
					}
				}
			}
		}
	}
	

}

 

由于只是测试,所以在main方法中,把路径硬编码了。

在linux下的执行结果如下:



 

 

 

 

 

  • 大小: 191 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics