首页   >   代码编程

Java Socket实现心跳机制的代码案例详解

在上一篇文章中,教大家利用socket写了一个简单的聊天室,只有消息的发送接收以及系统广播,并没有实现心跳,聊天室中的人下线了之后,其他人收不到通知,在文章末尾处,我给出了几个实现心跳的思路,今天也来动手亲自实现一个(上篇文章中的一和三两种思路)。

实现的功能:

1、客户端和服务器之间保持心跳;

2、客户端下线之后,服务器能够感知;

3、客户端下线之后,服务器发送系统广播,通知聊天室中的其他人;

4、服务器宕机之后,客户端能够感知,并退出聊天室;

5、客户端主动退出,服务器发送系统广播;

实现的思路:

1、服务器和客户端都增加一个线程,专门负责心跳的时间记录;

2、客户端在心跳和主动退出时,服务器需要给出回执消息,用来监测服务器的健康情况,以及确保消息的不丢失;

客户端下线服务端感知,效果如下:

Java Socket实现心跳机制的代码案例详解

客户端下线,聊天室系统播报,效果如下:

Java Socket实现心跳机制的代码案例详解

主动下线,输入“exit”字符,效果如下:

Java Socket实现心跳机制的代码案例详解

此时服务器感知到客户端的主动下线,效果如下:

Java Socket实现心跳机制的代码案例详解

服务器宕机,客户端感知,效果如下:

Java Socket实现心跳机制的代码案例详解

服务器代码:

package com.wolffy.socket.tcp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by Felix on 2019/3/26.
 */
public class Server {

    /**
     * 客户端集合
     */
    private static List<Socket> clients = new ArrayList<>();

    /**
     * 客户端最近的一次心跳时间集合
     */
    private static Map<Socket, Date> heartbeatMap = new HashMap<>();

    /**
     * 心跳超时时间,超过了就认为client下线了
     */
    private static final long TIMEOUT = 10 * 1000;

    private void start() {
        try {
            // 开启服务,设置指定端口
            ServerSocket server = new ServerSocket(5555);
            System.out.println("服务开启,等待客户端连接中...");
            // 循环监听
            while (true) {
                // 等待客户端进行连接
                Socket client = server.accept();
                // 将客户端添加到集合
                clients.add(client);
                // 连接上来,立即添加一次心跳时间,否则会导致MessageListener开启不了
                heartbeatMap.put(client, new Date());

                System.out.println("客户端[" + client.getRemoteSocketAddress() + "]连接成功,当前在线客户端" + clients.size() + "个");

                // 每一个客户端开启一个线程处理消息
                new MessageListener(client).start();
                // 每一个客户端开启一个线程监测心跳
                new HeartbeatListener(client).start();
            }
        } catch (IOException e) {
            // log
        }
    }

    /**
     * 发送消息
     *
     * @param type   消息类型(0、系统消息;1、客户端消息)
     * @param msg    消息内容
     * @param client 客户端
     * @throws IOException
     */
    private void sendMsg(int type, String msg, Socket client) throws IOException {
        if (type != 0) {
            System.out.println("处理消息:" + msg);
        }
        OutputStream os;
        PrintWriter pw;
        for (Socket socket : clients) {
            // 1、如果是系统消息,那就群发;
            // 2、如果是客户端消息,转发时就需要跳过客户端自己;
            if (type != 0 && socket == client) {
                continue;
            }
            os = socket.getOutputStream();
            pw = new PrintWriter(os);
            pw.println(msg);// 这里需要特别注意,对方用readLine获取消息,就必须用print而不能用write,否则会导致消息获取不了
            pw.flush();
        }
    }

    /**
     * 接收消息
     *
     * @param client 客户端
     * @return 消息内容
     * @throws IOException
     */
    private String receiveMsg(Socket client) throws IOException {
        InputStream is = client.getInputStream();
        InputStreamReader isr = new InputStreamReader(is);
        BufferedReader br = new BufferedReader(isr);
        return br.readLine();
    }

    /**
     * 消息处理线程,负责转发消息到聊天室里的人
     */
    class MessageListener extends Thread {

        // 将每个连接上的客户端传递进来,收消息和发消息
        private Socket client;

        public MessageListener(Socket socket) {
            this.client = socket;
        }

        @Override
        public void run() {
            try {
                // 每个客户端连接上了,就发送一条系统消息(类似于广播)
                sendMsg(0, "[系统消息]:欢迎" + client.getRemoteSocketAddress() + "来到聊天室,当前共有" + clients.size() + "人在聊天", client);
                String msg;
                // 不再使用while(true)了,避免在客户端下线了之后,线程还依旧在无限循环
                while (heartbeatMap.get(client) != null) {
                    msg = receiveMsg(client);

                    // 如果客户端已经断开连接,但是心跳间隔时间又还没到的这一段时间(本案例中是2秒),会导致接收消息的readLine()一直为null
                    if (msg == null) {
                        continue;
                    }

                    if ("heartbeat".equals(msg)) {
                        // 记录客户端的心跳时间
                        heartbeatMap.put(client, new Date());
                        // 发送回执消息(主要用来给客户端做监测使用,可以知道服务器是否健康)
                        OutputStream os = client.getOutputStream();
                        PrintWriter pw = new PrintWriter(os);
                        pw.println("heartbeat_receipt");
                        pw.flush();
                    } else if ("exit".equals(msg)) {
                        // 客户端主动下线,需要移除相应的记录
                        heartbeatMap.remove(client);
                        clients.remove(client);
                        // 发送回执消息(客户端需要确保服务器知道我要下线了,才会退出)
                        OutputStream os = client.getOutputStream();
                        PrintWriter pw = new PrintWriter(os);
                        pw.println("exit_receipt");
                        pw.flush();

                        System.out.println("[" + client.getRemoteSocketAddress() + "]已下线,当前在线客户端" + clients.size() + "个");

                        // 发送广播
                        sendMsg(0, "[系统消息]:" + client.getRemoteSocketAddress() + "已下线,当前共有" + clients.size() + "人在聊天", client);
                    } else {
                        sendMsg(1, "[" + client.getRemoteSocketAddress() + "]:" + msg, client);
                    }
                }
            } catch (IOException e) {
                // log
            }
        }
    }

    /**
     * 心跳监测线程,负责客户端的下线提醒
     */
    class HeartbeatListener extends Thread {

        // 将每个连接上的客户端传递进来,收消息和发消息
        private Socket client;

        public HeartbeatListener(Socket socket) {
            this.client = socket;
        }

        @Override
        public void run() {
            try {
                // 比对时间来监测客户端是否已经下线了
                Date time, now;
                // 不再使用while(true)了,避免在客户端下线了之后,线程还依旧在无限循环
                while ((time = heartbeatMap.get(client)) != null) {
                    now = new Date();
                    // 如果超过指定时间还没有心跳,那就视作已下线
                    if (now.getTime() - time.getTime() > TIMEOUT) {
                        // 移除记录
                        heartbeatMap.remove(client);
                        clients.remove(client);

                        System.out.println("[" + client.getRemoteSocketAddress() + "]已下线,当前在线客户端" + clients.size() + "个");

                        sendMsg(0, "[系统消息]:" + client.getRemoteSocketAddress() + "已下线,当前共有" + clients.size() + "人在聊天", client);
                    }
                }
            } catch (IOException e) {
                // log
            }
        }
    }

    public static void main(String[] args) {
        new Server().start();
    }

}

客户端代码:

package com.wolffy.socket.tcp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;
import java.util.Scanner;

/**
 * Created by Felix on 2019/3/26.
 */
public class Client {

    private Socket server = null;

    /**
     * 服务器是否可用
     */
    private boolean SERVER_ALIAVE;

    /**
     * 服务器给与的最后一次心跳回执时间
     */
    private Date SERVER_HEART_RECEIVE_TIME;

    /**
     * 心跳间隔(你的脉搏多久跳动一次)
     */
    private static final long HEARTBEAT_SLEEP = 2 * 1000;

    /**
     * 服务器宕机的宽限时间,超过了,就视作挂了
     */
    private static final long TIMEOUT = 10 * 1000;

    private void start() {
        try {
            // 连接服务器
            server = new Socket("127.0.0.1", 5555);
            SERVER_ALIAVE = true;
            System.out.println("连接服务器成功,身份证:" + server.getLocalSocketAddress());
            // 启动接受消息的线程
            new ReceiveMessageListener().start();
            // 启动发送消息的线程
            new SendMessageListener().start();
            // 启动发送心跳的线程
            new HeartbeatListener().start();
        } catch (IOException e) {
            // log
        }
    }

    /**
     * 发送消息的线程
     */
    class SendMessageListener extends Thread {
        @Override
        public void run() {
            try {
                // 监听idea、eclipse的console输入
                Scanner scanner = new Scanner(System.in);
                // 不再使用while(true)了,避免在服务器宕机了之后,线程还依旧在无限循环
                while (SERVER_ALIAVE) {
                    sendMsg(scanner.next());
                }
            } catch (IOException e) {
                // log
            }
        }
    }

    /**
     * 接受消息的线程
     */
    class ReceiveMessageListener extends Thread {
        @Override
        public void run() {
            try {
                // 不再使用while(true)了,避免在服务器宕机了之后,线程还依旧在无限循环
                String msg;
                while (SERVER_ALIAVE) {
                    msg = receiveMsg();

                    // 如果服务器已经断开连接,但是心跳回执的间隔时间又还没到的这一段时间(本案例中是2秒),会导致接收消息的readLine()一直为null
                    if (msg == null) {
                        continue;
                    }

                    if ("heartbeat_receipt".equals(msg)) {
                        SERVER_HEART_RECEIVE_TIME = new Date();// 记录每一次的心跳回执时间

                    } else if ("exit_receipt".equals(msg)) {
                        System.exit(0);// 必须在收到回执消息之后再退出,否则会导致消息丢失

                    } else {
                        System.out.println(msg);
                    }
                }
            } catch (IOException e) {
                // log
            }
        }
    }

    /**
     * 发送心跳的线程
     */
    class HeartbeatListener extends Thread {
        @Override
        public void run() {
            try {
                // 不再使用while(true)了,避免在服务器宕机了之后,线程还依旧在无限循环
                Date now;
                while (SERVER_ALIAVE) {
                    // 每次发送心跳之前,都需要检测一下服务器是否健康,不做无用功
                    now = new Date();
                    // 连接到服务器之后,不会立即发送心跳(虽然本案例中服务器端是做了处理,第一次连接默认记录当前时间),所以需要判断是否为null
                    if (SERVER_HEART_RECEIVE_TIME != null && now.getTime() - SERVER_HEART_RECEIVE_TIME.getTime() > TIMEOUT) {
                        SERVER_ALIAVE = false;
                        System.out.println("服务器[" + server.getRemoteSocketAddress() + "]都宕机了,还玩个锤子,走喽~");
                        // 服务器宕机了,客户端也自动退出
                        System.exit(0);
                    }
                    // 发送心跳
                    sendMsg("heartbeat");
                    // 限定发送频率
                    try {
                        Thread.sleep(HEARTBEAT_SLEEP);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                // log
            }
        }
    }

    /**
     * 发送消息
     *
     * @param msg 消息内容
     * @throws IOException
     */
    private void sendMsg(String msg) throws IOException {
        OutputStream os = server.getOutputStream();
        PrintWriter pw = new PrintWriter(os);
        pw.println(msg);// 这里需要特别注意,对方用readLine获取消息,就必须用print而不能用write,否则会导致消息获取不了
        pw.flush();
    }

    /**
     * 接受消息
     *
     * @return 消息内容
     * @throws IOException
     */
    private String receiveMsg() throws IOException {
        InputStream is = server.getInputStream();
        InputStreamReader isr = new InputStreamReader(is);
        BufferedReader br = new BufferedReader(isr);
        return br.readLine();
    }

    public static void main(String[] args) {
        new Client().start();
    }

}

上述代码,大家可以直接拷贝下来运行,虽然很简陋,但效果还是达到了!

QQ群Ⅰ: 686430774 (已满)

QQ群Ⅱ: 718410762 (已满)

QQ群Ⅲ: 638620451 (已满)

QQ群Ⅳ: 474195684

如果文章有帮到你,可以考虑请博主喝杯咖啡!

分享到:

欢迎分享本文,转载请注明出处!

作者:不忘初心

发布时间:2019-03-28

永久地址:https://www.jiweichengzhu.com/article/a64f177017a94117a505ef3b158df7e1

评论