使用ABT(The asynchronous backtrack
本文代码点击这里下载。
将4个皇后放入4x4的棋盘中,修改4个皇后的位置,使他们不能“立即”攻击对方。这里我们假设4个皇后被放置在不同的行中,仅能修改4个皇后的列的位置。
假设我们4个皇后的id依次是A1,A2,A3和A4,它们的优先级依次是1,2,3和4,它们的位置依次是(1,1),(2,1),(3,1)和(4,1)。算法仅能修改它们所在的列。最终使它们无法一步攻击彼此。
四皇后问题:1-2循环- 循环1. A1发Ok?信号给A2,A3和A4,A2发Ok?信号给A3和A4,A3发Ok?信号给A4.
- 循环2. A2收到A1的Ok?信号后,发现自己目前的位置与A1有冲突,于是进行重新选了一个位置——(2,3),并将新位置通过Ok?信号发送给A3和A4. A3收到A1和A2的Ok?信号后,发现自己目前的位置与A1和A2有冲突,于是进行重新选了一个位置——(3,4),并将新位置通过Ok?信号发送给A4. A4收到A1,A2和A3的Ok?信号后,发现自己目前的位置与A1,A2和A3有冲突,但是无法找到可行的位置,于是发送Nogood信号给自己的上级——A3,并将A3的位置从自己的agent_view表中抹去,更新了自己的位置——(4,2).
- 循环3. A3收到了A2的Ok?信号,无法找到可行的位置,于是发送Nogood信号给自己的上级——A2,并将A2的位置从自己的agent_view表中抹去,为自己重新分配了位置(虽然与原先一样,因为A2被抹去后该位置不再非法)——(3,4). A3收到了A4的Nogood信号,但是由于A4发来的Nogood信号与自己的agent_view表中的数据不一致(A4在发送此Nogood信号时认为A1,A2和A3还都在第1列,因此对此Nogood信号不作理会。A4收到了Ok?信号,但其位置并不非法,因此不作理会。
- 循环4. A2收到了A3的Nogood信号,为自己重新分配了位置——(2,4),并通过Ok?信号告知A3和A4.
- 循环5. A3收到了A2的Ok?信号,为自己重新分配了位置——(3,2),并通过Ok?信号告知A4. A4收到了A2的Ok?信号,无法找到可行的位置,于是发送Nogood信号给A3,并将A3的位置从自己的agent_view表中抹去,为自己重新分配了位置——(3,4).
如此这般循环,找到一个可行方案。
主函数部分代码如下。
while len(ok_set) != 0 or len(nogood_set) != 0:
for agent in agents:
agent.handle_ok()
agent.handle_nogood()
每次循环中,每个智能体都要处理自身收到的Ok?和Nogood信号,当没有信号在网络中传递时,程序结束。
处理Ok?信号函数的部分代码如下。
def handle_ok(self, it, ok_set, nogood_set):
del_list = []
i = 0
for ok in ok_set:
if ok.it == it - 1 and ok.receiver_id == self.id:
print(self.id, '处理', ok)
self.agent_view[ok.sender_id] = ok.pose
del_list.append(i)
i += 1
for i in range(len(del_list)):
# 由于i是被从小到大添加到del_list中的,因此可以这样做
del ok_set[del_list[i]]
for j in range(i + 1, len(del_list)):
del_list[j] -= 1
self.check_agent_view(it, ok_set, nogood_set, False)
可以看到,程序的实现方法是:收集上一循环中自己作为接收方的Ok?信号,并且将其添加到自己的agent_view中,最后检查自己的值(check_agent_view)。
处理Nogood信号函数的部分代码如下。
def handle_nogood(self, it, ok_set, nogood_set):
del_list = []
i = 0
ignore = False
for nogood in nogood_set:
if nogood.it == it - 1 and nogood.receiver_id == self.id:
del_list.append(i)
ignore = self.ignore_nogood(nogood)
if not ignore:
print(self.id, '处理', nogood)
self.add_nogood(nogood.agent_view)
else:
print(self.id, '忽略', nogood)
i += 1
for i in range(len(del_list)):
# 由于i是被从小到大添加到del_list中的,因此可以这样做
del nogood_set[del_list[i]]
for j in range(i + 1, len(del_list)):
del_list[j] -= 1
# 没有检查是否是neighbor的步骤
self.check_agent_view(it, ok_set, nogood_set, ignore)
与处理Ok?信号的过程比较类似,不同在于,如果Nogood信号中的内容与自己agent_view表中内容不相符,该Nogood信号会被丢弃。
check_agent_view函数的部分代码如下。
def check_agent_view(self, it, ok_set, nogood_set, send_ok):
old_pose = [self.pose.x, self.pose.y]
cps = np.random.permutation(range(1, self.pars['map_width'] + 1)).tolist()
k = 0
is_consistent, inconsistent_id = self.check_consistent()
while not is_consistent and k < len(cps):
self.pose.y = cps[k]
is_consistent, inconsistent_id = self.check_consistent()
k += 1
if not is_consistent:
if self.priority == 1:
print('无解')
sys.exit()
self.backtrack(it, ok_set, nogood_set)
else:
if old_pose != [self.pose.x, self.pose.y] or send_ok:
receivers_id = self.get_ok_receivers(self.priority)
for receiver in receivers_id:
ok_set.append(signals.Ok(it, self.id, receiver, copy.deepcopy(self.pose)))
可以看出,如果自己的位置是非法的,那么智能体应寻找新的位置,并且将新的位置发送给下一级智能体。
回溯(backtrack)函数部分代码如下。
def backtrack(self, it, ok_set, nogood_set):
# 怎样判断nogood已经全部出现是一个问题
# !!! 此处的方法存疑
if len(self.nogood_list) >= self.pars['map_width'] ** self.pose.x:
print('无解')
sys.exit()
receiver_id = self.get_nogood_receiver(self.priority)
nogood_set.append(signals.Nogood(it, self.id, receiver_id, copy.deepcopy(self.agent_view)))
if receiver_id in self.agent_view.keys():
del self.agent_view[receiver_id]
self.check_agent_view(it, ok_set, nogood_set, True)
回溯过程会判断整个算法是否已经无解。如果还无法判断是否无解,那么就将Nogood表中优先级最低的智能体的Nogood去掉,并发送Nogood信号给那个智能体,并重新检查自己的值(check_agent_view)。
请注意下面的代码:
def check_agent_view(self, it, ok_set, nogood_set, send_ok):
check_agent_view
函数中有一个send_ok
参数,如果该参数为True
,则该智能体的位置会被Ok?信号发送到下一级智能体中。如果智能体在check_agent_view过程中发现自己的值与进入该过程前的值不一致,那么它会发送Ok?信号。那么这个参数起什么作用呢?可以看到,在下面两句中,这个参数可能被赋值为True
:
def handle_nogood(self, it, ok_set, nogood_set):
self.check_agent_view(it, ok_set, nogood_set, ignore)
def backtrack(self, it, ok_set, nogood_set):
self.check_agent_view(it, ok_set, nogood_set, True)
先来看看handle_nogood
的情况,我们假设3个智能体A->B->C。在一次循环中,A发送Ok?信号给B和C,C发送Nogood信号给B。再次循环中,B发现自己目前的位置是合法的,而C的Nogood信号被忽略,也就是B没有改变的自己的位置,也没有任何信号发出;C收到Ok?信号后,发现自己目前的位置不合法,于是换了新的位置,但是与当前B位置冲突,但是由于上一次循环中,C在发送Nogood信号前把B的位置抹去了,因此C并不知情。因此,此时出现了一种奇怪的现象:B与C的位置是矛盾的,网络中却没有信号了。为了避免这种情况的发生,如果Nogood信号被忽略了(ignore==True),那么send_ok
位被赋值位True
,即B一定会发送Ok?信号给C。
再来看看backtrack
过程得情况,我们假设有一个智能体收到了Nogood信号,在check_agent_view过程中,它改变了自己的位置,但是仍然无法找到合法的位置,此时进入backtrack过程,此时它的位置与处理Nogood信号前位置不一样(即它已经更改了自己的位置)。接下来,它在backtrack过程中发送了Nogood信号,并且将某一上级智能体的位置抹去,便又进入check_agent_view过程。当它目前的位置是合法的时候(因为某一上级智能体的位置被抹去了),它不会发送Ok?信号给下一级智能体(因为在此次check_agent_view过程中位置没变)。但是,它目前的位置与最开始相比,已经改变了,因此此时需要把send_ok
设置为True
。