全网最完整的SocketChannel源码
对象集合
- SocketChannelServer:socket服务端
- SocketChannelClient:socket客户端
- Handler:socket处理器
- Util:等待进度条打印
- Result:返回对象
SocketChannelServer
package com.demo.socketchannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
/**
* @author MT
* @date 2019-12-10 10:08
*/
public class SocketChannelServer {
public static void main(String[] args) throws IOException {
// 创建serversocketchannel 监听8081端口
ServerSocketChannel socketServer = ServerSocketChannel.open();
// 通过serverSocket绑定8081端口
socketServer.socket().bind(new InetSocketAddress(8081));
//设置为非阻塞
socketServer.configureBlocking(false);
Selector selector = Selector.open();
//Channel注册到selector上,必需在非阻塞模式下
socketServer.register(selector, SelectionKey.OP_ACCEPT);
// 创建处理器
Handler handler = new Handler(1024);
while (true) {
if (Util.select(selector)) {
continue;
}
System.out.println("\n处理请求");
// 网上用的迭代器,这里使用foreach跟迭代器是一样的
for (SelectionKey key : selector.selectedKeys()) {
// 如果key是有效的
// 接收到链接请求超时
if (key.isValid() && key.isAcceptable()) {
handler.handleAccept(key);
}
// 读数据
if (key.isValid() && key.isReadable()) {
System.out.println("可读:isReadable");
handler.handleRead(key);
}
// 写数据
if (key.isValid() && key.isWritable()) {
System.out.println("可写:isWritable");
handler.handleWrite(key);
}
// 关闭客户端
// handler.handleClose(key);
// 移除已读取的key
selector.selectedKeys().remove(key);
}
}
}
}
SocketChannelClient
package com.demo.socketchannel;
import com.alibaba.fastjson.JSON;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
* @author MT
* @date 2019-12-10 10:28
*/
public class SocketChannelClient {
private Selector selector;
private SocketChannel socketChannel;
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
public SocketChannelClient(String hostname, int port) {
try {
// 创建SocketChannel 监听8081端口
socketChannel = SocketChannel.open();
// 设置为非阻塞
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(hostname, port));
selector = Selector.open();
//Channel注册到selector上,必需在非阻塞模式下
socketChannel.register(selector, SelectionKey.OP_CONNECT);
} catch (IOException e) {
e.printStackTrace();
}
}
// 发送消息
private void sendMsg(String msg) throws IOException {
// 清空缓冲区
writeBuffer.clear();
// 将要发送的消息提交到缓冲区
writeBuffer.put(msg.getBytes("UTF-8"));
// 翻转缓冲区:该限制设置为当前位置,然后将该位置设置为零。 如果标记被定义,则它被丢弃。
//在通道读取或放置操作的序列之后,调用此方法来准备一系列通道写入或相对获取操作
writeBuffer.flip();
// 如果还有剩余,一直写入
while (writeBuffer.hasRemaining()) {
socketChannel.write(writeBuffer);
}
// 清空缓冲区
writeBuffer.clear();
// 压缩此缓冲区,以防后续写入数据不完整,从缓冲区写入数据之后调用此方法
writeBuffer.compact();
}
// 接收读取消息
private void readMsg() throws IOException {
if (socketChannel.read(readBuffer) == -1) {
System.out.println("没有返回消息");
return;
}
//改写入为读取
readBuffer.flip();
String receivedRequestData = Charset.forName("UTF-8").newDecoder().decode(readBuffer).toString();
readBuffer.clear();
Result returnResult = JSON.parseObject(receivedRequestData, Result.class);
if (returnResult != null) {
System.out.println(returnResult.toString());
} else {
System.out.println("没有返回消息");
}
}
// 关闭socket
private void close() throws IOException {
socketChannel.close();
}
public static void main(String[] args) throws IOException {
SocketChannelClient socketChannelClient = new SocketChannelClient("192.168.1.163", 8081);
// 需要发送的数据
Result result = Result.getInstance();
result.setCode(1);
result.setData("测试");
result.setSuccess(true);
result.setMsg("测试JSON连接");
// 如果连接已完成
if (socketChannelClient.socketChannel.finishConnect()) {
int i = 0;
while(true){
result.setCode(i);
// 将对象转为json字符串
String message = JSON.toJSONString(result);
socketChannelClient.sendMsg(message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
socketChannelClient.readMsg();
i++;
}
}
}
}
Handler
package com.demo.socketchannel;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
* @author MT
* SocketChannel 处理器
* @date 2019-12-10 11:18
*/
public class Handler {
private int bufferSize = 1024;
private String localCharset = "UTF-8";
public Handler() {
}
public Handler(int bufferSize) {
this(bufferSize, null);
}
public Handler(String localCharset) {
this(-1, localCharset);
}
public Handler(int bufferSize, String localCharset) {
if (bufferSize > 0) {
this.bufferSize = bufferSize;
}
if (localCharset != null) {
this.localCharset = localCharset;
}
}
// 接收到链接请求超时
public void handleAccept(SelectionKey key) throws IOException {
// 通过选择器键获取服务器套接字通道,通过accept()方法获取套接字通道连接
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
// 设置套接字通道为非阻塞模式
socketChannel.configureBlocking(false);
// 为套接字通道注册选择器,该选择器为服务器套接字通道的选择器,即选择到该SocketChannel的选择器
// 设置选择器关心请求为读操作,设置数据读取的缓冲器容量为处理器初始化时候的缓冲器容量
socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
}
public void handleRead(SelectionKey key) throws IOException {
// 获取套接字通道
SocketChannel socketChannel = (SocketChannel) key.channel();
// 获取缓冲器并进行重置,selectionKey.attachment()为获取选择器键的附加对象
ByteBuffer buf = (ByteBuffer) key.attachment();
if (socketChannel.read(buf) == -1) {
System.out.println("没有消息内容");
socketChannel.close();
return;
}
// 将缓冲器转换为读状态
buf.flip();
// 将缓冲器中接收到的值按localCharset格式编码保存
String receivedRequestData = Charset.forName(localCharset).newDecoder().decode(buf).toString();
Result result = JSON.parseObject(receivedRequestData, Result.class);
System.out.println("接收到客户端的Resule:" + result.toString());
buf.clear();
//为下一次读取或写入做准备
key.interestOps(SelectionKey.OP_WRITE);
}
public void handleWrite(SelectionKey key) throws IOException {
// 获取套接字通道
SocketChannel socketChannel = (SocketChannel) key.channel();
Result result = Result.getInstance();
result.setCode(1);
result.setData("测试");
result.setSuccess(true);
result.setMsg("谢谢你的连接");
// 将对象转为json字符串
String message = JSON.toJSONString(result);
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.clear();
writeBuffer.put(message.getBytes(localCharset));
writeBuffer.flip();
while (writeBuffer.hasRemaining()) {
socketChannel.write(writeBuffer);
}
writeBuffer.clear();
// 压缩此缓冲区,以防后续写入数据不完整,从缓冲区写入数据之后调用此方法
writeBuffer.compact();
//为下一次读取或写入做准备
key.interestOps(SelectionKey.OP_READ);
}
}
Util
package com.demo.socketchannel;
import java.io.IOException;
import java.nio.channels.Selector;
/**
* @author MT
* @date 2019-12-10 11:25
*/
public class Util {
// 进度条总长度
private static final int barlen = 100;
// 每1000毫秒打印一次
private static final int timeout = 1000;
// 计数器
private static int i = 0;
public static boolean select(Selector selector) {
try {
Util.printSchedule(i, barlen);
if (selector.select(timeout) == 0) {
setIIncreasing();
return true;
}
resetI();
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* i进行范围限定
*/
private static void setIIncreasing() {
i++;
if (i > barlen) {
for (int i = 0; i < barlen + 10; i++) {
// 退格符,就是删除键
// 删除原有进度
System.out.print("\b");
}
i = 0;
}
}
/**
* 重置I为0
*/
private static void resetI() {
i = 0;
}
/**
* 打印进度方法
* 源码地址:http://googlevip8.com/post/112
*
* @param percent 当前进度
* @param barLen 进度条总长度
*/
private static void printSchedule(int percent, int barLen) {
if (percent == 0) return;
// 删除原有进度
for (int i = 0; i < barLen + 10; i++) {
// 退格符,就是删除键
// 删除原有进度
System.out.print("\b");
}
// 总进度/当前进度=进度条百分比
int now = barLen * percent / 100;
// 打印当前进度,░▒表示
// 打印当前进度,>表示
// 打印当前进度,.表示
for (int i = 0; i < now; i++) {
System.out.print(".");
}
// 打印剩余进度,空格表示
for (int i = 0; i < barLen - now; i++) {
System.out.print(" ");
}
// 打印当前进度百分比数字
System.out.print(" " + percent + "秒");
}
}
Result
package com.demo.socketchannel;
import lombok.Data;
import lombok.ToString;
/**
* @author MT
* @date 2019-12-10 11:12
*/
@Data
@ToString
public class Result {
String msg;
Object data;
boolean success;
int code;
private static final Result INSTANCE = new Result();
public static Result getInstance(){
return INSTANCE;
}
}
基于jdk8的NIO的socket教程,百度了两天没有找到好的教程,自己问遍了人写出这么个完整但是不完善的demo
如有高手知晓socket,请加qq群:831363708,一起进步
目前存在问题:
- 客户端如何等待服务端发送消息
- 服务端如何主动发送消息到客户端
此为JDK 原生网络应用程序 API,但是存在一系列问题,主要如下:
1)NIO 的类库和 API 繁杂,使用麻烦:你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
2)需要具备其他的额外技能做铺垫:例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。
3)可靠性能力补齐,开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。
4)JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。
强烈推荐使用netty
注意:本文归作者所有,未经作者允许,不得转载