Created
May 28, 2025 17:56
-
-
Save adilcpm/92bee38a61e940c4583864f9502a898c to your computer and use it in GitHub Desktop.
Bybit clock offset
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{collections::HashMap, ops::Add, time::Duration}; | |
use arber::{ | |
config::get_config, | |
exec::bybit::structs::{AuthArgs, BybitWSAuthRequest}, | |
}, | |
}; | |
use bybit::ws::response::PrivateResponse; | |
use bytes::BufMut; | |
use chrono::Utc; | |
use futures::{SinkExt, StreamExt}; | |
use hmac::{Hmac, Mac}; | |
use sha2::Sha256; | |
use tokio::time::interval; | |
use tokio_tungstenite::{connect_async, tungstenite::Message}; | |
use tracing::{error, info}; | |
use utils::now_ms; | |
use yellowstone_grpc_proto::{ | |
geyser::SubscribeRequestFilterTransactions, | |
prelude::{subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestPing}, | |
}; | |
type HmacSha256 = Hmac<Sha256>; | |
const PRIVATE_WS_ENDPOINT: &str = "wss://stream.bybit.com:443/v5/private"; | |
const SIGNATURE_PART1: &str = "GET/realtime"; | |
const AUTH_CHANNEL: &str = "auth"; | |
#[derive(Debug, Clone)] | |
struct RttSample { | |
rtt_ms: u64, | |
server_timestamp: u64, | |
ping_timestamp: u64, | |
} | |
pub async fn bybit_ws_test() { | |
tracing_subscriber::fmt() | |
.with_writer(std::io::stdout) | |
.with_max_level(tracing::Level::INFO) | |
.init(); | |
let api_key: String = get_config().bybit_api_key_trade.clone(); | |
let secret: String = get_config().bybit_api_secret_trade.clone(); | |
let (ws_stream, _response) = connect_async(PRIVATE_WS_ENDPOINT).await.unwrap(); | |
let (mut write, mut read) = ws_stream.split(); | |
let expire = Utc::now().timestamp_millis().add(10000) as u64; | |
let mut mac = | |
HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); | |
let mut buffer: Vec<u8> = Vec::new(); | |
buffer.put(SIGNATURE_PART1.as_bytes()); | |
buffer.put(expire.to_string().as_bytes()); | |
mac.update(&buffer); | |
let result = mac.finalize(); | |
let code_bytes = result.into_bytes(); | |
let signature_start_login = hex::encode(code_bytes); | |
let auth_message = BybitWSAuthRequest { | |
op: AUTH_CHANNEL, | |
args: &[ | |
AuthArgs::Str(api_key.as_str()), | |
AuthArgs::Int(expire), | |
AuthArgs::Str(signature_start_login.as_str()), | |
], | |
}; | |
let auth_message_bytes = serde_json::to_vec(&auth_message).unwrap(); | |
write.send(Message::Binary(auth_message_bytes)).await.unwrap(); | |
let mut ping_interval = interval(Duration::from_secs(10)); | |
let mut ping_send_timestamp_ms = now_ms(); | |
// Store multiple RTT samples for better clock sync estimation | |
let mut rtt_samples: Vec<RttSample> = Vec::new(); | |
const MAX_SAMPLES: usize = 10; // Keep last 10 samples | |
const MIN_SAMPLES_FOR_CALCULATION: usize = 3; // Need at least 3 samples | |
loop { | |
tokio::select! { | |
// Branch 1: Handle incoming messages | |
msg = read.next() => { | |
match msg { | |
Some(Ok(message)) => match message { | |
Message::Text(text) => { | |
match serde_json::from_str(&text) { | |
Ok(res) => { | |
match res { | |
PrivateResponse::Pong(res) => { | |
let server_ts_ms = res.args[0].parse::<u64>().unwrap(); | |
let local_ts_ms = now_ms(); | |
let rtt = local_ts_ms - ping_send_timestamp_ms; | |
// Store the new sample | |
let sample = RttSample { | |
rtt_ms: rtt, | |
server_timestamp: server_ts_ms, | |
ping_timestamp: ping_send_timestamp_ms, | |
}; | |
rtt_samples.push(sample); | |
// Keep only the last MAX_SAMPLES | |
if rtt_samples.len() > MAX_SAMPLES { | |
rtt_samples.remove(0); | |
} | |
info!("Pong timestamp ms: {:?}", server_ts_ms); | |
info!("Local timestamp ms: {:?}", local_ts_ms); | |
info!("Current RTT: {:?} ms", rtt); | |
info!("Raw offset: {:?} ms", server_ts_ms as i64 - local_ts_ms as i64); | |
// Calculate offset using minimum RTT if we have enough samples | |
if rtt_samples.len() >= MIN_SAMPLES_FOR_CALCULATION { | |
// Find the sample with minimum RTT | |
let min_rtt_sample = rtt_samples.iter() | |
.min_by_key(|sample| sample.rtt_ms) | |
.unwrap(); | |
let min_rtt = min_rtt_sample.rtt_ms; | |
let one_way_min = min_rtt / 2; | |
// Use the minimum RTT sample for offset calculation | |
let estimated_local_time_at_server_response = | |
min_rtt_sample.ping_timestamp + one_way_min; | |
let offset_min_rtt = min_rtt_sample.server_timestamp as i64 - | |
estimated_local_time_at_server_response as i64; | |
// Also calculate current sample offset for comparison | |
let one_way_current = rtt / 2; | |
let estimated_local_time_current = ping_send_timestamp_ms + one_way_current; | |
let offset_current = server_ts_ms as i64 - estimated_local_time_current as i64; | |
info!("=== RTT ANALYSIS ==="); | |
info!("Samples collected: {}", rtt_samples.len()); | |
info!("Current RTT: {} ms", rtt); | |
info!("Minimum RTT: {} ms", min_rtt); | |
info!("RTT improvement: {} ms", rtt as i64 - min_rtt as i64); | |
info!("Current sample offset: {} ms", offset_current); | |
info!("Min RTT offset: {} ms", offset_min_rtt); | |
info!("Offset difference: {} ms", offset_current - offset_min_rtt); | |
// Show RTT distribution | |
let mut rtts: Vec<u64> = rtt_samples.iter().map(|s| s.rtt_ms).collect(); | |
rtts.sort(); | |
info!("RTT distribution: {:?}", rtts); | |
} else { | |
info!("Collecting samples... ({}/{})", rtt_samples.len(), MIN_SAMPLES_FOR_CALCULATION); | |
} | |
} | |
_ => info!("RECEIVED OTHER MESSAGE : {:?}", res), | |
} | |
} | |
Err(e) => { | |
error!("Error parsing JSON: {}", e); | |
} | |
} | |
} | |
_ => info!("RECEIVED OTHER MESSAGE : {:?}", message), | |
}, | |
Some(Err(e)) => { | |
error!("Error reading from from exec feed WebSocket v2: {}", e); | |
break; | |
} | |
None => { | |
info!("WebSocket stream ended"); | |
break; | |
} | |
} | |
}, | |
// Branch 2: Send ping at regular intervals | |
_ = ping_interval.tick() => { | |
ping_send_timestamp_ms = now_ms(); | |
if let Err(e) = write.send(Message::Text("{\"op\":\"ping\"}".to_string())).await { | |
error!("Failed to send ping: {:?}", e); | |
break; | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment