/** * Tencent is pleased to support the open source community by making MSEC available. * * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. * * Licensed under the GNU General Public License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. You may * obtain a copy of the License at * * https://opensource.org/licenses/GPL-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, * either express or implied. See the License for the specific language governing permissions * and limitations under the License. */ /** * @file mt_notify.cpp * @info 微线程调度注册对象管理实现 * @time 20130924 **/ #include #include #include #include #include #include "micro_thread.h" #include "mt_session.h" #include "mt_msg.h" #include "mt_notify.h" #include "mt_connection.h" #include "mt_sys_hook.h" #include "ff_hook.h" using namespace std; using namespace NS_MICRO_THREAD; /** * @brief 通知代理进入等待状态, 挂入等待队列中 * @param proxy 代理的session模型 */ void ISessionNtfy::InsertWriteWait(SessionProxy* proxy) { if (!proxy->_flag) { TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry); proxy->_flag = 1; } } /** * @brief 通知代理移除等待状态 * @param proxy 代理的session模型 */ void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy) { if (proxy->_flag) { TAILQ_REMOVE(&_write_list, proxy, _write_entry); proxy->_flag = 0; } } /** * @brief 观察者模式, 通知写等待线程 * @info UDP可以通知每个线程执行写操作, TCP需要排队写 */ void UdpSessionNtfy::NotifyWriteWait() { MtFrame* frame = MtFrame::Instance(); SessionProxy* proxy = NULL; MicroThread* thread = NULL; TAILQ_FOREACH(proxy, &_write_list, _write_entry) { proxy->SetRcvEvents(KQ_EVENT_WRITE); thread = proxy->GetOwnerThread(); if (thread && thread->HasFlag(MicroThread::IO_LIST)) { frame->RemoveIoWait(thread); frame->InsertRunable(thread); } } } /** * @brief 创建socket, 监听可读事件 * @return fd的句柄, <0 失败 */ int UdpSessionNtfy::CreateSocket() { // 1. UDP短连接, 每次新创SOCKET int osfd = socket(AF_INET, SOCK_DGRAM, 0); if (osfd < 0) { MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno)); return -1; } // 2. 非阻塞设置 int flags = 1; if (ioctl(osfd, FIONBIO, &flags) < 0) { MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno)); close(osfd); osfd = -1; return -2; } // 可选bind执行, 设置本地port后执行 if (_local_addr.sin_port != 0) { int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr)); if (ret < 0) { MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)", inet_ntoa(_local_addr.sin_addr), ntohs(_local_addr.sin_port), errno, strerror(errno)); close(osfd); osfd = -1; return -3; } } // 3. 更新管理信息, 默认udp session 侦听 epollin this->SetOsfd(osfd); this->EnableInput(); MtFrame* frame = MtFrame::Instance(); frame->KqueueNtfyReg(osfd, this); frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ); return osfd; } /** * @brief 关闭socket, 停止监听可读事件 */ void UdpSessionNtfy::CloseSocket() { int osfd = this->GetOsfd(); if (osfd > 0) { MtFrame* frame = MtFrame::Instance(); frame->KqueueCtrlDel(osfd, KQ_EVENT_READ); frame->KqueueNtfyReg(osfd, NULL); this->DisableInput(); this->SetOsfd(-1); close(osfd); } } /** * @brief 可读事件通知接口, 考虑通知处理可能会破坏环境, 可用返回值区分 * @return 0 该fd可继续处理其它事件; !=0 该fd需跳出回调处理 */ int UdpSessionNtfy::InputNotify() { while (1) { int ret = 0; int have_rcv_len = 0; // 1. 获取收包缓冲区, 优先选择未处理完的链接buff if (!_msg_buff) { _msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize()); if (NULL == _msg_buff) { MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize()); return 0; } _msg_buff->SetBuffType(BUFF_RECV); } char* buff = (char*)_msg_buff->GetMsgBuff(); // 2. 获取socket, 收包处理 int osfd = this->GetOsfd(); struct sockaddr_in from; socklen_t fromlen = sizeof(from); mt_hook_syscall(recvfrom); ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(), 0, (struct sockaddr*)&from, &fromlen); if (ret < 0) { if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) { return 0; } else { MTLOG_ERROR("recv error, fd %d", osfd); return 0; // 系统错误, UDP 暂不关闭 } } else if (ret == 0) { MTLOG_DEBUG("remote close connection, fd %d", osfd); return 0; // 对端关闭, UDP 暂不关闭 } else { have_rcv_len = ret; _msg_buff->SetHaveRcvLen(have_rcv_len); _msg_buff->SetMsgLen(have_rcv_len); } // 3. 检查消息的完整性, 提取sessionid int sessionid = 0; ret = this->GetSessionId(buff, have_rcv_len, sessionid); if (ret <= 0) { MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it", have_rcv_len, osfd); MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); _msg_buff = NULL; return 0; } // 4. 映射查询thread句柄, 连接handle句柄, 设置读事件来临, 挂接msgbuff ISession* session = SessionMgr::Instance()->FindSession(sessionid); if (NULL == session) { MT_ATTR_API(350403, 1); // session 到达已超时 MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid); MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); _msg_buff = NULL; return 0; } // 5. 挂接recvbuff, 唤醒线程 IMtConnection* conn = session->GetSessionConn(); MicroThread* thread = session->GetOwnerThread(); if (!thread || !conn || !conn->GetNtfyObj()) { MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong", session, thread, conn); MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); _msg_buff = NULL; return 0; } MtMsgBuf* msg = conn->GetMtMsgBuff(); if (msg) { MsgBuffPool::Instance()->FreeMsgBuf(msg); } conn->SetMtMsgBuff(_msg_buff); _msg_buff = NULL; conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ); if (thread->HasFlag(MicroThread::IO_LIST)) { MtFrame* frame = MtFrame::Instance(); frame->RemoveIoWait(thread); frame->InsertRunable(thread); } } return 0; } /** * @brief 可写事件通知接口, 考虑通知处理可能会破坏环境, 可用返回值区分 * @return 0 该fd可继续处理其它事件; !=0 该fd需跳出回调处理 */ int UdpSessionNtfy::OutputNotify() { NotifyWriteWait(); return 0; } /** * @brief 异常通知接口, 关闭fd侦听, thread等待处理超时 * @return 忽略返回值, 跳过其它事件处理 */ int UdpSessionNtfy::HangupNotify() { // 1. 清理epoll ctl监听事件 MtFrame* frame = MtFrame::Instance(); frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents()); MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd()); // 2. 重新打开socket CloseSocket(); // 3. 重加入epoll listen CreateSocket(); return 0; } /** * @brief 调整epoll侦听事件的回调接口, 长连接始终EPOLLIN, 偶尔EPOLLOUT * @param args fd引用对象的指针 * @return 0 成功, < 0 失败, 要求事务回滚到操作前状态 * @info 默认是监听可读事件的, 这里只处理可写事件的监听删除 */ int UdpSessionNtfy::KqueueCtlAdd(void* args) { MtFrame* frame = MtFrame::Instance(); KqFdRef* fd_ref = (KqFdRef*)args; //ASSERT(fd_ref != NULL); int osfd = this->GetOsfd(); // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); if ((old_obj != NULL) && (old_obj != this)) { MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); return -1; } // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节 if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE)) { MTLOG_ERROR("epfd ref add failed, log"); return -2; } this->EnableOutput(); return 0; } /** * @brief 调整epoll侦听事件的回调接口, 长连接始终EPOLLIN, 偶尔EPOLLOUT * @param args fd引用对象的指针 * @return 0 成功, < 0 失败, 要求事务回滚到操作前状态 */ int UdpSessionNtfy::KqueueCtlDel(void* args) { MtFrame* frame = MtFrame::Instance(); KqFdRef* fd_ref = (KqFdRef*)args; //ASSERT(fd_ref != NULL); int osfd = this->GetOsfd(); // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); if (old_obj != this) { MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); return -1; } // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节 if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE)) { MTLOG_ERROR("epfd ref del failed, log"); return -2; } this->DisableOutput(); return 0; } /** * @brief 可读事件通知接口, 考虑通知处理可能会破坏环境, 可用返回值区分 * @return 0 该fd可继续处理其它事件; !=0 该fd需跳出回调处理 */ int TcpKeepNtfy::InputNotify() { KeepaliveClose(); return -1; } /** * @brief 可写事件通知接口, 考虑通知处理可能会破坏环境, 可用返回值区分 * @return 0 该fd可继续处理其它事件; !=0 该fd需跳出回调处理 */ int TcpKeepNtfy::OutputNotify() { KeepaliveClose(); return -1; } /** * @brief 异常通知接口 * @return 忽略返回值, 跳过其它事件处理 */ int TcpKeepNtfy::HangupNotify() { KeepaliveClose(); return -1; } /** * @brief 触发实际连接关闭操作 */ void TcpKeepNtfy::KeepaliveClose() { if (_keep_conn) { MTLOG_DEBUG("remote close, fd %d, close connection", _fd); ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn); } else { MTLOG_ERROR("_keep_conn ptr null, error"); } } /** * @brief session全局管理句柄 * @return 全局句柄指针 */ NtfyObjMgr* NtfyObjMgr::_instance = NULL; NtfyObjMgr* NtfyObjMgr::Instance (void) { if (NULL == _instance) { _instance = new NtfyObjMgr; } return _instance; } /** * @brief session管理全局的销毁接口 */ void NtfyObjMgr::Destroy() { if( _instance != NULL ) { delete _instance; _instance = NULL; } } /** * @brief 消息buff的构造函数 */ NtfyObjMgr::NtfyObjMgr() { } /** * @brief 析构函数, 不持有资源, 并不负责清理 */ NtfyObjMgr::~NtfyObjMgr() { } /** * @brief 注册长连接session信息 * @param session_name 长连接的标识, 每个连接处理一类session封装格式 * @param session 长连接对象指针, 定义连接属性 * @return 0 成功, < 0 失败 */ int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session) { if (session_name <= 0 || NULL == session) { MTLOG_ERROR("session %d, register %p failed", session_name, session); return -1; } SessionMap::iterator it = _session_map.find(session_name); if (it != _session_map.end()) { MTLOG_ERROR("session %d, register %p already", session_name, session); return -2; } _session_map.insert(SessionMap::value_type(session_name, session)); return 0; } /** * @brief 获取注册长连接session信息 * @param session_name 长连接的标识, 每个连接处理一类session封装格式 * @return 长连接指针, 失败为NULL */ ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name) { SessionMap::iterator it = _session_map.find(session_name); if (it != _session_map.end()) { return it->second; } else { return NULL; } } /** * @brief 获取通用通知对象, 如线程通知对象与session通知代理对象 * @param type 类型, 线程通知类型,UDP/TCP SESSION通知等 * @param session_name proxy模型,一并获取session对象 * @return 通知对象的指针, 失败为NULL */ KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name) { KqueuerObj* obj = NULL; SessionProxy* proxy = NULL; switch (type) { case NTFY_OBJ_THREAD: obj = _fd_ntfy_pool.AllocPtr(); break; case NTFY_OBJ_SESSION: proxy = _udp_proxy_pool.AllocPtr(); obj = proxy; break; case NTFY_OBJ_KEEPALIVE: // no need get this now break; default: break; } // 获取底层的长连接对象, 关联代理与实际的通知对象 if (proxy) { ISessionNtfy* ntfy = this->GetNameSession(session_name); if (!ntfy) { MTLOG_ERROR("ntfy get session name(%d) failed", session_name); this->FreeNtfyObj(proxy); obj = NULL; } else { proxy->SetRealNtfyObj(ntfy); } } return obj; } /** * @brief 释放通知对象指针 * @param obj 通知对象 */ void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj) { SessionProxy* proxy = NULL; if (!obj) { return; } int type = obj->GetNtfyType(); obj->Reset(); switch (type) { case NTFY_OBJ_THREAD: return _fd_ntfy_pool.FreePtr(obj); break; case NTFY_OBJ_SESSION: proxy = dynamic_cast(obj); return _udp_proxy_pool.FreePtr(proxy); break; case NTFY_OBJ_KEEPALIVE: break; default: break; } delete obj; return; }