全网最完整的SocketChannel - demo 源码

GoogleVip8 1年前 ⋅ 725 阅读

全网最完整的SocketChannel源码

源码地址:码云/牧头/MT/SocketChannel

对象集合

  1. SocketChannelServer:socket服务端
  2. SocketChannelClient:socket客户端
  3. Handler:socket处理器
  4. Util:等待进度条打印
  5. Result:返回对象

SocketChannelServer

package com.demo.socketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

/**
 * @author MT
 * @date 2019-12-10 10:08
 */
public class SocketChannelServer {
    public static void main(String[] args) throws IOException {
        // 创建serversocketchannel 监听8081端口
        ServerSocketChannel socketServer = ServerSocketChannel.open();
        // 通过serverSocket绑定8081端口
        socketServer.socket().bind(new InetSocketAddress(8081));
        //设置为非阻塞
        socketServer.configureBlocking(false);
        Selector selector = Selector.open();
        //Channel注册到selector上,必需在非阻塞模式下
        socketServer.register(selector, SelectionKey.OP_ACCEPT);

        // 创建处理器
        Handler handler = new Handler(1024);

        while (true) {
            if (Util.select(selector)) {
                continue;
            }
            System.out.println("\n处理请求");

            // 网上用的迭代器,这里使用foreach跟迭代器是一样的
            for (SelectionKey key : selector.selectedKeys()) {
                // 如果key是有效的

                // 接收到链接请求超时
                if (key.isValid() && key.isAcceptable()) {
                    handler.handleAccept(key);
                }
                // 读数据
                if (key.isValid() && key.isReadable()) {
                    System.out.println("可读:isReadable");
                    handler.handleRead(key);
                }
                // 写数据
                if (key.isValid() && key.isWritable()) {
                    System.out.println("可写:isWritable");
                    handler.handleWrite(key);
                }
                // 关闭客户端
//                handler.handleClose(key);
                // 移除已读取的key
                selector.selectedKeys().remove(key);
            }
        }
    }
}

SocketChannelClient

package com.demo.socketchannel;

import com.alibaba.fastjson.JSON;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * @author MT
 * @date 2019-12-10 10:28
 */
public class SocketChannelClient {
    private Selector selector;
    private SocketChannel socketChannel;

    private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);

    public SocketChannelClient(String hostname, int port) {
        try {
            // 创建SocketChannel 监听8081端口
            socketChannel = SocketChannel.open();
            // 设置为非阻塞
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress(hostname, port));

            selector = Selector.open();
            //Channel注册到selector上,必需在非阻塞模式下
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 发送消息
    private void sendMsg(String msg) throws IOException {
        // 清空缓冲区
        writeBuffer.clear();
        // 将要发送的消息提交到缓冲区
        writeBuffer.put(msg.getBytes("UTF-8"));
        // 翻转缓冲区:该限制设置为当前位置,然后将该位置设置为零。 如果标记被定义,则它被丢弃。
        //在通道读取或放置操作的序列之后,调用此方法来准备一系列通道写入或相对获取操作
        writeBuffer.flip();
        // 如果还有剩余,一直写入
        while (writeBuffer.hasRemaining()) {
            socketChannel.write(writeBuffer);
        }
        // 清空缓冲区
        writeBuffer.clear();
        // 压缩此缓冲区,以防后续写入数据不完整,从缓冲区写入数据之后调用此方法
        writeBuffer.compact();
    }

    // 接收读取消息
    private void readMsg() throws IOException {
        if (socketChannel.read(readBuffer) == -1) {
            System.out.println("没有返回消息");
            return;
        }
        //改写入为读取
        readBuffer.flip();
        String receivedRequestData = Charset.forName("UTF-8").newDecoder().decode(readBuffer).toString();
        readBuffer.clear();
        Result returnResult = JSON.parseObject(receivedRequestData, Result.class);
        if (returnResult != null) {
            System.out.println(returnResult.toString());
        } else {
            System.out.println("没有返回消息");
        }
    }

    // 关闭socket
    private void close() throws IOException {
        socketChannel.close();
    }

    public static void main(String[] args) throws IOException {
        SocketChannelClient socketChannelClient = new SocketChannelClient("192.168.1.163", 8081);
        // 需要发送的数据
        Result result = Result.getInstance();
        result.setCode(1);
        result.setData("测试");
        result.setSuccess(true);
        result.setMsg("测试JSON连接");

        // 如果连接已完成
        if (socketChannelClient.socketChannel.finishConnect()) {
            int i = 0;
            while(true){

                result.setCode(i);
                // 将对象转为json字符串
                String message = JSON.toJSONString(result);

                socketChannelClient.sendMsg(message);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                socketChannelClient.readMsg();
                i++;
            }
        }
    }
}

Handler

package com.demo.socketchannel;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * @author MT
 * SocketChannel 处理器
 * @date 2019-12-10 11:18
 */
public class Handler {

    private int bufferSize = 1024;
    private String localCharset = "UTF-8";

    public Handler() {

    }

    public Handler(int bufferSize) {
        this(bufferSize, null);
    }

    public Handler(String localCharset) {
        this(-1, localCharset);
    }

    public Handler(int bufferSize, String localCharset) {
        if (bufferSize > 0) {
            this.bufferSize = bufferSize;
        }
        if (localCharset != null) {
            this.localCharset = localCharset;
        }
    }

    // 接收到链接请求超时
    public void handleAccept(SelectionKey key) throws IOException {
        // 通过选择器键获取服务器套接字通道,通过accept()方法获取套接字通道连接
        SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
        // 设置套接字通道为非阻塞模式
        socketChannel.configureBlocking(false);
        // 为套接字通道注册选择器,该选择器为服务器套接字通道的选择器,即选择到该SocketChannel的选择器
        // 设置选择器关心请求为读操作,设置数据读取的缓冲器容量为处理器初始化时候的缓冲器容量
        socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
    }


    public void handleRead(SelectionKey key) throws IOException {
        // 获取套接字通道
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 获取缓冲器并进行重置,selectionKey.attachment()为获取选择器键的附加对象
        ByteBuffer buf = (ByteBuffer) key.attachment();
        if (socketChannel.read(buf) == -1) {
            System.out.println("没有消息内容");
            socketChannel.close();
            return;
        }
        // 将缓冲器转换为读状态
        buf.flip();
        // 将缓冲器中接收到的值按localCharset格式编码保存
        String receivedRequestData = Charset.forName(localCharset).newDecoder().decode(buf).toString();
        Result result = JSON.parseObject(receivedRequestData, Result.class);
        System.out.println("接收到客户端的Resule:" + result.toString());
        buf.clear();
        //为下一次读取或写入做准备
        key.interestOps(SelectionKey.OP_WRITE);
    }

    public void handleWrite(SelectionKey key) throws IOException {
        // 获取套接字通道
        SocketChannel socketChannel = (SocketChannel) key.channel();

        Result result = Result.getInstance();
        result.setCode(1);
        result.setData("测试");
        result.setSuccess(true);
        result.setMsg("谢谢你的连接");
        // 将对象转为json字符串
        String message = JSON.toJSONString(result);

        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
        writeBuffer.clear();
        writeBuffer.put(message.getBytes(localCharset));
        writeBuffer.flip();
        while (writeBuffer.hasRemaining()) {
            socketChannel.write(writeBuffer);
        }
        writeBuffer.clear();
        // 压缩此缓冲区,以防后续写入数据不完整,从缓冲区写入数据之后调用此方法
        writeBuffer.compact();
        //为下一次读取或写入做准备
        key.interestOps(SelectionKey.OP_READ);
    }
}

Util

package com.demo.socketchannel;

import java.io.IOException;
import java.nio.channels.Selector;

/**
 * @author MT
 * @date 2019-12-10 11:25
 */
public class Util {

    // 进度条总长度
    private static final int barlen = 100;
    // 每1000毫秒打印一次
    private static final int timeout = 1000;
    // 计数器
    private static int i = 0;


    public static boolean select(Selector selector) {
        try {
            Util.printSchedule(i, barlen);
            if (selector.select(timeout) == 0) {
                setIIncreasing();
                return true;
            }
            resetI();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * i进行范围限定
     */
    private static void setIIncreasing() {
        i++;
        if (i > barlen) {
            for (int i = 0; i < barlen + 10; i++) {
                // 退格符,就是删除键
                // 删除原有进度
                System.out.print("\b");
            }
            i = 0;
        }
    }

    /**
     * 重置I为0
     */
    private static void resetI() {
        i = 0;
    }

    /**
     * 打印进度方法
     * 源码地址:http://googlevip8.com/post/112
     *
     * @param percent 当前进度
     * @param barLen  进度条总长度
     */
    private static void printSchedule(int percent, int barLen) {
        if (percent == 0) return;
        // 删除原有进度
        for (int i = 0; i < barLen + 10; i++) {
            // 退格符,就是删除键
            // 删除原有进度
            System.out.print("\b");
        }
        // 总进度/当前进度=进度条百分比
        int now = barLen * percent / 100;

        // 打印当前进度,░▒表示
        // 打印当前进度,>表示
        // 打印当前进度,.表示
        for (int i = 0; i < now; i++) {
            System.out.print(".");
        }
        // 打印剩余进度,空格表示
        for (int i = 0; i < barLen - now; i++) {
            System.out.print(" ");
        }
        // 打印当前进度百分比数字
        System.out.print("  " + percent + "秒");
    }
}

Result

package com.demo.socketchannel;

import lombok.Data;
import lombok.ToString;

/**
 * @author MT
 * @date 2019-12-10 11:12
 */
@Data
@ToString
public class Result {

        String msg;
        Object data;
        boolean success;
        int code;
        private static final Result INSTANCE = new Result();

        public static Result getInstance(){
            return INSTANCE;
        }
}

基于jdk8的NIO的socket教程,百度了两天没有找到好的教程,自己问遍了人写出这么个完整但是不完善的demo

如有高手知晓socket,请加qq群:831363708,一起进步

目前存在问题:

  1. 客户端如何等待服务端发送消息
  2. 服务端如何主动发送消息到客户端

此为JDK 原生网络应用程序 API,但是存在一系列问题,主要如下:

1)NIO 的类库和 API 繁杂,使用麻烦:你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。

2)需要具备其他的额外技能做铺垫:例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。

3)可靠性能力补齐,开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。

4)JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。

强烈推荐使用netty


全部评论: 0

    我有话说: