pthread_cond_init(&wait_queue_cond_ready, NULL);
pipe(pd);
ufds = static_cast<struct pollfd *>(malloc(sizeof(struct pollfd)));
+ if(ufds == NULL) {
+ throw new Exception;
+ }
ufds->fd = pd[0];
ufds->events = POLLIN;
ufds_size = 1;
num_workers = n;
for(unsigned int i = 0; i < n; i++) {
+ // XXX check return
pthread_create(&workers[i], NULL, ThreadPool::threadMain, NULL);
}
}
void
ThreadPool::postWork(WorkDescription *work_unit)
{
- pthread_mutex_lock(&work_queue_mutex);
+ E_ASSERT(pthread_mutex_lock(&work_queue_mutex) >= 0);
work_queue.push_back(work_unit);
work_count++;
- pthread_cond_signal(&work_queue_cond_ready);
- pthread_mutex_unlock(&work_queue_mutex);
+ E_ASSERT(pthread_cond_signal(&work_queue_cond_ready) >= 0);
+ E_ASSERT(pthread_mutex_unlock(&work_queue_mutex) >= 0);
}
void
ThreadPool::queueWork(WaitDesc *wd)
{
- pthread_mutex_lock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
wait_queue.push_back(wd);
wait_count++;
- pthread_cond_signal(&wait_queue_cond_ready);
- pthread_mutex_unlock(&wait_queue_mutex);
- write(pd[1], "1", 1);
+ E_ASSERT(pthread_cond_signal(&wait_queue_cond_ready) >= 0);
+ E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
+ if(write(pd[1], "1", 1) != 1) {
+ throw new Exception;
+ }
}
WorkDescription *work_unit = NULL;
struct timespec timeout;
- pthread_mutex_lock(&work_queue_mutex);
+ E_ASSERT(pthread_mutex_lock(&work_queue_mutex) >= 0);
if(work_count == 0) {
timeout.tv_sec = 1;
timeout.tv_nsec = 0;
// pthread_cond_timedwait(&work_queue_cond_ready, &work_queue_mutex, &timeout);
- pthread_cond_wait(&work_queue_cond_ready, &work_queue_mutex);
+ E_ASSERT(pthread_cond_wait(&work_queue_cond_ready, &work_queue_mutex) == 0);
}
if(work_count > 0) {
work_count--;
work_unit = work_queue.front();
work_queue.pop_front();
}
- pthread_mutex_unlock(&work_queue_mutex);
+ E_ASSERT(pthread_mutex_unlock(&work_queue_mutex) >= 0);
return work_unit;
}
{
ThreadPool *pool = ThreadPool::instance();
- pthread_mutex_unlock(&(pool->work_queue_mutex));
+ E_ASSERT(pthread_mutex_unlock(&(pool->work_queue_mutex)) >= 0);
}
std::list<WaitDesc *>::iterator j = i;
// actually this is safe even for the first element
- pthread_mutex_lock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
j--;
wait_queue.erase(i);
wait_count--;
i = j;
- pthread_mutex_unlock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
}
std::list<WaitDesc *>::iterator theIterator;
struct pollfd *p;
- pthread_mutex_lock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
if(wait_count == 0) {
- pthread_cond_wait(&wait_queue_cond_ready, &wait_queue_mutex);
+ E_ASSERT(pthread_cond_wait(&wait_queue_cond_ready, &wait_queue_mutex) != 0);
}
if(wait_count == 0) {
- pthread_mutex_unlock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
return;
}
if(ufds_size != wait_count + 1) {
ufds = static_cast<struct pollfd *>(realloc(ufds, (1 + wait_count) * sizeof(struct pollfd)));
if(ufds == NULL) {
-// throw new Exception();
+ throw new Exception();
}
ufds_size = wait_count + 1;
}
min_timeout = w->timeout;
}
}
- pthread_mutex_unlock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
}
}
// at least we have to adjust timeouts
- pthread_mutex_lock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
i = wait_queue.begin();
- pthread_mutex_unlock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
// the wait queue mutex is unlocked inside the loop
// to allow handlers to add queue new
// WorkDescriptions - these are added at the
// check for consistency
if(p->fd != w->get_fd()) {
// mismatch, what shall we do?
- abort();
+ throw new Exception;
}
// subtract the time passed from timeout
w->timeout.tv_usec = 0;
}
}
- pthread_mutex_lock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_lock(&wait_queue_mutex) >= 0);
i++;
- pthread_mutex_unlock(&wait_queue_mutex);
+ E_ASSERT(pthread_mutex_unlock(&wait_queue_mutex) >= 0);
}
} else {
// some nasty error
+ throw new Exception;
}
}
}