Ceph Bufferlist: buffer::raw

2020-05-18  本文已影响0人  圣地亚哥_SVIP

Bufferlist的设计与实现

Buffer::list是Ceph中用于序列化的工具。将所有数据结构能够转换成二进制的数据,用于存储及网络传输。每一个数据结构需要提供encode及decode方法。

Bufferlist,主要三个结构体:

自定义智能指针:

template <class T>
struct nop_delete {
  void operator()(T*) {}
};
template <class T>
struct unique_leakable_ptr : public std::unique_ptr<T, ceph::nop_delete<T>> {
  using std::unique_ptr<T, ceph::nop_delete<T>>::unique_ptr;
};

三个数据结构体之间的关系如下图:

buffer::raw数据结构如下,其中mempool用于跟踪容器使用的内存大小,从raw中剔除出去已更清晰的查看raw的数据结构:

class raw {
public:
    /*
    * 大小为sizeof(ptr_node),alignof(ptr_node)对齐的类型。用于ptr_node的构造(new placement方式),实际并未使用
    */
    std::aligned_storage<sizeof(ptr_node),alignof(ptr_node)>::type bptr_storage;
    /*
    * buffer
    */
    char *data;
    unsigned len;
    /*
    * 引用计数
    */
    ceph::atomic<unsigned> nref { 0 };

    /*
    * 指定数据段的校验值
    */
    std::pair<size_t, size_t> last_crc_offset {std::numeric_limits<size_t>::max(), std::numeric_limits<size_t>::max()};
    std::pair<uint32_t, uint32_t> last_crc_val;

    /*
    * spinlock: std::atomic_flag(原子布尔类型),实现自旋互斥,免锁结构
    * class spinlock{
        std::atomoc_flag af = ATOMIC_FLAG_INIT;
        void lock(){
            while(af.test_and_set(std::memory_order_acquire));
        }
        void unlock(){
            af.clear(std::memory_order_release);
        }
    };
    */
    mutable ceph::spinlock crc_spinlock;

    /*
    * raw构造函数,raw是作为基类,实际数据分配方式是由子类实现
    */
    explicit raw(unsigned l)
      : data(nullptr), len(l), nref(0){
    }
    raw(char *c, unsigned l)
      : data(c), len(l), nref(0) {
    }
    virtual ~raw() {
    }

    void _set_len(unsigned l) {
      len = l;
    }

private:
    /*
    * 禁止拷贝复制
    */
    raw(const raw &other) = delete;
    const raw& operator=(const raw &other) = delete;
public:
    /*
    * 获取原始数据
    */
    char *get_data() {
      return data;
    }

    /*
    * clone data,具体实现由子类定义
    */
    virtual raw* clone_empty() = 0;

    /*
    * 数据拷贝
    */
    ceph::unique_leakable_ptr<raw> clone() {
      raw* const c = clone_empty();
      memcpy(c->data, data, len);
      return ceph::unique_leakable_ptr<raw>(c);
    }
    /*
    * crc相关设置
    */
    bool get_crc(const std::pair<size_t, size_t> &fromto,std::pair<uint32_t, uint32_t> *crc) const {
      std::lock_guard lg(crc_spinlock);
      if (last_crc_offset == fromto) {
        *crc = last_crc_val;
        return true;
      }
      return false;
    }
    void set_crc(const std::pair<size_t, size_t> &fromto,const std::pair<uint32_t, uint32_t> &crc) {
      std::lock_guard lg(crc_spinlock);
      last_crc_offset = fromto;
      last_crc_val = crc;
    }
    void invalidate_crc() {
      std::lock_guard lg(crc_spinlock);
      last_crc_offset.first = std::numeric_limits<size_t>::max();
      last_crc_offset.second = std::numeric_limits<size_t>::max();
    }
  };

raw不同buffer分配方式的子类:

raw_combined的定义如下:

class buffer::raw_combined : public buffer::raw {
  /*
  * 对齐大小
  */
  size_t alignment;
public:
  /*
  * 构造函数,dataptr指向已分配好的buffer
  */
  raw_combined(char *dataptr, unsigned l, unsigned align)
    :raw(dataptr, l),
     alignment(align) {}

  /*
  * 定义本类的clone的方法,创建一个新的buffer,调用release()返回裸指针
  */
  raw* clone_empty() override {
    return create(len, alignment).release();
  }

  /*
  * 入口静态函数:
    1. 使用posix_memalign,创建align对齐的buffer,buffer大小为:data+self(raw_combined)
    2. 在新分配的buffer上,构造raw_combined对象,使用operator new (buf) class
    3. operator delete,定义本类的delete函数,在使用delete {*raw_combined}时,调用此函数
  */
  static ceph::unique_leakable_ptr<buffer::raw> create(unsigned len, unsigned alig)
  {
    if (!align)
      align = sizeof(size_t);
    size_t rawlen = round_up_to(sizeof(buffer::raw_combined),alignof(buffer::raw_combined));
    size_t datalen = round_up_to(len, alignof(buffer::raw_combined));

    char *ptr = 0;
    int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen);
    if (r)
     throw bad_alloc();

    return ceph::unique_leakable_ptr<buffer::raw>(new (ptr + datalen) raw_combined(ptr, len, align));
  }

  static void operator delete(void *ptr) {
    raw_combined *raw = (raw_combined *)ptr;
    ::free((void *)raw->data);
  }
};

raw_malloc定义如下:

class buffer::raw_malloc : public buffer::raw {
  public:
    /*
    * 构造函数,使用malloc buffer的分配
    */
    explicit raw_malloc(unsigned l) : raw(l) {
      if (len) {
        data = (char *)malloc(len);
        if (!data)
          throw bad_alloc();
      } else {
        data = 0;
      }
    }
    /*
    * 利用创建好的buffer初始化raw_malloc
    */
    raw_malloc(unsigned l, char *b) : raw(b, l) {
    }
    /*
    * 资源释放
    */
    ~raw_malloc() override {
      free(data);
    }
    raw* clone_empty() override {
      return new raw_malloc(len);
    }
};

raw_posix_aligned定义

实现如下:

class buffer::raw_posix_aligned : public buffer::raw {
  /*
  * data buffer对齐大小
  */
  unsigned align;

public:
  /*
  * 构造函数。利用posix_memalign,构造大小为len,以align对齐的data buffer
  */
  raw_posix_aligned(unsigned l, unsigned _align) : raw(l) {
    align = _align;
    assert((align >= sizeof(void *)) && (align & (align - 1)) == 0);
    int r = ::posix_memalign((void**)(void*)&data, align, len);
    if (r)
      throw bad_alloc();
  }
  /*
  * 释放data buffer
  */
  ~raw_posix_aligned() override {
    ::free(data);
  }

  /*
  * clone,大小与此data buffer相等,内容为空
  */
  raw* clone_empty() override {
    return new raw_posix_aligned(len, align);
  }
};

raw_static定义

class buffer::raw_static : public buffer::raw {
  public:

    /*
    * 构造函数:d是static buffer
    * 析构函数:无需释放
    */
    raw_static(const char *d, unsigned l) : raw((char*)d, l) { }
    ~raw_static() override {}
    /*
    * clone 的buffer,利用new raw_char创建新的buffer
    */
    raw* clone_empty() override {
      return new buffer::raw_char(len);
    }
};

raw_char、raw_claimed_char、raw_claim_buffer定义

定义代码如下:

class buffer::raw_char : public buffer::raw {
public:
  /*
  * 构造函数:使用new 创建buffer
  */
  explicit raw_char(unsigned l) : raw(l) {
    if (len)
      data = new char[len];
    else
      data = 0;
  }
  /*
  * 传入已建好的buffer(char *b),作为此raw的buffer,此buffer需要是new创建的
  */
  raw_char(unsigned l, char *b) : raw(b, l) {
  }

  /*
  * 释放资源
  */
  ~raw_char() override {
    delete[] data;
  }

  /*
  * clone buffer,new新建
  */
  raw* clone_empty() override {
    return new raw_char(len);
  }
};


class buffer::raw_claimed_char : public buffer::raw {
public:

  /*
  * 构造函数,传入buffer
  * 析构函数不负责释放资源,因此外部buffer是局部变量,或是new资源,则需手动释放
  */
  explicit raw_claimed_char(unsigned l, char *b) : raw(b, l) {
  }
  ~raw_claimed_char() override {
  }
  raw* clone_empty() override {
    return new raw_char(len);
  }
};

class buffer::raw_claim_buffer : public buffer::raw {
  deleter del;
 public:
  raw_claim_buffer(const char *b, unsigned l, deleter d)
      : raw((char*)b, l), del(std::move(d)) { }
  ~raw_claim_buffer() override {}
  raw* clone_empty() override {
    return new buffer::raw_char(len);
  }
};

RAW创建接口

创建接口如下,剔除mempool相关的信息:

  ceph::unique_leakable_ptr<raw> copy(const char *c, unsigned len);
  ceph::unique_leakable_ptr<raw> create(unsigned len);
  ceph::unique_leakable_ptr<raw> create_in_mempool(unsigned len, int mempool);
  ceph::unique_leakable_ptr<raw> claim_char(unsigned len, char *buf);
  ceph::unique_leakable_ptr<raw> create_malloc(unsigned len);
  ceph::unique_leakable_ptr<raw> claim_malloc(unsigned len, char *buf);
  ceph::unique_leakable_ptr<raw> create_static(unsigned len, char *buf);
  ceph::unique_leakable_ptr<raw> create_aligned(unsigned len, unsigned align);
  ceph::unique_leakable_ptr<raw> create_aligned_in_mempool(unsigned len, unsigned align, int mempool);
  ceph::unique_leakable_ptr<raw> create_page_aligned(unsigned len);
  ceph::unique_leakable_ptr<raw> create_small_page_aligned(unsigned len);
  ceph::unique_leakable_ptr<raw> claim_buffer(unsigned len, char *buf, deleter del);
调用:
auto r = create_aligned(len, sizeof(size_t));
memcpy(r->data, c, len);
return r;
create(unsigned len);
    create_aligned(unsigned len, unsigned align); /* align = sizeof(size_t)*/
    create_in_mempool(unsigned len, int mempool)
    create_aligned(unsigned len, unsigned align)

调用:create_aligned_in_mempool(unsigned len, unsigned align, int mempool):

/*
* 如果对齐align 是系统PAGE_SIZE的整数倍,或者分配的大小大于PAGE_SIZE的两倍,则使用raw_posix_aligned接口
* 否则采用raw_combined
*/
if ((align & ~CEPH_PAGE_MASK) == 0 ||
  len >= CEPH_PAGE_SIZE * 2) {
#ifndef __CYGWIN__
      return ceph::unique_leakable_ptr<buffer::raw>(new raw_posix_aligned(len, align));
#else
      return ceph::unique_leakable_ptr<buffer::raw>(new raw_hack_aligned(len, align));
#endif
    }
    return raw_combined::create(len, align, mempool);
  }
: new raw_claimed_char(len, buf)
: new raw_malloc(len)
: new raw_malloc(len, buf)
: new raw_static(buf, len)
: 已系统PAGE_SIZE默认对齐
: create_aligned(len, CEPH_PAGE_SIZE)
: 当len < CEPH_PAGE_SIZE: create_aligned(len, CEPH_BUFFER_ALLOC_UNIT) /*CEPH_BUFFER_ALLOC_UNIT: 4096u, 4KB*/
: > CEPH_PAGE_SIZE: create_aligned(len, CEPH_PAGE_SIZE)
: new raw_claim_buffer(buf, len, std::move(del))
上一篇下一篇

猜你喜欢

热点阅读