百度文件系统bfs源码分析系列(六)
这部分主要分析几个类的实现及作用,分析时会标注类所属的进程(nameserver 或 chunkserver),因为不同进程中可能存在类名相同但实现不同的类。
nameserver 启动时,会在 NameServerImpl 的构造函数中初始化一些数据成员,部分代码如下:
59 NameServerImpl::NameServerImpl(Sync* sync) { // excerpt: creates the block-mapping, chunkserver and namespace managers, then calls CheckLeader()
60 //more init code...
64 block_mapping_manager_ = new BlockMappingManager(FLAGS_blockmapping_bucket_num); // block id -> NSBlock mapping, split into buckets
69 chunkserver_manager_ = new ChunkServerManager(work_thread_pool_, block_mapping_manager_); // shares work_thread_pool_ for its background tasks
70 namespace_ = new NameSpace(false); // standalone=false: the NameSpace ctor does not call Activate(); CheckLeader() does
74 if (sync_) {
75 //ha
84 }
85 CheckLeader();
88 }
20 BlockMappingManager::BlockMappingManager(int32_t bucket_num) : // creates bucket_num BlockMapping buckets that share one ThreadPool
21 blockmapping_bucket_num_(bucket_num) {
22 thread_pool_ = new ThreadPool(FLAGS_blockmapping_working_thread_num); // pool size comes from FLAGS_blockmapping_working_thread_num
23 block_mapping_.resize(blockmapping_bucket_num_);
24 for (size_t i = 0; i < block_mapping_.size(); i++) {
25 block_mapping_[i] = new BlockMapping(thread_pool_); // every bucket receives the same pool pointer
26 }
28 }
107 ChunkServerManager::ChunkServerManager(ThreadPool* thread_pool, BlockMappingManager* block_mapping_manager) // stores the shared pool/manager and queues background tasks
108 : thread_pool_(thread_pool),
109 block_mapping_manager_(block_mapping_manager),
110 chunkserver_num_(0),
111 next_chunkserver_id_(1) { // chunkserver ids start at 1
113 thread_pool_->AddTask(std::bind(&ChunkServerManager::DeadCheck, this)); // queue DeadCheck on the shared pool
114 thread_pool_->AddTask(std::bind(&ChunkServerManager::LogStats, this)); // queue LogStats on the shared pool
117 //set params_ code...
123 }
35 NameSpace::NameSpace(bool standalone): version_(0), last_entry_id_(1), // opens the leveldb metadata store; activates immediately only when standalone
36 block_id_upbound_(1), next_block_id_(1) {
37 leveldb::Options options;
38 options.create_if_missing = true;
39 db_cache_ = leveldb::NewLRUCache(FLAGS_namedb_cache_size * 1024L * 1024L); // flag value is scaled by 1024*1024, i.e. given in MiB
40 options.block_cache = db_cache_;
41 leveldb::Status s = leveldb::DB::Open(options, FLAGS_namedb_path, &db_);
42 if (!s.ok()) {
43 db_ = NULL;
45 exit(EXIT_FAILURE); // the process exits when the namespace DB cannot be opened
46 }
47 if (standalone) {
48 Activate(NULL, NULL); // non-standalone instances are activated later (NameServerImpl::CheckLeader)
49 }
50 }
以上是三个比较重要的类,从类名可以看出:一个是 block 管理器(BlockMappingManager),一个是 chunkserver 管理器(ChunkServerManager),一个是元数据管理器(NameSpace),具体作用和实现接下来分析。
93 void NameServerImpl::CheckLeader() { // activates the namespace when this node may act as leader; otherwise polls again later
94 if (!sync_ || sync_->IsLeader()) { // no HA (sync_ == NULL) or this node is the elected leader
96 NameServerLog log;
97 std::function<void (const FileInfo&)> task =
98 std::bind(&NameServerImpl::RebuildBlockMapCallback, this, std::placeholders::_1); // invoked for every regular file while rebuilding the block map
99 namespace_->Activate(task, &log);
100 if (!LogRemote(log, std::function<void (bool)>())) { // presumably ships the entries Activate() encoded into log; failure handling elided
102 }
103 recover_timeout_ = FLAGS_nameserver_start_recover_timeout;
105 work_thread_pool_->DelayTask(1000, std::bind(&NameServerImpl::CheckRecoverMode, this)); // first CheckRecoverMode after a delay of 1000 (presumably ms)
106 is_leader_ = true;
107 } else {
108 is_leader_ = false;
109 work_thread_pool_->DelayTask(100, std::bind(&NameServerImpl::CheckLeader, this)); // not leader yet: retry after 100 (presumably ms)
111 }
112 }
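nameserver 启动、上述成员都初始化好之后会调用 `CheckLeader`:如果没有开启 HA(`sync_` 为空),或开启了 HA 且当前节点选举成为 leader,则进行 `Activate` 和 `LogRemote`;否则把自己标记为非 leader,并通过 `DelayTask(100, ...)` 稍后重新执行 `CheckLeader`。`Activate` 的实现如下: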
52 void NameSpace::Activate(std::function<void (const FileInfo&)> callback, NameServerLog* log) { // loads or creates the namespace version, sets up root, rebuilds the block map
53 std::string version_key(8, 0); // key = 8 zero bytes followed by "version"
54 version_key.append("version");
55 std::string version_str;
56 leveldb::Status s = db_->Get(leveldb::ReadOptions(), version_key, &version_str);
57 if (s.ok()) {// read the stored version
58 if (version_str.size() != sizeof(int64_t)) {
60 }
61 version_ = *(reinterpret_cast<int64_t*>(&version_str[0])); // NOTE(review): type-punned, possibly unaligned read; memcpy would be safer
63 } else {// no stored version: create one from the current time and persist it
64 version_ = common::timer::get_micros();
65 version_str.resize(8);
66 *(reinterpret_cast<int64_t*>(&version_str[0])) = version_;
67
68 leveldb::Status s = db_->Put(leveldb::WriteOptions(), version_key, version_str); // NOTE(review): shadows the outer `s`
69 if (!s.ok()) {
71 }
72 EncodeLog(log, kSyncWrite, version_key, version_str); // record the new version in log
74 }
75 SetupRoot();// initialize root_path_
76 RebuildBlockMap(callback);
77 InitBlockIdUpbound(log);// initialize block_id_upbound_
78 }
675 bool NameSpace::RebuildBlockMap(std::function<void (const FileInfo&)> callback) { // scans all entries, restores id counters and passes each regular file to callback
679 std::set<int64_t> entry_id_set; // entry ids of root and of every directory seen
680 entry_id_set.insert(root_path_.entry_id());
681 leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
682 for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) { // start past the all-zero-prefixed version key written by Activate()
683 FileInfo file_info;
684 bool ret = file_info.ParseFromArray(it->value().data(), it->value().size());
685 assert(ret); // NOTE(review): compiled out under NDEBUG, so a corrupt value would go unnoticed
686 if (last_entry_id_ < file_info.entry_id()) {
687 last_entry_id_ = file_info.entry_id(); // track the largest entry id in use
688 }
689 FileType file_type = GetFileType(file_info.type());
690 if (file_type == kDefault) {
691 //a file
692 for (int i = 0; i < file_info.blocks_size(); i++) {
693 if (file_info.blocks(i) >= next_block_id_) {
694 next_block_id_ = file_info.blocks(i) + 1; // keep next_block_id_ above every block id seen
695 block_id_upbound_ = next_block_id_;
696 }
698 }
700 if (callback) {
701 callback(file_info);
702 }
703 } else if (file_type == kSymlink) {
705 } else {
706 entry_id_set.insert(file_info.entry_id()); // neither file nor symlink: recorded as a directory
707 }
708 }
709 //more code...
730 delete it;
731 return true;
732 }
以上代码在 NameSpace 中重建块的元数据:遍历 leveldb 中的元数据,更新 `next_block_id_`、`block_id_upbound_`、`last_entry_id_` 几个重要字段,并对每个普通文件进行 `RebuildBlockMapCallback` 回调。`entry_id_set` 保存的是根目录及所有目录的 entry_id。后面会再次遍历 leveldb 中的元数据,找出父目录已被删除、自身却仍残留在 leveldb 中的条目(孤儿条目):
714 for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) { // second pass: find entries whose parent directory no longer exists
715 FileInfo file_info;
716 bool ret = file_info.ParseFromArray(it->value().data(), it->value().size());
717 assert(ret);
718 int64_t parent_entry_id = 0;
719 std::string filename;
720 DecodingStoreKey(it->key().ToString(), &parent_entry_id, &filename); // split the key into parent entry id and name
721 if (entry_id_set.find(parent_entry_id) == entry_id_set.end()) { // parent is not among the directories collected in the first pass
722 //Orphan entry PE
726 }
727 }
`InitBlockIdUpbound` 做的事情也比较简单:预先把 `block_id_upbound_` 增大一段,避免每分配一个 block id 都加 1 并更新一次 leveldb。之后使用 `next_block_id_` 分配 id 时,在这段预留区间用完之前,都可以暂时不用管 `block_id_upbound_`。
文件元数据 `FileInfo` 的声明如下:
4 message FileInfo { // per-entry namespace metadata, stored as the leveldb value
5 optional int64 entry_id = 1;
6 optional int64 version = 2;
7 optional int32 type = 3 [default = 0755]; // decoded via GetFileType(); presumably mode bits plus file-type bits
8 repeated int64 blocks = 4; // block ids of a regular file
9 optional uint32 ctime = 5;
10 optional string name = 6;
11 optional int64 size = 7;
12 optional int32 replicas = 8;
13 optional int64 parent_entry_id = 9;
14 optional int32 owner = 10;
15 repeated string cs_addrs = 11;
16 optional string sym_link = 12;
17 }
而 `RebuildBlockMapCallback` 则会根据块的 block_id、副本数、版本号(seq)和大小去创建 block 数据,对应 `BlockMapping::RebuildBlock`:
105 void BlockMapping::RebuildBlock(int64_t block_id, int32_t replica, // recreates an NSBlock from namespace metadata and inserts it into block_map_
106 int64_t version, int64_t size) {
107 NSBlock* nsblock = NULL;
108 nsblock = new NSBlock(block_id, replica, version, size);
109 if (size) {// non-empty block: mark it kLost and track it in lost_blocks_ (presumably until chunkservers report it); overrides the ctor's recover_stat
110 nsblock->recover_stat = kLost;
111 lost_blocks_.insert(block_id); // NOTE(review): modified before mu_ is taken below — confirm callers serialize this
112 } else {
113 nsblock->recover_stat = kBlockWriting; // size 0: treated as still being written
114 }
115
116 if (version < 0) {
118 } else {
120 }
121
123 MutexLock lock(&mu_);
125 std::pair<NSBlockMap::iterator, bool> ret =
126 block_map_.insert(std::make_pair(block_id, nsblock));
129 }
23 struct NSBlock { // nameserver-side record of one block
24 int64_t id;
25 int64_t version;
26 std::set<int32_t> replica; // presumably ids of chunkservers holding a replica; filled in via sync
27 int64_t block_size;
28 uint32_t expect_replica_num; // expected replica count (initialized from the ctor's `replica` argument)
29 RecoverStat recover_stat;
30 std::set<int32_t> incomplete_replica; // presumably ids of chunkservers with an incomplete copy; filled in via sync
31 NSBlock();
32 NSBlock(int64_t block_id, int32_t replica, int64_t version, int64_t size);
33 bool operator<(const NSBlock &b) const { // NOTE(review): `>=` is not a strict weak ordering (a < a is true); UB with std::sort / ordered containers
34 return (this->replica.size() >= b.replica.size());
35 }
36 };
30 NSBlock::NSBlock(int64_t block_id, int32_t replica, // initializes id, version, size and expected replica count
31 int64_t block_version, int64_t block_size)
32 : id(block_id), version(block_version),
33 block_size(block_size), expect_replica_num(replica),
34 recover_stat(block_version < 0 ? kBlockWriting : kNotInRecover) { // negative version => still being written; RebuildBlock overrides this
35 }
之后是创建和初始化 rpc 服务等工作,包括开启 raft 等,这些就是 nameserver 启动时要做的事情,这里介绍得比较粗略。其中 nameserver 下 `ChunkServerManager` 模块的一些逻辑,会与接下来的 chunkserver 服务一起分析。
在chunkserver服务启动时,部分代码如下:
76 ChunkServerImpl::ChunkServerImpl() // excerpt: loads local blocks, creates the nameserver client and queues registration
77 : chunkserver_id_(-1), // -1 until Register() stores the id returned by the nameserver
78 heartbeat_task_id_(-1),
79 blockreport_task_id_(-1),
80 last_report_blockid_(-1),
81 report_id_(0),
82 is_first_round_(true),
83 first_round_report_start_(-1),
84 service_stop_(false) {
93 block_manager_ = new BlockManager(FLAGS_block_store_path);
94 bool s_ret = block_manager_->LoadStorage();
95 assert(s_ret == true); // NOTE(review): under NDEBUG a failed LoadStorage is silently ignored
96 rpc_client_ = new RpcClient();
97 nameserver_ = new NameServerClient(rpc_client_, FLAGS_nameserver_nodes);
98 heartbeat_thread_->AddTask(std::bind(&ChunkServerImpl::LogStatus, this, true));
99 heartbeat_thread_->AddTask(std::bind(&ChunkServerImpl::Register, this)); // first registration runs on heartbeat_thread_
100 }
36 BlockManager::BlockManager(const std::string& store_path) // validates the configured store paths and creates the file cache
37 : thread_pool_(new ThreadPool(1)), disk_quota_(0), counter_manager_(new DiskCounterManager) { // single-thread pool; disk_quota_ is summed later in LoadStorage()
38 CheckStorePath(store_path);
39 file_cache_ = new FileCache(FLAGS_chunkserver_file_cache_size);
41 }
95 void BlockManager::CheckStorePath(const std::string& store_path) { // filters the comma-separated store paths and creates one Disk per accepted path
96 std::string fsid_str;
97 struct statfs fs_info;
98 std::string home_fs; // fsid of /home (falling back to /), used to reject the root partition
99 if (statfs("/home", &fs_info) == 0) {
100 home_fs.assign((const char*)&fs_info.f_fsid, sizeof(fs_info.f_fsid));
101 } else if (statfs("/", &fs_info) == 0) {
103 home_fs.assign((const char*)&fs_info.f_fsid, sizeof(fs_info.f_fsid));
104 } else {
106 }
107
108 std::vector<std::string> store_path_list;
109 common::SplitString(store_path, ",", &store_path_list);
110 //more code... trims spaces in each store_path_list entry and removes duplicates
117 std::sort(store_path_list.begin(), store_path_list.end());
118 auto it = std::unique(store_path_list.begin(), store_path_list.end()); // sort + unique drops duplicate paths
119 store_path_list.resize(std::distance(store_path_list.begin(), it));
120
121 std::set<std::string> fsids; // fsids already taken by an accepted path
123 for (uint32_t i = 0; i < store_path_list.size(); ++i) {
124 std::string& disk_path = store_path_list[i];
125 int stat_ret = statfs(disk_path.c_str(), &fs_info);
126 std::string fs_tmp((const char*)&fs_info.f_fsid, sizeof(fs_info.f_fsid)); // NOTE(review): reads stale fs_info when statfs failed; harmless only because stat_ret is checked first
127 if (stat_ret != 0 ||
128 (!FLAGS_chunkserver_multi_path_on_one_disk && fsids.find(fs_tmp) != fsids.end()) ||
129 (!FLAGS_chunkserver_use_root_partition && fs_tmp == home_fs)) {
130 // statfs failed
131 // do not allow multi data path on the same disk
132 // do not allow using root as data path
133 if (stat_ret != 0) {
136 } else {
138 }
139 store_path_list[i] = store_path_list[store_path_list.size() - 1]; // drop entry i by moving the last entry into its slot
140 store_path_list.resize(store_path_list.size() - 1);
141 --i; // re-examine slot i; at i == 0 this wraps (unsigned) and the loop's ++i brings it back to 0
142 } else {
143 int64_t disk_size = fs_info.f_blocks * fs_info.f_bsize;
144 int64_t user_quota = fs_info.f_bavail * fs_info.f_bsize; // space available to non-root users; used as the Disk quota
145 int64_t super_quota = fs_info.f_bfree * fs_info.f_bsize;
152 Disk* disk = new Disk(store_path_list[i], user_quota);
153 disks_.push_back(std::make_pair(DiskStat(), disk));
154 fsids.insert(fs_tmp);
155 }
156 }
158 assert(store_path_list.size() > 0);
159 CheckChunkserverMeta(store_path_list);
160 }
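以上逻辑根据配置的存储目录,用 statfs 获取每个目录所在磁盘的信息。以下几类目录会被剔除:statfs 失败的目录;与已接受的目录位于同一磁盘的目录(除非开启 `chunkserver_multi_path_on_one_disk`);位于 /home(或根)分区的目录(除非开启 `chunkserver_use_root_partition`)。剩下的每个目录按其可用空间创建一个 `Disk` 对象,从而建立目录与磁盘使用情况的映射关系。`CheckChunkserverMeta` 则从 leveldb 中检查目录的版本号。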
1 struct statfs // field listing as in statfs(2); actual field types vary by platform
2 {
3 long f_type; /* filesystem type */
4 long f_bsize; /* optimal transfer block size */
5 long f_blocks; /* total data blocks in the filesystem */
6 long f_bfree; /* free blocks (including blocks reserved for root) */
7 long f_bavail; /* free blocks available to unprivileged users */
8 long f_files; /* total inodes */
9 long f_ffree; /* free inodes */
10 fsid_t f_fsid; /* filesystem id */
11 long f_namelen; /* maximum filename length */
12 };
statfs 结构中表示剩余空间的块数有两个:`f_bfree` 和 `f_bavail`。前者是文件系统所有的空闲块(包括为 root 保留的部分),后者是非 root 用户可用的空闲块。上面的代码用 `f_bavail * f_bsize` 作为 `Disk` 的配额。
`FileCache` 的结构与 leveldb 的 cache(leveldb/util/cache.cc)类似,以 file_path 为 key,对打开的 fd 进行 LRU 缓存,部分实现如下:
39 common::Cache::Handle* FileCache::FindFile(const std::string& file_path) { // returns a cache handle for file_path, opening the file read-only on a miss; NULL if open fails
40 common::Slice key(file_path);
41 common::Cache::Handle* handle = cache_->Lookup(key);
42 if (handle == NULL) {
43 int fd = open(file_path.c_str(), O_RDONLY);
44 if (fd < 0) {
46 return NULL;
47 }
48 FileEntity* file = new FileEntity;
49 file->fd = fd;
50 file->file_name = file_path;
51 handle = cache_->Insert(key, file, 1, &DeleteEntry); // charge 1 per entry (leveldb-style cache: capacity counts files); DeleteEntry presumably closes the fd on eviction
52 }
53 return handle;
54 }
167 bool BlockManager::LoadStorage() { // loads block metadata from every Disk via AddBlock and sums their quotas
168 bool ret = true;
169 for (auto it = disks_.begin(); it != disks_.end(); ++it) {
170 Disk* disk = it->second;
171 ret = ret && disk->LoadStorage(std::bind(&BlockManager::AddBlock, // NOTE(review): short-circuits — after the first failing disk, later disks are not loaded but their quota is still added
172 this, std::placeholders::_1,
173 std::placeholders::_2,
174 std::placeholders::_3));
175 disk_quota_ += disk->GetQuota();
176 }
177 return ret;
178 }
38 bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback) { // iterates this disk's metadb, drops invalid blocks and passes each valid one to callback
57 //more code... reads and checks the version number from leveldb
58 leveldb::Iterator* it = metadb_->NewIterator(leveldb::ReadOptions());
59 for (it->Seek(version_key+'\0'); it->Valid(); it->Next()) { // start just after version_key
60 int64_t block_id = 0;
61 if (1 != sscanf(it->key().data(), "%ld", &block_id)) { // NOTE(review): a leveldb key slice is not NUL-terminated, and "%ld" assumes int64_t is long
63 delete it;
64 return false;
65 }
66 BlockMeta meta;
67 if (!meta.ParseFromArray(it->value().data(), it->value().size())) {
69 assert(0); // TODO: fault tolerant
70 }
71 // TODO: do not need store_path in meta any more
72 std::string file_path = meta.store_path() + Block::BuildFilePath(block_id);
73 if (meta.version() < 0) { // unfinished block: delete its meta and data file
76 metadb_->Delete(leveldb::WriteOptions(), it->key());
77 remove(file_path.c_str());
78 continue;
79 } else {
80 struct stat st;
81 if (stat(file_path.c_str(), &st) ||
82 st.st_size != meta.block_size() ||
83 access(file_path.c_str(), R_OK)) { // data file missing, wrong size, or unreadable: drop the block
87 metadb_->Delete(leveldb::WriteOptions(), it->key());
88 remove(file_path.c_str());
89 continue;
90 } else {
93 }
94 }
95 callback(block_id, this, meta);
97 }
98 //more code...
对上面 `store_path_list` 中的每个目录(即每个 `Disk`),读取其 leveldb 中每个块的元数据 `BlockMeta`,并处理其中有问题的 `BlockMeta`:version < 0(未写完)的块,以及数据文件不存在、大小与元数据不一致或不可读的块,都会被删除元数据和数据文件;其余的块则回调 `BlockManager::AddBlock`:
325 bool BlockManager::AddBlock(int64_t block_id, Disk* disk, BlockMeta meta) { // wraps meta in a Block and indexes it by block_id; false if the id is already present
326 Block* block = new Block(meta, disk, file_cache_);
327 block->AddRef();
328 MutexLock lock(&mu_);
329 return block_map_.insert(std::make_pair(block_id, block)).second; // NOTE(review): on a duplicate id the new Block is never released (leak) and its disk counters stay incremented
330 }
4 message BlockMeta { // per-block metadata stored in a Disk's metadb
5 optional int64 block_id = 1;
6 optional int64 block_size = 2; // compared against the data file size when loading
7 optional int64 checksum = 3;
8 optional int64 version = 4 [default = -1]; // negative means the block is not finished
9 optional string store_path = 5; // prefix of the data file path
10 }
34 Block::Block(const BlockMeta& meta, Disk* disk, FileCache* file_cache) : // builds an in-memory Block from its metadata and updates the disk counters
35 disk_(disk), meta_(meta),
36 last_seq_(-1), slice_num_(-1), blockbuf_(NULL), buflen_(0),
37 bufdatalen_(0), disk_writing_(false),
38 disk_file_size_(meta.block_size()), file_desc_(kNotCreated), refs_(0),
39 close_cv_(&mu_), is_recover_(false), expected_size_(-1), deleted_(false),
40 file_cache_(file_cache) {
41 assert(meta_.block_id() < (1L<<40)); // block ids must fit in 40 bits
42 disk_->counters_.data_size.Add(meta.block_size());
43 disk_file_ = meta.store_path() + BuildFilePath(meta_.block_id());
44 disk_->counters_.blocks.Inc();
45 if (meta_.version() >= 0) { // finished block: no receive window needed
46 finished_ = true;
47 recv_window_ = NULL;
48 } else { // unfinished block: sliding window (size argument 100) feeding Block::WriteCallback
49 finished_ = false;
50 recv_window_ = new common::SlidingWindow<Buffer>(100,
51 std::bind(&Block::WriteCallback, this,
52 std::placeholders::_1, std::placeholders::_2));
53 }
54 }
其中 chunkserver 中的 `Block` 类比较复杂,代表真实的块数据。加载时根据 `version` 判断块是否已写完:version >= 0 表示已完成,否则会创建一个滑动窗口 `recv_window_`(回调为 `Block::WriteCallback`)。
最后是 `ChunkServerImpl::Register`:
135 void ChunkServerImpl::Register() { // registers with the nameserver, retrying on RPC failure and resetting local data on a namespace-version mismatch
136 RegisterRequest request;
137 request.set_chunkserver_addr(data_server_addr_);
138 request.set_disk_quota(block_manager_->DiskQuota());
139 request.set_namespace_version(block_manager_->NamespaceVersion());
140 request.set_tag(FLAGS_chunkserver_tag);
143 RegisterResponse response;
144 if (!nameserver_->SendRequest(&NameServer_Stub::Register, &request, &response, 20)) {
146 work_thread_pool_->DelayTask(5000, std::bind(&ChunkServerImpl::Register, this)); // RPC failed: retry after 5000 (presumably ms)
147 return;
148 }
149 //more code...
160 int64_t new_version = response.namespace_version();
161 if (block_manager_->NamespaceVersion() != new_version) {
162 // NameSpace change
163 if (!FLAGS_chunkserver_auto_clean) {
164 /// abort
166 }
167 LOG(INFO, "Use new namespace version: %ld, clean local data", new_version);
168 // Clean
169 if (!block_manager_->CleanUp(new_version)) {
171 }
172 if (!block_manager_->SetNamespaceVersion(new_version)) {
174 }
175 work_thread_pool_->AddTask(std::bind(&ChunkServerImpl::Register, this)); // register again with the adopted version
176 return;
177 }
179 //more code...
190 }
以上代码向 nameserver 注册,请求中会带上磁盘配额、namespace 版本号和 tag。收到响应后会做一些处理,比如版本号不一致时(需开启 `chunkserver_auto_clean`,否则中止),要清理本地数据、重置版本号,然后重新注册:
310 bool BlockManager::CleanUp(int64_t namespace_version) { // removes every block whose disk version differs from namespace_version; always returns true
311 for (auto it = block_map_.begin(); it != block_map_.end();) { // NOTE(review): iterates block_map_ without mu_ (AddBlock locks mu_) — confirm the caller serializes access
312 Block* block = it->second;
313 if (block->CleanUp(namespace_version)) {
314 file_cache_->EraseFileCache(block->GetFilePath()); // drop any cached fd for the data file
315 block->DecRef();
316 block_map_.erase(it++); // post-increment keeps the iterator valid across erase
317 } else {
318 ++it;
319 }
320 }
322 return true;
323 }
140 bool Block::CleanUp(int64_t namespace_version) { // marks the block deleted when its disk's namespace version differs; returns true if it did
141 if (namespace_version != disk_->NamespaceVersion()) {
142 SetDeleted();
143 return true;
144 }
145 return false;
146 }
148 StatusCode Block::SetDeleted() { // removes the block meta and flags the block deleted; kCsNotFound if it was already flagged
149 disk_->RemoveBlockMeta(meta_.block_id()); // NOTE(review): runs even when the block was already deleted
150 int deleted = common::atomic_swap(&deleted_, 1); // set deleted_ to 1 and get the previous value
151 if (deleted != 0) {
152 return kCsNotFound;
153 }
154 return kOK;
155 }
剩下的字段作用后面再说。其中 `chunkserver_id_` 是 nameserver 分配的 id,屏蔽了具体的 ip/port 等信息;注册成功后接着会执行 `SendBlockReport` / `SendHeartbeat`,后面再分析。下面回到 nameserver,看它收到 `RegisterRequest` 消息后的处理:
170 void NameServerImpl::Register(::google::protobuf::RpcController* controller, // RPC handler: registers a chunkserver, or schedules its cleanup on a namespace-version mismatch
171 const ::baidu::bfs::RegisterRequest* request,
172 ::baidu::bfs::RegisterResponse* response,
173 ::google::protobuf::Closure* done) {
174 if (!is_leader_) {
175 //send to leader
177 return;
178 }
179 sofa::pbrpc::RpcController* sofa_cntl =
180 reinterpret_cast<sofa::pbrpc::RpcController*>(controller);
181 const std::string& address = request->chunkserver_addr();
182 std::string port = address.substr(address.find(':')); // NOTE(review): throws std::out_of_range if address has no ':'
183 std::string ip_address = sofa_cntl->RemoteAddress();
184 ip_address = ip_address.substr(0, ip_address.find(':')) + port; // peer IP seen by the RPC layer + ":port" reported by the chunkserver
186 int64_t version = request->namespace_version();
187 if (version != namespace_->Version()) {
190 chunkserver_manager_->RemoveChunkServer(address);
191 } else {
193 if (chunkserver_manager_->HandleRegister(ip_address, request, response)) { // true once enough chunkservers are registered
194 LeaveReadOnly();
195 }
196 }
197 response->set_namespace_version(namespace_->Version()); // always report the current version back
198 done->Run();
199 }
当 nameserver 收到 chunkserver 的注册请求并发现版本号不一致时,会调用 `chunkserver_manager_->RemoveChunkServer(address)` 处理该 chunkserver 的状态:
166 bool ChunkServerManager::RemoveChunkServer(const std::string& addr) { // moves an active chunkserver to kCsWaitClean and queues CleanChunkServer; false if addr is unknown
167 MutexLock lock(&mu_, "RemoveChunkServer", 10);
168 std::map<std::string, int32_t>::iterator it = address_map_.find(addr);
169 if (it == address_map_.end()) {
170 return false;
171 }
172 ChunkServerInfo* cs_info = NULL;
173 bool ret = GetChunkServerPtr(it->second, &cs_info);
174 assert(ret);
175 if (cs_info->status() == kCsActive) {// only an active chunkserver is scheduled for cleanup
176 cs_info->set_status(kCsWaitClean);
177 std::function<void ()> task =
178 std::bind(&ChunkServerManager::CleanChunkServer,
179 this, cs_info, std::string("Dead"));
180 thread_pool_->AddTask(task); // cleanup runs asynchronously on the shared pool
181 }
182 return true;
183 }
125 void ChunkServerManager::CleanChunkServer(ChunkServerInfo* cs, const std::string& reason) { // detaches a chunkserver's blocks, hands them to BlockMapping, then resets its stats and status
126 int32_t id = cs->id();
127 MutexLock lock(&mu_, "CleanChunkServer", 10);
128 chunkserver_num_--;
129 auto it = block_map_.find(id);
130 assert(it != block_map_.end());
131 std::set<int64_t> blocks;
132 it->second->CleanUp(&blocks); // presumably moves this chunkserver's block ids into blocks
135 cs->set_status(kCsCleaning);
136 mu_.Unlock(); // mu_ is released while BlockMapping processes the blocks
137 block_mapping_manager_->DealWithDeadNode(id, blocks);
138 mu_.Lock("CleanChunkServerRelock", 10); // re-acquired before touching cs (presumably the MutexLock guard unlocks at scope exit)
139 cs->set_w_qps(0);
140 cs->set_w_speed(0);
141 cs->set_r_qps(0);
142 cs->set_r_speed(0);
143 cs->set_recover_speed(0);
144 if (std::find(chunkservers_to_offline_.begin(),
145 chunkservers_to_offline_.end(),
146 cs->ipaddress()) == chunkservers_to_offline_.end()) { // not scheduled for offline
147 if (cs->is_dead()) {
148 cs->set_status(kCsOffLine);
149 } else {
150 cs->set_status(kCsStandby);
151 }
152 } else {
153 cs->set_status(kCsReadonly);
154 }
155 }
607 void BlockMapping::DealWithDeadNode(int32_t cs_id, const std::set<int64_t>& blocks) { // handles every block of a dead chunkserver, then resets its pending recover checks
608 for (std::set<int64_t>::iterator it = blocks.begin(); it != blocks.end(); ++it) {
609 MutexLock lock(&mu_); // lock taken per block, presumably to keep each critical section short
610 DealWithDeadBlockInternal(cs_id, *it);
611 }
612 MutexLock lock(&mu_);
613 NSBlock* block = NULL;
614 for (std::set<int64_t>::iterator it = hi_recover_check_[cs_id].begin(); // operator[] inserts an empty set if cs_id is absent; erased below
615 it != hi_recover_check_[cs_id].end(); ++it) {
616 if (!GetBlockPtr(*it, &block)) { // block not found
618 } else {
619 block->recover_stat = kNotInRecover; // reset the block's recover state
620 }
621 }
622 hi_recover_check_.erase(cs_id);
623 for (std::set<int64_t>::iterator it = lo_recover_check_[cs_id].begin();
624 it != lo_recover_check_[cs_id].end(); ++it) {
625 if (!GetBlockPtr(*it, &block)) { // block not found
627 } else {
628 block->recover_stat = kNotInRecover;
629 }
630 }
631 lo_recover_check_.erase(cs_id);
632 }
如果 chunkserver 的版本号与 nameserver 相同,则调用 `chunkserver_manager_->HandleRegister`:
227 bool ChunkServerManager::HandleRegister(const std::string& ip, // adds a new chunkserver or refreshes a known one; returns whether chunkserver_num_ reached FLAGS_expect_chunkserver_num
228 const RegisterRequest* request,
229 RegisterResponse* response) {
230 const std::string& address = request->chunkserver_addr();
231 StatusCode status = kOK;
232 int cs_id = -1;
233 MutexLock lock(&mu_, "HandleRegister", 10);
234 std::map<std::string, int32_t>::iterator it = address_map_.find(address);
235 if (it == address_map_.end()) {
236 cs_id = AddChunkServer(request->chunkserver_addr(), ip,
237 request->tag(), request->disk_quota());// unknown address: add a new chunkserver node
238 assert(cs_id >= 0);
239 response->set_chunkserver_id(cs_id); // NOTE(review): redundant, the id is set again after the if/else
240 } else {
241 cs_id = it->second;
242 ChunkServerInfo* cs_info;
243 bool ret = GetChunkServerPtr(cs_id, &cs_info);
244 assert(ret);
245 if (cs_info->status() == kCsWaitClean || cs_info->status() == kCsCleaning) { // still being cleaned: reject for now
246 status = kNotOK;
249 } else {
250 UpdateChunkServer(cs_id, request->tag(), request->disk_quota());
251 auto it = block_map_.find(cs_id); // NOTE(review): shadows the outer `it`
252 assert(it != block_map_.end());
253 it->second->MoveNew();
254 response->set_report_id(it->second->GetReportId());
257 }
258 }
259 response->set_chunkserver_id(cs_id);
262 response->set_status(status);
263 return chunkserver_num_ >= FLAGS_expect_chunkserver_num;
264 }
561 int32_t ChunkServerManager::AddChunkServer(const std::string& address, // creates and indexes a ChunkServerInfo for address and returns its new id; caller must hold mu_
562 const std::string& ipaddress,
563 const std::string& tag,
564 int64_t quota) {
565 mu_.AssertHeld();
566 ChunkServerInfo* info = new ChunkServerInfo;
567 int32_t id = next_chunkserver_id_++; // ids are allocated by incrementing next_chunkserver_id_
568 char buf[20];
569 common::timer::now_time_str(buf, 20, common::timer::kMin);
570 info->set_start_time(std::string(buf));
571 info->set_id(id);
572 //more code...
575 info->set_disk_quota(quota);
576 if (std::find(chunkservers_to_offline_.begin(), chunkservers_to_offline_.end(),
577 address) != chunkservers_to_offline_.end()) { // scheduled for offline: register as read-only
578 info->set_status(kCsReadonly);
579 } else {
580 info->set_status(kCsActive);
581 }
582 info->set_kick(false);
583 std::string host = address.substr(0, address.find(':'));
584 std::string ip = ipaddress.substr(0, ipaddress.find(':'));
585 //more code...
592 chunkservers_[id] = info;
593 address_map_[address] = id;
594 int32_t now_time = common::timer::now_time();
595 heartbeat_list_[now_time].insert(info);// bucket by current time for liveness (heartbeat) checking
596 info->set_last_heartbeat(now_time);
597 ++chunkserver_num_;
598 Blocks* blocks = new Blocks(id); // per-chunkserver block set, looked up later via block_map_
599 block_map_.insert(std::make_pair(id, blocks));
600 return id;
601 }
130 void NameServerImpl::LeaveReadOnly() { // clears readonly_ so the nameserver leaves read-only mode
132 if (readonly_) {
133 readonly_ = false;
134 }
135 }
`HandleRegister` 返回 `chunkserver_num_ >= FLAGS_expect_chunkserver_num`:只有当注册到 nameserver 的 chunkserver 数量达到配置的期望值时,nameserver 才会调用 `LeaveReadOnly` 退出只读状态;在此之前只能提供读服务。后面会结合具体例子,分析 chunkserver 节点状态的变化,以及 block 的各种状态在什么情况下出现:
39 enum ChunkServerStatus {// chunkserver node states
40 kCsActive = 101; // newly added chunkserver not scheduled for offline (AddChunkServer)
41 kCsWaitClean = 102; // set by RemoveChunkServer before cleanup is queued
42 kCsCleaning = 103; // set by CleanChunkServer while its blocks are handed off
43 kCsOffLine = 104; // after cleanup, when the node is dead
44 kCsStandby = 105; // after cleanup, when the node is not dead
45 kCsReadonly = 106; // node listed in chunkservers_to_offline_
46 }
48 enum RecoverStat {// block recovery states
49 kNotInRecover = 0; // finished block not in recovery; also reset by DealWithDeadNode
50 kLoRecover = 1;
51 kHiRecover = 2;
52 kCheck = 3;
53 kIncomplete = 4;
54 kLost = 5; // non-empty block rebuilt from namespace metadata (BlockMapping::RebuildBlock)
55 kBlockWriting = 6; // block still being written
56 kAny = 20;
57 }
接着回到 chunkserver:在 `ChunkServerImpl::Register` 中发送 `RegisterRequest`、完成上面分析的处理之后:
135 void ChunkServerImpl::Register() { // tail of Register(): records the assigned id and starts block reports and heartbeats
136 //more code...
179 chunkserver_id_ = response.chunkserver_id(); // id assigned by the nameserver
180 report_id_ = response.report_id() + 1; // next report id = nameserver's report id + 1
181 first_round_report_start_ = last_report_blockid_;
182 is_first_round_ = true;
188 work_thread_pool_->DelayTask(1, std::bind(&ChunkServerImpl::SendBlockReport, this)); // start block reports almost immediately
189 heartbeat_thread_->DelayTask(1, std::bind(&ChunkServerImpl::SendHeartbeat, this)); // start heartbeats almost immediately
190 }
上面这部分涉及块信息的同步(block report)和心跳,会在下一篇中分析,并会举例说明状态的变化。