Created
December 13, 2023 09:24
-
-
Save brendancol/fdbb2c3b5ace71193ffaa4f5a40dd81f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from enum import Enum | |
from dateutil.relativedelta import relativedelta | |
from typing import Tuple, List | |
def get_today() -> datetime:
    """Return the current moment as a timezone-aware datetime.

    The value is localised to the ``America/New_York`` timezone.

    Returns:
        A timezone-aware ``datetime`` for "now" in New York.
    """
    # Imported here because the module otherwise only imports ``pytz`` inside
    # its ``__main__`` guard; without this, calling the function from an
    # importing module raises NameError.
    import pytz

    return datetime.now(pytz.timezone('America/New_York'))
class RelativeTimeFrame(str, Enum):
    """Single-character codes naming a relative time frame.

    Members are also ``str`` instances, so they compare equal to their
    one-character value.
    """

    # Positional frames.
    CURRENT = "c"
    LAST = "l"
    NEXT = "n"

    # Arithmetic directions.
    PLUS = "+"
    MINUS = "-"

    # The present day.
    TODAY = "t"
class TimeFrame(str, Enum):
    """Single-character codes for calendar time frames, day through year.

    Members are also ``str`` instances, so they compare equal to their
    one-character value.
    """

    DAY = "d"
    WEEK = "w"
    MONTH = "m"
    QUARTER = "q"
    YEAR = "y"
class Day(str, Enum):
    """Days of the week encoded as the strings ``'1'`` to ``'7'``.

    Numbering starts at Monday (``'1'``) and ends at Sunday (``'7'``).
    """

    MONDAY = "1"
    TUESDAY = "2"
    WEDNESDAY = "3"
    THURSDAY = "4"
    FRIDAY = "5"
    SATURDAY = "6"
    SUNDAY = "7"
class Quarter(str, Enum):
    """The four quarters of a year, encoded as the strings ``'1'`` to ``'4'``."""

    FIRST = "1"
    SECOND = "2"
    THIRD = "3"
    FOURTH = "4"
class Operator(str, Enum):
    """Operator characters recognised inside a command group.

    Members are also ``str`` instances, so a raw character from the command
    string compares equal to the matching member.
    """

    # Shift by an explicit numeric amount of a unit (e.g. ``-1d``, ``+2w``).
    BACKWARD = "-"
    FORWARD = "+"

    # Move one whole unit back / forward (e.g. ``lw``, ``nm``).
    LAST = "l"
    NEXT = "n"

    # Period-boundary markers.
    START_OF_PERIOD = "^"
    END_OF_PERIOD = "$"
class TimeUnit(str, Enum):
    """Unit characters recognised inside a command group (day through year).

    Members are also ``str`` instances, so a raw character from the command
    string compares equal to the matching member.
    """

    DAY = "d"
    WEEK = "w"
    MONTH = "m"
    QUARTER = "q"
    YEAR = "y"
class SpecialCommand(str, Enum):
    """Special command codes recognised inside a command group.

    All codes are a single character except ``THIS_YEAR``, which is the
    two-character string ``'YY'``.
    """

    # Single days relative to the present.
    TODAY = "t"
    TOMORROW = "T"
    YESTERDAY = "Y"

    # The period containing the present day.
    THIS_WEEK = "W"
    THIS_MONTH = "M"
    THIS_QUARTER = "Q"
    THIS_YEAR = "YY"

    # Mode markers.
    VISUAL_MODE = "v"
    START_MODE = "["
    END_MODE = "]"
    SEARCH = "/"
class ValueChar(str, Enum):
    """The ten decimal digit characters, ``'0'`` through ``'9'``.

    Members are also ``str`` instances, so a raw character from the command
    string compares equal to the matching member.
    """

    VALUE_0 = "0"
    VALUE_1 = "1"
    VALUE_2 = "2"
    VALUE_3 = "3"
    VALUE_4 = "4"
    VALUE_5 = "5"
    VALUE_6 = "6"
    VALUE_7 = "7"
    VALUE_8 = "8"
    VALUE_9 = "9"
class Snap(str, Enum):
    """Which edge of a period a date should be snapped to."""

    START = "^"  # beginning of the period
    END = "$"  # end of the period
class SubCommand:
    """One parsed group of a command string.

    Attributes:
        operator: operator found in the group, or ``None``.
        value: numeric amount found in the group, or ``None``.
        unit: time unit found in the group, or ``None``.
        special: special command found in the group, or ``None``.
    """

    def __init__(self):
        # Every field starts unset; whoever builds the command fills them in.
        self.operator = self.value = self.unit = self.special = None

    def __repr__(self):
        fields = ', '.join(
            f'{name}={getattr(self, name)}'
            for name in ('operator', 'value', 'unit', 'special')
        )
        return f'SubCommand<{fields}>'

    def _has_special(self, marker):
        """Return whether this command's special equals ``marker``."""
        return self.special == marker

    @property
    def is_visual_mode(self):
        """True when the special is ``SpecialCommand.VISUAL_MODE``."""
        return self._has_special(SpecialCommand.VISUAL_MODE)

    @property
    def is_start_mode(self):
        """True when the special is ``SpecialCommand.START_MODE``."""
        return self._has_special(SpecialCommand.START_MODE)

    @property
    def is_end_mode(self):
        """True when the special is ``SpecialCommand.END_MODE``."""
        return self._has_special(SpecialCommand.END_MODE)
class DateTimeRangeTransformer(object):
    """Adjust a ``(start_date, end_date)`` pair according to a command string.

    The command string is split on single spaces into groups. Each group is
    parsed into a ``SubCommand`` by classifying its characters as an
    ``Operator``, a ``TimeUnit``, a ``SpecialCommand`` or a digit; the group
    ``'YY'`` is matched as a whole. Characters that match none of these are
    ignored.

    How groups are routed:

    * A ``v`` group (visual mode) splits the command: groups before it modify
      the start date, groups after it modify the end date.
    * Without a ``v`` group, every group modifies both dates.
    * A group containing ``[`` is skipped for the end date; a group
      containing ``]`` is skipped for the start date.

    How a group moves a date:

    * ``-`` / ``+`` with a number and unit (``-1d``, ``+2w``) shift the date
      by that amount.
    * ``l`` / ``n`` with a unit (``lw``, ``nq``) move one unit back / forward
      and then snap to the start (start date) or end (end date) of that unit.
    * ``t``, ``T``, ``Y``, ``W``, ``M``, ``Q``, ``YY`` replace the date with
      one derived from ``get_today()``, snapped the same way.
    """

    def __init__(self, command_str):
        """Parse ``command_str`` and keep both the raw text and the result."""
        self._command_str = command_str
        self.commands = self._interpret_subcommands(command_str)

    def __repr__(self):
        return self._command_str

    @property
    def first(self):
        """The first parsed sub-command."""
        return self.commands[0]

    def __call__(self, start_date: datetime,
                 end_date: datetime) -> Tuple[datetime, datetime]:
        """Apply the parsed commands to a date range.

        Args:
            start_date: beginning of the range to adjust.
            end_date: end of the range to adjust.

        Returns:
            The adjusted ``(start_date, end_date)`` tuple.

        Raises:
            ValueError: if a command cannot be turned into a date shift.

        NOTE: the ``get_today()``-based specials produce a timezone-aware
        datetime that is differenced against the anchor, so callers using
        them should pass timezone-aware datetimes.
        """
        for cmd in self.start_date_modifiers:
            start_date += self._get_relativedelta(start_date,
                                                  cmd,
                                                  snap=Snap.START)
        for cmd in self.end_date_modifiers:
            end_date += self._get_relativedelta(end_date,
                                                cmd,
                                                snap=Snap.END)
        return start_date, end_date

    def _visual_index(self):
        """Return the index of the first visual-mode command, or ``None``."""
        for i, c in enumerate(self.commands):
            if c.is_visual_mode:
                return i
        return None

    @property
    def start_date_modifiers(self):
        """Yield the commands that apply to the start date."""
        split_at = self._visual_index()
        # With a visual marker only the commands before it touch the start.
        cmds = self.commands if split_at is None else self.commands[:split_at]
        for c in cmds:
            if not c.is_end_mode:
                yield c

    @property
    def end_date_modifiers(self):
        """Yield the commands that apply to the end date."""
        split_at = self._visual_index()
        # With a visual marker only the commands after it touch the end.
        cmds = self.commands if split_at is None else self.commands[split_at + 1:]
        for c in cmds:
            if not c.is_start_mode:
                yield c

    def _interpret_subcommands(self, cmd_str: str) -> List[SubCommand]:
        """Parse a command string into one ``SubCommand`` per group.

        Args:
            cmd_str: the raw command, with groups separated by single spaces.

        Returns:
            A list with one ``SubCommand`` per group, in order. An empty
            group (from consecutive spaces or an empty string) yields a
            ``SubCommand`` with every field left as ``None``.
        """
        # Specials that stand for a whole period imply a unit and a zero value.
        implied_units = {
            SpecialCommand.TODAY: TimeUnit.DAY,
            SpecialCommand.YESTERDAY: TimeUnit.DAY,
            SpecialCommand.TOMORROW: TimeUnit.DAY,
            SpecialCommand.THIS_WEEK: TimeUnit.WEEK,
            SpecialCommand.THIS_MONTH: TimeUnit.MONTH,
            SpecialCommand.THIS_QUARTER: TimeUnit.QUARTER,
            SpecialCommand.THIS_YEAR: TimeUnit.YEAR,
        }

        parsed = []
        for group in cmd_str.split(' '):
            cmd = SubCommand()

            if group == 'YY':
                # The only two-character code; it must be matched as a whole,
                # otherwise each 'Y' would be read as YESTERDAY.
                cmd.special = SpecialCommand(group)
            else:
                digits = ''
                for char in group:
                    if char in Operator.__members__.values():
                        cmd.operator = Operator(char)
                    elif char in TimeUnit.__members__.values():
                        cmd.unit = TimeUnit(char)
                    elif char in SpecialCommand.__members__.values():
                        cmd.special = SpecialCommand(char)
                    elif char in ValueChar.__members__.values():
                        digits += char

                # Cast the accumulated digits, if any.
                if digits:
                    cmd.value = int(digits)

            implied = implied_units.get(cmd.special)
            if implied is not None:
                cmd.value = 0
                cmd.unit = implied

            parsed.append(cmd)

        return parsed

    def snap_datetime_to_unit_start(self, d: datetime, unit: TimeUnit) -> datetime:
        """Return ``d`` moved to the first instant of the ``unit`` containing it.

        Weeks start on Monday. Every unit, including the week, is snapped to
        00:00:00.000000.

        Returns:
            The snapped datetime, or ``None`` if ``unit`` is not a known unit.
        """
        midnight = dict(hour=0, minute=0, second=0, microsecond=0)

        if unit == TimeUnit.DAY:
            return d.replace(**midnight)
        if unit == TimeUnit.WEEK:
            monday = d - relativedelta(days=d.weekday())
            # Reset the time of day as well, like every other unit does.
            return monday.replace(**midnight)
        if unit == TimeUnit.MONTH:
            return d.replace(day=1, **midnight)
        if unit == TimeUnit.QUARTER:
            # Quarters begin in months 1, 4, 7 and 10.
            first_month = 3 * ((d.month - 1) // 3) + 1
            return d.replace(month=first_month, day=1, **midnight)
        if unit == TimeUnit.YEAR:
            return d.replace(month=1, day=1, **midnight)
        return None

    def snap_datetime_to_unit_end(self, d: datetime, unit: TimeUnit) -> datetime:
        """Return ``d`` moved to the last instant of the ``unit`` containing it.

        Weeks end on Sunday. Every unit, including the week, is snapped to
        23:59:59.999999.

        Returns:
            The snapped datetime, or ``None`` if ``unit`` is not a known unit.
        """
        end_of_day = dict(hour=23, minute=59, second=59, microsecond=999999)

        if unit == TimeUnit.DAY:
            return d.replace(**end_of_day)
        if unit == TimeUnit.WEEK:
            sunday = d + relativedelta(days=6 - d.weekday())
            # Reset the time of day as well, like every other unit does.
            return sunday.replace(**end_of_day)
        if unit == TimeUnit.MONTH:
            # First of the month, plus a month, minus a day = last day.
            return d.replace(day=1, **end_of_day) + relativedelta(months=1, days=-1)
        if unit == TimeUnit.QUARTER:
            first_month = 3 * ((d.month - 1) // 3) + 1
            quarter_start = d.replace(month=first_month, day=1, **end_of_day)
            return quarter_start + relativedelta(months=3, days=-1)
        if unit == TimeUnit.YEAR:
            return d.replace(month=1, day=1, **end_of_day) + relativedelta(years=1, days=-1)
        return None

    @staticmethod
    def _single_unit_step(unit):
        """Return a ``relativedelta`` spanning one ``unit``, or ``None``."""
        if unit == TimeUnit.DAY:
            return relativedelta(days=1)
        if unit == TimeUnit.WEEK:
            return relativedelta(days=7)
        if unit == TimeUnit.MONTH:
            return relativedelta(months=1)
        if unit == TimeUnit.QUARTER:
            return relativedelta(months=3)
        if unit == TimeUnit.YEAR:
            return relativedelta(years=1)
        return None

    def _compute_relative_delta(self,
                                anchor: datetime,
                                cmd: SubCommand,
                                snap: Snap) -> relativedelta:
        """Return the delta that moves ``anchor`` to the period ``cmd`` names.

        A rough target date is picked first (one unit before/after ``anchor``
        for LAST/NEXT, or a date derived from ``get_today()`` for the
        specials), then snapped to the start or end of ``cmd.unit``.

        Args:
            anchor: the date being adjusted.
            cmd: the parsed command.
            snap: which edge of the unit to snap to.

        Returns:
            ``relativedelta(final_date, anchor)``; an empty delta when the
            command carries nothing to act on (no unit and no target).

        Raises:
            ValueError: if the command has a unit but no usable operator or
                special, or if ``snap`` is not a known ``Snap`` value.
        """
        step = self._single_unit_step(cmd.unit)
        rough = None

        if step is not None and cmd.operator == Operator.LAST:
            rough = anchor - step
        elif step is not None and cmd.operator == Operator.NEXT:
            rough = anchor + step
        elif cmd.special in (SpecialCommand.TODAY,
                             SpecialCommand.THIS_WEEK,
                             SpecialCommand.THIS_MONTH,
                             SpecialCommand.THIS_QUARTER,
                             SpecialCommand.THIS_YEAR):
            rough = get_today()
        elif cmd.special == SpecialCommand.YESTERDAY:
            rough = get_today() - relativedelta(days=1)
        elif cmd.special == SpecialCommand.TOMORROW:
            rough = get_today() + relativedelta(days=1)

        if rough is None:
            if cmd.unit is None:
                # Nothing to act on (e.g. an empty group or a bare mode
                # marker): leave the anchor where it is.
                return relativedelta()
            raise ValueError(f'Cannot resolve command: {cmd!r}')

        if snap == Snap.START:
            final_date = self.snap_datetime_to_unit_start(rough, cmd.unit)
        elif snap == Snap.END:
            final_date = self.snap_datetime_to_unit_end(rough, cmd.unit)
        else:
            raise ValueError(f'Unknown snap: {snap}')

        return relativedelta(final_date, anchor)

    def _compute_absolute_delta(self, cmd: SubCommand) -> relativedelta:
        """Return the fixed shift described by a ``+``/``-`` command.

        Args:
            cmd: a command whose operator is FORWARD or BACKWARD.

        Returns:
            A ``relativedelta`` of ``cmd.value`` units, negated for BACKWARD.
            Quarters are expressed as three months each.

        Raises:
            ValueError: if the command has no numeric value or no known unit.
        """
        value = cmd.value
        if value is None:
            raise ValueError(f'Missing value: {cmd!r}')

        # Plain equality; "in (Operator.BACKWARD)" was a substring test on a
        # string, not membership in a tuple.
        if cmd.operator == Operator.BACKWARD:
            value = -value

        if cmd.unit == TimeUnit.DAY:
            return relativedelta(days=value)
        elif cmd.unit == TimeUnit.WEEK:
            return relativedelta(weeks=value)
        elif cmd.unit == TimeUnit.MONTH:
            return relativedelta(months=value)
        elif cmd.unit == TimeUnit.QUARTER:
            return relativedelta(months=3 * value)
        elif cmd.unit == TimeUnit.YEAR:
            return relativedelta(years=value)
        else:
            raise ValueError(f'Unknown unit: {cmd.unit}')

    def _get_relativedelta(self,
                           anchor: datetime,
                           cmd: SubCommand,
                           snap: Snap) -> relativedelta:
        """Return the delta ``cmd`` applies to ``anchor``.

        ``+``/``-`` commands give a fixed shift that ignores ``anchor`` and
        ``snap``; everything else is resolved relative to ``anchor``.
        """
        if cmd.operator in (Operator.BACKWARD, Operator.FORWARD):
            return self._compute_absolute_delta(cmd)
        else:
            return self._compute_relative_delta(anchor, cmd, snap)
if __name__ == '__main__':
    # Demo: run a set of sample commands against today's full-day range in
    # New York and print the resulting ranges as a table.
    import pytz
    import pandas as pd

    pd.set_option('display.max_columns', 50)

    tz = pytz.timezone('America/New_York')
    today = datetime.now(tz)

    # The base range spans the whole of today.
    start_date = today.replace(hour=0, minute=0, second=0, microsecond=0)
    end_date = today.replace(hour=23, minute=59, second=59, microsecond=999999)

    command_strings = (
        "-1d v +2d",
        "-1d",
        "-1m v +2w",
        "-2y v -2w",
        "-1y v -1m",
        "-1y +1m v -1m",
        "[-1y",
        "]+1y",
        "lw",
        "nw",
        "lm",
        "nm",
        "lq",
        "nq",
        "ly",
        "ny",
        "lw [-2d",
        "lw ]+2d",
        "t",
        "T",
        "W",
        "M",
        "Q",
        "Y",
        "YY",
    )
    cmds = [DateTimeRangeTransformer(text) for text in command_strings]

    stamp_format = '%a %Y-%m-%d %H:%M:%S'
    results = []
    for c in cmds:
        new_start, new_end = c(start_date, end_date)
        results.append({
            'cmd': c,
            'start_date': new_start.strftime(stamp_format),
            'end_date': new_end.strftime(stamp_format),
            # Length of the adjusted range, and how far each edge moved.
            'delta': new_end - new_start,
            'start_delta': new_start - start_date,
            'end_delta': new_end - end_date,
        })

    df = pd.DataFrame.from_records(results)
    print(df)
Author
brendancol
commented
Dec 13, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment