Yuchuan Weng.Miko Tech-Blog.

Java并发编程笔记(九)NIO与AIO2

2017/12/14 Share

读完了再通知我 AIO

AIO即为Asynchronized。异步IO, NIO只是newIO不是异步的,只是同步可以设置为非阻塞模式 ,真正异步的是AIO。AIO将Future模式运用的十分广泛,它不是等待IO准备好后再通知线程。而是等待IO操作完成后再通知线程,实现异步IO。

AIO来实现服务器

异步IO需要使用异步通道AsynchronousServerSocketChannel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package thread.test.concurrent.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* 异步非阻塞IO
* AIO服务器
* @author 圈圈
* @version 1.0
*/
public class AIOServer {
public static final int port = 8888;
public static final String addr="127.0.0.1";
//构建AIO核心对象
private AsynchronousServerSocketChannel aioChannel;
//构造方法
public AIOServer (){
try {
//初始化AIO对象 绑定IP和端口
aioChannel=AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(addr, port));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

//下面代码为启动服务器的一些执行措施
public void start() throws Exception{
System.out.println("Server start on "+addr+":"+port);
//注册时间完成和完成后的事件处理器
/**
* 该方法的签名为
* public abstract <A> void accept(A attachment,
CompletionHandler<AsynchronousSocketChannel,? super A> handler);
会立即返回
第一个参数为任意类型的附件,作用是让当前线程和后续的回调方法可以共享信息
第二个参数是 CompltionHandler 接口
这个接口有2个方法
void completed(V result,Aattachment); 异步操作成功时回调
void failed(Throwable exc,A attachment); 异步操作失败时回调
accept方法实际上做了2件事:
1、告诉系统可以开始监听端口
2、注册CompleHandler实例,告诉系统一旦有客户端来连接 返回对应的成功或者失败方法。
所以accept方法不会阻塞
*/
aioChannel.accept(null,new CompletionHandler<AsynchronousSocketChannel, Object>() {
//构造用于存放数据的ByteBuffer
final ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println(Thread.currentThread().getName());
Future<Integer> writeResult = null;
try {
buffer.clear();
//read方法读取客户的数据 这里的read也是异步的不会阻塞 所以它不会等待读完了再返回,
//我们这里设置时间强行让它等待100秒
result.read(buffer).get(100, TimeUnit.SECONDS);
//读取客户端发来的信息
System.out.println("Client :"+new String(buffer.array()).trim());
//读模式变成写模式
buffer.flip();
//写数据给客户端 这里也是异步的立即返回
writeResult = result.write(ByteBuffer.wrap(new String("hello Client").getBytes()));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
try {
//接受下一个连接请求
aioChannel.accept(null, this);
//利用future模式的get()方式进行等待 完全写完了再关闭
writeResult.get();
//关闭当前链接
result.close();
} catch (Exception e) {
System.out.println(e.toString());
}
}

}

@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("Faild :" +exc);
}
});
}

//主线程方法
public static void main(String[] args) throws Exception {
new AIOServer().start();
//由于上述方法为异步方法 如果主线程退出了上述方法也就退出了 所以需要让主线程驻守执行
while(true){
Thread.sleep(100);
}
}
}

AIO Echo客户端实现

在服务端的实现中。我们使用Future.get()方法将异步调用转为了一个同步等待。在客户端的实现里,我们将全部使用异步回调的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package thread.test.concurrent.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
* AIO客户端
* @author 圈圈
* @version 1.0
*/
public class AIOClient {

public static void main(String[] args) throws Exception {
try {
//构建客户端socketChannel对象
final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
client.connect(new InetSocketAddress("127.0.0.1", 8888), null, new CompletionHandler<Void, Object>() {

//同理返回 连接成功和失败回调函数 所有操作均是完全异步的执行很快不会等待
@Override
public void completed(Void result, Object attachment) {
// TODO Auto-generated method stub
//往服务端写数据也会有回调方法
client.write(ByteBuffer.wrap(new String("Hello Server !").getBytes()), null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
//构建用于读取服务端的bytebuff
ByteBuffer buff = ByteBuffer.allocate(1024);
//从服务端读取写回的数据 arg2位置不要为null
client.read(buff, buff, new CompletionHandler<Integer, ByteBuffer>() {

@Override
public void completed(Integer result, ByteBuffer buff) {
buff.flip();
try {
//读取客户端发来的信息
System.out.println("Server :"+new String(buff.array()).trim());
//关闭客户端 异步操作很快
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
// TODO Auto-generated method stub

}
});
}

@Override
public void failed(Throwable exc, Object attachment) {
}
});
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

//由于主线程马上休息 这里等待上述处理全部完成即可
Thread.sleep(5000);

}
}

CATALOG
  1. 1. 读完了再通知我 AIO
  2. 2. AIO来实现服务器
  3. 3. AIO Echo客户端实现