Optimized Error Management and Retry Strategy with Kafka on SAP Cloud Integration


Target Audience:

If you are using Kafka as the message broker for your inbound integrations with CPI, you are likely aware that Kafka persists messages on its topics for a pre-defined retention period. This blog discusses how you can leverage these persisted messages to retry failed messages in CPI without relying on CPI’s internal persistence mechanisms, such as JMS queues or the Data Store.

Motivation:

1. In Kafka, each message on a Kafka topic is uniquely identified by its message offset and partition.

2. If we store these references in CPI whenever there is a message failure, we can later retry the same failed messages using the stored Kafka Offset and Partition.

 

Solution Approach:

 

A Kafka topic persists messages for the retention period configured on the topic, and each message is uniquely identified by its partition and offset. The solution uses four flows:

Flow 1: Receives the message from the Kafka topic and sends it to the core flow using ProcessDirect.

Flow 2 (Core Flow): Contains the core logic and sends the message to the target. If Flow 2 execution fails, the message reference (offset and partition) is passed to Flow 3.

*Flow 3 and Flow 4 are common flows shared by all your inbound Kafka integrations (Kafka sender); they handle the retry and error processing for these integrations.

Flow 3 (Load Global Variable): Stores the offset and partition of failed messages temporarily in the SAP CI Global variable.

Flow 4 (Retry Flow): Runs on a schedule, reads the SAP CI global variable, and fetches failed messages using a Groovy script (using Kafka message offset and partition). The script reads the message directly from the Kafka topic and sends it to the target by redirecting the messages to Integration Flow 2.

IFlow Configuration Details

Flow 1:
This flow is a passthrough: the message is fetched from the Kafka topic and sent to Flow 2 (Core Flow) through ProcessDirect.

When the flow picks up a message, two headers, kafka.OFFSET and kafka.PARTITION, are sent along with it. Make sure you add these to the Allowed Headers in the iFlow's Runtime Configuration.

Flow 2:
This is the Core Flow; all your message transformations and other conditions have to be applied here.

1. Define Transformation Logic: Here you define your transformation logic. My flow does not include a mapping step. However, if you have one, set the ErrorType to NonDeliveryError before the mapping step and reset it to DeliveryError before the delivery step. This ensures that the exception subprocess receives the correct ErrorType.

2. Fetch Global Variable: If this is the first failure, this step creates the global variable with a default value (the actual value is assigned later by the Load Global Variable flow). For subsequent failures, it fetches the existing global variable.

3. Set Exception Type and Create Failure Message: I’ve added three custom headers: Exception Message, Kafka Offset, and Kafka Partition. These headers help developers locate the failed message payload in the Kafka topic. Developers can then identify the root cause of the issue without logging the payload in SAP CI for analysis (e.g., through the Data Store or MPL logging).

 

The blog below explains how to create custom headers. These headers also help you find the right message during monitoring.

https://community.sap.com/t5/technology-blogs-by-members/dynamically-creating-mpl-custom-headers-without-hardcoding-in-groovy-script/ba-p/13691202
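The following is a minimal sketch of collecting the step 3 values. It is written in Java to match the JAR used later in Flow 4. It assumes that the sender headers kafka.OFFSET and kafka.PARTITION reach Flow 2 unchanged. The class and method names are hypothetical. In a CPI Groovy script, each resulting pair could then be written with messageLog.addCustomHeaderProperty(name, value), using the MessageLog obtained from messageLogFactory.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: collects the custom MPL header values described in step 3.
final class FailureHeaders {

    private FailureHeaders() {}

    // Builds name/value pairs for a failed message from the Kafka sender headers
    // ("kafka.OFFSET", "kafka.PARTITION") and the caught exception.
    static Map<String, String> build(Map<String, Object> headers, Throwable error) {
        Map<String, String> custom = new LinkedHashMap<>();
        custom.put("Exception Message", error == null ? "" : Objects.toString(error.getMessage(), ""));
        // Missing headers become empty strings instead of the text "null".
        custom.put("Kafka Offset", Objects.toString(headers.get("kafka.OFFSET"), ""));
        custom.put("Kafka Partition", Objects.toString(headers.get("kafka.PARTITION"), ""));
        return custom;
    }
}
```

With these three values in the MPL, a monitor search on Kafka Offset and Kafka Partition leads straight to the record in the topic.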

4. Set Details for Message Retry: This Request Reply step calls the Load Global Variable flow through ProcessDirect. That flow stores the reference of the failed message in Kafka, i.e. its offset and partition, in the global variable. It also keeps a retry counter that tracks how many times the failed message has been retried.

Flow 3 (Load Global Variable):

This flow has three routes, which are executed conditionally:
Route 1: There is a Delivery Exception and the Global Variable Does Not Exist:
When a message fails for the first time, the global variable does not exist yet. This route creates it with the partition, the offset, and a retry count initialized to 0.
For example, if the message at offset 51 in partition 1 fails, the global variable is created with the value:

Partition,Offset,RetryCount
1,51,0

Route 2: There is a Delivery Exception and Global Variable Exists:
For subsequent failures, if the message reference (Kafka partition, offset) already exists, its retry count is incremented. If it does not exist, a new line with the reference and a retry count of 0 is added.
For example, if the same message at offset 51 in partition 1 fails again, the global variable is updated to:

Partition,Offset,RetryCount
1,51,1

If a different message, at offset 52 in partition 2, fails for the first time, the global variable is updated to:
Partition,Offset,RetryCount
1,51,1
2,52,0

Route 3: There is a Non-Delivery Exception
For non-delivery exceptions, such as mapping errors or data issues, the references in the variable are not updated; the variable is overwritten with its existing content.
However, consider a message that is being retried because of a delivery exception (the receiver was unavailable). Once the receiver is available again, it may respond with an error indicating that further retries would be ineffective. In that case, the reference is removed from the variable. For example, if the message at offset 52 in partition 2 is retried and the target system responds with an error such as Application_FieldValidationException, retrying it again would be futile, so its reference is removed:

Partition,Offset,RetryCount
1,51,1
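The three routes can be sketched as pure string functions over the global variable's CSV text. This is an illustrative Java sketch, not the author's implementation, and the class and method names are hypothetical. It assumes that a response such as Application_FieldValidationException has already been classified as a non-delivery error before Flow 3 is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of Flow 3's update rules on the global variable text
// ("Partition,Offset,RetryCount" header followed by one line per failed message).
final class RetryRegistry {

    static final String HEADER = "Partition,Offset,RetryCount";

    private RetryRegistry() {}

    // Routes 1 and 2: append a new reference with RetryCount 0, or increment an existing one.
    // Route 1 is simply the case where the variable is still null or empty.
    static String recordDeliveryFailure(String variable, String partition, String offset) {
        List<String> lines = dataLines(variable);
        boolean found = false;
        for (int i = 0; i < lines.size(); i++) {
            String[] cols = lines.get(i).split(",");
            if (cols.length >= 3 && cols[0].trim().equals(partition) && cols[1].trim().equals(offset)) {
                int retries = Integer.parseInt(cols[2].trim()) + 1;
                lines.set(i, partition + "," + offset + "," + retries);
                found = true;
            }
        }
        if (!found) {
            lines.add(partition + "," + offset + ",0");
        }
        return render(lines);
    }

    // Route 3: keep every other reference; drop this one if present (non-retryable error).
    static String recordNonDeliveryFailure(String variable, String partition, String offset) {
        List<String> kept = new ArrayList<>();
        for (String line : dataLines(variable)) {
            String[] cols = line.split(",");
            boolean sameRef = cols.length >= 2 && cols[0].trim().equals(partition) && cols[1].trim().equals(offset);
            if (!sameRef) {
                kept.add(line);
            }
        }
        return render(kept);
    }

    // Data rows only: skips blank lines and the header row.
    private static List<String> dataLines(String variable) {
        List<String> lines = new ArrayList<>();
        if (variable == null) {
            return lines;
        }
        for (String line : variable.split("\\R")) {
            String t = line.trim();
            if (!t.isEmpty() && !t.equals(HEADER)) {
                lines.add(t);
            }
        }
        return lines;
    }

    private static String render(List<String> lines) {
        StringBuilder sb = new StringBuilder(HEADER);
        for (String line : lines) {
            sb.append('\n').append(line);
        }
        return sb.toString();
    }
}
```

Replaying the examples above: recordDeliveryFailure(null, "1", "51") yields 1,51,0, a second call yields 1,51,1, a failure for 2/52 appends 2,52,0, and recordNonDeliveryFailure(..., "2", "52") removes that line again.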

Flow 4: Retry Flow

This flow facilitates retries for all your inbound flows. The global variables of all your Kafka flows are specified here. The flow reads each global variable, with its references to the failed messages (offset, partition), and retries the messages accordingly.

 

1. Set Global Variable List: Here we set the names of the global variables that the retry flow should process. For example, if we have 4 inbound iFlows consuming from different Kafka topics, we create 4 global variables and set their names here as comma-separated values (this parameter is externalized).

2. Set Connectivity Headers: The details required to connect and authenticate to the Kafka servers are provided here. The externalized headers are:
- Bootstrap Server (Kafka server host:port)
- Group ID (ID of the consumer group used when connecting to Kafka)
- For TLS authentication, the Key Pair and Root Certificate aliases from the CPI security material

3. Find Number of Global Variables: Here we count the global variables specified in step 1. The count is used in the condition expression of the looping process call.

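Step 3 can be sketched as splitting the externalized list and counting the non-empty names. This is a hypothetical Java helper that assumes the list uses plain commas, and the variable names in the usage note are made up. The resulting number would typically be stored in an exchange property. The looping process call's condition expression then compares that property with a loop counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of step 3: parse the comma-separated list of global variable names.
final class GlobalVariableList {

    private GlobalVariableList() {}

    static List<String> names(String commaSeparated) {
        List<String> names = new ArrayList<>();
        if (commaSeparated == null) {
            return names;
        }
        for (String part : commaSeparated.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) { // ignore blanks such as a trailing comma
                names.add(name);
            }
        }
        return names;
    }

    static int count(String commaSeparated) {
        return names(commaSeparated).size();
    }
}
```

For example, count("GV_Orders, GV_Invoices,GV_Deliveries,GV_Payments") returns 4 for the four-iFlow scenario described in step 1.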

4. Trigger processing for Global Variable: This looping process call processes the global variables one by one.

From here on, each iteration works on an individual global variable (local integration process: Read Individual Global Variable).

5. Fetch Details from Value Mapping: We maintain a value mapping with the global variable name as the key. From it we get the topic name (used to fetch the messages) and the ProcessDirect address (used to send the message to the core flow). These are used to process the failed messages.

6. Read Global Variable: We will read the Global Variable. This will give us the list of references of the failed messages (Partition and Offset), and this will be used in the subsequent steps to fetch the failed messages.

For example:

Partition,Offset,RetryCount
1,51,5
2,52,0

Here the global variable holds two failed messages: one at offset 51 in partition 1 and another at offset 52 in partition 2.

7. Find Number of Messages: We will find the number of failed messages from the Global Variable read in the previous step and use that in the Condition Expression of the Looping process call.
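Steps 6 and 7 can be sketched as parsing the variable into typed references and counting them. This is a hypothetical Java helper that assumes the header row shown in step 6. The offset is held as a long because Kafka offsets are 64-bit values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of steps 6 and 7: turn the global variable text into message references.
final class FailedMessageReferences {

    // One line of the global variable: where the failed message sits in Kafka and how often it was retried.
    static final class Ref {
        final int partition;
        final long offset;
        final int retryCount;

        Ref(int partition, long offset, int retryCount) {
            this.partition = partition;
            this.offset = offset;
            this.retryCount = retryCount;
        }
    }

    private FailedMessageReferences() {}

    static List<Ref> parse(String variable) {
        List<Ref> refs = new ArrayList<>();
        if (variable == null) {
            return refs;
        }
        for (String line : variable.split("\\R")) {
            String t = line.trim();
            // Skip blank lines and the "Partition,Offset,RetryCount" header row.
            if (t.isEmpty() || t.startsWith("Partition")) {
                continue;
            }
            String[] cols = t.split(",");
            refs.add(new Ref(Integer.parseInt(cols[0].trim()),
                    Long.parseLong(cols[1].trim()),
                    Integer.parseInt(cols[2].trim())));
        }
        return refs;
    }

    // Step 7: the number of failed messages drives the looping process call.
    static int count(String variable) {
        return parse(variable).size();
    }
}
```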

8. Trigger Failed Messages:

This is again a looping process call. It triggers the failed messages one by one, using the message references in the global variable. The details are explained in the next section.

*Please read that section before moving on to the next steps.

9. Fetch Updated Global Variable and increment loop count: Here we fetch the global variable again. It now holds the retry counts that were updated while the process call in step 8 was executing.

10. Remove successful messages from Global Variable: All message references that were processed successfully in step 8 are removed from the global variable. The status of each processed message is also logged in this step.

11. Update modified Global variable: This is a Write Variable step where the Global Variable updated in Step 10 will be overwritten.

In this section I will explain the looping process call executed in Step 8:

Read Individual Kafka Messages:

Let’s consider an example where the global variable contains the two entries below and the maximum retry limit is 5. Since the global variable has two lines, the local integration process is executed twice.

12. Fetch Retry Count: The first execution of the local integration process considers the first reference in the global variable:

Partition,Offset,RetryCount
1,51,5
2,52,0

Here that reference is 1,51,5, and the retry count extracted from this line is 5.
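The retry-count extraction and the route decision can be sketched as follows. This hypothetical Java helper assumes that a reference is skipped once its retry count reaches the limit. That rule matches the example, where 1,51,5 with a limit of 5 goes to Route 1.

```java
// Hypothetical sketch of step 12 and the Route 1 / Route 2 decision.
final class RetryDecision {

    private RetryDecision() {}

    // Extracts the RetryCount column from a reference line such as "1,51,5".
    static int retryCount(String referenceLine) {
        String[] cols = referenceLine.trim().split(",");
        return Integer.parseInt(cols[2].trim());
    }

    // Route 2 (read and resend) only while the count is below the limit; otherwise Route 1 (skip).
    static boolean shouldRetry(String referenceLine, int maxRetries) {
        return retryCount(referenceLine) < maxRetries;
    }
}
```

With a limit of 5, shouldRetry("1,51,5", 5) is false, so only the loop counter moves. shouldRetry("2,52,0", 5) is true, so that message is read from Kafka and resent.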

Route 1: Retry Limit Exceeded (Step 16 and 17):
If the retry count has reached the maximum retry limit, the message is not retried; only the loop counter is incremented.

Route 2: Retry Limit Not Exceeded:

13. Read Kafka Messages: This Groovy script fetches the failed message from the Kafka topic:

1. I created a Java Archive (JAR) that reads a specific message from the Kafka topic by seeking to its offset. It is based on the blog below:

https://learn.conduktor.io/kafka/java-consumer-seek-and-assign/

The JAR uses the Apache Kafka client library (kafka-clients):

https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

2. The JAR is called from the Groovy script Read Kafka Messages. The script passes the details externalized in step 2: Bootstrap Server (Kafka server host:port), Group ID (ID of the consumer group used when connecting to Kafka), and, for TLS authentication, the Key Pair and Root Certificate aliases. The script uses these aliases to fetch the authentication details from the security artifacts in SAP CI and passes them to the function in the JAR.
Refer to the blog below to learn how to fetch security artifacts from SAP CI:

https://community.sap.com/t5/technology-blogs-by-members/sap-cpi-fetch-all-security-artifacts-don-t-read-this/ba-p/13483159

3. The Java code uses TLS authentication for its connection to the Kafka server. KIP-651 describes how to supply the certificates and the private key for authentication as PEM strings:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key

The following properties set the TLS authentication details:

ssl.truststore.certificates: your root CA certificate as a PEM string

ssl.keystore.type: PEM

ssl.keystore.certificate.chain: the public certificate chain supplied with the key, as a PEM string

ssl.keystore.key: your private key for TLS authentication, as a PEM string
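As a sketch, the externalized details and PEM strings could be assembled into consumer properties as shown below. The property keys are standard kafka-clients settings. The following choices are assumptions:
- the class name;
- the string deserializers;
- disabling auto-commit, so that a retry read does not move the committed offsets of a consumer group that the Kafka sender adapter might share.

Note that ssl.truststore.certificates also requires ssl.truststore.type to be PEM. The JAR would then pass these properties to new KafkaConsumer<>(props), call assign() with the TopicPartition for the topic and partition, seek() to the offset, and poll() for the record, as described in the Conduktor article.

```java
import java.util.Properties;

// Hypothetical sketch: consumer properties for a PEM-based TLS connection (KIP-651).
final class KafkaTlsConfig {

    private KafkaTlsConfig() {}

    static Properties consumerProperties(String bootstrapServers, String groupId,
                                         String rootCaPem, String certChainPem, String privateKeyPem) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // host:port from step 2
        props.setProperty("group.id", groupId);                    // consumer group from step 2
        props.setProperty("security.protocol", "SSL");             // TLS on the broker connection
        // Trust store: root CA as a PEM string; the type must also be PEM.
        props.setProperty("ssl.truststore.type", "PEM");
        props.setProperty("ssl.truststore.certificates", rootCaPem);
        // Key store: client certificate chain and private key as PEM strings.
        props.setProperty("ssl.keystore.type", "PEM");
        props.setProperty("ssl.keystore.certificate.chain", certChainPem);
        props.setProperty("ssl.keystore.key", privateKeyPem);
        // Assumption: text payloads, so key and value are read as strings.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: reading a single record for a retry should not commit offsets for the group.
        props.setProperty("enable.auto.commit", "false");
        return props;
    }
}
```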

14. Call receiver trigger Flow: This step sends the message retrieved in the previous step to Flow 2 via ProcessDirect for the retry. The status of each processed message is captured in a property against its offset and partition.

This property is later used in step 10 to remove the successfully processed messages.
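Step 10 can be sketched against such a status property. The text does not show the property's shape, so this sketch assumes a hypothetical map from "partition,offset" to "Success" or "Failed"; the class name is also hypothetical. References without a status, for example those skipped by Route 1, are kept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of step 10: drop references whose retry in step 14 succeeded.
final class SuccessfulRetryFilter {

    private SuccessfulRetryFilter() {}

    // variable: global variable text re-read in step 9 (header row plus reference lines).
    // statusByRef: assumed status property, keyed by "partition,offset".
    static String removeSuccessful(String variable, Map<String, String> statusByRef) {
        if (variable == null) {
            return "";
        }
        List<String> out = new ArrayList<>();
        for (String line : variable.split("\\R")) {
            String t = line.trim();
            if (t.isEmpty()) {
                continue;
            }
            String[] cols = t.split(",");
            String key = cols.length >= 2 ? cols[0].trim() + "," + cols[1].trim() : t;
            // Keep the header row and every reference that was not retried successfully.
            if (!"Success".equals(statusByRef.get(key))) {
                out.add(t);
            }
        }
        return String.join("\n", out);
    }
}
```

The returned text is what step 11 would write back to the global variable.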

15. Increment Loop count: The loop counter is increased, and we move to the next message reference in the Global Variable. The steps are repeated till we have iterated through all the references in the Global Variable.

[Swim lane diagram: flow of a message through the different flows]
