Created
January 6, 2021 16:57
-
-
Save liggitt/7cf544f833b6baec029bb21dc247ecfd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"runtime" | |
"sync" | |
"time" | |
) | |
func main() { | |
fmt.Println("ctrl+c to stop...") | |
swh := &sampleAndWaterMarkHistograms{ | |
sampleAndWaterMarkObserverGenerator: &sampleAndWaterMarkObserverGenerator{ | |
t0: time.Now(), | |
samplePeriod: time.Millisecond, | |
}, | |
} | |
// set up a few goroutines per CPU | |
for i := 0; i < 4*runtime.NumCPU(); i++ { | |
go func() { | |
for { | |
swh.Set(1) | |
time.Sleep(time.Millisecond) | |
} | |
}() | |
} | |
select {} | |
} | |
type sampleAndWaterMarkObserverGenerator struct { | |
t0 time.Time | |
samplePeriod time.Duration | |
} | |
func (swg *sampleAndWaterMarkObserverGenerator) quantize(when time.Time) int64 { | |
if when.Before(swg.t0) { | |
panic(fmt.Errorf("Time went backwards: t0=%v, when=%v", swg.t0, when)) | |
} | |
return int64(when.Sub(swg.t0) / swg.samplePeriod) | |
} | |
type sampleAndWaterMarkHistograms struct { | |
*sampleAndWaterMarkObserverGenerator | |
sync.Mutex | |
x1 float64 | |
sampleAndWaterMarkAccumulator | |
} | |
type sampleAndWaterMarkAccumulator struct { | |
lastSet time.Time | |
lastSetInt int64 // lastSet / samplePeriod | |
x float64 | |
relX float64 // x / x1 | |
loRelX, hiRelX float64 | |
} | |
func (saw *sampleAndWaterMarkHistograms) Set(x float64) { | |
saw.innerSet(func() { | |
saw.x = x | |
}) | |
} | |
func (saw *sampleAndWaterMarkHistograms) innerSet(updateXOrX1 func()) { | |
var when time.Time | |
var whenInt int64 | |
var acc sampleAndWaterMarkAccumulator | |
var wellOrdered bool | |
func() { | |
saw.Lock() | |
defer saw.Unlock() | |
when = time.Now() | |
whenInt = saw.quantize(when) | |
acc = saw.sampleAndWaterMarkAccumulator | |
wellOrdered = !when.Before(acc.lastSet) | |
updateXOrX1() | |
saw.relX = saw.x / saw.x1 | |
if wellOrdered { | |
if acc.lastSetInt < whenInt { | |
saw.loRelX, saw.hiRelX = acc.relX, acc.relX | |
saw.lastSetInt = whenInt | |
} | |
saw.lastSet = when | |
} | |
// `wellOrdered` should always be true because we are using | |
// monotonic clock readings and they never go backwards. Yet | |
// very small backwards steps (under 1 microsecond) have been | |
// observed | |
// (https://github.com/kubernetes/kubernetes/issues/96459). | |
// In the backwards case, treat the current reading as if it | |
// had occurred at time `saw.lastSet` and log an error. It | |
// would be wrong to update `saw.lastSet` in this case because | |
// that plants a time bomb for future updates to | |
// `saw.lastSetInt`. | |
if saw.relX < saw.loRelX { | |
saw.loRelX = saw.relX | |
} else if saw.relX > saw.hiRelX { | |
saw.hiRelX = saw.relX | |
} | |
}() | |
if !wellOrdered { | |
lastSetS := acc.lastSet.String() | |
whenS := when.String() | |
fmt.Printf("Time went backwards from %s to %s\n", lastSetS, whenS) | |
} | |
for acc.lastSetInt < whenInt { | |
acc.lastSetInt++ | |
acc.loRelX, acc.hiRelX = acc.relX, acc.relX | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment