基于muduo,mysql,redis,nginx的集群聊天服务器
集成聊天服务器 项目介绍
项目名称集成聊天服务器
平台工具vs code ,linux shell命令行,cmake
编程语言C++
项目内容:
网络层基于muduo网络库的reactor模型,one loop per thread,使用方便;
使用优秀的第三方库json,用以实现数据的序列化和反序列化,进行数据传输;
数据库选择了关系型数据库MySQL;
负载均衡器方面,选择了nginx的tcp负载均衡模块;
实现多台服务器之间的通信功能,选择使用redis的订阅-发布功能
收获熟悉了C++程序中调用MySQL数据库的方法,掌握了json库的使用方法,了解了nginx的tcp均衡模块的配置方法,熟悉了redis的发布-订阅方法在程序中的适当的调用方式。
问题及解决方案在进行集成聊天服务项目时,代码时常会出现bug,解决方案为gdb调试,查看tcp连接情况以及定点打印方式进行debug。
所需配置及工具- Ubuntu虚拟机VS code ,用以与虚拟机连接,在VS code上书写代码且调试,相对vim更为方便MySQL,建立表redisnginx负载均衡器muduo网络库json
在这里实现了基础的ChatServer聊天服务器类,muduo库的调用以及最基本的回调函数的绑定等
#ifndef CHATSERVER_H #define CHATSERVER_H //聊天服务器主类 class ChatServer { public: //初始化聊天服务器对象 ChatServer(EventLoop loop, const IAddress &listenAddr, const string &nameArg); //启动服务 void start(); private: //上报链接相关信息的回调 void onConnection(const TcpConnectionPtr &); //上报读写事件相关信息的回调函数 void onMessage(const TcpConnectionPtr &, Buffer, Timestamp); TcpServer server_; EventLoop loop_; }; #endifchatservice
这里包含了数种处理方法的类,以及登录,注册,聊天等方法。
#ifndef CHATSERVICE_H #define CHATSERVICE_H using MsgHandler = std::functionpublic; //聊天服务业务类 class ChatService { public: //获取单例对象的接口函数 static ChatService instance(); //处理登录业务 void login(const TcpConnectionPtr &conn, json &js, Timestamp time); //处理注册业务 void reg(const TcpConnectionPtr &conn, json &js, Timestamp time); //一对一聊天 void oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time); //添加好友 void addFriend(const TcpConnectionPtr &conn, json &js, Timestamp time); //获取消息对应的处理器 MsgHandler getHandler(int msgid); //处理用户异常退出 void clientCloseException(const TcpConnectionPtr &conn); //用户注销操作 void loginout(const TcpConnectionPtr &conn, json &js, Timestamp time); //处理服务器断开时修改用户登录状态 void reset(); //创建群组业务 void createGroup(const TcpConnectionPtr &conn, json &js, Timestamp time); //加入群组业务 void addGroup(const TcpConnectionPtr &conn, json &js, Timestamp time); //群组聊天业务 void groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time); //从redis消息队列中获取订阅的消息 void handleRedisSubscribeMessage(int userid, string msg); private : //获取单例对象的接口函数 ChatService(); //存储消息id和其对应的业务处理方法 unordered_map _msgHandlerMap; //存储在线用户的通信连接 unordered_map _userConnMap; //定义互斥锁,保证_userConnMap的线程安全 mutex _connMutex; UserModel _userModel; offlineMsgModel _offlineMsgModel; FriendModel _friendModel; GroupModel _groupModel; Redis _redis; }; #endif
#ifndef PUBLIC_H #define PUBLIC_H enum EnMsgType { LOGIN_MSG = 1,//登录消息 LOGIN_MSG_ACK, LOGINOUT_MSG, REG_MSG, //注册消息 REG_MSG_ACK, ONE_CHAT_MSG,//聊天信息 ADD_FRIEND_MSG,//添加好友信息 CREATE_GROUP_MSG,//创建群聊 ADD_GROUP_MSG,//加入群聊 GROUP_CHAT_MSG//群聊天 }; #endiffriendmodel
class FriendModel { public: //添加好友关系 void insert(int userid, int friendid); //返回用户好友列表 vectorgroupquery(int userid); };
#ifndef GROUP_H #define GROUP_H #include "groupuser.hpp" #includegroupmodelusing namespace std; class Group { public: Group(int id = -1, string name = "", string desc = "") { this->id = id; this->name = name; this->desc = desc; } void setId(int id) { this->id = id; } void setName(string name) { this->name = name; } void setDesc(string desc) { this->desc = desc; } int getId() { return this->id; } string getName() { return this->name; } string getDesc() { return this->desc; } vector &getUsers() { return this->users; } private: int id; string name; string desc; vector users; }; #endif
#ifndef GROUPMODEL_H #define GROUPMODEL_H #include"group.hpp" #includegroup#include class GroupModel { public: //创建群聊 bool createGroup(Group &group); //加入群聊 void addGroup(int userid, int groupid, string role); //查询用户所在群组信息 vector queryGroups(int userid); //根据指定的groupid查询用户id列表,除了userid自己,主要用户群聊业务给群组其它成员群发消息 vector queryGroupUsers(int userid, int groupid); }; #endif
#ifndef USER_H #define USER_H #includegroupuserusing namespace std; class User { public: User(int id=-1,string name="",string passord="",string state="offline") { this->id = id; this->name = name; this->passord = passord; this->state = state; } void setId(int id) { this->id = id; } void setName(string name) { this->name = name; } void setPassord(string passord) { this->passord = passord; } void setState(string state) { this->state = state; } int getId() { return id; } string getName() { return name; } string getPassord() { return passord; } string getState() { return state; } private: int id; string name; string passord; string state; }; #endif
#ifndef GROUPUSER_H #define GROUPUSER_H #include"user.hpp" class GroupUser:public User { public: void setRole(string role) { this->role = role; } string getRole() { return this->role; } private: string role;//角色 }; #endifusermodel
#ifndef USERMODEL_H #define USERMODEL_H #include"user.hpp" class UserModel { public: //User表的增加方法 bool insert(User &user); //根据用户号码查询id User query(int id); //更新用户状态 bool updateState(User user); //服务器断开时,重置用户登录状态 void resetState(); private: }; #endifofflinemessagemodel
#ifndef OFFLINEMESSAGEMODEL_H #define OFFLINEMESSAGEMODEL_H #includedb --MySQL调用接口方法#include using namespace std; class offlineMsgModel { public: //存储用户离线消息 void insert(int userid, string msg); //删除用户的离线消息 void remove(int userid); //查询用户的离线消息 vector query(int userid); }; #endif
#ifndef DB_H #define DB_H #includeredis --redis调用方法#include using namespace std; // 数据库操作类 class MySQL { public: // 初始化数据库连接 MySQL(); // 释放数据库连接资源 ~MySQL(); // 连接数据库 bool connect(); // 更新操作 bool update(string sql); // 查询操作 MYSQL_RES query(string sql); //获取连接 MYSQL getConnection(); private: MYSQL _conn; }; #endif
#ifndef REDIS_H #define REDIS_H #include客户端启动流程#include #include using namespace std; class Redis { public: Redis(); ~Redis(); //连接redis服务器 bool connect(); //向redis指定的通道channel发布消息 bool publish(int channel, string message); //向redis指定的通道subscriber订阅消息 bool subscribe(int channel); //向redis指定的通道unsubscriber取消订阅消息 bool unsubscribe(int channel); //在独立线程中接收订阅通道中的消息 void observer_channel_message(); //初始化向业务层上报通道消息的回调对象 void init_notify_handler(function fn); private: //hiredis同步上下文对象,负责publish消息 redisContext _publish_context; //hiredis同步上下文对象,负责subscribe消息 redisContext _subscribe_context; //回调操作,收到订阅的消息,给service层上报 function _notify_message_handler; }; #endif
主函数开启子线程,子线程进入阻塞
选择登录业务,将id和passord序列化成json,发送给服务器,此后main函数进入条件变量阻塞,需等待子线程唤醒。
服务器发现可读事件发送,调用onMessage,反序列化后得到msgid,由此得到对应的登录对调函数,并运行。(其中传入的json参数即主函数发送过来的json变量)
运行中会发送相关数据,子线程接收到数据,解除阻塞,识别json数据,得知是登录业务,处理后,释放条件变量,主线程继续运行,进入mainMenu函数。在该函数内,不断接收用户输入的指令,然后与登录业务一样,组合json语句,根据manMap中的键值对来调用回调函数。期间自然是主线程给服务端发送数据,服务端给客户端子线程发送数据,子线程完成任务返回到mainMenu不断循环的过程了。当用户注销用户时便会退回mian函数。
nginx负载均衡器的tcp模块的配置方法 配置//与HTTP处于同级位置 # nginx tcp loadbalance config stream { upstream MyServer { server 127.0.0.1:6000 eight=1 max_fails=3 fail_timeout=30s; server 127.0.0.1:6002 eight=1 max_fails=3 fail_timeout=30s; } server { proxy_connect_timeout 1s; listen 8000; proxy_pass MyServer; tcp_nodelay on; } }
注释
listen 端口号 nginx监听该端口,客户端连接的就是该端口
proxy_pass MyServer; 负载均衡分配到该内容里面的端口,我们可以看到上面还有个MyServer
server 127.0.0.1:6000 eight=1 max_fails=3 fail_timeout=30s;
一台服务器运行在6000端口,eight代表权重,nginx以轮询的方式进行分发客户端。
max_fails=3 fail_timeout=30s 代表超时时间,和重复次数
启动方法cd /usr/local/nginx/sbin
sudo ./nginx
sudo ./nginx -s reload 平滑启动
sudo ./nginx -s s 关闭服务
项目不足之处以及思考 不足
该项目中用redis的发布-订阅功能进行多台服务器间的通信功能。目前考虑的是所有服务器关闭的情况。如若是仅仅关闭一台服务器,那么mysql库的更新用户在线状态的功能是不合理的,如表的结构以及mysql语句所示,一台服务器关闭时会将mysql库中所有用户的在线重置为下线状态。
还有一个就是在关闭一台服务器时,相关的代码并没有将在该台服务器上登录的用户的redis订阅通道给取消订阅。
解决思路我的想法是在user表中增加一列,该列记录的是该次登陆的服务器的ip及端口号,不在线的用户该列可以置为-1。这样当我们仅仅关闭一台服务器时,就可以在数据库中区分开登陆在该服务器中的用户然后选择性得更改其状态。