diff --git a/app/micro_thread/micro_thread.cpp b/app/micro_thread/micro_thread.cpp index 55f494eeb..452b55378 100644 --- a/app/micro_thread/micro_thread.cpp +++ b/app/micro_thread/micro_thread.cpp @@ -49,7 +49,7 @@ Thread::Thread(int stack_size) memset(&_jmpbuf, 0, sizeof(_jmpbuf)); } - +static DefaultLogAdapter def_log_adapt; /** * @brief LINUX x86/x86_64's allocated stacks. */ @@ -75,7 +75,7 @@ bool Thread::InitStack() void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0); if (vaddr == (void *)MAP_FAILED) { - MTLOG_ERROR("mmap stack failed, size %d", memsize); + MTLOG_ERROR("mmap stack failed, size %d,errmsg: %s.", memsize,strerror(errno)); free(_stack); _stack = NULL; return false; @@ -130,6 +130,12 @@ void Thread::SwitchContext() } } + +int Thread::SaveContext() +{ + return save_context(_jmpbuf); +} + void Thread::RestoreContext() { restore_context(_jmpbuf, 1); @@ -375,6 +381,7 @@ void ScheduleObj::ScheduleStartRun() unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads. +unsigned int ThreadPool::last_default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads. unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE; ///< 128k stack. bool ThreadPool::InitialPool(int max_num) @@ -442,6 +449,7 @@ MicroThread* ThreadPool::AllocThread() if (_total_num >= _max_num) { MT_ATTR_API(361140, 1); // no more quota + MTLOG_ERROR("total %d is outof max: %d", _total_num,_max_num); return NULL; } @@ -455,6 +463,12 @@ MicroThread* ThreadPool::AllocThread() } _total_num++; _use_num++; + if(_use_num >(int) default_thread_num){ + if(((int) default_thread_num * 2 )< _max_num){ + last_default_thread_num = default_thread_num; + default_thread_num = default_thread_num * 2; + } + } return thread; } @@ -475,6 +489,10 @@ void ThreadPool::FreeThread(MicroThread* thread) thread->Destroy(); delete thread; _total_num--; + if(default_thread_num / 2 >= DEFAULT_THREAD_NUM){ + last_default_thread_num = default_thread_num; + default_thread_num = default_thread_num / 2; + } } } @@ -500,7 +518,11 @@ void MtFrame::SetHookFlag() { bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num) { - _log_adpt = logadpt; + if(logadpt == NULL){ + _log_adpt = &def_log_adapt; + }else{ + _log_adpt = logadpt; + } if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num)) { @@ -851,6 +873,42 @@ void MtFrame::WaitNotify(utime64_t timeout) thread->SwitchContext(); } +void MtFrame::NotifyThread(MicroThread* thread) +{ + if(thread == NULL){ + return; + } + MicroThread* cur_thread = GetActiveThread(); + if (thread->HasFlag(MicroThread::IO_LIST)) + { + this->RemoveIoWait(thread); + if(cur_thread == this->DaemonThread()){ + // 这里不直接切的话,还是不及时,会导致目标线程等待到超时 + if(cur_thread->SaveContext() == 0){ + this->SetActiveThread(thread); + thread->SetState(MicroThread::RUNNING); + thread->RestoreContext(); + } + }else{ + this->InsertRunable(thread); + } + } +} + +void MtFrame::SwapDaemonThread() +{ + MicroThread* thread = GetActiveThread(); + MicroThread* daemon_thread = this->DaemonThread(); + if(thread != daemon_thread){ + if(thread->SaveContext() == 0){ + this->InsertRunable(thread); + this->SetActiveThread(daemon_thread); + daemon_thread->SetState(MicroThread::RUNNING); + daemon_thread->RestoreContext(); + } + } +} + bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) { MicroThread* thread = GetActiveThread(); diff --git a/app/micro_thread/micro_thread.h b/app/micro_thread/micro_thread.h index e91bdb114..9aa97231f 100644 --- a/app/micro_thread/micro_thread.h +++ b/app/micro_thread/micro_thread.h @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -56,8 +57,9 @@ namespace NS_MICRO_THREAD { #define STACK_PAD_SIZE 128 #define MEM_PAGE_SIZE 4096 -#define DEFAULT_STACK_SIZE 128*1024 -#define DEFAULT_THREAD_NUM 2000 +#define DEFAULT_STACK_SIZE STACK_PAD_SIZE * 1024 +#define DEFAULT_THREAD_NUM 5000 +#define MAX_THREAD_NUM 800000 typedef unsigned long long utime64_t; typedef void (*ThreadStart)(void*); @@ -119,6 +121,8 @@ public: void SwitchContext(void); + int SaveContext(void); + void RestoreContext(void); utime64_t GetWakeupTime(void) { @@ -317,12 +321,51 @@ public: }; +class DefaultLogAdapter :public LogAdapter +{ +public: + + + bool CheckDebug(){ return false;}; + bool CheckTrace(){ return false;}; + bool CheckError(){ return false;}; + + inline void LogDebug(char* fmt, ...){ + va_list args; + char szBuff[1024]; + va_start(args, fmt); + memset(szBuff, 0, sizeof(szBuff)); + vsprintf(szBuff, fmt, args); + va_end(args); + printf("%s\n",szBuff); + }; + inline void LogTrace(char* fmt, ...){ + va_list args; + char szBuff[1024]; + va_start(args, fmt); + memset(szBuff, 0, sizeof(szBuff)); + vsprintf(szBuff, fmt, args); + va_end(args); + printf("%s\n",szBuff); + }; + inline void LogError(char* fmt, ...){ + va_list args; + char szBuff[1024]; + va_start(args, fmt); + memset(szBuff, 0, sizeof(szBuff)); + vsprintf(szBuff, fmt, args); + va_end(args); + printf("%s\n",szBuff); + }; + +}; class ThreadPool { public: static unsigned int default_thread_num; + static unsigned int last_default_thread_num; static unsigned int default_stack_size; static void SetDefaultThreadNum(unsigned int num) { @@ -403,7 +446,7 @@ public: MicroThread *GetRootThread(); - bool InitFrame(LogAdapter* logadpt = NULL, int max_thread_num = 50000); + bool InitFrame(LogAdapter* logadpt = NULL, int max_thread_num = MAX_THREAD_NUM); void SetHookFlag(); @@ -441,6 +484,10 @@ public: void WaitNotify(utime64_t timeout); + void NotifyThread(MicroThread* thread); + + void SwapDaemonThread(); + void RemoveIoWait(MicroThread* thread); void InsertRunable(MicroThread* thread); diff --git a/app/micro_thread/mt_api.cpp b/app/micro_thread/mt_api.cpp index 474685fd7..586f9a4ae 100644 --- a/app/micro_thread/mt_api.cpp +++ b/app/micro_thread/mt_api.cpp @@ -645,6 +645,26 @@ void* mt_start_thread(void* entry, void* args) return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true); } +void* mt_active_thread() +{ + return MtFrame::Instance()->GetActiveThread(); +} + +void mt_thread_wait(int ms) +{ + MtFrame::Instance()->WaitNotify(ms); +} + +void mt_thread_wakeup_wait(void * thread_p) +{ + MtFrame::Instance()->NotifyThread((MicroThread *) thread_p); +} + +void mt_swap_thread() +{ + return MtFrame::Instance()->SwapDaemonThread(); +} + #define BUF_ALIGNMENT_SIZE 4096 #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1)) #define BUF_DEFAULT_SIZE 4096 diff --git a/app/micro_thread/mt_api.h b/app/micro_thread/mt_api.h index fe1e14a57..dfc3e64cb 100644 --- a/app/micro_thread/mt_api.h +++ b/app/micro_thread/mt_api.h @@ -134,6 +134,14 @@ int mt_wait_events(int fd, int events, int timeout); void* mt_start_thread(void* entry, void* args); +void* mt_active_thread(); + +void mt_thread_wait(int ms); + +void mt_thread_wakeup_wait(void * thread_p); + +void mt_swap_thread(); + } #endif