Skip to content

Instantly share code, notes, and snippets.

@theSoberSobber
Last active February 10, 2025 21:03
Show Gist options
  • Save theSoberSobber/8cd370a826c37c7a1447cd6bd13f7b03 to your computer and use it in GitHub Desktop.
Save theSoberSobber/8cd370a826c37c7a1447cd6bd13f7b03 to your computer and use it in GitHub Desktop.
A is supposed to be someone that generates a resource and requests it to perform an action, A waits min (max of TTL, write lock acquiring time , message to quit), B is an ack verification from the resource of a task so A stops requesting more resources, C is a service that forcefully says that A should stop and the task is done, it reads the las…
import { Injectable, OnModuleInit } from '@nestjs/common';
import { InjectRedis } from '@liaoliaots/nestjs-redis';
import { Redis } from 'ioredis';
import { v4 as uuidv4 } from 'uuid';
@Injectable()
export class RedisService implements OnModuleInit {
private redisSubscriber: Redis;
constructor(@InjectRedis() private readonly redis: Redis) {}
async onModuleInit() {
this.redisSubscriber = this.redis.duplicate();
}
async functionA(): Promise<boolean> {
const uuid = uuidv4();
console.log(`Subscribing to test.${uuid}`);
await this.redisSubscriber.subscribe('test.uuid');
while (true) {
// Try acquiring write lock
const lockAcquired = await this.acquireLock(uuid);
if (!lockAcquired) {
console.log('Waiting for lock to be available...');
await new Promise((res) => setTimeout(res, 100));
continue;
}
try {
// Store UUID in Redis
await this.redis.set(`id:${uuid}`, uuid);
console.log(`Stored ID ${uuid} in Redis`);
} finally {
await this.releaseLock(uuid);
}
console.log(`Waiting for message or lock availability for ${uuid}`);
const success = await this.waitForMessage(uuid);
if (success) {
console.log('Valid response received:', uuid);
await this.redisSubscriber.unsubscribe('test.uuid');
return true;
}
}
}
private async acquireLock(id: string): Promise<boolean> {
return (await this.redis.setnx(`lock:${id}`, 'locked')) === 1;
}
private async releaseLock(id: string): Promise<void> {
await this.redis.del(`lock:${id}`);
}
private waitForMessage(uuid: string): Promise<boolean> {
return new Promise((resolve) => {
const checkLockInterval = setInterval(async () => {
const lockExists = await this.redis.exists(`lock:${uuid}`);
if (!lockExists) {
clearInterval(checkLockInterval);
this.redisSubscriber.off('message', handleMessage);
resolve(true);
}
}, 100);
const handleMessage = (channel: string, message: string) => {
if (channel === 'test.uuid' && message === `stop:${uuid}`) {
console.log(`Received stop signal for ${uuid}`);
clearInterval(checkLockInterval);
this.redisSubscriber.off('message', handleMessage);
resolve(true);
}
};
this.redisSubscriber.on('message', handleMessage);
});
}
async functionB(id: string): Promise<void> {
const uuid = uuidv4();
console.log(`Publishing message to test.uuid: ${uuid}:${id}`);
await this.redis.publish('test.uuid', `${uuid}:${id}`);
}
async functionC(id: string): Promise<void> {
const lockKey = `lock:${id}`;
while (!(await this.acquireLock(id))) {
await new Promise((res) => setTimeout(res, 100));
}
try {
await this.redis.del(`id:${id}`);
console.log(`Deleted ID ${id} from Redis`);
await this.redis.publish('test.uuid', `stop:${id}`);
} finally {
await this.releaseLock(id);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment