当前位置:网站首页>NIO简易示例
NIO简易示例
2022-07-26 01:07:00 【局外人Inside】
一、BIO和NIO
Java中常见的IO集合有BIO和NIO两种。
BIO(同步阻塞IO):传统的Java IO编程,其基本的类和接口在java.io包中,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个独立的 Acceptor 线程负责监听客户端的连接,并在while(true)循环中监听客户端发起的请求,如果这个连接不做任何事情会造成不必要的线程开销,适用于连接数目比较小且固定的架构,这种服务方式对服务器资源要求比价高,并且局限于应用中,是JDK1.4以前的唯一选择,程序简单易理解。其原理图如下:
NIO(同步非阻塞IO): 相关类都被放在java.nio包及其子包下,其核心组件是:Selector(选择器)、Channel(通道)、Buffer(缓冲区)。相比BIO面向stream流编程,NIO是面向Buffer缓冲区编程的,更加底层,通过Stream可以获取响应的Channel,channel与buffer结合操作,当buffer未满时线程不用阻塞在此,可以去做其它事情,等待buffer满了再去读取或写入数据,所以NIO是非阻塞的;且相比Stream而言,由于channel是双向的,所以结合buffer可以进行双向操作,而Stream只能单向操作;Selector用于使用单个线程处理多个通道,它循环监听多个客户端连接通道,如果通道中没有数据即客户端没有请求时它可以去处理别的通道或者做其他的事情,如果通道中有数据他就会选择这个通道然后进行处理,这就做到了一个线程处理多个连接。其原理图如下:

二、NIO代码示例
参考netty中的NIO实现,网络编程中,一般存在读、写、连接事件,其中读写事件频率较高,连接事件只有当客户端发起连接时发生。参考这种特性,netty将连接事件和读写事件使用不同的Selector来监听,使服务器的处理效率更高,以下是粗略版的实现:
BossThread:用于监听客户端连接事件
@Slf4j
public class BossThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("bossGroup");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.register(boss, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8002));
//创建固定数量的worker去处理读写事件,并进行worker的初始化
//一版建议设置为服务器的CPU核心数
//bug:Runtime.getRuntime().availableProcessors()如果是部署在docker容器下,因为容器不是物理隔离的,所以该方法会拿到物理机的CPU个数,而不是容器申请的CPU个数
//该问题在jdk10才修复,使用jvm参数UseContainerSupport配置,默认开启
WorkerV2[] workers = new WorkerV2[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new WorkerV2("worker-" + i);
}
AtomicInteger sum = new AtomicInteger();
while (true){
boss.select();
Iterator<SelectionKey> keyIterator = boss.selectedKeys().iterator();
while (keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()){
SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
log.info("connected...{}", socketChannel.getRemoteAddress());
//关联worker的selector
log.info("before register...{}", socketChannel.getRemoteAddress());
//初始化worker并添加注册事件
workers[sum.incrementAndGet()%workers.length].register(socketChannel);
log.info("after register...{}", socketChannel.getRemoteAddress());
}
}
}
}
}Worker:处理读写事件
/**
* 1.0版本使用selector.select(long times)来解决boss线程中socketChannel注册和worker启动中selector阻塞的先后问题,但是会造成资源消耗
* 于是2.0版本将socketChannel注册放在worker初始化中,并使用消息同步队列来实现,当初始化worker时,向队列中提交一个注册sc的任务,在worker线程的
* run方法中监听队列,来执行注册sc的任务
* @create 2022/7/14 6:03 PM
*/
@Slf4j
public class WorkerV2 implements Runnable{
private Thread thread;
public Selector selector;
private String name;
//是否处于启动中
private volatile boolean start = false;
//同步消息队列
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public WorkerV2(String name){
this.name = name;
}
/**
* 初始化线程和selector
*/
public void register(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this, name);
thread.start();
selector = Selector.open();
start = true;
}
//向队列中添加一个任务,但这个任务并没有立即执行
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
//wakeup和select类似于park和unpark,通过信号量实现,不在乎先后顺序
selector.wakeup();
}
/**
* worker专门监测读写事件
*/
@Override
public void run() {
while (true){
try {
selector.select();
//执行
Runnable task = queue.poll();
if (task != null){task.run();}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
log.info("read data...{}", channel.getRemoteAddress());
buffer.flip();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
TestClient:测试客户端
/**
* @create 2022/7/19 2:15 PM
*/
public class TestClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8002));
socketChannel.write(Charset.defaultCharset().encode("hello world"));
System.in.read();
}
}
边栏推荐
- [RTOS training camp] about classes and Q & A
- How jupyter changes the default browser
- Gcdqueue encapsulation
- [secsha concept] original reverse supplement
- Force buckle 25. Turn over the linked list in a group of K
- 【软件开发规范二】《禁止项开发规范》
- REST-assured接口测试框架详解
- LVGL官方+100ASK合力打造的中文输入(拼音输入法)组件,让LVGL支持中文输入!
- 【RTOS训练营】任务调度(续)、任务礼让、调度总结、队列和晚课提问
- 【RTOS训练营】上节回顾、空闲任务、定时器任务、执行顺序、调度策略和晚课提问
猜你喜欢

Nanjie's embarrassment

【RTOS训练营】作业讲解、队列和环形缓冲区、队列——传输数据、队列——同步任务和晚课提问

RHCE之at和crontab命令详解及chrony部署

985高校副教授晒年薪,公积金顶普通人月薪,网友:不愧是在上海

游戏思考17:寻路引擎recast和detour学习二:recast导航网格生成流程及局限性

Ssd7 | embedded friendly target detection network, product landing

【RTOS训练营】上节回顾、空闲任务、定时器任务、执行顺序、调度策略和晚课提问

Microwave oven rectifier diode cl01-12

How does the proxy IP server ensure its information security in the network

Jupyter changes the main interface and imports the dataset
随机推荐
“元气可乐”不是终点,“中国可乐”才是
数据库系统原理与应用教程(053)—— MySQL 查询(十五):字符型函数的用法
Download exclusively | Alibaba cloud maxcompute questions and answers to unlock SaaS mode cloud data warehouse in this electronic manual!
LVGL官方+100ASK合力打造的中文输入(拼音输入法)组件,让LVGL支持中文输入!
编程学习过程中有哪些快速提高编程技巧的方法?
200 yuan a hair dryer, only a week, to achieve 2million?
985高校副教授晒年薪,公积金顶普通人月薪,网友:不愧是在上海
IP地址能精确到哪步?动态IP及静态IP是什么?切换IP最常用的方法
[Jizhong] July 16, 2022 1432. Oil pipeline
更换IP地址常见的4种简单有效的方法
[laser principle and application-4]: internal structure and working principle of laser
【秒杀概念】大小端
[RTOS training camp] review of the previous section, idle tasks, timer tasks, execution sequence, scheduling strategy and evening class questions
【RTOS训练营】晚课学员问题
NLP introduction + practice: Chapter 4: using pytorch to manually realize linear regression
Paged query design of scenarios
Implementation process of adding loading effect to easycvr page
Matlab bitwise and or not
加载dll失败
How jupyter changes the default browser