Общий параллельный ограниченный буфер в C

Я только что реализовал общую параллельную очередь на C и хотел бы получить отзывы о моей реализации, а также логику обработки общих данных и параллелизма.

buffer.h

#ifndef BOUNDED_BUFFER_H
#define BOUNDED_BUFFER_H

#include <stdlib.h>

typedef struct _boundedBuffer BoundedBuffer;

BoundedBuffer* allocBoundedBuffer(size_t capacity, size_t dataSize);
void destroyBoundedBuffer(BoundedBuffer* buf);

void dequeue(BoundedBuffer* buf, void* dest, size_t destSize);
int enqueue(BoundedBuffer* buf, void* data);

#endif

buffer.c

#include "boundedbuffer.h"
#include "scerrhand.h"
#include <pthread.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>

#define MIN(a,b) (a) <= (b) ? (a) : (b)

struct _node {
    /*
    A buffer node.
     */

    void* data;
    struct _node* nextPtr;
};

struct _boundedBuffer {
    /*
    A concurrent buffer with limited capacity.
    */

    size_t capacity;
    size_t numElements;
    size_t dataSize;
    struct _node* headPtr;
    struct _node* tailPtr;
    struct _bufferConcurrencyTools* tools;
};

struct _bufferConcurrencyTools {
    /*
    A set of tools to handle concurrency for a bounded buffer.
    `mutex` is a lock used to mutual exclusion access to the buffer,
    `empty` and `full` are condition variables used respectively for when
    the buffer is empty and for when it is full.
    */

    pthread_mutex_t mutex;
    pthread_cond_t empty;
    pthread_cond_t full;
};

static struct _node* _allocNode(void* data, size_t dataSize) {
    /*
    Allocates a new node for the bounded buffer, initializing its value to
    the given one, and returns it.

    Returns NULL if the node could not be allocated.
    */

    struct _node* newNode;
    if (!(newNode = malloc(sizeof(struct _node)))) {
        // malloc failed; return NULL to caller
        return NULL;
    }

    if (!(newNode->data = malloc(dataSize))) {
        // malloc failed; return NULL to caller
        return NULL;
    }

    // copy data into new node
    memcpy(newNode->data, data, dataSize);

    newNode->nextPtr = NULL;

    return newNode;
}


static struct _bufferConcurrencyTools* _allocBufferConcurrencyTools() {
    struct _bufferConcurrencyTools* tools;
    /*
    Allocates a _bufferConcurrencyTools struct and initializes its mutex lock and cond vars;
    then returns a pointer to it.

    Returns NULL if memory for the struct could not be allocated.
    */

    if (!(tools = malloc(sizeof(struct _bufferConcurrencyTools)))) {
        return NULL;
    }
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t full;
    pthread_cond_t empty;

    // initialize condition variables
    SYSCALL_OR_DIE_NZ(pthread_cond_init(&full, NULL));
    SYSCALL_OR_DIE_NZ(pthread_cond_init(&empty, NULL));

    tools->mutex = mutex;
    tools->full = full;
    tools->empty = empty;

    return tools;
}

BoundedBuffer* allocBoundedBuffer(size_t capacity, size_t dataSize) {
    /*
    Initializes and returns a new empty buffer with the given capacity.
    */

    assert(capacity > 0);

    BoundedBuffer* buf = malloc(sizeof(BoundedBuffer));
    if (!buf) { // malloc failed; return NULL to caller
        return NULL;
    }
    buf->capacity = capacity;
    buf->dataSize = dataSize;
    buf->numElements = 0;
    buf->headPtr = NULL;
    buf->tailPtr = NULL;
    SYSCALL_OR_DIE_NULL(buf->tools = _allocBufferConcurrencyTools());

    return buf;
}


void dequeue(BoundedBuffer* buf, void* dest, size_t destSize) {
    /*
    Pops the node at the head of the buffer and returns its value.

    If the buffer is empty, waits until there is at least one element in it.
    */

    // NULL can be passed as dest if we just want to pop the element without saving it; however
    // if we want to save it somewhere, destSize must be greater than 0 bytes

    assert(buf);
    assert(!dest || destSize > 0);

    SYSCALL_OR_DIE_NZ(pthread_mutex_lock(&(buf->tools->mutex))); // gain mutual exclusion access

    while (buf->numElements == 0) { // buffer is empty: wait
        SYSCALL_OR_DIE_NZ(pthread_cond_wait(&(buf->tools->empty), &(buf->tools->mutex)));
    }

    struct _node* node = buf->headPtr; // get buffer head node
    if (dest) {
        // copy node data to destination
        memcpy(dest, node->data, MIN(buf->dataSize, destSize));
    }
    buf->headPtr = node->nextPtr;
    buf->numElements--;

    SYSCALL_OR_DIE_NZ(pthread_cond_signal(&(buf->tools->full))); // wake up a producer thread (if any)
    SYSCALL_OR_DIE_NZpthread_mutex_unlock(&(buf->tools->mutex))); // waive mutual exclusion access

    // done outside of critical section to avoid doing costly syscalls in mutual exclusion uselessly
    free(node->data);
    free(node);
}

void destroyBoundedBuffer(BoundedBuffer* buf) {
    /*
    Frees every remaining element in the buffer, then frees the buffer
    and sets the passed pointer to NULL
    */

    assert(buf);

    while (buf->numElements) {
        dequeue(buf, NULL, 0);
    }
    free(buf->tools);
    free(buf);
    buf = NULL;
}

int enqueue(BoundedBuffer* buf, void* data) {
    /*
    Allocates a new node with the given value and pushes it to the tail
    of the bounded buffer.

    If the buffer is full, waits until there is at least one free spot.

    Returns 0 on success, -1 if it is unable to allocate memory for the new node.
    */

    assert(buf);
    assert(data);

    // allocate new node outside of critical section to keep it as short as possible
    struct _node* newNode = _allocNode(data, buf->dataSize);

    if (!newNode) { // malloc failed; return -1 to caller
        return -1;
    }
    SYSCALL_OR_DIE_NZ(pthread_mutex_lock(&(buf->tools->mutex))); // gain mutual exclusion access

    while (buf->numElements == buf->capacity) { // buffer is full: wait
        SYSCALL_OR_DIE_NZ(pthread_cond_wait(&(buf->tools->full), &(buf->tools->mutex)));
    }

    if (buf->numElements) {
        buf->tailPtr->nextPtr = newNode;
    }
    else {
        buf->headPtr = newNode;
    }
    buf->tailPtr = newNode;
    buf->numElements++;

    SYSCALL_OR_DIE_NZ(pthread_cond_signal(&(buf->tools->empty))); // wake up a consumer thread (if any)
    SYSCALL_OR_DIE_NZ(pthread_mutex_unlock(&(buf->tools->mutex))); // waive mutual exclusion access

    return 0;
}

test.c

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "module/boundedbuffer.h"
#include "module/scerrhand.h"
#include <math.h>
#include <unistd.h>
#include <signal.h>


void* produce(void* buf) {
    puts("producer started");
    while (1) {
        srand(time(NULL));
        int n = (int)ceil(rand()) % 22;
        void* ptr = &n;
        SYSCALL_OR_DIE_NEG_ONE(enqueue((BoundedBuffer*)buf, ptr));
        printf("produced: %dn", n);
        sleep(1);
    }
}

void* consume(void* buf) {
    puts("consumer started");
    while (1) {
        int n;
        dequeue((BoundedBuffer*)buf, &n, sizeof(int));
        printf("read: %dn", n);
        sleep(1);
    }
}

BoundedBuffer* ref;

void cleanup() {
    puts("cleaning up...");
    destroyBoundedBuffer(ref);
    exit(EXIT_SUCCESS);
}


int main(int argc, char** argv) {
    BoundedBuffer* buf;
    pthread_t consumerTid, producerTid;

    // create buffer
    SYSCALL_OR_DIE_NULL(buf = allocBoundedBuffer(atoi(argv[1]), sizeof(int)));
    ref = buf;
    signal(SIGINT, cleanup);

    // initialize cond vars and concurrency tools struct

    // start threads
    puts("gonna start produer");
    SYSCALL_OR_DIE_NZ(pthread_create(&producerTid, NULL, produce, buf));
    puts("gonna start consumer");
    SYSCALL_OR_DIE_NZ(pthread_create(&consumerTid, NULL, consume, buf));

    pthread_join(producerTid, NULL);
    pthread_join(consumerTid, NULL);
}

scerrhand.h

#ifndef SC_ERR_HAND_H
#define SC_ERR_HAND_H

#include <stdlib.h>
// makes a system call and exits if return value is not zero
#define SYSCALL_OR_DIE_NZ(s) if(s) { puts("System call failed with nonzero status"); exit(EXIT_FAILURE); }

// makes a system call and exits if return value is -1
#define SYSCALL_OR_DIE_NEG_ONE(s) if((s) == -1) { puts("System call failed with status -1"); exit(EXIT_FAILURE); }

// makes a system call and exits if return value is NULL
#define SYSCALL_OR_DIE_NULL(s) if((s) == NULL) { puts("System call returned NULL"); exit(EXIT_FAILURE); }

#endif

0

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *