Created
April 8, 2024 13:43
Dynamic JSON formatting in PySpark using the `schema_of_json` function.
https://stackoverflow.com/questions/78290764/flatten-dynamic-json-payload-string-using-pyspark/

PySpark has a nifty function, `schema_of_json`, which infers the schema of a JSON string. Passing that schema to `from_json` applies it to the whole column.

The approach to handling dynamic JSON payloads is as follows:

- First, take the `json_payload` of the first row of the dataframe.
- Create a schema from that `json_payload` using `schema_of_json`.
- Parse every row with that schema. A row that is parsed correctly has no `null` value in its parsed struct; a `null` value means the row was not parsed correctly. To detect this, cast the parsed struct back to a string and use `.contains` to check whether the string `null` is present in that string column.
- This gives two dataframes: one with the correctly parsed rows and one with the incorrectly parsed rows.
- Repeat the process over the incorrectly parsed dataframe.
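
The control flow of the steps above can be sketched without a Spark session. This is a plain-Python analogy rather than the PySpark code: `split_by_first_schema` is a hypothetical helper, the "schema" is reduced to the set of top-level keys of the first remaining row, each payload is assumed to be a JSON object, and a missing key plays the role of a `null` field.

```python
import json


def split_by_first_schema(payloads):
    """Group JSON strings by repeatedly using the first remaining row as the schema."""
    groups = []
    remaining = list(payloads)
    while remaining:
        # Stand-in for schema_of_json: the top-level keys of the first remaining row.
        keys = sorted(json.loads(remaining[0]))
        matched, unmatched = [], []
        for raw in remaining:
            record = json.loads(raw)
            # Stand-in for from_json: keys absent from the row come back as None.
            projected = {key: record.get(key) for key in keys}
            if any(value is None for value in projected.values()):
                unmatched.append(raw)
            else:
                matched.append(projected)
        if not matched:
            # The sample row itself holds a null, so no pass could ever accept it.
            raise ValueError(f"cannot make progress on: {remaining[0]}")
        groups.append(matched)
        remaining = unmatched
    return groups


payloads = [
    '{"id": 1, "name": "Alice"}',
    '{"id": 2, "tags": ["spark", "python"], "active": true}',
    '{"id": 3, "details": {"age": 30, "location": "New York"}}',
    '{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, '
    '{"date": "2023-01-02", "action": "logout"}]}',
]
for group in split_by_first_schema(payloads):
    print(group)
```

With the four sample payloads this yields four groups, one per distinct key set. A row that carries extra keys is still accepted and its extra keys are dropped, which mirrors how `from_json` ignores fields that are not in the supplied schema. The guard against an empty `matched` list is an addition of this sketch: a sample row that itself contains a `null` would otherwise never leave the loop.
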

Here's a sample script with custom data.

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, from_json, schema_of_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder \
    .appName("JsonPayloadDataFrame") \
    .getOrCreate()

schema = StructType([
    StructField("json_payload", StringType(), True)
])

data = [
    Row(json_payload='{"id": 1, "name": "Alice"}'),
    Row(json_payload='{"id": 2, "tags": ["spark", "python"], "active": true}'),
    Row(json_payload='{"id": 3, "details": {"age": 30, "location": "New York"}}'),
    Row(json_payload='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}')
]

df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show(n=30, truncate=False)

wanted_df_list = []
df_looped = df

# Each pass peels off the rows that match the schema of the first remaining row.
while not df_looped.isEmpty():
    print("USING SCHEMA FROM ONE ROW")
    # Use the first remaining payload as the schema sample.
    current_string_repr = df_looped.first()["json_payload"]
    print(f"{current_string_repr=}")

    # Parse every remaining row with the schema inferred from that sample.
    df_parsed = df_looped.withColumn("parsed_struct", from_json(col("json_payload"), schema_of_json(current_string_repr)))
    df_parsed.show(n=30, truncate=False)
    df_parsed.printSchema()

    # Cast the parsed struct to a string so that missing fields show up as "null".
    df_null_check = df_parsed.withColumn("null_present_str", col("parsed_struct").cast(StringType()))
    df_null_check.show(n=30, truncate=False)
    df_null_check.printSchema()

    df_null_present = df_null_check.withColumn("null_present_bool", col("null_present_str").contains("null"))
    df_null_present.show(n=30, truncate=False)
    df_null_present.printSchema()

    # Split into rows that matched the schema and rows that did not.
    df_partial_correct = df_null_present.filter(~col("null_present_bool"))
    df_partial_incorrect = df_null_present.filter(col("null_present_bool"))

    df_partial_correct.cache().show(n=30, truncate=False)
    df_partial_correct.printSchema()
    df_partial_incorrect.cache().show(n=30, truncate=False)
    df_partial_incorrect.printSchema()

    wanted_df_list.append(df_partial_correct)
    # Repeat on the rows that did not match.
    df_looped = df_partial_incorrect

for df_ele in wanted_df_list:
    df_ele.show(n=30, truncate=False)
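
One caveat of the `.contains("null")` check in the script is that it matches any occurrence of the substring, not only missing fields. The plain-Python sketch below imitates the `{a, b}` string rendering visible in the `null_present_str` column. `render_struct` is a hypothetical stand-in for the Spark cast, handles only flat rows, and is meant to illustrate the failure mode rather than reproduce Spark's formatting exactly.

```python
def render_struct(values):
    """Imitate the "{a, b}" rendering of a flat struct; None is shown as "null"."""
    return "{" + ", ".join("null" if v is None else str(v) for v in values) + "}"


def flagged_by_substring(values):
    # Same idea as col("null_present_str").contains("null") in the script.
    return "null" in render_struct(values)


def has_missing_field(values):
    # Field-level check: only a genuinely absent value counts.
    return any(v is None for v in values)


rows = {
    "parsed cleanly": [1, "Alice"],
    "field missing": [2, None],
    "value contains 'null'": [5, "nullable"],
}
for label, values in rows.items():
    print(label, render_struct(values), flagged_by_substring(values), has_missing_field(values))
```

The third row is flagged by the substring test even though no field is missing. In Spark, a field-level test (for example checking each field of `parsed_struct` with `isNull()`) would likely avoid that false positive, at the cost of walking the inferred schema.
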

Final Output:

+--------------------------+-------------+----------------+-----------------+
|json_payload              |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice}   |{1, Alice}      |false            |
+--------------------------+-------------+----------------+-----------------+

+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload                                          |parsed_struct             |null_present_str          |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false            |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+

+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload                                             |parsed_struct      |null_present_str   |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false            |
+---------------------------------------------------------+-------------------+-------------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload                                                                                                 |parsed_struct                                   |null_present_str                                |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false            |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+

Full Output:

USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 1, "name": "Alice"}'
+-------------------------------------------------------------------------------------------------------------+-------------+ | |
|json_payload |parsed_struct| | |
+-------------------------------------------------------------------------------------------------------------+-------------+ | |
|{"id": 1, "name": "Alice"} |{1, Alice} | | |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} | | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} | | |
+-------------------------------------------------------------------------------------------------------------+-------------+ | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+ | |
|json_payload |parsed_struct|null_present_str| | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+ | |
|{"id": 1, "name": "Alice"} |{1, Alice} |{1, Alice} | | |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} | | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} | | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+ | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
|json_payload |parsed_struct|null_present_str|null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
|{"id": 1, "name": "Alice"} |{1, Alice} |{1, Alice} |false | | |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |true | | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |true | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |true | | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
+--------------------------+-------------+----------------+-----------------+ | |
|json_payload |parsed_struct|null_present_str|null_present_bool| | |
+--------------------------+-------------+----------------+-----------------+ | |
|{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false | | |
+--------------------------+-------------+----------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
|json_payload |parsed_struct|null_present_str|null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |true | | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |true | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |true | | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 2, "tags": ["spark", "python"], "active": true}'
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str|null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+ | |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{2, null} |true | | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{3, null} |true | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{4, null} |true | | |
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{true, 2, [spark, python]}|true | | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{null, 3, null} |true | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{null, 4, null} |true | | |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{true, 2, [spark, python]}|false | | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{null, 3, null} |true | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{null, 4, null} |true | | |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false | | |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str|null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+ | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL}|{null, 3, null} |true | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL}|{null, 4, null} |true | | |
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+ | |
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 3, "details": {"age": 30, "location": "New York"}}'
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str|null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+ | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{null, 3, null} |true | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4, null} |true | | |
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{{30, New York}, 3}|true | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true | | |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{{30, New York}, 3}|false | | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true | | |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+ | |
+---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false | | |
+---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
|json_payload |parsed_struct|null_present_str|null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true | | |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+ | |
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}'
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str|null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+ | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{null, 4} |true | | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|true | | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false | | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false | | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
+------------+-------------+----------------+-----------------+ | |
|json_payload|parsed_struct|null_present_str|null_present_bool| | |
+------------+-------------+----------------+-----------------+ | |
+------------+-------------+----------------+-----------------+ | |
+--------------------------+-------------+----------------+-----------------+ | |
|json_payload |parsed_struct|null_present_str|null_present_bool| | |
+--------------------------+-------------+----------------+-----------------+ | |
|{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false | | |
+--------------------------+-------------+----------------+-----------------+ | |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false | | |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+ | |
+---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false | | |
+---------------------------------------------------------+-------------------+-------------------+-----------------+ | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
|json_payload |parsed_struct |null_present_str |null_present_bool| | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false | | |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+ | |