springboot整合socket源码
- ChatClient:模拟客户端
- ClientSocket:客户端心跳连接
- SocketHandler:socket连接管理
- SocketPool:保存客户端key
- SocketServer:socket主服务
- SocketApplication:启动类
源码地址:码云/牧头/socket
socket.properties
port=8081
keepAlive=true
chatClient测试用的客户端
package com.demo.socket;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.net.Socket;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author MT
* 模拟客户端
* @date 2019-12-09 13:26
*/
@Slf4j
public class ChatClient {
public static void main(String[] args) throws IOException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 5; i++) {
// TimeUnit.SECONDS.sleep(5);
executorService.submit(() -> {
// 要连接的服务端IP地址和端口
String host = "192.168.1.163";
int port = 8081;
// 与服务端建立连接
Socket socket = null;
try {
socket = new Socket(host, port);
// 建立连接后获得输出流
OutputStream outputStream = socket.getOutputStream();
String message = UUID.randomUUID().toString();
byte[] messageBytes = message.getBytes("UTF-8");
socket.setSendBufferSize(messageBytes.length);
socket.getOutputStream().write(messageBytes);
// 必需关闭输出流
socket.shutdownOutput();
// 获取输出流
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024];
int len;
StringBuilder sb = new StringBuilder();
while ((len = inputStream.read(bytes)) != -1) {
//注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
sb.append(new String(bytes, 0, len, "UTF-8"));
}
System.out.println("get message from server: " + sb);
inputStream.close();
outputStream.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
clientSocket
package com.demo.socket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
/**
* @author MT
* 客户端心跳连接
* @date 2019-12-09 13:18
*/
@Slf4j
@Data
public class ClientSocket implements Runnable {
private Socket socket;
private DataInputStream inputStream;
private DataOutputStream outputStream;
private String key;
private String message;
private boolean isCloseStream = false;
public void shutdownStream(){
try {
socket.shutdownOutput();
socket.shutdownInput();
isCloseStream = true;
} catch (IOException e) {
e.printStackTrace();
isCloseStream = false;
}
}
@Override
public void run() {
//每5秒进行一次客户端连接,判断是否需要释放资源
while (true){
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (SocketHandler.isSocketClosed(this)){
log.info("客户端已关闭,其Key值为:{}", this.getKey());
//关闭对应的服务端资源
SocketHandler.close(this);
break;
}
}
}
}
SocketHandler
package com.demo.socket;
import lombok.extern.slf4j.Slf4j;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.management.ThreadInfo;
import java.net.Socket;
/**
* @author MT
* @date 2019-12-09 13:17
*/
@Slf4j
public class SocketHandler {
/**
* 将连接的Socket注册到Socket池中
*
* @param socket
* @return
*/
public static ClientSocket register(Socket socket) {
ClientSocket clientSocket = new ClientSocket();
clientSocket.setSocket(socket);
try {
clientSocket.setInputStream(new DataInputStream(socket.getInputStream()));
clientSocket.setOutputStream(new DataOutputStream(socket.getOutputStream()));
byte[] bytes = new byte[socket.getSendBufferSize()];
int len = clientSocket.getInputStream().read(bytes);
clientSocket.setKey(new String(bytes, 0, len, "UTF-8"));
// int len = 0;
// byte[] bytes = new byte[1024];
// StringBuffer sb = new StringBuffer();
// while ((len = clientSocket.getInputStream().read(bytes)) != -1) {
// sb.append(new String(bytes, 0, len, "UTF-8"));
// }
// clientSocket.setKey(sb.toString());
SocketPool.add(clientSocket);
return clientSocket;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 向指定客户端发送信息
*
* @param clientSocket
* @param message
*/
public static void sendMessage(ClientSocket clientSocket, String message) {
try {
clientSocket.getOutputStream().write(message.getBytes("utf-8"));
//clientSocket.getOutputStream().writeUTF(message);
} catch (IOException e) {
log.error("发送信息异常:{}", e);
SocketHandler.close(clientSocket);
}
}
/**
* 获取指定客户端的上传信息
*
* @param clientSocket
* @return
*/
public static String onMessage(ClientSocket clientSocket) {
byte[] bytes = new byte[1024];
try {
clientSocket.getInputStream().read(bytes);
String msg = new String(bytes, "utf-8");
return msg;
} catch (IOException e) {
e.printStackTrace();
SocketHandler.close(clientSocket);
}
return null;
}
/**
* 指定Socket资源回收
*
* @param clientSocket
*/
public static void close(ClientSocket clientSocket) {
log.info("进行资源回收");
if (clientSocket != null) {
log.info("开始回收socket相关资源,其Key为{}", clientSocket.getKey());
SocketPool.remove(clientSocket.getKey());
Socket socket = clientSocket.getSocket();
try {
if (!clientSocket.isCloseStream()){
socket.shutdownInput();
socket.shutdownOutput();
}
} catch (IOException e) {
log.error("关闭输入输出流异常,{}", e);
} finally {
try {
socket.close();
} catch (IOException e) {
log.error("关闭socket异常{}", e);
}
}
}
}
/**
* 发送数据包,判断数据连接状态
*
* @param clientSocket
* @return
*/
public static boolean isSocketClosed(ClientSocket clientSocket) {
try {
clientSocket.getSocket().sendUrgentData(1);
return false;
} catch (IOException e) {
return true;
}
}
}
SocketPool
package com.demo.socket;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author MT
* @date 2019-12-09 13:20
*/
public class SocketPool {
private static final ConcurrentHashMap<String, ClientSocket> ONLINE_SOCKET_MAP = new ConcurrentHashMap<>();
public static void add(ClientSocket clientSocket){
if (clientSocket != null && !clientSocket.getKey().isEmpty())
ONLINE_SOCKET_MAP.put(clientSocket.getKey(), clientSocket);
}
public static void remove(String key){
if (!key.isEmpty())
ONLINE_SOCKET_MAP.remove(key);
}
}
SocketServer
package com.demo.socket;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author MT
* @date 2019-12-09 13:21
*/
@Slf4j
@Component
@Data
@NoArgsConstructor
@PropertySource("classpath:socket.properties")
public class SocketServer {
@Value("${port}")
private Integer port;
@Value("${keepAlive}")
private boolean keepAlive;
private boolean started;
private ServerSocket serverSocket;
private ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* 测试服务端启动
*
* @param args
*/
public static void main(String[] args) {
new SocketServer().start(8081);
}
public void start() {
start(null);
}
public void start(Integer port) {
log.info("port: 配置端口{}, 主动指定启动端口{}", this.port, port);
try {
serverSocket = new ServerSocket(port == null ? this.port : port);
started = true;
log.info("Socket服务已启动,占用端口: {}", serverSocket.getLocalPort());
} catch (IOException e) {
log.error("端口冲突,异常信息:{}", e);
System.exit(0);
}
try {
while (true) {
Socket socket = serverSocket.accept();
socket.setKeepAlive(keepAlive);
socket.setSoTimeout(5000);
executorService.submit(() -> {
ClientSocket register = SocketHandler.register(socket);
log.info("客户端已连接,其Key值为:{}", register.getKey());
SocketHandler.sendMessage(register, JSON.toJSONString("返回值"));
if (register != null) {
register.shutdownStream();
executorService.submit(register);
}
});
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
SocketApplication:启动类
package com.demo.socket;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class SocketApplication {
public static void main(String[] args) {
ApplicationContext applicationContext = SpringApplication.run(SocketApplication.class, args);
applicationContext.getBean(SocketServer.class).start();
}
}
练习地址
注意:本文归作者所有,未经作者允许,不得转载