对抗搜索和博弈 - 终局搜索(算杀)
终局搜索(算杀)
我们在下五子棋的时候,常常会有这样一种思考策略。就是针对现在的局面,我是否可以通过连续的冲四达到杀棋的效果。如果你发现了,你其实就意识到自己已经赢了。
那么我们是否可以在原来枚举所有优秀候选落子对局面算分的同时。再额外开一个线程,计算一下当前的局面是否可以达到杀棋必胜。
我们可以这么思考这个问题,
- 如果我走冲四,对面只有1个应手,就是挡我的冲四。下一轮还是我主导进攻
- 如果我走活三,对面有2个应手,就是挡我的活三左右侧。下一轮还是我主导进攻。
- 活三的应手,对面可以通过冲四来延迟应手。此时我们堵掉对面冲四,依然是我方主导进攻
- 如果我走活三,对面的应手棋子触发了对面的活三或冲四,我被迫应手。对面主导进攻。
- 如果对面已经有活三 或 冲四,我必须应手,对面主导进攻。
那么我们需要做的是不断通过冲四,活三,看看最后是否必胜(构建出1个双冲四,或者活四);如果在进攻的这条线路中,最后我们没有活三或冲四了,则搜索失败。又或者我的进攻时候,主导权易主了,则进攻失败。
所以算法的算法最后会返回一个空的落子,代表没有算杀成功。如果成功,则返回正确的杀棋落子。
基于上述思路我们首先在AI的环节,要找到所有杀棋的候选子(活三或者冲四)
这边我们返回的候选集,需要带上分数,因为我们要区分,这次的下棋,是不是上面的第三种情况;就是由我们主导进攻,但是在我们活三进攻中,对面选择了冲四,所以我们堵掉冲四即可。那么我们把这种棋子的分数标价为负,就知道,虽然是防守,但是可以继续算杀。
我们专门生成一个类,来维护算杀的分数。
public class VCXCachedScoreManager extends CachedScoreManager {
private int[][] aiScore;
private int[][] humanScore;
private final VCXOptimization killOptimization;
public VCXCachedScoreManager(IChessboardAIAlgo chessBoardAIAlgo, int humanColor, VCXOptimization vcxOptimization) {
super(chessBoardAIAlgo);
aiScore = humanColor != Player.BLACK.getId() ? blackScore : whiteScore;
humanScore = humanColor == Player.BLACK.getId() ? blackScore : whiteScore;
killOptimization = vcxOptimization;
}
...
}
找到进攻棋子
除了对面冲四的口子,我们标记为负数外。
我们这边做了一个米字优化,
它的含义是,我们假设下一步的进攻,应该在上一步进攻落子的4个方向上(水平,竖直,正对角,反对角)
这样可以大大加速算杀的速度,原因是候选落子会少很多。
准确率会略微降低,因为有的时候确实我们正确的进攻步骤,不满足每2步之间都呈现这种米字关系
1693041653948.png
public List<Long> findAIKillSteps(int lastMaxPoint) {
List<Long> results = new ArrayList<>();
List<Long> fives = new LinkedList<>();
for (int y = 0; y < size; y++) {
for (int x = 0; x < size; x++) {
if (chessBoardAlgo.getValInBoard(x, y) != 0) continue;
int pos = convertToIdx(y, x, size);
if (aiScore[y][x] >= Score.FIVE.value) {
return List.of(encode(aiScore[y][x], pos));
} else if (humanScore[y][x] >= Score.FIVE.value) {
fives.add(encode(-humanScore[y][x], pos));
} else {
if (lastMaxPoint == -1
|| y == getY(lastMaxPoint, size)
|| x == getX(lastMaxPoint, size)
|| Math.abs(y - getY(lastMaxPoint, size)) == Math.abs(x - getX(lastMaxPoint, size))) {
// 连续进攻
if (aiScore[y][x] >= Score.THREE.value)
results.add(encode(aiScore[y][x], pos));
} else if (humanScore[y][x] >= Score.FOUR.value && aiScore[y][x] >= Score.THREE.value) {
// 连守带攻
results.add(encode(aiScore[y][x], pos));
} else if (killOptimization.matchKillSteps(aiScore[y][x])) {
results.add(encode(aiScore[y][x], pos));
}
}
}
}
if (!fives.isEmpty()) return fives;
results.sort((a, b)-> Integer.compare(score(b), score(a)));
return results;
}
另外上面代码第二个FOR循环,是另一个特例,虽然我们不在米子上,但是这步棋我们挡住了对面的活三同时,构建出自己的一个活三,也是值得考虑的。
为什么呢,因为对方在应手的时候,可能出现了自己的活三;那么这个时候如果我们没有冲四了,只能要去挡对面的活三;那么攻守易主,其实等于算杀失败。但是如果存在一步棋,可以防守的同时结合进攻,就可以继续算杀。
最后一个IF,是我们设计了一个枚举类,去平衡,算杀速度和算杀准确率的一个选项,我自己在使用的时候,基本都是用FAST的。
public enum VCXOptimization {
FAST(2){
@Override
public boolean matchKillSteps(int aiScore) {
return false;
}
}, MEDIUM(4) {
@Override
public boolean matchKillSteps(int aiScore) {
return aiScore >= Score.BLOCKED_FOUR.value;
}
}, SLOW(8) {
@Override
public boolean matchKillSteps(int aiScore) {
return aiScore >= Score.THREE.value;
}
};
int factor;
VCXOptimization(int factor) {
this.factor = factor;
}
public abstract boolean matchKillSteps(int aiScore);
}
好了下面我们来写,防守的应手选项的代码。
不考虑已经赢了可以退出的选项,其实选择无非就是,自己冲四,挡对面冲四,挡对面活三
public List<Long> findHumanDefendSteps() {
List<Long> fives = new ArrayList<>();
LinkedList<Long> fours = new LinkedList<>();
List<Long> blockedFours = new ArrayList<>();
for (int y = 0; y < size; y++) {
for (int x = 0; x < size; x++) {
if (chessBoardAlgo.getValInBoard(x, y) != 0) continue;
int pos = convertToIdx(y, x, size), hs = humanScore[y][x], as = aiScore[y][x];
if (hs >= Score.FIVE.value) {
return List.of(encode(-hs, pos));
}
if (as >= Score.FIVE.value) {
fives.add(encode(as, pos));
} else if (hs >= Score.FOUR.value) {
fours.addFirst(encode(-hs, pos));
} else if (as >= Score.FOUR.value) {
fours.add(encode(as, pos));
} else if (hs >= Score.BLOCKED_FOUR.value) {
fours.add(encode(-hs, pos));
} else if (as >= Score.BLOCKED_FOUR.value) {
blockedFours.add(encode(as, pos));
}
}
}
// 挡对面的冲四
if (!fives.isEmpty()) return fives;
// 形成自己的活四,挡对面的活三, 形成自己的冲四
if (!fours.isEmpty()) {
// 有时对面的杀棋,是活三的一个冲四位而非活四位
fours.addAll(blockedFours);
}
return fours;
}
好了,有了候选落子,我们就开始写算杀的算法逻辑。
基本规则就是,进攻方枚举每一种进攻,只要有其中一种可以必胜,自己返回必胜。防守方枚举每一种防守手段,只要有一种可以防下来(攻守易主,或者对面没有额外的活三,冲四进攻手段),那么就算防守成功。
// lastMaxPoint, 为了优化性能, 方便剪枝
boolean aiWin(int role, int depth, int lastMaxPoint) {
if (depth <= 0) return false;
List<Long> candidates = vcxCachedScoreManager.findAIKillSteps(lastMaxPoint);
if (!candidates.isEmpty() && score(candidates.get(0)) >= Score.FOUR.value) {
if (depth == firstDepth)
nextPoint = pos(candidates.get(0));
return true;
}
if (candidates.isEmpty()) return false;
int maxPoint = -1;
for (long p : candidates) {
int pos = pos(p), score = score(p);
int y = getY(pos), x = getX(pos);
addPiece(y, x, pos,true);
if (score > -Score.FIVE.value) {
maxPoint = pos;
}
boolean humanLose = humanLose(Player.enemyColor(role), depth - 1, maxPoint);
removePiece(y, x, pos, true);
if (humanLose) {
if (depth == firstDepth)
nextPoint = pos;
return true;
}
}
return false;
}
private boolean humanLose(int role, int depth, int lastMaxPoint) {
// 超过回合数,代表防守成功
if (depth <= 0) return false;
List<Long> candidates = vcxCachedScoreManager.findHumanDefendSteps();
// 如果发现对面没有进攻手段(活三,冲四,活四),则代表防守成功
if (candidates.isEmpty()) return false;
// 如果对面能成五,发现自己有成五;
// 如果对面不能成五,发现自己有活四;
if (-1 * score(candidates.get(0)) >= Score.FOUR.value)
return false;
for (long p : candidates) {
int pos = pos(p);
int y = getY(pos), x = getX(pos);
addPiece(y, x, pos, false);
boolean aiWin = aiWin(Player.enemyColor(role), depth - 1, lastMaxPoint);
removePiece(y, x, pos, false);
if (!aiWin) return false;
}
return true;
}
综上我们的算法单独部分算是完成了,接下来,我们需要优雅的把他集成进原来minmax 算法里,并行计算。
我这边选择了一个装饰者模式,因为模块批次独立,可以并行计算,我们可以通过装饰的方式,把算杀运用到任何一种AI算法中。
比如我们的主算法可以是MINMAX搜索,算杀可以装饰它。主算法也可以是蒙特卡洛树搜索,算杀也可以装饰它,一起并行计算。
最后的结果是,如果算杀找到解,则用算杀解。不然就用主算法的解。
那么为了实现装饰者模式,我们先用装饰者类去实现下接口。
public abstract class AIAlgoDecorator implements IAIAlgo {
protected IAIAlgo decoratedAIAlgo;
@Getter
protected IChessboardAIAlgo chessboardAlgo;
@Getter
protected int humanColor;
@Getter
protected IScoreManager scoreManager;
public AIAlgoDecorator(IAIAlgo iaiAlgo){
this.decoratedAIAlgo = iaiAlgo;
chessboardAlgo = iaiAlgo.getChessboardAlgo();
humanColor = iaiAlgo.getHumanColor();
scoreManager = iaiAlgo.getScoreManager();
}
public void setPieceCallBack(int y, int x, int player, boolean isAI) {
decoratedAIAlgo.setPieceCallBack(y, x, player, isAI);
}
public boolean isWinning() {
if (decoratedAIAlgo instanceof AIAlgoDecorator) {
return ((AIAlgoDecorator) decoratedAIAlgo).isWinning();
}
return false;
}
protected Position runInParallel(Callable<Position> finalResult, WinningAlgoTask... potentialWin) {
int received = 0, taskCount = 1 + potentialWin.length;
// important, since interrupt will not clear the queue, if it is a class level object, the interrupted task will be take() next time
CompletionService<Position> completionService = new ExecutorCompletionService<>(ThreadPoolContext.threadPool);
Future<Position> finalRes = completionService.submit(finalResult);
Map<Future<Position>, Consumer<Position>> task2Callback = new HashMap<>();
task2Callback.put(finalRes, DO_NOTHING);
for (WinningAlgoTask i : potentialWin) {
task2Callback.put(completionService.submit(i.winningTask), i.winningCallback);
}
Position end = Position.EMPTY;
try {
//System.out.println(getClass().getSimpleName() + " taskCount: " + taskCount);
while (received < taskCount) {
Future<Position> future = completionService.take();
if (future.isCancelled()) {
//System.out.println(getClass().getSimpleName() + " cancelled: " + future);
received++;
continue;
}
Position result = future.get();
task2Callback.get(future).accept(result);
received++;
if (result != Position.EMPTY && result.winning) {
//System.out.println(getClass().getSimpleName() + " winning: " + result);
end = result;
break;
}
}
if (end == Position.EMPTY) {
end = finalRes.get();
}
} catch (InterruptedException e) {
// ignore
} catch (Exception e) {
System.err.println(getClass().getSimpleName() + " result error");
e.printStackTrace();
}
task2Callback.keySet().forEach(i -> i.cancel(true));
return end;
}
private static Consumer<Position> DO_NOTHING = pos -> {};
class WinningAlgoTask {
Callable<Position> winningTask;
Consumer<Position> winningCallback;
public WinningAlgoTask(Callable<Position> winningTask, Consumer<Position> winningCallback) {
this.winningTask = winningTask;
this.winningCallback = winningCallback;
}
}
}
然后我们实现一个算杀的装饰者实现。如果一旦算杀成功之后,之后所有的落子都可以交给算杀模块。而不需要去调主模块了。
public class VCXDecorator extends AIAlgoDecorator {
public VCX mustWin = null;
private int vcxDepth;
public VCXDecorator(IAIAlgo decoratedAIAlgo, int vcxDepth) {
super(decoratedAIAlgo);
this.vcxDepth = vcxDepth;
}
@Override
public Position aiFindPos() {
if (mustWin != null) {
Position pos = mustWin.aiFindPos();
if (pos == Position.EMPTY) {
System.out.println("vcx has bug");
chessboardAlgo.generateStepsCode();
mustWin = null;
scoreManager.initScore();
return decoratedAIAlgo.aiFindPos();
}
return pos;
}
VCX vcx = new VCX(chessboardAlgo.clone(), humanColor, vcxDepth);
Position res = runInParallel(() -> decoratedAIAlgo.aiFindPos(),
new WinningAlgoTask(() -> vcx.aiFindPos(),
pos -> {
if (pos != Position.EMPTY) {
mustWin = new VCX(chessboardAlgo, humanColor, vcxDepth - 2);
System.out.println("算杀必胜");
} else {
System.out.println("算杀失败");
}
}));
System.out.println("vcx calculation finished");
return res;
}
@Override
public void setPieceCallBack(int y, int x, int player, boolean isAI) {
if (mustWin != null) {
mustWin.setPieceCallBack(y,x,player, isAI);
}
decoratedAIAlgo.setPieceCallBack(y, x, player, isAI);
}
@Override
public boolean isWinning() {
return mustWin != null || super.isWinning();
}
}
最后我们在配置AI算法的类中,注册进这个算杀的装饰者。
大概写法如下:
new AIPlayStrategy(new VCXDecorator(new ThreadSafeMinMaxAIAlgo(chessboardAlgo, enemyColor, 7, NegamaxVCXEnhancedContext.DISABLE), 21), uiCallback);
上面我们用到了一个线程安全的minmax algo,原因是我们开了多线程,且我们为了提高交互速度,只要算杀有解,我们可以直接返回算杀解。这时即使我们的装饰者给MINMAX线程发送了CANCEL请求,但是它依然有可能还在维护它自己的分数状态和棋盘状态。这个时候UI上对手已经可以下棋了。那么就会造成对棋盘类的RACE CONDITION。
所以我们需要一个线程安全的算杀类。他其实就是在原来的算杀类上包了一层,做1个COPY ON WRITE。同时我们要确保我们在COPY的时候,没有其他MINAMX的线程还在写这个棋盘类。
否则的话,我们复制出一个错误的棋盘状态,造成后面算法不对
public class ThreadSafeMinMaxAIAlgo extends MinMaxAIAlgo {
private IChessboardAIAlgo originalChessboardAlgo;
private IScoreManager originalScoreManager;
private Semaphore semaphore = new Semaphore(1);
public ThreadSafeMinMaxAIAlgo(IChessboardAIAlgo chessBoardAlgo, int enemyColor) {
this(chessBoardAlgo, enemyColor, 9, NegamaxVCXEnhancedContext.DISABLE);
}
public ThreadSafeMinMaxAIAlgo(IChessboardAIAlgo chessBoardAlgo, int enemyColor, int depth, NegamaxVCXEnhancedContext context) {
super(chessBoardAlgo.clone(), enemyColor, depth, context);
originalChessboardAlgo = chessBoardAlgo;
originalScoreManager = new CachedScoreManager(chessBoardAlgo);
}
@Override
public IScoreManager getScoreManager() {
return originalScoreManager;
}
@Override
public IChessboardAIAlgo getChessboardAlgo() {
return originalChessboardAlgo;
}
@Override
public void setPieceCallBack(int y, int x, int player, boolean isAI) {
originalScoreManager.updateScore(y, x);
super.setPieceCallBack(y, x, player, isAI);
}
@Override
public Position aiFindPos() {
try {
cloneStatefulComponent();
Position res = super.aiFindPos();
semaphore.release();
return res;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return Position.EMPTY;
}
private void cloneStatefulComponent() throws InterruptedException {
semaphore.acquire();
chessboardAlgo = originalChessboardAlgo.clone();
scoreManager = new CachedScoreManager(chessboardAlgo);
}
}
总结
这个AI实现之后,当你看到AI算杀必胜的时候,你会发现其实你根本不知道自己会怎么输。但是AI已经枚举自己的杀棋,以及的你所有应手,知道你必输了。所以在残局的时候,AI优势极大。他可以在极短的时间往后遍历20多步,判断出你无论怎么走都是必输的情况。
这就是算法的魅力。