Supply Chain Management Blogs by SAP
Expand your SAP SCM knowledge and stay informed about supply chain management technology and solutions with blog posts by SAP. Follow and stay connected.
cancel
Showing results for 
Search instead for 
Did you mean: 
Saif
Product and Topic Expert
Product and Topic Expert

Consuming Kafka messages – Asset Intelligence Network


Asset Intelligence Network (AIN) is part of Intelligent Asset Management portfolio alongside Asset Strategy and Performance Management (ASPM) and Predictive Asset Insights (PAI) with a common core component, Asset Central Foundation (ACF).


Intelligent Asset Management Portfolio


Asset Intelligence Network triggers Kafka messages for various objects and for a growing list of scenarios. These messages open an avenue for customers to be notified of changes happening to the objects in AIN so that they can use it in various integration scenarios.

 

Apache Kafka

Apache Kafka is an open-source distributed event streaming platform used for data pipelines, streaming analytics, data integration and various other use cases. More details can be found here.

AIN and Apache Kafka

AIN utilises Apache Kafka as a service on Business Technology Platform (BTP) and provides a mechanism through which a Kafka cluster and relevant topics can be shared with its customers.

Customers can develop their own Kafka consumers to consume messages from the topics that are shared. In order to utilise the feature, customers need to place a request for the cluster / topics to be shared as described here.


High Level Architecture


 

How to Consume?


Once advertised, the details for the cluster and topics can be checked as follows. Use the REST API to fetch the details:

API: https://kafka-marketplace.cf.<landscape>.hana.ondemand.com/v1/marketplace?org=<cloud foundry org name>

Method: GET

Response:


Advertisement API response


Note: Pass the bearer token to the API. To generate, use cf oauth-token command.

  • Add/Assign necessary entitlements to your Subaccount with "reference" plan.

  • In your cloud foundry space create a service instance for the advertised cluster using the information from the above API response. Use Cloud foundry CLI.


cf create-service kafka reference <instance name> -c 
'{"advertisement":"<advertisement id from the API response>"}'

How to write a consumer?


JAVA:

The following example is written as a reactive vertx application to consume Kafka messages. However, there are many ways in which it can be written.
public class MainVerticle extends AbstractVerticle {

@Override
public void start(Promise<Void> startPromise) throws Exception {

String consumerGroup = "DemoGroup";
String topicName = "<subaccountid>.com.sap.dsc.ac.equipment";
//refer latest documentation for the updated topic names format or refer the response of the marketplace API
Properties props = new Properties();
props.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");
props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");

// use consumer for interacting with Apache Kafka
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx,
KafkaRulesConsumerFactory.kafkaConsumerCreate(consumerGroup, props));
consumer.handler(record -> {
String rec = record.value().toString();
byte[] decoded = Base64.getMimeDecoder().decode(rec.getBytes());
System.out.println("Processing key=" + record.key() + ",value=" + rec +
",partition=" + record.partition() + ",offset=" + record.offset());
try {
String result = KafkaUtils.readAvroMessageWithSchema(decoded);
System.out.println(result);
}
catch(Exception e)
{
System.out.println(e);
}

});

// or just subscribe to a single topic
consumer
.subscribe(topicName)
.onSuccess(v ->
System.out.println("subscribed")
).onFailure(cause ->
System.out.println("Could not subscribe " + cause.getMessage())
);
}
}

In order to connect to the Kafka cluster, SASL_SSL (PLAIN) protocol is used. The root certificate is downloaded using the APIs listed in the service key of the instance. A keystore is created and is passed in the properties used to connect to the Kafka instance.

How to decode the message?

The messages from ACF use Avro based schema and are base64 encoded.
public static String readAvroMessageWithSchema(byte[] message) throws IOException {
StringBuilder queueMessage = new StringBuilder();
SeekableByteArrayInput input = new SeekableByteArrayInput(message);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader dataFileReader = new DataFileReader(input, reader);
GenericRecord data = (GenericRecord) dataFileReader.next();

return data.toString();
}

 

Some key dependencies used:
"io.vertx:vertx-kafka-client:4.0.3"
"io.projectreactor.kafka:reactor-kafka:1.3.3"
"org.apache.avro:avro:1.10.2"


Consumer in Java


 

NodeJs:
async function kConnect() {
let tokenVal;
tokenVal = await config().token();

const kafka = new Kafka({
clientId: 'node-kconsumer',
brokers: credentials.sslBroker.split(','),
ssl: {
rejectUnauthorized: false,
ca: [fs.readFileSync('./kConsumer/ssl/kafka.crt', 'utf-8')],
},
sasl: {
mechanism: 'plain', // scram-sha-256 or scram-sha-512
username: credentials.username,
password: tokenVal
},
});

const consumer = kafka.consumer({groupId: 'my-group1'})
await consumer.connect()
await consumer.subscribe({topic: '<subaccountid>.com.sap.dsc.ac.equipment',fromBeginning: true})
//Refer documentation or the advertisement API response for the topic name format

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value,
headers: message.headers,
})

console.log("JSON "+message.value.toJSON());
console.log("STRR "+message.value.toString());
avro().avroConvert(message.value.toString());
},
})

}


In this example as well SASL_SSL protocol is used to connect to the Kafka instance. The root certificate can also be generated using the APIs exposed in the service key of the instance.
curl https://kafka-service-broker.cf.<landscape>.
hana.ondemand.com/certs/rootCA/current > kafka.crt

The node example uses the following key dependencies:
const avro = require('avro-js');const {Kafka} = require("kafkajs");


Consumer in Nodejs


Python:
def main():
env = AppEnv()
kafka_env = env.get_service(label='kafka')
token = getToken(kafka_env)
consumer = KafkaConsumer(<subaccountid>.com.sap.dsc.ac.equipment',
bootstrap_servers=kafka_env.credentials[u'cluster'][u'brokers'].split(','),
auto_offset_reset='earliest',
enable_auto_commit=True,
client_id='py-kconsumer',
group_id='Pyconn',
security_protocol='SASL_SSL',
ssl_cafile='./ssl/kafka.crt',
sasl_mechanism='PLAIN',
sasl_plain_username=kafka_env.credentials[u'username'],
sasl_plain_password=token,
api_version=(2, 5, 1)
)
print("subscribed") for message in consumer:
print(message)


The message can be converted using avro python library as:
reader = DataFileReader(open("./message.avro", "rb"), DatumReader())
for data in reader:
print(data)
reader.close()

Key dependencies used:
from kafka import KafkaConsumer
import avro.schema
from avro.datafile import DataFileReader,
from avro.io import DatumReader

In addition, customers can also leverage Kafka Adapter provided by Cloud Platform Integration.

Conclusion


Kafka event streaming is quite powerful in terms of getting real-time or near real-time updates for your data. AIN utilises this for providing a pathway for customers who wish to integrate with myriad set of disparate systems and helps provide consistency and efficient synchronisation of the data across distributed systems. Kafka in conjunction with Apace Avro makes integration scenarios more stable, providing better compatibility support across releases and upgrades.

See also:

Apache Kafka

Apache Avro

Events in AIN

SAP Cloud Platform Integration Kafka Adapter blog
3 Comments
rutika_bodas
Participant
Nice write-up Saif. Well explained!
lochner_louw
Participant
0 Kudos
Pretty cool and will need to try this.

 

Question on the messages from Kafka.

From reading the External Events PDF and asking around I heard that the messages are supposed to be in the CloudEvents spec. I assume the content of the AVRO payload is the same CloudEvents spec?

 

Thanks

Lochner

 
Saif
Product and Topic Expert
Product and Topic Expert
lochner.louw2 Yes, the Avro schema is aligned with the CloudEvents spec.