springboot整合socket源码分享

GoogleVip8 1年前 ⋅ 697 阅读

springboot整合socket源码

  1. ChatClient:模拟客户端
  2. ClientSocket:客户端心跳连接
  3. SocketHandler:socket连接管理
  4. SocketPool:保存客户端key
  5. SocketServer:socket主服务
  6. SocketApplication:启动类

源码地址:码云/牧头/socket

socket.properties

port=8081
keepAlive=true

ChatClient:测试用的客户端

package com.demo.socket;

import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.net.Socket;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Test client that opens several concurrent connections to the socket server.
 *
 * <p>Each task connects, sends a random UUID (used by the server as the client key),
 * half-closes the connection and prints whatever the server sends back.
 *
 * @author MT
 * @date 2019-12-09 13:26
 */
@Slf4j
public class ChatClient {
    /**
     * Submits five client tasks to a fixed pool and then lets the pool drain.
     *
     * @param args unused
     * @throws IOException declared for compatibility; per-connection failures are handled inside each task
     */
    public static void main(String[] args) throws IOException {

        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {

                // IP address and port of the server to connect to
                String host = "192.168.1.163";
                int port = 8081;
                // try-with-resources closes the socket (and with it both streams)
                // even when connecting, writing or reading fails
                try (Socket socket = new Socket(host, port)) {
                    OutputStream outputStream = socket.getOutputStream();
                    String message = UUID.randomUUID().toString();
                    byte[] messageBytes = message.getBytes("UTF-8");
                    socket.setSendBufferSize(messageBytes.length);
                    outputStream.write(messageBytes);
                    // Half-close so the server sees end-of-stream for the request
                    socket.shutdownOutput();

                    // Read the reply until the server closes its side
                    InputStream inputStream = socket.getInputStream();
                    ByteArrayOutputStream reply = new ByteArrayOutputStream();
                    byte[] bytes = new byte[1024];
                    int len;
                    while ((len = inputStream.read(bytes)) != -1) {
                        reply.write(bytes, 0, len);
                    }
                    // Decode once over the complete payload: decoding chunk by chunk can split a
                    // multi-byte UTF-8 character. Sender and receiver must agree on the charset.
                    System.out.println("get message from server: " + reply.toString("UTF-8"));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

ClientSocket

package com.demo.socket;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.TimeUnit;

/**
 * @author MT
 * 客户端心跳连接
 * @date 2019-12-09 13:18
 */
@Slf4j
@Data
public class ClientSocket implements Runnable {
    private Socket socket;
    private DataInputStream inputStream;
    private DataOutputStream outputStream;
    private String key;
    private String message;
    private boolean isCloseStream = false;

    public void shutdownStream(){
        try {
            socket.shutdownOutput();
            socket.shutdownInput();
            isCloseStream = true;
        } catch (IOException e) {
            e.printStackTrace();
            isCloseStream = false;
        }
    }

    @Override
    public void run() {
        //每5秒进行一次客户端连接,判断是否需要释放资源
        while (true){
            try {
                TimeUnit.SECONDS.sleep(5);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (SocketHandler.isSocketClosed(this)){
                log.info("客户端已关闭,其Key值为:{}", this.getKey());
                //关闭对应的服务端资源
                SocketHandler.close(this);
                break;
            }
        }
    }
}

SocketHandler

package com.demo.socket;

import lombok.extern.slf4j.Slf4j;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.management.ThreadInfo;
import java.net.Socket;


/**
 * @author MT
 * @date 2019-12-09 13:17
 */
@Slf4j
public class SocketHandler {
    /**
     * 将连接的Socket注册到Socket池中
     *
     * @param socket
     * @return
     */
    public static ClientSocket register(Socket socket) {
        ClientSocket clientSocket = new ClientSocket();
        clientSocket.setSocket(socket);
        try {
            clientSocket.setInputStream(new DataInputStream(socket.getInputStream()));
            clientSocket.setOutputStream(new DataOutputStream(socket.getOutputStream()));

            byte[] bytes = new byte[socket.getSendBufferSize()];
            int len = clientSocket.getInputStream().read(bytes);
            clientSocket.setKey(new String(bytes, 0, len, "UTF-8"));
//            int len = 0;
//            byte[] bytes = new byte[1024];
//            StringBuffer sb = new StringBuffer();
//            while ((len = clientSocket.getInputStream().read(bytes)) != -1) {
//                sb.append(new String(bytes, 0, len, "UTF-8"));
//            }
//            clientSocket.setKey(sb.toString());

            SocketPool.add(clientSocket);
            return clientSocket;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 向指定客户端发送信息
     *
     * @param clientSocket
     * @param message
     */
    public static void sendMessage(ClientSocket clientSocket, String message) {
        try {
            clientSocket.getOutputStream().write(message.getBytes("utf-8"));
            //clientSocket.getOutputStream().writeUTF(message);
        } catch (IOException e) {
            log.error("发送信息异常:{}", e);
            SocketHandler.close(clientSocket);
        }
    }

    /**
     * 获取指定客户端的上传信息
     *
     * @param clientSocket
     * @return
     */
    public static String onMessage(ClientSocket clientSocket) {
        byte[] bytes = new byte[1024];
        try {
            clientSocket.getInputStream().read(bytes);
            String msg = new String(bytes, "utf-8");
            return msg;
        } catch (IOException e) {
            e.printStackTrace();
            SocketHandler.close(clientSocket);
        }
        return null;
    }

    /**
     * 指定Socket资源回收
     *
     * @param clientSocket
     */
    public static void close(ClientSocket clientSocket) {
        log.info("进行资源回收");
        if (clientSocket != null) {
            log.info("开始回收socket相关资源,其Key为{}", clientSocket.getKey());
            SocketPool.remove(clientSocket.getKey());
            Socket socket = clientSocket.getSocket();
            try {
                if (!clientSocket.isCloseStream()){
                    socket.shutdownInput();
                    socket.shutdownOutput();
                }
            } catch (IOException e) {
                log.error("关闭输入输出流异常,{}", e);
            } finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    log.error("关闭socket异常{}", e);
                }
            }
        }
    }


    /**
     * 发送数据包,判断数据连接状态
     *
     * @param clientSocket
     * @return
     */
    public static boolean isSocketClosed(ClientSocket clientSocket) {
        try {
            clientSocket.getSocket().sendUrgentData(1);
            return false;
        } catch (IOException e) {
            return true;
        }
    }
}

SocketPool

package com.demo.socket;

import java.util.concurrent.ConcurrentHashMap;

/**
 * Registry of the currently connected clients, indexed by client key.
 *
 * @author MT
 * @date 2019-12-09 13:20
 */
public class SocketPool {
    private static final ConcurrentHashMap<String, ClientSocket> ONLINE_SOCKET_MAP = new ConcurrentHashMap<>();


    /**
     * Stores the client under its key, replacing any previous entry for that key.
     *
     * @param clientSocket the client to store; ignored when {@code null} or when its
     *                     key is {@code null} or empty
     */
    public static void add(ClientSocket clientSocket) {
        if (clientSocket == null) {
            return;
        }
        String key = clientSocket.getKey();
        // A client whose registration never set a key has a null key.
        if (key != null && !key.isEmpty()) {
            ONLINE_SOCKET_MAP.put(key, clientSocket);
        }
    }

    /**
     * Removes the client stored under the given key, if any.
     *
     * @param key the client key; {@code null} and empty keys are ignored
     */
    public static void remove(String key) {
        if (key != null && !key.isEmpty()) {
            ONLINE_SOCKET_MAP.remove(key);
        }
    }
}

SocketServer

package com.demo.socket;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author MT
 * @date 2019-12-09 13:21
 */
@Slf4j
@Component
@Data
@NoArgsConstructor
@PropertySource("classpath:socket.properties")
public class SocketServer {

    @Value("${port}")
    private Integer port;

    @Value("${keepAlive}")
    private boolean keepAlive;

    private boolean started;
    private ServerSocket serverSocket;
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * 测试服务端启动
     *
     * @param args
     */
    public static void main(String[] args) {
        new SocketServer().start(8081);
    }

    public void start() {
        start(null);
    }


    public void start(Integer port) {
        log.info("port: 配置端口{}, 主动指定启动端口{}", this.port, port);
        try {
            serverSocket = new ServerSocket(port == null ? this.port : port);
            started = true;
            log.info("Socket服务已启动,占用端口: {}", serverSocket.getLocalPort());
        } catch (IOException e) {
            log.error("端口冲突,异常信息:{}", e);
            System.exit(0);
        }
        try {
            while (true) {

                Socket socket = serverSocket.accept();
                socket.setKeepAlive(keepAlive);
                socket.setSoTimeout(5000);

                executorService.submit(() -> {

                    ClientSocket register = SocketHandler.register(socket);
                    log.info("客户端已连接,其Key值为:{}", register.getKey());
                    SocketHandler.sendMessage(register, JSON.toJSONString("返回值"));
                    if (register != null) {
                        register.shutdownStream();
                        executorService.submit(register);
                    }
                });
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

SocketApplication:启动类

package com.demo.socket;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

/**
 * Spring Boot entry point: boots the application context and then runs the
 * {@link SocketServer} bean on the calling thread.
 */
@SpringBootApplication
public class SocketApplication {

    /**
     * Starts Spring, looks up the socket server bean and starts it.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SocketServer server = SpringApplication
                .run(SocketApplication.class, args)
                .getBean(SocketServer.class);
        server.start();
    }

}

练习地址

安卓与服务端使用socket通信 -- 初始demo;

socket实现双向通信;


全部评论: 0

    我有话说: