/**
 * Tencent is pleased to support the open source community by making MSEC available.
 *
 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
 *
 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 
 * you may not use this file except in compliance with the License. You may 
 * obtain a copy of the License at
 *
 *     https://opensource.org/licenses/GPL-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the 
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */


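/**
 *  @file mt_net.cpp
 *  @brief Implementation of the micro-thread network helper, handler and
 *         socket-link classes declared in mt_net.h.
 *  @time 20130924
 **/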

#include <errno.h>
#include <netinet/tcp.h>
#include "micro_thread.h"
#include "mt_sys_hook.h"
#include "ff_hook.h"
#include "mt_net.h"


using namespace std;
using namespace NS_MICRO_THREAD;

/**
 * @brief Build a helper and take a net item from the CNetMgr singleton.
 *
 * The item is kept as an opaque pointer in `handler`; every accessor below
 * checks it against NULL before use.
 */
CNetHelper::CNetHelper()
{
    handler = static_cast<void*>(CNetMgr::Instance()->AllocNetItem());
}

/**
 * @brief Reset the owned net item and hand it back to CNetMgr.
 */
CNetHelper::~CNetHelper()
{
    if (NULL == handler)
    {
        return;
    }

    CNetHandler* item = (CNetHandler*)handler;
    item->Reset();
    CNetMgr::Instance()->FreeNetItem(item);
    handler = NULL;
}

/**
 * @brief Forward a send/receive round trip to the underlying net handler.
 * @param data     request buffer
 * @param len      request length in bytes
 * @param timeout  overall timeout passed through to the handler
 * @return the handler's result, or RC_INVALID_HANDLER when no handler is set
 */
int32_t CNetHelper::SendRecv(void* data, uint32_t len, uint32_t timeout)
{
    if (NULL == handler) {
        return RC_INVALID_HANDLER;
    }

    return ((CNetHandler*)handler)->SendRecv(data, len, timeout);
}

void* CNetHelper::GetRspBuff()
{
    if (handler != NULL) {
        CNetHandler* net_handler = (CNetHandler*)handler;
        return net_handler->GetRspBuff();
    } else {
        return NULL;
    }
}

/**
 * @brief Response length reported by the underlying handler.
 * @return the handler's response length, or 0 when no handler is set
 */
uint32_t CNetHelper::GetRspLen()
{
    if (NULL == handler) {
        return 0;
    }

    return ((CNetHandler*)handler)->GetRspLen();
}

/**
 * @brief Map a result code to a human-readable, statically allocated message.
 *
 * The returned pointer refers to a string literal; callers must not modify
 * or free it.
 *
 * @param result  one of the RC_* result codes
 * @return the matching message, or "unknown error type" for any code that is
 *         not listed below
 */
char* CNetHelper::GetErrMsg(int32_t result)
{
    // Deliberately NOT static: a static pointer would keep the message chosen
    // by a previous call, so an unknown code would report a stale message
    // instead of the default.
    const char* errmsg = "unknown error type";

    switch (result)
    {
        case RC_SUCCESS:
            errmsg = "success";
            break;

        case RC_ERR_SOCKET:
            errmsg = "create socket failed";
            break;

        case RC_SEND_FAIL:
            errmsg = "send pakeage timeout or failed";
            break;

        case RC_RECV_FAIL:
            errmsg = "recv response timeout or failed";
            break;

        case RC_CONNECT_FAIL:
            errmsg = "connect timeout or failed";
            break;

        case RC_CHECK_PKG_FAIL:
            errmsg = "user package check failed";
            break;

        case RC_NO_MORE_BUFF:
            errmsg = "user response buffer too small";
            break;

        case RC_REMOTE_CLOSED:
            errmsg = "remote close connection";
            break;

        case RC_INVALID_PARAM:
            errmsg = "params invalid";
            break;

        case RC_INVALID_HANDLER:
            errmsg = "net handler invalid";
            break;

        case RC_MEM_ERROR:
            errmsg = "no more memory, alloc failed";
            break;

        case RC_CONFLICT_SID:
            errmsg = "session id with the dest address conflict";
            break;

        case RC_KQUEUE_ERROR:
            errmsg = "epoll system error";
            break;

        default:
            break;
    }

    // The interface returns char*; the literal itself stays read-only.
    return const_cast<char*>(errmsg);
}

/**
 * @brief Set the protocol type on the underlying handler; no-op without one.
 */
void CNetHelper::SetProtoType(MT_PROTO_TYPE type)
{
    if (NULL == handler) {
        return;
    }

    ((CNetHandler*)handler)->SetProtoType(type);
}

void CNetHelper::SetDestAddress(struct sockaddr_in* dst)
{
    if (handler != NULL) {
        CNetHandler* net_handler = (CNetHandler*)handler;
        return net_handler->SetDestAddress(dst);
    } 
}

/**
 * @brief Set the session id on the underlying handler; no-op without one.
 */
void CNetHelper::SetSessionId(uint64_t sid)
{
    if (NULL == handler) {
        return;
    }

    ((CNetHandler*)handler)->SetSessionId(sid);
}

/**
 * @brief Set the session-check callback on the underlying handler; no-op without one.
 */
void CNetHelper::SetSessionCallback(CHECK_SESSION_CALLBACK function)
{
    if (NULL == handler) {
        return;
    }

    ((CNetHandler*)handler)->SetSessionCallback(function);
}

/**
 * @brief Return the handler to its pristine state.
 *
 * Detaches from any socket link and session registration first (both steps
 * read the current state flags), releases the response buffer, then clears
 * every field back to its default.
 */
void CNetHandler::Reset()
{
    // These two read _state_flags / _conn_ptr, so they run before the wipe.
    this->Unlink();
    this->UnRegistSession();

    if (NULL != _rsp_buff)
    {
        delete_sk_buffer(_rsp_buff);
        _rsp_buff = NULL;
    }

    // Request bookkeeping.
    _req_data    = NULL;
    _req_len     = 0;
    _send_pos    = 0;

    // Connection / runtime state.
    _conn_ptr    = NULL;
    _state_flags = 0;
    _err_no      = 0;
    _thread      = NULL;

    // Destination and session identity.
    _callback    = NULL;
    _session_id  = 0;
    _dest_ipv4.sin_port        = 0;
    _dest_ipv4.sin_addr.s_addr = 0;
    _conn_type   = TYPE_CONN_SESSION;
    _proto_type  = NET_PROTO_TCP;
}

/**
 * @brief Construct an idle handler.
 *
 * Reset() inspects _rsp_buff and _state_flags, so both are given defined
 * values before it is invoked.
 */
CNetHandler::CNetHandler()
{
    _rsp_buff    = NULL;
    _state_flags = 0;

    Reset();
}

/**
 * @brief Release everything the handler holds by running Reset().
 */
CNetHandler::~CNetHandler()
{
    Reset();
}

/**
 * @brief Validate the request before a send/receive round trip.
 *
 * Checks the request buffer, the destination address and, for session
 * connections, the callback / session id; session connections are also
 * registered here.
 *
 * @return 0 on success, RC_INVALID_PARAM for a bad argument,
 *         RC_CONFLICT_SID when session registration fails
 */
int32_t CNetHandler::CheckParams()
{
    const bool has_request = (_req_data != NULL) && (_req_len != 0);
    if (!has_request)
    {
        MTLOG_ERROR("param invalid, data[%p], len[%u]", _req_data, _req_len);
        return RC_INVALID_PARAM;
    }

    const bool has_dest = (_dest_ipv4.sin_addr.s_addr != 0) && (_dest_ipv4.sin_port != 0);
    if (!has_dest)
    {
        MTLOG_ERROR("param invalid, ip[%u], port[%u]", _dest_ipv4.sin_addr.s_addr,
             _dest_ipv4.sin_port);
        return RC_INVALID_PARAM;
    }

    // Only session-type connections need a callback, an id and a registration.
    if (_conn_type != TYPE_CONN_SESSION)
    {
        return 0;
    }

    if ((NULL == _callback) || (0 == _session_id))
    {
        MTLOG_ERROR("param invalid, callback[%p], session_id[%llu]", _callback, _session_id);
        return RC_INVALID_PARAM;
    }

    if (!this->RegistSession())
    {
        MTLOG_ERROR("param invalid, session_id[%llu] regist failed", _session_id);
        return RC_CONFLICT_SID;
    }

    return 0;
}

/**
 * @brief Find (or create) the destination entry for this handler and attach
 *        the handler to one of its socket links.
 * @return 0 on success, RC_MEM_ERROR when either lookup yields NULL
 */
int32_t CNetHandler::GetConnLink()
{
    CDestLinks lookup;
    lookup.SetKeyInfo(_dest_ipv4.sin_addr.s_addr, _dest_ipv4.sin_port, _proto_type, _conn_type);

    CDestLinks* dest = CNetMgr::Instance()->FindCreateDest(&lookup);
    if (dest == NULL)
    {
        MTLOG_ERROR("get dest link handle failed");
        return RC_MEM_ERROR;
    }

    CSockLink* link = dest->GetSockLink();
    if (link == NULL)
    {
        MTLOG_ERROR("get sock link handle failed");
        return RC_MEM_ERROR;
    }

    this->Link(link);
    return 0;
}

/**
 * @brief Ensure the attached socket link is connected, waiting if needed.
 *
 * Creates the socket, tries to connect, and if the connection is not ready
 * yet parks this handler on the link's connect list until notified or until
 * the timeout passed to WaitNotify elapses.
 *
 * @param timeout  wait budget handed to MtFrame::WaitNotify
 * @return RC_SUCCESS / 0 when connected, RC_MEM_ERROR without a link,
 *         RC_ERR_SOCKET when socket creation fails, the recorded _err_no if
 *         one was set while waiting, otherwise RC_CONNECT_FAIL
 */
int32_t CNetHandler::WaitConnect(uint64_t timeout)
{
    CSockLink* link = (CSockLink*)this->_conn_ptr;
    if (link == NULL)
    {
        MTLOG_ERROR("get sock link handle failed");
        return RC_MEM_ERROR;
    }

    const int32_t sock = link->CreateSock();
    if (sock < 0)
    {
        MTLOG_ERROR("create sock failed, ret %d[%m]", sock);
        return RC_ERR_SOCKET;
    }

    // Fast path: already connected.
    if (link->Connect())
    {
        MTLOG_DEBUG("sock conncet ok");
        return RC_SUCCESS;
    }

    // Slow path: queue on the connect list and yield until woken.
    this->SwitchToConn();
    MtFrame::Instance()->WaitNotify(timeout);
    this->SwitchToIdle();

    if (0 != _err_no)
    {
        MTLOG_ERROR("connect get out errno %d", _err_no);
        return _err_no;
    }

    if (!link->Connected())
    {
        MTLOG_TRACE("connect not ok, maybe timeout");
        return RC_CONNECT_FAIL;
    }

    MTLOG_DEBUG("connect ok");
    return 0;
}

/**
 * @brief Send the pending request, waiting for the socket if it cannot all
 *        be written at once.
 *
 * @param timeout  wait budget handed to MtFrame::WaitNotify
 * @return RC_SUCCESS / 0 when the whole request was sent, RC_MEM_ERROR
 *         without a link, the recorded _err_no if one was set while waiting,
 *         otherwise RC_SEND_FAIL
 */
int32_t CNetHandler::WaitSend(uint64_t timeout)
{
    CSockLink* link = (CSockLink*)this->_conn_ptr;
    if (link == NULL)
    {
        MTLOG_ERROR("get sock link handle failed");
        return RC_MEM_ERROR;
    }

    const int32_t sent = link->SendData(_req_data, _req_len);
    if (sent < 0)
    {
        MTLOG_ERROR("sock send failed, ret %d[%m]", sent);
        return RC_SEND_FAIL;
    }
    this->SkipSendPos(sent);

    // Everything went out in one shot.
    if (0 == _req_len)
    {
        MTLOG_DEBUG("sock send ok");
        return RC_SUCCESS;
    }

    // Remainder pending: queue on the send list and yield until woken.
    this->SwitchToSend();
    MtFrame::Instance()->WaitNotify(timeout);
    this->SwitchToIdle();

    if (0 != _err_no)
    {
        MTLOG_ERROR("send get out errno %d", _err_no);
        return _err_no;
    }

    if (0 != _req_len)
    {
        MTLOG_TRACE("send req not ok, left len %u", _req_len);
        return RC_SEND_FAIL;
    }

    MTLOG_DEBUG("send req ok, len %u", _send_pos);
    return 0;
}

/**
 * @brief Wait for a response to arrive on the attached socket link.
 *
 * Send-only connections return immediately. Otherwise the handler is parked
 * on the link's receive list until notified or until the timeout elapses.
 *
 * @param timeout  wait budget handed to MtFrame::WaitNotify
 * @return 0 when a non-empty response buffer is present (or nothing is
 *         expected), RC_MEM_ERROR without a link, otherwise RC_RECV_FAIL
 */
int32_t CNetHandler::WaitRecv(uint64_t timeout)
{
    CSockLink* link = (CSockLink*)this->_conn_ptr;
    if (link == NULL)
    {
        MTLOG_ERROR("get sock link handle failed");
        return RC_MEM_ERROR;
    }

    if (TYPE_CONN_SENDONLY == _conn_type)
    {
        MTLOG_DEBUG("only send, without recv");
        return 0;
    }

    this->SwitchToRecv();
    MtFrame::Instance()->WaitNotify(timeout);
    this->SwitchToIdle();

    const bool got_response = (_rsp_buff != NULL) && (_rsp_buff->data_len > 0);
    if (!got_response)
    {
        MTLOG_TRACE("recv get out errno %d", _err_no);
        return RC_RECV_FAIL;
    }

    MTLOG_DEBUG("recv get rsp, len %d", _rsp_buff->data_len);
    return 0;
}

/**
 * @brief Run one full request/response round trip.
 *
 * Stages: parameter check, link lookup, connect, send, receive. The first
 * stage that returns a negative code aborts the sequence. Whatever the
 * outcome, the handler is unlinked from its socket link and its session
 * registration is dropped before returning.
 *
 * @param data     request buffer
 * @param len      request length in bytes
 * @param timeout  overall budget; the time already spent is subtracted
 *                 before the send and receive stages
 * @return 0 on success, otherwise the negative code of the failing stage
 */
int32_t CNetHandler::SendRecv(void* data, uint32_t len, uint32_t timeout)
{
    const utime64_t begin_ms = MtFrame::Instance()->GetLastClock();
    this->_req_data = data;
    this->_req_len  = len;

    int32_t rc = 0;
    do
    {
        rc = this->CheckParams();
        if (rc < 0)
        {
            MTLOG_ERROR("check params failed, ret[%d]", rc);
            break;
        }

        rc = this->GetConnLink();
        if (rc < 0)
        {
            MTLOG_ERROR("get sock conn failed, ret: %d", rc);
            break;
        }

        // The connect stage gets the whole budget.
        rc = this->WaitConnect(timeout);
        if (rc < 0)
        {
            MTLOG_ERROR("sock connect failed, ret: %d", rc);
            break;
        }

        utime64_t elapsed = MtFrame::Instance()->GetLastClock() - begin_ms;
        uint64_t remain = (timeout > (uint32_t)elapsed) ? (timeout - (uint32_t)elapsed) : 0;
        rc = this->WaitSend(remain);
        if (rc < 0)
        {
            MTLOG_ERROR("sock send failed, ret: %d", rc);
            break;
        }

        elapsed = MtFrame::Instance()->GetLastClock() - begin_ms;
        remain = (timeout > (uint32_t)elapsed) ? (timeout - (uint32_t)elapsed) : 0;
        rc = this->WaitRecv(remain);
        if (rc < 0)
        {
            MTLOG_ERROR("sock recv failed, ret: %d", rc);
            break;
        }

        rc = 0;
    } while (0);

    // Common exit: always detach from the link and the session table.
    this->Unlink();
    this->UnRegistSession();

    return rc;
}

/**
 * @brief Advance the send cursor by up to @p len bytes.
 *
 * The advance is capped at the number of bytes still pending, so the
 * remaining length never underflows.
 *
 * @param len  number of bytes reported as sent
 * @return the number of bytes actually consumed from this request
 */
uint32_t CNetHandler::SkipSendPos(uint32_t len)
{
    uint32_t consumed = len;
    if (consumed >= _req_len)
    {
        consumed = _req_len;
    }

    _req_data  = (char*)_req_data + consumed;
    _send_pos += consumed;
    _req_len  -= consumed;

    return consumed;
}

/**
 * @brief Attach this handler to a socket link and place it on the link's idle list.
 */
void CNetHandler::Link(CSockLink* conn)
{
    _conn_ptr = conn;
    SwitchToIdle();
}

void CNetHandler::Unlink()
{
    if (this->_state_flags != 0)
    {
        this->DetachConn();
    }
    this->_conn_ptr = NULL;
}

void CNetHandler::SwitchToConn()
{
    CSockLink* conn = (CSockLink*)this->_conn_ptr;
    if (NULL == conn)
    {
        MTLOG_ERROR("net handler invalid");
        return; 
    }

    this->DetachConn();

    this->_state_flags |= STATE_IN_CONNECT;
    conn->AppendToList(CSockLink::LINK_CONN_LIST, this);
}

void CNetHandler::SwitchToSend()
{
    CSockLink* conn = (CSockLink*)this->_conn_ptr;
    if (NULL == conn)
    {
        MTLOG_ERROR("net handler invalid");
        return; 
    }

    this->DetachConn();

    this->_state_flags |= STATE_IN_SEND;
    conn->AppendToList(CSockLink::LINK_SEND_LIST, this);
}

void CNetHandler::SwitchToRecv()
{
    CSockLink* conn = (CSockLink*)this->_conn_ptr;
    if (NULL == conn)
    {
        MTLOG_ERROR("net handler invalid");
        return; 
    }

    this->DetachConn();

    this->_state_flags |= STATE_IN_RECV;
    conn->AppendToList(CSockLink::LINK_RECV_LIST, this);
}

void CNetHandler::SwitchToIdle()
{
    CSockLink* conn = (CSockLink*)this->_conn_ptr;
    if (NULL == conn)
    {
        MTLOG_ERROR("net handler invalid");
        return; 
    }

    this->DetachConn();

    this->_state_flags |= STATE_IN_IDLE;
    conn->AppendToList(CSockLink::LINK_IDLE_LIST, this);
}

void CNetHandler::DetachConn()
{
    CSockLink* conn = (CSockLink*)this->_conn_ptr;
    if (NULL == conn)
    {
        MTLOG_DEBUG("net handler not set");
        return; 
    }

    if (_state_flags == 0)
    {
        return;
    }

    if (_state_flags & STATE_IN_CONNECT)
    {
        conn->RemoveFromList(CSockLink::LINK_CONN_LIST, this);
        _state_flags &= ~STATE_IN_CONNECT;
    }
    
    if (_state_flags & STATE_IN_SEND)
    {
        conn->RemoveFromList(CSockLink::LINK_SEND_LIST, this);
        _state_flags &= ~STATE_IN_SEND;
    }

    if (_state_flags & STATE_IN_RECV)
    {
        conn->RemoveFromList(CSockLink::LINK_RECV_LIST, this);
        _state_flags &= ~STATE_IN_RECV;
    }

    if (_state_flags & STATE_IN_IDLE)
    {
        conn->RemoveFromList(CSockLink::LINK_IDLE_LIST, this);
        _state_flags &= ~STATE_IN_IDLE;
    }
}

/**
 * @brief Hash of the session key (destination ip/port, protocol type,
 *        connection type and session id).
 *
 * The shifts are done on uint32_t operands. A bare `sin_port << 16` would
 * first promote the 16-bit port to a signed int, and the shift overflows
 * that int for any port value >= 0x8000. The resulting hash bits are the
 * same as before.
 *
 * @return 32-bit hash value
 */
uint32_t CNetHandler::HashValue()
{
    uint32_t addr_mix = _dest_ipv4.sin_addr.s_addr;
    addr_mix ^= (static_cast<uint32_t>(_dest_ipv4.sin_port) << 16)
              | (static_cast<uint32_t>(_proto_type) << 8)
              | (static_cast<uint32_t>(_conn_type) << 8);

    // Fold the 64-bit session id into 32 bits, then mix in the address part.
    uint32_t hash = (_session_id >> 32) & 0xffffffff;
    hash ^= _session_id & 0xffffffff;
    hash ^= addr_mix;

    return hash;
}

/**
 * @brief Three-way comparison against another handler key.
 *
 * Fields are compared in this order: session id, destination ip,
 * destination port, protocol type, connection type.
 *
 * @param rhs  key to compare with (treated as a CNetHandler)
 * @return 0 when all fields match, 1 / -1 according to the first differing
 *         field, -1 when @p rhs is NULL
 */
int32_t CNetHandler::HashCmp(HashKey* rhs)
{
    CNetHandler* other = (CNetHandler*)(rhs);
    if (NULL == other)
    {
        return -1;
    }

    if (_session_id > other->_session_id) { return 1; }
    if (_session_id < other->_session_id) { return -1; }

    if (_dest_ipv4.sin_addr.s_addr > other->_dest_ipv4.sin_addr.s_addr) { return 1; }
    if (_dest_ipv4.sin_addr.s_addr < other->_dest_ipv4.sin_addr.s_addr) { return -1; }

    if (_dest_ipv4.sin_port > other->_dest_ipv4.sin_port) { return 1; }
    if (_dest_ipv4.sin_port < other->_dest_ipv4.sin_port) { return -1; }

    if (_proto_type > other->_proto_type) { return 1; }
    if (_proto_type < other->_proto_type) { return -1; }

    if (_conn_type > other->_conn_type) { return 1; }
    if (_conn_type < other->_conn_type) { return -1; }

    return 0;
}

bool CNetHandler::RegistSession()
{
    if (CNetMgr::Instance()->FindNetItem(this) != NULL)
    {
        return false;
    }
    
    MtFrame* mtframe = MtFrame::Instance();
    this->_thread = mtframe->GetActiveThread();
    
    CNetMgr::Instance()->InsertNetItem(this);
    this->_state_flags |= STATE_IN_SESSION;
    return true;    
}

void CNetHandler::UnRegistSession()
{
    if (this->_state_flags & STATE_IN_SESSION)
    {
        CNetMgr::Instance()->RemoveNetItem(this);
        this->_state_flags &= ~STATE_IN_SESSION;
    }
}

/**
 * @brief Map a LINK_*_LIST selector to the matching handler queue.
 * @param type  one of LINK_IDLE_LIST, LINK_CONN_LIST, LINK_SEND_LIST,
 *              LINK_RECV_LIST
 * @return pointer to the queue, or NULL for an unknown selector
 */
TNetItemList* CSockLink::GetItemList(int32_t type)
{
    switch (type)
    {
        case LINK_IDLE_LIST:
            return &_idle_list;

        case LINK_CONN_LIST:
            return &_wait_connect;

        case LINK_SEND_LIST:
            return &_wait_send;

        case LINK_RECV_LIST:
            return &_wait_recv;

        default:
            return NULL;
    }
}

/**
 * @brief Append a handler to the tail of the selected queue.
 *
 * An unknown selector is logged and ignored.
 */
void CSockLink::AppendToList(int32_t type, CNetHandler* item)
{
    TNetItemList* queue = GetItemList(type);
    if (queue != NULL)
    {
        TAILQ_INSERT_TAIL(queue, item, _link_entry);
        return;
    }

    MTLOG_ERROR("unknown list type: %d", type);
}

/**
 * @brief Remove a handler from the selected queue.
 *
 * An unknown selector is logged and ignored.
 */
void CSockLink::RemoveFromList(int32_t type, CNetHandler* item)
{
    TNetItemList* queue = GetItemList(type);
    if (queue != NULL)
    {
        TAILQ_REMOVE(queue, item, _link_entry);
        return;
    }

    MTLOG_ERROR("unknown list type: %d", type);
}

/**
 * @brief Record a result on a handler and make its micro-thread runnable.
 *
 * A result other than RC_SUCCESS is stored on the handler as its error
 * number. The thread is moved from the I/O wait set to the runnable set
 * only if it carries the IO_LIST flag.
 *
 * @param item    handler to notify
 * @param result  result code to deliver
 */
void CSockLink::NotifyThread(CNetHandler* item, int32_t result)
{
    // Frame singleton is looked up once and cached across calls.
    static MtFrame* frame = NULL;
    if (NULL == frame) {
        frame = MtFrame::Instance();
    }

    if (RC_SUCCESS != result)
    {
        item->SetErrNo(result);
    }

    MicroThread* owner = item->GetThread();
    if (NULL == owner)
    {
        return;
    }

    if (!owner->HasFlag(MicroThread::IO_LIST))
    {
        return;
    }

    frame->RemoveIoWait(owner);
    frame->InsertRunable(owner);
}

void CSockLink::NotifyAll(int32_t result)
{
    CNetHandler* item = NULL;
    CNetHandler* tmp = NULL;

    TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
    {
        NotifyThread(item, result);
        item->Unlink();
    }

    TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
    {
        NotifyThread(item, result);
        item->Unlink();
    }

    TAILQ_FOREACH_SAFE(item, &_wait_recv, _link_entry, tmp)
    {
        NotifyThread(item, result);
        item->Unlink();
    }

    TAILQ_FOREACH_SAFE(item, &_idle_list, _link_entry, tmp)
    {
        NotifyThread(item, result);
        item->Unlink();
    }
}

void CSockLink::Reset()
{
    this->Close();
    this->NotifyAll(_errno);

    rw_cache_destroy(&_recv_cache);
    if (_rsp_buff != NULL)
    {
        delete_sk_buffer(_rsp_buff);
        _rsp_buff = NULL;
    }

    TAILQ_INIT(&_wait_connect);
    TAILQ_INIT(&_wait_send);
    TAILQ_INIT(&_wait_recv);
    TAILQ_INIT(&_idle_list);
    
    _proto_type     = NET_PROTO_TCP;
    _errno          = 0;
    _state          = 0;
    _last_access    = mt_time_ms();
    _parents        = NULL;

    this->KqueuerObj::Reset();   
}

/**
 * @brief Construct an unconnected TCP link with empty queues and an
 *        initialised (pool-less) receive cache.
 */
CSockLink::CSockLink()
{
    _rsp_buff       = NULL;
    rw_cache_init(&_recv_cache, NULL);

    TAILQ_INIT(&_idle_list);
    TAILQ_INIT(&_wait_recv);
    TAILQ_INIT(&_wait_send);
    TAILQ_INIT(&_wait_connect);

    _parents        = NULL;
    _last_access    = mt_time_ms();
    _state          = 0;
    _errno          = 0;
    _proto_type     = NET_PROTO_TCP;
}

/**
 * @brief Release all link resources by running Reset().
 */
CSockLink::~CSockLink()
{
    Reset();
}

/**
 * @brief Set the protocol type and point the receive cache at the buffer
 *        manager CNetMgr provides for that protocol.
 */
void CSockLink::SetProtoType(MT_PROTO_TYPE type)
{
    _recv_cache.pool = CNetMgr::Instance()->GetSkBuffMng(type);
    _proto_type = type;
}

/**
 * @brief Deregister the socket from the frame's kqueue and close it.
 *
 * Does nothing when the descriptor is already negative.
 */
void CSockLink::Close()
{
    if (_fd >= 0)
    {
        MtFrame::Instance()->KqueueDelObj(this);

        close(_fd);
        _fd = -1;
    }
}

void CSockLink::Destroy()
{
    CDestLinks* dstlink = (CDestLinks*)_parents;
    if (NULL == dstlink)
    {
        MTLOG_ERROR("socket link without parents ptr, maybe wrong");
        delete this;
    }
    else
    {
        MTLOG_DEBUG("socket link just free");
        dstlink->FreeSockLink(this);
    }
}

/**
 * @brief Create the non-blocking socket for this link and register it with
 *        the frame's kqueue.
 *
 * An existing positive descriptor is reused. TCP sockets additionally get
 * TCP_NODELAY and have output notification enabled.
 *
 * @return the descriptor on success; -1 when socket() fails, -2 when the
 *         socket cannot be made non-blocking, -3 when kqueue registration fails
 */
int32_t CSockLink::CreateSock()
{
    if (_fd > 0)
    {
        return _fd;
    }

    const int sock_type = (NET_PROTO_TCP == _proto_type) ? SOCK_STREAM : SOCK_DGRAM;
    _fd = socket(AF_INET, sock_type, 0);
    if (_fd < 0)
    {
        MTLOG_ERROR("create socket failed, ret %d[%m]", _fd);
        return -1;
    }

    // The same "1" serves as the FIONBIO argument and the TCP_NODELAY value.
    int enable = 1;
    if (ioctl(_fd, FIONBIO, &enable) < 0)
    {
        MTLOG_ERROR("socket unblock failed, %m");
        close(_fd);
        _fd = -1;
        return -2;
    }

    if (NET_PROTO_TCP == _proto_type)
    {
        setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable));
        this->EnableOutput();
    }
    this->EnableInput();

    if (MtFrame::Instance()->KqueueAddObj(this))
    {
        return _fd;
    }

    MTLOG_ERROR("socket epoll mng failed, %m");
    close(_fd);
    _fd = -1;
    return -3;
}

struct sockaddr_in* CSockLink::GetDestAddr(struct sockaddr_in* addr)
{
    CDestLinks* dstlink = (CDestLinks*)_parents;
    if ((NULL == _parents) || (NULL == addr)) {
        return NULL;
    }

    uint32_t ip = 0;
    uint16_t port = 0;
    dstlink->GetDestIP(ip, port);
    
    addr->sin_family = AF_INET;
    addr->sin_addr.s_addr = ip;
    addr->sin_port = port;

    return addr;
}

/**
 * @brief Start (or confirm) the connection to the destination.
 *
 * UDP links are marked connected immediately. For TCP a connect is issued
 * unless one is already established or in progress.
 *
 * @return true when the link is connected; false when the connect is still
 *         pending or failed (in both cases LINK_CONNECTING is set)
 */
bool CSockLink::Connect()
{
    this->_last_access = mt_time_ms();

    if (NET_PROTO_UDP == _proto_type)
    {
        _state |= LINK_CONNECTED;
    }

    if (_state & LINK_CONNECTED)
    {
        return true;
    }

    if (_state & LINK_CONNECTING)
    {
        return false;
    }

    struct sockaddr_in addr = {0};

    mt_hook_syscall(connect);
    int32_t ret = ff_hook_connect(_fd, (struct sockaddr*)this->GetDestAddr(&addr), sizeof(struct sockaddr_in));
    if (ret >= 0)
    {
        _state |= LINK_CONNECTED;
        return true;
    }

    // Capture errno before anything else can overwrite it.
    int32_t err = errno;
    if (EISCONN == err)
    {
        _state |= LINK_CONNECTED;
        return true;
    }

    _state |= LINK_CONNECTING;

    const bool in_progress = (err == EINPROGRESS) || (err == EALREADY) || (err == EINTR);
    if (in_progress)
    {
        MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _fd, err);
    }
    else
    {
        MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _fd, err);
    }

    return false;
}

/**
 * @brief Flush queued UDP requests, then send the caller's datagram.
 *
 * Every handler on the wait-send queue is sent as its own datagram; each
 * one that is sent (or has nothing to send) is notified and moved to the
 * idle list. After the queue has been walked, @p data is sent if given.
 *
 * @param data  datagram to send after the queue, may be NULL
 * @param len   length of @p data, may be 0
 * @return bytes sent for @p data; 0 when there is nothing to send or the
 *         socket reported EINTR / EAGAIN / EINPROGRESS; -2 on any other
 *         send error
 */
int32_t CSockLink::SendCacheUdp(void* data, uint32_t len)
{
    mt_hook_syscall(sendto);
    void* buff = NULL;
    uint32_t buff_len = 0;

    CNetHandler* item = NULL;
    CNetHandler* tmp = NULL;
    struct sockaddr_in dst = {0};

    // _SAFE iteration: SwitchToIdle() removes the item from _wait_send.
    TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
    {
        item->GetSendData(buff, buff_len);
        if ((NULL == buff) || (buff_len == 0))
        {
            // Nothing to send for this handler: wake it and move on.
            MTLOG_ERROR("get buff ptr invalid, log it");
            NotifyThread(item, 0);
            item->SwitchToIdle();
            continue;
        }
        
        int32_t ret = ff_hook_sendto(_fd, buff, buff_len, 0, 
                    (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
        if (ret == -1)
        {
            // Transient condition: stop here, the item stays queued.
            if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
            {
                return 0;
            }
            else
            {
                MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd, 
                          errno, strerror(errno));
                return -2;
            }
        }
    
        NotifyThread(item, 0);
        item->SwitchToIdle();
    }

    // Queue handled; now the caller's own datagram, if any.
    if ((data == NULL) || (len == 0))
    {
        return 0;
    }

    int32_t ret = ff_hook_sendto(_fd, data, len, 0, 
                    (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
    if (ret == -1)
    {
        if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
        {
            return 0;
        }
        else
        {
            MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd, 
                   errno, strerror(errno));
            return -2;
        }
    }
    else
    {
        return ret;
    }
}

/**
 * @brief Gather queued TCP requests plus the caller's data into one writev().
 *
 * Up to 64 iovec entries are filled: first the pending data of handlers on
 * the wait-send queue (in queue order), then @p data if a slot is left.
 * The bytes written are then credited to the queued handlers in order; a
 * handler whose request is fully sent is notified and moved to the idle list.
 *
 * @param data  caller's data appended after the queued requests, may be NULL
 * @param len   length of @p data
 * @return the written bytes left over after crediting the queued handlers
 *         (i.e. the part attributable to @p data); 0 on EAGAIN / EINTR;
 *         -1 on any other writev error
 */
int32_t CSockLink::SendCacheTcp(void* data, uint32_t len)
{
    void* buff = NULL;
    uint32_t buff_len = 0;
    struct iovec iov[64];
    int32_t count = 0;
    CNetHandler* item = NULL;
    CNetHandler* tmp = NULL;

    // Pass 1: collect pending buffers of queued handlers, capped at 64 entries.
    TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
    {
        item->GetSendData(buff, buff_len); 
        iov[count].iov_base = buff;
        iov[count].iov_len  = (int32_t)buff_len;
        count++;
        if (count >= 64)
        {
            break;
        }
    }
    // The caller's data goes last, and only if a slot is still free.
    if ((count < 64) && (data != NULL))
    {
        iov[count].iov_base = data;
        iov[count].iov_len  = (int32_t)len;
        count++;
    }
    
    ssize_t bytes = writev(_fd, iov, count);
    if (bytes < 0)
    {
        if ((errno == EAGAIN) || (errno == EINTR))
        {
            return 0;
        }
        else
        {
            MTLOG_ERROR("socket writev failed, fd %d, errno %d(%s)", _fd, 
                   errno, strerror(errno));
            return -1;
        }
    }

    // Pass 2: hand the written bytes out to the queued handlers in order.
    uint32_t send_left = (uint32_t)bytes;
    TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
    {
        send_left -= item->SkipSendPos(send_left);
        item->GetSendData(buff, buff_len); 
        if (buff_len == 0)
        {
            // Request fully sent: wake its thread and take it off the queue.
            NotifyThread(item, 0);
            item->SwitchToIdle();
        }

        if (send_left == 0)
        {
            break;
        }
    }

    return send_left;
}

/**
 * @brief Send data on the link and arm or disarm write notification.
 *
 * Dispatches to the UDP or TCP send routine. If fewer than @p len bytes
 * were accepted (including the error case), output notification is enabled
 * and the WRITE event is registered so the remainder can be flushed from
 * OutputNotify(); otherwise output notification is disabled and the WRITE
 * event is removed.
 *
 * @param data  buffer to send
 * @param len   number of bytes in @p data
 * @return the value returned by SendCacheUdp() / SendCacheTcp()
 */
int32_t CSockLink::SendData(void* data, uint32_t len)
{
    int32_t ret = 0;
    bool rc = false;

    this->_last_access = mt_time_ms();

    if (_proto_type == NET_PROTO_UDP)
    {
        ret = SendCacheUdp(data, len);
    }
    else
    {
        ret = SendCacheTcp(data, len);
    }

    if (ret < (int32_t)len)
    {
        // Data is still pending: ask for a writable notification. This must
        // be the WRITE event, matching the KqueueCtrlDel(KQ_EVENT_WRITE)
        // calls that disarm it again.
        this->EnableOutput();
        rc = MtFrame::Instance()->KqueueCtrlAdd(_fd, KQ_EVENT_WRITE);
    }
    else
    {
        this->DisableOutput();
        rc = MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE);
    }

    if (!rc)
    {
        MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
    }

    return ret;
}

/**
 * @brief Dispatch received data using the routine for this link's protocol.
 * @return the result of DispathUdp() or DispathTcp()
 */
int32_t CSockLink::RecvDispath()
{
    return (NET_PROTO_UDP == _proto_type) ? this->DispathUdp()
                                          : this->DispathTcp();
}

/**
 * @brief Top up the response buffer from the receive cache.
 *
 * Allocates a 512-byte buffer on first use, then asks read_cache_begin()
 * to fill the buffer's free tail, starting at cache offset data_len, and
 * grows data_len by the amount it reports. On allocation failure the
 * buffer stays NULL and an error is logged.
 */
void CSockLink::ExtendRecvRsp()
{
    if (_rsp_buff == NULL)
    {
        _rsp_buff = new_sk_buffer(512);
    }

    if (_rsp_buff == NULL)
    {
        MTLOG_ERROR("no more memory, error");
        return;
    }

    _rsp_buff->data_len += read_cache_begin(&_recv_cache,
                                            _rsp_buff->data_len,
                                            _rsp_buff->data + _rsp_buff->data_len,
                                            _rsp_buff->size - _rsp_buff->data_len);
}

/**
 * @brief Pick the session-check callback to use for incoming data.
 *
 * The callback of the first handler waiting to receive is preferred. When
 * a parent CDestLinks exists, a callback found this way is stored as its
 * default; when none was found, the parent's stored default is used instead.
 *
 * @return the selected callback, possibly NULL
 */
CHECK_SESSION_CALLBACK CSockLink::GetSessionCallback()
{
    CHECK_SESSION_CALLBACK callback = NULL;

    CNetHandler* first_waiter = TAILQ_FIRST(&_wait_recv);
    if (first_waiter == NULL)
    {
        MTLOG_DEBUG("recv data with no wait item, err");
    }
    else
    {
        callback = first_waiter->GetSessionCallback();
        if (callback == NULL)
        {
            MTLOG_ERROR("recv data with no session callback, err");
        }
    }

    CDestLinks* owner = (CDestLinks*)_parents;
    if (owner == NULL)
    {
        return callback;
    }

    if (callback == NULL)
    {
        // Nothing from the waiters: fall back to the remembered default.
        return owner->GetDefaultCallback();
    }

    // Remember this callback for later calls that find no waiter.
    owner->SetDefaultCallback(callback);
    return callback;
}

/**
 * @brief Split the TCP receive cache into packages and deliver each one to
 *        the session it belongs to.
 *
 * The user-supplied check callback inspects the staged response buffer and
 * returns: a negative value (bad data), 0 (package incomplete, optionally
 * reporting the needed length), or a positive value that this function
 * treats as the package length.
 *
 * @return 0 when processing finished or more data is awaited; -1 when no
 *         callback is available or the callback rejects the data (_errno is
 *         set to RC_CHECK_PKG_FAIL in the latter case); -3 on allocation
 *         failure (_errno = RC_MEM_ERROR)
 */
int32_t CSockLink::DispathTcp()
{
    CHECK_SESSION_CALLBACK check_session = this->GetSessionCallback();
    if (NULL == check_session)
    {
        MTLOG_ERROR("recv data with no session callback, err");
        return -1;
    }

    uint32_t need_len = 0;
    uint64_t sid = 0;
    int32_t ret = 0;
    while (_recv_cache.len > 0)
    {
        // Stage cached bytes into _rsp_buff (allocated on demand).
        this->ExtendRecvRsp();
        if (NULL == _rsp_buff)
        {
            MTLOG_ERROR("alloc memory, error");
            _errno = RC_MEM_ERROR;
            return -3;
        }

        need_len = 0;
        ret = check_session(_rsp_buff->data, _rsp_buff->data_len, &sid, &need_len);
        
        if (ret < 0)
        {
            MTLOG_ERROR("user check resp failed, ret %d", ret);
            _errno = RC_CHECK_PKG_FAIL;
            return -1;
        }

        // ret == 0: the callback could not find a complete package yet.
        if (ret == 0)
        {
            // Buffer is full and the callback gave no hint: double the size.
            if ((need_len == 0) && (_rsp_buff->data_len == _rsp_buff->size))
            {
                MTLOG_DEBUG("recv default buff full[%u], but user no set need length", _rsp_buff->size);
                need_len = _rsp_buff->size * 2;
            }

            // No growth needed, or the request exceeds the 100 MB cap:
            // just wait for more data.
            if ((need_len <= _rsp_buff->size) || (need_len > 100*1024*1024))
            {
                MTLOG_DEBUG("maybe need wait more data: %u", need_len);
                return 0;
            }

            _rsp_buff = reserve_sk_buffer(_rsp_buff, need_len);
            if (NULL == _rsp_buff)
            {
                MTLOG_ERROR("no more memory, error");
                _errno = RC_MEM_ERROR;
                return -3;
            }

            // Everything cached is already staged; nothing more to copy now.
            if (_rsp_buff->data_len >= _recv_cache.len)
            {
                MTLOG_DEBUG("maybe need wait more data, now %u", _recv_cache.len);
                return 0;
            }

            // More cached bytes remain: loop to stage them into the bigger buffer.
            continue;
        }

        // ret > 0: package length. Wait if the cache does not hold it all yet.
        if (ret > (int32_t)_recv_cache.len)
        {
            MTLOG_DEBUG("maybe pkg not all ok, wait more");
            return 0;
        }

        CNetHandler* session = this->FindSession(sid);
        if (NULL == session)
        {
            // No handler is registered for this sid: drop the package.
            MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
            cache_skip_data(&_recv_cache, ret);
            delete_sk_buffer(_rsp_buff);
            _rsp_buff = NULL;
        }
        else
        {
            // Consume the package from the cache and pass the staged buffer
            // to the session; _rsp_buff is cleared so the next round
            // allocates a fresh one.
            MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
            cache_skip_data(&_recv_cache, ret);
            this->NotifyThread(session, 0);
            session->SwitchToIdle();
            _rsp_buff->data_len = ret;
            session->SetRespBuff(_rsp_buff);
            _rsp_buff = NULL;
        }
    }

    return 0;

}

/**
 * @brief Deliver each cached UDP datagram to the session it belongs to.
 *
 * The check callback is taken from the first handler waiting to receive.
 * Blocks in the receive cache are processed one by one: a block is skipped
 * when there is no callback, when the callback result is <= 0 or larger
 * than the block, or when no session matches; otherwise the block is
 * detached from the cache and passed to the matching session.
 *
 * @return always 0
 */
int32_t CSockLink::DispathUdp()
{
    CHECK_SESSION_CALLBACK check_session = NULL;
    CNetHandler* item = TAILQ_FIRST(&_wait_recv);
    if (NULL == item)
    {
        MTLOG_DEBUG("recv data with no wait item, maybe wrong pkg recv");
    }
    else
    {
        check_session = item->GetSessionCallback();
        if (NULL == check_session)
        {
            MTLOG_TRACE("recv data with no session callback, err");
        }
    }

    uint64_t sid = 0;
    uint32_t need_len = 0;
    int32_t ret = 0;
    TSkBuffer* block = NULL;
    // Each iteration removes the head block one way or another.
    while ((block = TAILQ_FIRST(&_recv_cache.list)) != NULL)
    {
        if (check_session == NULL)
        {
            MTLOG_DEBUG("no recv wait, skip first block");
            cache_skip_data(&_recv_cache, block->data_len);
            continue;
        }

        need_len = 0;
        ret = check_session(block->data, block->data_len, &sid, &need_len);
        // A datagram must hold one whole package; anything else is dropped.
        if ((ret <= 0) || (ret > (int32_t)block->data_len))
        {
            MTLOG_DEBUG("maybe wrong pkg come, skip it");
            cache_skip_data(&_recv_cache, block->data_len);
            continue;
        }

        CNetHandler* session = this->FindSession(sid);
        if (NULL == session)
        {
            MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
            cache_skip_data(&_recv_cache, block->data_len);
        }
        else
        {
            MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
            this->NotifyThread(session, 0);
            session->SwitchToIdle();
            // Detach the head block from the cache, then give it to the session.
            cache_skip_first_buffer(&_recv_cache);
            session->SetRespBuff(block);
        }
    }

    return 0;
}

/**
 * @brief Look up the registered handler for a session id on this link's
 *        destination.
 *
 * A temporary handler is filled with the destination address, connection
 * type, protocol type and @p sid, and used as the lookup key in CNetMgr.
 *
 * @param sid  session id reported by the check callback
 * @return the matching handler, or NULL when there is no parent or no match
 */
CNetHandler* CSockLink::FindSession(uint64_t sid)
{
    CNetHandler probe;
    CDestLinks* owner = (CDestLinks*)_parents;
    if (owner == NULL)
    {
        MTLOG_ERROR("session dest link invalid, maybe error");
        return NULL;
    }

    struct sockaddr_in dest;
    probe.SetDestAddress(this->GetDestAddr(&dest));
    probe.SetConnType(owner->GetConnType());
    probe.SetProtoType(owner->GetProtoType());
    probe.SetSessionId(sid);

    return CNetMgr::Instance()->FindNetItem(&probe);
}

/**
 * @brief Readable-event handler: pull data into the receive cache and
 *        dispatch it.
 *
 * On a receive or dispatch failure the link is destroyed; the object must
 * not be used after a negative return.
 *
 * @return 0 on success, -1 when receiving failed, -2 when dispatch failed
 */
int CSockLink::InputNotify()
{
    this->_last_access = mt_time_ms();

    int32_t rc = (NET_PROTO_UDP == _proto_type)
                     ? cache_udp_recv(&_recv_cache, _fd, NULL)
                     : cache_tcp_recv(&_recv_cache, _fd);
    if (rc < 0)
    {
        if (-SK_ERR_NEED_CLOSE == rc)
        {
            MTLOG_DEBUG("recv on link failed, remote close");
            _errno = RC_REMOTE_CLOSED;
        }
        else
        {
            MTLOG_ERROR("recv on link failed, close it, ret %d[%m]", rc);
            _errno = RC_RECV_FAIL;
        }

        this->Destroy();
        return -1;
    }

    rc = this->RecvDispath();
    if (rc >= 0)
    {
        return 0;
    }

    MTLOG_DEBUG("recv dispath failed, close it, ret %d[%m]", rc);
    this->Destroy();
    return -2;
}

int CSockLink::OutputNotify()
{
    int32_t ret = 0;
    
    this->_last_access = mt_time_ms();

    if (_state & LINK_CONNECTING)
    {
        _state &= ~LINK_CONNECTING;
        _state |= LINK_CONNECTED;
        
        CNetHandler* item = NULL;
        CNetHandler* tmp = NULL;
        TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
        {
            NotifyThread(item, 0);
            item->SwitchToIdle();
        }
    }

    if (_proto_type == NET_PROTO_UDP)
    {
        ret = SendCacheUdp(NULL, 0);
    }
    else
    {
        ret = SendCacheTcp(NULL, 0);
    }

    if (ret < 0)
    {
        MTLOG_ERROR("Send on link failed, close it, ret %d[%m]", ret);
        _errno = RC_SEND_FAIL;
        this->Destroy();
        return ret;
    }

    if (TAILQ_EMPTY(&_wait_send))
    {
        this->DisableOutput();
        if (!MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE))
        {
            MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
        }
    }

    return 0;
}

int CSockLink::HangupNotify()
{
    MTLOG_ERROR("socket epoll error, fd %d", _fd);
    
    this->_errno = RC_KQUEUE_ERROR;
    this->Destroy();
    return -1;
}

/**
 * @brief Construct an empty destination-link group with default settings.
 *
 * Defaults: 5-minute timeout value (5*60*1000), no address/port, undefined
 * protocol, session connection type, at most 3 links, no default callback,
 * and an empty socket-link list.
 */
CDestLinks::CDestLinks()
{
    // Destination key fields.
    _addr_ipv4      = 0;
    _net_port       = 0;
    _proto_type     = NET_PROTO_UNDEF;
    _conn_type      = TYPE_CONN_SESSION;

    // Link bookkeeping.
    _timeout        = 5 * 60 * 1000;
    _max_links      = 3;    // default: 3 links
    _curr_link      = 0;
    _dflt_callback  = NULL;

    TAILQ_INIT(&_sock_list);
}

void CDestLinks::Reset()
{
    CSockLink* item = NULL;
    CSockLink* temp = NULL;
    TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
    {
        item->Destroy();
    }
    TAILQ_INIT(&_sock_list);

    CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
    if (NULL != timer) 
    {
        timer->stop_timer(this);
    }

    _timeout        = 5*60*1000;
    _addr_ipv4      = 0;
    _net_port       = 0;
    _proto_type     = NET_PROTO_UNDEF;
    _conn_type      = TYPE_CONN_SESSION;
    
    _max_links      = 3;
    _curr_link      = 0;
}

/**
 * @brief Destructor; all teardown is delegated to Reset().
 */
CDestLinks::~CDestLinks()
{
    Reset();
}

void CDestLinks::StartTimer()
{
    CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
    if ((NULL == timer) || !timer->start_timer(this, 60*1000))
    {
        MTLOG_ERROR("obj %p attach timer failed, error", this);
    }
}

/**
 * @brief Detach a socket link from this group and hand it back to CNetMgr.
 *
 * The link is removed from _sock_list, the live-link counter is decremented
 * (never below zero), the link is Reset() and then freed through
 * CNetMgr::FreeSockLink().
 *
 * @param sock  Link to release. It must be non-NULL and its parent pointer
 *              must be this object; otherwise the call only logs an error.
 */
void CDestLinks::FreeSockLink(CSockLink* sock)
{
    const bool owned_by_us = (sock != NULL) && (sock->GetParentsPtr() == (void*)this);
    if (!owned_by_us)
    {
        MTLOG_ERROR("invalid socklink %p, error", sock);
        return;
    }

    TAILQ_REMOVE(&_sock_list, sock, _link_entry);
    if (_curr_link > 0)
    {
        --_curr_link;
    }

    sock->Reset();
    CNetMgr::Instance()->FreeSockLink(sock);
}

/**
 * @brief Obtain a socket link for this destination.
 *
 * While fewer than _max_links links exist, a new link is allocated from
 * CNetMgr, bound to this object (parent pointer and protocol type) and
 * appended to _sock_list. Once the limit is reached, the link at the head of
 * the list is rotated to the tail and returned, so successive calls cycle
 * through the existing links.
 *
 * @return A usable link, or NULL when allocation fails or when the limit is
 *         reached but the list holds no link to reuse.
 */
CSockLink* CDestLinks::GetSockLink()
{
    if (_curr_link < _max_links)
    {
        CSockLink* fresh = CNetMgr::Instance()->AllocSockLink();
        if (NULL == fresh)
        {
            MTLOG_ERROR("alloc sock link failed, error");
            return NULL;
        }
        fresh->SetParentsPtr(this);
        fresh->SetProtoType(_proto_type);
        TAILQ_INSERT_TAIL(&_sock_list, fresh, _link_entry);
        _curr_link++;
        return fresh;
    }

    CSockLink* link = TAILQ_FIRST(&_sock_list);
    if (NULL == link)
    {
        // An empty list here (e.g. _max_links == 0) would otherwise make
        // TAILQ_REMOVE dereference a NULL element.
        MTLOG_ERROR("no sock link available for reuse, error");
        return NULL;
    }

    // Move the head to the tail so the next call picks a different link.
    TAILQ_REMOVE(&_sock_list, link, _link_entry);
    TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry);

    return link;
}

void CDestLinks::timer_notify()
{
    uint64_t now = mt_time_ms();
    CSockLink* item = NULL;
    CSockLink* temp = NULL;
    TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
    {
        if ((item->GetLastAccess() + this->_timeout) < now)
        {
            MTLOG_DEBUG("link timeout, last[%llu], now [%llu]", item->GetLastAccess(), now);
            item->Destroy();
        }
    }

    item = TAILQ_FIRST(&_sock_list);
    if (NULL == item)
    {
        MTLOG_DEBUG("dest links timeout, now [%llu]", now);
        CNetMgr::Instance()->DeleteDestLink(this);
        return;
    }

    this->StartTimer();
    
    return;
}

/** Process-wide CNetMgr singleton; created on first use by Instance(). */
CNetMgr* CNetMgr::_instance = NULL;

/**
 * @brief Access the CNetMgr singleton, creating it on the first call.
 *
 * @return Pointer to the single CNetMgr instance.
 */
CNetMgr* CNetMgr::Instance (void)
{
    if (_instance != NULL)
    {
        return _instance;
    }

    _instance = new CNetMgr();
    return _instance;
}

/**
 * @brief Delete the CNetMgr singleton, if one exists, and clear the pointer.
 */
void CNetMgr::Destroy()
{
    if (NULL == _instance)
    {
        return;
    }

    delete _instance;
    _instance = NULL;
}

/**
 * @brief Look up a net handler in the session hash.
 *
 * @param key  Handler used as the hash lookup key.
 * @return The result of HashFind() cast to CNetHandler*, or NULL when the
 *         session hash does not exist.
 */
CNetHandler* CNetMgr::FindNetItem(CNetHandler* key)
{
    if (_session_hash != NULL)
    {
        return (CNetHandler*)_session_hash->HashFind(key);
    }

    return NULL;
}

/**
 * @brief Add a net handler to the session hash.
 *
 * Does nothing when the session hash does not exist. A negative result from
 * HashInsert() is logged but not reported to the caller.
 *
 * @param item  Handler to insert.
 */
void CNetMgr::InsertNetItem(CNetHandler* item)
{
    if (_session_hash != NULL)
    {
        const int32_t rc = _session_hash->HashInsert(item);
        if (rc < 0)
        {
            MTLOG_ERROR("session insert failed, ret %d", rc);
        }
    }
}

/**
 * @brief Remove the session-hash entry that matches @p item.
 *
 * The entry is first located with FindNetItem(); when nothing matches (or the
 * hash does not exist) the call is a no-op.
 *
 * @param item  Handler used as the lookup key.
 */
void CNetMgr::RemoveNetItem(CNetHandler* item)
{
    CNetHandler* stored = this->FindNetItem(item);
    if (stored != NULL)
    {
        // A non-NULL result implies _session_hash exists (FindNetItem checked it).
        _session_hash->HashRemove(stored);
    }
}

/**
 * @brief Look up a destination-link group in the IP hash.
 *
 * @param key  Object used as the hash lookup key.
 * @return The result of HashFind() cast to CDestLinks*, or NULL when the IP
 *         hash does not exist.
 */
CDestLinks* CNetMgr::FindDestLink(CDestLinks* key)
{
    if (_ip_hash != NULL)
    {
        return (CDestLinks*)_ip_hash->HashFind(key);
    }

    return NULL;
}

/**
 * @brief Add a destination-link group to the IP hash.
 *
 * Does nothing when the IP hash does not exist. A negative result from
 * HashInsert() is logged but not reported to the caller.
 *
 * @param item  Object to insert.
 */
void CNetMgr::InsertDestLink(CDestLinks* item)
{
    if (_ip_hash != NULL)
    {
        const int32_t rc = _ip_hash->HashInsert(item);
        if (rc < 0)
        {
            MTLOG_ERROR("ip dest insert failed, ret %d", rc);
        }
    }
}

/**
 * @brief Remove the IP-hash entry that matches @p item.
 *
 * The entry is first located with FindDestLink(); when nothing matches (or
 * the hash does not exist) the call is a no-op.
 *
 * @param item  Object used as the lookup key.
 */
void CNetMgr::RemoveDestLink(CDestLinks* item)
{
    CDestLinks* stored = this->FindDestLink(item);
    if (stored != NULL)
    {
        // A non-NULL result implies _ip_hash exists (FindDestLink checked it).
        _ip_hash->HashRemove(stored);
    }
}

/**
 * @brief Return the destination-link group matching @p key, creating it if absent.
 *
 * An existing entry in the IP hash is reused. Otherwise a new object is
 * allocated, the key information is copied into it, its timer is started and
 * it is inserted into the IP hash.
 *
 * @param key  Object describing the destination to look up.
 * @return The existing or newly created group, or NULL when allocation fails.
 */
CDestLinks* CNetMgr::FindCreateDest(CDestLinks* key)
{
    CDestLinks* existing = this->FindDestLink(key);
    if (NULL != existing)
    {
        MTLOG_DEBUG("dest links reuse ok");
        return existing;
    }

    CDestLinks* created = this->AllocDestLink();
    if (created == NULL)
    {
        MTLOG_ERROR("dest links alloc failed, log it");
        return NULL;
    }

    created->CopyKeyInfo(key);
    created->StartTimer();
    this->InsertDestLink(created);

    return created;
}

/**
 * @brief Retire a destination-link group.
 *
 * The object is removed from the IP hash first, then reset, then handed to
 * FreeDestLink().
 *
 * @param dst  Group to retire; it is dereferenced without a NULL check.
 */
void CNetMgr::DeleteDestLink(CDestLinks* dst)
{
    // Unhash before Reset(): Reset() zeroes the address/port/type fields.
    // NOTE(review): those fields presumably form the hash key - confirm.
    RemoveDestLink(dst);
    dst->Reset();
    FreeDestLink(dst);
}

/**
 * @brief Construct the manager: initialise both buffer pools and both hash tables.
 *
 * The TCP pool is initialised with (60, 4096), the UDP pool with
 * (60, SK_DFLT_BUFF_SIZE); each hash list is created with the value 100000.
 */
CNetMgr::CNetMgr()
{
    // Socket buffer pools.
    sk_buffer_mng_init(&_tcp_pool, 60, 4096);
    sk_buffer_mng_init(&_udp_pool, 60, SK_DFLT_BUFF_SIZE);

    // Lookup tables: destinations and sessions.
    _ip_hash      = new HashList(100000);
    _session_hash = new HashList(100000);
}

/**
 * @brief Destroy the manager: drain and delete both hash tables, then the buffer pools.
 *
 * Each table is drained by repeatedly fetching its first entry and deleting it.
 * NOTE(review): the drain loops only terminate if deleting an entry also
 * removes it from the table (presumably through the entry's destructor) -
 * confirm against HashList and the entry classes.
 */
CNetMgr::~CNetMgr()
{
    if (NULL != _ip_hash)
    {
        for (HashKey* entry = _ip_hash->HashGetFirst();
             entry != NULL;
             entry = _ip_hash->HashGetFirst())
        {
            delete entry;
        }

        delete _ip_hash;
        _ip_hash = NULL;
    }

    if (NULL != _session_hash)
    {
        for (HashKey* entry = _session_hash->HashGetFirst();
             entry != NULL;
             entry = _session_hash->HashGetFirst())
        {
            delete entry;
        }

        delete _session_hash;
        _session_hash = NULL;
    }

    sk_buffer_mng_destroy(&_tcp_pool);
    sk_buffer_mng_destroy(&_udp_pool);
}

/**
 * @brief Run one recycling pass over the buffer pools and the object pools.
 *
 * The buffer pools are given @p now divided by 1000 and truncated to 32 bits;
 * the object pools are given @p now unchanged.
 *
 * @param now  Current time. NOTE(review): presumably milliseconds, given the
 *             division by 1000 - confirm against callers.
 */
void CNetMgr::RecycleObjs(uint64_t now)
{
    const uint32_t now_sec = (uint32_t)(now / 1000);

    // Socket buffer pools take the reduced timestamp.
    recycle_sk_buffer(&_udp_pool, now_sec);
    recycle_sk_buffer(&_tcp_pool, now_sec);

    // Object pools take the original timestamp.
    _net_item_pool.RecycleItem(now);
    _sock_link_pool.RecycleItem(now);
    _dest_ip_pool.RecycleItem(now);
}