The Producer Thread
What does a producer thread write to the buffer? Create 20 different files (named in0.txt, in1.txt, ... in19.txt). [Testing Hint: you want
to be able to tell from the text which file it is from, so the contents should be aaaaaa or bbbbbbb, etc.] Vary the length of the file
contents, so that you can test that your program works with content smaller than the buffer size and content longer than the buffer
size. Implement a FIFO queue and add the file names to it, one per queue entry. Create 20 Producer threads. Up to 128
characters of each file are "produced" as follows: a producer thread gets the file name from the FIFO queue, reads the contents from
that file, converts all characters to uppercase, and writes the contents into any available buffer slot.
The Consumer Thread
Create 5 consumer threads. Each consumer thread should read the current contents of a full buffer slot, and print them to stdout. A
consumer thread continues until 2 conditions hold: All buffers are empty, and all 20 files have been produced.
Synchronization
Each different text buffer must be produced and consumed exactly once. In other words, the output from your program should show
each file's contents exactly once, up to a maximum of 128 characters per file.
-------------------------------------------------
OK so I'm basically done with this thing except I've run into a last minute problem. It seems like only one buffer is being used the entire time instead of using all 5 slots. Also it feels like only one consumer is being used, which is a problem. I posted this on another board, but only one guy was helping me out and I think he might be done for the night, so figured I'd try here because I've been on this for about 20+ hours the last few days and I'm stumped here.
Why is this happening? I'm sure my semaphores are set up properly. Anyway, my code is posted below, with the semaphore sections BOLDED. The code is followed by the output: the producer's output is on the left and the consumer's output is on the right.
#include<stdio.h>
#include<stdlib.h>
#include<ctype.h>
#include<unistd.h>
#include<pthread.h>
#include<string.h>
#include<semaphore.h>
// Number of input file names that main() generates and queues (in0.txt, in1.txt, ...).
// NOTE(review): despite the name this is a file count, not a size in bytes.
#define FILESIZE 20
// Number of slots in the shared text buffer.
#define BUFFER_SIZE 5
// Number of producer threads started by main().
#define PRODUCERS 20
// Number of consumer threads started by main().
#define CONSUMERS 5
/* One entry of the FIFO queue of input file names (singly linked). */
struct fifo_struct
{
    char fileName[1024];        /* NUL-terminated name of the file to produce */
    struct fifo_struct *next;   /* entry queued after this one; NULL at the tail */
};
/* FIFO queue handle: entries are removed at `head` and appended at `tail`. */
struct linked_list
{
    struct fifo_struct *head;   /* oldest entry; NULL when the queue is empty */
    struct fifo_struct *tail;   /* newest entry; NULL when the queue is empty */
};
/*
 * Shared bounded buffer of BUFFER_SIZE text slots.
 * Each slot holds up to 128 characters plus the terminating NUL. With only
 * 128 bytes per slot, a 128-character copy leaves the slot unterminated and
 * printing it with %s runs into the following slot (or past the array).
 */
char buffer[BUFFER_SIZE][128 + 1];
/*
 * Number of slots currently holding unconsumed text. The producer writes
 * buffer[counter] and then increments it; the consumer prints
 * buffer[counter - 1] and then decrements it.
 */
int counter;
/* Mutual-exclusion lock for the state shared between the threads. */
pthread_mutex_t mutex;
/*
 * Counting semaphores bounding the buffer: `full` starts at 0 and is posted
 * by a producer after it fills a slot; `empty` starts at BUFFER_SIZE and is
 * posted by a consumer after it has printed a slot.
 */
sem_t full, empty;
void print_queue(const struct linked_list* ps)
{
struct fifo_struct* p = NULL;
if(ps)
{
for(p = ps->head; p; p = p->next)
{
if(p)
printf("int = %s\n", p->fileName);
else
printf("can't print NULL STRUCT\n");
}
}
}
// Thread entry points, defined after main(). Both have the start-routine
// signature expected by pthread_create(): they take and return a void pointer.
void *producer(void *q);
void *consumer(void *q);
// Queue of input file names: allocated and filled by main(), and emptied by
// the producer threads (one name per producer).
struct linked_list *s;
int main()
{
pthread_t producerVar[PRODUCERS];
pthread_t consumerVar[CONSUMERS];
char str[200];
int i = 0;
counter = 0;
sem_init(&full, 0, 0);
sem_init(&empty, 0, BUFFER_SIZE);
pthread_mutex_init(&mutex,NULL);
struct fifo_struct* fifo;
// Initialize the 5 buffer slots
for(i = 0; i < BUFFER_SIZE; i++)
{
buffer[i][0] = '\n';
}
// Create linked list
s = malloc( 1 * sizeof(*s));
if(s == NULL)
{
fprintf(stderr, "LINE: %d, malloc() failed\n", __LINE__);
}
s->head = s->tail = NULL;
for(i = 0; i < (FILESIZE); i++)
{
// Generates file names to store into queue
sprintf(str, "in%d.txt", i);
// Create queue to store file names
fifo = malloc(1 * sizeof(*fifo));
// Error in creating fifo
if(fifo == NULL)
{
fprintf(stderr, "IN %s, %s: malloc() failed\n", __FILE__, "list_add");
}
// Store filename into queue
strcpy(fifo->fileName,str);
fifo->next = NULL;
if(s == NULL)
{
printf("Error: Queue has not been initialized\n");
}
else if(s->head == NULL && s->tail == NULL)
{
// First element in queue
s->head = s->tail = fifo;
}
else if(s->head == NULL || s->tail == NULL)
{
printf("Error: Problem with code\n");
free(fifo);
}
else
{
// Increments queue
s->tail->next = fifo;
s->tail = fifo;
}
}
//print_queue(s);
// Create producer threads
for(i = 0; i < PRODUCERS; i++)
{
pthread_create(&producerVar[i], NULL, producer, &i);
//pthread_join(producerVar[i], NULL);
}
// Create consumer threads
for(i = 0; i < CONSUMERS; i++)
{
pthread_create(&consumerVar[i], NULL, consumer, s);
}
for(i = 0; i < PRODUCERS; i++)
{
pthread_join(producerVar[i], NULL);
}
return 0;
}
void *producer(void *idx)
{
int myidx = * (int *) idx;
int i = 0;
char fileContent[1024];
char line[1024];
FILE * myfile;
struct linked_list *q;
//print_queue(q);
struct fifo_struct* tmp1 = NULL;
struct fifo_struct* tmp2 = NULL;
pthread_mutex_lock(&mutex);
printf("IN PRODUCER\n");
q = s;
if(q == NULL)
{
printf("List is empty\n");
return(NULL);
}
else if(q->head == NULL && q->tail == NULL)
{
printf("List is empty\n");
return(NULL);
}
else if(q->head == NULL || q->tail == NULL)
{
printf("Error: Problem with code\n");
return(NULL);
}
printf("Producer: %d\n", myidx);
//print_queue(q);
myfile = fopen(q->head->fileName,"r");
tmp1 = q->head;
tmp2 = tmp1->next;
free(tmp1);
q->head = tmp2;
if(q->head == NULL)
q->tail = q->head;
//print_queue(q);
printf("After printq\n");
fflush(stdout);
printf("\n");
if((fgets(line, 1024, myfile)) != NULL)
{
strcpy(fileContent, line);
printf("%s",fileContent);
}
strcpy(fileContent, line);
printf("Myfile: %s",fileContent);
fclose(myfile);
while(fileContent[i] != '\n')
{
fileContent[i] = toupper(fileContent[i]);
i++;
}
pthread_mutex_unlock(&mutex);
[B] sem_wait(&empty);
if(counter < BUFFER_SIZE) {
strncpy(buffer[counter],fileContent,128);
printf("buffer[%d] = %s\n", counter, buffer[counter]);
counter++;
}
sem_post(&full);[/B]
return(NULL);
}
void *consumer(void *q)
{
int myidx = * (int *) q;
printf("\t\t\t\tCONSUMER: %d\n", myidx);
while(1)
{
[B] sem_wait(&full);
if(counter > 0) {
printf("\t\t\tbuffer[%d] = %s\n", counter - 1, buffer[(counter - 1)]);
counter--;
}
sem_post(&empty);[/B]
}
return(NULL);
}
IN PRODUCER
Producer: 1
After printq
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
Myfile: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
IN PRODUCER
buffer[0] = AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
Producer: 15
CONSUMER: 141192
buffer[0] = AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
CONSUMER: 141192
CONSUMER: 141192
CONSUMER: 141192
CONSUMER: 141192
After printq
bbbbbbbb
Myfile: bbbbbbbb
buffer[0] = BBBBBBBB
IN PRODUCER
buffer[0] = BBBBBBBB
Producer: 0
After printq
ccccccc
Myfile: ccccccc
buffer[0] = CCCCCCC
IN PRODUCER
buffer[0] = CCCCCCC
Producer: 19
After printq
ddddddd
Myfile: ddddddd
buffer[0] = DDDDDDD
IN PRODUCER
buffer[0] = DDDDDDD
Producer: 18
After printq
eeeeeee
Myfile: eeeeeee
buffer[0] = EEEEEEE
IN PRODUCER
buffer[0] = EEEEEEE
Producer: 16
After printq
ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
Myfile: ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
buffer[0] = FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
IN PRODUCER
buffer[0] = FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
Producer: 14
After printq
ggggggg
Myfile: ggggggg
buffer[0] = GGGGGGG
IN PRODUCER
buffer[0] = GGGGGGG
Producer: 13
After printq
hhhhhhh
Myfile: hhhhhhh
buffer[0] = HHHHHHH
IN PRODUCER
buffer[0] = HHHHHHH
Producer: 12
After printq
iiiiiii
Myfile: iiiiiii
buffer[0] = IIIIIII
IN PRODUCER
buffer[0] = IIIIIII
Producer: 11
After printq
jjjjjjj
Myfile: jjjjjjj
buffer[0] = JJJJJJJ
IN PRODUCER
buffer[0] = JJJJJJJ
Producer: 10
After printq
kkkkkk
Myfile: kkkkkk
buffer[0] = KKKKKK
IN PRODUCER
buffer[0] = KKKKKK
Producer: 9
After printq
llllllll
Myfile: llllllll
buffer[0] = LLLLLLLL
IN PRODUCER
buffer[0] = LLLLLLLL
Producer: 8
After printq
mmmmmmm
Myfile: mmmmmmm
buffer[0] = MMMMMMM
IN PRODUCER
buffer[0] = MMMMMMM
Producer: 7
After printq
nnnnnnn
Myfile: nnnnnnn
buffer[0] = NNNNNNN
IN PRODUCER
buffer[0] = NNNNNNN
Producer: 6
After printq
ooooooo
Myfile: ooooooo
buffer[0] = OOOOOOO
IN PRODUCER
buffer[0] = OOOOOOO
Producer: 5
After printq
ppppppp
Myfile: ppppppp
buffer[0] = PPPPPPP
IN PRODUCER
buffer[0] = PPPPPPP
Producer: 4
After printq
qqqqqq
Myfile: qqqqqq
buffer[0] = QQQQQQ
IN PRODUCER
buffer[0] = QQQQQQ
Producer: 3
After printq
rrrrrrr
Myfile: rrrrrrr
buffer[0] = RRRRRRR
IN PRODUCER
buffer[0] = RRRRRRR
Producer: 2
After printq
sssssssss
Myfile: sssssssss
buffer[0] = SSSSSSSSS
IN PRODUCER
buffer[0] = SSSSSSSSS
Producer: 17
After printq
tttttt
Myfile: tttttt
buffer[0] = TTTTTT
buffer[0] = TTTTTT