added error checking
authorMichal Voců <michal@ruk.cuni.cz>
Wed, 20 Jun 2007 08:57:09 +0000 (08:57 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Wed, 20 Jun 2007 08:57:09 +0000 (08:57 +0000)
org.glite.lb.logger/src-nt/ThreadPool.cpp

index 763e930..7f20453 100644 (file)
@@ -79,6 +79,9 @@ ThreadPool::ThreadPool()
        pthread_cond_init(&wait_queue_cond_ready, NULL);
        pipe(pd);
        ufds = static_cast<struct pollfd *>(malloc(sizeof(struct pollfd)));
+       if(ufds == NULL) {
+               throw new Exception;
+       }
        ufds->fd = pd[0];
        ufds->events = POLLIN;
        ufds_size = 1;
@@ -102,6 +105,7 @@ ThreadPool::startWorkers(unsigned int n)
 
        num_workers = n;
        for(unsigned int i = 0; i < n; i++) {
+               // XXX check return 
                pthread_create(&workers[i], NULL, ThreadPool::threadMain, NULL);
        }
 }
@@ -121,11 +125,11 @@ ThreadPool::stopWorkers()
 void 
 ThreadPool::postWork(WorkDescription *work_unit)
 {
-       pthread_mutex_lock(&work_queue_mutex);
+       E_ASSERT(pthread_mutex_lock(&work_queue_mutex) >= 0);
        work_queue.push_back(work_unit);
        work_count++;
-       pthread_cond_signal(&work_queue_cond_ready);
-       pthread_mutex_unlock(&work_queue_mutex);
+       E_ASSERT(pthread_cond_signal(&work_queue_cond_ready) >= 0);
+       E_ASSERT(pthread_mutex_unlock(&work_queue_mutex) >= 0);
 }
 
 
@@ -133,12 +137,14 @@ inline
 void 
 ThreadPool::queueWork(WaitDesc *wd)
 {
-       pthread_mutex_lock(&wait_queue_mutex);
+       E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
        wait_queue.push_back(wd);
        wait_count++;
-       pthread_cond_signal(&wait_queue_cond_ready);
-       pthread_mutex_unlock(&wait_queue_mutex);
-       write(pd[1], "1", 1);
+       E_ASSERT(pthread_cond_signal(&wait_queue_cond_ready) >= 0);
+       E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
+       if(write(pd[1], "1", 1) != 1) {
+               throw new Exception;
+       }
 }
 
 
@@ -211,19 +217,19 @@ ThreadPool::getWork()
        WorkDescription *work_unit = NULL;
        struct timespec timeout;
 
-       pthread_mutex_lock(&work_queue_mutex);
+       E_ASSERT(pthread_mutex_lock(&work_queue_mutex) >= 0);
        if(work_count == 0) {
                timeout.tv_sec = 1;
                timeout.tv_nsec = 0;
 //             pthread_cond_timedwait(&work_queue_cond_ready, &work_queue_mutex, &timeout);
-               pthread_cond_wait(&work_queue_cond_ready, &work_queue_mutex);
+               E_ASSERT(pthread_cond_wait(&work_queue_cond_ready, &work_queue_mutex) == 0);
        }
        if(work_count > 0) {
                work_count--;
                work_unit = work_queue.front();
                work_queue.pop_front();
        }
-       pthread_mutex_unlock(&work_queue_mutex);
+       E_ASSERT(pthread_mutex_unlock(&work_queue_mutex) >= 0);
        return work_unit;
 }
 
@@ -232,7 +238,7 @@ ThreadPool::threadCleanup(void *data)
 {
        ThreadPool *pool = ThreadPool::instance();
 
-       pthread_mutex_unlock(&(pool->work_queue_mutex));
+       E_ASSERT(pthread_mutex_unlock(&(pool->work_queue_mutex)) >= 0);
 }
 
 
@@ -263,12 +269,12 @@ ThreadPool::removeWaitDesc(std::list<WaitDesc *>::iterator &i)
        std::list<WaitDesc *>::iterator j = i;
        
        // actually this is safe even for the first element
-       pthread_mutex_lock(&wait_queue_mutex);
+       E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
        j--;
        wait_queue.erase(i);
        wait_count--;
        i = j;
-       pthread_mutex_unlock(&wait_queue_mutex);
+       E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
 }
 
 
@@ -278,18 +284,18 @@ ThreadPool::prepareDescriptorArray()
        std::list<WaitDesc *>::iterator theIterator;
        struct pollfd *p;
 
-       pthread_mutex_lock(&wait_queue_mutex);
+       E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
        if(wait_count == 0) {
-               pthread_cond_wait(&wait_queue_cond_ready, &wait_queue_mutex);
+               E_ASSERT(pthread_cond_wait(&wait_queue_cond_ready, &wait_queue_mutex) != 0);
        }
        if(wait_count == 0) {
-               pthread_mutex_unlock(&wait_queue_mutex);
+               E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
                return;
        }
        if(ufds_size != wait_count + 1) {
                ufds = static_cast<struct pollfd *>(realloc(ufds, (1 + wait_count) * sizeof(struct pollfd)));
                if(ufds == NULL) {
-//                     throw new Exception();
+                       throw new Exception();
                }
                ufds_size = wait_count + 1;
        }
@@ -305,7 +311,7 @@ ThreadPool::prepareDescriptorArray()
                        min_timeout = w->timeout;
                }
        }
-       pthread_mutex_unlock(&wait_queue_mutex);
+       E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
 }
 
 
@@ -338,9 +344,9 @@ ThreadPool::run()
                        }
 
                        // at least we have to adjust timeouts
-                       pthread_mutex_lock(&wait_queue_mutex);
+                       E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
                        i = wait_queue.begin();
-                       pthread_mutex_unlock(&wait_queue_mutex);
+                       E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
                        // the wait queue mutex is unlocked inside the loop
                        // to allow handlers to add queue new
                        // WorkDescriptions - these are added at the
@@ -352,7 +358,7 @@ ThreadPool::run()
                                // check for consistency
                                if(p->fd != w->get_fd()) {
                                        // mismatch, what shall we do?
-                                       abort();
+                                       throw new Exception;
                                }
 
                                // subtract the time passed from timeout
@@ -389,12 +395,13 @@ ThreadPool::run()
                                                w->timeout.tv_usec = 0;
                                        }
                                }
-                               pthread_mutex_lock(&wait_queue_mutex);
+                               E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
                                i++;
-                               pthread_mutex_unlock(&wait_queue_mutex);
+                               E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
                        }
                } else {
                        // some nasty error
+                       throw new Exception;
                }
        }
 }