Trying to develop a pool executor from pthreads... in C++

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Trying to develop a pool executor from pthreads... in C++

mlnglstaccram alksjd
For the sake of learning, I am trying to develop a pool executor, but
it doesn't work. Can you tell me where I am going wrong? I am getting
an assertion failure on Windows at the line marked with comments below, in
the file PoolExecutor.h.


/*
main.cpp
*/
#include <pthread.h>
#include <iostream>
#include <windows.h>
#include "PoolExecutor.h"



using namespace std;

class Runner{
public:
       Runner(){}
       ~Runner(){}
       void run(){
               for(int i=0;i<100;++i)
                       cout<<"i="<<i << " in thread "<<
((int)pthread_self().p) <<endl;
       }
};

int main(){
       PoolExecutor pool(10);
       pool.execute(((Runnable *)new Runner()));
       pool.execute(((Runnable *)new Runner()));
       pool.execute(((Runnable *)new Runner()));
       Sleep(1000);
       return 0;
}

///////////////////////////////////////////////////////////////////////////next
file//////////////////////////////////////////

/*in file PoolExecutor.h*/

#include<pthread.h>
#include<queue>
#include<windows.h>

class PoolExecutor;
void *func(void *temp);

/*
 * Interface for a unit of work that can be executed by a worker thread.
 * Concrete tasks derive from Runnable and implement run().
 */
class Runnable{
public:
       // Virtual so that deleting a task through a Runnable* (as the pool
       // does after running it) invokes the derived class's destructor;
       // without it that delete is undefined behaviour.
       virtual ~Runnable(){}

       // Performs the task's work on the calling thread.
       virtual void run()=0;
};


class Thread{
       pthread_t threadId;
       Runnable *runnable;
public:
       Thread(){
               runnable=NULL;
       }

       void start(PoolExecutor *pthRun){
               int started = pthread_create (&threadId, NULL, func, pthRun);
               std::cout<< started<<std::endl;
       }

       void join(){
               pthread_join(threadId,NULL);
       }

};



class PoolExecutor{
       Thread *threads;
       int N;
       bool cancelled ;
       std::queue<Runnable*> taskQueue;
       pthread_mutex_t taskQueueMutex;
public:
       PoolExecutor(int numberOfThreads){
               N=numberOfThreads;
               if(numberOfThreads<=0 && numberOfThreads>512)
                       N=5;
               cancelled= false;
       pthread_mutex_init(&taskQueueMutex,NULL);
               threads = new Thread[N];
               for(int i=0;i<N;++i)
                       threads[i].start(this);
       }
       ~PoolExecutor(){
               cancelled = true;
               for(int i=0;i<N;++i)
                       threads[i].join();
               delete[] threads;
       }

       void execute(Runnable* runnable ){
               pthread_mutex_lock(&taskQueueMutex);
               taskQueue.push(runnable);
               pthread_mutex_unlock(&taskQueueMutex);
       }

       void* runMethod(void * param){
               for(;;){
                       while(taskQueue.empty()&& !cancelled){
                               Sleep(100);
                       }
                       if(cancelled){
                               return NULL;
                       }
                       pthread_mutex_lock(&taskQueueMutex);
                       if(taskQueue.empty()){
                               pthread_mutex_unlock(&taskQueueMutex);
                               continue;
                       }
                       Runnable *runnable = (Runnable *)taskQueue.front();
////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////// i get error in the above line
////////////////////////////////////////////////////////////////////////////////////////////////
                       taskQueue.pop();
                       pthread_mutex_unlock(&taskQueueMutex);
                       runnable->run();
                       delete runnable;
                       if(cancelled){
                               return NULL;
                       }
               }
               return NULL;
       }

};

void *func(void *temp){
       PoolExecutor *poolExecutor = (PoolExecutor *)temp;
       return poolExecutor->runMethod(NULL);
}

Thanks
Reply | Threaded
Open this post in threaded view
|

Re: Trying to develop a pool executor from pthreads... in C++

gcherian
Hello,

The Runner class should inherit from Runnable (so that it overrides the virtual run() method).

-George Cherian

mlnglstaccram alksjd wrote
For the sake of learning, I am trying to develop a pool executor, but
it doesn't work. Can you tell me where I am going wrong? I am getting
an assertion failure on Windows at the line marked with comments below, in
the file PoolExecutor.h.


/*
main.cpp
*/
#include <pthread.h>
#include <iostream>
#include <windows.h>
#include "PoolExecutor.h"



using namespace std;

class Runner{
public:
       Runner(){}
       ~Runner(){}
       void run(){
               for(int i=0;i<100;++i)
                       cout<<"i="<<i << " in thread "<<
((int)pthread_self().p) <<endl;
       }
};

int main(){
       PoolExecutor pool(10);
       pool.execute(((Runnable *)new Runner()));
       pool.execute(((Runnable *)new Runner()));
       pool.execute(((Runnable *)new Runner()));
       Sleep(1000);
       return 0;
}

///////////////////////////////////////////////////////////////////////////next
file//////////////////////////////////////////

/*in file PoolExecutor.h*/

#include<pthread.h>
#include<queue>
#include<windows.h>

class PoolExecutor;
void *func(void *temp);

/*
 * Interface for a unit of work that can be executed by a worker thread.
 * Concrete tasks derive from Runnable and implement run().
 */
class Runnable{
public:
       // Virtual so that deleting a task through a Runnable* (as the pool
       // does after running it) invokes the derived class's destructor;
       // without it that delete is undefined behaviour.
       virtual ~Runnable(){}

       // Performs the task's work on the calling thread.
       virtual void run()=0;
};


class Thread{
       pthread_t threadId;
       Runnable *runnable;
public:
       Thread(){
               runnable=NULL;
       }

       void start(PoolExecutor *pthRun){
               int started = pthread_create (&threadId, NULL, func, pthRun);
               std::cout<< started<<std::endl;
       }

       void join(){
               pthread_join(threadId,NULL);
       }

};



class PoolExecutor{
       Thread *threads;
       int N;
       bool cancelled ;
       std::queue<Runnable*> taskQueue;
       pthread_mutex_t taskQueueMutex;
public:
       PoolExecutor(int numberOfThreads){
               N=numberOfThreads;
               if(numberOfThreads<=0 && numberOfThreads>512)
                       N=5;
               cancelled= false;
       pthread_mutex_init(&taskQueueMutex,NULL);
               threads = new Thread[N];
               for(int i=0;i<N;++i)
                       threads[i].start(this);
       }
       ~PoolExecutor(){
               cancelled = true;
               for(int i=0;i<N;++i)
                       threads[i].join();
               delete[] threads;
       }

       void execute(Runnable* runnable ){
               pthread_mutex_lock(&taskQueueMutex);
               taskQueue.push(runnable);
               pthread_mutex_unlock(&taskQueueMutex);
       }

       void* runMethod(void * param){
               for(;;){
                       while(taskQueue.empty()&& !cancelled){
                               Sleep(100);
                       }
                       if(cancelled){
                               return NULL;
                       }
                       pthread_mutex_lock(&taskQueueMutex);
                       if(taskQueue.empty()){
                               pthread_mutex_unlock(&taskQueueMutex);
                               continue;
                       }
                       Runnable *runnable = (Runnable *)taskQueue.front();
////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////// i get error in the above line
////////////////////////////////////////////////////////////////////////////////////////////////
                       taskQueue.pop();
                       pthread_mutex_unlock(&taskQueueMutex);
                       runnable->run();
                       delete runnable;
                       if(cancelled){
                               return NULL;
                       }
               }
               return NULL;
       }

};

void *func(void *temp){
       PoolExecutor *poolExecutor = (PoolExecutor *)temp;
       return poolExecutor->runMethod(NULL);
}

Thanks