• 周三. 6月 29th, 2022

5G编程聚合网

5G时代下一个聚合的编程学习网

热门标签

JAVA网络编程-非阻塞IO

admin

11月 28, 2021

缓冲区

创建缓冲区
填充与获取
绝对定位
批量操作
数据转换
视图缓冲区
复制缓冲区
分片缓冲区
SocketChannel
连接
非阻塞连接
读取数据
写入数据
ServerSocketChannel
Channels
Socket选项
拆解Selector和SelectionKey
完整的客户端与服务端

缓冲区

在新的IO模型中,不在向输,出流写入数据和从输入流读取数据,而是要从缓冲区中读写数据。像在缓冲流中一样,缓冲区可能就是子节数组。不过,原始实现可以将缓冲区直接与硬件或内存连接,或者使用其他非常高效的实现。

流和缓冲区之间的关键区别在于流是基于子节的,而缓冲区是基于块的流设计为按顺序一个子节接着一个子节的传送数据。出于性能的考虑,也可以传送字节数组。而缓冲区会传送缓冲区中的数据块。可以读写缓冲区的子节之前,这些子节必须已经存储在缓冲区中,而且一次会读/写一个缓冲区的数据。

流和缓冲区之间的第二个区别是,缓冲区支持同一对象的读/写。

除了boolean外,Java的所有基本数据类型都有特定的Buffer子类:ByteBuffer,CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer和DoubleBuffer.每个子类中的方法都有相应类型的返回值和参数列表。例如,DoubleBuffer类有设置和获取double的方法。IntBuffer类有设置和获取int的方法。公共的Buffer基类只提供一些通用方法,这些方法不需要知道缓冲区所包含数据是什么类型。网络程序几乎只会使用BufferBuffer,但程序偶尔也会使用其他类型来取代ByteBuffer.

除了数据列表外,每个缓冲区都记录了4个关键信息。无论缓冲区是何种类型,都有相同的方法来获取和设置这些值

位置(position)缓冲区中将读取或写入的下一个位置。这个位置从0开始,最大值等于缓冲区的大小。可以使用以下两个方法设置和获取:

public final int position();public final Buffer position(int newPosition);

容量(capacity)缓冲区可以保存多少个元素的值。容量值在缓冲区创建时设置,此后不能改变。可以使用以下方法获取。

public final int capacity();jishi

限度(limit)缓冲区中可以访问数据的末尾位置。只要不改变限度,就无法读/写超过这个位置的数据,即使缓冲区有更大的容量也没有用。使用以下方法获取和设置限度。

public final int limit();public final Buffer limit(int newLimit);

标记(mark)缓冲区中客户端指定的索引。通过调用mark()可以标记当前位置,调用reset()可以将position设置到mark()位置。

public final Buffer mark(); public final Buffer reset();

与读取InputStream不同,读取缓冲区实际上不会以任何方式改变缓冲区的数据。只是可能向前或者向后设置位置,从而可以从缓冲区中某个特定位置开始读取。类似的,程序可以调整限度,从而控制要读取的数据的末尾。

公共的Buffer基类还提供了另外几个方法,可以通过这些公共属性的引用来进行操作。

clear()方法将位置position设置为0,并将limit设置为capacity。这么做意味着缓冲区内的数据不会有任何改变,改变的只是两个指针。

public final Buffer clear();

rewind()将位置position设置为0,但不改变limit.

public final Buffer rewind();

flip()方法将limit设置到position,然后将position设置为0.

public final Buffer flip();

remaining()返回当前缓冲区中position和limit之间的元素数,缓冲区可操作剩余数。

public final int remaining();

hasRemaining()如果remaining()返回大于0,则hasRemaining()返回true

public final bookean hasRemaining();

创建缓冲区

public static void main(String[] args) {
        // 创建一个空的缓冲区position位于0,用allocate()创建的 缓冲区基于Java数组实现
        CharBuffer buffer1 = CharBuffer.allocate(1);
        buffer1.put('a');

        // 通过array()方法获取buffer中的数据。
        char[] cs = buffer1.array();
        System.out.println(cs[0]);// 输出为a

        cs[0] = 'b';// 修改数组

        char[] cs2 = buffer1.array();// 再次获取缓冲区内数据
        // 缓冲区内的数据会受数组的修改的影响。
        System.out.println(cs2[0]);// 输出为b
    }
public static void main(String[] args) {
        // 创建一个空的缓冲区position位于0,用allocate()创建的 缓冲区基于Java数组实现
        CharBuffer buffer1 = CharBuffer.allocate(1);
        buffer1.put('a');

        // 通过array()方法获取buffer中的数据。
        char[] cs = buffer1.array();
        System.out.println(cs[0]);//输出为a
        
        //将position设置为0,从而可以继续添加数据,否则会下标越界。其实这是一个覆盖操作。
        buffer1.flip();
        buffer1.put('b');
        
        //更改缓冲区内的数据也是影响到数组。
        System.out.println(cs[0]);//输出为b
    }
public static void main(String[] args) {
        //只有ByteBuffer才支持这个方法。这个方法会直接在以太网卡,核心内存或者其他位置
        //上的缓冲区使用直接内存访问。直接缓冲区比简介缓冲区的效率更高,但是代价
        //也更高。从API的角度看它与allocate()无异。
        Buffer buffer1 = ByteBuffer.allocateDirect(1);
        System.out.println(buffer1.isDirect());//true
        
        Buffer buffer2 = CharBuffer.allocate(1);
        System.out.println(buffer2.isDirect());//false
        
        //使用直接内存创建缓冲区无法获取数组,实际上也并没有数组。
        buffer1.array();
        
    }
public static void main(String[] args) {
        char[] cs = new char[] {'a'};
        //创建已有数据的Buffer
        CharBuffer buffer1 = CharBuffer.wrap(cs);
        //可以添加一个数据,不会报错表示使用wrap创建的Buffer的position是0
        buffer1.append('b');
        //buffer1.append('c');//这将会报错
        
        //修改Buffer会影响到原有的cs
        System.out.println(cs[0]);//输出b
        
        //获取的数组还是构造Buffer的数组
        char[] c = buffer1.array();
        System.out.println(c[0]);//输出同样也是b
        
        //修改Buffer返回的数组
        c[0] = 'c';
        
        //同样会影响构造Buffer的数组
        System.out.println(cs[0]);    
    }

填充与获取

put()和get()可以操作缓冲区的最基本的填充数据和获取数据.两个操作都会移动position,意味着都不能超过limit.否则会抛出异常.

public static void main(String[] args) {
        //创建一个capacity为3的缓冲区
        CharBuffer buffer1 = CharBuffer.allocate(3);
        //每次调用put都会在buffer中的position位置存入元素,position+1
        buffer1.put('0');
        buffer1.put('1');
        buffer1.put('2');
        
        //若position已经超过capacity或limit则抛出异常
        //buffer1.put('3');//java.nio.BufferOverflowException
    }
public static void main(String[] args) {
        //创建一个capacity为3的缓冲区
        CharBuffer buffer1 = CharBuffer.allocate(3);
        buffer1.put('0');
        buffer1.put('1');
        
        //每次调用get会获取buffer里position位置的元素,如果
        //该位置没有元素则会返回一个空字符(不是null)
        //并且position+1
        System.out.println("---"+buffer1.get());//---
        
        //若position已经超过capacity或limit则抛出异常
        //System.out.println("---"+buffer1.get());//java.nio.BufferUnderflowException
    }
public static void main(String[] args) {
        //创建一个capacity为3的缓冲区
        CharBuffer buffer1 = CharBuffer.allocate(3);
        
        buffer1.put('0');
        buffer1.put('1');
        
        //使用flip将缓冲区的position重置
        buffer1.flip();
        
        //使用hasRemaining判断position位置是否受限制。
        while(buffer1.hasRemaining()) {
            System.out.println(buffer1.get());
        }
    }

绝对定位

绝对定位的put()和get()需要指定正确的下标.否则会抛出异常.这两个操作不会更改position

public static void main(String[] args) {
        //创建一个capacity为3的缓冲区
        CharBuffer buffer1 = CharBuffer.allocate(3);
        
        //可以使用绝对定位的方法填充缓冲区,这将不会改变position
        buffer1.put(0, '1');
        buffer1.put(1, '2');
        buffer1.put(2, '3');
        //但如果超过capacity依然会抛出异常
        //buffer1.put(3, '4');//java.lang.IndexOutOfBoundsException
    
    
        //使用绝对定位方法获取元素也不会改变position
        System.out.println(buffer1.get(0));//1
        System.out.println(buffer1.get(1));//2
        System.out.println(buffer1.get(2));//3
        //System.out.println(buffer1.get(3));//java.lang.IndexOutOfBoundsException
        
        //绝对定位添加元素和获取元素后position的位置还是0.
        System.out.println(buffer1.position());//0
    }

批量操作

put()的批量操作一次接收一个数组.如果buffer存不下数组则抛出异常.get()接收一个数组,缓冲区元素不足以填充数组则抛出异常.这两个操作会移动position.

public static void main(String[] args) {
        // 创建一个capacity为3的缓冲区
        CharBuffer buffer1 = CharBuffer.allocate(3);

        // 可以一次填充一个数组的数据
        // buffer1.put(new char[] { 'a', 'b', 'c' });

        // 如果缓冲区容纳不下数组则抛出异常
        // buffer1.put(new char[] {'a','b','c','d'});//java.nio.BufferOverflowException

        // 给定一个数组,从缓冲区的start位置填充length个元素到缓冲区
        buffer1.put(new char[] { 'a', 'b', 'c' }, 0, 2);
        // 当前position的位置是 2 ,表示批量添加也会修改position
        System.out.println(buffer1.position());// 2

    }
public static void main(String[] args) {
        // 创建一个capacity为3的缓冲区
        CharBuffer buffer1 = CharBuffer.allocate(3);
    
            // 可以一次填充一个数组的数据
            buffer1.put(new char[] { 'a', 'b', 'c' });
    
            buffer1.flip();
            
            //将缓冲区元素复制到
//            char b[] = new char[3];
//            buffer1.get(b);
//            System.out.println(Arrays.toString(b));//[a, b, c]
            
            //缓冲区元素比数组多不会异常
//            char b[] = new char[2];
//            buffer1.get(b);
//            System.out.println(Arrays.toString(b));//[a, b]
            
            //缓冲区元素不足以填充数组则抛出异常
//            char b[] = new char[4];
//            buffer1.get(b);//java.nio.BufferUnderflowException
    }
public static void main(String[] args) {
        // 创建一个capacity为3的缓冲区
        CharBuffer buffer1 = CharBuffer.allocate(3);

        // 可以一次填充一个数组的数据
        buffer1.put(new char[] { 'a', 'b', 'c' });

        buffer1.flip();
        
        //char[] c = new char[10];
        //将缓冲区元素填充到数组,从数组的start位置开始填充length个
        //buffer1.get(c, 1, 2);
        //[ , a, b, , , , , , , ]
        //System.out.println(Arrays.toString(c));
        
        //若缓冲区内元素不够填充数组则抛出异常
        char[] c = new char[10];
        buffer1.get(c, 1, 4);//java.nio.BufferUnderflowException
    }

数据转换 

ByteBuffer允许添加除了Boolean类型外的其他7中基本数据类型。添加byte会占用1个capacity,short占用2个子节,int占用4个子节,long占用8个子节,float占用4个子节,double占用8个子节。char占用2个子节。

public static void main(String[] args) {
        ByteBuffer b = ByteBuffer.allocate(100);

        b.put((byte) 1);
        System.out.println(b.position());//1

        b.putShort((short) 1);
        System.out.println(b.position());//3

        b.putInt(1);
        System.out.println(b.position());//7

        b.putLong(1L);
        System.out.println(b.position());//15

        b.putFloat(1F);
        System.out.println(b.position());//19

        b.putDouble(1D);
        System.out.println(b.position());//27

        b.putChar('1');
        System.out.println(b.position());//29
    }

视图缓冲区

SocketChannel获取的只能是ByteBuffer,如果可以确定输出的是某种基本数据类型则可以使用视图缓冲区,ByteBuffer有6中方法可以将ByteBuffer转换为其他的6中数据类型。以下代码为转换为IntBuffer视图。

public static void main(String[] args) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(4);
        IntBuffer intBuffer = byteBuffer.asIntBuffer();
        
        byteBuffer.put((byte)0);
        byteBuffer.put((byte)0);
        byteBuffer.put((byte)0);
        byteBuffer.put((byte)1);
        
        System.out.println(intBuffer.capacity());//1
        System.out.println(intBuffer.get());//1
        System.out.println(intBuffer.position());//1
    }

复制缓冲区

可以从现有的缓冲区上复制一个新的缓冲区,两个缓冲区有相同的capacity但是position和limit是独立的。两个缓冲区会互相影响。

public static void main(String[] args) {
        ByteBuffer byteBuffer = ByteBuffer.allocate(4);
        //复制缓冲区
        ByteBuffer byteBuffer2 = byteBuffer.duplicate();
        
        //原缓冲区添加
        byteBuffer.put((byte)1);
        System.out.println(byteBuffer.position());//position 1
        
        System.out.println(byteBuffer2.position());//copy缓冲区position 0
        System.out.println(byteBuffer2.get());//可以获取 1
        
        byteBuffer2.put(0, (byte)2);//修改copy缓冲区
        
        System.out.println(byteBuffer.get(0));//获取原缓冲区 2
    }

分片缓冲区

分片缓冲区大致和复制缓冲区类似,区别在于分片缓冲区的大小只能是原缓冲区position到limit的长度。

public static void main(String[] args) {
        //原缓冲区
        IntBuffer buffer = IntBuffer.allocate(10);
        //添加5个元素
        buffer.put(1);
        buffer.put(2);
        buffer.put(3);
        buffer.put(4);
        buffer.put(5);
        
        //从原缓冲区上创建分片缓冲区
        IntBuffer buffer2 = buffer.slice();
        //分片缓冲区的capacity为原缓冲区的剩余空间
        System.out.println(buffer2.capacity());//5
        //分片缓冲区的position从0开始
        System.out.println(buffer2.position());//0
        //分片缓冲区添加元素
        buffer2.put(6);
        buffer2.put(7);
        buffer2.put(8);
        buffer2.put(9);
        buffer2.put(10);
        //缓冲区位置不变
        System.out.println(buffer.position());
        //原缓冲区数据被同步
        System.out.println(buffer.get(5));
    }

SocketChannel

SocketChannel类可以读写TCP Socket.数据必须编码到ByteBuffer对象中来完成读/写。每个SocketChannel都和一个对等端Socket对象相关联。

连接

public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(1);
        while(true) {
            Socket socket = server.accept();
            OutputStream out = socket.getOutputStream();
            out.write(new String("abc").getBytes());
            socket.close();
        }
    }//BIO服务端

public static SocketChannel open(SocketAddress remote)throws IOException;这个方法将会立刻打开连接。在建立连接之前或者建立失败之前不会返回,而是一直阻塞。

public static void main(String[] args) throws Exception {
        //使用通道连接
        SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1));
    }//NIO客户端

非阻塞连接

要设置非阻塞通道则需要使用迂回的方法创建连接,先设置然后调用connect()。非阻塞通道在连接建立之前就会返回,所以在使用连接之前必须调用finishConnect()判断连接是否可用。

public static void main(String[] args) throws Exception {
        //使用通道连接
        SocketChannel channel = SocketChannel.open();
        //设置非阻塞连接
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("127.0.0.1", 1));
        
        //如果连接打开返回true
        System.out.println(channel.isConnected());
        //如果连接仍在建立,但没有打开则返回true
        System.out.println(channel.isConnectionPending());
        //如何连接可用返回true
        System.out.println(channel.finishConnect());    
    }//NIO客户端

读取数据

从channel中读取数据基本的方法是read()这个方法接收buffer,它与BIO的read()方法类似,如果可以读取数据则一次读取buffer个字节(理想状态下,也可能一次读取的字节不满buffer).当读取到末尾则返回-1.这里关键点在于如何操纵buffer,flip()和clear()两个方法前者会将limit置为当前position,然后把position置为0.后者会把position置为0,limit置为capacity.

read(Buffer[])和read(Buffer,off,len)两个方法则应用于一次读取存入多个缓冲区中.read(Buffer[])会存入缓冲区数组中的每一个buffer中,read(Buffer,off,len)则只会存入off开始len个长度的buffer.(有点咬嘴,看下边demo即可)

public static void main(String[] args) throws Exception {
        //使用通道连接
        SocketChannel channel = SocketChannel.open();
        //设置非阻塞连接
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("127.0.0.1", 1));
        //保证连接可用
        while (!channel.finishConnect()) {
        }
        //创建缓冲区,所有基于NIO的输入和输出都基于缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(2);
        //本地的输出流
        ByteOutputStream out = new ByteOutputStream();
        //循环读取,将读取的数据存入buffer,如果返回-1则表示流结束.
        //每次read到的最大字节数是buffer的capacity.或者不满capacity
        while (channel.read(buffer)!=-1){
            //每次read到buffer后buffer的position会发生变化.如果要读则先要重置position为
            //0,总共读取limit个字节.flip会将limit置为position,position置为0
            buffer.flip();
            while (buffer.hasRemaining()){//判断是否到达limit
                //如果没有到达limit则不断的从缓冲区get,然后将字节存入本地的输出流
                out.write(buffer.get());
            }
            //buffer读取完毕后,必须要在次重置position为0,limit置为capacity.clear方法可以做到.
          buffer.clear();
        }
        System.out.println(out.toString());
    }//NIO客户端
public static void main(String[] args) throws Exception {
        //使用通道连接
        SocketChannel channel = SocketChannel.open();
        //设置非阻塞连接
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("127.0.0.1", 1));
        //保证连接可用
        while (!channel.finishConnect()) {
        }
        //创建多个缓冲区
        ByteBuffer buffer1 = ByteBuffer.allocate(3);
        ByteBuffer buffer2 = ByteBuffer.allocate(3);
        ByteBuffer buffer3 = ByteBuffer.allocate(3);
        //将多个缓存区存入数组
        ByteBuffer[] bufArray = new ByteBuffer[3];
        bufArray[0] = buffer1;
        bufArray[1] = buffer2;
        bufArray[2] = buffer3;
        ByteOutputStream out = new ByteOutputStream();
        //read(arr[])会一个一个的向每个缓冲区写入数据.
        while (channel.read(bufArray) != -1) {
            //循环每一个缓冲区,依然要注意flip,clear
            for (int i = 0; i < bufArray.length; i++) {
                bufArray[i].flip();
                while (bufArray[i].hasRemaining()) {
                    out.write(bufArray[i].get());
                }
                bufArray[i].clear();
            }
        }
        System.out.println(out.toString());
    }//NIO客户端
public static void main(String[] args) throws Exception {
        //使用通道连接
        SocketChannel channel = SocketChannel.open();
        //设置非阻塞连接
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("127.0.0.1", 1));
        //保证连接可用
        while (!channel.finishConnect()) {
        }
        //创建多个缓冲区
        ByteBuffer buffer1 = ByteBuffer.allocate(3);
        ByteBuffer buffer2 = ByteBuffer.allocate(3);
        ByteBuffer buffer3 = ByteBuffer.allocate(3);
        //将多个缓存区存入数组
        ByteBuffer[] bufArray = new ByteBuffer[3];
        bufArray[0] = buffer1;
        bufArray[1] = buffer2;
        bufArray[2] = buffer3;
        ByteOutputStream out = new ByteOutputStream();
        //read(arr[])会一个一个的向每个缓冲区写入数据.
        while (channel.read(bufArray,0,1) != -1) {
            System.out.println(buffer1.position());
            System.out.println(buffer2.position());
            System.out.println(buffer3.position());
            //循环每一个缓冲区,依然要注意flip,clear
            for (int i = 0; i < bufArray.length; i++) {
                bufArray[i].flip();
                while (bufArray[i].hasRemaining()) {
                    out.write(bufArray[i].get());
                }
                bufArray[i].clear();
            }
        }
        System.out.println(out.toString());
    }//NIO客户端

写入数据

向channel中写入数据的方法是write(buffer);它将把从缓冲区的position开始读取数据到limit结束将这些数据写入到channel中.写入完毕后通道要关闭输出流.对于服务端亦是如此.同时也有一次写入多个buffer的方法read(buffer[]).和read(buffer[],off,leng).

public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(1);
        while(true) {
            Socket socket = server.accept();
            OutputStream out = socket.getOutputStream();
            out.write(new String("abcdefg").getBytes());
            //写入数据完毕后一定要手动关闭输出流.否则你的接收方不知道你是否写入完毕,会一直等待.
            socket.shutdownOutput();
            //获取客户端数据
            InputStream in = socket.getInputStream();
            byte[]b= new byte[1024];
            int len = 0;
            //将客户端数据存储到容器中
            ByteOutputStream out2 = new ByteOutputStream();
            while ((len=in.read(b))!=-1){
                out2.write(b,0,len);
            }
            System.out.println(out2.toString());
            socket.close();
        }
    }//BIO服务端
public static void main(String[] args) throws Exception {
        //使用通道连接
        SocketChannel channel = SocketChannel.open();
        //设置非阻塞连接
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("127.0.0.1", 1));
        //保证连接可用
        while (!channel.finishConnect()) {
        }
        //创建缓冲区,所有基于NIO的输入和输出都基于缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(2);
        //本地的输出流
        ByteOutputStream out = new ByteOutputStream();
        //循环读取,将读取的数据存入buffer,如果返回-1则表示流结束.
        //每次read到的最大字节数是buffer的capacity.或者不满capacity
        while (channel.read(buffer)!=-1){
            //每次read到buffer后buffer的position会发生变化.如果要读则先要重置position为
            //0,总共读取limit个字节.flip会将limit置为position,position置为0
            buffer.flip();
            while (buffer.hasRemaining()){//判断是否到达limit
                //如果没有到达limit则不断的从缓冲区get,然后将字节存入本地的输出流
                out.write(buffer.get());
            }
            //buffer读取完毕后,必须要在次重置position为0,limit置为capacity.clear方法可以做到.
            buffer.clear();
        }
        System.out.println(out.toString());

        //将要发送的数据写入到buffer中
        ByteBuffer buffer1 = ByteBuffer.allocate(10);
        buffer1.put("This NIO".getBytes());
        //数据存入完毕后一定要flip,它会充值limit和position
        buffer1.flip();
        //将buffer数据写入到channel中,写入的从position开始到limit结束
        channel.write(buffer1);
        channel.shutdownOutput();

        channel.close();
    }//NIO客户端
public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(1);
        while(true) {
            Socket socket = server.accept();
            OutputStream out = socket.getOutputStream();
            out.write(new String("abcdefg").getBytes());
            //写入数据完毕后一定要手动关闭输出流.否则你的接收方不知道你是否写入完毕,会一直等待.
            socket.shutdownOutput();
            //获取客户端数据
            InputStream in = socket.getInputStream();
            byte[]b= new byte[1024];
            int len = 0;
            //将客户端数据存储到容器中
            ByteOutputStream out2 = new ByteOutputStream();
            while ((len=in.read(b))!=-1){
                out2.write(b,0,len);
            }
            System.out.println(out2.toString());
            socket.close();
        }
    }//BIO服务端
public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(1);
        while(true) {
            Socket socket = server.accept();
            OutputStream out = socket.getOutputStream();
            out.write(new String("abcdefg").getBytes());
            //写入数据完毕后一定要手动关闭输出流.否则你的接收方不知道你是否写入完毕,会一直等待.
            socket.shutdownOutput();
            //获取客户端数据
            InputStream in = socket.getInputStream();
            byte[]b= new byte[1024];
            int len = 0;
            //将客户端数据存储到容器中
            ByteOutputStream out2 = new ByteOutputStream();
            while ((len=in.read(b))!=-1){
                out2.write(b,0,len);
            }
            System.out.println(out2.toString());
            socket.close();
        }
    }//BIO服务端

ServerSocketChannel

ServerSocketChannel类只有一个目的:接受入站连接.你无法读取,写入或连接ServerSocketChannel.它支持的唯一操作就是接受一个新的入站连接.这个类的accept()方法最重要,ServerSocketChannel的非阻塞模式需要向Selector注册得到入站连接.在阻塞模式下,它的作用仅仅是无限循环的调用accept()接受新的入站信息.accept接受到入站连接后返回一个SocketChannel它的操作与客户端的操作无异.

public static void main(String[] args) throws Exception{
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(1));
        while (true){
            SocketChannel client = server.accept();
            ByteBuffer buffer = ByteBuffer.allocate(10);
            buffer.put("abcdef".getBytes());
            buffer.flip();
            client.write(buffer);
            client.shutdownOutput();

            ByteBuffer buffer1 = ByteBuffer.allocate(10);
            ByteOutputStream out = new ByteOutputStream();
            while (client.read(buffer1)!=-1){
                buffer1.flip();
                while (buffer1.hasRemaining()){
                    out.write(buffer1.get());
                }
                buffer1.clear();
            }
            System.out.println(out.toString());
            client.close();
        }
    }//NIO服务器
public static void main(String[] args) throws Exception {
        //使用通道连接
        SocketChannel channel = SocketChannel.open();
        //设置非阻塞连接
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("127.0.0.1", 1));
        //保证连接可用
        while (!channel.finishConnect()) {
        }
        //创建缓冲区,所有基于NIO的输入和输出都基于缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(2);
        //本地的输出流
        ByteOutputStream out = new ByteOutputStream();
        //循环读取,将读取的数据存入buffer,如果返回-1则表示流结束.
        //每次read到的最大字节数是buffer的capacity.或者不满capacity
        while (channel.read(buffer)!=-1){
            //每次read到buffer后buffer的position会发生变化.如果要读则先要重置position为
            //0,总共读取limit个字节.flip会将limit置为position,position置为0
            buffer.flip();
            while (buffer.hasRemaining()){//判断是否到达limit
                //如果没有到达limit则不断的从缓冲区get,然后将字节存入本地的输出流
                out.write(buffer.get());
            }
            //buffer读取完毕后,必须要在次重置position为0,limit置为capacity.clear方法可以做到.
            buffer.clear();
        }
        System.out.println(out.toString());

        //将要发送的数据写入到buffer中
        ByteBuffer buffer1 = ByteBuffer.allocate(10);
        buffer1.put("This NIO".getBytes());
        ByteBuffer buffer2 = ByteBuffer.allocate(10);
        buffer2.put("This NIO".getBytes());
        ByteBuffer buffer3 = ByteBuffer.allocate(10);
        buffer3.put("This NIO".getBytes());

        //数据存入完毕后一定要flip,它会充值limit和position
        buffer1.flip();
        buffer2.flip();
        buffer3.flip();

        ByteBuffer[] bs = new ByteBuffer[3];
        bs[0] = buffer1;
        bs[1] = buffer2;
        bs[2] = buffer3;

        //将buffer数据写入到channel中,写入的从position开始到limit结束
        channel.write(bs,0,2);
        channel.shutdownOutput();

        System.out.println(channel.isOpen());
        channel.close();
        System.out.println(channel.isOpen());
    }//NIO客户端

Channels

Channels是一个简单的工具类,可以将传统的基于IO的流,阅读器,书写器包装在通道中,也可以从通道转换为基于IO的流,书写器,阅读器.总之Channels有一些将NIOAPI转换为BIOAPI的方法,和BIOAPI转换为NIOAPI的方法.

public static void main(String[] args) throws Exception {
        FileInputStream in = new FileInputStream("C:\Users\bykj\Desktop\周报.txt");
        //通过BIO输入流获取Channel
        ReadableByteChannel inChannel = Channels.newChannel(in);
        FileOutputStream out = new FileOutputStream("C:\Users\bykj\Desktop\111.txt");
        //通过BIO输出流获取Channel
        WritableByteChannel outChannel = Channels.newChannel(out);
        //操纵通道复制文件.
        ByteBuffer buffer = ByteBuffer.allocate(10);
        while (inChannel.read(buffer) != -1) {
            buffer.flip();
            outChannel.write(buffer);
            buffer.clear();
        }
        inChannel.close();
        outChannel.close();
        in.close();
        out.close();
    }
public static void main(String[] args) throws Exception {
        FileInputStream inFile = new FileInputStream("C:\Users\bykj\Desktop\周报.txt");
        FileChannel channel = inFile.getChannel();

        FileOutputStream outFile = new FileOutputStream("C:\Users\bykj\Desktop\111.txt");
        FileChannel channel1 = outFile.getChannel();

        ByteBuffer buffer = ByteBuffer.allocate(20);
        while (channel.read(buffer) != -1) {
            buffer.flip();
            channel1.write(buffer);
            buffer.clear();
        }

        channel.close();
        channel1.close();
        inFile.close();
        outFile.close();
    }
public static void main(String[] args) throws Exception {
        FileInputStream inFile = new FileInputStream("C:\Users\bykj\Desktop\周报.txt");
        FileChannel channel = inFile.getChannel();

        FileOutputStream outFile = new FileOutputStream("C:\Users\bykj\Desktop\111.txt");
        FileChannel channel1 = outFile.getChannel();

        InputStream inputStream = Channels.newInputStream(channel);
        OutputStream outputStream = Channels.newOutputStream(channel1);

        byte[] b = new byte[20];
        int len = 0;
        while ((len=inputStream.read(b))!=-1){
            outputStream.write(b,0,len);
        }

        inputStream.close();
        outputStream.close();
        inFile.close();
        outFile.close();
    }
public static void main(String[] args) throws Exception {
        FileInputStream inFile = new FileInputStream("C:\Users\bykj\Desktop\周报.txt");
        FileChannel channel = inFile.getChannel();

        FileOutputStream outFile = new FileOutputStream("C:\Users\bykj\Desktop\111.txt");
        FileChannel channel1 = outFile.getChannel();

        Reader reader = Channels.newReader(channel, "UTF-8");
        Writer writer = Channels.newWriter(channel1, "UTF-8");

        char[] c = new char[20];
        int len = 0;
        while ((len = reader.read(c))!=-1){
            writer.write(c,0,len);
        }
        reader.close();
        writer.close();
        channel.close();
        channel1.close();
    }

Socket选项

在Socket和ServerSocket中可以设置TCP_NODELAY,SO_TIMEOUT等选项.为Socket或ServerSocket设置更多的属性.Channel也支持设置类似的选项.但是NIO的Channel有多个.每种可设置的选项是不同的.通过以下方法可以查询特定的Channel可以设置的选项.

public static void main(String[] args) throws Exception {
        printOptions(SocketChannel.open());
        printOptions(ServerSocketChannel.open());
        printOptions(AsynchronousSocketChannel.open());
        printOptions(AsynchronousServerSocketChannel.open());
        printOptions(DatagramChannel.open());
    }

    private static void printOptions(NetworkChannel channel) throws Exception {
        System.out.println(channel.getClass().getSimpleName());
        Set<SocketOption<?>> socketOptions = channel.supportedOptions();
        for (SocketOption<?> option : socketOptions) {
            System.out.println(option.name());
        }
        System.out.println();
        channel.close();
    }

拆解Selector和SelectionKey

只有设置Channel为非阻塞才可以将Channel注册到Selector里。否则会抛出java.nio.channels.IllegalBlockingModeException异常。

public static void main(String[] args) throws Exception {
        // 创建服务端
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置服务端为非阻塞
        //serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(1));

        // 唯一一个获取Selector的方法
        Selector selector = Selector.open();
        // 向选择器增加通道,不是所有的通道都支持注册到Selector的,但是所有的网络通道都是可以的。
        // 使用 | 定义要注册到通道的操作类型
        serverChannel.register(selector, SelectionKey.OP_ACCEPT, "附件");
    }// NIO服务端

ServerSocketChannel和SocketChannel注册到Selector里的事件不是随便的。ServerSocketChannel只能注册OP_ACCEPT(客户端连接)SocketChannel可以注册三种事件OP_CONNECT(客户端连接服务器,其实在服务端完全用不上这个,只有使用NIO客户端才用的上。)OP_WRITE(写准备就绪)OP_READ(读准备就绪)

public static void main(String[] args) throws Exception {
        // 创建服务端
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置服务端为非阻塞
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(1));

        // 唯一一个获取Selector的方法
        Selector selector = Selector.open();
        // 向选择器增加通道,不是所有的通道都支持注册到Selector的,但是所有的网络通道都是可以的。
        // 使用 | 定义要注册到通道的操作类型
        serverChannel.register(selector, SelectionKey.OP_ACCEPT, "附件");

        while (true) {
            System.out.println("~~~~~~~~~~~~~~~~~~~");
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE,
                            "附件");
                    System.out.println("注册客户端连接到selector");
                }
            }
        }
    }// NIO服务端

register()的第三个参数用于给准备就绪的事件传递附件参数。在事件就绪后可以通过SelectionKey的attachment()获取该附件。

public static void main(String[] args) throws Exception {
        // 创建服务端
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置服务端为非阻塞
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(1));

        // 唯一一个获取Selector的方法
        Selector selector = Selector.open();
        // 向选择器增加通道,不是所有的通道都支持注册到Selector的,但是所有的网络通道都是可以的。
        // 使用 | 定义要注册到通道的操作类型
        serverChannel.register(selector, SelectionKey.OP_ACCEPT, "附件");

        while (true) {
            System.out.println("~~~~~~~~~~~~~~~~~~~");
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                String s = (String)key.attachment();
                System.out.println(s);
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE,
                            "附件");
                    System.out.println("注册客户端连接到selector");
                }
            }
        }
    }// NIO服务端

为什么一定要remove()掉key。selectedKeys这个集合是Selector这个选择器的公用集合,也就是说所有的通道注册的事件,只要准备就绪都会在这里出现,如果不删除也不做其他的操作,下次从select()返回后,会再向selectedKeys中添加一个key,在用迭代器遍历就会出现一个通道的同一个事件被注册多次,并且第一次添加的key依然是准备就绪的。在进行key.channel()就会抛出异常。

public static void main(String[] args) throws Exception {
        // 创建服务端
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置服务端为非阻塞
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(1));

        // 唯一一个获取Selector的方法
        Selector selector = Selector.open();
        // 向选择器增加通道,不是所有的通道都支持注册到Selector的,但是所有的网络通道都是可以的。
        // 使用 | 定义要注册到通道的操作类型
        serverChannel.register(selector, SelectionKey.OP_ACCEPT, "附件");

        while (true) {
            System.out.println("~~~~~~~~~~~~~~~~~~~");
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            System.out.println(keys.size());
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //iterator.remove();
                String s = (String)key.attachment();
                System.out.println(s);
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE,
                            "附件");
                    System.out.println("注册客户端连接到selector");
                }
            }
        }
    }// NIO服务端

cancel()取消key的注册。不使用remove()的方式是key.cancel()这将取消其Selection对象的注册,这样选择器就不会浪费资源再去查询它是否准备就绪。

public static void main(String[] args) throws Exception {
        // 创建服务端
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置服务端为非阻塞
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(1));

        // 唯一一个获取Selector的方法
        Selector selector = Selector.open();
        // 向选择器增加通道,不是所有的通道都支持注册到Selector的,但是所有的网络通道都是可以的。
        // 使用 | 定义要注册到通道的操作类型
        serverChannel.register(selector, SelectionKey.OP_ACCEPT, "附件");

        while (true) {
            System.out.println("~~~~~~~~~~~~~~~~~~~");
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            System.out.println(keys.size());
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //iterator.remove();
                String s = (String) key.attachment();
                System.out.println(s);
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE,
                            "附件");
                    System.out.println("注册客户端连接到selector");
                    key.cancel();
                }
            }
        }
    }// NIO服务端

但是如果cancel()后再次使用这个key就会抛出java.nio.channels.CancelledKeyException以下情况就是取消key后,程序继续执行到可读,就报错了。

public static void main(String[] args) throws Exception {
        // 创建服务端
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置服务端为非阻塞
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(1));

        // 唯一一个获取Selector的方法
        Selector selector = Selector.open();
        // 向选择器增加通道,不是所有的通道都支持注册到Selector的,但是所有的网络通道都是可以的。
        // 使用 | 定义要注册到通道的操作类型
        serverChannel.register(selector, SelectionKey.OP_ACCEPT, "附件");

        while (true) {
            System.out.println("~~~~~~~~~~~~~~~~~~~");
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            System.out.println(keys.size());
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                String s = (String) key.attachment();
                System.out.println(s);
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE,
                            "附件");
                    System.out.println("注册客户端连接到selector");
                    key.cancel();
                }
                if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    client.write(ByteBuffer.wrap("你好".getBytes()));
                    client.shutdownOutput();
                }

            }
        }
    }// NIO服务端

完整的客户端与服务端

这里特别需要注意的是对于NIO的服务端如果同时有读和写的操作一定要在写之后key.cancel。对于NIO的客户端一定要在读之后key.cancel。因为客户端肯定是先写后读的,服务端是先读后写的。

package com.datang.bingxiang.demo;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringBufferInputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;

public class NioClient {
    public static void main(String[] args) throws Exception {
        // 使用通道连接
        SocketChannel channel = SocketChannel.open();
        // 设置非阻塞连接
        channel.configureBlocking(false);
        
        Selector selector = Selector.open();

        channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        
        channel.connect(new InetSocketAddress("127.0.0.1", 1));

         while (!channel.finishConnect()){

         }

        while (true) {
            System.out.println("!!!!!!!!!!!!!!");
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            System.out.println(keys.size());
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                
                if (key.isWritable()) {// 服务端可写
                    System.out.println("服务端可写");
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.wrap("你好".getBytes());
                    client.write(buffer);
                    client.shutdownOutput();
                }

                if (key.isReadable()) {// 服务端可读
                    System.out.println("服务端可读");
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(2);
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    while (client.read(buffer) != -1) {
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            bos.write(buffer.get());
                        }
                        buffer.clear();
                    }
                    System.out.println(bos.toString());
                    client.shutdownInput();
                    key.cancel();
                }
            }
        }

    }// NIO客户端
}
package com.datang.bingxiang.demo;

import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import org.apache.tomcat.util.buf.ByteChunk.ByteOutputChannel;

public class NioServer {
    public static void main(String[] args) throws Exception {
        // 创建服务端
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置服务端为非阻塞
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(1));

        // 唯一一个获取Selector的方法
        Selector selector = Selector.open();
        // 向选择器增加通道,不是所有的通道都支持注册到Selector的,但是所有的网络通道都是可以的。
        // 使用 | 定义要注册到通道的操作类型
        serverChannel.register(selector, SelectionKey.OP_ACCEPT, "附件");

        while (true) {
            System.out.println("~~~~~~~~~~~~~~~~~~~");
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            System.out.println(keys.size());
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                String s = (String) key.attachment();
                System.out.println(s);
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, "附件");
                    System.out.println("注册客户端连接到selector");
                }

                if (key.isReadable()) {// 客户端可读
                    System.out.println("客户端可读");
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(2);
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    while (client.read(buffer) != -1) {
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            bos.write(buffer.get());
                        }
                        buffer.clear();
                    }
                    client.shutdownInput();
                    System.out.println(bos.toString());
                }

                if (key.isWritable()) {// 客户端可写
                    System.out.println("客户端可写");
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.wrap("你好".getBytes());
                    client.write(buffer);
                    client.shutdownOutput();
                    System.out.println("写入完毕");
                    key.cancel();
                }

            }
        }
    }// NIO服务端
}

发表评论

您的电子邮箱地址不会被公开。