 // Declaration excerpt (inner lines 20-45 are not shown): a manager that holds
 // a fixed number of BlockMapping buckets plus the ThreadPool passed to each of them.
 16 class BlockMappingManager {
 17 public :
 18     BlockMappingManager(int32_t bucket_num); // bucket_num: how many BlockMapping buckets to create
 19     ~BlockMappingManager();
 46 private:
 47     int32_t blockmapping_bucket_num_;           // bucket count, copied from the constructor argument
 48     std::vector<BlockMapping*> block_mapping_;  // one BlockMapping* per bucket
 49     ThreadPool* thread_pool_;                   // pool handed to every BlockMapping at construction
 50 };

 // Constructs the manager: allocates the worker pool, then one BlockMapping
 // per bucket, all sharing that pool.
 20 BlockMappingManager::BlockMappingManager(int32_t bucket_num) :
 21     blockmapping_bucket_num_(bucket_num) {
 22     thread_pool_ = new ThreadPool(FLAGS_blockmapping_working_thread_num); // constructed with the flag's value
 23     block_mapping_.resize(blockmapping_bucket_num_); // NOTE(review): a negative bucket_num converts to a huge size here — confirm callers never pass one
 24     for (size_t i = 0; i < block_mapping_.size(); i++) {
 25         block_mapping_[i] = new BlockMapping(thread_pool_); // every bucket receives the same pool pointer
 26     }
 27     srand(time(NULL)); // seeds rand(); the code that consumes rand() is not part of this excerpt
 28 }

 // Maps a block id to a bucket index: the id modulo the bucket count.
 33 int32_t BlockMappingManager::GetBucketOffset(int64_t block_id) {
 34     return block_id % blockmapping_bucket_num_; // NOTE(review): % yields a negative result for a negative block_id — confirm ids are never negative
 35 }



  // Leader check (excerpt; inner lines 95, 101 and 110 are not shown).
  // When there is no sync_ object, or sync_ reports leadership, this activates the
  // namespace with RebuildBlockMapCallback as the per-file callback, passes the
  // resulting log to LogRemote, schedules CheckRecoverMode and sets is_leader_.
  // Otherwise it clears is_leader_ and schedules itself to run again.
  93 void NameServerImpl::CheckLeader() {
  94     if (!sync_ || sync_->IsLeader()) {
  96         NameServerLog log;
  97         std::function<void (const FileInfo&)> task =
  98             std::bind(&NameServerImpl::RebuildBlockMapCallback, this, std::placeholders::_1);
  99         namespace_->Activate(task, &log);
 100         if (!LogRemote(log, std::function<void (bool)>())) { // called with an empty completion callback; the failure branch body is elided here
 102         }
 103         recover_timeout_ = FLAGS_nameserver_start_recover_timeout;
 104         start_time_ = common::timer::get_micros();
 105         work_thread_pool_->DelayTask(1000, std::bind(&NameServerImpl::CheckRecoverMode, this)); // delay unit is presumably milliseconds — confirm against ThreadPool::DelayTask
 106         is_leader_ = true;
 107     } else {
 108         is_leader_ = false;
 109         work_thread_pool_->DelayTask(100, std::bind(&NameServerImpl::CheckLeader, this)); // re-run this check after the delay
 111     }
 112 }


 // Activation sequence (excerpt; inner lines 54-74 are elided): sets up the root,
 // rebuilds the block map while feeding every file to `callback`, then
 // initialises the block-id upper bound, passing `log` along.
 52 void NameSpace::Activate(std::function<void (const FileInfo&)> callback, NameServerLog* log) {
 53     //more code...
 75     SetupRoot();
 76     RebuildBlockMap(callback); // NOTE(review): RebuildBlockMap returns bool; the result is discarded here
 77     InitBlockIdUpbound(log);
 78 }

// Scans the leveldb entries starting at a fixed key and, for each entry whose type
// is kDefault, advances next_block_id_ / block_id_upbound_ past the largest block id
// seen, counts blocks and files, and invokes `callback` (if set) with the parsed
// FileInfo. Excerpt only: the function continues beyond the last line shown, and
// block_num / file_num are declared in elided lines.
675 bool NameSpace::RebuildBlockMap(std::function<void (const FileInfo&)> callback) {
676     //more code...
681     leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
682     for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) { // start key: seven NUL bytes followed by byte 0x01
683         FileInfo file_info;
684         bool ret = file_info.ParseFromArray(it->value().data(), it->value().size()); // handling of `ret` (inner lines 685-688) is not shown
689         FileType file_type = GetFileType(file_info.type());
690         if (file_type == kDefault) {
691             //a file
692             for (int i = 0; i < file_info.blocks_size(); i++) {
693                 if (file_info.blocks(i) >= next_block_id_) {
694                     next_block_id_ = file_info.blocks(i) + 1; // keep next_block_id_ one past the largest id seen so far
695                     block_id_upbound_ = next_block_id_;
696                 }
697                 ++block_num;
698             }
699             ++file_num;
700             if (callback) {
701                 callback(file_info);
702             }
707         }
708     }
709 //more code...


// Per-file callback used while the namespace is rebuilt: forwards every block id of
// `file_info` to block_mapping_manager_->RebuildBlock together with the file's
// replicas(), version() and size() values.
1065 void NameServerImpl::RebuildBlockMapCallback(const FileInfo& file_info) {
1066     for (int i = 0; i < file_info.blocks_size(); i++) {
1067         int64_t block_id = file_info.blocks(i);
1068         int64_t version = file_info.version(); // same value on every iteration; read from the file, not the block
1069         block_mapping_manager_->RebuildBlock(block_id, file_info.replicas(), version, file_info.size());
1071     }   
1072 } 

以上过程在启动时遍历 leveldb 中的元数据,并建立起 block 的映射关系;同时也会初始化 last_entry_id_、next_block_id_ 等信息,用于后续真正写文件时分配新的 block 索引,后文会介绍它们的具体作用。


 // Per-block record kept by the nameserver: id, version, size, replica sets and
 // recovery state. The constructors are only declared in this excerpt.
 23 struct NSBlock {
 24     int64_t id;
 25     int64_t version; 
 26     std::set<int32_t> replica;             // presumably ids of the servers holding a replica — confirm
 27     int64_t block_size;
 28     uint32_t expect_replica_num;
 29     RecoverStat recover_stat;
 30     std::set<int32_t> incomplete_replica;
 31     NSBlock();
 32     NSBlock(int64_t block_id, int32_t replica, int64_t version, int64_t size);
        // Orders blocks by descending replica count: a block with more replicas
        // compares "less". The comparison is strict (>) so that x < x is false;
        // the previous >= was not a strict weak ordering, which standard sorting
        // and ordered containers require of their comparator.
 33     bool operator<(const NSBlock &b) const {
 34         return (this->replica.size() > b.replica.size());
 35     }   
 36 };

 // NOTE: the semicolon-terminated entries appear to be protobuf enum syntax
 // (i.e. quoted from a .proto file), not a C++ enum definition.
 48 enum RecoverStat { // recovery state of this block
 49     kNotInRecover = 0;
 50     kLoRecover = 1;
 51     kHiRecover = 2;
 52     kCheck = 3;
 53     kIncomplete = 4;
 54     kLost = 5;         // assigned by BlockMapping::RebuildBlock when the rebuilt size is non-zero
 55     kBlockWriting = 6; // assigned by BlockMapping::RebuildBlock when the rebuilt size is zero
 56     kAny = 20;
 57 }

// Re-creates the in-memory record for one block during start-up and inserts it into
// block_map_. Excerpt: inner lines 115-122, 124 and 127-128 are not shown.
//   block_id / replica / version / size are forwarded to the NSBlock constructor.
105 void BlockMapping::RebuildBlock(int64_t block_id, int32_t replica,
106                                 int64_t version, int64_t size) {
107     NSBlock* nsblock = NULL;
108     nsblock = new NSBlock(block_id, replica, version, size);
109     if (size) { // (author's note) not clear why the state is chosen from size
110         nsblock->recover_stat = kLost;
111         lost_blocks_.insert(block_id); // NOTE(review): in this excerpt the insert happens before mu_ is taken on line 123 — confirm that is safe
112     } else {                   
113         nsblock->recover_stat = kBlockWriting;
114     }
123     MutexLock lock(&mu_);
125     std::pair<NSBlockMap::iterator, bool> ret =
126         block_map_.insert(std::make_pair(block_id, nsblock)); // use of `ret` (e.g. a duplicate-id check) is in the elided lines
129 }

其中 nsblock 在初始化过程中会执行 recover_stat(version < 0 ? kBlockWriting : kNotInRecover),即先用版本号来决定该块处于什么状态;随后 RebuildBlock 又根据 size 重置该状态,这一点暂时不太明白。



 // Declaration excerpt of one block-mapping bucket (many inner lines are elided).
 // Holds the block_id -> NSBlock* map, several recovery bookkeeping containers,
 // and the mutex mu_.
 58 class BlockMapping {
 59 public:
 68     bool UpdateBlockInfo(int64_t block_id, int32_t server_id, int64_t block_size, int64_t block_version);
 84 private:
 85     void DealWithDeadBlockInternal(int32_t cs_id, int64_t block_id);
 86     typedef std::map<int32_t, std::set<int64_t> > CheckList; // int32 key -> set of int64 values; presumably server id -> block ids — confirm
 89     void TryRecover(NSBlock* block);
104 private:
105     Mutex mu_;                        // taken by RebuildBlock before it inserts into block_map_
106     ThreadPool* thread_pool_;         // supplied by BlockMappingManager's constructor
107     typedef std::map<int64_t, NSBlock*> NSBlockMap;
108     NSBlockMap block_map_;            // keyed by block id (RebuildBlock inserts block_id -> NSBlock*)
110     CheckList hi_recover_check_;
111     CheckList lo_recover_check_;
112     CheckList incomplete_;
113     std::set<int64_t> lo_pri_recover_;
114     std::set<int64_t> hi_pri_recover_;
115     std::set<int64_t> lost_blocks_;   // RebuildBlock adds the ids of blocks it marks kLost
116 };


 // RPC handler excerpt (inner lines 374-388 and everything after 391 are elided):
 // wraps a WriteLock on `path` in a FileLockGuard, then delegates the creation to
 // namespace_->CreateFile.
 369 void NameServerImpl::CreateFile(::google::protobuf::RpcController* controller,
 370                                 const CreateFileRequest* request,
 371                                 CreateFileResponse* response,
 372                                 ::google::protobuf::Closure* done) {
 373     //more code...
 389     FileLockGuard file_lock(new WriteLock(path)); // WriteLock's constructor acquires the lock
 390     StatusCode status = namespace_->CreateFile(path, flags, mode, replica_num, &blocks_to_remove, &log);
 391     //more code...


 // Common base of WriteLock and ReadLock; the virtual destructor lets a derived
 // lock be destroyed through a Lock pointer.
 19 class Lock {
 20 public:
 21     virtual ~Lock() {}
 22 };  
 // Scoped write lock on one or two file paths: the single-path constructor acquires
 // the lock through the shared FileLockManager and the destructor releases it.
 24 class WriteLock : public Lock {
 25 public:
 26     WriteLock(const std::string& file_path);
 27     WriteLock(const std::string& file_path_a,
 28               const std::string& file_path_b); // locks several paths: compares them first, locks in lexicographic order, unlocks in reverse order (per the original note; this constructor's body is not shown)
 29     ~WriteLock();
 30     static void SetFileLockManager(FileLockManager* file_lock_manager);
 31 private:
 32     // will be initialized in NameServerImpl's constructor
 33     static FileLockManager* file_lock_manager_;
 34     std::vector<std::string> file_path_; // the path(s) locked by this object; the destructor handles one or two entries
 35 };  
 // Scoped read lock on a single file path: the constructor acquires it through the
 // shared FileLockManager and the destructor releases it.
 37 class ReadLock : public Lock {
 38 public:
 39     ReadLock(const std::string& file_path);
 40     ~ReadLock();
 41     static void SetFileLockManager(FileLockManager* file_lock_manager);
 42 private:
 43     // will be initialized in NameServerImpl's constructor
 44     static FileLockManager* file_lock_manager_;
 45     std::string file_path_; // the path locked by this object
 46 };

 // Remembers the path and acquires its write lock via the static FileLockManager
 // (file_lock_manager_ must already have been set).
 16 WriteLock::WriteLock(const std::string& file_path) {
 17     file_path_.push_back(file_path);
 18     file_lock_manager_->WriteLock(file_path);
 19 }

 // Releases the lock(s) taken by the constructor.
 40 WriteLock::~WriteLock() {
 41     if (file_path_.size() == 1) {
 42         file_lock_manager_->Unlock(file_path_[0]);
 43     } else { // unlock in reverse order; this branch indexes [1] and [0], so it assumes exactly two stored paths
 44         file_lock_manager_->Unlock(file_path_[1]);
 45         file_lock_manager_->Unlock(file_path_[0]);
 46     }
 47 }
 // Remembers the path and acquires its read lock via the static FileLockManager.
 53 ReadLock::ReadLock(const std::string& file_path) {
 54     file_path_ = file_path;
 55     file_lock_manager_->ReadLock(file_path);
 56 }
 // Releases the read lock taken by the constructor.
 58 ReadLock::~ReadLock() {
 59     file_lock_manager_->Unlock(file_path_);
 60 }
 // Path-based reader/writer lock manager. Lock entries live in per-bucket hash maps
 // keyed by path string; each bucket has its own mutex guarding its map.
 21 class FileLockManager {
 22 public:
 23     FileLockManager(int bucket_num = 19); // bucket_num presumably sizes locks_ — constructor body not shown
 24     ~FileLockManager();
 25     void ReadLock(const std::string& file_path);   // read-locks "/" and every component of file_path
 26     void WriteLock(const std::string& file_path);  // read-locks the ancestors, write-locks the final component
 27     void Unlock(const std::string& file_path);     // releases the locks from the deepest path up to "/"
 28 private:
 29     enum LockType {
 30         kRead,
 31         kWrite
 32     };
 33     struct LockEntry {
 34         common::Counter ref_;      // reference count: one reference for the map plus one per LockInternal call
 35         common::RWLock rw_lock_;   // the actual reader/writer lock for one path
 36     };
 37     struct LockBucket {
 38         Mutex mu;                  // guards lock_map
 39         std::unordered_map<std::string, LockEntry*> lock_map; // path -> entry
 40     };
 41     void LockInternal(const std::string& path, LockType lock_type);
 42     void UnlockInternal(const std::string& path);
 43     int GetBucketOffset(const std::string& path); // selects the bucket for a path (implementation not shown)
 44 private:
 45     std::vector<LockBucket*> locks_;
 46 };



 // Write-locks `file_path`: read-locks "/" and every ancestor directory from the top
 // down, then write-locks the final component. If the path has no components, "/"
 // itself is write-locked.
 46 void FileLockManager::WriteLock(const std::string& file_path) {
 48     std::vector<std::string> paths;
 49     common::SplitString(file_path, "/", &paths); // split the path into its components
 50     // first lock "/"
 51     if (paths.size() == 0) { // the target is the root itself: write-lock it
 52         LockInternal("/", kWrite);
 53         return;
 54     }
 55     LockInternal("/", kRead); // otherwise read-lock the root
 56     std::string cur_path;
 57     for (size_t i = 0; i < paths.size() - 1; i++) { // size() >= 1 here, so size() - 1 cannot wrap around
 58         cur_path += ("/" + paths[i]);
 59         LockInternal(cur_path, kRead); // read-lock each ancestor in turn
 60     }
 61     cur_path += ("/" + paths.back()); // write-lock the final component
 62     LockInternal(cur_path, kWrite);
 63 }


 // Finds or creates the LockEntry for `path` in its bucket, takes a reference on it
 // while holding the bucket mutex, then acquires the entry's read or write lock
 // after the bucket mutex scope has ended.
 83 void FileLockManager::LockInternal(const std::string& path,
 84                                      LockType lock_type) {
 85     LockEntry* entry = NULL;
 87     int bucket_offset = GetBucketOffset(path);//hash path
 88     LockBucket* lock_bucket = locks_[bucket_offset];
 90     {
 91         MutexLock lock(&(lock_bucket->mu)); // scoped to this brace block only
 92         auto it = lock_bucket->lock_map.find(path);
 93         if (it == lock_bucket->lock_map.end()) {
 94             entry = new LockEntry();
 95             // hold a ref for lock_map_
 96             entry->ref_.Inc();
 97             lock_bucket->lock_map.insert(std::make_pair(path, entry));
 98         } else {
 99             entry = it->second;
100         }
101         // inc ref_ first to prevent deconstruct
102         entry->ref_.Inc();
103     }
105     if (lock_type == kRead) {
106         // get read lock
107         entry->rw_lock_.ReadLock();
108     } else {
109         // get write lock
110         entry->rw_lock_.WriteLock();
111     }
112 }

 // Read-locks `file_path`: read-locks "/" and then every prefix of the path, from the
 // top down, including the final component.
 33 void FileLockManager::ReadLock(const std::string& file_path) {
 35     std::vector<std::string> paths;
 36     common::SplitString(file_path, "/", &paths);
 37     // first lock "/"
 38     LockInternal("/", kRead);
 39     std::string cur_path;
 40     for (size_t i = 0; i < paths.size(); i++) {
 41         cur_path += ("/" + paths[i]);
 42         LockInternal(cur_path, kRead);
 43     }
 44 }




 // Releases every lock taken for `file_path`, in the reverse of the acquisition
 // order: the full path first, then each parent prefix, and "/" last.
 65 void FileLockManager::Unlock(const std::string& file_path) {
 66     /// TODO maybe use NormalizePath is better
 67     std::vector<std::string> paths;
 68     common::SplitString(file_path, "/", &paths);
 69     std::string path;
 70     for (size_t i = 0; i < paths.size(); i++) { // rebuild the path as "/a/b/c" from its components
 71         path += ("/" + paths[i]);
 72     }
 74     std::string cur_path = path;
 75     for (size_t i = 0; i < paths.size() ; i++) {
 76         UnlockInternal(cur_path);
 77         cur_path.resize(cur_path.find_last_of('/')); // drop the last "/component", leaving the parent prefix
 78     }
 79     // last unlock "/"
 80     UnlockInternal("/");
 81 }

// Releases the rw lock held on `path` and drops one reference on its LockEntry,
// all while holding the bucket mutex; the entry is deleted and removed from the
// map when the reference check below succeeds.
114 void FileLockManager::UnlockInternal(const std::string& path) {
115     int bucket_offset = GetBucketOffset(path);
116     LockBucket* lock_bucket = locks_[bucket_offset];
118     MutexLock lock(&(lock_bucket->mu));
119     auto it = lock_bucket->lock_map.find(path);
120     assert(it != lock_bucket->lock_map.end()); // only an assert: unlocking a path with no entry is not handled when asserts are compiled out
121     LockEntry* entry = it->second;
122     // release lock
123     entry->rw_lock_.Unlock();
124     if (entry->ref_.Dec() == 1) { // presumably Dec() returns the post-decrement value, so 1 means only the map's own reference is left — confirm against common::Counter
125         // we are the last holder
126         /// TODO maybe don't need to deconstruct immediately
127         delete entry;
128         lock_bucket->lock_map.erase(it);
129     }
130 }





