NSConditionLock and the producer/consumer problem with multiple buffers: how to?
10/03/04 09:50
Is there a beginner example showing how to use NSConditionLock in a producer/consumer situation with multiple buffers, and how to handle speed differences between the two threads?
Here is a very "academic" example that will help you understand what happens in various situations: you can adjust the relative speeds of the producer and consumer threads and watch the messages.
The producing and consuming are very basic (filling a buffer with ASCII characters) and the "speed" is adjusted by "sleeping"... but, well, you are asking for a beginner example ;). Just copy and paste the following code into a file named example.m, then compile and run it in Terminal.app:
cc -framework Foundation -o example example.m
./example
#import <Foundation/Foundation.h>
#import <pthread.h>
#define N_BUFFERS (5) // how many buffers to play with
#define BUF_SIZE (32) // payload bytes per buffer (each buffer is allocated with one extra byte for the terminating NUL)
// One condition lock per buffer.
// Condition 0: the buffer may be written by the producer.
// Condition 1: the buffer holds data waiting for the consumer.
NSConditionLock *gBufferLock[N_BUFFERS] ;
char *gFileBuffer[N_BUFFERS] ; // the buffers themselves, allocated in main()
int gCurProdBuffer ; // index of the buffer the producer fills next
int gCurConsBuffer ; // index of the buffer the consumer reads next
BOOL gProducerDone ; // set by fillBuffer() once the last letter has been written
BOOL gConsummerDone ; // set by readBuffer() once everything produced has been read
int gBuffersFilled ; // number of buffers filled so far (incremented by fillBuffer())
int gBuffersRead ; // number of buffers read so far (incremented by readBuffer())
// Change the reader and writer speeds (from 0 to MAX_SPEED)
// to simulate what happens when one thread goes faster than the other.
// To slow down a thread we just sleep (MAX_SPEED - thread speed) seconds per buffer.
// If you set both speeds to the same value, you will get random messages about one thread being too slow.
// If you make the consumer slower than the producer, you may experiment by increasing the number of buffers
// to evaluate how many buffers are required to absorb the speed difference
// (according to the number of loops to be executed).
#define READER_SPEED (9)
#define WRITER_SPEED (10)
#define MAX_SPEED (10)
#define N_LOOPS (10) // we fill the buffers with ASCII chars from 'A' to 'A' + N_LOOPS - 1
// fillBuffer
// Producer-side work for one buffer: writes fillValue into each of the
// BUF_SIZE payload bytes of gFileBuffer[inBuffer], NUL-terminates the buffer so
// it can be printed with "%s", sleeps to simulate the writer speed, then
// updates the shared bookkeeping.
//
// inBuffer  - index into gFileBuffer (0 .. N_BUFFERS - 1)
// fillValue - character code stored in every payload byte
//
// The caller is expected to hold gBufferLock[inBuffer] while this runs.
void fillBuffer(int inBuffer,int fillValue)
{
    int i ;
    char *buffer = gFileBuffer[inBuffer] ;

    // Fill the payload; index BUF_SIZE is reserved for the terminator.
    for (i = 0; i < BUF_SIZE; i++) {
        buffer[i] = (char)fillValue ;
    }
    buffer[BUF_SIZE] = 0 ;

    // Simulate a slower writer: the lower WRITER_SPEED, the longer the nap.
    sleep(MAX_SPEED - WRITER_SPEED) ;

    // Count the buffer first, then raise the "done" flag once the last letter
    // ('A' + N_LOOPS - 1) has been written.
    ++gBuffersFilled ;
    gProducerDone = (fillValue == ((int)'A' + N_LOOPS - 1)) ;
}
// producerEntry
// Thread entry point for the producer. Starting with 'A', it claims the
// buffers in round-robin order, fills each with the next letter and hands it
// to the consumer, until fillBuffer() reports that the last letter was written.
//
// inRefCon - unused
// Returns NULL.
void *producerEntry(void *inRefCon)
{
    NSAutoreleasePool *threadPool = [[NSAutoreleasePool alloc] init] ;
    NSConditionLock *slotLock ;
    int letter ;

    for (letter = 'A'; !gProducerDone; ++letter) {
        fprintf(stdout,"producer waiting for buffer %d\n",gCurProdBuffer) ;
        slotLock = gBufferLock[gCurProdBuffer] ;

        // Poll instead of blocking so we can report when the consumer has not
        // yet emptied this buffer (condition 0 means "ready to be written").
        for (;;) {
            if ([slotLock tryLockWhenCondition:0]) {
                break ;
            }
            fprintf(stderr,"*** Consummer is too sloooooooow ! ***\n") ;
            sleep(1);
        }

        fillBuffer(gCurProdBuffer,letter) ;
        fprintf(stdout,"producer has filled buffer %d with '%c'\n",gCurProdBuffer,letter);

        // Condition 1 tells the consumer that this buffer now holds data.
        [slotLock unlockWithCondition:1] ;
        gCurProdBuffer = (gCurProdBuffer + 1) % N_BUFFERS ;
    }

    fprintf(stdout,"PRODUCER DONE\n");
    [threadPool release] ;
    return NULL ;
}
// readBuffer
// Consumer-side work for one buffer: prints gFileBuffer[inBuffer], sleeps to
// simulate the reader speed, counts the buffer as read and decides whether the
// consumer has finished.
//
// inBuffer - index into gFileBuffer (0 .. N_BUFFERS - 1)
//
// The caller is expected to hold gBufferLock[inBuffer] while this runs.
void readBuffer(int inBuffer)
{
    BOOL producerDone ;
    int buffersFilled ;

    fprintf(stdout,"Reading buffer %d '%s'\n", inBuffer, gFileBuffer[inBuffer]);

    // Simulate a slower reader: the lower READER_SPEED, the longer the nap.
    sleep(MAX_SPEED - READER_SPEED) ;
    ++gBuffersRead ;

    // Snapshot the "producer done" flag BEFORE the fill counter. fillBuffer()
    // bumps gBuffersFilled and only then sets gProducerDone, so a YES seen here
    // means the counter read next is final. Reading them the other way round
    // lets the producer finish its last buffer between the two reads, and the
    // consumer would then stop with one buffer still unread.
    // NOTE(review): both are plain globals, so this relies on the loads not
    // being reordered; guard them with a lock if that cannot be assumed.
    producerDone = gProducerDone ;
    buffersFilled = gBuffersFilled ;

    if (gBuffersRead != buffersFilled) {
        // Filled buffers are still waiting to be read.
        gConsummerDone = NO ;
    } else if (producerDone) {
        // Everything that will ever be produced has been read.
        gConsummerDone = YES ;
    } else {
        // We caught up with a producer that still has work to do.
        fprintf(stderr,"*** Producer is too sloooooooow ! ***\n") ;
        gConsummerDone = NO ;
    }
}
// consumerEntry
// Thread entry point for the consumer. Visits the buffers in round-robin
// order, blocking on each until it has been filled, reads it and hands it
// back to the producer, until readBuffer() sets gConsummerDone.
//
// inRefCon - unused
// Returns NULL.
void *consumerEntry(void *inRefCon)
{
    NSAutoreleasePool *threadPool = [[NSAutoreleasePool alloc] init] ;
    NSConditionLock *slotLock ;
    int slot ;

    for (;;) {
        if (gConsummerDone) {
            break ;
        }

        slot = gCurConsBuffer ;
        slotLock = gBufferLock[slot] ;
        fprintf(stdout,"consummer waiting for buffer %d\n",slot);

        // Block until the buffer is marked as filled (condition 1).
        [slotLock lockWhenCondition:1] ;
        readBuffer(slot);
        // Condition 0 returns the buffer to the producer.
        [slotLock unlockWithCondition:0] ;

        gCurConsBuffer = (slot + 1) % N_BUFFERS ;
    }

    fprintf(stdout,"CONSUMMER DONE\n");
    [threadPool release] ;
    return NULL ;
}
// main
// Allocates the N_BUFFERS buffers and their condition locks, resets the shared
// state, starts the producer and consumer threads, waits for both to finish
// and prints a summary.
//
// Returns 0 on success; exits with a non-zero status if a buffer cannot be
// allocated or a thread cannot be created.
int main(int argc, char **argv)
{
    NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init] ;
    pthread_t consummer ;
    pthread_t producer ;
    int result ; // pthread_create() returns a plain int error number
    int i ;

    fprintf(stdout,"starting\n") ;

    for (i = 0; i < N_BUFFERS; i++) {
        // One extra byte for the terminating NUL written by fillBuffer().
        gFileBuffer[i] = malloc(BUF_SIZE+1) ;
        if (gFileBuffer[i] == NULL) {
            fprintf(stderr,"*** error allocating buffer %d\n",i) ;
            exit(1);
        }
        gBufferLock[i] = [[NSConditionLock alloc] initWithCondition:0] ; // buffer is ready to be written
    }

    gCurProdBuffer = 0 ;
    gCurConsBuffer = 0 ;
    gProducerDone = NO ;
    gConsummerDone = NO ;
    gBuffersFilled = 0 ;
    gBuffersRead = 0 ;

    result = pthread_create(&producer, NULL, producerEntry, NULL) ;
    if (result) {
        fprintf(stderr,"*** error creating producer thread %d\n",result) ;
        exit(result);
    }
    result = pthread_create(&consummer, NULL, consumerEntry, NULL) ;
    if (result) {
        fprintf(stderr,"*** error creating consummer thread %d\n",result) ;
        exit(result);
    }

    pthread_join(producer,NULL);
    pthread_join(consummer,NULL);

    // Both threads are finished: give back the locks and the buffers.
    for (i = 0; i < N_BUFFERS; i++) {
        [gBufferLock[i] release] ;
        gBufferLock[i] = nil ;
        free(gFileBuffer[i]) ;
        gFileBuffer[i] = NULL ;
    }

    [pool release] ;
    fprintf(stdout,"done %d buffers filled, %d buffers read\n",gBuffersFilled, gBuffersRead) ;
    return 0 ;
}
The producing and consuming are very basic (filling a buffer with ASCII characters) and the "speed" is adjusted by "sleeping"... but, well, you are asking for a beginner example ;). Just copy and paste the following code into a file named example.m, then compile and run it in Terminal.app:
cc -framework Foundation -o example example.m
./example
#import <Foundation/Foundation.h>
#import <pthread.h>
#define N_BUFFERS (5) // how many buffers to play with
#define BUF_SIZE (32) // payload bytes per buffer (fillBuffer() also writes a terminating NUL at index BUF_SIZE)
// One condition lock per buffer.
// Condition 0: the buffer may be written by the producer.
// Condition 1: the buffer holds data waiting for the consumer.
NSConditionLock *gBufferLock[N_BUFFERS] ;
char *gFileBuffer[N_BUFFERS] ; // the buffers themselves
int gCurProdBuffer ; // index of the buffer the producer fills next
int gCurConsBuffer ; // index of the buffer the consumer reads next
BOOL gProducerDone ; // set by fillBuffer() once the last letter has been written
BOOL gConsummerDone ; // set by readBuffer() once everything produced has been read
int gBuffersFilled ; // number of buffers filled so far (incremented by fillBuffer())
int gBuffersRead ; // number of buffers read so far (incremented by readBuffer())
// Change the reader and writer speeds (from 0 to MAX_SPEED)
// to simulate what happens when one thread goes faster than the other.
// To slow down a thread we just sleep (MAX_SPEED - thread speed) seconds per buffer.
// If you set both speeds to the same value, you will get random messages about one thread being too slow.
// If you make the consumer slower than the producer, you may experiment by increasing the number of buffers
// to evaluate how many buffers are required to absorb the speed difference
// (according to the number of loops to be executed).
#define READER_SPEED (9)
#define WRITER_SPEED (10)
#define MAX_SPEED (10)
#define N_LOOPS (10) // we fill the buffers with ASCII chars from 'A' to 'A' + N_LOOPS - 1
// fillBuffer
// Producer-side work for one buffer: writes fillValue into each of the
// BUF_SIZE payload bytes of gFileBuffer[inBuffer], NUL-terminates the buffer so
// it can be printed with "%s", sleeps to simulate the writer speed, then
// updates the shared bookkeeping.
//
// inBuffer  - index into gFileBuffer (0 .. N_BUFFERS - 1)
// fillValue - character code stored in every payload byte
//
// The caller is expected to hold gBufferLock[inBuffer] while this runs.
void fillBuffer(int inBuffer,int fillValue)
{
    int i ;
    char *buffer = gFileBuffer[inBuffer] ;

    // Fill the payload; index BUF_SIZE is reserved for the terminator.
    for (i = 0; i < BUF_SIZE; i++) {
        buffer[i] = (char)fillValue ;
    }
    buffer[BUF_SIZE] = 0 ;

    // Simulate a slower writer: the lower WRITER_SPEED, the longer the nap.
    sleep(MAX_SPEED - WRITER_SPEED) ;

    // Count the buffer first, then raise the "done" flag once the last letter
    // ('A' + N_LOOPS - 1) has been written.
    ++gBuffersFilled ;
    gProducerDone = (fillValue == ((int)'A' + N_LOOPS - 1)) ;
}
// producerEntry
// Thread entry point for the producer. Starting with 'A', it claims the
// buffers in round-robin order, fills each with the next letter and hands it
// to the consumer, until fillBuffer() reports that the last letter was written.
//
// inRefCon - unused
// Returns NULL.
void *producerEntry(void *inRefCon)
{
    NSAutoreleasePool *threadPool = [[NSAutoreleasePool alloc] init] ;
    NSConditionLock *slotLock ;
    int letter ;

    for (letter = 'A'; !gProducerDone; ++letter) {
        fprintf(stdout,"producer waiting for buffer %d\n",gCurProdBuffer) ;
        slotLock = gBufferLock[gCurProdBuffer] ;

        // Poll instead of blocking so we can report when the consumer has not
        // yet emptied this buffer (condition 0 means "ready to be written").
        for (;;) {
            if ([slotLock tryLockWhenCondition:0]) {
                break ;
            }
            fprintf(stderr,"*** Consummer is too sloooooooow ! ***\n") ;
            sleep(1);
        }

        fillBuffer(gCurProdBuffer,letter) ;
        fprintf(stdout,"producer has filled buffer %d with '%c'\n",gCurProdBuffer,letter);

        // Condition 1 tells the consumer that this buffer now holds data.
        [slotLock unlockWithCondition:1] ;
        gCurProdBuffer = (gCurProdBuffer + 1) % N_BUFFERS ;
    }

    fprintf(stdout,"PRODUCER DONE\n");
    [threadPool release] ;
    return NULL ;
}
// readBuffer
// Consumer-side work for one buffer: prints gFileBuffer[inBuffer], sleeps to
// simulate the reader speed, counts the buffer as read and decides whether the
// consumer has finished.
//
// inBuffer - index into gFileBuffer (0 .. N_BUFFERS - 1)
//
// The caller is expected to hold gBufferLock[inBuffer] while this runs.
void readBuffer(int inBuffer)
{
    BOOL producerDone ;
    int buffersFilled ;

    fprintf(stdout,"Reading buffer %d '%s'\n", inBuffer, gFileBuffer[inBuffer]);

    // Simulate a slower reader: the lower READER_SPEED, the longer the nap.
    sleep(MAX_SPEED - READER_SPEED) ;
    ++gBuffersRead ;

    // Snapshot the "producer done" flag BEFORE the fill counter. fillBuffer()
    // bumps gBuffersFilled and only then sets gProducerDone, so a YES seen here
    // means the counter read next is final. Reading them the other way round
    // lets the producer finish its last buffer between the two reads, and the
    // consumer would then stop with one buffer still unread.
    // NOTE(review): both are plain globals, so this relies on the loads not
    // being reordered; guard them with a lock if that cannot be assumed.
    producerDone = gProducerDone ;
    buffersFilled = gBuffersFilled ;

    if (gBuffersRead != buffersFilled) {
        // Filled buffers are still waiting to be read.
        gConsummerDone = NO ;
    } else if (producerDone) {
        // Everything that will ever be produced has been read.
        gConsummerDone = YES ;
    } else {
        // We caught up with a producer that still has work to do.
        fprintf(stderr,"*** Producer is too sloooooooow ! ***\n") ;
        gConsummerDone = NO ;
    }
}
// consumerEntry
// Thread entry point for the consumer. Visits the buffers in round-robin
// order, blocking on each until it has been filled, reads it and hands it
// back to the producer, until readBuffer() sets gConsummerDone.
//
// inRefCon - unused
// Returns NULL.
void *consumerEntry(void *inRefCon)
{
    NSAutoreleasePool *threadPool = [[NSAutoreleasePool alloc] init] ;
    NSConditionLock *slotLock ;
    int slot ;

    for (;;) {
        if (gConsummerDone) {
            break ;
        }

        slot = gCurConsBuffer ;
        slotLock = gBufferLock[slot] ;
        fprintf(stdout,"consummer waiting for buffer %d\n",slot);

        // Block until the buffer is marked as filled (condition 1).
        [slotLock lockWhenCondition:1] ;
        readBuffer(slot);
        // Condition 0 returns the buffer to the producer.
        [slotLock unlockWithCondition:0] ;

        gCurConsBuffer = (slot + 1) % N_BUFFERS ;
    }

    fprintf(stdout,"CONSUMMER DONE\n");
    [threadPool release] ;
    return NULL ;
}
// main
// Allocates the N_BUFFERS buffers and their condition locks, resets the shared
// state, starts the producer and consumer threads, waits for both to finish
// and prints a summary.
//
// Returns 0 on success; exits with a non-zero status if a buffer cannot be
// allocated or a thread cannot be created.
int main(int argc, char **argv)
{
    NSAutoreleasePool *pool = [[NSAutoreleasePool alloc] init] ;
    pthread_t consummer ;
    pthread_t producer ;
    int result ; // pthread_create() returns a plain int error number
    int i ;

    fprintf(stdout,"starting\n") ;

    for (i = 0; i < N_BUFFERS; i++) {
        // One extra byte for the terminating NUL written by fillBuffer().
        gFileBuffer[i] = malloc(BUF_SIZE+1) ;
        if (gFileBuffer[i] == NULL) {
            fprintf(stderr,"*** error allocating buffer %d\n",i) ;
            exit(1);
        }
        gBufferLock[i] = [[NSConditionLock alloc] initWithCondition:0] ; // buffer is ready to be written
    }

    gCurProdBuffer = 0 ;
    gCurConsBuffer = 0 ;
    gProducerDone = NO ;
    gConsummerDone = NO ;
    gBuffersFilled = 0 ;
    gBuffersRead = 0 ;

    result = pthread_create(&producer, NULL, producerEntry, NULL) ;
    if (result) {
        fprintf(stderr,"*** error creating producer thread %d\n",result) ;
        exit(result);
    }
    result = pthread_create(&consummer, NULL, consumerEntry, NULL) ;
    if (result) {
        fprintf(stderr,"*** error creating consummer thread %d\n",result) ;
        exit(result);
    }

    pthread_join(producer,NULL);
    pthread_join(consummer,NULL);

    // Both threads are finished: give back the locks and the buffers.
    for (i = 0; i < N_BUFFERS; i++) {
        [gBufferLock[i] release] ;
        gBufferLock[i] = nil ;
        free(gFileBuffer[i]) ;
        gFileBuffer[i] = NULL ;
    }

    [pool release] ;
    fprintf(stdout,"done %d buffers filled, %d buffers read\n",gBuffersFilled, gBuffersRead) ;
    return 0 ;
}