Last active
January 9, 2025 07:25
-
-
Save lquenti/58a64f93bcfea2bd5790a0e28ccba282 to your computer and use it in GitHub Desktop.
A fast multiprocessing-aware double buffer (assuming no advance knowledge of fork!) that flushes the back buffer to disk
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include<stdatomic.h> | |
#include<stdio.h> | |
#include<stdlib.h> | |
#include<string.h> | |
#include<fcntl.h> | |
#include<pthread.h> | |
#include<unistd.h> | |
/* Lars Quentin 2025, licensed as WTFPL */ | |
/* Number of int slots in each of the two buffers. */
#define N 32

/* The two buffers that alternate between the "front" and "back" roles. */
int BUF1[N] = {0};
int BUF2[N] = {0};

/* One mutex per buffer; held while that buffer is being written to disk. */
pthread_mutex_t BUF1_LOCK = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t BUF2_LOCK = PTHREAD_MUTEX_INITIALIZER;

/* front is the buffer currently being filled; back is the other one. */
int *front = BUF1;
int *back = BUF2;

/* Locks belonging to the buffers that front/back currently point at. */
pthread_mutex_t *front_lock = &BUF1_LOCK;
pthread_mutex_t *back_lock = &BUF2_LOCK;

/* We can't use an atomic int (e.g. `atomic_int front_idx`) here because:
 * when there is currently a blockage, we have to wait until we can fetch.
 * The process accessing the last buffer index **has to be** the one that
 * cleans up the array and resets to zero. Until the buffers are switched and
 * the idx is reset to zero, no other process should be able to fetch.
 */
int idx = 0; /* next free slot in front; guarded by idx_lock */
pthread_mutex_t idx_lock = PTHREAD_MUTEX_INITIALIZER;

pid_t expected_pid = 0; // PID that opened fd; only accessed after lock
int fd = -1;            // output file of expected_pid; -1 while nothing is open
char filename[101];     // name of the output file, derived from expected_pid
void *thread_func(void *arg) { | |
long val=*(long *)arg; | |
free(arg); | |
while (1) { | |
pthread_mutex_lock(&idx_lock); | |
int my_idx = idx; | |
idx++; | |
// do the work | |
front[my_idx] = val; | |
if (my_idx < N-1) { | |
// If we do not have to switch yet, release the mutex for next fetch | |
printf("%d ", idx); | |
pthread_mutex_unlock(&idx_lock); | |
} else { // my_idx == N-1 | |
int *to_be_saved = front; | |
pthread_mutex_t *to_be_saved_lock = front_lock; | |
pthread_mutex_t *other_lock = back_lock; | |
// Buffer is under maintenance; Lock! | |
pthread_mutex_lock(to_be_saved_lock); | |
// If the previous buffer is already flushed out (i.e. unlocked), switch to it | |
pthread_mutex_lock(other_lock); | |
int *tmp = front; front = back; back = tmp; | |
pthread_mutex_t *tmp2 = front_lock; front_lock = back_lock; back_lock = tmp2; | |
idx = 0; | |
pthread_mutex_unlock(other_lock); | |
// At this point, People can already increment on it again | |
pthread_mutex_unlock(&idx_lock); | |
// Now we only have to write it out | |
// Since we want multiprocessing support, it can happen that we forked | |
// processes, thus we do not "own" our file descriptor anymore. | |
// | |
// Note that this also covers initialization | |
pid_t current_pid = getpid(); | |
if (current_pid != expected_pid) { | |
expected_pid = current_pid; | |
sprintf(filename, "output_%d.txt", expected_pid); | |
fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644); | |
} | |
// How we actually write out the numbers | |
char buf[51]; | |
for (size_t i=0; i<N; ++i) { | |
sprintf(buf, "%d ", to_be_saved[i]); | |
write(fd, buf, strlen(buf)); | |
} | |
// Work done! | |
pthread_mutex_unlock(to_be_saved_lock); | |
printf("\nWriteout happened at %ld\n", val); | |
} | |
} | |
} | |
int main() { | |
pthread_t thread1, thread2; | |
long *id1 = malloc(sizeof(long)); | |
long *id2 = malloc(sizeof(long)); | |
*id1 = 1; | |
*id2 = 2; | |
pthread_create(&thread1, NULL, thread_func, id1); | |
pthread_create(&thread2, NULL, thread_func, id2); | |
pthread_join(thread1, NULL); | |
pthread_join(thread2, NULL); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment