Google protocol buffer 使用和原理浅析 - PbCodec介绍

#Protocol Buffer
  Google Protocol Buffer又简称Protobuf,它是一种很高效的结构化数据存储格式,一般用于结构化数据的串行化,简单说就是我们常说的数据序列化。这种序列化的协议非常轻便高效,而且是跨平台的,目前已支持多种主流语言(3.0版本支持C++, JAVA, C#, OC, GO, PYTHON等)。

  通过这种方式序列化得到的二进制流数据比传统的XML, JSON等方式的结果都占用更小的空间,并且其解析效率也更高,用于通讯协议或数据存储领域是非常好的。

  再者,其使用的方式也非常简单,我们只需要预先定义好消息(message)的数据格式,然后通过其提供的compiler即可生成对应的文件,在那些文件里定义和实现了操作这个数据结构所有字段的setter/getter方法,我们只需要使用这些方法设置该数据结构的字段,然后通过序列化方法即可得到需要的结果(二进制数据流)。

一、优缺点

优点挺多的,以下简单例举几个好鸟。

  1. 更小,更快,更简单。更小是因为它的数据存储的很紧凑,与使用XML定义的数据相比较,其空间小3-4倍。后面从它的实现原理上也可以了解到为什么它占用空间会更小。
  2. “向后”兼容性好。不必破坏已部署的、依靠老数据格式的程序就可以对数据结构进行升级。所以不必担心因为消息结构的改变而造成大规模的代码重构或迁移问题。
  3. 语义清晰,不需要自己实现类似XML解析器那样的东西。只要使用Protobuf的compiler就可以生成对应的用于序列化和反序列化的对象。

  其最大的缺点应该就是其序列化的结果缺乏自描述性,所以它不适合用来描述数据结构。与XML不同的就是,我们可以从XML的文件中直接很清晰的看出数据的层次结构等,而Procolbuf的结果都是二进制流不可读的,我们只能通过.proto文件来了解其数据结构。

二、Protocol Buffer的使用

  Protobuf支持多语言,这里作为例子讲解的话主要解释的是C++语言上的使用方式,= =。其实不管啥语言好像都类似差不多,八九不离十的了。= =。这里就简单的做一下介绍就好了,其实详细的介绍和使用都可以在官网上的指南查到【Language Guide】。

2.1 第一步

  这个例子使用的语言是C++,Protobuf的版本是2.6.1,windows平台上跑的。先直接甩出官网的链接,可以去上面下载【protocol-buffers】,因为这也是个开源项目(用C++写的),在github上也可以找到,而且上面也有3.0beta版本之类的可以去看看。

  安装完了我们首先来看一波.proto定义文件的内容。这个消息(message)中定义了两个int和一个string类型的字段。一般实际上的消息要比这个要复杂的多,不过好在protobuf也支持比较复杂的消息结构。

message SearchRequest {
  required string query = 1;
  optional int32 page_number = 2;
  optional int32 result_per_page = 3;
}

2.2 指定字段的类型

下面的表格列出了消息里字段允许的数据类型。

字段类型

2.3 指定字段的规则

对于message中的字段可以指定三种规则,而且也都很简单:

  1. required:表示该字段的值是必须要存在的,且只有一个。
  2. optional:表示该字段的值是可选的(不存在或有且只有一个)。
  3. repeated:表示该字段值可以有零或多个,且是有序的(即添加的顺序)。

2.4 其他部分

  在一个.proto文件里可以定义多个message,且message可以嵌套(这点极大增强了灵活性啊)。message中对于字段的注释和平时代码的注释类似,使用 // 的方式就好了。

2.5 使用

  当定义好了.proto文件,并且下载安装好对应版本的compile后,执行以下命令可以生成对应的.h.cc文件。
其中$SRC_DIR表示希望生成的文件所在的目录,以及对应.proto文件所在目录的位置。

protoc -I=$SRC_DIR --cpp_out=$DST_DIR $SRC_DIR/addressbook.proto

  然后将生成的头文件引入要工程项目中,直接调用里面对应的方法就好了。(当然protobuf的基础库也要引入) = =。这些最基本的就不多说了,下面主要是扯一下它的实现原理和一个进阶使用。

  如果对于安装和使用还有神马疑问,可以参考一下这些文章。(个人还是优先推荐官方文档)

  1. https://developers.google.com/protocol-buffers/docs/cpptutorial#defining-your-protocol-format
  2. http://colobu.com/2015/01/07/Protobuf-language-guide/ 【官方文档翻译版】???
  3. http://www.jianshu.com/p/b1f18240f0c7

三、原理简介

  首先通过上面的简单使用应该可以了解到,实际上protobuf就是提供一个编译器给定义的message结构自动生成对应的消息类,且每个类中包含了对指定字段的setter, getter方法,以及序列化和反序列化整个消息类的serializeparse方法,对于使用者来说只需要简单调用这些方法就可以实现消息的序列化和反序列化操作了。
  为了更深入了解其序列化和反序列化的原理的话,就要先了解其组织数据的方式。

3.1 TLV

  实际上protobuf使用一种类似((T)([L]V))的形式来组织数据的,即Tag-Length-Value(其中Length是可选的,比如储存Varint编码数据就不需要存储Length)。每一个字段都是使用TLV的方式进行序列化的,一个消息就可以看成是多个字段的TLV序列拼接成的一个二进制字节流。其实这种方式很像Key-Value的方式,所以Tag一般也可以看做是Key。由上可知,这种方式组织的数据并不需要额外的分隔符来划分数据,所以这也是其可以减低序列化结果的大小的原因之一。

  下面这图有点不太准确,不过可以凑合着理解一下。

message buffer

  Value的值很自然知道就是字段的值,那么Tag值是什么呢?在.proto文件中定义的每一个字段都需要声明其数据类型,其还表明该字段是可变长度还是固定长度,这部分一般称为wire_type。此外, 每个字段都有一个field值,这个值代表该字段是message里的第几个值,一般称为field_num

required string query = 1 
//比如说这里字段query为可变长的string类型,其field = 1,是消息中的第一个值;

在Protobuf中,数据类型是进行了划分的,其中wire_type主要是以下几种类型。

  1. Varint是一种比较特殊的编码方式,后面会再介绍。
  2. FixedXXX是固定长度的数字类型。
  3. Length-delimited是可变长数据类型,常见的就是string, bytes之类的。

下面是WireType枚举的定义:

enum WireType {
    WIRETYPE_VARINT           = 0,
    WIRETYPE_FIXED64          = 1,
    WIRETYPE_LENGTH_DELIMITED = 2,
    WIRETYPE_START_GROUP      = 3,
    WIRETYPE_END_GROUP        = 4,
    WIRETYPE_FIXED32          = 5,
};

wire_type

  了解了wire_type的含义后,就可以知道Tag是怎么解析的。就是结合移位操作和或操作就可以判断出其是哪种数据类型了。这里可能有人会疑惑field_num左移3位后会不会导致数据丢失,实际上可以假设field_numuint32类型的,其左移3位后,可以表示的数范围为(0 ~ 2^29-1)这么大的范围足够表示message里字段数了吧!(从枚举的WireType类型变量中可以知道wire_type只需要3位就可以表示了)。

key = field_num << 3 || wire_type;

3.2 Varint

  Varint是一种紧凑的表示数字的方式。它可以用一个或多个字节来表示一个数字,其中值越小的数字需要的字节数越少。Varint中每一个字节的最高位bit都是有特殊含义的,如果其值为1,则表示下一个字节也是该数字的一部分,如果其值为0,则表明该数字到这一个字节就结束了。

  通常情况下一个int32类型的数字,一般需要4个字节来表示。使用Varint方式编码的话,对于比较小的数字,比如说 -128~127 之间的数字则只需要一个字节,而如果是300(下图有解释),则需要两个字节来表示。然而其也有不好的地方,比如说对于一个大数字,其最多可能需要5个字节来表示,但从概率统计的角度来说,绝大多数情况下采用Varint编码可以减少字节数来表示数字。

Varint

  在计算机里,一个负数会被表示为一个很大的整数(- -,就是最高位一般为符号位,负数最高位为1)。如果采用Varint来编码的话负数则一定会需要5个字节了。所以Google protocol buffer 定义了sint32, sint64这些数据类型,其采用zigzag编码(见下图)。这样无论是正数还是负数,只要其绝对值比较小的时候需要的字节数就少,可以充分发挥Varint编码的优势。

这里写图片描述

3.3 序列化

  protobuf生成的类中,其继承体系涉及的主要是::google::protobuf::MessageLiteMessage这两个类,其中Message::google::protobuf::MessageLite的子类。我们自动生成的类可能继承自这两个类中的一个,这取决于在proto描述文件中的配置,如果设置option optimize_for = LITE_RUNTIM,则编译生成的类继承自::google::protobuf::MessageLite。这两个类都拥有基本的功能的代码,而Message是扩展出来的子类,增加了一些特性功能,然而实际中如果用不到这些功能,则开启这个优化可以使得我们生成的文件更小。

现在来了解一下序列化的过程。先看一段代码:

//序列化接口,传入一个输出流参数
void ReqBody::SerializeWithCachedSizes(::google::protobuf::io::CodedOutputStream* output) const {
  //这个message中有一个可选参数叫msg_set_req,其field_num = 1;
  //optional message msg_set_req = 1;
  //先判断该字段是否设置,如果设置则调用相应函数
  if (has_msg_set_req()) {
        ::google::protobuf::internal::WireFormatLite::WriteMessage(1, this->msg_set_req(),output);
    }
}

//判断该值是否已经设置 
inline bool ReqBody::has_msg_set_req() const {
    return (_has_bits_[0] & 0x00000001u) != 0;
}

  在代码中看到序列化就是判断某些字段是否已经设置了值,如果设置了值就调用相应的函数写出该字段。如果找一个包括多个字段的message看的话,其SerializeWithCachedSizes方法中应该会包含多个类似上面的if()操作。然后还有很多类似判断该字段是否已经设置的内联函数。

  大家应该还注意到了_has_bits_这个数组,这个数组标示了其中某个字段是否已经设置了值,这个数组在反序列化是也会被创建和设置,然后就像上面那函数一样用于判断某个字段是否已经设置了值。

  通过查看protobuf源代码的wire_format_lite.h头文件中的定义,会发现针对不同类型的数据类型,都有对应的writeXXX方法。

// Write fields, including tags.
static void WriteInt32   (field_number,  int32 value, output);
static void WriteInt64   (field_number,  int64 value, output); 
static void WriteUInt32  (field_number, uint32 value, output);
static void WriteUInt64  (field_number, uint64 value, output);
static void WriteSInt32  (field_number,  int32 value, output);
static void WriteSInt64  (field_number,  int64 value, output);
static void WriteFixed32 (field_number, uint32 value, output);
static void WriteFixed64 (field_number, uint64 value, output);
static void WriteSFixed32(field_number,  int32 value, output);
static void WriteSFixed64(field_number,  int64 value, output);
static void WriteFloat   (field_number,  float value, output);
static void WriteDouble  (field_number, double value, output);
static void WriteBool    (field_number,   bool value, output);
static void WriteEnum    (field_number,    int value, output);

static void WriteString(field_number, const string& value, output);
static void WriteBytes (field_number, const string& value, output);

static void WriteGroup(field_number, const MessageLite& value, output);
static void WriteMessage(field_number, const MessageLite& value, output);

  然后我们来看一下其中数值类型,字符类型和message类型的字段的具体writeXXX方法,就能更清楚的了解`TLV这种序列化方式了。= =。代码很直白简单!!

//数值类型的字段,这里是int32
void WireFormatLite::WriteInt32(int field_number, int32 value, io::CodedOutputStream* output) {
  //tag
  WriteTag(field_number, WIRETYPE_VARINT, output);
  //这里应该是int32这些可以用Varint编码的数值类型,可以省去长度这个字段?
  //value
  WriteInt32NoTag(value, output);
}

//可变字长类型,这里是string
void WireFormatLite::WriteString(int field_number, const string& value, io::CodedOutputStream* output) {
  // String is for UTF-8 text only
  //tag
  WriteTag(field_number, WIRETYPE_LENGTH_DELIMITED, output);
  //length,这里长度是采用varint编码方式,可以省不少字节
  GOOGLE_CHECK(value.size() <= kint32max);
  output->WriteVarint32(value.size());
  //value
  output->WriteString(value);
}

//嵌套的message字段
void WireFormatLite::WriteMessage(int field_number, const MessageLite& value, io::CodedOutputStream* output) {
  //tag
  WriteTag(field_number, WIRETYPE_LENGTH_DELIMITED, output);
  //length, 这里计算message的长度,然后再写出。
  const int size = value.GetCachedSize();
  output->WriteVarint32(size);
  //value,这里的value是message类型,可以看做是在递归进行序列化
  value.SerializeWithCachedSizes(output);
}

inline void WireFormatLite::WriteTag(int field_number, WireType type, io::CodedOutputStream* output) {
  output->WriteTag(MakeTag(field_number, type));
}

inline uint32 WireFormatLite::MakeTag(int field_number, WireType type) {
  return GOOGLE_PROTOBUF_WIRE_FORMAT_MAKE_TAG(field_number, type);
}

//这有个宏很关键,正好印证上面提到的计算key值的方式
#define GOOGLE_PROTOBUF_WIRE_FORMAT_MAKE_TAG(FIELD_NUMBER, TYPE)                  \
  static_cast<uint32>(                                                   \
    ((FIELD_NUMBER) << ::google::protobuf::internal::WireFormatLite::kTagTypeBits) \
      | (TYPE))

// Number of bits in a tag which identify the wire type.
static const int kTagTypeBits = 3;

3.3反序列化

  了解了上面的序列化过程之后再看反序列化过程就简单多了。从本质上来说就是从一个输入流里,依次读取tag值,然后判断它是哪种wire_type类型的数据,再调用对应的方法读取对应数据类型的值。整个处理过程在一个while循环中,直到数据处理完毕才终止。

  这里直接上代码,这个解析过程是针对特定的message产生的。如果message里有多个字段,这里的case分支也会越来越多,GetTagFieldNumber(tag)方法会解析出接下来的数据是第几个字段,然后根据.proto中定义的数据类型进行判断,如果wire_type值正确则调用对应的读取数据的方法。

bool RspBody::MergePartialFromCodedStream(::google::protobuf::io::CodedInputStream* input) {
#define DO_(EXPRESSION) if (!(EXPRESSION)) return false
    ::google::protobuf::uint32 tag;
    while ((tag = input->ReadTag()) != 0) {
        switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) {
            //optional message msg_set_req = 1;
            case 1: {
                if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
                    ::google::protobuf::internal::WireFormatLite::WIRETYPE_LENGTH_DELIMITED) {
                    DO_(::google::protobuf::internal::WireFormatLite::ReadMessageNoVirtual(input, mutable_msg_set_rsp()));
                } else {
                    goto handle_uninterpreted;
                }
                if (input->ExpectAtEnd()) return true;
                break;
            }
            default: {
            handle_uninterpreted:
                if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
                    ::google::protobuf::internal::WireFormatLite::WIRETYPE_END_GROUP) {
                    return true;
                }
                DO_(::google::protobuf::internal::WireFormatLite::SkipField(input, tag));
                break;
            }
        }
    }
    return true;
#undef DO_
}

四、高级进阶 - PbCodec

====================== 2016.07.08 ======================

  今天我又来更新了哈哈哈哈,首先先简单的介绍一下PbCodec和为啥要用这玩意吧。简单点说PbCodec就是利用TLV的理念,对protobuf进行一种特殊的封装,使得不需要再为每个.proto文件都生成对应的.h.cc文件(对于C语言版本来说),这样就可以避免代码量的膨胀。此外,PbCodec提供的序列化接口和对数据的访问接口也很nice,非常简单易用,下面介绍中可以看到。

  有了上面的protobuf的简单原理作支撑,再了解PbCodec就简单多了,因为他们还是有异曲同工之妙的。在PbCodec中最重要的三个类分别是TLVItem, CPBMessageEncoder, CPBMessageDecoder

PbCodec

4.1 TLVItem

  TLVItem是最基本的存储模型,存储着.proto文件中定义的message里的某个字段值。t对应着前面说的field_num值,vtype对应着具体数据类型。后面四个vector类型的容器,则分别存储着不同数据类型的数据(data)。【所以一个TLVItem变量只对应一个字段值,即这四个向量变量中只有一个是存储值的,其他三个都为空。这句的描述是有问题的,但我理解感觉是这样的,然而实际中我好像见过不是这样的情况。。。】

  = =。应该有小伙伴也注意到,为啥这里存值要用vector,而不是直接具体某个数值或字符串类型。细心的娃子应该会想起repeated这个修饰符,这个修饰符表明某个字段可能会有多个值,所以这里需要用vector来存储具体字段的数据值(如果该字段不是repeated,则对应vector中只有一个值)。

  1. vrptVar存储着数值类型的数据。
  2. vrptstr存储着string, bytes类型的数据。
  3. vrptmsgvrptmsgDec存储嵌套子message类型的数据。

下面是TLVItem的定义:

typedef struct TLVIterm
{
    unsigned int        t;
    int                 vtype;
    std::vector<unsigned long long> vrptVar;
    std::vector<std::string>        vrptstr;
    std::vector<CPBMessageEncoder*> vrptmsg;
    std::vector<CPBMessageDecoder*> vrptmsgDec;

    TLVIterm();
    ~TLVIterm();
} TLVIterm;

//一个map类对象,可以看做是用来存储一个message里各个字段的容器
//以t(field_num)做键(key),TLVItem做值(value)进行映射存储
//后面在`CPBMessageEncoder`, `CPBMessageDecoder`中会看到它是这两个类中的一个成员变量
typedef std::map<unsigned int,TLVIterm> CMap2TLV;

4.2 CPBMessageEncoder

  CPBMessageEncoder类可以看做是用来编码一个message对象的类。这个类提供了常用的添加各种数据类型字段的方法,比如上面见到的AddUint32AddStrAddSubMessage等等。比如说我们现在有一个.proto文件,里面分别定义了一个int, string, message类型的字段,那么只需要通过的CPBMessageEncoder类的对应方法就可以把这些字段即其值增加到CPBMessageEncoder类中,以便进行后续的序列化工作。

  可以先来看一下这个类的定义,以及常见的几种数据类型的AddXXX方法。

class CPBMessageEncoder
{
public:
    CPBMessageEncoder();   //构造函数
    ~CPBMessageEncoder();  //析构函数

    //添加各种数据类型字段的API
    void AddBool(unsigned int t,bool v);
    void AddInt32(unsigned int t,int v);
    void AddFix32(unsigned int t,int v);
    void AddUInt32(unsigned int t,unsigned int v);
    void AddInt64(unsigned int t,long long v);
    void AddFix64(unsigned int t,unsigned long long v);
    void AddUInt64(unsigned int t,unsigned long long v);
    void AddFloat(unsigned int t,float v);
    void AddDouble(unsigned int t,double v);
    void AddBuf(unsigned int t,const void* buf,unsigned int len);
    void AddStr(unsigned int t,const std::string& str);
    CPBMessageEncoder* AddSubMessage(unsigned int t);

    //执行这个序列化方法,结果存储在strEncodeOut中
    bool Encode(std::string &strEncodeOut,bool isWithUInt32TotalLen = false);

protected:
    //获取序列化后结果的字节数
    int  BytesSize();
    //这个就是之前的那个map类对象,这里存储了所有字段的原始数据
    CMap2TLV    m_mapT2Tlv;
};

  由上面那个类可以联想到protobuf,其原始的用法则是给每个message生成对应的类,里面有每个字段的setter/getter方法。而这里的PbCodec则把setter和getter分别划分到两个类中,并且这两个类是对于所有的.proto定义都是可行的!
  再接着看看里面一些Add方法的具体实现,原理也很简单。就是通过t(field_num)的键获取m_mapT2Tlv中对应的TLVItem对象,然后将对应数据添加到TLVItem中对应数据类型的那个vector中就可以了。

#define TLVTYPE_BOOL   1
#define TLVTYPE_INT    2
#define TLVTYPE_UINT   3
#define TLVTYPE_INT64  4
#define TLVTYPE_UINT64 5
#define TLVTYPE_FLOAT  6
#define TLVTYPE_DOUBLE 7
#define TLVTYPE_BUF    8
#define TLVTYPE_MSG    9
#define TLVTYPE_FIX32  10
#define TLVTYPE_FIX64  11

void CPBMessageEncoder::AddInt32(unsigned int t,int v){
    TLVIterm& it = m_mapT2Tlv[t];
    it.t = t;
    it.vtype = TLVTYPE_INT;
    it.vrptVar.push_back(v);
}

void CPBMessageEncoder::AddBuf(unsigned int t,const void* buf,unsigned int len){
    AddStr(t,std::string((char*)buf,len));
}

void CPBMessageEncoder::AddStr(unsigned int t,const std::string& str){
    TLVIterm& it = m_mapT2Tlv[t];
    it.t = t;
    it.vtype = TLVTYPE_BUF;
    it.vrptstr.push_back(str);
}

CPBMessageEncoder* CPBMessageEncoder::AddSubMessage(unsigned int t){
    TLVIterm& it = m_mapT2Tlv[t];
    it.t = t;
    it.vtype = TLVTYPE_MSG;
    CPBMessageEncoder* msg = new CPBMessageEncoder();
    it.vrptmsg.push_back(msg);
    return msg;
}

  接下来就是比较核心的encode方法了,这个方法里很好的说明了CPBMessageEncoder是如何进行序列化操作的,为什么可以避免原始protobuf那样生成多个文件的setter/getter方法。
  原理说白了也很简单,一个CPBMessageEncoder保存了一个message的所有字段值,所以要序列化这个message的话首先是遍历m_mapT2Tlv中的所有TLVItem对象,然后判断这个TLVItem中存储的数据是哪种数据类型,再调用protobuf底层提供的writeXXX方法序列化某个具体字段。由TLVItem的结构可以知道,其包含了field_num值,原始的数据值value等,所以完全有足够的信息进行序列化处理。

bool CPBMessageEncoder::Encode(std::string &strEncodeOut,bool isWithUInt32TotalLen){

    if( 0 == m_mapT2Tlv.size() ){
        return false;
    }

    int uTotalSize = BytesSize();

    if( 0 == uTotalSize ){
        return false;
    }

    void* buf = NULL;

    //先初始化buf的大小,要先计算需要序列化的数据所占的字节数
    if( isWithUInt32TotalLen ){
        strEncodeOut.resize(uTotalSize + 4);
        int length = htonl(uTotalSize + 4);
        void* head = (void*)strEncodeOut.c_str();
        memcpy(head, &length, 4);
        buf = (char*)head + 4;
    }
    else{
        strEncodeOut.resize(uTotalSize);
        buf = (void*)strEncodeOut.c_str();
    }

    //初始化输出流
    ArrayOutputStream * pOutputStream = new ArrayOutputStream(buf,uTotalSize);
    CodedOutputStream * pOutput = new CodedOutputStream(pOutputStream);

    //遍历CMap2TLV中所有的TLVIterm,进行序列化操作
    CMap2TLV::iterator it = m_mapT2Tlv.begin();
    for(; it != m_mapT2Tlv.end(); it++ )
    {
        TLVIterm& tlv = it->second;
        //根据不同字段的数据类型做不同处理
        switch (tlv.vtype)
        {
            case TLVTYPE_BOOL:
                for(int i=0; i<tlv.vrptVar.size(); i++ )
                {
                    //writeBool
                    WireFormatLite::WriteBool(tlv.t,tlv.vrptVar[i], pOutput);
                }
                break;
            case TLVTYPE_INT:
                for(int i=0; i<tlv.vrptVar.size(); i++ )
                {
                    //WriteInt32, 可回3.3看protobuf的原始实现,其就是写TLV的方式
                    WireFormatLite::WriteInt32(tlv.t,(int32_t)tlv.vrptVar[i], pOutput);
                }
                break;

            //...这里省略了若干case处理

            case TLVTYPE_BUF:
                for(int i=0; i<tlv.vrptstr.size(); i++ )
                {
                    //这里的就是TLV的方式,可以看3.3中writeStr的实现,和这个是类似的。
                    WireFormatLite::WriteTag(tlv.t, WireFormatLite::WIRETYPE_LENGTH_DELIMITED, pOutput);
                    pOutput->WriteVarint32((uint32_t)tlv.vrptstr[i].size());
                    pOutput->WriteRaw(tlv.vrptstr[i].c_str(),(int)tlv.vrptstr[i].size());
                }
                break;
            case TLVTYPE_MSG:
                for(int i=0; i<tlv.vrptmsg.size(); i++ )
                {
                    //对于每个子message字段,都调用他们的encode方法获得string类型的结构
                    //然后再用writeStr的方式序列化子message的结果
                    std::string msgout;
                    if( tlv.vrptmsg[i]->Encode(msgout) && msgout.length() ){
                        WireFormatLite::WriteTag(tlv.t, WireFormatLite::WIRETYPE_LENGTH_DELIMITED, pOutput);
                        pOutput->WriteVarint32((uint32_t)msgout.size());
                        pOutput->WriteRaw(msgout.c_str(),(int)msgout.size());
                    }
                }
                break;
            default:
                break;
        }
    }

    delete pOutput;
    delete pOutputStream;
    return true;
}

4.3 CPBMessageDecoder

  CPBMessageDecoder提供的get方法API对比CPBMessageEncoder来说要多一点,毕竟其还涉及对repeated字段的read方法。

class CPBMessageDecoder
{
public:
    CPBMessageDecoder();    //构造函数
    ~CPBMessageDecoder();   //析构函数

    //反序列方法,结果保存在buf中。
    bool               Decode(const void* buf,int len);
    //判断t(field_num)对应字段的值是否存在
    bool               Has(unsigned int t);

    //获取指定数据类型数据
    bool               GetBool(unsigned int t);
    int                GetInt32(unsigned int t);
    unsigned int       GetFix32(unsigned int t);
    unsigned int       GetUInt32(unsigned int t);
    long long          GetInt64(unsigned int t);
    unsigned long long GetFix64(unsigned int t);
    unsigned long long GetUInt64(unsigned int t);
    float              GetFloat(unsigned int t);
    double             GetDouble(unsigned int t);
    std::string&       GetStr(unsigned int t);
    CPBMessageDecoder* GetSubMessage(unsigned int t);

    //获取repeated类型的数据,每种类型数据还提供一个接口获取该类型数据个数
    int                GetRepeatSubMessageCount(unsigned int t);
    CPBMessageDecoder* GetRepeatSubMessage(unsigned int t,unsigned int index);
    int                GetRepeatUInt32Count(unsigned int t);
    unsigned int       GetRepeatUInt32(unsigned int t,unsigned int index);
    int                GetRepeatUInt64Count(unsigned int t);
    unsigned long long GetRepeatUInt64(unsigned int t,unsigned int index);
    int                GetRepeatStrCount(unsigned int t);
    std::string&       GetRepeatStr(unsigned int t,unsigned int index);

protected:
    CMap2TLV    m_mapT2Tlv;
    long long   m_hasBitSign;
    CPBMessageDecoder * m_exceptionDecoder;
    CPBMessageDecoder* GetExceptionDecoder();
};

  不过decoder的原理也相对比较简单,因为是以TLV的方式进行存储的么,所以读数据的时候也会以TLV的方式进行读取。首先是一个大循环,先读取tag值,然后通过tag获取对应的wire_type类型,然后调用GetField方法进行特定类型数据读取,并存储到m_mapT2Tlv中。

bool CPBMessageDecoder::Decode(const void* buf,int len){
    if( NULL == buf || 0 == len ) return false;

    //初始化输入流
    ArrayInputStream inputar(buf, len);
    CodedInputStream coded_input(&inputar);
    CodedInputStream* input = &coded_input;

    while(true) {
        //读取tag(field_num)值
        unsigned int tag = input->ReadTag();
        if (tag == 0) {
            return true;
        }

        //获取wire_type值,判断是否已经解析完毕
        WireFormatLite::WireType wire_type = WireFormatLite::GetTagWireType(tag);

        if (wire_type == WireFormatLite::WIRETYPE_END_GROUP) {
            // Must be the end of the message.
            return true;
        }

        //通过GetField方法读取数据
        unsigned int t = 0;
        if( !GetField(input,tag,m_mapT2Tlv,t) ) return false;

        //为了Has方法的快速查找,设置m_hasBitSign对应的位为1
        if( t < 64 ){
            long long bit = 1;
            bit = (bit << t);
            m_hasBitSign |= bit;
        }
    }
}

//GetField方法的原理也很简单,就是通过wire_type判断是那种数据类型
//然后通过对应的readXXX方法读取数据值,再设置到m_mapT2Tlv中对应的TLVItem中即可
bool GetField(CodedInputStream* input, unsigned int tag,CMap2TLV &map,unsigned int &t) {
    int number = WireFormatLite::GetTagFieldNumber(tag);

    switch (WireFormatLite::GetTagWireType(tag)) {
        case WireFormatLite::WIRETYPE_VARINT: {
            unsigned long long value;
            if (!input->ReadVarint64(&value)) return false;
            t = number;
            TLVIterm& it = map[number];
            it.t = number;
            it.vrptVar.push_back(value);
            return true;
        }
        case WireFormatLite::WIRETYPE_FIXED32: {
            unsigned int value;
            if (!input->ReadLittleEndian32(&value)) return false;
            t = number;
            TLVIterm& it = map[number];
            it.t = number;
            it.vrptVar.push_back(value);
            return true;
        }
        case WireFormatLite::WIRETYPE_LENGTH_DELIMITED: {
            unsigned int length;
            std::string  str;
            if (!input->ReadVarint32(&length)) return false;
            if( !input->ReadString(&str,length)) return false;
            t = number;
            TLVIterm& it = map[number];
            it.t   = number;
            it.vrptstr.push_back(str);
            return true;
        }
        case WireFormatLite::WIRETYPE_FIXED64: {
            unsigned long long value;
            if (!input->ReadLittleEndian64(&value)) return false;
            t = number;
            TLVIterm& it = map[number];
            it.t = number;
            it.vrptVar.push_back(value);
            return true;
        }
        default: {
            return false;
        }
    }
}

bool CPBMessageDecoder::Has(unsigned int t){
    if( t< 64 ){
        //m_hasBitSign是long long类型的,64位,所以可以标示64个字段?好像这个是为了快速索引而存在的
        //然后通过与操作和左移操作判断指定的t(field_num)字段值是否被设置
        long long bit = 1;
        bit = (bit << t); 
        return ((m_hasBitSign&bit)>0);
    }
    //如果是大于64的字段,则通过判断m_mapT2Tlv中是否有设置TLVItem对象来判断该字段是否被设置
    return m_mapT2Tlv.end() != m_mapT2Tlv.find(t);
}

  上面是CPBMessageDecoder实现的decode方法,其解析完毕后会将所有数据存在其对应的成员变量m_mapT2Tlv中。此后通过提供的getXXX来从m_mapT2Tlv中获取字段值返回给调用者。可以看几个典型的简单的getXXX方法实现。

  这些方法实现的套路几乎是一样的,先通过Has方法判断对应字段是否存在,存在则从m_mapT2Tlv的对应TLVItem对象的vector中取值,否则返回默认值。

unsigned int CPBMessageDecoder::GetUInt32(unsigned int t){
    if( Has(t) && m_mapT2Tlv[t].vrptVar.size() )
        return (unsigned int)m_mapT2Tlv[t].vrptVar[0];
    return 0;
}

std::string& CPBMessageDecoder::GetStr(unsigned int t){
    if( Has(t) && m_mapT2Tlv[t].vrptstr.size() )
        return m_mapT2Tlv[t].vrptstr[0];
    static std::string str;
    return str;
}

//这里有个神奇的设计?当需要读子message是才去decode该子message的数据?
CPBMessageDecoder* CPBMessageDecoder::GetSubMessage(unsigned int t){
    if( !Has(t))
    {
        return GetExceptionDecoder();
    }
    TLVIterm &it = m_mapT2Tlv[t];
    if( it.vrptstr.size() == 0 )
    {
        return GetExceptionDecoder();
    }
    if( it.vrptmsgDec.size() == 0 ){
        CPBMessageDecoder* dec = new CPBMessageDecoder();
        if( !dec->Decode(it.vrptstr[0].c_str(),(int)it.vrptstr[0].length()) ){
            delete dec;
            return GetExceptionDecoder();
        }
        it.vrptmsgDec.push_back(dec);
        return dec;
    }
    else{
        return it.vrptmsgDec[0];
    }
}

  CPBMessageDecoder针对repeat类型的字段也提供的特定的getXXX方式,其中包括获取该字段数据个数的API和获取该字段第index位值的接口。下面简单看一下。

int CPBMessageDecoder::GetRepeatUInt32Count(unsigned int t){
    if( Has(t) && m_mapT2Tlv[t].vrptVar.size() )
        return (int)m_mapT2Tlv[t].vrptVar.size();
    return 0;
}

unsigned int CPBMessageDecoder::GetRepeatUInt32(unsigned int t,unsigned int index){
    if( !Has(t)) return 0;
    TLVIterm &it = m_mapT2Tlv[t];
    if( index >= it.vrptVar.size() )
        return 0;
    return (unsigned int)it.vrptVar[index];
}

4.4 PbCodec的使用

这玩意说简单也简单,只是在已有的.proto的基础上,需要再做一项工作。就是需要手动(或自己写个脚本将.proto文件自动生成对应的描述文件)给每个message设定一个tag(field_num)。以下以C++方式将一个简单.proto文件的message进行一下简单的转化,然后再通过CPBMessageEncoderCPBMessageDecoder操作一下~~~~

【= =。先说明转换的文件风格不唯一哈,只是实际项目中.h喜欢和命名空间结合。】

//protocol0x6666.proto
message ReBody {
  required int64 reqId = 1;
  optional int32 time = 2;
  repeated Member memberList = 3;
}

message MemberInfo{
  required int64 id = 1;
  optional string name = 2;
}

//protocol0x6666.proto.h
namespace protocol0x6666{
    enum ReBody{
        const int reqId = 1;
        const int time = 2;
        const int memberList = 3;
    }

    enum MemberInfo{
        const int id = 1;
        const int name = 2;
    }
}

//然后使用的时候,简单举个例子
std::string getSerializeResult(Type data){
    std::string result;
    CPBMessageEncoder reqbody;
    reqbody->addInt64(protocol0x6666::ReBody::id, data->id);
    reqbody->addInt32(protocol0x6666::ReBody::time, data->time);

    int len = (data->members).size();
    for(int i = 0; i < len; i++){
        CPBMessageEncoder memberInfo = reqbody.AddSubMessage(protocol0x6666::ReBody::memberList);
        memberInfo->AddInt64(protocol0x6666::MemberInfo::id, (data->members)[i].id);
        memberInfo->AddStr(protocol0x6666::MemberInfo::name, (data->members)[i].name);
    }

    reqbody.encode(result);
    return result;
}

========================= 华丽丽的分割线 =========================

写的匆忙,没怎么检查修改,如果有什么错误,欢迎指出!谢谢!

参考资料

  1. Google protocol buffer - Language Guide
  2. GoogleProtocolBuffer的使用和原理
  3. 鸣谢armingli大大的文章【Protobuf组包解包新用法之PbCodec篇】