A few weeks earlier we were given an interesting assignment on POSIX threads. The problem statement is as follows:
"You have to simulate a multiteller bank. The bank has two tellers. Customers enter the bank one at a time. Each teller has a separate queue. A new arriving customer joins the shortest queue, choosing the leftmost in case of ties. Each teller takes one customer from his queue and services the customer (ignore the service time). If the teller finds that his queue is empty then it goes to sleep. There will be three Apartments. Each apartment will produce customers for the bank. Both queues have length of 5 customers. So whenever Queue is full no customer is allowed to enter the bank. In that case the apartment goes to sleep."
The POSIX library has a lot of powerful tools for handling this kind of problems. With my very little experience, I decided to stick with just mutex and semaphores. But one thing came to my mind, since I'm an avid fan of java's threading model, I decided code it in the Java way!!! Yes create a C++ class that behaves like a Java thread class. You just override the run function just like in Java, and you've got a new thread.
Let's get into some details, shall we? This is the
Thread.h
file. Here you can find the Thread class,
#ifndef THREAD_H
#define THREAD_H
#include <cstdio>
#include <string>
#include <exception>
#include <cassert>
#include <cstdlib>
#include <pthread.h>
#include <semaphore.h>
class Thread{
protected:
pthread_t thread;
int res;
void *thread_result;
volatile bool stoprequested;
volatile bool running;
// this semaphore is just for a safety precation
sem_t safety_sem;
static void* __run(void *object){
void *exit_status;
printf("Calling the run function for thread.\n");
// this is the crutial part,
reinterpret_cast(object)->run();
pthread_exit(exit_status);
}
virtual void run(){
printf("If you are seeing this, it means have to run this programme again\n\
This happened because the overridden run fuction did not get the time to be instantiated as the thread got created too earlier\n");
}
public:
const char *id;
int start(){
// assert(running == false);
if(running == false){
running = true;
printf("Creating the thread with id %s\n", id);
usleep(200);
// we are using this semaphore so that this function waits upto the point when the dereived class gets instantiated
sem_wait(&safety_sem);
res = pthread_create(&thread, NULL, __run, this);
if(res != 0){
perror("Thread creation failed\n");
exit(EXIT_FAILURE);
}
}
return 0;
}
Thread(const char *id): running(false), stoprequested(false){
this->id = id;
sem_init(&safety_sem, 0, 0);
}
void stop(){
// assert(running == true);
if(running == true){
running = false;
stoprequested = true;
res = pthread_join(thread, &thread_result);
if(res != 0){
printf("Thread joining failed");
}
printf("%s: Thread is returning\n", id);
}
sem_destroy(&safety_sem);
}
~Thread(){
if(stoprequested == false){
stop();
}
}
bool isAlive(){
return running;
}
};
#endif // THREAD_H
Now we'll need a thread safe queue implementation.
Here is the code in
SafeQueue.h
#ifndef SAFEQUEUE_H
#define SAFEQUEUE_H
#include <semaphore.h>
#include <pthread.h>
#include <queue>
#include <vector>
using namespace std;
class SafeQueue{
protected:
sem_t sem_full;
sem_t sem_empty;
pthread_mutex_t mutex_common;
queue q;
int max_queue_size;
char* name;
public:
SafeQueue(const int max_queue_size, char* name){
this->max_queue_size = max_queue_size;
this->name = name;
sem_init(&sem_full, 0, max_queue_size);
sem_init(&sem_empty, 0, 0);
pthread_mutex_init(&mutex_common, 0);
}
// this function is thread safe
void push(int i, const char* thread_id){
// if full, go to sleep
sem_wait(&sem_full);
pthread_mutex_lock(&mutex_common);
// cout << thread_id << ": inside lock" << endl;
q.push(i);
cout << thread_id << ": Pushing data: " << i << ", in " << name <<", queue size is: " << q.size() << endl;
pthread_mutex_unlock(&mutex_common);
// we release the empty semaphore
sem_post(&sem_empty);
}
// this function is thread safe
void pop(int *i, const char* thread_id){
// if empty go to sleep
sem_wait(&sem_empty);
pthread_mutex_lock(&mutex_common);
*i = q.front();
q.pop();
cout << thread_id << ": Popping data: " << *i << " from "<< name << " queue size is: " << q.size() << endl;
pthread_mutex_unlock(&mutex_common);
// we release the full semapore, because the queue is now one element short of being full
// so that other thread can now push into this queue
sem_post(&sem_full);
}
// helper function, needed when we want to use this objects mutex from outside
pthread_mutex_t* getMutex(){
return &mutex_common;
}
// this function is NOT thread safe, use only when needed in conjuncture with getMutex function
unsigned get_size(){
return q.size();
}
~SafeQueue(){
sem_destroy(&sem_empty);
sem_destroy(&sem_full);
pthread_mutex_destroy(&mutex_common);
}
};
#endif // SAFEQUEUE_H
I also needed to create a parallel queue implementation which, upon call gives me the pointer to the least populated queue. Code for
ParallelQueue.h
#ifndef PARALLELQUEUE_H
#define PARALLELQUEUE_H
#include <semaphore.h>
#include <pthread.h>
#include <queue>
#include <vector>
#include "SafeQueue.h"
using namespace std;
class ParallelQueue{
protected:
pthread_mutex_t mutex;
vector vq;
SafeQueue* min_q;
public:
ParallelQueue(vector vq){
pthread_mutex_init(&mutex, 0);
this->vq = vq;
}
void push_min(int data, const char* thread_id){
unsigned i;
pthread_mutex_lock(&mutex);
// lock all the queue
min_q = &(* vq[0]);
for(i=0; i pthread_mutex_lock((vq[i])->getMutex());
// find the min queue
if((vq[i])->get_size() < min_q->get_size()){
min_q = vq[i];
}
pthread_mutex_unlock((vq[i])->getMutex());
}
// insert into the min queue
min_q->push(data, thread_id);
pthread_mutex_unlock(&mutex);
}
~ParallelQueue(){
pthread_mutex_destroy(&mutex);
}
};
#endif // PARALLELQUEUE_H
Finally the
main.cpp
#include <iostream>
#include <queue>
#include <stdexcept>
#include <exception>
#include <algorithm>
#include "Thread.h"
#include "ParallelQueue.h"
//using namespace std;
#ifndef _REENTRANT
#define _REENTRANT
#endif
#define QUEUE_SIZE 5
// this class gives a unique customer id, each time its called, and its thread safe
class UniqueCustomer{
public:
int id;
pthread_mutex_t id_mutex;
UniqueCustomer(): id(0){
pthread_mutex_init(&id_mutex, 0);
}
~UniqueCustomer(){
pthread_mutex_destroy(&id_mutex);
}
void setNextCustomerId(int *id){
pthread_mutex_lock(&id_mutex);
*id = this->id;
this->id++;
pthread_mutex_unlock(&id_mutex);
}
};
// the global variable for getting a unique customer
UniqueCustomer uc;
class Producer: public Thread{
private:
SafeQueue* q;
ParallelQueue *pq;
int max_producer_delay;
public:
Producer(SafeQueue* q, const int max_producer_delay, const char* id): Thread(id){
this->q = q;
this->max_producer_delay = max_producer_delay;
// this is a must
sem_post(&safety_sem);
}
Producer(ParallelQueue* pq, const int max_producer_delay, const char *id): Thread(id){
this->pq = pq;
this->max_producer_delay = max_producer_delay;
sem_post(&safety_sem);
}
void run(){
int data;
printf("%s: Starting\n", id);
for(int i=0; i < 100; i++){
// cout << id << ": trying to push " << i << endl;
uc.setNextCustomerId(&data);
// q->push(data, id);
pq->push_min(data, id);
sleep(rand() % max_producer_delay + 1);
}
}
~Producer(){
}
};
class Consumer: public Thread{
private:
SafeQueue* q;
int max_consumer_delay;
public:
Consumer(SafeQueue *q, const int max_consumer_delay,const char* id): Thread(id){
// this is a must
this->max_consumer_delay = max_consumer_delay;
this->q = q;
sem_post(&safety_sem);
}
void run(){
int i;
printf("%s: Starting\n", id);
while(1){
q->pop(&i, id);
// cout << id << ": popped " << i << endl;
sleep(rand() % max_consumer_delay + 1);
}
}
~Consumer(){
}
};
int main (void)
{
try{
srand(time(NULL));
SafeQueue sq1 = SafeQueue(QUEUE_SIZE, (char *)"Queue 1");
SafeQueue sq2 = SafeQueue(QUEUE_SIZE, (char *)"Queue 2");
vector< SafeQueue* > vq;
vq.push_back(&sq1);
vq.push_back(&sq2);
ParallelQueue pq = ParallelQueue(vq);
Producer prod1 = Producer(&pq, 2, "Producer 1");
Producer prod2 = Producer(&pq, 3, "Producer 2");
Consumer cons1 = Consumer(&sq1, 4, "Consumer 1");
Consumer cons2 = Consumer(&sq2, 2, "Consumer 2");
cons1.start();
prod1.start();
cons2.start();
prod2.start();
}catch(exception &ex){
printf("%s\n", ex.what());
}
return 0;
}
You can see that the thread functions has become pretty simple, you just need to make sure that the data you are accessing belongs to a syncronized class and don't have to worry about anything else. I'm looking for a better solution though, as there are some quirks in the code(see the comments). Might take a look into Boost's thread lib too if I can get the time (which most probably won't happen :().