1.最近项目不是很忙,结合之前看的一些开源代码(skynet及其他github代码)及项目代码,抽空写了一个简单的任务队列当做练习。
2.介绍:
1)全局队列中锁的使用:多线程下,全局队列需要加锁,本例中封装了MutexGuard。操作全局队列之前,先在栈上创建一个临时锁对象,调用构造函数时加锁,对象销毁时调用析构函数从而解锁,减少了我们手动加锁,解锁的过程。
2)信号的使用:本例可以说是为了使用信号而使用信号,仅仅是为了熟悉信号机制的一些特性。当程序以后台模式跑起来以后,输入 kill -USR1 %1 向程序发送 SIGUSR1 信号,从而使生产者生产一定数量的 job,供消费者使用;消费者线程在处理完全局队列以后 sleep,等待生产者产生新任务;输入 kill -USR2 %1,改变变量状态,向各线程发送结束通知,结束线程。
3)简单的线程池模型。
4)简单的线程间通信和同步方式示例。
5)简单的类模板的使用。
3.编译: 文件不多,偷懒没有写makefile文件,可自行加上。编译指令 : g++ -g -Wall -o mytest main.cpp mutex.cpp -lpthread (头文件 List.h、mutex.h 以及被 List.h 包含的 List.cpp 不需要出现在命令行中)
4:执行流程:
1)编译成功后,输入 ./mytest &。 以后台模式运行程序
2)此时所有consumer线程阻塞,等待生产者生产job; 一个producer线程阻塞在select处,等待读管道内的消息;一个signal_handler线程调用 sigwait( ... ) 等待 SIGUSR1 和 SIGUSR2 信号的到来。
可通过在控制台输入 kill -USR1 %1 向进程发送 SIGUSR1 信号(ps: kill 指令用来产生信号;当以后台模式运行该进程时,%1 用来获得该进程 id,因此该命令表示向该进程发送 SIGUSR1 信号)。信号被 signal_handler 捕捉到以后,生产 job,唤醒 consumer 线程处理 job,此流程可重复执行;当在控制台输入 kill -USR2 %1 时,改变 quit 变量值,从而使得各个线程退出,进程结束。还有一个 spoiling 轮询线程,在全局队列不为空的情况下,及时唤醒 consumer 线程处理任务。可通过调整 wakeup 中的参数,调整唤醒 consumer 的频率。
5.参考:
1)UNIX环境高级编程。
2)https://github.com/idispatch/signaltest
3)https://github.com/cloudwu/skynet/blob/master/skynet-src/skynet_start.c
水平有限,仅供参考,希望能对读者有所帮助。以上描述及以下源码有任何漏洞与不足,欢迎及时指正与交流。
6:源码:
main.cpp:
#include#include #include #include #include #include #include #include #include #include #include "List.h"#include "mutex.h"#define THREAD_NUM 4#define JOB_NUM 100#define handle_error_en(en, msg) \ do{ errno = en; perror(msg); exit(EXIT_FAILURE); } while(0)using std::cout;using std::endl;using std::string;using std::cin;struct monitor { int count; pthread_cond_t cond; pthread_mutex_t mutex; int sleep; int quit; int pfds[2];};struct sig{ sigset_t set; struct monitor *m;};typedef void (*thread_func)(void *arg, int value); //job call backstruct job{ void *arg; thread_func cb;}; List g_list; const int allowed_signals[] = {SIGUSR1, SIGUSR2, SIGQUIT}; static voidprint_v(void *value, int pid){ printf("pid: %d, value: %d\n", pid, *(int*)value);} static voidfree_job(struct job *j){ if(j == NULL) { return; } free(j->arg); j->arg = NULL; free(j); j = NULL;}static int dispatch(int pid){ struct job *j = g_list.Pop(); if (j != NULL) { j->cb(j->arg, pid); free_job(j); return 0; } return -1;} static void * consumer(void *arg){ struct monitor *m = (struct monitor *)arg; int r = 0; usleep(50000); int pid = pthread_self(); while(!m->quit) { r = dispatch(pid); if (r < 0) { if(pthread_mutex_lock(&m->mutex) == 0) { ++m->sleep; cout << "thread : " << pid << " sleep" << endl; if(!m->quit) { pthread_cond_wait(&m->cond, &m->mutex); } -- m->sleep; cout << "thread : " << pid << " wakeup" << endl; if(pthread_mutex_unlock(&m->mutex)) { fprintf(stderr, "unlock mutex error"); exit(1); } } } } cout << "thread consumer quit " << endl; return NULL; } static voidfree_monitor(struct monitor *m){ if(m == NULL) { return; } cout << "free monitor called" << endl; close(m->pfds[0]); close(m->pfds[1]); free(m); cout << "free monitor over" << endl;} static voidwakeup(struct monitor *m, int busy) { if (m->sleep >= m->count - busy) { // signal sleep worker, "spurious wakeup" is harmless pthread_cond_signal(&m->cond); }} static struct job*create_job(){ struct job * j = (struct job *)calloc(1, sizeof(*j)); if (j 
== NULL) { fprintf(stderr, "create_job failed"); return NULL; } int v = rand(); j->arg = malloc(sizeof (int)); if (j->arg == NULL) { fprintf(stderr, "get arg failed"); return NULL; } memcpy(j->arg, &v, sizeof (int) ); j->cb = print_v; return j;} static void * producer(void *arg){ struct monitor *m = (struct monitor *)arg; cout << "producer called" << endl; int pid = pthread_self(); int state; while(!m->quit) { fd_set fds; FD_ZERO(&fds); FD_SET(m->pfds[0], &fds); state = select(m->pfds[0] + 1, &fds, NULL, NULL, NULL); if(state < 0) { if(errno == EINTR) { cout << "errno == EINTR" << endl; continue; } break; } else if (state == 0) { } else { char msg[200]; memset(msg, 0, sizeof(msg)); read(m->pfds[0], msg, sizeof(msg)); //only to clear up pipe. msg[strlen(msg)] = '\0'; fprintf(stdout, "msgis: %s\n", msg); fflush(stdout); if (FD_ISSET(m->pfds[0], &fds)) { if(strncmp(msg, "quit", strlen("quit")) == 0) { break; } int i; for (i = 0; i < JOB_NUM; i++) { struct job *j = create_job(); if (j == NULL) { fprintf(stderr, "prodecer failed"); exit(1); } g_list.Push(j); } cout << "Thread " << "[" << pid << "]" << ": create " << JOB_NUM << " jobs" << endl; wakeup(m, 2); } } } cout << "thread producer quit" << endl; return NULL;} static intcheck_g_list(){ int len = g_list.get_job_num(); if(len == 0 ) { return -1; } return 1;} static void * spoiling(void *arg){ struct monitor *m = (struct monitor *)arg; cout << "spoiling called" << endl; while(!m->quit) { int n = check_g_list(); if(n == 0) { break; } if(n < 0) { continue; } wakeup(m, 1); } cout << "thread spoiling quit" << endl; return NULL;} static voidthread_create(pthread_t *pid, void *arg , void * (*pthread_func) (void *)){ if(pthread_create(pid, NULL, pthread_func, arg) != 0) { fprintf(stderr, "create_thread failed"); exit(1); }} static void*signal_handler(void *arg){ struct monitor *m = (struct monitor *)arg; int isig, state; sigset_t set; sigemptyset(&set); sigaddset(&set, SIGUSR1); sigaddset(&set, SIGUSR2); sigaddset(&set, 
SIGTERM); cout << "signal_handler called" << endl; for(;;) { state = sigwait(&set, &isig); cout << "sigwait : " << isig << endl; if(state != 0) { fprintf(stderr, "wrong state %d\n", state); continue; } if(isig == SIGUSR1) { cout << "SIGUSR1 " << endl; char msg[200]; memset(msg, 0, sizeof(msg)); snprintf(msg, sizeof(msg), "signal_handler: received signal=%d(thread=%d)\n", isig, (int)pthread_self()); write(m->pfds[1], msg, strlen(msg)); } else if(isig == SIGUSR2) { cout << "SIGUSR2 " << endl; pthread_mutex_lock(&m->mutex); m->quit = 1; write(m->pfds[1], "quit", strlen("quit")); pthread_cond_broadcast(&m->cond); pthread_mutex_unlock(&m->mutex); //when quit, send "quit" to producer or it will block on select break; } else { cout << "SIG OTHER quit" << endl; break; } } cout << "signal_handler quit" << endl; return NULL;} static void start_thread(){ pthread_t pids[THREAD_NUM + 3]; struct monitor *m = (struct monitor *)malloc(sizeof(*m));//(struct monitor *)malloc(sizeof(*m)); if (m == NULL) { fprintf(stderr, "create monitor failed"); exit(1); } if(pipe(m->pfds)) { fprintf(stderr, "%s: pipe failed\n", __FUNCTION__); exit(1); } m->count = THREAD_NUM; m->sleep = 0; m->quit = 0; if(pthread_mutex_init(&(m->mutex), NULL) != 0 || pthread_cond_init(&(m->cond), NULL) != 0) { fprintf(stderr, "mutex or cond init failed"); exit(1); } int rc; sigset_t set; sigemptyset(&set); sigaddset(&set, SIGUSR1); sigaddset(&set, SIGUSR2); sigaddset(&set, SIGQUIT); rc = pthread_sigmask(SIG_BLOCK, &set, NULL); if(rc != 0) { fprintf(stderr, "%s pthread_sigmask failed\n", __FUNCTION__); exit(1); } thread_create(&pids[0], m, signal_handler); thread_create(&pids[1], m, spoiling); //spoiling thread , check if the g_list is empty thread_create(&pids[2], m, producer); //producer thread int i; for (i = 3; i < THREAD_NUM + 3; i++) { thread_create(&pids[i], m, consumer); //consumer thread } for (i = 0; i < THREAD_NUM + 3; i++) { pthread_join(pids[i], NULL); } free_monitor(m);} int main(int argc, char 
*argv[]){ cout << "-----------------start---------------------" << endl; start_thread(); cout << "------------------end----------------------" << endl;}
mutex.h
#ifndef __MUTEX__H__
#define __MUTEX__H__

// NOTE(review): the names of the two headers originally included here were
// lost; <pthread.h> is the one these declarations need - confirm the other.
#include <pthread.h>

// Thin wrapper around a pthread mutex owned by someone else.
// Holds a reference only; it neither initialises nor destroys the mutex.
class MyMutex
{
public:
    explicit MyMutex(pthread_mutex_t& m);
    ~MyMutex();

    void Lock();
    void UnLock();

private:
    pthread_mutex_t& m_m;
};

// Scope guard: the constructor locks the mutex and the destructor unlocks it
// (see mutex.cpp). Copying is disabled because a copy would unlock the same
// mutex a second time when it goes out of scope.
class MyMutexGuard
{
public:
    explicit MyMutexGuard(pthread_mutex_t& m);
    ~MyMutexGuard();

private:
    // Declared but intentionally not defined.
    MyMutexGuard(const MyMutexGuard&);
    MyMutexGuard& operator=(const MyMutexGuard&);

    MyMutex mm;
};

#endif
mutex.cpp
#include "mutex.h"MyMutex::MyMutex(pthread_mutex_t& m) : m_m(m){}MyMutex::~MyMutex(){}void MyMutex::Lock(){ pthread_mutex_lock(&m_m);}void MyMutex::UnLock(){ pthread_mutex_unlock(&m_m);}MyMutexGuard::MyMutexGuard(pthread_mutex_t& m):mm(m){ mm.Lock();}MyMutexGuard::~MyMutexGuard(){ mm.UnLock();}
List.h
#ifndef __LIST_HEAD__#define __LIST_HEAD__#include "mutex.h"#includeusing std::list;#ifndef _WIN32#include
#endiftemplate class List{ public: List(); List(const list &l); virtual ~List(); T Pop(); void Push(const T t); bool Empty(); int get_job_num(); private: void init(); void destroy(); private: bool m_init; list my_list; pthread_mutex_t mm;};#include "List.cpp"#endif
List.cpp
// Member definitions for the List<T> template. List.h includes this file at
// its end; the guard below keeps the definitions from appearing twice if this
// file is also compiled or included directly.
#ifndef __LIST_IMPL__
#define __LIST_IMPL__

#include "List.h"
#include "mutex.h"

// Creates an empty queue and initialises its mutex.
template <typename T>
List<T>::List()
    : m_init(false)
{
    init();
}

// Creates a queue holding a copy of l and initialises its mutex.
template <typename T>
List<T>::List(const list<T> &l)
    : m_init(false), my_list(l)
{
    init();
}

template <typename T>
List<T>::~List()
{
    destroy();
}

// Appends t at the back while holding the mutex.
template <typename T>
void List<T>::Push(const T t)
{
    MyMutexGuard g(mm);
    my_list.push_back(t);
}

// Removes and returns the front element while holding the mutex.
// Returns a value-initialised T (a null pointer for pointer types) when empty.
template <typename T>
T List<T>::Pop()
{
    MyMutexGuard g(mm);
    if (my_list.empty()) {
        return T();
    }
    T front = my_list.front();
    my_list.pop_front();
    return front;
}

template <typename T>
bool List<T>::Empty()
{
    MyMutexGuard g(mm);
    return my_list.empty();
}

// Initialises the mutex once; m_init records whether that succeeded.
// NOTE(review): a failed pthread_mutex_init is only recorded, not reported.
template <typename T>
void List<T>::init()
{
    if (!m_init) {
        m_init = (pthread_mutex_init(&mm, NULL) == 0);
    }
}

// Destroys the mutex only if init() actually initialised it.
template <typename T>
void List<T>::destroy()
{
    if (m_init) {
        pthread_mutex_destroy(&mm);
        m_init = false;
    }
}

// Returns the number of queued elements, read while holding the mutex.
template <typename T>
int List<T>::get_job_num()
{
    MyMutexGuard g(mm);
    return static_cast<int>(my_list.size());
}

#endif