2020-02-09-Socket全双工通信

2020-02-09  本文已影响0人  耿望

参考了网上的代码,实现两台客户端通过服务器实现全双工通信。
客户端输入 connect to:<id>(例如 connect to:12345)来选择连接到哪一台机器。
同时可以通过 update id:<新id>(例如 update id:1)来更改自身的 id。

服务端

package server;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;

/**
 * Relay server that lets connected clients exchange text lines through it.
 *
 * <p>Every accepted connection is registered under an integer id (initially the
 * client's remote port) and served by a {@link Processer}. A client controls the
 * relay with plain text lines:
 * <ul>
 *   <li>{@code connect to:<id>} selects the client that subsequent lines are forwarded to;</li>
 *   <li>{@code update id:<id>} re-registers the sending client under a new id;</li>
 *   <li>{@code exit} is forwarded like any other line and then ends the session;</li>
 *   <li>any other non-empty line is forwarded to the selected target, if one is set.</li>
 * </ul>
 *
 * <p>The registry is shared between the accept thread and all reader threads, so
 * every access to it is guarded by the registry's own monitor.
 */
public class Server {

    private static final int DEFAULT_PORT = 54321;

    /** Listening socket; stays {@code null} when binding the port failed. */
    private ServerSocket serverSocket = null;

    /** Connected clients by id. Guarded by {@code synchronized (socketMap)}. */
    private final HashMap<Integer, Processer> socketMap = new HashMap<Integer, Processer>();

    /** Binds the listening socket to {@link #DEFAULT_PORT}; a failure is only logged. */
    public Server() {
        try {
            serverSocket = new ServerSocket(DEFAULT_PORT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accepts clients until the listening socket fails, starting one
     * {@link Processer} per connection. Returns immediately when the port could
     * not be bound in the constructor.
     */
    public void start() {
        if (serverSocket == null) {
            System.out.println("Server socket is not available, can not start.");
            return;
        }
        try {
            while (true) {
                Socket socket = serverSocket.accept();
                // NOTE(review): the remote port is only unique per host; two clients
                // on different machines may collide on the same initial id.
                int id = socket.getPort();
                String host = socket.getInetAddress().getHostAddress();
                System.out.println("connected "+ host + " " + id);
                Processer processer = new Processer(socket, id);
                // Register before starting so the client is addressable as soon
                // as its threads are running.
                synchronized (socketMap) {
                    socketMap.put(id, processer);
                }
                processer.start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Server().start();
    }

    /**
     * Queues {@code msg} for delivery to the client registered under {@code id}.
     * Does nothing when no such client is registered.
     */
    private void sendMessage(int id, String msg) {
        Processer processer;
        synchronized (socketMap) {
            processer = socketMap.get(id);
        }
        if (processer == null) {
            return;
        }
        processer.writeMessage(msg);
    }

    /**
     * Re-registers the client known as {@code oldId} under {@code newId}.
     * The request is ignored when {@code oldId} is unknown, and refused when
     * {@code newId} already belongs to a different client so that nobody can take
     * over another client's address.
     */
    private void setId(int oldId, int newId) {
        synchronized (socketMap) {
            Processer processer = socketMap.get(oldId);
            if (processer == null || oldId == newId) {
                return;
            }
            if (socketMap.containsKey(newId)) {
                System.out.println("id " + newId + " is already in use, keep " + oldId);
                return;
            }
            socketMap.remove(oldId);
            processer.updateId(newId);
            socketMap.put(newId, processer);
        }
    }

    /**
     * Removes {@code processer} from the registry, but only if its id still maps
     * to this very instance (a later connection may have reused the id).
     */
    private void unregister(Processer processer) {
        synchronized (socketMap) {
            if (socketMap.get(processer.id) == processer) {
                socketMap.remove(processer.id);
            }
        }
    }

    /**
     * Extracts the integer that follows the first {@code ':'} of a command line.
     *
     * @param message a command such as {@code "connect to:42"}
     * @return the parsed id, or {@code null} when the id is missing or not a number
     */
    private static Integer parseId(String message) {
        String[] data = message.split(":");
        if (data.length < 2) {
            return null;
        }
        try {
            return Integer.parseInt(data[1].trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * Serves one client connection with a dedicated reader thread and a dedicated
     * writer thread, and cleans up once both have finished.
     */
    private class Processer extends Thread {

        private Socket socket = null;
        private Writer writer = null;
        private Reader reader = null;
        /** Current registry key of this client; changed only while holding the registry lock. */
        private int id = -1;

        public Processer(Socket socket, int id) {
            this.id = id;
            this.socket = socket;
            writer = new Writer();
            reader = new Reader();
        }

        @Override
        public void run() {
            reader.start();
            writer.start();
            try {
                reader.join();
                // The reader only ends when the client left or the connection
                // broke; the writer is usually parked waiting for messages and
                // must be told to stop, otherwise join() below never returns.
                writer.shutdown();
                writer.join();
                System.out.println("Cient closed, exit.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                writer.shutdown();
                unregister(this);
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /** Queues {@code message} for delivery to this client. */
        private void writeMessage(String message) {
            writer.writeMessage(message);
        }

        private void updateId(int newId) {
            this.id = newId;
        }

        /** Reads lines from the client and interprets them as commands or payload. */
        private class Reader extends Thread {

            private BufferedReader bufferedReader = null;

            public Reader() {
                try {
                    bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void run() {
                if (bufferedReader == null) {
                    // The input stream could not be opened; nothing to read.
                    return;
                }
                int targetId = -1;
                try {
                    while (!socket.isClosed()) {
                        String message = bufferedReader.readLine();
                        if (message == null) {
                            // End of stream: the client closed the connection.
                            break;
                        }
                        if ("".equals(message)) {
                            continue;
                        }
                        if (message.contains("connect to:")) {
                            Integer requested = parseId(message);
                            if (requested == null) {
                                System.out.println("Ignore invalid command: " + message);
                            } else {
                                targetId = requested;
                            }
                        } else if (message.contains("update id:")) {
                            Integer requested = parseId(message);
                            if (requested == null) {
                                System.out.println("Ignore invalid command: " + message);
                            } else {
                                setId(id, requested);
                            }
                        } else if (targetId > 0) {
                            sendMessage(targetId, message);
                        }
                        // "exit" is relayed to the peer first (branch above) and
                        // then terminates this session.
                        if ("exit".equals(message)) {
                            break;
                        }
                    }
                } catch (IOException e) {
                    System.out.println("Cient closed, Reader exit.");
                } finally {
                    try {
                        bufferedReader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /** Delivers queued messages to the client, one line per message. */
        private class Writer extends Thread {

            private PrintWriter printWriter = null;
            /** Pending messages in arrival order; also the monitor for wait/notify. */
            private final ArrayList<String> messageList = new ArrayList<String>();
            /** Set once the session is over. Guarded by {@code messageList}. */
            private boolean stopped = false;

            public Writer() {
                try {
                    printWriter = new PrintWriter(socket.getOutputStream());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            /** Enqueues a non-empty message; ignored after {@link #shutdown()}. */
            public void writeMessage(String message) {
                if (message == null || "".equals(message)) {
                    return;
                }
                synchronized (messageList) {
                    if (stopped) {
                        return;
                    }
                    messageList.add(message);
                    messageList.notify();
                }
            }

            /** Asks the writer loop to terminate and wakes it if it is waiting. */
            private void shutdown() {
                synchronized (messageList) {
                    stopped = true;
                    messageList.notifyAll();
                }
            }

            @Override
            public void run() {
                if (printWriter == null) {
                    // The output stream could not be opened; nothing can be sent.
                    return;
                }
                try {
                    while (!socket.isClosed()) {
                        String message;
                        synchronized (messageList) {
                            while (messageList.isEmpty() && !stopped) {
                                messageList.wait();
                            }
                            if (stopped) {
                                break;
                            }
                            message = messageList.remove(0);
                        }
                        printWriter.println(message);
                        // PrintWriter swallows IOExceptions; checkError() flushes
                        // and reports whether the connection is broken.
                        if (printWriter.checkError()) {
                            System.out.println("Cient closed, Writer exit.");
                            break;
                        }
                        System.out.println("write " + message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    printWriter.close();
                }
            }
        }
    }
}

客户端

package client;

import java.io.IOException;
import java.net.Socket;

/**
 * Console chat client: connects to the relay server and runs one thread that
 * prints incoming lines and one thread that sends lines typed on the console.
 */
public class Client {

    private static final int DEFAULT_PORT = 54321;
    private static final String DEFAULT_IP = "193.112.193.132";

    private Socket socket = null;
    private Reader reader = null;
    private Writer writer = null;

    /** Connects to the default server address. */
    public Client() {
        this(DEFAULT_IP, DEFAULT_PORT);
    }

    /**
     * Connects to the given server. A connection failure is logged and leaves
     * the client unconnected; {@link #start()} then returns without doing work.
     *
     * @param host server host name or IP address
     * @param port server TCP port
     */
    public Client(String host, int port) {
        try {
            socket = new Socket(host, port);
            System.out.println("connetct " + socket.getLocalPort());
            reader = new Reader(socket);
            writer = new Writer(socket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the reader and writer threads and blocks until both have finished,
     * then closes the socket.
     */
    public void start() {
        if (reader == null || writer == null) {
            // The constructor could not connect; there is nothing to run.
            System.out.println("Not connected to server, Client exit.");
            closeSocket();
            return;
        }
        reader.start();
        writer.start();
        try {
            reader.join();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            closeSocket();
        }
    }

    /** Closes the socket if one was opened, logging any failure. */
    private void closeSocket() {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Entry point. Optional arguments: {@code [host [port]]}; missing or
     * invalid values fall back to the built-in defaults.
     */
    public static void main(String[] args) {
        String host = DEFAULT_IP;
        int port = DEFAULT_PORT;
        if (args.length > 0) {
            host = args[0];
        }
        if (args.length > 1) {
            try {
                port = Integer.parseInt(args[1]);
            } catch (NumberFormatException e) {
                System.out.println("Invalid port " + args[1] + ", use default " + DEFAULT_PORT);
            }
        }
        new Client(host, port).start();
    }
}

为了避免线程阻塞,读写分别在两个子线程实现。

Reader

package client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

/**
 * Client-side thread that prints every line received from the server to the
 * console until the stream ends, the socket is closed, or the line
 * {@code exit} arrives.
 */
public class Reader extends Thread {

    private Socket socket = null;
    /** Line reader over the socket input; {@code null} if the stream could not be opened. */
    private BufferedReader bufferedReader = null;

    /**
     * @param socket connected socket whose input stream is read
     */
    public Reader(Socket socket) {
        this.socket = socket;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            if (bufferedReader != null) {
                String readLine = "";
                while (!socket.isClosed() && !"exit".equals(readLine)) {
                    readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        // End of stream: the server closed the connection.
                        // Stop here instead of printing the text "null".
                        break;
                    }
                    System.out.println(readLine);
                }
            }
            System.out.println("Server closed, Reader exit.");
        } catch (IOException e) {
            System.out.println("Server closed, Reader exit.");
        } finally {
            // Close each resource independently so one failure does not leak the other.
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Writer

package client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * Client-side thread that reads lines from standard input and sends each
 * non-empty line to the server. It stops after sending {@code exit}, when
 * standard input ends, when the socket is closed, or when a write fails.
 */
public class Writer extends Thread {

    private Socket socket = null;
    /** Writer over the socket output; {@code null} if the stream could not be opened. */
    private PrintWriter printWriter = null;
    /** Console input. */
    private BufferedReader bufferedReader = null;

    /**
     * @param socket connected socket whose output stream is written
     */
    public Writer(Socket socket) {
        super("ClientWriterThread");
        this.socket = socket;
        bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        try {
            printWriter = new PrintWriter(socket.getOutputStream());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        String readLine = "";
        try {
            // Without an output stream nothing can ever be sent.
            boolean serverGone = (printWriter == null);
            while (!serverGone
                    && !socket.isClosed()
                    && !"exit".equals(readLine)) {
                // Blocks until the user enters a line, so a closed connection
                // is only noticed on the next send attempt.
                readLine = bufferedReader.readLine();
                if (readLine == null) {
                    // Standard input ended; do not send the text "null".
                    break;
                }
                if ("".equals(readLine)) {
                    System.out.println("Can not send empty message!");
                    continue;
                }
                printWriter.println(readLine);
                // PrintWriter swallows IOExceptions; checkError() flushes and
                // reports whether the write failed.
                serverGone = printWriter.checkError();
            }
            if (serverGone) {
                System.out.println("Server closed, Writer exit.");
            } else {
                System.out.println("Client closed, Writer exit.");
            }
        } catch (IOException e) {
            System.out.println("Server closed, Writer exit.");
        } finally {
            // Close each resource independently so one failure does not leak the rest.
            if (printWriter != null) {
                printWriter.close();
            }
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

上一篇 下一篇

猜你喜欢

热点阅读