Last active
February 10, 2025 21:03
-
-
Save theSoberSobber/8cd370a826c37c7a1447cd6bd13f7b03 to your computer and use it in GitHub Desktop.
A is supposed to be someone that generates a resource and requests that it perform an action; A waits for min(max of TTL, write-lock acquisition time, message to quit). B is an acknowledgement from the resource verifying a task, so that A stops requesting more resources. C is a service that forcefully says that A should stop and that the task is done; it reads the las…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Injectable, OnModuleInit } from '@nestjs/common'; | |
import { InjectRedis } from '@liaoliaots/nestjs-redis'; | |
import { Redis } from 'ioredis'; | |
import { v4 as uuidv4 } from 'uuid'; | |
/**
 * Redis-backed coordination between three roles that share one pub/sub
 * channel and per-id lock keys:
 *
 * - `functionA` creates an id, stores it under `id:<uuid>` while holding
 *   `lock:<uuid>`, then waits for a stop signal or for the lock to be free.
 * - `functionB` publishes a `<uuid>:<id>` message on the shared channel.
 * - `functionC` deletes `id:<id>` and publishes `stop:<id>` while holding
 *   `lock:<id>`.
 *
 * Regular commands go through the injected client; subscriptions use a
 * dedicated duplicate connection created in `onModuleInit`.
 */
@Injectable()
export class RedisService implements OnModuleInit {
  /** Pub/sub channel shared by all three roles. */
  private static readonly CHANNEL = 'test.uuid';

  /** Delay between lock-acquisition retries and lock-existence polls (ms). */
  private static readonly POLL_INTERVAL_MS = 100;

  /**
   * Expiry applied to every lock key (ms) so that a holder that dies before
   * calling `releaseLock` cannot block other callers forever.
   */
  private static readonly LOCK_TTL_MS = 30000;

  /** Dedicated connection for pub/sub; assigned in `onModuleInit`. */
  private redisSubscriber!: Redis;

  /** Pending or completed subscription to `CHANNEL`; undefined until first use. */
  private subscription: Promise<void> | undefined;

  constructor(@InjectRedis() private readonly redis: Redis) {}

  /** Creates the dedicated subscriber connection. */
  async onModuleInit(): Promise<void> {
    this.redisSubscriber = this.redis.duplicate();
  }

  /**
   * Closes the duplicated subscriber connection so it does not outlive the
   * module. Nest invokes lifecycle hooks by method name, so no additional
   * interface import is required.
   */
  onModuleDestroy(): void {
    this.subscription = undefined;
    // The subscriber may be unset if the module is torn down before init ran.
    this.redisSubscriber?.disconnect();
  }

  /**
   * Generates a fresh id, stores it under `id:<uuid>` while holding
   * `lock:<uuid>`, then waits until `waitForMessage` reports completion.
   *
   * @returns `true` once the wait completes.
   * @throws If a Redis command fails while subscribing, storing, or waiting.
   */
  async functionA(): Promise<boolean> {
    const uuid = uuidv4();
    await this.ensureSubscribed();
    console.log(`Listening on ${RedisService.CHANNEL} for ${uuid}`);

    while (true) {
      // Try acquiring the write lock; back off briefly when it is taken.
      const lockAcquired = await this.acquireLock(uuid);
      if (!lockAcquired) {
        console.log('Waiting for lock to be available...');
        await this.sleep(RedisService.POLL_INTERVAL_MS);
        continue;
      }

      try {
        // Store the id in Redis while the lock is held.
        await this.redis.set(`id:${uuid}`, uuid);
        console.log(`Stored ID ${uuid} in Redis`);
      } finally {
        await this.releaseLock(uuid);
      }

      console.log(`Waiting for message or lock availability for ${uuid}`);
      const success = await this.waitForMessage(uuid);
      if (success) {
        console.log('Valid response received:', uuid);
        // The channel subscription is shared by every concurrent caller, so
        // it is deliberately kept open here; it is closed in onModuleDestroy.
        return true;
      }
    }
  }

  /**
   * Subscribes the dedicated connection to `CHANNEL` exactly once and lets
   * every caller await the same subscription. A failed attempt is forgotten
   * so that a later call can retry.
   */
  private ensureSubscribed(): Promise<void> {
    if (!this.subscription) {
      console.log(`Subscribing to ${RedisService.CHANNEL}`);
      this.subscription = this.redisSubscriber
        .subscribe(RedisService.CHANNEL)
        .then(
          () => undefined,
          (err: unknown) => {
            this.subscription = undefined;
            throw err;
          },
        );
    }
    return this.subscription;
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((res) => setTimeout(res, ms));
  }

  /**
   * Attempts to create `lock:<id>` atomically (`SET ... PX ttl NX`).
   *
   * NOTE(review): the lock value is not an ownership token, so
   * `releaseLock` is unconditional; a holder stalled for longer than
   * `LOCK_TTL_MS` could delete a successor's lock.
   *
   * @returns `true` if this call created the lock, `false` if it already existed.
   */
  private async acquireLock(id: string): Promise<boolean> {
    const reply = await this.redis.set(
      `lock:${id}`,
      'locked',
      'PX',
      RedisService.LOCK_TTL_MS,
      'NX',
    );
    return reply === 'OK';
  }

  /** Deletes `lock:<id>`. */
  private async releaseLock(id: string): Promise<void> {
    await this.redis.del(`lock:${id}`);
  }

  /**
   * Resolves `true` when either a `stop:<uuid>` message arrives on `CHANNEL`
   * or a poll finds that `lock:<uuid>` does not exist, whichever comes first.
   * Rejects if the lock poll fails. The timer and the message listener are
   * removed exactly once, whichever path settles the promise.
   *
   * NOTE(review): `functionA` releases its own lock right before calling
   * this, so the first poll resolves unless another caller (e.g.
   * `functionC`) holds `lock:<uuid>` at that moment — confirm this is the
   * intended wait condition.
   */
  private waitForMessage(uuid: string): Promise<boolean> {
    return new Promise<boolean>((resolve, reject) => {
      let settled = false;
      let polling = false;

      // Single exit point: tear down the timer and listener, then settle.
      const settle = (finish: () => void): void => {
        if (settled) {
          return;
        }
        settled = true;
        clearInterval(checkLockInterval);
        this.redisSubscriber.off('message', handleMessage);
        finish();
      };

      const handleMessage = (channel: string, message: string): void => {
        if (channel === RedisService.CHANNEL && message === `stop:${uuid}`) {
          console.log(`Received stop signal for ${uuid}`);
          settle(() => resolve(true));
        }
      };

      const pollLock = async (): Promise<void> => {
        // Skip the tick if a previous poll is still in flight.
        if (polling || settled) {
          return;
        }
        polling = true;
        try {
          const lockExists = await this.redis.exists(`lock:${uuid}`);
          if (!lockExists) {
            settle(() => resolve(true));
          }
        } catch (err: unknown) {
          // Surface the failure instead of leaving an unhandled rejection
          // and a promise that never settles.
          settle(() => reject(err));
        } finally {
          polling = false;
        }
      };

      const checkLockInterval = setInterval(() => {
        void pollLock();
      }, RedisService.POLL_INTERVAL_MS);

      this.redisSubscriber.on('message', handleMessage);
    });
  }

  /**
   * Publishes `<fresh uuid>:<id>` on `CHANNEL`.
   *
   * NOTE(review): nothing in this class reacts to this message shape;
   * `waitForMessage` only matches `stop:<uuid>` — confirm the consumer.
   */
  async functionB(id: string): Promise<void> {
    const uuid = uuidv4();
    console.log(`Publishing message to test.uuid: ${uuid}:${id}`);
    await this.redis.publish(RedisService.CHANNEL, `${uuid}:${id}`);
  }

  /**
   * Waits until `lock:<id>` can be acquired, then deletes `id:<id>` and
   * publishes `stop:<id>` on `CHANNEL`. The lock is released even if one of
   * those commands fails.
   */
  async functionC(id: string): Promise<void> {
    while (!(await this.acquireLock(id))) {
      await this.sleep(RedisService.POLL_INTERVAL_MS);
    }
    try {
      await this.redis.del(`id:${id}`);
      console.log(`Deleted ID ${id} from Redis`);
      await this.redis.publish(RedisService.CHANNEL, `stop:${id}`);
    } finally {
      await this.releaseLock(id);
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment