f-stack/app/micro_thread/mt_notify.cpp

572 lines
14 KiB
C++
Raw Normal View History

2017-04-21 10:43:26 +00:00
/**
* Tencent is pleased to support the open source community by making MSEC available.
*
* Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the GNU General Public License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* https://opensource.org/licenses/GPL-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
/**
* @file mt_notify.cpp
* @info Micro-thread notification objects (UDP session / TCP keep-alive notifiers) and the notify-object manager implementation
* @time 20130924
**/
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "micro_thread.h"
#include "mt_session.h"
#include "mt_msg.h"
#include "mt_notify.h"
#include "mt_connection.h"
#include "mt_sys_hook.h"
#include "ff_hook.h"
using namespace std;
using namespace NS_MICRO_THREAD;
/**
 * @brief Append a session proxy to this notifier's write-wait list.
 * @param proxy Proxy to enqueue. Its _flag records whether it is already
 *              linked, so the proxy is never inserted into the list twice.
 */
void ISessionNtfy::InsertWriteWait(SessionProxy* proxy)
{
    if (proxy->_flag) {
        return;     // already linked into _write_list
    }

    TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry);
    proxy->_flag = 1;
}
/**
 * @brief Unlink a session proxy from this notifier's write-wait list.
 * @param proxy Proxy to dequeue. Nothing happens when its _flag shows that it
 *              is not currently linked.
 */
void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy)
{
    if (!proxy->_flag) {
        return;     // not linked, nothing to remove
    }

    TAILQ_REMOVE(&_write_list, proxy, _write_entry);
    proxy->_flag = 0;
}
/**
* @brief <EFBFBD>۲<EFBFBD><EFBFBD><EFBFBD>ģʽ, ֪ͨд<EFBFBD>ȴ<EFBFBD><EFBFBD>߳<EFBFBD>
* @info UDP<EFBFBD><EFBFBD><EFBFBD><EFBFBD>֪ͨÿ<EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>ִ<EFBFBD><EFBFBD>д<EFBFBD><EFBFBD><EFBFBD><EFBFBD>, TCP<EFBFBD><EFBFBD>Ҫ<EFBFBD>Ŷ<EFBFBD>д
*/
void UdpSessionNtfy::NotifyWriteWait()
{
MtFrame* frame = MtFrame::Instance();
SessionProxy* proxy = NULL;
MicroThread* thread = NULL;
TAILQ_FOREACH(proxy, &_write_list, _write_entry)
{
proxy->SetRcvEvents(KQ_EVENT_WRITE);
thread = proxy->GetOwnerThread();
if (thread && thread->HasFlag(MicroThread::IO_LIST))
{
frame->RemoveIoWait(thread);
frame->InsertRunable(thread);
}
}
}
/**
 * @brief Create the shared UDP socket and start watching it for read events.
 * @info  The socket is switched to non-blocking mode, optionally bound to
 *        _local_addr (only when a local port is configured), stored as this
 *        object's fd and registered with the frame for KQ_EVENT_READ.
 * @return The new fd on success; a negative value on failure:
 *         -1 socket() failed, -2 setting non-blocking mode failed,
 *         -3 bind() failed, -4 registering the read event failed.
 *         On every failure path the socket is closed before returning.
 */
int UdpSessionNtfy::CreateSocket()
{
    // 1. Create a fresh UDP socket.
    int osfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (osfd < 0)
    {
        MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno));
        return -1;
    }

    // 2. Switch the socket to non-blocking mode.
    int flags = 1;
    if (ioctl(osfd, FIONBIO, &flags) < 0)
    {
        MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno));
        close(osfd);
        osfd = -1;
        return -2;
    }

    // Optional bind: only performed when a local port has been configured.
    if (_local_addr.sin_port != 0)
    {
        int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr));
        if (ret < 0)
        {
            MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)", inet_ntoa(_local_addr.sin_addr),
                        ntohs(_local_addr.sin_port), errno, strerror(errno));
            close(osfd);
            osfd = -1;
            return -3;
        }
    }

    // 3. Record the fd and register this object for read events.
    this->SetOsfd(osfd);
    this->EnableInput();

    MtFrame* frame = MtFrame::Instance();
    frame->KqueueNtfyReg(osfd, this);
    if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ))
    {
        // Without the read registration this socket would never deliver
        // input, so undo the bookkeeping above and report the failure
        // instead of returning a dead fd as success.
        MTLOG_ERROR("socket kqueue add read event failed, fd %d", osfd);
        frame->KqueueNtfyReg(osfd, NULL);
        this->DisableInput();
        this->SetOsfd(-1);
        close(osfd);
        return -4;
    }

    return osfd;
}
/**
* @brief <EFBFBD>ر<EFBFBD>socket, ֹͣ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ɶ<EFBFBD><EFBFBD>¼<EFBFBD>
*/
void UdpSessionNtfy::CloseSocket()
{
int osfd = this->GetOsfd();
if (osfd > 0)
{
MtFrame* frame = MtFrame::Instance();
frame->KqueueCtrlDel(osfd, KQ_EVENT_READ);
frame->KqueueNtfyReg(osfd, NULL);
this->DisableInput();
this->SetOsfd(-1);
close(osfd);
}
}
/**
 * @brief Readable-event callback: receive datagrams from the shared UDP socket
 *        and hand each one to the session it belongs to.
 * @info  Loops until a receive fails, returns 0 bytes, or a packet has to be
 *        dropped. For every packet the session id is extracted, the session
 *        is looked up, the buffer is attached to the session's connection and
 *        the owner micro-thread is made runnable if it is on the IO-wait list.
 * @return Always 0 in this implementation (every exit path returns 0).
 */
int UdpSessionNtfy::InputNotify()
{
while (1)
{
int ret = 0;
int have_rcv_len = 0;
// 1. Get a receive buffer. A buffer kept in _msg_buff from an earlier pass
//    (e.g. after a receive that returned no data) is reused first.
if (!_msg_buff) {
_msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize());
if (NULL == _msg_buff) {
MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize());
return 0;
}
_msg_buff->SetBuffType(BUFF_RECV);
}
char* buff = (char*)_msg_buff->GetMsgBuff();
// 2. Receive one datagram from the socket into the buffer.
int osfd = this->GetOsfd();
struct sockaddr_in from;
socklen_t fromlen = sizeof(from);
mt_hook_syscall(recvfrom);
ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(),
0, (struct sockaddr*)&from, &fromlen);
if (ret < 0)
{
// Transient conditions: stop quietly; _msg_buff is kept for the next call.
if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
{
return 0;
}
else
{
MTLOG_ERROR("recv error, fd %d", osfd);
return 0; // other receive error: the UDP socket is deliberately left open
}
}
else if (ret == 0)
{
MTLOG_DEBUG("remote close connection, fd %d", osfd);
return 0; // zero-length receive: the UDP socket is deliberately left open
}
else
{
have_rcv_len = ret;
_msg_buff->SetHaveRcvLen(have_rcv_len);
_msg_buff->SetMsgLen(have_rcv_len);
}
// 3. Extract the session id from the packet; ret <= 0 is treated as failure
//    and the packet is dropped (buffer returned to the pool).
int sessionid = 0;
ret = this->GetSessionId(buff, have_rcv_len, sessionid);
if (ret <= 0)
{
MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it",
have_rcv_len, osfd);
MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
_msg_buff = NULL;
return 0;
}
// 4. Look up the session for that id; an unknown id means the packet is dropped.
ISession* session = SessionMgr::Instance()->FindSession(sessionid);
if (NULL == session)
{
MT_ATTR_API(350403, 1); // session not found, possibly timed out (presumably reports a monitoring counter - confirm)
MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid);
MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
_msg_buff = NULL;
return 0;
}
// 5. Attach the received buffer to the session's connection and wake its thread.
IMtConnection* conn = session->GetSessionConn();
MicroThread* thread = session->GetOwnerThread();
if (!thread || !conn || !conn->GetNtfyObj())
{
MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong",
session, thread, conn);
MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
_msg_buff = NULL;
return 0;
}
// Release any buffer the connection still holds before replacing it.
MtMsgBuf* msg = conn->GetMtMsgBuff();
if (msg) {
MsgBuffPool::Instance()->FreeMsgBuf(msg);
}
// Ownership of the buffer moves to the connection; clear our pointer so the
// next loop iteration allocates a new one.
conn->SetMtMsgBuff(_msg_buff);
_msg_buff = NULL;
conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ);
// Move the owner thread from the IO-wait list to the runnable list.
if (thread->HasFlag(MicroThread::IO_LIST))
{
MtFrame* frame = MtFrame::Instance();
frame->RemoveIoWait(thread);
frame->InsertRunable(thread);
}
}
// Not reached: every exit from the loop above is a return.
return 0;
}
/**
* @brief <EFBFBD><EFBFBD>д<EFBFBD>¼<EFBFBD>֪ͨ<EFBFBD>ӿ<EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD>֪ͨ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ܻ<EFBFBD><EFBFBD>ƻ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>, <EFBFBD><EFBFBD><EFBFBD>÷<EFBFBD><EFBFBD><EFBFBD>ֵ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>
* @return 0 <EFBFBD><EFBFBD>fd<EFBFBD>ɼ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>; !=0 <EFBFBD><EFBFBD>fd<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ص<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
int UdpSessionNtfy::OutputNotify()
{
NotifyWriteWait();
return 0;
}
/**
* @brief <EFBFBD>֪ͨ<EFBFBD>ӿ<EFBFBD>, <EFBFBD>ر<EFBFBD>fd<EFBFBD><EFBFBD><EFBFBD><EFBFBD>, thread<EFBFBD>ȴ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ
* @return <EFBFBD><EFBFBD><EFBFBD>Է<EFBFBD><EFBFBD><EFBFBD>ֵ, <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
int UdpSessionNtfy::HangupNotify()
{
// 1. <20><><EFBFBD><EFBFBD>epoll ctl<74><6C><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>
MtFrame* frame = MtFrame::Instance();
frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd());
// 2. <20><><EFBFBD>´<EFBFBD><C2B4><EFBFBD>socket
CloseSocket();
// 3. <20>ؼ<EFBFBD><D8BC><EFBFBD>epoll listen
CreateSocket();
return 0;
}
/**
* @brief <EFBFBD><EFBFBD><EFBFBD><EFBFBD>epoll<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD><EFBFBD>Ļص<EFBFBD><EFBFBD>ӿ<EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʼ<EFBFBD><EFBFBD>EPOLLIN, ż<EFBFBD><EFBFBD>EPOLLOUT
* @param args fd<EFBFBD><EFBFBD><EFBFBD>ö<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ָ<EFBFBD><EFBFBD>
* @return 0 <EFBFBD>ɹ<EFBFBD>, < 0 ʧ<EFBFBD><EFBFBD>, Ҫ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ع<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ǰ״̬
* @info Ĭ<EFBFBD><EFBFBD><EFBFBD>Ǽ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>ɶ<EFBFBD><EFBFBD>¼<EFBFBD><EFBFBD><EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD>ֻ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>д<EFBFBD>¼<EFBFBD><EFBFBD>ļ<EFBFBD><EFBFBD><EFBFBD>ɾ<EFBFBD><EFBFBD>
*/
int UdpSessionNtfy::KqueueCtlAdd(void* args)
{
MtFrame* frame = MtFrame::Instance();
KqFdRef* fd_ref = (KqFdRef*)args;
//ASSERT(fd_ref != NULL);
int osfd = this->GetOsfd();
// ֪ͨ<CDA8><D6AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD><D2AA><EFBFBD><EFBFBD>, FD֪ͨ<CDA8><D6AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ϲ<EFBFBD><CFB2><EFBFBD><E1B8B4>, <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD>, <20>쳣log<6F><67>¼
KqueuerObj* old_obj = fd_ref->GetNotifyObj();
if ((old_obj != NULL) && (old_obj != this))
{
MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
return -1;
}
// <20><><EFBFBD>ÿ<EFBFBD><C3BF>ܵ<EFBFBD>epoll ctl<74>ӿ<EFBFBD>, <20><><EFBFBD><EFBFBD>epoll ctrlϸ<6C><CFB8>
if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE))
{
MTLOG_ERROR("epfd ref add failed, log");
return -2;
}
this->EnableOutput();
return 0;
}
/**
* @brief <EFBFBD><EFBFBD><EFBFBD><EFBFBD>epoll<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD><EFBFBD>Ļص<EFBFBD><EFBFBD>ӿ<EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʼ<EFBFBD><EFBFBD>EPOLLIN, ż<EFBFBD><EFBFBD>EPOLLOUT
* @param args fd<EFBFBD><EFBFBD><EFBFBD>ö<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ָ<EFBFBD><EFBFBD>
* @return 0 <EFBFBD>ɹ<EFBFBD>, < 0 ʧ<EFBFBD><EFBFBD>, Ҫ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ع<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ǰ״̬
*/
int UdpSessionNtfy::KqueueCtlDel(void* args)
{
MtFrame* frame = MtFrame::Instance();
KqFdRef* fd_ref = (KqFdRef*)args;
//ASSERT(fd_ref != NULL);
int osfd = this->GetOsfd();
// ֪ͨ<CDA8><D6AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD><D2AA><EFBFBD><EFBFBD>, FD֪ͨ<CDA8><D6AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ϲ<EFBFBD><CFB2><EFBFBD><E1B8B4>, <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD>, <20>쳣log<6F><67>¼
KqueuerObj* old_obj = fd_ref->GetNotifyObj();
if (old_obj != this)
{
MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
return -1;
}
// <20><><EFBFBD>ÿ<EFBFBD><C3BF>ܵ<EFBFBD>epoll ctl<74>ӿ<EFBFBD>, <20><><EFBFBD><EFBFBD>epoll ctrlϸ<6C><CFB8>
if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE))
{
MTLOG_ERROR("epfd ref del failed, log");
return -2;
}
this->DisableOutput();
return 0;
}
/**
* @brief <EFBFBD>ɶ<EFBFBD><EFBFBD>¼<EFBFBD>֪ͨ<EFBFBD>ӿ<EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD>֪ͨ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ܻ<EFBFBD><EFBFBD>ƻ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>, <EFBFBD><EFBFBD><EFBFBD>÷<EFBFBD><EFBFBD><EFBFBD>ֵ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>
* @return 0 <EFBFBD><EFBFBD>fd<EFBFBD>ɼ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>; !=0 <EFBFBD><EFBFBD>fd<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ص<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
int TcpKeepNtfy::InputNotify()
{
KeepaliveClose();
return -1;
}
/**
* @brief <EFBFBD><EFBFBD>д<EFBFBD>¼<EFBFBD>֪ͨ<EFBFBD>ӿ<EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD>֪ͨ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ܻ<EFBFBD><EFBFBD>ƻ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>, <EFBFBD><EFBFBD><EFBFBD>÷<EFBFBD><EFBFBD><EFBFBD>ֵ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>
* @return 0 <EFBFBD><EFBFBD>fd<EFBFBD>ɼ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD>; !=0 <EFBFBD><EFBFBD>fd<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ص<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
int TcpKeepNtfy::OutputNotify()
{
KeepaliveClose();
return -1;
}
/**
* @brief <EFBFBD>֪ͨ<EFBFBD>ӿ<EFBFBD>
* @return <EFBFBD><EFBFBD><EFBFBD>Է<EFBFBD><EFBFBD><EFBFBD>ֵ, <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>¼<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
int TcpKeepNtfy::HangupNotify()
{
KeepaliveClose();
return -1;
}
/**
 * @brief Close the idle keep-alive connection attached to this notifier.
 * @info  The close is delegated to ConnectionMgr::CloseIdleTcpKeep(). When no
 *        connection is attached (_keep_conn is null) only an error is logged.
 */
void TcpKeepNtfy::KeepaliveClose()
{
    if (!_keep_conn) {
        MTLOG_ERROR("_keep_conn ptr null, error");
        return;
    }

    MTLOG_DEBUG("remote close, fd %d, close connection", _fd);
    ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn);
}
/**
 * @brief Access the process-wide NtfyObjMgr instance.
 * @info  The instance is created with `new` on the first call; no locking is
 *        performed here.
 * @return Pointer to the single manager instance.
 */
NtfyObjMgr* NtfyObjMgr::_instance = NULL;

NtfyObjMgr* NtfyObjMgr::Instance (void)
{
    if (_instance != NULL)
    {
        return _instance;
    }

    _instance = new NtfyObjMgr;
    return _instance;
}
/**
 * @brief Destroy the process-wide NtfyObjMgr instance, if one exists.
 * @info  After the call the instance pointer is NULL, so a later Instance()
 *        call creates a new manager.
 */
void NtfyObjMgr::Destroy()
{
    // Deleting a null pointer is a no-op, so no explicit check is needed.
    delete _instance;
    _instance = NULL;
}
/**
 * @brief Constructor of the notify-object manager.
 * @info  The body is empty; members are set up by their own default
 *        construction.
 */
NtfyObjMgr::NtfyObjMgr()
{
}
/**
 * @brief Destructor of the notify-object manager.
 * @info  The body is empty: nothing is released explicitly here, and the
 *        registered session pointers are not deleted by this destructor.
 */
NtfyObjMgr::~NtfyObjMgr()
{
}
/**
 * @brief Register a session notifier under a numeric session name.
 * @param session_name Identifier of the session; must be greater than 0.
 * @param session      Notifier object to register; must not be NULL. Only the
 *                     pointer is stored in the map.
 * @return 0 on success; -1 on invalid arguments; -2 when the name is already
 *         registered (the existing entry is left untouched).
 */
int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session)
{
    const bool bad_args = (session_name <= 0) || (NULL == session);
    if (bad_args) {
        MTLOG_ERROR("session %d, register %p failed", session_name, session);
        return -1;
    }

    // Refuse to overwrite an existing registration.
    if (_session_map.find(session_name) != _session_map.end())
    {
        MTLOG_ERROR("session %d, register %p already", session_name, session);
        return -2;
    }

    _session_map.insert(SessionMap::value_type(session_name, session));
    return 0;
}
/**
 * @brief Look up the session notifier registered under a session name.
 * @param session_name Identifier used when the session was registered.
 * @return The registered notifier, or NULL when the name is unknown.
 */
ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name)
{
    SessionMap::iterator pos = _session_map.find(session_name);
    if (pos == _session_map.end())
    {
        return NULL;
    }

    return pos->second;
}
/**
 * @brief Allocate a notify object of the requested kind.
 * @param type         NTFY_OBJ_THREAD for a plain per-fd notify object, or
 *                     NTFY_OBJ_SESSION for a session proxy. Any other value
 *                     (including NTFY_OBJ_KEEPALIVE) yields NULL.
 * @param session_name Name of the registered session the proxy should forward
 *                     to; only used for NTFY_OBJ_SESSION.
 * @return The allocated object, or NULL when the type is not served here, the
 *         pool returns nothing, or the session name is not registered.
 */
KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name)
{
    if (type == NTFY_OBJ_THREAD)
    {
        return _fd_ntfy_pool.AllocPtr();
    }

    if (type != NTFY_OBJ_SESSION)
    {
        return NULL;    // keep-alive and unknown types are not handed out here
    }

    SessionProxy* proxy = _udp_proxy_pool.AllocPtr();
    if (!proxy) {
        return NULL;
    }

    // A session proxy must be wired to the real notifier registered under
    // session_name; without it the proxy goes straight back to the pool.
    ISessionNtfy* real = this->GetNameSession(session_name);
    if (!real) {
        MTLOG_ERROR("ntfy get session name(%d) failed", session_name);
        this->FreeNtfyObj(proxy);
        return NULL;
    }

    proxy->SetRealNtfyObj(real);
    return proxy;
}
/**
 * @brief Release a notify object obtained from this manager.
 * @param obj Object to release; NULL is ignored.
 * @info  The object's type is read first and the object is then Reset().
 *        NTFY_OBJ_THREAD objects return to the fd-notify pool and
 *        NTFY_OBJ_SESSION objects return to the UDP proxy pool. Every other
 *        type is deleted. An object that reports NTFY_OBJ_SESSION but is not
 *        actually a SessionProxy is logged and deleted, so a NULL pointer is
 *        never pushed into the proxy pool and the object is not leaked.
 */
void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj)
{
    if (!obj) {
        return;
    }

    // Read the type before Reset(), exactly as the pool selection needs it.
    const int type = obj->GetNtfyType();
    obj->Reset();

    if (type == NTFY_OBJ_THREAD)
    {
        _fd_ntfy_pool.FreePtr(obj);
        return;
    }

    if (type == NTFY_OBJ_SESSION)
    {
        SessionProxy* proxy = dynamic_cast<SessionProxy*>(obj);
        if (proxy) {
            _udp_proxy_pool.FreePtr(proxy);
            return;
        }
        // Cast failed: fall through and delete the object directly.
        MTLOG_ERROR("ntfy obj %p has session type but is not a session proxy, delete it", obj);
    }

    delete obj;
}