SAP AI Core is the platform for managing AI workloads on the Business Technology Platform (BTP). It is often used to run machine learning workloads and serves as a backend service for applications, such as those built with CAP. Consolidating the logs from AI workloads with the logs of the applications that use them provides an advantage for operations and debugging. In this blog, we’ll show how to send logs from AI Core deployments to the SAP Cloud Logging service for a unified view.
Background on SAP Cloud Logging:
SAP Cloud Logging is a service on the SAP Business Technology Platform (BTP) that enables the collection, storage, and analysis of observability data, such as logs, metrics, and traces, from environments like Cloud Foundry, Kyma, Kubernetes, and others. By consolidating this data, it provides a unified view that simplifies monitoring and troubleshooting across your applications and services.
One of its key strengths is the seamless integration with Cloud Foundry and Kyma. When the Cloud Logging service is bound to a Cloud Foundry application instance, it automatically collects the application’s logs, making the setup process straightforward and reducing manual configuration.
For me, the most important advantage is its user interface, based on the open-source OpenSearch project. It provides powerful tools for searching, filtering, and analyzing logs.
Ingesting Data to SAP Cloud Logging:
SAP Cloud Logging offers two primary methods for ingesting observability data:
OpenTelemetry API: SAP Cloud Logging supports the OpenTelemetry standard, a comprehensive framework for collecting logs, traces, and metrics across multiple programming languages. This framework provides extensive tracing capabilities and offers options for both automatic and manual instrumentation. Automatic instrumentation can often be applied without modifying existing code, while manual instrumentation allows for finer control within the codebase. However, in environments like AI Core, which do not natively support auto-instrumentation, one can choose to implement OpenTelemetry instrumentation manually within workloads.
JSON mTLS API: Another option is to use the JSON API endpoint provided by SAP Cloud Logging, which accepts log data in JSON format. This method is simpler to implement compared to OpenTelemetry and is particularly useful in scenarios where a quick or lightweight setup is preferred. Logs can be sent directly to the endpoint via HTTP requests, providing a straightforward integration option. Additionally, tools like Fluent Bit can be configured to forward logs to this endpoint with minimal effort, although this might require adjustments to deployment configurations or Dockerfiles. Alternatively, one can create a custom solution, such as a small Python script, to send logs directly to the endpoint for a streamlined and efficient approach.
In the following sections, I will demonstrate a simple Python-based method for sending logs to the JSON API endpoint, offering an easy-to-copy solution for integrating AI Core workloads with SAP Cloud Logging. I do encourage you to look at pre-built alternatives, though.
Restriction: Collecting the logs from within the Python environment limits us to logs created at the application level; we cannot access some logs generated at the platform level.
Prerequisites:
For the example in this blog we need a) an instance of AI Core and b) an instance of SAP Cloud Logging. Both are services on the BTP, and there is very good documentation on how to set them up. For both services we need a service key; you can create one using the BTP Cockpit. After obtaining the keys, let’s have a look at the service key for Cloud Logging:
{
  "client-ca": "-----BEGIN CERTIFICATE-----\n[REDACTED CLIENT CERTIFICATE DATA]\n-----END CERTIFICATE-----\n",
  "dashboards-endpoint": "dashboards-[REDACTED]-cloud.logs.services.sap.hana.ondemand.com",
  "dashboards-password": "[REDACTED_PASSWORD]",
  "dashboards-username": "[REDACTED_USERNAME]",
  "ingest-mtls-cert": "-----BEGIN CERTIFICATE-----\n[REDACTED MTLS CERTIFICATE DATA]\n-----END CERTIFICATE-----\n",
  "ingest-mtls-endpoint": "ingest-mtls-[REDACTED]-cloud.logs.services.sap.hana.ondemand.com",
  "ingest-mtls-key": "-----BEGIN PRIVATE KEY-----\n[REDACTED PRIVATE KEY DATA]\n-----END PRIVATE KEY-----\n"
}
It contains all the necessary credentials to ingest logs. The dashboard credentials can be used to log in to the OpenSearch UI. For our example later, we use the ingest-mtls-… credentials, so have them ready 🙂
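If you keep the service key around for local experiments, a small helper can pull out the ingest fields. This is a sketch: the function name is mine, but the field names match the service key shown above.

```python
def ingest_credentials(service_key):
    """Extract the mTLS ingest fields from a Cloud Logging service key dict."""
    return {
        "endpoint": "https://" + service_key["ingest-mtls-endpoint"],
        "cert": service_key["ingest-mtls-cert"],
        "key": service_key["ingest-mtls-key"],
    }

# Illustrative placeholder values instead of a real service key
sample_key = {
    "ingest-mtls-endpoint": "ingest-mtls-example.logs.example.com",
    "ingest-mtls-cert": "-----BEGIN CERTIFICATE-----...",
    "ingest-mtls-key": "-----BEGIN PRIVATE KEY-----...",
}
creds = ingest_credentials(sample_key)
print(creds["endpoint"])
```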
Cloud Logging Handler:
Now, let’s take a look at a code sample for sending logs. I chose to use a logging handler based on Python’s standard logging module:
from concurrent.futures import ThreadPoolExecutor
import gzip
import os
import json
import logging

import requests

from cloud_logging_handler.secret_utils import mtls_client_cert_from_env, mtls_endpoint_from_env

executor = ThreadPoolExecutor(max_workers=1)


class CloudLoggingHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        self.endpoint = mtls_endpoint_from_env()
        self.client_cert = mtls_client_cert_from_env()
        self.executor = executor

    def emit(self, record):
        try:
            # Prepare log message payload
            log_message_formatted = self.format(record)
            payload = {
                "msg": log_message_formatted,
                "date": record.created,
                "filename": record.filename,
                "level": record.levelname,
                "thread": record.threadName,
                "deployment_id": os.environ.get("HOSTNAME", "")[:16],  # env variable set by AI Core; contains info on the deployment ID
                "deployment_name": os.environ.get("DEPLOYMENT_NAME", ""),  # env variable we set ourselves to give the workload a name
            }
            # Submit the background task to the ThreadPoolExecutor
            self.executor.submit(self._send_log, payload)
        except Exception as e:
            logging.error(e)

    def _send_log(self, payload):
        try:
            json_bytes = json.dumps(payload).encode("utf-8")
            gzipped_data = gzip.compress(json_bytes)  # Convert payload to gzipped JSON
            # Send the log to the ingest-mtls endpoint
            response = requests.put(
                self.endpoint,
                data=gzipped_data,
                headers={"Content-Encoding": "gzip"},
                cert=self.client_cert,  # paths to the client certificate and key files
            )
            response.raise_for_status()
        except Exception as e:
            logging.error(e)
I implemented a custom logging handler that sends log data to a cloud logging service over HTTP. This handler is designed to meet the specific API requirements of the cloud logging service, such as gzipping the request payload and authenticating using mTLS.
Cloud Logging supports handling JSON logs with custom attributes, making it easier to filter logs. By default, I included several attributes that are available in Python log records, such as the log level and thread name. However, you can also add custom attributes, such as deployment_id or deployment_name for AI Core, to further enrich your log data.
Since the request can take a few milliseconds to complete and I don’t want to block the main application flow, I decided to handle the requests asynchronously using a background thread with a ThreadPoolExecutor.
However, there are some potential risks regarding thread safety with this approach. I recommend testing it thoroughly in your own environment, as I’ve experienced crashes when using it with FastAPI. For Flask, using multiple workers can cause issues as well, although it works fine with a single worker. There are additional optimizations that could be made, such as reusing a connection across requests, but for my use case this solution was sufficiently fast. If performance is not a concern, you can also keep the sending synchronous, or collect logs and send them out only at certain time intervals.
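The interval-based alternative mentioned above could be sketched as follows. This is not part of the sample repository; the class name is mine, and `send_batch` stands in for the gzipped mTLS PUT shown earlier. A background timer or thread would call `flush()` every few seconds.

```python
import queue


class BatchingLogSender:
    """Collects log payloads and flushes them in one batch per interval."""

    def __init__(self, send_batch):
        self._queue = queue.Queue()
        self._send_batch = send_batch  # callable taking a list of payloads

    def submit(self, payload):
        # Cheap and thread-safe: just enqueue, no network I/O on the caller's path
        self._queue.put(payload)

    def flush(self):
        # Drain everything currently queued and send it as a single batch
        batch = []
        while True:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        if batch:
            self._send_batch(batch)
        return batch


sent_batches = []
sender = BatchingLogSender(sent_batches.append)
sender.submit({"msg": "first"})
sender.submit({"msg": "second"})
sender.flush()
```

This trades a little latency for fewer requests and keeps all network I/O off the application threads.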
A unique requirement for cloud logging in our case is the use of mTLS authentication. This necessitates supplying both a client certificate and its associated key. We retrieve these from the service key of the cloud logging service instance. To handle this, I created a helper function that reads the certificate and key from environment variables and writes them to temporary files for use by the requests library.
import os
import tempfile


def mtls_client_cert_from_env():
    """Load the env variables and store the certificate and key in temporary files."""
    cert_str = os.environ["CLOUD_LOGGING_INGEST_MTLS_CERT"]
    key_str = os.environ["CLOUD_LOGGING_INGEST_MTLS_KEY"]
    # Create temporary files to store the cert and key
    with tempfile.NamedTemporaryFile(delete=False) as cert_file, tempfile.NamedTemporaryFile(delete=False) as key_file:
        cert_file.write(cert_str.encode("utf-8"))
        key_file.write(key_str.encode("utf-8"))
        cert_file_path = cert_file.name
        key_file_path = key_file.name
    return (cert_file_path, key_file_path)


def mtls_endpoint_from_env():
    """Return the ingest endpoint from the env variable."""
    return "https://" + os.environ["CLOUD_LOGGING_INGEST_MTLS_ENDPOINT"]
Make sure to set the environment variables with the values from the service key.
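For a local test run, the variables can be exported like this (placeholder values shown; in AI Core they are injected from the secret instead):

```shell
export CLOUD_LOGGING_INGEST_MTLS_ENDPOINT="ingest-mtls-[REDACTED]-cloud.logs.services.sap.hana.ondemand.com"
export CLOUD_LOGGING_INGEST_MTLS_CERT="-----BEGIN CERTIFICATE-----
[YOUR CLIENT CERTIFICATE]
-----END CERTIFICATE-----"
export CLOUD_LOGGING_INGEST_MTLS_KEY="-----BEGIN PRIVATE KEY-----
[YOUR PRIVATE KEY]
-----END PRIVATE KEY-----"
```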
Here’s an example of how the CloudLoggingHandler can be added to any project using the following syntax:
import logging

from cloud_logging_handler.cloud_logging_handler import CloudLoggingHandler  # Import the custom CloudLoggingHandler

# Create a logger instance
logger = logging.getLogger(__name__)

# Set the logging level
logger.setLevel(logging.INFO)

# Add the CloudLoggingHandler to the logger
cloud_handler = CloudLoggingHandler()
logger.addHandler(cloud_handler)

# Example log statements
logger.info("This is an info message")
logger.error("This is an error message")
Let’s have a look at a full example with AI Core.
Example deployment on AI Core:
For AI Core, we create a sample scenario with a minimalistic deployment.
The template for our endeavor can look like this:
apiVersion: ai.sap.com/v1alpha1
kind: ServingTemplate
metadata:
  name: cloudloggingsample
  annotations:
    scenarios.ai.sap.com/description: "cloudloggingsample"
    scenarios.ai.sap.com/name: "cloudloggingsample"
    executables.ai.sap.com/description: "cloudloggingsample"
    executables.ai.sap.com/name: "cloudloggingsample"
  labels:
    scenarios.ai.sap.com/id: "cloudloggingsample"
    ai.sap.com/version: "1.0"
spec:
  template:
    apiVersion: "serving.kserve.io/v1beta1"
    metadata:
      annotations: |
        autoscaling.knative.dev/metric: concurrency
        autoscaling.knative.dev/target: 1
        autoscaling.knative.dev/targetBurstCapacity: 0
      labels: |
        ai.sap.com/resourcePlan: starter
    spec: |
      predictor:
        imagePullSecrets:
          - name: felixdockersecrect
        minReplicas: 1
        maxReplicas: 5
        containers:
          - name: kserve-container
            image: docker.io/bfwork/cloudloggingsample
            imagePullPolicy: IfNotPresent
            ports:
              - containerPort: 8080
                protocol: TCP
            env:
              - name: CLOUD_LOGGING_INGEST_MTLS_ENDPOINT
                valueFrom:
                  secretKeyRef:
                    name: cloud-logging-secret
                    key: CLOUD_LOGGING_INGEST_MTLS_ENDPOINT
              - name: CLOUD_LOGGING_INGEST_MTLS_CERT
                valueFrom:
                  secretKeyRef:
                    name: cloud-logging-secret
                    key: CLOUD_LOGGING_INGEST_MTLS_CERT
              - name: CLOUD_LOGGING_INGEST_MTLS_KEY
                valueFrom:
                  secretKeyRef:
                    name: cloud-logging-secret
                    key: CLOUD_LOGGING_INGEST_MTLS_KEY
              - name: DEPLOYMENT_NAME
                value: "CLOUD_LOGGING_SAMPLE"
Note: To pass the Cloud Logging credentials to the container, we reference a secret. For more information on how to work with environment variables in AI Core, check out this blog.
Aside from that, this is a simple serving template, which exposes the server on port 8080.
You can create the secret to reference in AI Launchpad as follows:
Don’t forget to encode the values using base64. I’ve included a utility script in the sample repository to assist with this.
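The encoding step itself is small. Here is a sketch of what such a utility does (the function name is illustrative, not the repository's script): AI Core generic secrets expect base64-encoded values, similar to Kubernetes secrets.

```python
import base64
import json


def encode_secret_values(values):
    """Base64-encode each value of a dict, as expected by a generic secret."""
    return {
        key: base64.b64encode(value.encode("utf-8")).decode("ascii")
        for key, value in values.items()
    }


# Placeholder value instead of the real service-key content
encoded = encode_secret_values(
    {"CLOUD_LOGGING_INGEST_MTLS_ENDPOINT": "ingest-mtls-example.logs.example.com"}
)
print(json.dumps(encoded, indent=2))
```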
With the credentials and template in place, let’s take a look at the serving code:
import logging
import random

from flask import Flask, jsonify

from cloud_logging_handler.cloud_logging_handler import CloudLoggingHandler

logger = logging.getLogger("SampleAIApp")
logger.setLevel(logging.DEBUG)

# Set up the custom Cloud Logging handler
cloud_logging_handler = CloudLoggingHandler()
logger.addHandler(cloud_logging_handler)

logger.info("Init Server!!")

# Initialize Flask app
app = Flask(__name__)


@app.route("/v2/predict/", methods=["POST"])
def predict():
    logger.info("Request to /v2/predict/")
    logger.info("Input data received, preparing prediction")
    prediction = round(random.uniform(0, 1), 2)  # Stand-in for inference with a loaded model
    logger.info(f"Successfully predicted a value: {prediction}")
    return jsonify({"prediction": prediction})  # Return the prediction as a JSON response


@app.route("/v2/hello/", methods=["POST"])
def hello():
    logger.info("Request to /v2/hello/")
    logger.info("Hello World!!")
    return jsonify({"world": "hello"})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)  # development server; should not be used in production
Here, we bring everything together: we start a server on port 8080, integrate our custom logging handler, and expose a few sample endpoints. Additionally, we use the logging module extensively to generate some sample logs.
Wrapping it into a Docker container concludes our development efforts:
FROM python:3.7 AS base
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

FROM base AS final

# Create directories within the Docker image
RUN mkdir -p /app/src/
RUN mkdir -p /app/model/
RUN mkdir -p /app/data/

# Copy files from the local system into the Docker image
COPY . /app/src/

# Enable permission to execute anything inside the folder /app
RUN chgrp -R 65534 /app && \
    chmod -R 777 /app

WORKDIR /app

# Default command for serving
CMD ["python3", "/app/src/serve.py"]
Now we can create a deployment and send some requests to the inference endpoint.
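A request to the deployed endpoint could look like this sketch; the deployment URL, access token, and resource group are placeholders from your own AI Core setup:

```shell
curl -X POST "<deployment-url>/v2/predict/" \
  -H "Authorization: Bearer <access-token>" \
  -H "AI-Resource-Group: <your-resource-group>" \
  -H "Content-Type: application/json"
```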
Result – Analyze Logs in OpenSearch:
Finally, let’s take a look at the result. The major advantage of using Cloud Logging is the ability to leverage the powerful OpenSearch UI for querying logs. This improves the developer experience, especially in large-scale projects, as it allows us to use a single tool across multiple environments.
Our logs are stored under the logs-json-* index. In the UI, under the “Discover” section, we can select a timeframe and choose which attributes we want to display:
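For example, to narrow the view to error logs of our sample deployment, a query in the Discover search bar could use the attribute names from our payload (the exact query syntax depends on your OpenSearch version):

```
deployment_name: "CLOUD_LOGGING_SAMPLE" and level: "ERROR"
```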
It looks great! All the different fields we’ve submitted, such as the message, deployment name, thread, log level, and even the deployment ID, are there. This allows us to filter logs by specific deployments, which is really useful.
I must admit, using the OpenSearch UI is far more satisfying than the AI Launchpad Deployment Log viewer 😊
I hope you found this helpful. Check out the full code example. If you have any questions, feel free to leave a comment!