
JSON normalize fails for nested JSON array

rajeshps
Participant
0 Kudos

Hello Team,

For the nested JSON below, the array is not getting normalized using:

import pandas as pd
import json

def on_input(data):
    # Both attempts; neither normalizes the nested arrays:
    df = pd.read_json(data)
    df = pd.json_normalize(json.loads(data))

    api.send("output", df.to_json(orient="records"))

api.set_port_callback("input1", on_input)

I also tried `max_level` and then `record_path`, but no luck.

Flow: Kafka producer(json string) -> avro decoder -> Python3 -> Hana Client

Is there any way to normalize the JSON, apply an IF/THEN/ELSE condition, add a suffix to the column fields, and eventually update the DB with no duplicates? Here header.poNumber and data.id are primary keys/unique identifiers.

Example:

 [
   {
      "header.poNumber":"9023496",
      "data.id":"10013459",
      "message.source":[
         {
            "createSource":null,
            "timeStamp":"2023-05-12T19:30:00.0000000+02:00",
            "type":"full"
         },
         {
            "createSource":"testdev",
            "timeStamp":"2023-05-11T19:30:00.0000000+02:00",
            "type":"ordersEstimated"
         },
         {
            "createSource": "event",
            "timeStamp":"2023-05-12T12:30:00.0000000+01:00",
            "type":"ordersCreated"
         }
      ],
      "message.time":[
         {
            "timeSource":"UTC",
            "typeId":"full"
         },
         {
            "timeSource":"IST",
            "typeId":"actual"
         }
      ]
   }
]

Expected output:

rajeshps
Participant
0 Kudos

Please check if this looks good, vitaliy.rudnytskiy

import pandas as pd
import json

def on_input(data):
    data_as_json = json.loads(data)

    data_as_json_filtered = list()
    for record in data_as_json:
        record_filtered = dict()
        for key in list(record.keys()):
            if key == 'message.time':
                record_filtered.update({key: item for item in [time for time in record[key]]
                                        if item['typeId'] == 'actual'})
            elif key == 'message.source':
                record_filtered.update({key: item for item in [source for source in record[key]]
                                        if item['type'] == 'actual'})
            else:
                record_filtered.update({key: record[key]})
        data_as_json_filtered.append(record_filtered)
    display(data_as_json_filtered)

    df_source = pd.json_normalize(data_as_json_filtered)

    api.send("output", df_source.to_json(orient="records"))

api.set_port_callback("input1", on_input)

Thank you very much!

rajeshps
Participant
0 Kudos
import pandas as pd
import json

def on_input(data):
    data_as_json = json.loads(data)
    data_as_json_filtered=list()
    for record in data_as_json:
        record_filtered = dict()
        for key in list(record.keys()):
            if key=='message.time':
                record_filtered.update({key: item for item in [time for time in record[key]] 
                        if item['typeId']=='actual'})
            elif key=='message.source':
                record_filtered.update({key: item for item in [source for source in record[key]] 
                          if item['type']=='actual'})
            else:
                record_filtered.update({key: record[key]})
        data_as_json_filtered.append(record_filtered)
    df_source=pd.json_normalize(data_as_json_filtered)
    api.send("output", df_source.to_json(orient="records"))
api.set_port_callback("input1", on_input)

vitaliy.rudnytskiy

It is failing at `for key in list(record.keys())` with the below error:


Group messages:

Group: default; Messages: Graph failure: operator.com.sap.system.python3Operator:python3operator1: Error while executing callback registered on port(s) ['input1']: 'str' object has no attribute 'keys' [line 10]

Process(es) terminated with error(s). restartOnFailure==false


The input is a JSON string and the output is a basic string in the Python3 operator.

I'd appreciate your valuable inputs and support. Thank you, vitaliy.rudnytskiy
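For reference, this error usually means the loop variable is a string rather than a dict. With a payload shaped like the one later in this thread, the two most common causes are iterating a top-level dict (which yields its keys as strings) and a doubly-encoded payload that is still a string after one `json.loads`. A minimal sketch (the payload value here is a hypothetical fragment, not the real operator input):

```python
import json

# Hypothetical fragment standing in for the real operator input.
payload = '{"header": {"poNumber": "9023496"}}'

# Cause 1: iterating a dict yields its KEYS, which are strings.
data_as_json = json.loads(payload)      # a dict, not a list of records
for record in data_as_json:             # record == "header"
    assert isinstance(record, str)      # record.keys() would raise AttributeError

# Cause 2: a doubly-encoded payload is still a string after one json.loads.
double_encoded = json.dumps(payload)
still_a_string = json.loads(double_encoded)
assert isinstance(still_a_string, str)  # needs a second json.loads

# Defensive pattern: decode again if needed, wrap a lone dict in a list.
if isinstance(data_as_json, str):
    data_as_json = json.loads(data_as_json)
records = data_as_json if isinstance(data_as_json, list) else [data_as_json]
for record in records:
    record.keys()                       # now always a dict
```

The defensive decode-and-wrap at the end makes the same record loop work whether the port delivers a list, a single dict, or a doubly-encoded string.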


Vitaliy-R
Developer Advocate

Hi Rajesh,

Do you have an exact value of the `data` that is sent to the input of the `on_input(data)`?

It might be different from the one you shared in the example.

Regards,
-Witalij

rajeshps
Participant
0 Kudos

vitaliy.rudnytskiy

Below is the data sent to the Python3 operator input1 as a JSON string. This needs filtering and then updating to a database table with no duplicate columns.

poNumber and id are primary keys/unique identifiers for each row. Thank you!

{
   "header":{
      "poNumber":"9023496"
   },
   "data":{
      "id":"10013459"
   },
   "message":{
      "source":[
         {
            "createSource":null,
            "timeStamp":"2023-05-12T19:30:00.0000000+02:00",
            "type":"full"
         },
         {
            "createSource":"testdev",
            "timeStamp":"2023-05-11T19:30:00.0000000+02:00",
            "type":"ordersEstimated"
         },
         {
            "createSource":"event",
            "timeStamp":"2023-05-12T12:30:00.0000000+01:00",
            "type":"ordersCreated"
         }
      ],
      "time":[
         {
            "timeSource":"UTC",
            "typeId":"full"
         },
         {
            "timeSource":"IST",
            "typeId":"actual"
         }
      ]
   }
}
Vitaliy-R
Developer Advocate

These two JSON documents have different structures:

1/ The first one was an array starting with `[`, while the second one is not, as it is a dictionary starting with `{`.

2/ The first one had `"message"` keys already flattened, while the second has them nested.

So, following the same idea from my previous code snippet, you need to rewrite it to properly process the structure you are getting on the input.
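As a sketch of that idea (not the exact code from this thread): `pd.json_normalize` flattens nested dicts into dotted column names on its own, and wrapping a single dict in a list lets the same record loop handle both payload shapes:

```python
import json
import pandas as pd

# Trimmed sample in the second (nested dict) shape.
payload = '''{
  "header": {"poNumber": "9023496"},
  "data": {"id": "10013459"}
}'''

data_as_json = json.loads(payload)

# Wrap a lone dict so the same loop handles both payload shapes.
records = data_as_json if isinstance(data_as_json, list) else [data_as_json]

# json_normalize flattens nested dicts into dotted column names by itself.
df = pd.json_normalize(records)
print(df.columns.tolist())
```

This covers the dict-vs-array difference; the nested `message` arrays still need the explicit filtering shown below.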

Regards,
-Witalij

rajeshps
Participant
0 Kudos

vitaliy.rudnytskiy

Could you please help me with the snippet? I tried it, and it fails at `for key in list(record.keys())` with the below error:

Group: default; Messages: Graph failure: operator.com.sap.system.python3Operator:python3operator1: Error while executing callback registered on port(s) ['input1']: 'str' object has no attribute 'keys' [line 10]

Thank you very much in advance!

Vitaliy-R
Developer Advocate
0 Kudos

As I wrote above: you do not have an array of records in the latter JSON example, compared to the JSON in the original question. Therefore you do not loop through records.

Here is a sample snippet (but you need to review it and, first of all, verify the exact schema of the JSON payload coming to your operator, to make sure it is not failing):

record_filtered = dict()
record = data_as_json
for key1 in list(record.keys()):
    if key1 == 'message':
        msg_filtered = dict()
        for key2 in record[key1].keys():
            if key2 == 'time':
                msg_filtered.update({key2: item
                                     for item in [time for time in record[key1][key2]]
                                     if item['typeId'] == 'actual'})
            elif key2 == 'source':
                msg_filtered.update({key2: item
                                     for item in [source for source in record[key1][key2]]
                                     if item['type'] == 'actual'})
            else:
                pass
        record_filtered.update({key1: msg_filtered})
    else:
        record_filtered.update({key1: record[key1]})
data_as_json_filtered = record_filtered
display(data_as_json_filtered)
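A self-contained run of this sketch against the second (nested dict) payload shape, trimmed to the fields above. Note that no `source` entry in the sample has type 'actual', so `message.source` is filtered away entirely, and `json_normalize` then flattens the surviving nested dicts into dotted columns:

```python
import json
import pandas as pd

data = '''{
  "header": {"poNumber": "9023496"},
  "data": {"id": "10013459"},
  "message": {
    "source": [
      {"createSource": "testdev", "timeStamp": "2023-05-11T19:30:00.0000000+02:00", "type": "ordersEstimated"},
      {"createSource": "event", "timeStamp": "2023-05-12T12:30:00.0000000+01:00", "type": "ordersCreated"}
    ],
    "time": [
      {"timeSource": "UTC", "typeId": "full"},
      {"timeSource": "IST", "typeId": "actual"}
    ]
  }
}'''

record = json.loads(data)
record_filtered = dict()
for key1 in record:
    if key1 == 'message':
        msg_filtered = dict()
        for key2 in record[key1]:
            if key2 == 'time':
                # Keep only the 'actual' entry; an empty match drops the key.
                msg_filtered.update({key2: item for item in record[key1][key2]
                                     if item['typeId'] == 'actual'})
            elif key2 == 'source':
                msg_filtered.update({key2: item for item in record[key1][key2]
                                     if item['type'] == 'actual'})
        record_filtered.update({key1: msg_filtered})
    else:
        record_filtered.update({key1: record[key1]})

df = pd.json_normalize(record_filtered)
print(df.columns.tolist())
```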

Regards,
-Witalij

Accepted Solutions (1)


Vitaliy-R
Developer Advocate
0 Kudos

If you need to keep 'actual' only, then one of the approaches might be filtering the JSON payload to keep only `actual` parts.

Then it is simple enough to flatten it in Pandas DataFrame.

Something like...

data_as_json = json.loads(data)
data_as_json_filtered = list()
for record in data_as_json:
    record_filtered = dict()
    for key in list(record.keys()):
        if key == 'message.time':
            record_filtered.update({key: item
                                    for item in [time for time in record[key]]
                                    if item['typeId'] == 'actual'})
        elif key == 'message.source':
            record_filtered.update({key: item
                                    for item in [source for source in record[key]]
                                    if item['type'] == 'actual'})
        else:
            record_filtered.update({key: record[key]})
    data_as_json_filtered.append(record_filtered)
display(data_as_json_filtered)
df_source = pd.json_normalize(data_as_json_filtered)

...gives me
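A self-contained run of that snippet against the sample array from the original question (with `display` replaced by `print`). Since no `message.source` entry in the sample has type 'actual', that key is dropped entirely, and only the 'actual' `message.time` entry survives:

```python
import json
import pandas as pd

data = '''[
  {
    "header.poNumber": "9023496",
    "data.id": "10013459",
    "message.source": [
      {"createSource": null, "timeStamp": "2023-05-12T19:30:00.0000000+02:00", "type": "full"},
      {"createSource": "testdev", "timeStamp": "2023-05-11T19:30:00.0000000+02:00", "type": "ordersEstimated"},
      {"createSource": "event", "timeStamp": "2023-05-12T12:30:00.0000000+01:00", "type": "ordersCreated"}
    ],
    "message.time": [
      {"timeSource": "UTC", "typeId": "full"},
      {"timeSource": "IST", "typeId": "actual"}
    ]
  }
]'''

data_as_json = json.loads(data)
data_as_json_filtered = list()
for record in data_as_json:
    record_filtered = dict()
    for key in list(record.keys()):
        if key == 'message.time':
            # Keep only the 'actual' entry; an empty match drops the key.
            record_filtered.update({key: item for item in record[key]
                                    if item['typeId'] == 'actual'})
        elif key == 'message.source':
            record_filtered.update({key: item for item in record[key]
                                    if item['type'] == 'actual'})
        else:
            record_filtered.update({key: record[key]})
    data_as_json_filtered.append(record_filtered)

df_source = pd.json_normalize(data_as_json_filtered)
print(df_source.columns.tolist())
```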

PS. Please mark the answer as Accepted or Helpful to help the rest of the community. Thank you.

rajeshps
Participant
0 Kudos

Thanks a lot for your kind reply, vitaliy.rudnytskiy. This really helps, and thanks for the valuable inputs : )

As mentioned above, I actually wanted to update like below:

For message.source: IF type = 'ordersEstimated' THEN update message.source.timeStamp1 and message.source.type1;

IF type = 'ordersCreated' THEN update message.source.timeStamp2 and message.source.type2. This is needed because duplicate column names are not allowed in the HANA DB.

vitaliy.rudnytskiy

Is this possible 😕

For message.time IF typeId = 'actual' THEN update message.time.timeStamp and message.time.typeId
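One possible sketch for producing suffixed columns per source type (not from this thread; the suffix mapping ordersEstimated → 1, ordersCreated → 2 is an assumption taken from the description above):

```python
import pandas as pd

# Trimmed sample record in the flattened-key shape from the question.
record = {
    "header.poNumber": "9023496",
    "data.id": "10013459",
    "message.source": [
        {"createSource": "testdev", "timeStamp": "2023-05-11T19:30:00.0000000+02:00", "type": "ordersEstimated"},
        {"createSource": "event", "timeStamp": "2023-05-12T12:30:00.0000000+01:00", "type": "ordersCreated"},
    ],
}

# Assumed suffix per source type, taken from the requirement above.
suffix_by_type = {"ordersEstimated": "1", "ordersCreated": "2"}

flat = {"header.poNumber": record["header.poNumber"], "data.id": record["data.id"]}
for src in record["message.source"]:
    suffix = suffix_by_type.get(src["type"])
    if suffix is None:
        continue  # skip unmapped types (e.g. 'full')
    flat[f"message.source.timeStamp{suffix}"] = src["timeStamp"]
    flat[f"message.source.type{suffix}"] = src["type"]

df = pd.DataFrame([flat])  # one row, unique column names per type
print(df.columns.tolist())
```

Because every source type maps to its own suffixed column pair, the result is a single row with unique column names, which avoids the duplicate-column problem when writing to HANA.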

Vitaliy-R
Developer Advocate
0 Kudos

Can you please accept the answer, if it helped, and mark your question as answered? Thank you. -Witalij

Answers (1)


Vitaliy-R
Developer Advocate
0 Kudos

I do not think you can simply normalize this record because it contains two arrays of dictionaries: `"message.time"` and `"message.source"`.

From the "expected output", I understand that you want to join values from `"message.time"` to values from `"message.source"` on the `type` attribute to create records.

So, I think you need to flatten two arrays into separate Pandas DataFrames, and then merge them on keys. Something like:

data_as_json = json.loads(data)
df_source = pd.json_normalize(data_as_json, record_path='message.source', meta=['header.poNumber', 'data.id'])
df_time = pd.json_normalize(data_as_json, record_path='message.time', meta=['header.poNumber', 'data.id'])
df = df_source.merge(df_time, left_on=['header.poNumber', 'data.id', 'type'], right_on=['header.poNumber', 'data.id', 'typeId'])
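A self-contained run of this approach with the sample array from the original question. Note that in this sample only type = 'full' appears in both arrays, so the inner join keeps a single row:

```python
import json
import pandas as pd

data = '''[
  {
    "header.poNumber": "9023496",
    "data.id": "10013459",
    "message.source": [
      {"createSource": null, "timeStamp": "2023-05-12T19:30:00.0000000+02:00", "type": "full"},
      {"createSource": "testdev", "timeStamp": "2023-05-11T19:30:00.0000000+02:00", "type": "ordersEstimated"},
      {"createSource": "event", "timeStamp": "2023-05-12T12:30:00.0000000+01:00", "type": "ordersCreated"}
    ],
    "message.time": [
      {"timeSource": "UTC", "typeId": "full"},
      {"timeSource": "IST", "typeId": "actual"}
    ]
  }
]'''

data_as_json = json.loads(data)

# Flatten each array into its own DataFrame, carrying the key columns along.
df_source = pd.json_normalize(data_as_json, record_path='message.source',
                              meta=['header.poNumber', 'data.id'])
df_time = pd.json_normalize(data_as_json, record_path='message.time',
                            meta=['header.poNumber', 'data.id'])

# Inner join on the keys plus type == typeId.
df = df_source.merge(df_time,
                     left_on=['header.poNumber', 'data.id', 'type'],
                     right_on=['header.poNumber', 'data.id', 'typeId'])

print(len(df))  # only 'full' exists in both arrays
```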

Here are my tests:

Regards,
-Vitaliy

rajeshps
Participant
0 Kudos

Thanks a lot for your kind reply, vitaliy.rudnytskiy. The output I'm expecting is only one row, as mentioned above.

While normalizing, consider message.source: IF type = 'ordersEstimated' THEN update message.source.timeStamp1 and message.source.type1;

IF type = 'ordersCreated' THEN update message.source.timeStamp2 and message.source.type2.

Similarly for message.time: IF typeId = 'actual' THEN update message.time.timeStamp and message.time.typeId.

header.poNumber and data.id are primary keys, so there are no duplicates when updating to the HANA database. Above is one event (1 record).
Flow looks like Kafka producer -> Avro decoder -> Python3 -> HANA client

vitaliy.rudnytskiy