Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)

news/2024/9/18 2:52:33

单线程Reactor

package org.example.utils.echo.single;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class EchoServerReactor implements Runnable{
    Selector selector;
    ServerSocketChannel serverSocketChannel;
    EchoServerReactor() throws IOException {
        //Reactor初始化
        selector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open();

        InetSocketAddress address =
                new InetSocketAddress("localhost",
                        8848);

        //非阻塞
        serverSocketChannel.configureBlocking(false);


        //分步处理,第一步,接收accept事件
        SelectionKey sk =
                serverSocketChannel.register(selector,0,new AcceptorHandler());

        // SelectionKey.OP_ACCEPT
        serverSocketChannel.socket().bind(address);
        System.out.println("服务端已经开始监听:"+address);


        sk.interestOps(SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
       try {
           while (!Thread.interrupted()){
               selector.select();
               Set<SelectionKey>  selected=selector.selectedKeys();
               Iterator<SelectionKey> it=selected.iterator();
               while (it.hasNext()){
                   SelectionKey sk=it.next();
                   dispatch(sk);
               }
               selected.clear();
            }
       } catch (IOException e) {
           throw new RuntimeException(e);
       }
    }

    private void dispatch(SelectionKey sk) {
        Runnable handler=(Runnable) sk.attachment();
        if (handler!=null){
            handler.run();
        }
    }

    class AcceptorHandler implements Runnable{


        @Override
        public void run() {
            try {
                SocketChannel channel=serverSocketChannel.accept();
                if (channel!=null)
                    new EchoHandler(selector,channel);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

    }

    public static void main(String[] args) throws IOException {
        new Thread(new EchoServerReactor()).start();
    }
}
package org.example.utils.echo.single;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class EchoHandler implements Runnable{
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
    static final int RECIEVING=0,SENDING=1;
    int state=RECIEVING;

    EchoHandler(Selector selector,SocketChannel c) throws IOException {
        channel=c;
        c.configureBlocking(false);
        sk=channel.register(selector,0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
    @Override
    public void run() {
        try {
            if (state==SENDING){
                channel.write(byteBuffer);
                byteBuffer.clear();
                sk.interestOps(SelectionKey.OP_READ);
                state=RECIEVING;
            }else if (state==RECIEVING){
                int length=0;
                while ((length=channel.read(byteBuffer))>0)
                {
                    System.out.println(new String(byteBuffer.array(),0,length));
                }
                byteBuffer.flip();
                sk.interestOps(SelectionKey.OP_WRITE);
                state=SENDING;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

结果:

原理无非就是:

多线程,无非就是搞多个Reactor   ,   一个专门接受accept  ,  一个专门dispatch ,  再搞一个多线程池处理handle

这里面最主要的就是

handle类,sk.attach(this);把对象传回reactor

参考文献:

java高并发核心编程. 卷1,NIO、Netty、Redis、ZooKeeper  (尼恩)


http://lihuaxi.xjx100.cn/news/1863219.html

相关文章

数据库系统原理与实践 笔记 #11

文章目录 数据库系统原理与实践 笔记 #11事务管理和并发控制与恢复事务概念转账的例子 事务ACID特性ACID特性事务原子性和持久性事务隔离性调度SQL中的事务定义 可串行化事务的简化视图冲突的指令冲突可串行化 可恢复性级联回滚无级联调度 数据库系统原理与实践 笔记 #11 事务…

Docker容器(一)概述

一、虚拟化概述 1.1引⼊虚拟化技术的必要性 服务器只有5%的时间是在⼯作的&#xff1b;在其它时间服务器都处于“休眠”状态. 虚拟化前 每台主机⼀个操作系统; 软硬件紧密结合; 在同⼀个主机上运⾏多个应⽤程序通常会遭遇冲突; 系统的资源利⽤率低; 硬件成本⾼昂⽽且不够灵活…

医院信息化专业人员必备医院业务运作及管理流程知识(详细)

业务流程是一家医院运作的基础,医院所有业务都需要流程加以驱动。熟知医院各项业务,了解医院管理流程,有利于医院工作人员更好地投入自身岗位,提高工作效率。本文整理了常见医院业务运作及管理流程,仅供参考! 【门诊业务】 一、门诊业务的特点: 1.接诊病人多,就诊时…

C++:智能指针[重点!]

目录 一、关于智能指针 1、引入智能指针 2、RAII 二、详述智能指针 auto_ptr unique_ptr shared_tr 循环引用 weak_ptr 定制删除器 三、关于内存泄漏 一、关于智能指针 1、引入智能指针 首先引入一个例子&#xff1a; 在Test函数中&#xff0c;new了两个对象p1p2&a…

正点原子linux应用编程——提高篇4

MP157开发板支持音频&#xff0c;板上搭载了音频编解码芯片CS42L51&#xff0c;支持播放以及录音功能。 ALSA概述 ALSA是Advanced Linux Sound Architecture(高级的Linux声音体系)的缩写&#xff0c;目前已经成为了 linux 下的主流音频体系架构&#xff0c;提供了音频和MIDI的…

智能变压器监控系统

智能变压器监控系统是一种先进的物联网技术和智能设备&#xff0c;能够实现对变压器的实时监测和管理&#xff0c;提高变压器的运行效率和可靠性&#xff0c;为用户提供及时、准确的变压器运行状态信息和故障预警。 力安科技A30变压器云控终端是一款集变压器温控仪、变压器运行…

分享80个菜单导航JS特效,总有一款适合您

分享80个菜单导航JS特效&#xff0c;总有一款适合您 80个菜单导航JS特效下载链接&#xff1a;https://pan.baidu.com/s/1NgNc759Kg1of_8vR7kaj6A?pwd6666 提取码&#xff1a;6666 Python采集代码下载链接&#xff1a;采集代码.zip - 蓝奏云 学习知识费力气&#xff0c;…

Go to do list

go 语言中怎么实现分布式系统&#xff1f; 在Go语言中实现分布式系统需要考虑以下几个方面&#xff1a; 通信协议&#xff1a;在分布式系统中&#xff0c;各个节点需要通过网络进行通信。Go语言提供了丰富的网络编程库&#xff0c;如net/http、net/rpc等&#xff0c;可以方便…