NIO and Socket Programming

2020-03-19  路过的魔法师

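If I only listen and never use this, I'm afraid I'll forget all of it, so here are some notes (the textbook reads more like a plain listing of APIs, after all).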

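Socket Programming

Socket programming is essentially about data communication between a server and a client. Broadly speaking, it involves the following four steps:

  1. Establish the connection
  2. Request the connection
  3. Respond with data
  4. Close the connection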

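Buffers

NIO (non-blocking I/O) achieves high performance by using fewer threads to handle more tasks, and buffers are at the core of NIO.

Using the Buffer class and its subclasses

Buffer and its subclasses are all abstract classes and cannot be instantiated directly. You either use the static factory method wrap() to put an existing array into a buffer, or allocate space directly. Allocation in turn comes in two kinds, direct buffers and non-direct buffers, and the difference between them is mainly one of performance.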

// wrap: the buffer is backed by the given array
ByteBuffer wbf = ByteBuffer.wrap(new byte[] {1, 2, 3});
ShortBuffer sf = ShortBuffer.wrap(new short[] {1, 2, 3, 4});
IntBuffer _if = IntBuffer.wrap(new int[] {1, 2, 3, 4, 5});
LongBuffer lf = LongBuffer.wrap(new long[] {1, 2, 3, 4, 5, 6});
FloatBuffer ff = FloatBuffer.wrap(new float[] {1, 2, 3, 4, 5, 6, 7});
DoubleBuffer df = DoubleBuffer.wrap(new double[] {1, 2, 3, 4, 5, 6, 7, 8}, 2, 5);   // offset 2, length 5: position = 2, limit = 7
CharBuffer cf = CharBuffer.wrap(new char[] {'a', 'b', 'c', 'd'});

// allocate: non-direct (heap) buffer vs. direct buffer
ByteBuffer bf = ByteBuffer.allocate(233);
ByteBuffer bbf = ByteBuffer.allocateDirect(233);
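
To make the difference between the two creation styles concrete, here is a minimal sketch. The class name BufferCreationDemo and the helper state() are made up for illustration; the calls themselves are standard java.nio methods. It shows that a wrapped buffer writes through to its array, that wrap(array, offset, length) sets position and limit rather than capacity, and that only allocateDirect() yields a direct buffer.

```java
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.DoubleBuffer;

class BufferCreationDemo {
    // Hypothetical helper: summarise the three indices of any buffer.
    static String state(Buffer b) {
        return "pos=" + b.position() + " lim=" + b.limit() + " cap=" + b.capacity();
    }

    public static void main(String[] args) {
        // wrap(): the buffer shares the array, so writes show up in it
        byte[] backing = {1, 2, 3};
        ByteBuffer wrapped = ByteBuffer.wrap(backing);
        wrapped.put(0, (byte) 9);
        System.out.println(backing[0]);                 // 9

        // wrap(array, offset, length): position = offset, limit = offset + length
        DoubleBuffer window = DoubleBuffer.wrap(new double[8], 2, 5);
        System.out.println(state(window));              // pos=2 lim=7 cap=8

        // allocate() gives a heap buffer, allocateDirect() a direct one
        ByteBuffer heap = ByteBuffer.allocate(233);
        ByteBuffer direct = ByteBuffer.allocateDirect(233);
        System.out.println(heap.isDirect());            // false
        System.out.println(direct.isDirect());          // true
    }
}
```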

For the Buffer class, the most important properties are related as follows:
0 \leq mark \leq position \leq limit \leq capacity

A buffer's capacity is the number of elements it contains. The capacity of a buffer is never negative and never changes.

A buffer's limit is the index of the first element that should not be read or written. A buffer's limit is never negative and is never greater than its capacity.

A buffer's position is the index of the next element to be read or written. A buffer's position is never negative and is never greater than its limit.

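Note that limit is the index of the first element that must not be read or written. Array indices start at 0 (as in most programming languages, for performance reasons), whereas capacity is an element count that starts from 1 (although it may also be 0). As a result, on a freshly allocated or cleared buffer, limit is numerically equal to capacity. I've included mark here as well: it has its uses, though they are less obvious than those of the other three. Gao Hongyan explains mark like this: "The mark in a buffer is a bit like a signpost placed at a key junction while exploring or climbing, so that you can find the way back when you retrace your steps." What that means becomes clear from the methods below.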

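// put() has relative and absolute forms, depending on whether an index is given: the relative form advances position, the absolute form does not
// Huh, it even does the array bounds check
public ByteBuffer put(byte x) {
    hb[ix(nextPutIndex())] = x;             // position is advanced inside nextPutIndex(), which returns position++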
    return this;
}
public ByteBuffer put(int i, byte x) {
    hb[ix(checkIndex(i))] = x;
    return this;
}
bf.put((byte) 1);
System.out.println(bf.toString());
bf.put(3, (byte) 2);
System.out.println(bf.toString());


// get() is much like put(): it also has relative and absolute forms, with the same effect on position
public byte get() {
    return hb[ix(nextGetIndex())];
}
public byte get(int i) {
    return hb[ix(checkIndex(i))];
}


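// flip() is normally called after writing and before reading: relative writes advance position, so without flip() the data just written cannot be read back (absolute reads are another matter)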
public Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

// reset() sets position back to the value of mark
public Buffer reset() {
    int m = mark;
    if (m < 0)
        throw new InvalidMarkException();
    position = m;
    return this;
}

    
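// rewind() is normally called before re-reading or re-writing the buffer
// The difference between rewind() and flip() is whether limit is modified
// When re-reading, the current position is not necessarily at the end; calling flip() here would set limit to position and lose data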
public Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

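// clear() does not actually erase any data; it only resets position, limit and mark
// It is safe to call clear() "without thinking" before writing to the buffer or after reading all of it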
public Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}
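
The effect of these methods on position and limit is easiest to see by running them in sequence. The following is a small sketch (the class name BufferStateDemo and the helper state() are assumptions for illustration) that prints the indices after each call; capacity never changes.

```java
import java.nio.ByteBuffer;

class BufferStateDemo {
    // Hypothetical helper: summarise position, limit and capacity.
    static String state(ByteBuffer b) {
        return "pos=" + b.position() + " lim=" + b.limit() + " cap=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer bf = ByteBuffer.allocate(8);
        System.out.println("new:    " + state(bf));   // pos=0 lim=8 cap=8

        bf.put((byte) 10).put((byte) 20).put((byte) 30);  // relative puts advance position
        bf.put(5, (byte) 99);                             // absolute put leaves position alone
        System.out.println("put:    " + state(bf));   // pos=3 lim=8 cap=8

        bf.flip();                                        // limit = position, position = 0
        System.out.println("flip:   " + state(bf));   // pos=0 lim=3 cap=8

        bf.get();                                         // reads 10, position becomes 1
        bf.mark();                                        // remember position 1
        bf.get();
        bf.get();
        System.out.println("read:   " + state(bf));   // pos=3 lim=3 cap=8

        bf.reset();                                       // jump back to the mark
        System.out.println("reset:  " + state(bf));   // pos=1 lim=3 cap=8

        bf.rewind();                                      // position = 0, limit unchanged
        System.out.println("rewind: " + state(bf));   // pos=0 lim=3 cap=8

        bf.clear();                                       // position = 0, limit = capacity
        System.out.println("clear:  " + state(bf));   // pos=0 lim=8 cap=8
        System.out.println(bf.get(5));                    // 99: the data survived clear()
    }
}
```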

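Channels

In NIO, buffers package the data, and the packaged data is then transported through channels. Because what a channel can do depends on the operating system, channels are designed as interfaces in the JDK.

Combining the AutoCloseable interface with try-with-resources gives automatic closing.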

class TestAutoCloseable implements AutoCloseable {

    @Override
    public void close() throws Exception {
        System.out.println("closed.");
    }
};

public class _2_2 {
    
    public static void main(String[] args) {
        try (AutoCloseable as = new TestAutoCloseable();) {
            System.out.println("opened.");
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }
}

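The NetworkChannel interface

The main role of the NetworkChannel interface is to associate a channel with a socket so that data can be transmitted.

The MulticastChannel interface

The MulticastChannel interface exists to support multicast: several host addresses are put into a group, and data is then sent to that group.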


(This chapter is mostly about FileChannel; I'll come back to it when I need it. It does feel faster, though.)

Getting network device information

A socket is an endpoint of a transport-layer connection, not a protocol. RFC 793 describes sockets as follows:

To allow for many processes within a single Host to use TCP
communication facilities simultaneously, the TCP provides a set of
addresses or ports within each host. Concatenated with the network
and host addresses from the internet communication layer, this forms
a socket. A pair of sockets uniquely identifies each connection.
That is, a socket may be simultaneously used in multiple
connections.

The NetworkInterface class

/**
 * This class represents a Network Interface made up of a name,
 * and a list of IP addresses assigned to this interface.
 * It is used to identify the local interface on which a multicast group
 * is joined.
 *
 * Interfaces are normally known by names such as "le0".
 *
 * @since 1.4
 */

It provides information about a network interface, such as its MAC address, IP addresses, up/down state, and multicast support.

public class _3_1 {
    public static void main(String[] args) throws SocketException, UnknownHostException {
        Enumeration<NetworkInterface> netifs = NetworkInterface.getNetworkInterfaces();
        while (netifs.hasMoreElements()) {
            NetworkInterface netif = netifs.nextElement();
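            byte[] mac = netif.getHardwareAddress();   // may be null, e.g. for the loopback interface
            // Concatenating the byte[] directly would only print its identity (something like [B@1b6d3586)
            System.out.println("MAC:" + (mac == null ? "n/a" : java.util.Arrays.toString(mac)) + ", Name:" + netif.getName() + ", DisplayName:" + netif.getDisplayName() + 
                     ", ISUP:" + netif.isUp() + ", MTU:" + netif.getMTU() + ", supportMulticast:" + netif.supportsMulticast());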
            
            Enumeration<InetAddress> inetaddrs = netif.getInetAddresses();
            while (inetaddrs.hasMoreElements()) {
                InetAddress inetaddr = inetaddrs.nextElement();
                System.out.println("HostName:" + inetaddr.getHostName() + ", HostAddress:" + inetaddr.getHostAddress());
            }
            
            break;
        }
        
        // Look up an IP address by host name
        System.out.println("HostAddress: " + InetAddress.getByName("www.baidu.com").getHostAddress() + 
                ", " + InetAddress.getByName("www.baidu.com").getClass().getName());
        System.out.println("HostAddress: " + InetAddress.getByName("0:0:0:0:0:0:0:1").getHostAddress() +
                ", " + InetAddress.getByName("0:0:0:0:0:0:0:1").getClass().getName());
        
        // Look up all IP addresses for a host name
        InetAddress[] baiduips = InetAddress.getAllByName("www.baidu.com");
        System.out.println("baiduips");
        for (InetAddress baiduip : baiduips) {
            System.out.println("HostAddress: " + baiduip.getHostAddress() + ", " + baiduip.getClass().getName());
        }
        
        // Get an InetAddress object from a host name or an IP address
        byte[] lip = InetAddress.getByName("hahaha").getAddress();
        System.out.println(InetAddress.getByAddress(lip).getHostName());
    }
}
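
getHardwareAddress() returns the MAC address as a raw byte[] (or null), so it has to be formatted before printing. The sketch below shows one way to render it in the usual colon-separated hex form; the class name MacFormatDemo, the helper formatMac() and the sample address are all made up for illustration.

```java
class MacFormatDemo {
    // Hypothetical helper: render a hardware address as colon-separated hex.
    static String formatMac(byte[] mac) {
        if (mac == null) {
            return "n/a";                    // e.g. the loopback interface has no MAC
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < mac.length; i++) {
            if (i > 0) {
                sb.append(':');
            }
            // & 0xFF turns the signed byte into an unsigned value 0..255
            sb.append(String.format("%02X", mac[i] & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] sample = {0x00, 0x1A, (byte) 0xC2, 0x7B, 0x00, 0x47};  // made-up address
        System.out.println(formatMac(sample));   // 00:1A:C2:7B:00:47
        System.out.println(formatMac(null));     // n/a
    }
}
```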

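Implementing socket communication

As the RFC 793 definition quoted earlier suggests, sockets sit on top of TCP/UDP. TCP connections can be long-lived or short-lived, while connectionless UDP has no such distinction. A long-lived connection trades space for time (it holds on to resources to avoid reconnecting); a short-lived connection trades time for space.

A simple message send and receive.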

// server
public class _4_1_message_sending_server {
    public static void main(String[] args) throws IOException  {
        ServerSocket server_socket = null;
        Socket socket = null;
        
        try {
            server_socket = new ServerSocket(2233);
            socket = server_socket.accept();

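            // readFully() blocks until the buffer has been filled completely,
            // so the sender must send the total size of the data ahead of the data itself,
            // and the receiver must first read that size and create its buffer accordingly.
            // Better too small than too large: too small just means a few more reads, too large makes the read block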
            InputStream is = socket.getInputStream();
            ObjectInputStream ois = new ObjectInputStream(is);
            byte[] ba = new byte[ois.readInt()];
            ois.readFully(ba);
            System.out.println(new String(ba));
            
            OutputStream os = socket.getOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(os);
            byte[] reply = "connected to server.".getBytes();
            oos.writeInt(reply.length);   // send the byte count, which can differ from String.length()
            oos.write(reply);
            oos.flush();
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
             if (socket != null) socket.close();
             if (server_socket != null) server_socket.close();
        }
    }
}

// client
public class _4_1_meeesge_sending_client {
    public static void main(String[] args) throws IOException {
        Socket socket = null;
        
        try {
            socket = new Socket("localhost", 2233);
            
            OutputStream os = socket.getOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(os);
            byte[] msg = "connected to client.".getBytes();
            oos.writeInt(msg.length);   // the length announced must match the bytes actually written
            oos.write(msg);
            oos.flush();
            
            InputStream is = socket.getInputStream();
            ObjectInputStream ois = new ObjectInputStream(is);
            byte[] ba = new byte[ois.readInt()];
            ois.readFully(ba);
            System.out.println(new String(ba));

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (socket != null) socket.close();
        }
    }
}
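
The "send the size first, then the data" convention used above can be tried out without a network at all. The sketch below is an illustration under assumptions: it uses DataOutputStream/DataInputStream instead of the object streams above, an in-memory byte array stands in for the socket, and the names LengthPrefixDemo, frame() and unframe() are made up. It also shows why the length must be counted in bytes rather than characters.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class LengthPrefixDemo {
    // Encode a message as a 4-byte length followed by the UTF-8 payload.
    static byte[] frame(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(wire)) {
            out.writeInt(payload.length);    // byte count, not text.length()
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return wire.toByteArray();
    }

    // Read one framed message: first the length, then exactly that many bytes.
    static String unframe(InputStream in) {
        try {
            DataInputStream data = new DataInputStream(in);
            byte[] payload = new byte[data.readInt()];
            data.readFully(payload);         // blocks until the array is full
            return new String(payload, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String text = "\u4f60\u597d";        // 2 chars, but 6 bytes in UTF-8
        byte[] framed = frame(text);
        System.out.println(text.length() + " chars, " + (framed.length - 4) + " payload bytes");
        System.out.println(unframe(new ByteArrayInputStream(framed)).equals(text));  // true
    }
}
```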

Transferring an image

// server
public class _4_1_picture_sending_server {
    public static void main(String[] args) throws IOException {
        ServerSocket server_socket = null;
        Socket socket = null;
        InputStream is = null;
        FileOutputStream fos = null;
        
        try {
            server_socket = new ServerSocket(8080);
            socket = server_socket.accept();
            
            byte[] ba = new byte[2048]; int len = -1;
            is = socket.getInputStream();
            fos = new FileOutputStream(new File("C:\\Users\\lttzz\\Desktop\\bbeauty.jpg"));
            
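            // read() returns -1 once the client has closed its side of the connection
            while ((len = is.read(ba)) != -1) {
                fos.write(ba, 0, len);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // A resource is still null if an earlier step failed
            if (fos != null) fos.close();
            if (is != null) is.close();
            if (socket != null) socket.close();
            if (server_socket != null) server_socket.close();
        }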
    }
}

// client
public class _4_1_picture_sending_client {
    public static void main(String[] args) throws IOException {
        Socket socket = null;
        FileInputStream fis = null;
        OutputStream os = null;
        try {
            socket = new Socket("localhost", 8080);
            
            byte[] ba = new byte[2048]; int len = -1;
            
            String jpg_path = "C:\\Users\\lttzz\\Desktop\\beauty.jpg";
            fis = new FileInputStream(new File(jpg_path));
            
            os = socket.getOutputStream();
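            // Loop until end of file; the original condition compared against 1 instead of -1
            while ((len = fis.read(ba)) != -1) {
                os.write(ba, 0, len);
            }
            os.flush();
            
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // A resource is still null if an earlier step failed
            if (os != null) os.close();
            if (fis != null) fis.close();
            if (socket != null) socket.close();
        }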
    }
}

Adding multithreading

// server
public class _4_1_mulitithread_server {
    public static void main(String[] args) {        
        try {
            ServerSocket server_socket = new ServerSocket(8080);
            int run_tag = 1;
            
            while (run_tag == 1) {
                Socket socket = server_socket.accept();
                
                 _4_1_mulitithread_thread thread = new _4_1_mulitithread_thread(socket);
                 thread.start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// thread
public class _4_1_mulitithread_thread extends Thread {
    private Socket socket;
    
    public _4_1_mulitithread_thread(Socket socket) {
        super();
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            InputStream is = socket.getInputStream();
            ObjectInputStream ois = new ObjectInputStream(is);
            byte[] ba = new byte[ois.readInt()];
            ois.readFully(ba);
            // currentThread() is static, so call it on the class rather than on this
            System.out.println(Thread.currentThread() + ": "+ new String(ba));
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }   
}

// client
public class _4_1_multithread_client {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("localhost", 8080);
            
            OutputStream os = socket.getOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(os);
            oos.writeInt("Multithread test.".length());
            oos.write("Multithread test.".getBytes());
            oos.flush();
            
            os.close();
            oos.close();
            socket.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Speaking of multithreading, thread pools have to be mentioned as well.

// thread
public class _4_1_thread_pool_thread implements Runnable {
    private Socket socket = null;
    
    public _4_1_thread_pool_thread(Socket socket) {
        super();
        this.socket = socket;
    }
    
    @Override
    public void run() {
        try {
            InputStream is = socket.getInputStream();
            ObjectInputStream ois = new ObjectInputStream(is);
            byte[] ba = new byte[ois.readInt()];
            ois.readFully(ba);
            System.out.println(new String(ba));
            
            ois.close();
            is.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }       
    }
}

// client
public class _4_1_thread_pool_client {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("localhost", 8080);
            
            OutputStream os = socket.getOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(os);
            oos.writeInt("thread pool test.".length());
            oos.write("thread pool test.".getBytes());
            oos.flush();
            
            oos.close();
            os.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }   
    }
}
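
The notes above show the handler and the client for the thread-pool variant. For completeness, here is a hedged sketch of what a matching server loop might look like. It is not the author's code: the class ThreadPoolServerDemo and its methods are invented for illustration, it binds to an ephemeral loopback port instead of 8080, and it starts its own clients so that it can run on its own. The key idea is simply that each accepted socket is handed to an ExecutorService instead of a new Thread.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadPoolServerDemo {
    // Handler: read one length-prefixed message, then close the socket.
    static String readMessage(Socket socket) {
        try (Socket s = socket; ObjectInputStream in = new ObjectInputStream(s.getInputStream())) {
            byte[] ba = new byte[in.readInt()];
            in.readFully(ba);
            return new String(ba, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Client: connect, send one length-prefixed message, close.
    static void sendMessage(int port, String text) {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port);
             ObjectOutputStream out = new ObjectOutputStream(s.getOutputStream())) {
            byte[] ba = text.getBytes(StandardCharsets.UTF_8);
            out.writeInt(ba.length);
            out.write(ba);
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Accept `clients` connections and process each one on a fixed-size pool.
    static List<String> serve(int clients, int poolSize) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            server.setSoTimeout(5000);                    // do not wait forever in accept()
            int port = server.getLocalPort();
            for (int i = 0; i < clients; i++) {
                String text = "thread pool test " + i;
                new Thread(() -> sendMessage(port, text)).start();
            }
            for (int i = 0; i < clients; i++) {
                Socket socket = server.accept();
                pool.execute(() -> received.add(readMessage(socket)));
            }
            pool.shutdown();                              // no new tasks, finish queued ones
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(serve(4, 2).size() + " messages handled");
    }
}
```

With a pool of 2 threads and 4 clients, at most two handlers run at once and the remaining sockets wait in the pool's queue, which is the point of using a pool rather than one thread per connection.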

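Reading and writing objects, and the order of the I/O streams

There are two things to watch out for:

  1. The class must implement the Serializable interface.
  2. If the server obtains its ObjectInputStream first, the client must obtain its ObjectOutputStream first; otherwise the program blocks, because the ObjectInputStream constructor waits for the stream header written by the peer's ObjectOutputStream.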
// server
public class _4_1_exchange_object_information_server {
    public static void main(String[] args) {
        try {
            ServerSocket server_socket = new ServerSocket(8080);
            Socket socket = server_socket.accept();
            
            // 2. Note that the ObjectXxxStream objects are obtained in the opposite order from the other end
            InputStream is = socket.getInputStream();
            OutputStream os = socket.getOutputStream();
            ObjectInputStream ois = new ObjectInputStream(is);
            ObjectOutputStream oos = new ObjectOutputStream(os);
            
            for (int i = 0; i < 5; ++i) {
                _4_1_exchange_object_information_userinfo userinfo = (_4_1_exchange_object_information_userinfo) ois.readObject();
                System.out.println(userinfo.getId() + ", " + userinfo.getUsername() + ", " + userinfo.getPassword());
                
                _4_1_exchange_object_information_userinfo _userinfo = 
                        new _4_1_exchange_object_information_userinfo(i+1, "username"+(i+1), "password"+(i+1));
                
                oos.writeObject(_userinfo);
                oos.flush();
            }
            
            ois.close();
            oos.close();
            is.close();
            os.close();
            socket.close();
            server_socket.close();
            
        } catch (IOException e) {
            e.printStackTrace();
        }  catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
// class
// 1. Must implement the Serializable interface
public class _4_1_exchange_object_information_userinfo implements Serializable{
    private long id;
    private String username;
    private String password;
    
    public _4_1_exchange_object_information_userinfo() {
    }
    
    public _4_1_exchange_object_information_userinfo(long id, String username, String password) {
        this.id = id;
        this.username = username;
        this.password = password;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

// client
public class _4_1_exchange_object_information_client {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("localhost", 8080);
            
            // 2. Note that the ObjectXxxStream objects are obtained in the opposite order from the other end
            OutputStream os = socket.getOutputStream();
            InputStream is = socket.getInputStream();
            ObjectOutputStream oos = new ObjectOutputStream(os);
            ObjectInputStream ois = new ObjectInputStream(is);
            
            for (int i = 0; i < 5; ++i) {
                _4_1_exchange_object_information_userinfo _userinfo = 
                        new _4_1_exchange_object_information_userinfo(i+1, "username"+(i+1), "password"+(i+1));
                oos.writeObject(_userinfo);
                
                _4_1_exchange_object_information_userinfo userinfo = (_4_1_exchange_object_information_userinfo) ois.readObject();
                System.out.println(userinfo.getId() + ", " + userinfo.getUsername() + ", " + userinfo.getPassword());
            }
            
            ois.close();
            oos.close();
            is.close();
            os.close();
            socket.close();
            
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
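
The ordering rule exists because an ObjectOutputStream writes a small stream header as soon as it is constructed, and the ObjectInputStream constructor on the other end blocks until it has read that header. The following sketch makes the header visible by writing into an in-memory stream; the class name StreamHeaderDemo and the helper headerBytes() are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

class StreamHeaderDemo {
    // Returns the bytes an ObjectOutputStream emits before any object is written.
    static byte[] headerBytes() {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(wire)) {
            oos.flush();                     // make sure the header has reached `wire`
            return wire.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : headerBytes()) {
            hex.append(String.format("%02X ", b & 0xFF));
        }
        // Magic number 0xACED followed by stream version 5
        System.out.println(hex.toString().trim());   // AC ED 00 05
    }
}
```

If both ends construct their ObjectInputStream first, each waits for a header that the other side has not sent yet, and neither constructor returns.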

The ServerSocket class

Setting the timeout, the maximum number of queued connections, and so on.

public class _4_2_server_socket {
    public static void main(String[] args) throws IOException {
        ServerSocket server_socket = new ServerSocket();
        System.out.println(server_socket.getLocalSocketAddress() + ", " + server_socket.getLocalPort());
        
        server_socket.bind(new InetSocketAddress("192.168.1.100", 8080));
        System.out.println(server_socket.getLocalSocketAddress() + ", " + server_socket.getLocalPort());
        
        // Set the timeout to 5 s
        server_socket.setSoTimeout(5000);
        
        // InetSocketAddress represents an IP socket address (IP address or host name, plus port number)
        InetAddress inetaddr = InetAddress.getByName("localhost");
        InetSocketAddress inetsaddr = new InetSocketAddress(inetaddr, 8080);
        ServerSocket _server_socket = new ServerSocket();
        _server_socket.bind(inetsaddr);
        System.out.println(_server_socket.getLocalSocketAddress() + ", " + _server_socket.getLocalPort());
        
        System.out.println(new InetSocketAddress("192.168.1.100", 8800).getHostName());
        System.out.println(new InetSocketAddress("192.168.1.100", 8080).getHostString());
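        // Note the difference between the example above and the one below: getHostString() first checks whether hostname is null and, if it is not, returns it directly
        // In the example below, swapping the two statements gives a different result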
        InetSocketAddress _inetaddr = new InetSocketAddress("192.168.1.100", 8080);
        System.out.println(_inetaddr.getHostName());
        System.out.println(_inetaddr.getHostString());
        
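        // The two getAddress() calls do different things: the first returns an InetAddress, the second returns the raw IP address
        byte[] ipaddr = inetsaddr.getAddress().getAddress();
        // & 0xFF prints each byte as an unsigned value (0..255)
        for (int i = 0; i < ipaddr.length; ++i) System.out.print((ipaddr[i] & 0xFF) + " ");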
    }
}
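
The effect of setSoTimeout() on a ServerSocket is easy to observe: with no client connecting, accept() gives up with a SocketTimeoutException instead of blocking forever. The sketch below is a minimal illustration; the class name AcceptTimeoutDemo and the helper acceptTimesOut() are made up, and it binds to an ephemeral loopback port (with a backlog of 50) rather than to the addresses used above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;

class AcceptTimeoutDemo {
    // Returns true if accept() gave up after timeoutMillis because no client connected.
    static boolean acceptTimesOut(int timeoutMillis) {
        // port 0 = any free port, backlog 50 = length of the pending-connection queue
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            server.setSoTimeout(timeoutMillis);
            server.accept().close();         // only reached if somebody connects
            return false;
        } catch (SocketTimeoutException e) {
            return true;                     // the timeout expired; the ServerSocket is still valid
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + acceptTimesOut(200));   // timed out: true
    }
}
```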

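The Socket class

I understand the Nagle algorithm (the sender holds back small packets until previously sent data has been acknowledged or a full MSS has accumulated, which increases latency but reduces the chance of congestion to some extent), but setTcpNoDelay() did not seem to have any effect. It may well be that I did something wrong; I'm leaving this as an open question.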

socket.setTcpNoDelay(true) is not working

// server
public class _4_3_socket_tcp_delay_server {
    public static void main(String[] args) throws IOException {
        ServerSocket server_socket = new ServerSocket();
        server_socket.bind(new InetSocketAddress("192.168.1.100", 8080));
        Socket socket = server_socket.accept();
        
        // getLocalPort() returns the local port, getPort() returns the remote port
        System.out.println("localport:" + socket.getLocalPort() + ", port:" + socket.getPort());
        
        System.out.println("before:" + socket.getTcpNoDelay());
        socket.setTcpNoDelay(false);
        System.out.println("after:" + socket.getTcpNoDelay());
        
        // OutputStream os = socket.getOutputStream();
//      BufferedWriter bfw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
//      for (int i = 0; i < 100; ++i) {
            //os.write("1".getBytes());
            //os.flush();
//          bfw.write("1");
//          bfw.newLine();
//          bfw.flush();
//      }
        
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
        for (int i = 0; i < 10000; ++i) {
            oos.writeInt("1".length());
            oos.write("1".getBytes());
            oos.flush();
        }
        
        socket.close();
        server_socket.close();
    }
}

// client
public class _4_3_socket_tcp_delay_client {
    public static void main(String[] args) throws IOException, InterruptedException {
        Socket socket = new Socket();
        // Without an explicit bind(), a random port is used
        socket.bind(new InetSocketAddress("192.168.1.100", 8081));
        socket.connect(new InetSocketAddress("192.168.1.100", 8080));
        // getLocalPort() returns the local port, getPort() returns the remote port
        System.out.println("localport:" + socket.getLocalPort() + ", port:" + socket.getPort());
        
        // Thread.sleep(100000);
        
        System.out.println("before:" + socket.getTcpNoDelay());
        socket.setTcpNoDelay(true);
        System.out.println("after:" + socket.getTcpNoDelay());
        
        // InputStream is = socket.getInputStream();
//      BufferedReader bfr = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//      while (bfr.readLine() != null) {
            // System.out.println(bfr);
//      }
//      byte[] ba = new byte[100];
//      int len = -1;
//      while ((len = is.read(ba)) != -1) {
//          System.out.println(new String(ba, 0, len));
//      }
        
        Long begin = System.currentTimeMillis();
        ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
        for (int i = 0; i < 10; ++i) {
            byte[] ba = new byte[ois.readInt()];
            ois.readFully(ba);
            System.out.println(new String(ba));
        }
        System.out.println(System.currentTimeMillis() - begin);
        
        socket.close();
    }
}

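This chapter involves quite a lot of networking knowledge, so I'm skipping it for now and will fill it in after reviewing computer networking, if I can.

UDP

It feels as though there is no real distinction between server and client here: each side just receives or sends, without caring who the other side is or whether it is even online.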

// server
public class _4_4_udp_server {
    public static void main(String[] args) throws IOException {
        try {
            DatagramSocket socket = new DatagramSocket(8080);
            byte[] ba = new byte[100];
            
            DatagramPacket packet = new DatagramPacket(ba, 10);
            socket.receive(packet);
            
            System.out.println("length:" + packet.getLength() + ", " + new String(packet.getData(), 0, packet.getLength()));
            
            socket.close();
        } catch (SocketException e) {
            e.printStackTrace();
        }
    }
}

// client
public class _4_4_udp_client {
    public static void main(String[] args) throws IOException {
        try {
            DatagramSocket socket = new DatagramSocket();
            socket.connect(new InetSocketAddress("localhost", 8080));
            
            byte[] ba = "1234567890".getBytes();
            DatagramPacket packet = new DatagramPacket(ba, ba.length);
            socket.send(packet);
            
            socket.close();
        } catch (SocketException e) {
            e.printStackTrace();
        }
    }
}
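
One detail in the receiver above is worth a closer look: the second argument of DatagramPacket(buf, length) caps how many bytes are accepted, and anything beyond that in the same datagram is silently dropped. The sketch below demonstrates this over the loopback interface; the class name UdpLoopbackDemo and the helper sendAndReceive() are invented for illustration, and it uses an ephemeral port instead of 8080.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

class UdpLoopbackDemo {
    // Sends `text` over loopback and returns what a receiver accepting at most maxLen bytes sees.
    static String sendAndReceive(String text, int maxLen) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(new InetSocketAddress(loopback, 0));
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2000);     // do not block forever if the datagram is lost

            byte[] out = text.getBytes(StandardCharsets.US_ASCII);
            sender.send(new DatagramPacket(out, out.length, loopback, receiver.getLocalPort()));

            byte[] in = new byte[100];
            DatagramPacket packet = new DatagramPacket(in, maxLen);   // accept at most maxLen bytes
            receiver.receive(packet);
            // getLength() is the number of bytes actually received, not the buffer size
            return new String(packet.getData(), 0, packet.getLength(), StandardCharsets.US_ASCII);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive("1234567890", 10));   // 1234567890
        System.out.println(sendAndReceive("1234567890", 4));    // 1234 (the rest is discarded)
    }
}
```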

Implementing multicast

public class _4_4_udp_multicast_server_a {
    public static void main(String[] args) throws IOException {
        MulticastSocket socket = new MulticastSocket(8080);
        socket.joinGroup(InetAddress.getByName("224.0.0.0"));
        
        byte[] ba = new byte[100];
        DatagramPacket packet = new DatagramPacket(ba, ba.length);
        socket.receive(packet);

        // Use getLength() so that the unused tail of the 100-byte buffer is not printed
        System.out.println("server A:" + new String(packet.getData(), 0, packet.getLength()));
        
        socket.close();
    }
}

public class _4_4_udp_multicast_server_b {
    public static void main(String[] args) throws IOException {
        MulticastSocket socket = new MulticastSocket(8080);
        socket.joinGroup(InetAddress.getByName("224.0.0.0"));
        
        byte[] ba = new byte[100];
        DatagramPacket packet = new DatagramPacket(ba,  ba.length);
        socket.receive(packet);
        
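        // Use getLength() so that the unused tail of the 100-byte buffer is not printed
        System.out.println("server B:" + new String(packet.getData(), 0, packet.getLength()));
        
        socket.close();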
    }
}

public class _4_4_udp_multicast_client {
    public static void main(String[] args) throws IOException {
        MulticastSocket socket = new MulticastSocket(8080);
        socket.joinGroup(InetAddress.getByName("224.0.0.0"));
        
        byte[] ba = "udp multicast test...".getBytes();
        DatagramPacket packet = new DatagramPacket(ba, ba.length, InetAddress.getByName("224.0.0.0"), 8080);
        socket.send(packet);
        
        socket.close();
    }
}

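Using selectors

Channel + selector = "token polling"???
The sockets used so far all have corresponding channels. ServerSocketChannel, SocketChannel and DatagramChannel all inherit from SelectableChannel through AbstractSelectableChannel.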

public class _5_7_selector_server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.bind(new InetSocketAddress(InetAddress.getByName("localhost"), 8080));
        ServerSocket server_socket = channel.socket();
        // Before a channel is registered with a selector, it must be switched to non-blocking mode
        channel.configureBlocking(false);
        
        Selector selector = Selector.open();
        SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);
        
        System.out.println("selector: " + selector + ", key:" + key);
        System.out.println("use xxChannelkeyFor:" + channel.keyFor(selector));
        
        System.out.println(channel.getOption(StandardSocketOptions.SO_RCVBUF));
        channel.setOption(StandardSocketOptions.SO_RCVBUF, 22333);
        System.out.println(channel.getOption(StandardSocketOptions.SO_RCVBUF));
        
        System.out.println("host:" + ((InetSocketAddress)channel.getLocalAddress()).getHostString() + ", " 
                                  + "port: " + ((InetSocketAddress)channel.getLocalAddress()).getPort());
        
        server_socket.close();
        channel.close();
        selector.close();
    }
}
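
The example above registers the channel but never calls select(). As a minimal sketch of the full cycle (register, select, iterate over the selected keys, accept), the following program accepts a few loopback connections through a selector. The class name SelectorAcceptDemo and the helper acceptWithSelector() are made up for illustration, and an ephemeral port is used instead of 8080.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class SelectorAcceptDemo {
    // Returns how many connections the selector loop accepted from `clients` loopback clients.
    static int acceptWithSelector(int clients) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);             // required before register()
            server.register(selector, SelectionKey.OP_ACCEPT);
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();

            SocketChannel[] peers = new SocketChannel[clients];
            for (int i = 0; i < clients; i++) {
                peers[i] = SocketChannel.open(address);  // blocking connect from the "client" side
            }

            int accepted = 0;
            while (accepted < clients && selector.select(2000) > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();                         // selected keys must be removed by hand
                    if (key.isAcceptable()) {
                        SocketChannel ch;
                        // In non-blocking mode accept() returns null when nothing is pending
                        while ((ch = server.accept()) != null) {
                            accepted++;
                            ch.close();
                        }
                    }
                }
            }
            for (SocketChannel peer : peers) {
                peer.close();
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptWithSelector(3));
    }
}
```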

Transferring a large file

public class _5_7_big_file_transfer_server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        channel.bind(new InetSocketAddress(InetAddress.getByName("localhost"), 8080));
        
        Selector selector = Selector.open();
        channel.register(selector, SelectionKey.OP_ACCEPT);
        
        boolean run_tag = true;
        while (run_tag) {
            selector.select();
            Set<SelectionKey> s = selector.selectedKeys();
            Iterator<SelectionKey> it = s.iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                
                if (key.isAcceptable()) {
                    SocketChannel socket_channel = channel.accept();
                    socket_channel.configureBlocking(false);
                    socket_channel.register(selector, SelectionKey.OP_WRITE);
                }
                
                if (key.isWritable()) {
                    SocketChannel socket_channel = (SocketChannel) key.channel();
                    FileInputStream file = new FileInputStream("C:\\Users\\lttzz\\Downloads\\Video\\Keep Your Hands Off Eizouken EP01.mp4");
                    
                    FileChannel file_channel = file.getChannel();
                    // A modest buffer is enough; the original 524288000-byte (500 MiB) heap buffer can exhaust the default heap
                    ByteBuffer bf = ByteBuffer.allocate(1024 * 1024);
                    
                    while (file_channel.position() < file_channel.size()) {
                        file_channel.read(bf);
                        bf.flip();
                        // In non-blocking mode write() may write nothing; this loop spins until the buffer is drained
                        while (bf.hasRemaining()) {
                            socket_channel.write(bf);
                        }
                        bf.clear();
                        System.out.println(file_channel.position() + " / " + file_channel.size());
                    }
                    
                    System.out.println("write finished...");
                    file.close();            // also closes file_channel
                    socket_channel.close();
                }
            }
        }
        channel.close();
    }
}


public class _5_7_big_file_transfer {
    public static void main(String[] args) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress(InetAddress.getByName("localhost"), 8080));
        
        Selector selector = Selector.open();
        channel.register(selector, SelectionKey.OP_CONNECT);
        
        boolean run_tag = true;
        while (run_tag) {
            System.out.println("begin selector...");
            
            if (channel.isOpen()) {
                selector.select();
                System.out.println("end selector...");
                
                Set<SelectionKey> s = selector.selectedKeys();
                Iterator<SelectionKey> it = s.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    
                    if (key.isConnectable()) {
                        while (!channel.finishConnect()) {
                            // finishConnect() returns false while the non-blocking connect is still in progress, so keep polling
                        }
                        channel.register(selector, SelectionKey.OP_READ);
                    }
                    
                    if (key.isReadable()) {
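                        ByteBuffer bf = ByteBuffer.allocate(51200);
                        long cnt = 0;
                        int len;
                        // The data is only counted, not consumed, so clear() after each read instead of flip()
                        // read() returns 0 when nothing is available yet (non-blocking) and -1 at end of stream
                        while ((len = channel.read(bf)) != -1) {
                            cnt += len;
                            if (len > 0) System.out.println("cnt:" + cnt + ", len:" + len);
                            bf.clear();
                        }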
                        
                        System.out.println("read finished...");
                        channel.close();
                    }
                }
            } else {
                break;
            }
        }
    }
}