博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
大众点评Cat源码阅读(六)——MessageTree编码、解码字节流
阅读量:7106 次
发布时间:2019-06-28

本文共 8927 字,大约阅读时间需要 29 分钟。

hot3.png

一、MessageTree的数据结构

结构如下图: 输入图片说明

二、客户端将MessageTree编码成字节流

三、服务端将字节流反编码成MessageTree

2.1 功能介绍

服务端接收到ByteBuf之后,交给PlainTextMessage解码,解码分两步走:

  1. 先解析MessageTree的header,包含domain,hostname、ipAddress、threadGroupName、threadId、threadName、messageId、parentMessageId、rootMessageId、sessionToken。这部分内容示例效果如下:

输入图片说明

  1. 解码MessageTree中的Message,这部分是一个嵌套的Transaction。

2.2代码分析

先看MesageTree的header部分功能解析。先看一个工具类BufferHelper,read方法代码如下:

/**		 * 从ctx的buffer中读取字符串  到 separator 位置位置		 * 如 read("abcde",'c')="ab"		 * @param ctx		 * @param separator		 * @return		 */		public String read(Context ctx, byte separator) {			ByteBuf buf = ctx.getBuffer();			char[] data = ctx.getData();			int from = buf.readerIndex();			int to = buf.writerIndex();			int index = 0;			boolean flag = false;			for (int i = from; i < to; i++) {				byte b = buf.readByte();				if (b == separator) {					break;				}				//将全部的data				if (index >= data.length) {					char[] data2 = new char[to - from];					System.arraycopy(data, 0, data2, 0, index);					data = data2;				}				char c = (char) (b & 0xFF);				if (c > 127) {					flag = true;				}				if (c == '\\' && i + 1 < to) {					byte b2 = buf.readByte();					if (b2 == 't') {						c = '\t';						i++;					} else if (b2 == 'r') {						c = '\r';						i++;					} else if (b2 == 'n') {						c = '\n';						i++;					} else if (b2 == '\\') {						c = '\\';						i++;					} else {						// move back						buf.readerIndex(i + 1);					}				}				data[index] = c;				index++;			}			if (!flag) {				return new String(data, 0, index);			} else {				byte[] ba = new byte[index];				for (int i = 0; i < index; i++) {					ba[i] = (byte) (data[i] & 0xFF);				}				try {					return new String(ba, 0, index, "utf-8");				} catch (UnsupportedEncodingException e) {					return new String(ba, 0, index);				}			}		}

解码header部分就是调用上面的方法,逐步将context中的bytebuf里面的header部分解析出来,并将readindex移动,后面直接解码Message。

protected void decodeHeader(Context ctx, MessageTree tree) {		BufferHelper helper = m_bufferHelper;		String id = helper.read(ctx, TAB);		String domain = helper.read(ctx, TAB);		String hostName = helper.read(ctx, TAB);		String ipAddress = helper.read(ctx, TAB);		String threadGroupName = helper.read(ctx, TAB);		String threadId = helper.read(ctx, TAB);		String threadName = helper.read(ctx, TAB);		String messageId = helper.read(ctx, TAB);		String parentMessageId = helper.read(ctx, TAB);		String rootMessageId = helper.read(ctx, TAB);		String sessionToken = helper.read(ctx, LF);		if (VERSION.equals(id)) {			tree.setDomain(domain);			tree.setHostName(hostName);			tree.setIpAddress(ipAddress);			tree.setThreadGroupName(threadGroupName);			tree.setThreadId(threadId);			tree.setThreadName(threadName);			tree.setMessageId(messageId);			tree.setParentMessageId(parentMessageId);			tree.setRootMessageId(rootMessageId);			tree.setSessionToken(sessionToken);		} else {			throw new RuntimeException(String.format("Unrecognized id(%s) for plain text message codec!", id));		}	}

2.2 解码Message

还是以上面的MessageTree为例,如下: 输入图片说明

解码步骤如下:

  1. 新建stack,解析上图第一行,将数据塞进新建的transaction1中,将null入stack,将transaction1返回,stack内的数据如下:

  2. 将上一部返回的transaction1作为parent和stack传进decodeLine方法,解析第2行,新建一个event,将event作为child挂到parent(transaction1)下,返回parent(transaction1)

  3. 将上一步返回的parent(transaction1)做为parent,连带stack,一起传进decodeLine方法,解析第3行,将event作为child挂到parent(transaction1)下,返回parent(transaction1)

  4. 类似的解析第4、5行

  5. 将上一步返回的parent(transaction1)做为parent,连带stack,一起传进decodeLine方法,解析第6行,新建一个transaction2,将transaction2挂到parent(transaction1)下面的child中,将parent(transaction1)入栈,此时stack内有null,transaction1两个元素。将transaction2返回出去,作为下一步的parent,栈内数据如下:

  6. 类似上一步,将新建的transaction3挂到上一步产生的parent(transaction2)下,将parent(transaction2)入栈,此时stack内有null、transaction1、transaction2三个元素。接下来把transaction3返回出去,作为下一步的parent。栈内元素如下:

  7. 第8行类似第2行

  8. 第9行,将解析的数据注入到parent(transaction3)中,这时候transaction3处理完结了。从栈顶拿到一个最近的元素(transaction2),作为下一步处理的parent。栈内元素如下:

  9. 第10行,发现是t开头的,又新建一个transaction4,将parent(transaction2)入栈,将transaction4作为下一步的parent。栈内元素如下:

  10. 第11行,类似第2行处理,挂到parent(transaction4)下面,将parent(transaction4)作为下一步的parent继续处理。

  11. 第12行,将数据注入到parent(transaction4)中,完结这个parent(transaction4),将栈顶的transaction2作为下一步的parent,继续处理。

  12. 第13行,类似上一步,完结transaction2,将栈顶transaction1作为下一步的parent,继续处理。

  13. 第14行,类似上一步,完结transaction1,将栈顶的null作为下一步的parent,继续处理。此时栈内元素为空。 发现没有数据可以处理了,此时,整个MessageTree解码完成。处理每一行数据时,对应的栈内数据如下:

输入图片说明

解析MessageTree代码如下:

//将原消息解析成一棵消息树	protected void decodeMessage(Context ctx, MessageTree tree) {		Stack
stack = new Stack
(); Message parent = decodeLine(ctx, null, stack); tree.setMessage(parent); //逐行读取,最终构建消息树 while (ctx.getBuffer().readableBytes() > 0) { /*************** 将bytebuf转换成defaultTransaction *********************/ Message message = decodeLine(ctx, (DefaultTransaction) parent, stack); if (message instanceof DefaultTransaction) { parent = message; } else { break; } } }

逐行解析的decodeLine 代码如下:

protected Message decodeLine(Context ctx, DefaultTransaction parent, Stack
stack) { BufferHelper helper = m_bufferHelper; byte identifier = ctx.getBuffer().readByte(); String timestamp = helper.read(ctx, TAB); String type = helper.read(ctx, TAB); String name = helper.read(ctx, TAB); switch (identifier) { case 't': /** * 这种类型,才会新建一个transaction, * 如果这是第一个,则将这个作为最顶级的parent,挂到MessageTree的message下,将parent(null)入栈 * 如果不是第一个,则将当前创建的transaction挂到parent(也是一个transaction)的下面,作为一个child,同时将parent入栈,等待当前 * 做完这些事,将当前的transaction作为下一次循环的parent,直到遇到'T',将当前这个transaction数据补充完整,再从栈清楚这个transaction,表明当前这个transaction完结 */ DefaultTransaction transaction = new DefaultTransaction(type, name, null); helper.read(ctx, LF); // get rid of line feed transaction.setTimestamp(m_dateHelper.parse(timestamp)); if (parent != null) { parent.addChild(transaction); } stack.push(parent); return transaction; case 'A': DefaultTransaction tran = new DefaultTransaction(type, name, null); String status = helper.read(ctx, TAB); String duration = helper.read(ctx, TAB); String data = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed tran.setTimestamp(m_dateHelper.parse(timestamp)); tran.setStatus(status); tran.addData(data); long d = Long.parseLong(duration.substring(0, duration.length() - 2)); tran.setDurationInMicros(d); if (parent != null) { parent.addChild(tran); return parent;//返回的是传进来的parent,这里parent是一个transaction,transaction把A类型的数据作为child收集了 } else { return tran; } case 'T': String transactionStatus = helper.read(ctx, TAB); String transactionDuration = helper.read(ctx, TAB); String transactionData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed parent.setStatus(transactionStatus); parent.addData(transactionData); long transactionD = Long.parseLong(transactionDuration.substring(0, transactionDuration.length() - 2)); parent.setDurationInMicros(transactionD); return stack.pop();//这里不新建transaction,只是将数据插入到parent里面,将stock顶的transaction返回, case 'E': DefaultEvent event = new DefaultEvent(type, name); String eventStatus = helper.read(ctx, TAB); String eventData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed event.setTimestamp(m_dateHelper.parse(timestamp)); event.setStatus(eventStatus); event.addData(eventData); //sql类型特殊处理 processSQLEvent(event); if (parent != null) { parent.addChild(event); return parent; } else { return event; } case 'M': DefaultMetric metric = new DefaultMetric(type, name); String metricStatus = helper.read(ctx, TAB); String metricData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed metric.setTimestamp(m_dateHelper.parse(timestamp)); metric.setStatus(metricStatus); metric.addData(metricData); if (parent != null) { parent.addChild(metric); return parent; } else { return metric; } case 'L': DefaultTrace trace = new DefaultTrace(type, name); String traceStatus = helper.read(ctx, TAB); String traceData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed trace.setTimestamp(m_dateHelper.parse(timestamp)); trace.setStatus(traceStatus); trace.addData(traceData); if (parent != null) { parent.addChild(trace); return parent; } else { return trace; } case 'H': DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name); String heartbeatStatus = helper.read(ctx, TAB); String heartbeatData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed heartbeat.setTimestamp(m_dateHelper.parse(timestamp)); heartbeat.setStatus(heartbeatStatus); heartbeat.addData(heartbeatData); if (parent != null) { parent.addChild(heartbeat); return parent; } else { return heartbeat; } default: m_logger.warn("Unknown identifier(" + (char) identifier + ") of message: " + ctx.getBuffer().toString(Charset.forName("utf-8"))); throw new RuntimeException("Unknown identifier int name"); } }

转载于:https://my.oschina.net/liangxiao/blog/1586517

你可能感兴趣的文章
bzoj 1012: [JSOI2008]最大数maxnumber
查看>>
关于jquery跨域调用REST服务的问题
查看>>
Android图标尺寸的约定
查看>>
从HTTP 2.0想到的关于传输层协议的一些事
查看>>
业余码农南瓜的第一篇博客
查看>>
XenDesktop 屏幕保护程序无法生效的解决办法
查看>>
英国西约克郡上空现怪云酷似UFO
查看>>
postgresql
查看>>
ContentProvider-----跨应用程序访问数据
查看>>
OpenStack 存储剖析
查看>>
关于大型网站技术演进的思考(二)--存储的瓶颈(2)
查看>>
我的第一个游戏FoodieThebug完成之后的心得体会
查看>>
LVS+keepalived实现负载
查看>>
析构函数和构造函数的作用和区别
查看>>
Python正则表达式
查看>>
Haproxy 1.4.*安装配置
查看>>
FreeBSD8.2系统安装Salt
查看>>
HP 380G7安装 WIN2008 R2 SP1提示找不到系统分区
查看>>
1.2方程求根之不定点迭代法
查看>>
C#.NET通用权限管理系统组件中让系统按代码生成器的方式运行的方法
查看>>