I am working on some image processing ideas and I want to implement them using pthreads. Here is an example I came up with just to make sure that the threads were actually doing what they were supposed to be doing. It just increments the values in an array and prints the run time. There is also a serial version of the incrementing function just to show the benefits of the pthreads. The program is getting hung up though and I am not sure why or how to fix this. It is labeled in two locations (same error) as "FOO". The code runs correctly but not without the two usleep(1000) inserted in nIncreaseArray() (both marked with "FOO"). Let me know what you think and if you have any suggestions, thanks. The code is also attached as a .cpp if you want to compile and run.
Sam
//pthread_example1.cpp.cpp
#include <iostream>
#include <stdio.h> //used for printf
#include <math.h>
#include <errno.h>
#include <sys/time.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
using namespace std;
#define ARRAY_HEIGHT 50 //each cell represents a ditsance of 5cm
#define ARRAY_WIDTH 50 //each cell represents a ditsance of 5cm
int g_anArray[ARRAY_HEIGHT][ARRAY_WIDTH];
pthread_mutex_t mutex[4], count_cond_mutex, count_mutex;
pthread_cond_t condition[4], count_condition;
pthread_t threads[4];
pthread_attr_t attr;
int nCount = 0;
bool bExit[4];
int nInit(){
int i, nError;
/* Initialize mutex and condition variable objects */
for(i = 0; i < 4; i++){
pthread_mutex_init(&mutex[i], NULL);
pthread_cond_init (&condition[i], NULL);
}
pthread_mutex_init(&count_cond_mutex, NULL);
pthread_mutex_init(&count_mutex, NULL);
pthread_cond_init(&count_condition, NULL);
/* For portability, explicitly create threads in a joinable state */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
memset(bExit, false, 4);
return(0);
}
int nUninit(){
/* remove thread dependencies */
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&count_cond_mutex);
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&count_condition);
for(int i = 0; i < 4; i++){
pthread_mutex_destroy(&mutex[i]);
pthread_cond_destroy(&condition[i]);
}
pthread_exit(NULL);
//return(0);
}
void *vThread(void *pvQuadrant){
int r_t, r_b, c_l, c_r, x, y, q;
q = (int)pvQuadrant;
/* Set index limits for quadrant */
if(q == 0){
r_t = 0; r_b = (ARRAY_HEIGHT / 2) - 1; c_l = ARRAY_WIDTH / 2; c_r = ARRAY_WIDTH - 1;
}
if(q == 1){
r_t = 0; r_b = (ARRAY_HEIGHT / 2) - 1; c_l = 0; c_r = (ARRAY_WIDTH / 2) - 1;
}
if(q == 2){
r_t = ARRAY_HEIGHT / 2; r_b = ARRAY_HEIGHT - 1; c_l = 0; c_r = (ARRAY_WIDTH / 2) - 1;
}
if(q == 3){
r_t = ARRAY_HEIGHT / 2; r_b = ARRAY_HEIGHT - 1; c_l = ARRAY_WIDTH / 2; c_r = ARRAY_WIDTH - 1;
}
//let calling process know that all the threads have been created
pthread_mutex_lock(&count_cond_mutex);
nCount++;
if (nCount == 4)
pthread_cond_signal(&count_condition);
pthread_mutex_unlock(&count_cond_mutex);
while(true){
pthread_mutex_lock(&mutex[q]);
pthread_cond_wait(&condition[q], &mutex[q]);
pthread_mutex_unlock(&mutex[q]);
if(bExit[q]){
printf("Thread %d exited\n", q);
pthread_exit(NULL);
}
//PUT CODE FOR MANIPULATING QUADRANTS OF 2-D ARRAY
for(y = r_t; y <= r_b; y++){
for(x = c_l; x <= c_r; x++){
g_anArray[y][x]++;
}
}
//usleep(500000); //this is just so you can see they all have to finish first
/* Check if thread is the last one finsihed, if so, signal nIncreaseArray() that vDilate is finsihed */
pthread_mutex_lock(&count_cond_mutex);
nCount++;
if (nCount == 4)
pthread_cond_signal(&count_condition);
pthread_mutex_unlock(&count_cond_mutex);
}
}
int nIncreaseArray(int amount){
int nError, i, j;
timeval clk1, clk2;
/* Create all the threads for vDilate */
for(i = 0; i < 4; i++){
if( nError = (pthread_create(&threads[i], &attr, vThread, (void *)i) != 0) ){
cout << "Pthread_Example1: nIncreaseArray() - pthread_create() error - " << strerror( nError ) << endl;
return(-1);
}
}
/* give threads a moment to be created */
pthread_mutex_lock(&count_cond_mutex);
if(nCount < 4){ //don't run pthread_cond_wait if nCount already reached 4 because pthread_cond_wait will never receive the signal
pthread_cond_wait(&count_condition, &count_cond_mutex);
nCount = 0;
}
pthread_mutex_unlock(&count_cond_mutex);
usleep(1000); //FOO - do work here or last running thread in vThread gets hung up on pthread_mutex_unlock(&count_cond_mutex);
for(j = 0; j < amount; j++){
gettimeofday(&clk1, NULL);
printf("Threads Starting\n");
/* Signal threads to start */
for (i = 0; i < 4; i++){
pthread_mutex_lock(&mutex[i]);
pthread_cond_signal(&condition[i]);
pthread_mutex_unlock(&mutex[i]);
}
/* Wait for all threads to complete (the last thread will make nCount 4 and call pthread_cond_signal()) */
pthread_mutex_lock(&count_cond_mutex);
if(nCount < 4){ //don't run pthread_cond_wait if nCount already reached 4 because pthread_cond_wait will never receive the signal
pthread_cond_wait(&count_condition, &count_cond_mutex);
nCount = 0;
}
pthread_mutex_unlock(&count_cond_mutex);
usleep(1000); //FOO - do work here or last running thread in vThread gets hung up on pthread_mutex_unlock(&count_cond_mutex);
printf("Threads Finished\n");
gettimeofday(&clk2, NULL);
printf("Threads Execution Time: %.6f seconds\n\n", (double)(clk2.tv_sec - clk1.tv_sec) +
( ( (double)(clk2.tv_usec - clk1.tv_usec) ) / 1000000 ) );
}
/* Terminate all the threads, wait for them to finish */
memset(bExit, true, 4);
for(i = 0; i < 4; i++){
pthread_mutex_lock(&mutex[i]);
pthread_cond_signal(&condition[i]);
pthread_mutex_unlock(&mutex[i]);
}
for(i = 0; i < 4; i++){
if(nError = (pthread_join(threads[i], NULL) != 0) ){
cout << "Pthread_Example1: nIncreaseArray() - pthread_join() error - " << strerror(nError) << endl;
return(-1);
}
}
}
int nIncreaseArray_s(int amount){
int x, y, j;
timeval clk1, clk2;
printf("Serial Started\n");
for(j = 0; j < amount; j++){
gettimeofday(&clk1, NULL);
/* Signal vDilate threads to start */
printf("iteration: %d\n", j);
for(y = 0; y < ARRAY_HEIGHT; y++){
for(x = 0; x < ARRAY_WIDTH; x++){
g_anArray[y][x]++;
}
}
//usleep(50000); //this is just so you can see they all have to finish first
gettimeofday(&clk2, NULL);
printf("Serial Execution Time: %.6f seconds\n\n", (double)(clk2.tv_sec - clk1.tv_sec) +
( ( (double)(clk2.tv_usec - clk1.tv_usec) ) / 1000000 ) );
}
printf("Serial Finsished\n");
}
void vPrintArray(){
for(int r = 0; r < ARRAY_HEIGHT; r++){
for(int c = 0; c < ARRAY_WIDTH; c++){
cout << g_anArray[r][c] << " ";
}
cout << endl;
}
}
int main(int argc, char *argv[]){
if(nInit() == -1){
printf("Pthread_Example1: main() - nInit() error\n");
return(-1);
}
//memset(g_anArray, 0, sizeof(g_anArray));
//nIncreaseArray_s(10);
memset(g_anArray, 0, sizeof(g_anArray));
nIncreaseArray(20);
vPrintArray();
if(nUninit() == -1){
printf("Pthread_Example1: main() - nUninit() error\n");
return(-1);
}
pthread_exit(NULL);
}