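# PSE daily price scraper (AWS Lambda handler).
#
# Pulls per-company daily price actions from the PSE quote API, appends them
# to a per-date CSV mirrored to S3, and writes each traded record to DynamoDB.
# The run is resumable: if the function is re-invoked for the same date, it
# downloads the partially written CSV from S3 and fast-forwards past the last
# completed stock code.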
import os
import boto3
from botocore.exceptions import ClientError
import requests
import arrow
import csv
from time import sleep
from decimal import Decimal
BASE_URL = os.environ['PSE_BASE_URL']
QUOTE_API_PATH = os.environ['PSE_QUOTE_API_PATH']
API_ENDPOINT = "{}{}".format(BASE_URL, QUOTE_API_PATH)

TRADING_DATE_FORMAT = "YYYY-MM-DD HH:mm:ss"

BUCKET_NAME = "psedata"
PRICE_DAILY_PREFIX = "price_daily"
COMPANY_LIST_NAME = "company_list.csv"
TMP_COMPANY_LIST = "/tmp/{}".format(COMPANY_LIST_NAME)
TMP_DIRECTORY = "/tmp"

CHUNK_SIZE = 15

s3_client = boto3.client("s3")
dynamodb = boto3.resource("dynamodb", region_name=os.environ['DYNAMODB_REGION'],
                          endpoint_url=os.environ['DYNAMODB_ENDPOINT'])
dynamodb_table = dynamodb.Table(os.environ['DYNAMODB_TABLE'])
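# Entry point. The handler reads event['time'], which matches the shape of a
# scheduled CloudWatch Events / EventBridge payload (an assumption here), e.g.:
#
#   {"time": "2020-07-08T08:28:00Z", ...}
#
# Flow: prepare the local working CSV, resume after the last completed stock
# code if one exists, then fetch, persist, and upload in CHUNK_SIZE batches.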
def lambda_handler(event, context):
    target_date = get_event_datetime(event['time'])
    target_file_name = "{}.csv".format(target_date.isoformat())
    s3_key_name = "{}/{}".format(PRICE_DAILY_PREFIX, target_file_name)
    working_file_path = "{}/{}".format(TMP_DIRECTORY, target_file_name)

    prepare_local_working_file(working_file_path, s3_key_name)

    last_completed_stock_code = get_last_completed_stock_code(working_file_path)
    company_list = get_company_list()
    if last_completed_stock_code:
        print("Fast-forward after {}".format(last_completed_stock_code))
        company_list = filter_completed_stock_codes(last_completed_stock_code, company_list)
    else:
        # No previous progress detected, so start the file with the CSV header
        header = "STOCK_CODE,OPEN,LOW,HIGH,CLOSE,AVG_PRICE,VOLUME,VALUE\n"
        write_price_actions(working_file_path, [header])

    chunk = []
    for price_action in request_price_actions(target_date, company_list):
        # Only write when there was movement (no "open" value means no trade
        # was recorded for the target date)
        if price_action.get("open"):
            dynamodb_write(price_action)

            pa = "{},{},{},{},{},{},{},{}\n".format(price_action.get("stock_code"),
                                                    price_action.get("open"),
                                                    price_action.get("low"),
                                                    price_action.get("high"),
                                                    price_action.get("close"),
                                                    price_action.get("avg_price"),
                                                    price_action.get("volume"),
                                                    price_action.get("value"))
            print(pa)
            chunk.append(pa)

            if len(chunk) == CHUNK_SIZE:
                print("Chunk full... Uploading to S3.")
                write_price_actions(working_file_path, chunk)
                upload_working_file(working_file_path, s3_key_name)
                chunk = []

        # Wait 0.25 sec in between requests
        sleep(0.25)

    # Write and upload the remaining chunk that did not reach CHUNK_SIZE
    if len(chunk) > 0:
        write_price_actions(working_file_path, chunk)
        upload_working_file(working_file_path, s3_key_name)

    print("Job done!")
def request_price_actions(target_date, company_list):
    trading_date = target_date.isoformat()

    for company in company_list:
        response = requests.post(API_ENDPOINT,
                                 data={"security": company.get("security_id")},
                                 headers={"referer": BASE_URL})

        # Non-OK responses are skipped silently; nothing is yielded for
        # that company
        if response:
            json = response.json()
            records = json.get("records")
            today = get_target_date(target_date, records)

            if today:
                price_action = {
                    "stock_code": company.get("stock_code"),
                    "trading_date": trading_date,
                    "open": today.get("sqOpen"),
                    "low": today.get("sqLow"),
                    "high": today.get("sqHigh"),
                    "close": today.get("sqClose"),
                    "avg_price": today.get("avgPrice"),
                    "volume": today.get("totalVolume"),
                    "value": today.get("totalValue")
                }
            else:
                # No record for the target date: yield an empty price action
                price_action = {
                    "stock_code": company.get("stock_code"),
                    "trading_date": trading_date,
                    "open": None,
                    "low": None,
                    "high": None,
                    "close": None,
                    "avg_price": None,
                    "volume": None,
                    "value": None
                }

            yield price_action
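# arrow.get() parses the ISO-8601 timestamp and .date() drops the time part,
# e.g. get_event_datetime("2020-07-08T08:28:00Z") -> datetime.date(2020, 7, 8).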
def get_event_datetime(event_date_str):
    return arrow.get(event_date_str).date()
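# The company list CSV in S3 is expected to have a header row followed by the
# columns stock_code, company_name, company_id, security_id (column order
# inferred from the indexing below).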
def get_company_list():
    with open(TMP_COMPANY_LIST, "wb") as file:
        s3_client.download_fileobj(BUCKET_NAME, COMPANY_LIST_NAME, file)

    company_details = []
    with open(TMP_COMPANY_LIST, "r") as file:
        reader = csv.reader(file, delimiter=",", quotechar='"')
        # Skip the header row
        next(reader)

        for row in reader:
            company_details.append({
                "stock_code": row[0],
                "company_name": row[1],
                "company_id": row[2],
                "security_id": row[3]
            })

    return company_details
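# Drops every company up to and including after_stock_code, preserving order.
# Illustrative example (hypothetical codes):
#   filter_completed_stock_codes("BDO", [{"stock_code": "AC"},
#                                        {"stock_code": "BDO"},
#                                        {"stock_code": "TEL"}])
#   -> [{"stock_code": "TEL"}]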
def filter_completed_stock_codes(after_stock_code, company_list):
    start_new_company_list = False
    _company_list = []
    for c in company_list:
        if not start_new_company_list and c.get("stock_code") == after_stock_code:
            start_new_company_list = True
            continue

        if start_new_company_list:
            _company_list.append(c)

    return _company_list
def get_target_date(target_datetime, records):
    # The PSE API data is sorted from latest to oldest, so when target_datetime
    # is the current date this loop usually returns on the first iteration.
    for r in records:
        raw_target_date = r.get("tradingDate")
        trading_date = arrow.get(raw_target_date, TRADING_DATE_FORMAT)
        if target_datetime == trading_date.date():
            return r
    return False
def write_price_actions(working_file_path, price_actions):
    # Append mode, so resumed runs keep extending the same file
    with open(working_file_path, "a") as file:
        for pa in price_actions:
            file.write(pa)
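# Each upload overwrites the S3 object with the full file so far. STANDARD_IA
# trades cheaper storage for per-retrieval fees, which suits a write-mostly
# daily archive.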
def upload_working_file(source_file_path, key_name):
    s3_client.upload_file(source_file_path, BUCKET_NAME, key_name, ExtraArgs={
        'StorageClass': 'STANDARD_IA'
    })
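# head_object raises ClientError when the key is missing (404). Note that the
# bare except below also swallows other ClientErrors (e.g. access denied), in
# which case the working file is started from scratch.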
def prepare_local_working_file(local_working_file_path, key_name):
    file_check = None
    try:
        file_check = s3_client.head_object(Bucket=BUCKET_NAME, Key=key_name)
    except ClientError:
        print("Object does not exist")

    if file_check:
        print("Object exists, downloading")
        with open(local_working_file_path, "wb") as file:
            s3_client.download_fileobj(BUCKET_NAME, key_name, file)
    else:
        # Create the file, or truncate it if it already exists locally
        with open(local_working_file_path, "w"):
            pass
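# The resume marker is the first CSV column (STOCK_CODE) of the last line
# written; returns False when the file is empty or holds only the header.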
def get_last_completed_stock_code(local_working_file_path):
    with open(local_working_file_path) as file:
        lines = file.readlines()

    lines_length = len(lines)
    # If it's empty or just the header row
    if lines_length <= 1:
        return False

    last_line = lines[lines_length - 1]
    values = last_line.split(",")
    stock_code = values[0]

    return stock_code.strip()
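# Only OHLC and volume are persisted to DynamoDB; avg_price and value live
# solely in the CSV. The table's key schema is assumed to be
# stock_code (hash) + trading_date (range).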
def dynamodb_write(price_action):
    item = {
        "stock_code": price_action.get("stock_code"),
        "trading_date": price_action.get("trading_date"),
        "open": dynamodb_decimal(price_action.get("open")),
        "low": dynamodb_decimal(price_action.get("low")),
        "high": dynamodb_decimal(price_action.get("high")),
        "close": dynamodb_decimal(price_action.get("close")),
        "volume": dynamodb_decimal(price_action.get("volume"))
    }
    dynamodb_table.put_item(Item=item)
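# boto3's DynamoDB serializer rejects Python floats outright, and a Decimal
# built directly from a float carries binary floating-point noise, e.g.
#   Decimal(0.1)      == Decimal('0.1000000000000000055511151231257827021181583404541015625')
#   Decimal(str(0.1)) == Decimal('0.1')
# Going through str() keeps the shortest round-trip representation.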
# See https://github.com/boto/boto3/issues/665 for why this needs to be done
def dynamodb_decimal(value):
    return Decimal(str(value))