Skip to content

Instantly share code, notes, and snippets.

@lquenti
Last active January 9, 2025 07:25
Show Gist options
  • Save lquenti/58a64f93bcfea2bd5790a0e28ccba282 to your computer and use it in GitHub Desktop.
Save lquenti/58a64f93bcfea2bd5790a0e28ccba282 to your computer and use it in GitHub Desktop.
A fast multiprocessing-aware double buffer (assuming no knowledge about when fork happens!) that flushes the back buffer to disk
#include <errno.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
/* Lars Quentin 2025, licensed as WTFPL */
/* Number of int slots in each of the two buffers. */
#define N 32
/* The two buffers; at any time one is the front buffer and the other the back buffer. */
int BUF1[N] = {0};
int BUF2[N] = {0};
/* One lock per buffer. thread_func holds a buffer's lock for the whole time
 * that buffer is being written out to disk. */
pthread_mutex_t BUF1_LOCK = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t BUF2_LOCK = PTHREAD_MUTEX_INITIALIZER;
/* front: the buffer workers currently store values into.
 * back:  the other buffer. The two pointers are swapped whenever front is full. */
int *front = BUF1;
int *back = BUF2;
/* Locks belonging to the current front/back buffers; swapped together with them. */
pthread_mutex_t *front_lock = &BUF1_LOCK;
pthread_mutex_t *back_lock = &BUF2_LOCK;
/* We can't use an atomic int for the index because:
 * while a buffer switch is pending, every other worker has to wait before it
 * can fetch a slot. The worker that takes the last slot (index N-1) **has to
 * be** the one that switches the buffers and resets the index to zero. Until
 * the buffers are switched and the index is reset, no other worker may fetch
 * a slot. A mutex gives us that blocking behaviour; a bare atomic does not.
 */
// atomic_int front_idx = 0;
/* Next free slot in the front buffer. Not explicitly initialized; as an
 * object with static storage duration it starts at 0. Guarded by idx_lock. */
int idx;
/* Protects idx and the front/back pointer swap. */
pthread_mutex_t idx_lock = PTHREAD_MUTEX_INITIALIZER;
/* Pid of the process that opened fd (0 = no file opened yet). Read and written
 * only by the thread doing a write-out, while it holds the lock of the buffer
 * being written out. */
pid_t expected_pid = 0; // only accessed after lock
/* Descriptor of the output file opened for expected_pid. */
int fd; // based on last expected pid
/* Name of the output file opened for expected_pid ("output_<pid>.txt"). */
char filename[101]; // based on last expected pid
/*
 * Writes all `len` bytes of `data` to `out_fd`, continuing after short writes
 * and retrying when write() is interrupted by a signal (EINTR).
 *
 * Returns 0 on success, -1 on error (errno is left as set by write()).
 */
static int write_all(int out_fd, const char *data, size_t len) {
    while (len > 0) {
        ssize_t written = write(out_fd, data, len);
        if (written < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        data += written;
        len -= (size_t)written;
    }
    return 0;
}

/*
 * Makes sure the global `fd` refers to an output file opened by THIS process.
 *
 * The file is (re)opened whenever getpid() differs from `expected_pid`. That
 * covers both the very first write-out (expected_pid == 0) and the case where
 * the process was forked since the last write-out, in which case the old
 * descriptor belongs to the parent's file and must not be written to.
 *
 * Must only be called by the thread that currently performs a write-out.
 *
 * Returns 0 on success, -1 if the file could not be opened. On failure
 * `expected_pid` is left unchanged so the next write-out tries again.
 */
static int ensure_output_file(void) {
    pid_t current_pid = getpid();
    if (current_pid == expected_pid) {
        return 0;
    }

    /* expected_pid != 0 means fd was opened for another pid and merely
     * inherited by us; close our copy so it is not leaked. The initial state
     * (expected_pid == 0, fd == 0) is skipped so stdin is never closed. */
    if (expected_pid != 0 && fd >= 0) {
        close(fd);
    }
    fd = -1;

    int name_len = snprintf(filename, sizeof filename, "output_%d.txt",
                            (int)current_pid);
    if (name_len < 0 || (size_t)name_len >= sizeof filename) {
        fprintf(stderr, "output file name does not fit into buffer\n");
        return -1;
    }

    fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0) {
        perror("open");
        return -1;
    }
    expected_pid = current_pid;
    return 0;
}

/*
 * Appends the N ints in `values` to this process's output file, each followed
 * by a single space. The text is assembled in memory first so that the whole
 * buffer goes out with one write() instead of one per number.
 *
 * Errors are reported on stderr; the buffer contents are dropped in that case.
 */
static void flush_buffer(const int *values) {
    /* Upper bound per entry: at most 3 decimal digits per byte of int, plus
     * sign and the separating space; +1 for snprintf's terminating NUL. */
    char text[N * (sizeof(int) * 3 + 2) + 1];
    size_t used = 0;

    if (ensure_output_file() != 0) {
        return;
    }

    for (size_t i = 0; i < N; ++i) {
        size_t room = sizeof text - used;
        int n = snprintf(text + used, room, "%d ", values[i]);
        if (n < 0 || (size_t)n >= room) {
            fprintf(stderr, "failed to format buffer entry %zu\n", i);
            return;
        }
        used += (size_t)n;
    }

    if (write_all(fd, text, used) != 0) {
        perror("write");
    }
}

/*
 * Worker thread entry point.
 *
 * arg: pointer to a heap-allocated long holding this worker's value. The
 *      worker takes ownership and frees it immediately.
 *
 * The worker loops forever (the function never returns): each iteration
 * claims the next slot of the front buffer under idx_lock and stores its
 * value there. The worker that claims the last slot (N-1) swaps front and
 * back buffers, lets the other workers continue on the fresh front buffer,
 * and then writes the full buffer to "output_<pid>.txt".
 */
void *thread_func(void *arg) {
    long val = *(long *)arg;
    free(arg);

    while (1) {
        pthread_mutex_lock(&idx_lock);
        int my_idx = idx;
        idx++;
        /* do the work (the buffers hold ints, so the long is narrowed) */
        front[my_idx] = (int)val;

        if (my_idx < N - 1) {
            /* No switch needed yet: release the mutex for the next fetch. */
            printf("%d ", idx);
            pthread_mutex_unlock(&idx_lock);
            continue;
        }

        /* my_idx == N-1: we filled the buffer, so we switch and flush it.
         * idx_lock stays held until the switch is complete, which keeps every
         * other worker from fetching a slot in the meantime. */
        int *to_be_saved = front;
        pthread_mutex_t *to_be_saved_lock = front_lock;
        pthread_mutex_t *other_lock = back_lock;

        /* Buffer is under maintenance until it has been written out. */
        pthread_mutex_lock(to_be_saved_lock);
        /* The other buffer's lock is held while that buffer is being written
         * out, so this blocks until a still-running previous flush is done.
         * As a consequence only one thread at a time is ever in the write-out
         * phase below. */
        pthread_mutex_lock(other_lock);
        front = back;
        back = to_be_saved;
        front_lock = other_lock;
        back_lock = to_be_saved_lock;
        idx = 0;
        pthread_mutex_unlock(other_lock);
        /* From here on the other workers can fill the new front buffer. */
        pthread_mutex_unlock(&idx_lock);

        /* Now we only have to write the old front buffer out. */
        flush_buffer(to_be_saved);

        /* Work done! */
        pthread_mutex_unlock(to_be_saved_lock);
        printf("\nWriteout happened at %ld\n", val);
    }
}
/*
 * Starts two worker threads running thread_func with the values 1 and 2 and
 * waits for them. Each worker receives its value through a heap-allocated
 * long that thread_func frees itself.
 *
 * Returns EXIT_FAILURE if an allocation, thread creation or join fails.
 * Since thread_func loops forever, the joins are not expected to return.
 */
int main(void) {
    enum { NUM_WORKERS = 2 };
    pthread_t workers[NUM_WORKERS];

    for (int i = 0; i < NUM_WORKERS; ++i) {
        long *id = malloc(sizeof *id);
        if (id == NULL) {
            perror("malloc");
            return EXIT_FAILURE;
        }
        *id = i + 1;

        /* pthread_create reports errors via its return value, not errno. */
        int rc = pthread_create(&workers[i], NULL, thread_func, id);
        if (rc != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            free(id); /* the thread never started, so ownership stays here */
            return EXIT_FAILURE;
        }
    }

    for (int i = 0; i < NUM_WORKERS; ++i) {
        int rc = pthread_join(workers[i], NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_join: %s\n", strerror(rc));
            return EXIT_FAILURE;
        }
    }
    return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment