一、MessageTree的数据结构
结构如下图:
二、客户端将MessageTree编码成字节流
三、服务端将字节流反编码成MessageTree
2.1 功能介绍
服务端接收到ByteBuf之后,交给PlainTextMessage解码,解码分两步走:
- 先解析MessageTree的header,包含domain,hostname、ipAddress、threadGroupName、threadId、threadName、messageId、parentMessageId、rootMessageId、sessionToken。这部分内容示例效果如下:
- 解码MessageTree中的Message,这部分是一个嵌套的Transaction。
2.2代码分析
先看MesageTree的header部分功能解析。先看一个工具类BufferHelper,read方法代码如下:
/** * 从ctx的buffer中读取字符串 到 separator 位置位置 * 如 read("abcde",'c')="ab" * @param ctx * @param separator * @return */ public String read(Context ctx, byte separator) { ByteBuf buf = ctx.getBuffer(); char[] data = ctx.getData(); int from = buf.readerIndex(); int to = buf.writerIndex(); int index = 0; boolean flag = false; for (int i = from; i < to; i++) { byte b = buf.readByte(); if (b == separator) { break; } //将全部的data if (index >= data.length) { char[] data2 = new char[to - from]; System.arraycopy(data, 0, data2, 0, index); data = data2; } char c = (char) (b & 0xFF); if (c > 127) { flag = true; } if (c == '\\' && i + 1 < to) { byte b2 = buf.readByte(); if (b2 == 't') { c = '\t'; i++; } else if (b2 == 'r') { c = '\r'; i++; } else if (b2 == 'n') { c = '\n'; i++; } else if (b2 == '\\') { c = '\\'; i++; } else { // move back buf.readerIndex(i + 1); } } data[index] = c; index++; } if (!flag) { return new String(data, 0, index); } else { byte[] ba = new byte[index]; for (int i = 0; i < index; i++) { ba[i] = (byte) (data[i] & 0xFF); } try { return new String(ba, 0, index, "utf-8"); } catch (UnsupportedEncodingException e) { return new String(ba, 0, index); } } }
解码header部分就是调用上面的方法,逐步将context中的bytebuf里面的header部分解析出来,并将readindex移动,后面直接解码Message。
protected void decodeHeader(Context ctx, MessageTree tree) { BufferHelper helper = m_bufferHelper; String id = helper.read(ctx, TAB); String domain = helper.read(ctx, TAB); String hostName = helper.read(ctx, TAB); String ipAddress = helper.read(ctx, TAB); String threadGroupName = helper.read(ctx, TAB); String threadId = helper.read(ctx, TAB); String threadName = helper.read(ctx, TAB); String messageId = helper.read(ctx, TAB); String parentMessageId = helper.read(ctx, TAB); String rootMessageId = helper.read(ctx, TAB); String sessionToken = helper.read(ctx, LF); if (VERSION.equals(id)) { tree.setDomain(domain); tree.setHostName(hostName); tree.setIpAddress(ipAddress); tree.setThreadGroupName(threadGroupName); tree.setThreadId(threadId); tree.setThreadName(threadName); tree.setMessageId(messageId); tree.setParentMessageId(parentMessageId); tree.setRootMessageId(rootMessageId); tree.setSessionToken(sessionToken); } else { throw new RuntimeException(String.format("Unrecognized id(%s) for plain text message codec!", id)); } }
2.2 解码Message
还是以上面的MessageTree为例,如下:
解码步骤如下:
-
新建stack,解析上图第一行,将数据塞进新建的transaction1中,将null入stack,将transaction1返回,stack内的数据如下:
-
将上一部返回的transaction1作为parent和stack传进decodeLine方法,解析第2行,新建一个event,将event作为child挂到parent(transaction1)下,返回parent(transaction1)
-
将上一步返回的parent(transaction1)做为parent,连带stack,一起传进decodeLine方法,解析第3行,将event作为child挂到parent(transaction1)下,返回parent(transaction1)
-
类似的解析第4、5行
-
将上一步返回的parent(transaction1)做为parent,连带stack,一起传进decodeLine方法,解析第6行,新建一个transaction2,将transaction2挂到parent(transaction1)下面的child中,将parent(transaction1)入栈,此时stack内有null,transaction1两个元素。将transaction2返回出去,作为下一步的parent,栈内数据如下:
-
类似上一步,将新建的transaction3挂到上一步产生的parent(transaction2)下,将parent(transaction2)入栈,此时stack内有null、transaction1、transaction2三个元素。接下来把transaction3返回出去,作为下一步的parent。栈内元素如下:
-
第8行类似第2行
-
第9行,将解析的数据注入到parent(transaction3)中,这时候transaction3处理完结了。从栈顶拿到一个最近的元素(transaction2),作为下一步处理的parent。栈内元素如下:
-
第10行,发现是t开头的,又新建一个transaction4,将parent(transaction2)入栈,将transaction4作为下一步的parent。栈内元素如下:
-
第11行,类似第2行处理,挂到parent(transaction4)下面,将parent(transaction4)作为下一步的parent继续处理。
-
第12行,将数据注入到parent(transaction4)中,完结这个parent(transaction4),将栈顶的transaction2作为下一步的parent,继续处理。
-
第13行,类似上一步,完结transaction2,将栈顶transaction1作为下一步的parent,继续处理。
-
第14行,类似上一步,完结transaction1,将栈顶的null作为下一步的parent,继续处理。此时栈内元素为空。 发现没有数据可以处理了,此时,整个MessageTree解码完成。处理每一行数据时,对应的栈内数据如下:
解析MessageTree代码如下:
//将原消息解析成一棵消息树 protected void decodeMessage(Context ctx, MessageTree tree) { Stackstack = new Stack (); Message parent = decodeLine(ctx, null, stack); tree.setMessage(parent); //逐行读取,最终构建消息树 while (ctx.getBuffer().readableBytes() > 0) { /*************** 将bytebuf转换成defaultTransaction *********************/ Message message = decodeLine(ctx, (DefaultTransaction) parent, stack); if (message instanceof DefaultTransaction) { parent = message; } else { break; } } }
逐行解析的decodeLine 代码如下:
protected Message decodeLine(Context ctx, DefaultTransaction parent, Stackstack) { BufferHelper helper = m_bufferHelper; byte identifier = ctx.getBuffer().readByte(); String timestamp = helper.read(ctx, TAB); String type = helper.read(ctx, TAB); String name = helper.read(ctx, TAB); switch (identifier) { case 't': /** * 这种类型,才会新建一个transaction, * 如果这是第一个,则将这个作为最顶级的parent,挂到MessageTree的message下,将parent(null)入栈 * 如果不是第一个,则将当前创建的transaction挂到parent(也是一个transaction)的下面,作为一个child,同时将parent入栈,等待当前 * 做完这些事,将当前的transaction作为下一次循环的parent,直到遇到'T',将当前这个transaction数据补充完整,再从栈清楚这个transaction,表明当前这个transaction完结 */ DefaultTransaction transaction = new DefaultTransaction(type, name, null); helper.read(ctx, LF); // get rid of line feed transaction.setTimestamp(m_dateHelper.parse(timestamp)); if (parent != null) { parent.addChild(transaction); } stack.push(parent); return transaction; case 'A': DefaultTransaction tran = new DefaultTransaction(type, name, null); String status = helper.read(ctx, TAB); String duration = helper.read(ctx, TAB); String data = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed tran.setTimestamp(m_dateHelper.parse(timestamp)); tran.setStatus(status); tran.addData(data); long d = Long.parseLong(duration.substring(0, duration.length() - 2)); tran.setDurationInMicros(d); if (parent != null) { parent.addChild(tran); return parent;//返回的是传进来的parent,这里parent是一个transaction,transaction把A类型的数据作为child收集了 } else { return tran; } case 'T': String transactionStatus = helper.read(ctx, TAB); String transactionDuration = helper.read(ctx, TAB); String transactionData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed parent.setStatus(transactionStatus); parent.addData(transactionData); long transactionD = Long.parseLong(transactionDuration.substring(0, transactionDuration.length() - 2)); parent.setDurationInMicros(transactionD); return stack.pop();//这里不新建transaction,只是将数据插入到parent里面,将stock顶的transaction返回, case 'E': DefaultEvent event = new DefaultEvent(type, name); String eventStatus = helper.read(ctx, TAB); String eventData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed event.setTimestamp(m_dateHelper.parse(timestamp)); event.setStatus(eventStatus); event.addData(eventData); //sql类型特殊处理 processSQLEvent(event); if (parent != null) { parent.addChild(event); return parent; } else { return event; } case 'M': DefaultMetric metric = new DefaultMetric(type, name); String metricStatus = helper.read(ctx, TAB); String metricData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed metric.setTimestamp(m_dateHelper.parse(timestamp)); metric.setStatus(metricStatus); metric.addData(metricData); if (parent != null) { parent.addChild(metric); return parent; } else { return metric; } case 'L': DefaultTrace trace = new DefaultTrace(type, name); String traceStatus = helper.read(ctx, TAB); String traceData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed trace.setTimestamp(m_dateHelper.parse(timestamp)); trace.setStatus(traceStatus); trace.addData(traceData); if (parent != null) { parent.addChild(trace); return parent; } else { return trace; } case 'H': DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name); String heartbeatStatus = helper.read(ctx, TAB); String heartbeatData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed heartbeat.setTimestamp(m_dateHelper.parse(timestamp)); heartbeat.setStatus(heartbeatStatus); heartbeat.addData(heartbeatData); if (parent != null) { parent.addChild(heartbeat); return parent; } else { return heartbeat; } default: m_logger.warn("Unknown identifier(" + (char) identifier + ") of message: " + ctx.getBuffer().toString(Charset.forName("utf-8"))); throw new RuntimeException("Unknown identifier int name"); } }