百度文件系统bfs源码分析系列(六)

2019-02-24  本文已影响5人  fooboo

这部分主要是分析几个类的实现及作用,会在分析时标注是哪个进程下的,因为可能会有相同的类名但不同的实现。
在nameserver启动时,会在其构造函数中初始化一些数据成员,部分代码如下:

  59 NameServerImpl::NameServerImpl(Sync* sync) {
  60     //more init code...
  64     block_mapping_manager_ = new BlockMappingManager(FLAGS_blockmapping_bucket_num);
  69     chunkserver_manager_ = new ChunkServerManager(work_thread_pool_, block_mapping_manager_);
  70     namespace_ = new NameSpace(false);
  74     if (sync_) {
  75         //ha
  84     }
  85     CheckLeader();
  88 }

 20 BlockMappingManager::BlockMappingManager(int32_t bucket_num) :
 21     blockmapping_bucket_num_(bucket_num) {
 22     thread_pool_ = new ThreadPool(FLAGS_blockmapping_working_thread_num);
 23     block_mapping_.resize(blockmapping_bucket_num_);
 24     for (size_t i = 0; i < block_mapping_.size(); i++) {
 25         block_mapping_[i] = new BlockMapping(thread_pool_);
 26     }
 28 }

107 ChunkServerManager::ChunkServerManager(ThreadPool* thread_pool, BlockMappingManager* block_mapping_manager)             
108     : thread_pool_(thread_pool),
109       block_mapping_manager_(block_mapping_manager),
110       chunkserver_num_(0),
111       next_chunkserver_id_(1) {
113     thread_pool_->AddTask(std::bind(&ChunkServerManager::DeadCheck, this));
114     thread_pool_->AddTask(std::bind(&ChunkServerManager::LogStats, this));
117     //set params_ code...
123 }

 35 NameSpace::NameSpace(bool standalone): version_(0), last_entry_id_(1),
 36     block_id_upbound_(1), next_block_id_(1) {
 37     leveldb::Options options;
 38     options.create_if_missing = true;
 39     db_cache_ = leveldb::NewLRUCache(FLAGS_namedb_cache_size * 1024L * 1024L);
 40     options.block_cache = db_cache_;
 41     leveldb::Status s = leveldb::DB::Open(options, FLAGS_namedb_path, &db_);
 42     if (!s.ok()) {
 43         db_ = NULL;
 45         exit(EXIT_FAILURE);
 46     }
 47     if (standalone) {
 48         Activate(NULL, NULL);
 49     }
 50 }

以上是三个比较重要的类,从类名可以看出,一个是block管理器,一个是chunkserver管理器,一个是元数据管理器,具体作用和实现接着分析;

  93 void NameServerImpl::CheckLeader() {
  94     if (!sync_ || sync_->IsLeader()) {
  96         NameServerLog log;
  97         std::function<void (const FileInfo&)> task =
  98             std::bind(&NameServerImpl::RebuildBlockMapCallback, this, std::placeholders::_1);
  99         namespace_->Activate(task, &log);
 100         if (!LogRemote(log, std::function<void (bool)>())) {
 102         }
 103         recover_timeout_ = FLAGS_nameserver_start_recover_timeout;
 105         work_thread_pool_->DelayTask(1000, std::bind(&NameServerImpl::CheckRecoverMode, this));
 106         is_leader_ = true;
 107     } else {
 108         is_leader_ = false;
 109         work_thread_pool_->DelayTask(100, std::bind(&NameServerImpl::CheckLeader, this));
 111     }
 112 }

在nameserver启动后,待前面的都初始化好后进行CheckLeader:如果没有开启ha功能(sync_为空),或者开启了ha且当前节点是leader,则依次进行Activate和LogRemote;否则100ms后再次执行CheckLeader

 52 void NameSpace::Activate(std::function<void (const FileInfo&)> callback, NameServerLog* log) {
 53     std::string version_key(8, 0);
 54     version_key.append("version");
 55     std::string version_str;
 56     leveldb::Status s = db_->Get(leveldb::ReadOptions(), version_key, &version_str);
 57     if (s.ok()) {//读取版本号
 58         if (version_str.size() != sizeof(int64_t)) {
 60         }
 61         version_ = *(reinterpret_cast<int64_t*>(&version_str[0]));
 63     } else {//更新
 64         version_ = common::timer::get_micros();
 65         version_str.resize(8);
 66         *(reinterpret_cast<int64_t*>(&version_str[0])) = version_;
 67 
 68         leveldb::Status s = db_->Put(leveldb::WriteOptions(), version_key, version_str);
 69         if (!s.ok()) {
 71         }
 72         EncodeLog(log, kSyncWrite, version_key, version_str);
 74     }
 75     SetupRoot();//初始化root_path_
 76     RebuildBlockMap(callback);
 77     InitBlockIdUpbound(log);//初始化block_id_upbound_
 78 }

675 bool NameSpace::RebuildBlockMap(std::function<void (const FileInfo&)> callback) {
679     std::set<int64_t> entry_id_set;
680     entry_id_set.insert(root_path_.entry_id());
681     leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
682     for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) {
683         FileInfo file_info;
684         bool ret = file_info.ParseFromArray(it->value().data(), it->value().size());
685         assert(ret);
686         if (last_entry_id_ < file_info.entry_id()) {
687             last_entry_id_ = file_info.entry_id();
688         }
689         FileType file_type = GetFileType(file_info.type());
690         if (file_type == kDefault) {
691             //a file
692             for (int i = 0; i < file_info.blocks_size(); i++) {
693                 if (file_info.blocks(i) >= next_block_id_) {
694                     next_block_id_ = file_info.blocks(i) + 1;
695                     block_id_upbound_ = next_block_id_;
696                 }
698             }
700             if (callback) {
701                 callback(file_info);
702             }
703         } else if (file_type == kSymlink) {
705         } else {
706             entry_id_set.insert(file_info.entry_id());
707         }
708     }
709     //more code...
730     delete it;
731     return true;
732 }

以上从namespace中初始化块的元数据,主要是从leveldb中读取文件的元数据,并更新next_block_id_/block_id_upbound_/last_entry_id_几个重要的字段,并进行RebuildBlockMapCallback回调;而entry_id_set保存的是目录的entry_id,并在后面再次遍历leveldb中的元数据,查找出哪些条目的父目录已经不存在、但自身仍然残留在leveldb中(即孤儿条目):

714         for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) {
715             FileInfo file_info;
716             bool ret = file_info.ParseFromArray(it->value().data(), it->value().size());
717             assert(ret);
718             int64_t parent_entry_id = 0;
719             std::string filename;
720             DecodingStoreKey(it->key().ToString(), &parent_entry_id, &filename);
721             if (entry_id_set.find(parent_entry_id) == entry_id_set.end()) {
722                 //Orphan entry PE
726             }
727         }

InitBlockIdUpbound做的事情也比较简单,简单的说就是预先自增一定的大小,避免每次都加1并更新leveldb,后面使用next_block_id_时可以暂时不用管block_id_upbound_

元数据声明如下:

  4 message FileInfo {
  5     optional int64 entry_id = 1;
  6     optional int64 version = 2;
  7     optional int32 type = 3 [default = 0755];
  8     repeated int64 blocks = 4;
  9     optional uint32 ctime = 5;
 10     optional string name = 6;
 11     optional int64 size = 7;
 12     optional int32 replicas = 8;
 13     optional int64 parent_entry_id = 9;
 14     optional int32 owner = 10;
 15     repeated string cs_addrs = 11;
 16     optional string sym_link = 12;
 17 }

RebuildBlockMapCallback则会根据块的block_id,副本数,版本号(seq),大小去创建block数据:

105 void BlockMapping::RebuildBlock(int64_t block_id, int32_t replica,
106                                 int64_t version, int64_t size) {
107     NSBlock* nsblock = NULL;
108     nsblock = new NSBlock(block_id, replica, version, size);
109     if (size) {//新建的时候会这样,后面由chunkserver同步过来
110         nsblock->recover_stat = kLost;
111         lost_blocks_.insert(block_id);
112     } else {
113         nsblock->recover_stat = kBlockWriting;
114     }
115     
116     if (version < 0) {        
118     } else {
120     }                                         
121     
123     MutexLock lock(&mu_);
125     std::pair<NSBlockMap::iterator, bool> ret =
126         block_map_.insert(std::make_pair(block_id, nsblock));
129 }

 23 struct NSBlock {
 24     int64_t id;
 25     int64_t version;
 26     std::set<int32_t> replica; //同步
 27     int64_t block_size;       
 28     uint32_t expect_replica_num; //期望副本数则配置设置
 29     RecoverStat recover_stat;
 30     std::set<int32_t> incomplete_replica; //同步
 31     NSBlock();                                
 32     NSBlock(int64_t block_id, int32_t replica, int64_t version, int64_t size);
 33     bool operator<(const NSBlock &b) const {
 34         return (this->replica.size() >= b.replica.size());
 35     }
 36 };

 30 NSBlock::NSBlock(int64_t block_id, int32_t replica,
 31                  int64_t block_version, int64_t block_size)
 32     : id(block_id), version(block_version),
 33       block_size(block_size), expect_replica_num(replica),
 34       recover_stat(block_version < 0 ? kBlockWriting : kNotInRecover) {
 35 }

之后创建和初始化rpc服务这些,包括开启raft等,这些就是nameserver要做的事情,介绍的比较粗糙,其中nameserver下ChunkServerManager模块的一些逻辑会同接下来chunkserver服务一起分析。

在chunkserver服务启动时,部分代码如下:

 76 ChunkServerImpl::ChunkServerImpl()
 77     : chunkserver_id_(-1),
 78      heartbeat_task_id_(-1),
 79      blockreport_task_id_(-1),
 80      last_report_blockid_(-1),
 81      report_id_(0),
 82      is_first_round_(true),
 83      first_round_report_start_(-1),
 84      service_stop_(false) {
 93     block_manager_ = new BlockManager(FLAGS_block_store_path);
 94     bool s_ret = block_manager_->LoadStorage();
 95     assert(s_ret == true);
 96     rpc_client_ = new RpcClient();
 97     nameserver_ = new NameServerClient(rpc_client_, FLAGS_nameserver_nodes);
 98     heartbeat_thread_->AddTask(std::bind(&ChunkServerImpl::LogStatus, this, true));
 99     heartbeat_thread_->AddTask(std::bind(&ChunkServerImpl::Register, this));
100 }

 36 BlockManager::BlockManager(const std::string& store_path)
 37     : thread_pool_(new ThreadPool(1)), disk_quota_(0), counter_manager_(new DiskCounterManager)     {   
 38     CheckStorePath(store_path);
 39     file_cache_ = new FileCache(FLAGS_chunkserver_file_cache_size);
 41 }

 95 void BlockManager::CheckStorePath(const std::string& store_path) {
 96     std::string fsid_str;
 97     struct statfs fs_info;
 98     std::string home_fs;
 99     if (statfs("/home", &fs_info) == 0) {
100         home_fs.assign((const char*)&fs_info.f_fsid, sizeof(fs_info.f_fsid));
101     } else if (statfs("/", &fs_info) == 0) {
103         home_fs.assign((const char*)&fs_info.f_fsid, sizeof(fs_info.f_fsid));
104     } else {
106     }
107     
108     std::vector<std::string> store_path_list;
109     common::SplitString(store_path, ",", &store_path_list);
110     //more code...处理每一个store_path_list中的空格并去重
117     std::sort(store_path_list.begin(), store_path_list.end());
118     auto it = std::unique(store_path_list.begin(), store_path_list.end());
119     store_path_list.resize(std::distance(store_path_list.begin(), it));
120 
121     std::set<std::string> fsids;
123     for (uint32_t i = 0; i < store_path_list.size(); ++i) {
124         std::string& disk_path = store_path_list[i];
125         int stat_ret = statfs(disk_path.c_str(), &fs_info);
126         std::string fs_tmp((const char*)&fs_info.f_fsid, sizeof(fs_info.f_fsid));
127         if (stat_ret != 0 ||
128             (!FLAGS_chunkserver_multi_path_on_one_disk && fsids.find(fs_tmp) != fsids.end()) ||
129             (!FLAGS_chunkserver_use_root_partition && fs_tmp == home_fs)) {
130             // statfs failed
131             // do not allow multi data path on the same disk
132             // do not allow using root as data path
133             if (stat_ret != 0) {
136             } else {
138             }
139             store_path_list[i] = store_path_list[store_path_list.size() - 1];
140             store_path_list.resize(store_path_list.size() - 1);
141             --i;
142         } else {
143             int64_t disk_size = fs_info.f_blocks * fs_info.f_bsize;
144             int64_t user_quota = fs_info.f_bavail * fs_info.f_bsize;
145             int64_t super_quota = fs_info.f_bfree * fs_info.f_bsize;
152             Disk* disk = new Disk(store_path_list[i], user_quota);
153             disks_.push_back(std::make_pair(DiskStat(), disk));
154             fsids.insert(fs_tmp);
155         }
156     }
158     assert(store_path_list.size() > 0);
159     CheckChunkserverMeta(store_path_list);
160 }

以上逻辑还是以目录存储路径去获取磁盘相关的信息,并根据目录和使用情况建立映射关系;CheckChunkserverMeta从leveldb中检查目录版本号;

 1 struct statfs 
 2 { 
 3   long f_type; /* 文件系统类型*/ 
 4   long f_bsize; /* 经过优化的传输块大小*/ 
 5   long f_blocks; /* 文件系统数据块总数*/ 
 6   long f_bfree; /* 可用块数*/
 7   long f_bavail; /* 非超级用户可获取的块数*/ 
 8   long f_files; /* 文件结点总数*/ 
 9   long f_ffree; /* 可用文件结点数*/ 
10   fsid_t f_fsid; /* 文件系统标识*/ 
11   long f_namelen; /* 文件名的最大长度*/ 
12 };

statfs结构中可用空间块数有两种f_bfree和 f_bavail,前者是硬盘所有剩余空间,后者为非root用户剩余空间;

FileCache的实现类似leveldb的cache(见leveldb/util/cache.cc),以file_path为key对打开的fd进行了lru缓存,部分实现如下:

 39 common::Cache::Handle* FileCache::FindFile(const std::string& file_path) {
 40     common::Slice key(file_path);
 41     common::Cache::Handle* handle = cache_->Lookup(key);
 42     if (handle == NULL) {
 43         int fd = open(file_path.c_str(), O_RDONLY);
 44         if (fd < 0) {
 46             return NULL;
 47         }
 48         FileEntity* file = new FileEntity;
 49         file->fd = fd;
 50         file->file_name = file_path;
 51         handle = cache_->Insert(key, file, 1, &DeleteEntry);
 52     }
 53     return handle;
 54 }

167 bool BlockManager::LoadStorage() {
168     bool ret = true;
169     for (auto it = disks_.begin(); it != disks_.end(); ++it) {
170         Disk* disk = it->second;
171         ret = ret && disk->LoadStorage(std::bind(&BlockManager::AddBlock,
172                                                       this, std::placeholders::_1,
173                                                       std::placeholders::_2,
174                                                       std::placeholders::_3));
175         disk_quota_ += disk->GetQuota();
176     }
177     return ret;
178 }

 38 bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback) {
 57     //more code...从leveldb中读并检查版本号
 58     leveldb::Iterator* it = metadb_->NewIterator(leveldb::ReadOptions());
 59     for (it->Seek(version_key+'\0'); it->Valid(); it->Next()) {
 60         int64_t block_id = 0;
 61         if (1 != sscanf(it->key().data(), "%ld", &block_id)) {
 63             delete it;
 64             return false;
 65         }
 66         BlockMeta meta;
 67         if (!meta.ParseFromArray(it->value().data(), it->value().size())) {
 69             assert(0); // TODO: fault tolerant
 70         }
 71         // TODO: do not need store_path in meta any more
 72         std::string file_path = meta.store_path() + Block::BuildFilePath(block_id);
 73         if (meta.version() < 0) {
 76             metadb_->Delete(leveldb::WriteOptions(), it->key());
 77             remove(file_path.c_str());
 78             continue;
 79         } else {
 80             struct stat st;
 81             if (stat(file_path.c_str(), &st) ||
 82                 st.st_size != meta.block_size() ||
 83                 access(file_path.c_str(), R_OK)) {
 87                 metadb_->Delete(leveldb::WriteOptions(), it->key());
 88                 remove(file_path.c_str());
 89                 continue;
 90             } else {
 93             }
 94         }
 95         callback(block_id, this, meta);
 97     }
 98     //more code...

根据上面的目录store_path_list读每个BlockMeta块的元数据,其中会处理一些错误的BlockMeta,并回调BlockManager::AddBlock

325 bool BlockManager::AddBlock(int64_t block_id, Disk* disk, BlockMeta meta) {
326     Block* block = new Block(meta, disk, file_cache_);
327     block->AddRef();
328     MutexLock lock(&mu_);
329     return block_map_.insert(std::make_pair(block_id, block)).second;
330 }

  4 message BlockMeta {
  5     optional int64 block_id = 1;
  6     optional int64 block_size = 2;
  7     optional int64 checksum = 3;
  8     optional int64 version = 4 [default = -1];
  9     optional string store_path = 5;
 10 }

 34 Block::Block(const BlockMeta& meta, Disk* disk, FileCache* file_cache) :
 35   disk_(disk), meta_(meta),
 36   last_seq_(-1), slice_num_(-1), blockbuf_(NULL), buflen_(0),
 37   bufdatalen_(0), disk_writing_(false),
 38   disk_file_size_(meta.block_size()), file_desc_(kNotCreated), refs_(0),
 39   close_cv_(&mu_), is_recover_(false), expected_size_(-1), deleted_(false),
 40   file_cache_(file_cache) {
 41     assert(meta_.block_id() < (1L<<40));
 42     disk_->counters_.data_size.Add(meta.block_size());
 43     disk_file_ = meta.store_path() + BuildFilePath(meta_.block_id());
 44     disk_->counters_.blocks.Inc();
 45     if (meta_.version() >= 0) {
 46         finished_ = true;
 47         recv_window_ = NULL;
 48     } else {
 49         finished_ = false;
 50         recv_window_ = new common::SlidingWindow<Buffer>(100,
 51                        std::bind(&Block::WriteCallback, this,
 52                        std::placeholders::_1, std::placeholders::_2));
 53     }
 54 }

其中chunkserver中的Block类是比较复杂的,代表真实的数据信息,且在加载的时候,根据version是否大于等于0来判断该块是否已经写完成,如果未完成则会创建一个滑动窗口;

最后ChunkServerImpl::Register

135 void ChunkServerImpl::Register() {
136     RegisterRequest request;
137     request.set_chunkserver_addr(data_server_addr_);
138     request.set_disk_quota(block_manager_->DiskQuota());
139     request.set_namespace_version(block_manager_->NamespaceVersion());
140     request.set_tag(FLAGS_chunkserver_tag);
143     RegisterResponse response;
144     if (!nameserver_->SendRequest(&NameServer_Stub::Register, &request, &response, 20)) {
146         work_thread_pool_->DelayTask(5000, std::bind(&ChunkServerImpl::Register, this));
147         return;
148     }
149     //more code...
160     int64_t new_version = response.namespace_version();
161     if (block_manager_->NamespaceVersion() != new_version) {
162         // NameSpace change
163         if (!FLAGS_chunkserver_auto_clean) {
164             /// abort
166         }
167         LOG(INFO, "Use new namespace version: %ld, clean local data", new_version);
168         // Clean
169         if (!block_manager_->CleanUp(new_version)) {
171         }
172         if (!block_manager_->SetNamespaceVersion(new_version)) {
174         }
175         work_thread_pool_->AddTask(std::bind(&ChunkServerImpl::Register, this));
176         return;
177     }
179     //more code...
190 }

以上注册到nameserver服务上,会带上当前磁盘配额和版本号,后面收到响应后会做些处理,比如版本号不一致时,需要重置一些数据:

310 bool BlockManager::CleanUp(int64_t namespace_version) {
311     for (auto it = block_map_.begin(); it != block_map_.end();) {
312         Block* block = it->second;
313         if (block->CleanUp(namespace_version)) {
314             file_cache_->EraseFileCache(block->GetFilePath());
315             block->DecRef(); 
316             block_map_.erase(it++);
317         } else {
318             ++it;
319         }
320     }       
322     return true;
323 }

140 bool Block::CleanUp(int64_t namespace_version) {
141     if (namespace_version != disk_->NamespaceVersion()) {
142         SetDeleted();
143         return true;
144     }       
145     return false;
146 }

148 StatusCode Block::SetDeleted() {
149     disk_->RemoveBlockMeta(meta_.block_id());
150     int deleted = common::atomic_swap(&deleted_, 1);
151     if (deleted != 0) {
152         return kCsNotFound;
153     }
154     return kOK;
155 }

剩下的字段作用后面再说,其中chunkserver_id_是nameserver服务上分配的id,屏蔽了具体ip/port等信息,接着SendBlockReport/SendHeartbeat,后面再分析,回到nameserver上收到RegisterRequest消息的处理:

 170 void NameServerImpl::Register(::google::protobuf::RpcController* controller,
 171                               const ::baidu::bfs::RegisterRequest* request,
 172                               ::baidu::bfs::RegisterResponse* response,
 173                               ::google::protobuf::Closure* done) {
 174     if (!is_leader_) {
 175         //send to leader
 177         return;
 178     }
 179     sofa::pbrpc::RpcController* sofa_cntl =
 180         reinterpret_cast<sofa::pbrpc::RpcController*>(controller);
 181     const std::string& address = request->chunkserver_addr();
 182     std::string port = address.substr(address.find(':'));
 183     std::string ip_address = sofa_cntl->RemoteAddress();
 184     ip_address = ip_address.substr(0, ip_address.find(':')) + port;
 186     int64_t version = request->namespace_version();
 187     if (version != namespace_->Version()) {
 190         chunkserver_manager_->RemoveChunkServer(address);
 191     } else {
 193         if (chunkserver_manager_->HandleRegister(ip_address, request, response)) {
 194             LeaveReadOnly();
 195         }
 196     }
 197     response->set_namespace_version(namespace_->Version());
 198     done->Run();
 199 }

当nameserver收到chunkserver注册信息时,如果发现版本号不一致,那么会调用chunkserver_manager_->RemoveChunkServer(address)处理该chunkserver的状态:

166 bool ChunkServerManager::RemoveChunkServer(const std::string& addr) {
167     MutexLock lock(&mu_, "RemoveChunkServer", 10);
168     std::map<std::string, int32_t>::iterator it = address_map_.find(addr);
169     if (it == address_map_.end()) {
170         return false;
171     }
172     ChunkServerInfo* cs_info = NULL;
173     bool ret = GetChunkServerPtr(it->second, &cs_info);
174     assert(ret);
175     if (cs_info->status() == kCsActive) {//如果在活跃状态
176         cs_info->set_status(kCsWaitClean);
177         std::function<void ()> task =
178             std::bind(&ChunkServerManager::CleanChunkServer,
179                         this, cs_info, std::string("Dead"));
180         thread_pool_->AddTask(task);
181     }
182     return true;
183 }

125 void ChunkServerManager::CleanChunkServer(ChunkServerInfo* cs, const std::string& reason) {
126     int32_t id = cs->id();
127     MutexLock lock(&mu_, "CleanChunkServer", 10);
128     chunkserver_num_--;
129     auto it = block_map_.find(id);
130     assert(it != block_map_.end());
131     std::set<int64_t> blocks;
132     it->second->CleanUp(&blocks);
135     cs->set_status(kCsCleaning);
136     mu_.Unlock();
137     block_mapping_manager_->DealWithDeadNode(id, blocks);
138     mu_.Lock("CleanChunkServerRelock", 10);
139     cs->set_w_qps(0);
140     cs->set_w_speed(0);
141     cs->set_r_qps(0);
142     cs->set_r_speed(0);
143     cs->set_recover_speed(0);
144     if (std::find(chunkservers_to_offline_.begin(),
145                   chunkservers_to_offline_.end(),
146                   cs->ipaddress()) == chunkservers_to_offline_.end()) {
147         if (cs->is_dead()) {
148             cs->set_status(kCsOffLine);
149         } else {
150             cs->set_status(kCsStandby);
151         }
152     } else {
153         cs->set_status(kCsReadonly);
154     }
155 }

607 void BlockMapping::DealWithDeadNode(int32_t cs_id, const std::set<int64_t>& blocks) {
608     for (std::set<int64_t>::iterator it = blocks.begin(); it != blocks.end(); ++it) {
609         MutexLock lock(&mu_);
610         DealWithDeadBlockInternal(cs_id, *it);
611     }
612     MutexLock lock(&mu_);
613     NSBlock* block = NULL;
614     for (std::set<int64_t>::iterator it = hi_recover_check_[cs_id].begin();
615             it != hi_recover_check_[cs_id].end(); ++it) {
616         if (!GetBlockPtr(*it, &block)) { //没有找到
618         } else {
619             block->recover_stat = kNotInRecover;
620         }
621     }
622     hi_recover_check_.erase(cs_id);
623     for (std::set<int64_t>::iterator it = lo_recover_check_[cs_id].begin();
624             it != lo_recover_check_[cs_id].end(); ++it) {
625         if (!GetBlockPtr(*it, &block)) { //没有找到
627         } else {
628             block->recover_stat = kNotInRecover;
629         }
630     }
631     lo_recover_check_.erase(cs_id);
632 }

如果chunkserver服务的版本号和nameserver相同,则chunkserver_manager_->HandleRegister

227 bool ChunkServerManager::HandleRegister(const std::string& ip,
228                                         const RegisterRequest* request,
229                                         RegisterResponse* response) {
230     const std::string& address = request->chunkserver_addr();
231     StatusCode status = kOK;
232     int cs_id = -1;
233     MutexLock lock(&mu_, "HandleRegister", 10);
234     std::map<std::string, int32_t>::iterator it = address_map_.find(address);
235     if (it == address_map_.end()) {
236         cs_id = AddChunkServer(request->chunkserver_addr(), ip,
237                                request->tag(), request->disk_quota());//加入新的chunkserver节点
238         assert(cs_id >= 0);
239         response->set_chunkserver_id(cs_id);
240     } else {
241         cs_id = it->second;
242         ChunkServerInfo* cs_info;
243         bool ret = GetChunkServerPtr(cs_id, &cs_info);
244         assert(ret);
245         if (cs_info->status() == kCsWaitClean || cs_info->status() == kCsCleaning) {
246             status = kNotOK;
249         } else {
250             UpdateChunkServer(cs_id, request->tag(), request->disk_quota());
251             auto it = block_map_.find(cs_id);
252             assert(it != block_map_.end());
253             it->second->MoveNew();
254             response->set_report_id(it->second->GetReportId());
257         }
258     }
259     response->set_chunkserver_id(cs_id);
262     response->set_status(status);
263     return chunkserver_num_ >= FLAGS_expect_chunkserver_num;
264 }

561 int32_t ChunkServerManager::AddChunkServer(const std::string& address,
562                                            const std::string& ipaddress,
563                                            const std::string& tag,
564                                            int64_t quota) {
565     mu_.AssertHeld();                      
566     ChunkServerInfo* info = new ChunkServerInfo;
567     int32_t id = next_chunkserver_id_++; //自增分配
568     char buf[20];
569     common::timer::now_time_str(buf, 20, common::timer::kMin);
570     info->set_start_time(std::string(buf));
571     info->set_id(id);
572     //more code...
575     info->set_disk_quota(quota);
576     if (std::find(chunkservers_to_offline_.begin(), chunkservers_to_offline_.end(),
577                 address) != chunkservers_to_offline_.end()) {
578         info->set_status(kCsReadonly);
579     } else {
580         info->set_status(kCsActive);
581     }
582     info->set_kick(false);
583     std::string host = address.substr(0, address.find(':'));
584     std::string ip = ipaddress.substr(0, ipaddress.find(':'));
585     //more code...
592     chunkservers_[id] = info;
593     address_map_[address] = id;
594     int32_t now_time = common::timer::now_time();
595     heartbeat_list_[now_time].insert(info);//加入检测保活
596     info->set_last_heartbeat(now_time);
597     ++chunkserver_num_;
598     Blocks* blocks = new Blocks(id);
599     block_map_.insert(std::make_pair(id, blocks));
600     return id;
601 }

 130 void NameServerImpl::LeaveReadOnly() {
 132     if (readonly_) {
 133         readonly_ = false;
 134     }
 135 }

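当注册到nameserver的chunkserver数量未达到配置值(即chunkserver_num_ < FLAGS_expect_chunkserver_num)时,nameserver只能提供读;当chunkserver_num_ >= FLAGS_expect_chunkserver_num时,HandleRegister返回true,随后调用LeaveReadOnly退出只读状态;后面会结合具体例子来分析chunkserver节点状态和block状态在什么情况下发生变化: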

 39 enum ChunkServerStatus {//chunkserver节点状态
 40     kCsActive = 101;
 41     kCsWaitClean = 102;
 42     kCsCleaning = 103;
 43     kCsOffLine = 104;
 44     kCsStandby = 105;
 45     kCsReadonly = 106;
 46 }

 48 enum RecoverStat {//block块状态
 49     kNotInRecover = 0;
 50     kLoRecover = 1;
 51     kHiRecover = 2;
 52     kCheck = 3;
 53     kIncomplete = 4;
 54     kLost = 5;
 55     kBlockWriting = 6;
 56     kAny = 20;
 57 }

接着回到chunkserver服务下,在ChunkServerImpl::Register中发送RegisterRequest并收到响应(即上面分析的处理流程)后,继续执行:

135 void ChunkServerImpl::Register() {
136     //more code...
179     chunkserver_id_ = response.chunkserver_id();
180     report_id_ = response.report_id() + 1;
181     first_round_report_start_ = last_report_blockid_;
182     is_first_round_ = true;
188     work_thread_pool_->DelayTask(1, std::bind(&ChunkServerImpl::SendBlockReport, this));
189     heartbeat_thread_->DelayTask(1, std::bind(&ChunkServerImpl::SendHeartbeat, this));
190 }

上面部分涉及到同步块的信息和心跳相关的,会在下面一篇中分析,并会举例分析状态的变化。

上一篇下一篇

猜你喜欢

热点阅读