Last active
February 12, 2023 20:01
-
-
Save muromec/e09dc075ad7446057e90e71215ac4445 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Builds a launcher for a generator-driven process.
 *
 * Calling the returned launcher creates the process object, wraps `fn` with
 * `watchExit`, advances the generator to its first `yield`, delivers an
 * `INIT` message to it, and finally announces the process to its parent with
 * a `HI` message that carries the `subscribe` function.
 *
 * @param {GeneratorFunction} fn - Process body, invoked as `fn(process, ...args)`.
 * @param {string} pname - Name attached to every buffered message sent upward.
 * @param {Function} [toParent] - Receives messages for the parent; no-op by default.
 * @returns {Function} Launcher `(...args) => process`, where `process` exposes
 *   `pname`, `send`, `getState`, `subscribe`, `fork` and `toParent`.
 */
function spawn(fn, pname, toParent = () => null) {
  let current = null;
  let state = null;
  const listeners = [];
  const buffer = [];

  function getState() {
    return state;
  }

  function subscribe(listener) {
    listeners.push(listener);
  }

  // Feeds one message into a generator (this process's own by default) and
  // stores the yielded value as the new state unless the generator finished.
  function send({ to = current, ...msg }) {
    const ret = to.next({ state, reply: toSelf, msg });
    if (!ret.done) {
      state = ret.value;
    }
    tick();
  }

  // Notifies subscribers, then flushes the outbox to the parent in the order
  // the messages were queued (FIFO), so a trailing EXIT never overtakes the
  // payload that was queued before it.
  function tick() {
    listeners.forEach((listener) => listener(state));
    while (buffer.length > 0) {
      toParent({ pname, ...buffer.shift() });
    }
  }

  function toSelf(msg) {
    send({ to: current, ...msg });
  }

  // Entry point for messages coming from forked children. A child's HI is
  // answered by subscribing our own tick to the child's state changes;
  // everything else is delivered to this process's generator.
  function fromChild(msg) {
    if (msg.type === 'HI') {
      return msg.subscribe(tick);
    }
    toSelf(msg);
  }

  // Messages for the parent are queued and only flushed by tick().
  function toBuffer(msg) {
    buffer.push(msg);
  }

  function fork(childFn, childName) {
    return spawn(childFn, childName, fromChild);
  }

  return (...args) => {
    const process = {
      pname,
      send: toSelf,
      getState,
      subscribe,
      fork,
      toParent: toBuffer,
    };
    const task = watchExit(fn)(process, ...args);
    current = task;
    task.next(); // run up to the first yield so the generator can accept messages
    toSelf({ type: 'INIT' });
    toParent({ type: 'HI', subscribe });
    return process;
  };
}
/**
 * Creates a launcher that asks a supervisor to run a process.
 *
 * The launcher does not start anything itself: it sends the supervisor a
 * `RUN` message carrying the generator function, its arguments and its name.
 *
 * @param {{send: Function}} supervisor - Process object whose `send` receives the request.
 * @param {GeneratorFunction} fn - Process body to be run.
 * @param {string} pname - Name for the new process.
 * @returns {Function} `(...args) => void` that posts the `RUN` message.
 */
function attach(supervisor, fn, pname) {
  return (...args) => {
    supervisor.send({ type: 'RUN', fn, args, pname });
  };
}
/**
 * Wraps a process body so that its completion is reported to the parent.
 *
 * The returned generator function delegates to `fn` and, once `fn` returns,
 * queues an `EXIT` message (carrying the process object as `p`) through
 * `process.toParent`.
 *
 * @param {GeneratorFunction} fn - Process body, invoked as `fn(process, ...args)`.
 * @returns {GeneratorFunction} Wrapped body with the same call signature.
 */
function watchExit(fn) {
  return function* (process, ...args) {
    yield* fn(process, ...args);
    process.toParent({ type: 'EXIT', p: process });
  };
}
/**
 * Message loop shared by the process bodies.
 *
 * Each iteration yields the current state, waits for a `{state, msg}` object
 * to be passed into `next()`, logs the message, and computes the next state
 * with `fn(state, msg)`. The loop ends when `fn` returns the string
 * `'STOPPED'`.
 *
 * @param {string} name - Label used in the log line.
 * @param {Function} fn - Reducer `(state, msg) => nextState`.
 * @yields The state after the most recently handled message (`null` at first).
 */
function* runDispatch(name, fn) {
  let state = null;
  while (state !== 'STOPPED') {
    // The sender supplies both the message and the state to reduce from.
    const incoming = yield state;
    const { msg } = incoming;
    console.log('msg', name, ' <- ', msg);
    state = fn(incoming.state, msg);
  }
}
/**
 * Supervisor process body: tracks child processes and relays their results.
 *
 * Handled messages:
 * - `INIT`  - resets to a frozen state with an empty `processes` list.
 * - `RUN`   - forks `msg.fn` under `msg.pname` with `msg.args` (none if
 *             omitted) and appends the new process to `processes`.
 * - `ERROR` / `ABORT` - sends `ABORT` to every tracked process.
 * - `EXIT`  - drops `msg.p` from `processes`; stops once none remain.
 * - `OK`    - forwards the message to the parent.
 *
 * @param {{pname: string, toParent: Function, fork: Function}} process
 */
function* supervise({ pname, toParent, fork }) {
  const initialState = Object.freeze({
    processes: Object.freeze([]),
  });

  yield* runDispatch(pname, (state, msg) => {
    switch (msg.type) {
      case 'INIT':
        return initialState;

      case 'RUN': {
        // Default `args` so a RUN request without arguments does not throw.
        const { fn, pname: childName, args = [] } = msg;
        const newProcess = fork(fn, childName)(...args);
        return Object.freeze({
          ...state,
          processes: Object.freeze([...state.processes, newProcess]),
        });
      }

      case 'ERROR':
      case 'ABORT':
        // One failure (or an explicit abort) cancels all siblings.
        state.processes.forEach((p) => p.send({ type: 'ABORT' }));
        return state;

      case 'EXIT': {
        const processes = Object.freeze(
          state.processes.filter((iter) => iter !== msg.p),
        );
        if (processes.length === 0) {
          return 'STOPPED';
        }
        return Object.freeze({ ...state, processes });
      }

      case 'OK':
        toParent(msg);
        return state;

      default:
        return state;
    }
  });
}
/**
 * Process body that fetches a URL and reports the outcome as messages.
 *
 * The request starts as soon as the generator is first advanced. Its outcome
 * is posted back to this process via `send` as `OK` (with the response text),
 * `ABORTED` or `ERROR`, always followed by `DONE`. The dispatch loop forwards
 * `OK` / `ABORTED` / `ERROR` to the parent and stops on `DONE`; an `ABORT`
 * message cancels the in-flight request.
 *
 * States yielded: `'pending'`, `'ok'`, `'aborted'`, `'failed'`.
 *
 * @param {{pname: string, toParent: Function, send: Function}} process
 * @param {{url: URL}} options - `url.href` is the address requested.
 */
function* xfetch({ pname, toParent, send: toSelf }, { url }) {
  const controller = new AbortController();
  const { signal } = controller;

  (async function do_request() {
    try {
      const res = await fetch(url.href, { signal });
      const text = await res.text();
      toSelf({ type: 'OK', text });
    } catch (e) {
      // Identify an abort by name only, so abort errors that are not
      // DOMException instances are still reported as ABORTED.
      const isAborted = e != null && e.name === 'AbortError';
      if (isAborted) {
        toSelf({ type: 'ABORTED', pname });
      } else {
        console.log('e', e);
        toSelf({ type: 'ERROR', pname });
      }
    }
    toSelf({ type: 'DONE' });
  })();

  yield* runDispatch(pname, (state, msg) => {
    switch (msg.type) {
      case 'INIT':
        return 'pending';
      case 'ABORT':
        // Only requests cancellation; the state changes when ABORTED arrives.
        controller.abort();
        return state;
      case 'DONE':
        return 'STOPPED';
      case 'ABORTED':
        toParent(msg);
        return 'aborted';
      case 'ERROR':
        toParent(msg);
        return 'failed';
      case 'OK':
        toParent(msg);
        return 'ok';
      default:
        return state;
    }
  });
}
/**
 * Root process body: starts a supervisor, asks it to run one `xfetch` per
 * URL, then reacts to messages.
 *
 * Handled messages:
 * - `INIT`  - state becomes `{s}` where `s` is the supervisor process.
 * - `ABORT` - forwarded to the supervisor.
 * - `EXIT`  - state becomes `{s: null}`.
 * - `OK`    - stops this process.
 *
 * @param {{pname: string, fork: Function}} process
 */
function* main({ pname, fork }) {
  const s = fork(supervise, 'super')();
  const urls = [
    new URL('https://api.myip.com'),
    new URL('https://x.myip.com'),
  ];
  for (const url of urls) {
    attach(s, xfetch, `xfetch ${url.href}`)({ url });
  }

  yield* runDispatch(pname, (state, msg) => {
    switch (msg.type) {
      case 'INIT':
        return { s };
      case 'ABORT':
        s.send({ type: 'ABORT' });
        return state;
      case 'EXIT':
        return { s: null };
      case 'OK':
        return 'STOPPED';
      default:
        return state;
    }
  });
}
// Start the root process; this forks the supervisor and kicks off the fetches.
const m = spawn(main, 'main')();
// Uncomment to cancel the in-flight requests right after start-up:
//m.send({ type: 'ABORT', p: null});

// Each time the root process ticks, print the state of every process the
// supervisor is tracking, or note that the supervisor is gone.
m.subscribe(() => {
  const { s } = m.getState();
  if (s) {
    const { processes } = s.getState();
    console.log('processes', processes.map((p) => p.getState()));
  } else {
    console.log('no supervisor found');
  }
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment