on 05-18-2023 10:31 AM
Hello Team,
For the below nested JSON, the array is not getting normalized using:
import pandas as pd
import json

def on_input(data):
    df = pd.json_normalize(json.loads(data))
    api.send("output", df.to_json(orient="records"))

api.set_port_callback("input1", on_input)
I also tried `max_level` and then `record_path`, but no luck.
Flow: Kafka producer (JSON string) -> Avro decoder -> Python3 -> HANA Client
Is there any way to normalize the JSON with an IF/THEN/ELSE condition, add a suffix to the column names, and eventually update the DB with no duplicates? Here `header.poNumber` and `data.id` are the primary keys/unique identifiers.
Example:
[
  {
    "header.poNumber": "9023496",
    "data.id": "10013459",
    "message.source": [
      {
        "createSource": null,
        "timeStamp": "2023-05-12T19:30:00.0000000+02:00",
        "type": "full"
      },
      {
        "createSource": "testdev",
        "timeStamp": "2023-05-11T19:30:00.0000000+02:00",
        "type": "ordersEstimated"
      },
      {
        "createSource": "event",
        "timeStamp": "2023-05-12T12:30:00.0000000+01:00",
        "type": "ordersCreated"
      }
    ],
    "message.time": [
      {
        "timeSource": "UTC",
        "typeId": "full"
      },
      {
        "timeSource": "IST",
        "typeId": "actual"
      }
    ]
  }
]
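For what it's worth, here is a minimal standalone sketch (sample data inlined from the payload above) of why plain `pd.json_normalize` leaves the two arrays untouched: it flattens nested dicts, but lists survive as single cells.

```python
import pandas as pd

# trimmed copy of the payload above
payload = [{
    "header.poNumber": "9023496",
    "data.id": "10013459",
    "message.source": [
        {"createSource": None, "timeStamp": "2023-05-12T19:30:00.0000000+02:00", "type": "full"},
        {"createSource": "testdev", "timeStamp": "2023-05-11T19:30:00.0000000+02:00", "type": "ordersEstimated"},
        {"createSource": "event", "timeStamp": "2023-05-12T12:30:00.0000000+01:00", "type": "ordersCreated"},
    ],
    "message.time": [
        {"timeSource": "UTC", "typeId": "full"},
        {"timeSource": "IST", "typeId": "actual"},
    ],
}]

df = pd.json_normalize(payload)

# Only dicts are flattened; each array stays as one list-valued cell,
# which is why the nested JSON "is not getting normalized".
print(df.columns.tolist())
```

This is also why `max_level` alone does not help here: it controls how deep dict flattening goes, not whether lists are exploded.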
Expected output:
If you need to keep 'actual' only, then one approach might be to filter the JSON payload to keep only the `actual` parts.
Then it is simple enough to flatten it in Pandas DataFrame.
Something like...
data_as_json = json.loads(data)
data_as_json_filtered = list()
for record in data_as_json:
    record_filtered = dict()
    for key in list(record.keys()):
        if key == 'message.time':
            record_filtered.update({key: item for item in record[key]
                                    if item['typeId'] == 'actual'})
        elif key == 'message.source':
            record_filtered.update({key: item for item in record[key]
                                    if item['type'] == 'actual'})
        else:
            record_filtered.update({key: record[key]})
    data_as_json_filtered.append(record_filtered)
display(data_as_json_filtered)
df_source = pd.json_normalize(data_as_json_filtered)
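Run end-to-end, the filtering looks something like the self-contained sketch below (I've inlined a trimmed sample instead of reading `data` from the port). The key point is that the surviving `message.time` entry becomes a dict rather than a list, so `json_normalize` can flatten it into `message.time.*` columns:

```python
import pandas as pd

payload = [{
    "header.poNumber": "9023496",
    "data.id": "10013459",
    "message.time": [
        {"timeSource": "UTC", "typeId": "full"},
        {"timeSource": "IST", "typeId": "actual"},
    ],
}]

filtered = []
for record in payload:
    record_filtered = {}
    for key, value in record.items():
        if key == "message.time":
            # keep only the entry whose typeId is 'actual' (as a dict, not a list)
            record_filtered.update({key: item for item in value
                                    if item["typeId"] == "actual"})
        else:
            record_filtered[key] = value
    filtered.append(record_filtered)

# the surviving dict is flattened into message.time.* columns
df = pd.json_normalize(filtered)
print(df.columns.tolist())
```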
...gives me
PS. Please mark the answer as Accepted or Helpful to help the rest of the community. Thank you.
Thanks a lot for your kind reply, vitaliy.rudnytskiy. This really helps, and thanks for the valuable inputs :)
As mentioned above, I actually wanted to update it like below.
For message.source: IF type = 'ordersEstimated' THEN update message.source.timeStamp1 and message.source.type1;
IF type = 'ordersCreated' THEN update message.source.timeStamp2 and message.source.type2. This is needed because duplicate column names are not allowed in the HANA DB.
vitaliy.rudnytskiy, is this possible? 😕
For message.time: IF typeId = 'actual' THEN update message.time.timeStamp and message.time.typeId.
I do not think you can simply normalize this record because it contains two arrays of dictionaries: `"message.time"` and `"message.source"`.
From the "expected output", I understand that you want to join values from `"message.time"` to values from `"message.source"` on the `type` attribute to create records.
So, I think you need to flatten two arrays into separate Pandas DataFrames, and then merge them on keys. Something like:
data_as_json = json.loads(data)
df_source = pd.json_normalize(data_as_json, record_path='message.source', meta=['header.poNumber', 'data.id'])
df_time=pd.json_normalize(data_as_json, record_path='message.time', meta=['header.poNumber', 'data.id'])
df=df_source.merge(df_time, left_on=['header.poNumber','data.id','type'], right_on=['header.poNumber','data.id','typeId'])
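As a self-contained illustration of that approach (sample data inlined; in the operator `data` would come from the inport), the inner merge keeps only rows where `type` equals `typeId`, which on the sample payload is just the `full` pair:

```python
import pandas as pd

payload = [{
    "header.poNumber": "9023496",
    "data.id": "10013459",
    "message.source": [
        {"createSource": None, "timeStamp": "2023-05-12T19:30:00.0000000+02:00", "type": "full"},
        {"createSource": "testdev", "timeStamp": "2023-05-11T19:30:00.0000000+02:00", "type": "ordersEstimated"},
        {"createSource": "event", "timeStamp": "2023-05-12T12:30:00.0000000+01:00", "type": "ordersCreated"},
    ],
    "message.time": [
        {"timeSource": "UTC", "typeId": "full"},
        {"timeSource": "IST", "typeId": "actual"},
    ],
}]

# one row per array element, with the key fields carried along as meta columns
df_source = pd.json_normalize(payload, record_path="message.source",
                              meta=["header.poNumber", "data.id"])
df_time = pd.json_normalize(payload, record_path="message.time",
                            meta=["header.poNumber", "data.id"])

# inner join: only rows where type == typeId survive
df = df_source.merge(df_time,
                     left_on=["header.poNumber", "data.id", "type"],
                     right_on=["header.poNumber", "data.id", "typeId"])
print(df)
```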
Here are my tests:
Regards,
-Vitaliy
Thanks a lot for your kind reply, vitaliy.rudnytskiy. The output I'm expecting is only one row, as mentioned above.
While normalizing, consider message.source: IF type = 'ordersEstimated' THEN update message.source.timeStamp1 and message.source.type1;
IF type = 'ordersCreated' THEN update message.source.timeStamp2 and message.source.type2.
Similarly for message.time: IF typeId = 'actual' THEN update message.time.timeStamp and message.time.typeId.
`header.poNumber` and `data.id` are primary keys, so there are no duplicates when updating the HANA database. Above is one event (1 record).
The flow looks like: Kafka producer -> Avro decoder -> Python3 -> HANA Client
vitaliy.rudnytskiy
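One hedged sketch of the renaming described above, built per record rather than via `json_normalize`. The suffix mapping (`ordersEstimated` -> 1, `ordersCreated` -> 2) is my reading of the requirement, not a confirmed design, and since `message.time` entries in the sample have no `timeStamp` field, the sketch carries `timeSource` instead:

```python
import pandas as pd

payload = [{
    "header.poNumber": "9023496",
    "data.id": "10013459",
    "message.source": [
        {"createSource": None, "timeStamp": "2023-05-12T19:30:00.0000000+02:00", "type": "full"},
        {"createSource": "testdev", "timeStamp": "2023-05-11T19:30:00.0000000+02:00", "type": "ordersEstimated"},
        {"createSource": "event", "timeStamp": "2023-05-12T12:30:00.0000000+01:00", "type": "ordersCreated"},
    ],
    "message.time": [
        {"timeSource": "UTC", "typeId": "full"},
        {"timeSource": "IST", "typeId": "actual"},
    ],
}]

# hypothetical mapping from source "type" to a column-name suffix
SUFFIX = {"ordersEstimated": "1", "ordersCreated": "2"}

rows = []
for record in payload:
    row = {"header.poNumber": record["header.poNumber"],
           "data.id": record["data.id"]}
    for src in record["message.source"]:
        sfx = SUFFIX.get(src["type"])
        if sfx is not None:
            # suffix the column names so they stay unique in the target table
            row[f"message.source.timeStamp{sfx}"] = src["timeStamp"]
            row[f"message.source.type{sfx}"] = src["type"]
    for t in record["message.time"]:
        if t["typeId"] == "actual":
            row["message.time.timeSource"] = t["timeSource"]
            row["message.time.typeId"] = t["typeId"]
    rows.append(row)

# one flat row per (header.poNumber, data.id) event
df = pd.DataFrame(rows)
print(df)
```

Because each event collapses to one row keyed by `header.poNumber` and `data.id`, an upsert on those columns in HANA would avoid duplicates.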