Last active
July 28, 2022 21:05
-
-
Save aVolpe/afdbc575b7a76270afca494afe44c384 to your computer and use it in GitHub Desktop.
jpos utils
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import sys | |
import xml.etree.cElementTree as ET | |
import string | |
import random | |
import socket | |
from datetime import datetime | |
if len(sys.argv) < 2: | |
print(""" | |
A script that sends xml messages similar to the jpos XMLPackager. | |
USAGE: | |
python send_message.py IP:PORT field_id:field_value ... | |
WHERE | |
IP:PORT is the ip/port of the jpos cmf port | |
...FIELD_ID:FIELD_VALUE: are the rest of params, each with the format field_id:field_value | |
if field_value starts with a _ is considered a binary in hex format | |
if field_value starts with a __ is considered a ISOAmount, and it will be splited by '-' | |
to separate between currency and amount | |
Some fields are automatically populated with random values like the STAN, RRN | |
Some fields are automatically populated with the current date, like the 7 and 12 | |
USAGE WITH TEMPLATES: | |
This script also supports base messages that you can load as a 'template', to use this feature, | |
create a file with the extension '.xml', and pass as the third parameter: | |
python send_message.py IP:PORT template.xml field_id:field_value ... | |
This will change the rrn/stan/date fields, if you want to keep those fields, pass a four parameter | |
with the value 'keep': | |
python send_message.py IP:PORT template.xml keep field_id:field_value ... | |
""") | |
exit(0) | |
def sortchildrenby(parent, attr): | |
""" | |
Sort children, copied from https://stackoverflow.com/a/25339725/1126380 | |
""" | |
parent[:] = sorted(parent, key=lambda child: int(child.get(attr))) | |
def _pretty_print(current, parent=None, index=-1, depth=0): | |
""" | |
Makes a compact a pretty one, with line breaks and ident | |
Copied from: https://stackoverflow.com/a/65808327/1126380 | |
""" | |
for i, node in enumerate(current): | |
_pretty_print(node, current, i, depth + 1) | |
if parent is not None: | |
if index == 0: | |
parent.text = '\n' + ('\t' * depth) | |
else: | |
parent[index - 1].tail = '\n' + ('\t' * depth) | |
if index == len(parent) - 1: | |
current.tail = '\n' + ('\t' * (depth - 1)) | |
def _get_or_add_sub_iso(root, field_nro): | |
path = "isomsg[@id='" + field_nro + "']" | |
found = root.find(path) | |
if found is None: | |
sub_path = ET.SubElement(root, 'isomsg') | |
sub_path.set('id', field_nro) | |
return sub_path | |
return found | |
def _add_field(root, field_nro, field_value): | |
""" | |
Add a 'field' to an 'isomsg' | |
""" | |
parts = field_nro.split('.', 2) | |
if len(parts) > 1: | |
curr_root = root | |
curr_root = _get_or_add_sub_iso(root, parts[0]) | |
_add_field(curr_root, parts[1], field_value) | |
return | |
path = "field[@id='" + field_nro + "']" | |
found = root.find(path) | |
if found is not None: | |
root.remove(found) | |
field = ET.SubElement(root, "field") | |
field.set('id', field_nro) | |
if field_value[0:2] == '__': | |
""" Amount """ | |
field.set('type', 'amount') | |
field.set('currency', field_value.split('-')[0][2:]) | |
field.set('value', field_value.split('-')[1].replace('_', '.')) | |
elif field_value[0] == '_': | |
""" Binary """ | |
field.set('value', str(field_value[1:])) | |
field.set('type', 'binary') | |
else: | |
""" normal """ | |
field.set('value', str(field_value)) | |
def _add_param(root, param): | |
""" | |
Parses a param in the form of field:value | |
""" | |
parts = str(param).split(':', 2) | |
_add_field(root, parts[0], parts[1]) | |
def _generate_number(length): | |
""" | |
Return a random number of #length | |
""" | |
letters = string.digits | |
return ''.join(random.choice(letters) for i in range(length)) | |
def _send_and_receive(ip_port, message): | |
""" | |
Send a message, and then wait infinetly for a response | |
""" | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
try: | |
ip = ip_port.split(':')[0] | |
port = ip_port.split(':')[1] | |
s.connect((ip, int(port))) | |
s.sendall(bytes(message)) | |
s.sendall('\n') # send end of message | |
print("Message send, waiting for reply") | |
data = s.recv(65000) | |
print("Message received:") | |
print(data) | |
except Exception as err: | |
s.close() | |
raise err | |
def _add_date_fields(iso): | |
""" | |
Adds the date fields, 7 and 12 | |
""" | |
now = datetime.now() | |
_add_field(iso, '7', now.strftime('%m%d%H%M%S')) | |
_add_field(iso, '12', now.strftime('%Y%m%d%H%M%S')) | |
if __name__ == "__main__": | |
first_index = 2 | |
param_2 = sys.argv[2] | |
if param_2 is not None and param_2.endswith('.xml'): | |
print("Loading template " + param_2) | |
first_index = 3 | |
iso = ET.parse(param_2).getroot() | |
should_keep = False if len(sys.argv) > 3 and sys.argv[3] == 'keep' else True | |
if should_keep: | |
print("Replacing date and stan/rrn fields") | |
_add_field(iso, '11', _generate_number(12)) | |
_add_field(iso, '37', _generate_number(12)) | |
_add_date_fields(iso) | |
else: | |
print("Keeping all fields from template") | |
first_index = 4 | |
else: | |
iso = ET.Element("isomsg") | |
_add_field(iso, '0', '2100') | |
_add_field(iso, '11', _generate_number(12)) | |
_add_field(iso, '37', _generate_number(12)) | |
_add_field(iso, '41', _generate_number(16)) | |
_add_field(iso, '22', '_10000010020000000100000000000000') | |
_add_field(iso, '113.2', '106') | |
_add_date_fields(iso) | |
print('Replacements: ' + str(sys.argv[first_index:])) | |
for x in sys.argv[first_index:]: | |
_add_param(iso, x) | |
ip_port = sys.argv[1] | |
sortchildrenby(iso, 'id') | |
_pretty_print(iso) | |
print("Sending message: ") | |
print(ET.tostring(iso)) | |
print("To: " + ip_port) | |
_send_and_receive(ip_port, ET.tostring(iso)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment