Optimize generative AI workloads for environmental sustainability

The adoption of generative AI is rapidly expanding, reaching an ever-growing number of industries and users worldwide. With the increasing complexity and scale of generative AI models, it is crucial to work towards minimizing their environmental impact. This involves a continuous effort focused on energy reduction and efficiency by achieving the maximum benefit from the resources provisioned and minimizing the total resources required.

To add to our guidance for optimizing deep learning workloads for sustainability on AWS, this post provides recommendations that are specific to generative AI workloads. In particular, we provide practical best practices for different customization scenarios, including training models from scratch, fine-tuning with additional data using full or parameter-efficient techniques, Retrieval Augmented Generation (RAG), and prompt engineering. Although this post primarily focuses on large language models (LLM), we believe most of the recommendations can be extended to other foundation models.

Generative AI problem framing

When framing your generative AI problem, consider the following:

  • Align your use of generative AI with your sustainability goals – When scoping your project, be sure to take sustainability into account:
    • What are the trade-offs between a generative AI solution and a less resource-intensive traditional approach?
    • How can your generative AI project support sustainable innovation?
  • Use energy that has low carbon intensity – When regulations and legal aspects allow, train and deploy your model in one of the 19 AWS Regions where the electricity consumed in 2022 was attributable to 100% renewable energy, or in Regions where the grid has a lower published carbon intensity than other locations. For more detail, refer to How to select a Region for your workload based on sustainability goals. When selecting a Region, try to minimize data movement across networks: train your models close to your data and deploy your models close to your users.
  • Use managed services – Depending on your expertise and specific use case, weigh the options between opting for Amazon Bedrock, a serverless, fully managed service that provides access to a diverse range of foundation models through an API, or deploying your models on a fully managed infrastructure by using Amazon SageMaker. Using a managed service helps you operate more efficiently by shifting the responsibility of maintaining high utilization and sustainability optimization of the deployed hardware to AWS.
  • Define the right customization strategy – There are several strategies to enhance the capabilities of your model, ranging from prompt engineering to full fine-tuning. Choose the most suitable strategy based on your specific needs, while also considering the differences in resources each one requires. For instance, fine-tuning might achieve higher accuracy than prompt engineering, but it consumes more resources and energy in the training phase. Make trade-offs: opting for a customization approach that accepts good performance rather than optimal performance can reduce the resources your models consume. The following figure summarizes the environmental impact of LLM customization strategies.

Model customization

In this section, we share best practices for model customization.

Base model selection

Selecting the appropriate base model is a critical step in customizing generative AI workloads and can help reduce the need for extensive fine-tuning and associated resource usage. Consider the following factors:

  • Evaluate capabilities and limitations – Use the playgrounds of Amazon SageMaker JumpStart or Amazon Bedrock to easily test the capability of LLMs and assess their core limitations.
  • Reduce the need for customization – Make sure to gather information by using public resources such as open LLM leaderboards, holistic evaluation benchmarks, or model cards to compare different LLMs and understand the specific domains, tasks, and languages on which they have been pre-trained. Depending on your use case, consider domain-specific or multilingual models to reduce the need for additional customization.
  • Start with a small model size and small context window – Large model sizes and context windows (the number of tokens that can fit in a single prompt) can offer more performance and capabilities, but they also require more energy and resources for inference. Consider available versions of models with smaller sizes and context windows before scaling up to larger models. Specialized smaller models have their capacity concentrated on a specific target task. On these tasks, specialized models can behave qualitatively similarly to larger models (for example, GPT3.5, which has 175 billion parameters) while requiring fewer resources for training and inference. Examples of such models include Alpaca (7 billion parameters) or the utilization of T5 variants for multi-step math reasoning (11 billion parameters or more).

Prompt engineering

Effective prompt engineering can enhance the performance and efficiency of generative AI models. By carefully crafting prompts, you can guide the model’s behavior, reducing unnecessary iterations and resource requirements. Consider the following guidelines:

  • Keep prompts concise and avoid unnecessary details – Longer prompts lead to a higher number of tokens. As tokens increase in number, the model consumes more memory and computational resources. Consider incorporating zero-shot or few-shot learning to enable the model to adapt quickly by learning from just a few examples.
  • Experiment with different prompts gradually – Refine the prompts based on the desired output until you achieve the desired results. Depending on your task, explore advanced techniques such as self-consistency, Generated Knowledge Prompting, ReAct Prompting, or Automatic Prompt Engineer to further enhance the model’s capabilities.
  • Use reproducible prompts – With templates such as LangChain prompt templates, you can save or load your prompt history as files. This enhances prompt experimentation tracking, versioning, and reusability. When you know the prompts that produce the best answers for each model, you can reduce the computational resources used for prompt iterations and redundant experiments across different projects. See the sketch following this list.
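As an illustration, the following minimal sketch (assuming LangChain is installed; the file name and prompt text are only examples) saves a prompt template to disk and reloads it for reuse across experiments:

from langchain.prompts import PromptTemplate, load_prompt

# Define a reusable prompt template with explicit input variables
template = PromptTemplate.from_template(
    "Summarize the following earnings call transcript in {num_sentences} sentences:\n\n{transcript}"
)

# Persist the template so it can be versioned and shared across projects
template.save("summarize_transcript_prompt.json")

# Later, reload the exact same prompt instead of re-engineering it
reloaded = load_prompt("summarize_transcript_prompt.json")
print(reloaded.format(num_sentences=3, transcript="..."))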

Retrieval Augmented Generation

Retrieval Augmented Generation (RAG) is a highly effective approach for augmenting model capabilities by retrieving and integrating pertinent external information from a predefined dataset. Because existing LLMs are used as is, this strategy avoids the energy and resources needed to train the model on new data or build a new model from scratch. Use tools such as Amazon Kendra or Amazon OpenSearch Service and LangChain to successfully build RAG-based solutions with Amazon Bedrock or SageMaker JumpStart.
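As a minimal sketch of this pattern (assuming LangChain is installed, an Amazon Kendra index already contains your documents, and you have access to an Anthropic Claude model on Amazon Bedrock; the index ID and question are placeholders):

from langchain.llms import Bedrock
from langchain.retrievers import AmazonKendraRetriever
from langchain.chains import RetrievalQA

# The LLM is used as is; no additional training is required
llm = Bedrock(model_id="anthropic.claude-v2")

# Retrieve relevant passages from a pre-populated Amazon Kendra index
retriever = AmazonKendraRetriever(index_id="<your-kendra-index-id>")

# Combine retrieval and generation: retrieved passages are stuffed into the prompt
qa_chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)
print(qa_chain.run("What were the key drivers of revenue growth last quarter?"))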

Parameter-Efficient Fine-Tuning

Parameter-Efficient Fine-Tuning (PEFT) is a fundamental aspect of sustainability in generative AI. It aims to achieve performance comparable to fine-tuning, using fewer trainable parameters. By fine-tuning only a small number of model parameters while freezing most parameters of the pre-trained LLMs, we can reduce computational resources and energy consumption.

Use public libraries such as the Hugging Face Parameter-Efficient Fine-Tuning (PEFT) library to implement common PEFT techniques such as Low-Rank Adaptation (LoRA), Prefix Tuning, Prompt Tuning, or P-Tuning. As an example, studies show the utilization of LoRA can reduce the number of trainable parameters by 10,000 times and the GPU memory requirement by 3 times, depending on the size of your model, with similar or better performance.
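The following minimal sketch (assuming the transformers and peft libraries are installed; the base model and hyperparameters are only examples) wraps a pre-trained model with LoRA adapters so that only a small fraction of its parameters is trained:

from transformers import AutoModelForCausalLM
from peft import LoraConfig, TaskType, get_peft_model

# Load a pre-trained base model; its original weights stay frozen
base_model = AutoModelForCausalLM.from_pretrained("bigscience/bloom-560m")

# LoRA configuration: small low-rank update matrices are the only trainable weights
lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,              # rank of the low-rank update matrices
    lora_alpha=32,    # scaling factor applied to the updates
    lora_dropout=0.05,
)

model = get_peft_model(base_model, lora_config)
model.print_trainable_parameters()  # typically well under 1% of the total parameters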

Fine-tuning

Fine-tune the entire pre-trained model with the additional data. This approach may achieve higher performance but is more resource-intensive than PEFT. Use this strategy when the available data significantly differs from the pre-training data.

By selecting the right fine-tuning approach, you can maximize the reuse of your model and avoid the resource usage associated with fine-tuning multiple models for each use case. For example, if you anticipate reusing the model within a specific domain or business unit in your organization, you may prefer domain adaptation. On the other hand, instruction-based fine-tuning is better suited for general use across multiple tasks.

Model training from scratch

In some cases, training an LLM from scratch may be necessary. However, this approach is computationally expensive and energy-intensive, so apply efficient training best practices, such as those described in our guidance for optimizing deep learning workloads for sustainability on AWS, to keep resource usage to a minimum.

Model inference and deployment

Consider the following best practices for model inference and deployment:

  • Use deep learning containers for large model inference – You can use deep learning containers for large model inference on SageMaker and open-source frameworks such as DeepSpeed, Hugging Face Accelerate, and FasterTransformer to implement techniques like weight pruning, distillation, compression, quantization, or compilation. These techniques reduce model size and optimize memory usage.
  • Set appropriate inference model parameters – During inference, you can adjust certain parameters that influence the model’s output. Understanding and appropriately setting these parameters allows you to obtain the most relevant responses from your models and minimize the number of prompt-tuning iterations, which ultimately reduces memory usage and energy consumption. Key parameters to consider are temperature, top_p, top_k, and max_length (see the sketch following this list).
  • Adopt an efficient inference infrastructure – You can deploy your models on an AWS Inferentia2 accelerator. Inf2 instances offer up to 50% better performance/watt over comparable Amazon Elastic Compute Cloud (Amazon EC2) instances because the underlying AWS Inferentia2 accelerators are purpose built to run deep learning models at scale. As the most energy-efficient option on Amazon EC2 for deploying ultra-large models, Inf2 instances help you meet your sustainability goals when deploying the latest innovations in generative AI.
  • Align inference Service Level Agreements (SLAs) with sustainability goals – Define SLAs that support your sustainability goals while meeting your business requirements, not exceeding them. Make trade-offs that significantly reduce your resource usage in exchange for acceptable decreases in service levels.
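As an illustration of setting these inference parameters, the following sketch invokes a model on Amazon Bedrock (assuming access to an Anthropic Claude model; parameter names vary by model provider, and here max_tokens_to_sample plays the role of max_length):

import json
import boto3

bedrock_runtime = boto3.client("bedrock-runtime")

body = {
    "prompt": "\n\nHuman: Summarize our sustainability guidelines in three sentences.\n\nAssistant:",
    "max_tokens_to_sample": 200,   # cap the response length
    "temperature": 0.2,            # lower values give more deterministic output
    "top_p": 0.9,
    "top_k": 250,
}

response = bedrock_runtime.invoke_model(
    modelId="anthropic.claude-v2",
    body=json.dumps(body),
    accept="application/json",
    contentType="application/json",
)
print(json.loads(response["body"].read())["completion"])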

Resource usage monitoring and optimization

Implement an improvement process to track the impact of your optimizations over time. The goal of your improvements is to use all the resources you provision and complete the same work with the minimum resources possible. To operationalize this process, collect metrics about the utilization of your cloud resources. These metrics, combined with business metrics, can be used as proxy metrics for your carbon emissions.

To consistently monitor your environment, you can use Amazon CloudWatch to monitor system metrics like CPU, GPU, or memory utilization. If you are using NVIDIA GPUs, consider the NVIDIA System Management Interface (nvidia-smi) to monitor GPU utilization and performance state. For AWS Trainium and AWS Inferentia accelerators, you can use AWS Neuron Monitor to monitor system metrics. Also consider SageMaker Profiler, which provides a detailed view into the AWS compute resources provisioned while training deep learning models on SageMaker. The following are some key metrics worth monitoring:

  • CPUUtilization, GPUUtilization, GPUMemoryUtilization, MemoryUtilization, and DiskUtilization in CloudWatch
  • nvidia_smi.gpu_utilization, nvidia_smi.gpu_memory_utilization, and nvidia_smi.gpu_performance_state in nvidia-smi logs.
  • vcpu_usage, memory_info, and neuroncore_utilization in Neuron Monitor.
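As an example of collecting these proxy metrics programmatically, the following sketch pulls the average GPU utilization reported by a SageMaker training job from CloudWatch (assuming the standard /aws/sagemaker/TrainingJobs instance metrics; the job name is hypothetical):

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client("cloudwatch")

response = cloudwatch.get_metric_statistics(
    Namespace="/aws/sagemaker/TrainingJobs",
    MetricName="GPUUtilization",
    Dimensions=[{"Name": "Host", "Value": "my-training-job/algo-1"}],  # hypothetical job name
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,                 # 5-minute aggregation
    Statistics=["Average"],
)

for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], round(point["Average"], 1))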

Conclusion

As generative AI models are becoming bigger, it is essential to consider the environmental impact of our workloads.

In this post, we provided guidance for optimizing the compute, storage, and networking resources required to run your generative AI workloads on AWS while minimizing their environmental impact. Because the field of generative AI is continuously progressing, staying updated with the latest courses, research, and tools can help you find new ways to optimize your workloads for sustainability.


About the Authors

Dr. Wafae Bakkali is a Data Scientist at AWS, based in Paris, France. As a generative AI expert, Wafae is driven by the mission to empower customers in solving their business challenges through the utilization of generative AI techniques, ensuring they do so with maximum efficiency and sustainability.

Benoit de Chateauvieux is a Startup Solutions Architect at AWS, based in Montreal, Canada. As a former CTO, he enjoys helping startups build great products using the cloud. He also supports customers in solving their sustainability challenges through the cloud. Outside of work, you’ll find Benoit in canoe-camping expeditions, paddling across Canadian rivers.

Read More

Train and deploy ML models in a multicloud environment using Amazon SageMaker

As customers accelerate their migrations to the cloud and transform their business, some find themselves in situations where they have to manage IT operations in a multicloud environment. For example, you might have acquired a company that was already running on a different cloud provider, or you may have a workload that generates value from unique capabilities provided by AWS. Another example is independent software vendors (ISVs) that make their products and services available in different cloud platforms to benefit their end customers. Or an organization may be operating in a Region where a primary cloud provider is not available, and in order to meet the data sovereignty or data residency requirements, they can use a secondary cloud provider.

In these scenarios, as you start to embrace generative AI, large language models (LLMs), and machine learning (ML) technologies as a core part of your business, you may be looking for options to take advantage of AWS AI and ML capabilities outside of AWS in a multicloud environment. For example, you may want to use Amazon SageMaker to build and train ML models, or use Amazon SageMaker JumpStart to deploy pre-built foundation models or third-party ML models with just a few clicks. Or you may want to take advantage of Amazon Bedrock to build and scale generative AI applications, or use AWS’s pre-trained AI services, which don’t require you to learn ML skills. AWS also supports scenarios where organizations want to bring their own model to Amazon SageMaker or into Amazon SageMaker Canvas for predictions.

In this post, we demonstrate one of the many options that you have to take advantage of AWS’s broadest and deepest set of AI/ML capabilities in a multicloud environment. We show how you can build and train an ML model in AWS and deploy the model in another platform. We train the model using Amazon SageMaker, store the model artifacts in Amazon Simple Storage Service (Amazon S3), and deploy and run the model in Azure. This approach is beneficial if you use AWS services for ML for its most comprehensive set of features, yet you need to run your model in another cloud provider in one of the situations we’ve discussed.

Key concepts

Amazon SageMaker Studio is a web-based, integrated development environment (IDE) for machine learning. SageMaker Studio allows data scientists, ML engineers, and data engineers to prepare data, build, train, and deploy ML models on one web interface. With SageMaker Studio, you can access purpose-built tools for every stage of the ML development lifecycle, from data preparation to building, training, and deploying your ML models, improving data science team productivity by up to ten times. SageMaker Studio notebooks are quick start, collaborative notebooks that integrate with purpose-built ML tools in SageMaker and other AWS services.

SageMaker is a comprehensive ML service enabling business analysts, data scientists, and MLOps engineers to build, train, and deploy ML models for any use case, regardless of ML expertise.

AWS provides Deep Learning Containers (DLCs) for popular ML frameworks such as PyTorch, TensorFlow, and Apache MXNet, which you can use with SageMaker for training and inference. DLCs are available as Docker images in Amazon Elastic Container Registry (Amazon ECR). The Docker images are preinstalled and tested with the latest versions of popular deep learning frameworks as well as other dependencies needed for training and inference. For a complete list of the pre-built Docker images managed by SageMaker, see Docker Registry Paths and Example Code. Amazon ECR supports security scanning, and is integrated with Amazon Inspector vulnerability management service to meet your organization’s image compliance security requirements, and to automate vulnerability assessment scanning. Organizations can also use AWS Trainium and AWS Inferentia for better price-performance for running ML training jobs or inference.

Solution overview

In this section, we describe how to build and train a model using SageMaker and deploy the model to Azure Functions. We use a SageMaker Studio notebook to build, train, and deploy the model. We train the model in SageMaker using a pre-built Docker image for PyTorch. Although we’re deploying the trained model to Azure in this case, you could use the same approach to deploy the model on other platforms such as on premises or other cloud platforms.

When we create a training job, SageMaker launches the ML compute instances and uses our training code and the training dataset to train the model. It saves the resulting model artifacts and other output in an S3 bucket that we specify as input to the training job. When model training is complete, we use the Open Neural Network Exchange (ONNX) runtime library to export the PyTorch model as an ONNX model.

Finally, we deploy the ONNX model along with custom inference code written in Python to Azure Functions using the Azure CLI. ONNX supports most of the commonly used ML frameworks and tools. One thing to note is that converting an ML model to ONNX is useful if your target deployment framework differs from the framework you trained with (for example, training in PyTorch and deploying with TensorFlow). If you’re using the same framework on both the source and target, you don’t need to convert the model to ONNX format.

The following diagram illustrates the architecture for this approach.

Multicloud train and deploy architecture diagram

We use a SageMaker Studio notebook along with the SageMaker Python SDK to build and train our model. The SageMaker Python SDK is an open-source library for training and deploying ML models on SageMaker. For more details, refer to Create or Open an Amazon SageMaker Studio Notebook.

The code snippets in the following sections have been tested in the SageMaker Studio notebook environment using the Data Science 3.0 image and Python 3 kernel.

In this solution, we demonstrate the following steps:

  1. Train a PyTorch model.
  2. Export the PyTorch model as an ONNX model.
  3. Package the model and inference code.
  4. Deploy the model to Azure Functions.

Prerequisites

You should have the following prerequisites:

  • An AWS account.
  • A SageMaker domain and SageMaker Studio user. For instructions to create these, refer to Onboard to Amazon SageMaker Domain Using Quick setup.
  • The Azure CLI.
  • Access to Azure and credentials for a service principal that has permissions to create and manage Azure Functions.

Train a model with PyTorch

In this section, we detail the steps to train a PyTorch model.

Install dependencies

Install the libraries to carry out the steps required for model training and model deployment:

pip install torchvision onnx onnxruntime

Complete initial setup

We begin by importing the AWS SDK for Python (Boto3) and the SageMaker Python SDK. As part of the setup, we define the following:

  • A session object that provides convenience methods within the context of SageMaker and our own account.
  • A SageMaker role ARN used to delegate permissions to the training and hosting service. We need this so that these services can access the S3 buckets where our data and model are stored. For instructions on creating a role that meets your business needs, refer to SageMaker Roles. For this post, we use the same execution role as our Studio notebook instance. We get this role by calling sagemaker.get_execution_role().
  • The default Region where our training job will run.
  • The default bucket and the prefix we use to store the model output.

See the following code:

import sagemaker
import boto3
import os

execution_role = sagemaker.get_execution_role()
region = boto3.Session().region_name
session = sagemaker.Session()
bucket = session.default_bucket()
prefix = "sagemaker/mnist-pytorch"

Create the training dataset

We use the dataset available in the public bucket sagemaker-example-files-prod-{region}. The dataset contains the following files:

  • train-images-idx3-ubyte.gz – Contains training set images
  • train-labels-idx1-ubyte.gz – Contains training set labels
  • t10k-images-idx3-ubyte.gz – Contains test set images
  • t10k-labels-idx1-ubyte.gz – Contains test set labels

We use the torchvision.datasets module to download the data from the public bucket locally before uploading it to our training data bucket. We pass this bucket location as an input to the SageMaker training job. Our training script uses this location to download and prepare the training data, and then train the model. See the following code:

from torchvision import transforms
from torchvision.datasets import MNIST

# Point the torchvision MNIST downloader at the SageMaker example files bucket
MNIST.mirrors = [
    f"https://sagemaker-example-files-prod-{region}.s3.amazonaws.com/datasets/image/MNIST/"
]

MNIST(
    "data",
    download=True,
    transform=transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    ),
)

Create the training script

With SageMaker, you can bring your own model using script mode. With script mode, you can use the pre-built SageMaker containers and provide your own training script, which has the model definition, along with any custom libraries and dependencies. The SageMaker Python SDK passes our script as an entry_point to the container, which loads and runs the train function from the provided script to train our model.

When the training is complete, SageMaker saves the model output in the S3 bucket that we provided as a parameter to the training job.

Our training code is adapted from the following PyTorch example script. The following excerpt from the code shows the model definition and the train function:

import torch
import torch.nn as nn
import torch.nn.functional as F

# define network

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output
# train

def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            if args.dry_run:
                break

Train the model

Now that we have set up our environment and created our input dataset and custom training script, we can start the model training using SageMaker. We use the PyTorch estimator in the SageMaker Python SDK to start a training job on SageMaker. We pass in the required parameters to the estimator and call the fit method. When we call fit on the PyTorch estimator, SageMaker starts a training job using our script as training code:

from sagemaker.pytorch import PyTorch

output_location = f"s3://{bucket}/{prefix}/output"
print(f"training artifacts will be uploaded to: {output_location}")

hyperparameters={
    "batch-size": 100,
    "epochs": 1,
    "lr": 0.1,
    "gamma": 0.9,
    "log-interval": 100
}

instance_type = "ml.c4.xlarge"
estimator = PyTorch(
    entry_point="train.py",
    source_dir="code",  # directory of your training script
    role=execution_role,
    framework_version="1.13",
    py_version="py39",
    instance_type=instance_type,
    instance_count=1,
    volume_size=250,
    output_path=output_location,
    hyperparameters=hyperparameters
)

# 'inputs' is the S3 URI of the training data uploaded earlier (the upload step is not shown)
estimator.fit(inputs = {
    'training': f"{inputs}",
    'testing':  f"{inputs}"
})

Export the trained model as an ONNX model

After the training is complete and our model is saved to the predefined location in Amazon S3, we export the model to an ONNX model using the ONNX runtime.

We include the code to export our model to ONNX in our training script to run after the training is complete.

PyTorch exports the model to ONNX by running the model using our input and recording a trace of operators used to compute the output. We use a random input of the right type with the PyTorch torch.onnx.export function to export the model to ONNX. We also specify the first dimension in our input as dynamic so that our model accepts a variable batch_size of inputs during inference.

def export_to_onnx(model, model_dir, device):
    logger.info("Exporting the model to onnx.")
    dummy_input = torch.randn(1, 1, 28, 28).to(device)
    input_names = [ "input_0" ]
    output_names = [ "output_0" ]
    path = os.path.join(model_dir, 'mnist-pytorch.onnx')
    torch.onnx.export(model, dummy_input, path, verbose=True, input_names=input_names, output_names=output_names,
                     dynamic_axes={'input_0' : {0 : 'batch_size'},    # variable length axes
                                'output_0' : {0 : 'batch_size'}})

ONNX is an open standard format for deep learning models that enables interoperability between deep learning frameworks such as PyTorch, Microsoft Cognitive Toolkit (CNTK), and more. This means you can use any of these frameworks to train the model and subsequently export the pre-trained models in ONNX format. By exporting the model to ONNX, you get the benefit of a broader selection of deployment devices and platforms.

Download and extract the model artifacts

The ONNX model that our training script has saved has been copied by SageMaker to Amazon S3 in the output location that we specified when we started the training job. The model artifacts are stored as a compressed archive file called model.tar.gz. We download this archive file to a local directory in our Studio notebook instance and extract the model artifacts, namely the ONNX model.

import tarfile

local_model_file = 'model.tar.gz'
model_bucket,model_key = estimator.model_data.split('/',2)[-1].split('/',1)
s3 = boto3.client("s3")
s3.download_file(model_bucket,model_key,local_model_file)

model_tar = tarfile.open(local_model_file)
model_file_name = model_tar.next().name
model_tar.extractall('.')
model_tar.close()

Validate the ONNX model

The ONNX model is exported to a file named mnist-pytorch.onnx by our training script. After we have downloaded and extracted this file, we can optionally validate the ONNX model using the onnx.checker module. The check_model function in this module checks the consistency of a model. An exception is raised if the test fails.

import onnx

onnx_model = onnx.load("mnist-pytorch.onnx")
onnx.checker.check_model(onnx_model)

Package the model and inference code

For this post, we use .zip deployment for Azure Functions. In this method, we package our model, accompanying code, and Azure Functions settings in a .zip file and publish it to Azure Functions. The following code shows the directory structure of our deployment package:

mnist-onnx
├── function_app.py
├── model
│ └── mnist-pytorch.onnx
└── requirements.txt

List dependencies

We list the dependencies for our inference code in the requirements.txt file at the root of our package. This file is used to build the Azure Functions environment when we publish the package.

azure-functions
numpy
onnxruntime

Write inference code

We use Python to write the following inference code, using the ONNX Runtime library to load our model and run inference. This instructs the Azure Functions app to make the endpoint available at the /classify relative path.

import logging
import azure.functions as func
import numpy as np
import os
import onnxruntime as ort
import json


app = func.FunctionApp()

def preprocess(input_data_json):
    # convert the JSON data into the tensor input
    return np.array(input_data_json['data']).astype('float32')
    
def run_model(model_path, req_body):
    session = ort.InferenceSession(model_path)
    input_data = preprocess(req_body)
    logging.info(f"Input Data shape is {input_data.shape}.")
    input_name = session.get_inputs()[0].name  # get the name of the first input of the model
    try:
        result = session.run([], {input_name: input_data})
    except (RuntimeError) as e:
        print("Shape={0} and error={1}".format(input_data.shape, e))
    return result[0] 

def get_model_path():
    d=os.path.dirname(os.path.abspath(__file__))
    return os.path.join(d , './model/mnist-pytorch.onnx')

@app.function_name(name="mnist_classify")
@app.route(route="classify", auth_level=func.AuthLevel.ANONYMOUS)
def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    # Get the img value from the post.
    try:
        req_body = req.get_json()
    except ValueError:
        pass

    if req_body:
        # run model
        result = run_model(get_model_path(), req_body)
        # map output to integer and return result string.
        digits = np.argmax(result, axis=1)
        logging.info(type(digits))
        return func.HttpResponse(json.dumps({"digits": np.array(digits).tolist()}))
    else:
        return func.HttpResponse(
             "This HTTP triggered function successfully.",
             status_code=200
        )
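Before publishing, the package directory needs to be compressed into a .zip archive. The accompanying notebook may do this differently; the following is a minimal sketch that produces the archive path stored in the function_archive variable used in the deployment step below (assuming the directory layout shown earlier):

import shutil

# Zip the contents of the mnist-onnx directory so that function_app.py,
# model/ and requirements.txt sit at the root of the archive
function_archive = shutil.make_archive("mnist-onnx", "zip", root_dir="mnist-onnx")
print(function_archive)  # mnist-onnx.zip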

Deploy the model to Azure Functions

Now that we have the code packaged into the required .zip format, we’re ready to publish it to Azure Functions. We do that using the Azure CLI, a command line utility to create and manage Azure resources. Install the Azure CLI with the following code:

!pip install -q azure-cli

Then complete the following steps:

  1. Log in to Azure:
    !az login
  2. Set up the resource creation parameters:
    import random
    
    random_suffix = str(random.randint(10000,99999))
    resource_group_name = f"multicloud-{random_suffix}-rg"
    storage_account_name = f"multicloud{random_suffix}"
    location = "ukwest"
    sku_storage = "Standard_LRS"
    functions_version = "4"
    python_version = "3.9"
    function_app = f"multicloud-mnist-{random_suffix}"
    
  3. Use the following commands to create the Azure Functions app along with the prerequisite resources:
    !az group create --name {resource_group_name} --location {location}
    !az storage account create --name {storage_account_name} --resource-group {resource_group_name} --location {location} --sku {sku_storage}
    !az functionapp create --name {function_app} --resource-group {resource_group_name} --storage-account {storage_account_name} --consumption-plan-location "{location}" --os-type Linux --runtime python --runtime-version {python_version} --functions-version {functions_version}
    
  4. Set up the Azure Functions so that when we deploy the Functions package, the requirements.txt file is used to build our application dependencies:
    !az functionapp config appsettings set --name {function_app} --resource-group {resource_group_name} --settings @./functionapp/settings.json
  5. The settings applied in the previous step configure the Functions app to run the Python v2 programming model and to perform a build on the code it receives after .zip deployment:
    {
    	"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
    	"SCM_DO_BUILD_DURING_DEPLOYMENT": true
    }
    
  6. After we have the resource group, storage container, and Functions app with the right configuration, publish the code to the Functions app:
    !az functionapp deployment source config-zip -g {resource_group_name} -n {function_app} --src {function_archive} --build-remote true
    

Test the model

We have deployed the ML model to Azure Functions as an HTTP trigger, which means we can use the Functions app URL to send an HTTP request to the function to invoke the function and run the model.

To prepare the input, download the test image files from the SageMaker example files bucket and prepare a set of samples in the format required by the model:

from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt

transform=transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)

test_dataset = datasets.MNIST(root='../data',  download=True, train=False, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True)

test_features, test_labels = next(iter(test_loader))

Use the requests library to send a POST request to the inference endpoint with the sample inputs. The endpoint URL takes the format shown in the following code:

import requests, json

def to_numpy(tensor):
    return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()

url = f"https://{function_app}.azurewebsites.net/api/classify"
response = requests.post(url, 
                json.dumps({"data":to_numpy(test_features).tolist()})
            )
predictions = json.loads(response.text)['digits']

Clean up

When you’re done testing the model, delete the resource group along with the contained resources, including the storage container and Functions app:

!az group delete --name {resource_group_name} --yes

Additionally, it is recommended to shut down idle resources within SageMaker Studio to reduce costs. For more information, refer to Save costs by automatically shutting down idle resources within Amazon SageMaker Studio.

Conclusion

In this post, we showed how you can build and train an ML model with SageMaker and deploy it to another cloud provider. In the solution, we used a SageMaker Studio notebook, but for production workloads, we recommend using MLOps to create repeatable training workflows to accelerate model development and deployment.

This post didn’t show all the possible ways to deploy and run a model in a multicloud environment. For example, you can also package your model into a container image along with inference code and dependency libraries to run the model as a containerized application in any platform. For more information about this approach, refer to Deploy container applications in a multicloud environment using Amazon CodeCatalyst. The intent of the post is to show how organizations can use AWS AI/ML capabilities in a multicloud environment.


About the authors

Raja Vaidyanathan is a Solutions Architect at AWS supporting global financial services customers. Raja works with customers to architect solutions to complex problems with long-term positive impact on their business. He’s a strong engineering professional skilled in IT strategy, enterprise data management, and application architecture, with particular interests in analytics and machine learning.

Amandeep Bajwa is a Senior Solutions Architect at AWS supporting financial services enterprises. He helps organizations achieve their business outcomes by identifying the appropriate cloud transformation strategy based on industry trends and organizational priorities. Some of the areas Amandeep consults on are cloud migration, cloud strategy (including hybrid and multicloud), digital transformation, data and analytics, and technology in general.

Prema Iyer is Senior Technical Account Manager for AWS Enterprise Support. She works with external customers on a variety of projects, helping them improve the value of their solutions when using AWS.

Read More

Generative AI and multi-modal agents in AWS: The key to unlocking new value in financial markets

Multi-modal data is a valuable component of the financial industry, encompassing market, economic, customer, news and social media, and risk data. Financial organizations generate, collect, and use this data to gain insights into financial operations, make better decisions, and improve performance. However, there are challenges associated with multi-modal data due to the complexity and lack of standardization in financial systems and data formats and quality, as well as the fragmented and unstructured nature of the data. Financial clients have frequently described the operational overhead of gaining financial insights from multi-modal data, which necessitates complex extraction and transformation logic, leading to bloated effort and costs. Technical challenges with multi-modal data further include the complexity of integrating and modeling different data types, the difficulty of combining data from multiple modalities (text, images, audio, video), and the need for advanced computer science skills and sophisticated analysis tools.

One increasingly popular way to handle multi-modal data is the use of multi-modal agents. Multi-modal agents are AI systems that can understand and analyze data in multiple modalities using the right tools in their toolkit. They are able to connect insights across these diverse data types to gain a more comprehensive understanding and generate appropriate responses. Multi-modal agents, in conjunction with generative AI, are finding widespread application in financial markets. The following are a few popular use cases:

  • Smart reporting and market intelligence – AI can analyze various sources of financial information to generate market intelligence reports, aiding analysts, investors, and companies to stay updated on trends. Multi-modal agents can summarize lengthy financial reports quickly, saving analysts significant time and effort.
  • Quantitative modeling and forecasting – Generative models can synthesize large volumes of financial data to train machine learning (ML) models for applications like stock price forecasting, portfolio optimization, risk modeling, and more. Multi-modal models that understand diverse data sources can provide more robust forecasts.
  • Compliance and fraud detection – This solution can be extended to include monitoring tools that analyze communication channels like calls, emails, chats, access logs, and more to identify potential insider trading or market manipulation. Detecting fraudulent collusion across data types requires multi-modal analysis.

A multi-modal agent with generative AI boosts the productivity of a financial analyst by automating repetitive and routine tasks, freeing time for analysts to focus on high-value work. Multi-modal agents can amplify an analyst’s ability to gain insights by assisting with research and analysis. Multi-modal agents can also generate enhanced quantitative analysis and financial models, enabling analysts to work faster and with greater accuracy.

Implementing a multi-modal agent with AWS consolidates key insights from diverse structured and unstructured data on a large scale. Multi-modal agents can easily combine the power of generative AI offerings from Amazon Bedrock and Amazon SageMaker JumpStart with the data processing capabilities from AWS Analytics and AI/ML services to provide agile solutions that enable financial analysts to efficiently analyze and gather insights from multi-modal data in a secure and scalable manner within AWS. Amazon offers a suite of AI services that enable natural language processing (NLP), speech recognition, text extraction, and search:

  • Amazon Comprehend is an NLP service that can analyze text for key phrases and analyze sentiment
  • Amazon Textract is an intelligent document processing service that can accurately extract text and data from documents
  • Amazon Transcribe is an automatic speech recognition service that can convert speech to text
  • Amazon Kendra is an enterprise search service powered by ML to find information across a variety of data sources, including documents and knowledge bases

In this post, we showcase a scenario where a financial analyst interacts with the organization’s multi-modal data, residing on purpose-built data stores, to gather financial insights. In the interaction, we demonstrate how multi-modal agents plan and run the user query and retrieve the results from the relevant data sources. All this is achieved using AWS services, thereby increasing the financial analyst’s efficiency to analyze multi-modal financial data (text, speech, and tabular data) holistically.

The following screenshot shows an example of the UI.

Solution overview

The following diagram illustrates the conceptual architecture to use generative AI with multi-modal data using agents. The steps involved are as follows:

  1. The financial analyst poses questions via a platform such as chatbots.
  2. The platform uses a framework to determine the most suitable multi-modal agent tool to answer the question.
  3. Once identified, the platform runs the code that is linked to the previously identified tool.
  4. The tool generates an analysis of the financial data as requested by the financial analyst.
  5. In summarizing the results, large language models retrieve and report back to the financial analyst.

Technical architecture

The multi-modal agent orchestrates various tools based on natural language prompts from business users to generate insights. For unstructured data, the agent uses AWS Lambda functions with AI services such as Amazon Textract for document analysis, Amazon Transcribe for speech recognition, Amazon Comprehend for NLP, and Amazon Kendra for intelligent search. For structured data, the agent uses the SQL Connector and SQLAlchemy to analyze databases, including Amazon Athena. The agent also uses Python in Lambda and the Amazon SageMaker SDK for computations and quantitative modeling, and it has long-term memory for storing prompts and results in Amazon DynamoDB. The multi-modal agent resides in a SageMaker notebook and coordinates these tools based on English prompts from business users in a Streamlit UI.

The key components of the technical architecture are as follows:

  • Data storage and analytics – The quarterly earnings call recordings as audio files, financial annual reports as PDF files, and S&P stock data as CSV files are hosted on Amazon Simple Storage Service (Amazon S3). Data exploration on stock data is done using Athena.
  • Large language models – The large language models (LLMs) are available via Amazon Bedrock, SageMaker JumpStart, or an API.
  • Agents – We use LangChain agents, which make a non-predetermined chain of calls to LLMs and other tools based on user input. In these types of chains, an agent has access to a suite of tools, each built for a specific task. Depending on the user input, the agent decides which tool or combination of tools to call to answer the question. We created the following purpose-built agent tools for our scenario:
    • Stocks Querying Tool – To query S&P stocks data using Athena and SQLAlchemy.
    • Portfolio Optimization Tool – To build a portfolio based on the chosen stocks.
    • Financial Information Lookup Tool – To search for financial earnings information stored in multi-page PDF files using Amazon Kendra.
    • Python Calculation Tool – To use for mathematical calculations.
    • Sentiment Analysis Tool – To identify and score sentiments on a topic using Amazon Comprehend.
    • Detect Phrases Tool – To find key phrases in recent quarterly reports using Amazon Comprehend.
    • Text Extraction Tool – To convert the PDF versions of quarterly reports to text files using Amazon Textract.
    • Transcribe Audio Tool – To convert audio recordings to text files using Amazon Transcribe.

The agent memory that holds the chain of user interactions with the agent is saved in DynamoDB.

The following sections explain some of the primary steps with associated code. To dive deeper into the solution and code for all the steps shown here, refer to the GitHub repo.

Prerequisites

To run this solution, you must have an API key to an LLM such as Anthropic Claude 2, or have access to Amazon Bedrock foundation models.

To generate responses from structured and unstructured data using LLMs and LangChain, you need access to LLMs through either Amazon Bedrock, SageMaker JumpStart, or API keys, and to databases that are compatible with SQLAlchemy. AWS Identity and Access Management (IAM) policies are also required, the details of which you can find in the GitHub repo.

Key components of a multi-modal agent

The multi-modal agent has a few key components:

  • Functions defined for tools of the multi-modal agent
  • Tools defined for the multi-modal agent
  • Long-term memory for the multi-modal agent
  • Planner-executor based multi-modal agent (defined with tools, LLMs, and memory)

In this section, we illustrate the key components with associated code snippets.

Functions defined for tools of the multi-modal agent

The multi-modal agent needs to use various AI services to process different types of data—text, speech, images, and more. Some of these functions may need to call AWS AI services like Amazon Comprehend to analyze text, Amazon Textract to analyze images and documents, and Amazon Transcribe to convert speech to text. These functions can either be called locally within the agent or deployed as Lambda functions that the agent can invoke. The Lambda functions internally call the relevant AWS AI services and return the results to the agent. This approach modularizes the logic and makes the agent more maintainable and extensible.

The following function defines how to calculate the optimized portfolio based on the chosen stocks. One way to convert a Python-based function to an LLM tool is to use the BaseTool wrapper.

class OptimizePortfolio(BaseTool):

    name = "Portfolio Optimization Tool"
    description = """
    use this tool when you need to build optimal portfolio or for optimization of stock price.
    The stock_ls should be a list of stock symbols, such as ['WWW', 'AAA', 'GGGG'].
    """

    def _run(self, stock_ls: List):

        session = boto3.Session(region_name=region_name)
        athena_client = session.client('athena')

        database = database_name
        table = table_Name
        ...

The following code invokes a Lambda function that calls the AWS AI service (Amazon Comprehend, Amazon Textract, Amazon Transcribe) APIs:

def SentimentAnalysis(inputString):
    print(inputString)
    lambda_client = boto3.client('lambda')
    # Build the event payload expected by the Lambda function
    lambda_payload = {"inputString": inputString}
    response = lambda_client.invoke(FunctionName='FSI-SentimentDetecttion',
                                    InvocationType='RequestResponse',
                                    Payload=json.dumps(lambda_payload))
    print(response['Payload'].read())
    return response

Tools defined for the multi-modal agent

The multi-modal agent has access to various tools to enable its functionality. It can query a stocks database to answer questions on stocks, optimize a portfolio using a dedicated tool, and retrieve information from Amazon Kendra, Amazon’s enterprise search service. A Python REPL tool allows the agent to run Python code. An example of the structure of the tools, including their names and descriptions, is shown in the following code. The actual toolbox of this post has eight tools: Stocks Querying Tool, Portfolio Optimization Tool, Financial Information Lookup Tool, Python Calculation Tool, Sentiment Analysis Tool, Detect Phrases Tool, Text Extraction Tool, and Transcribe Audio Tool.

tools = [
    Tool(
        name="Financial Information Lookup Tool",
        func=run_chain,
        description="""
        Useful for when you need to look up financial information using Kendra.
        """
    ),
    Tool(
        name="Sentiment Analysis Tool",
        func=SentimentAnalysis,
        description="""
        Useful for when you need to analyze the sentiment of a topic.
        """
    ),
    Tool(
        name="Detect Phrases Tool",
        func=DetectKeyPhrases,
        description="""
        Useful for when you need to detect key phrases in recent quarterly reports.
        """
    ),
    ...
]

Long-term memory for the multi-modal agent

The following code illustrates the configuration of long-term memory for the multi-modal agent. In this code, a DynamoDB table is added as memory to store prompts and answers for future reference.

chat_history_table = dynamodb_table_name

chat_history_memory = DynamoDBChatMessageHistory(table_name=chat_history_table, session_id=chat_session_id)
memory = ConversationBufferMemory(memory_key="chat_history",
                                  chat_memory=chat_history_memory, return_messages=True)

Planner-executor based multi-modal agent

The planner-executor based multi-modal agent architecture has two main components: a planner and an executor. The planner generates a high-level plan with steps required to run and answer the prompt question. The executor then runs this plan by generating appropriate system responses for each plan step using the language model with necessary tools. See the following code:

llm = ChatAnthropic(temperature=0, anthropic_api_key=ANTHROPIC_API_KEY, max_tokens_to_sample = 512)
model = llm

planner = load_chat_planner(model)

system_message_prompt = SystemMessagePromptTemplate.from_template(combo_template)
human_message_prompt = planner.llm_chain.prompt.messages[1]
planner.llm_chain.prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

executor = load_agent_executor(model, tools, verbose=True)
agent = PlanAndExecute(planner=planner, executor=executor, verbose=True, max_iterations=2)

Example scenarios based on questions asked by financial analyst

In this section, we explore two example scenarios to illustrate the end-to-end steps performed by the multi-modal agent based on questions asked by financial analyst.

Scenario 1: Questions by financial analyst related to structured data

In this scenario, the financial analyst asks a question in English related to companies’ stocks to the multi-modal agent. The multi-modal LangChain agent comes up with a multi-step plan and decides what tools to use for each step. The following diagram illustrates an example workflow with the following steps:

  1. The financial analyst asks a financial question in English through the UI to the multi-modal agent.
  2. The agent identifies that it requires the database tool to answer the question. It generates a SQL query using an LLM based on the question and queries the Athena database.
  3. Athena runs the SQL query, retrieves the relevant result (stock price time series of the five companies), and passes the result with relevant data to the agent.
  4. The agent identifies that it requires a second tool to answer the question. It passes the retrieved data to the Python tool for portfolio optimization.
  5. The Python tool calculates the required optimal portfolio, including weights, and passes the answer to the LLM within the agent.
  6. The agent compiles the required information and calculations using the LLM and answers the financial analyst via the UI in English.

The financial analyst asks the following question:

What are the closing prices of stocks AAAA, WWW, DDD in year 2018? Can you build an optimized portfolio using these three stocks?

The following code shows the steps identified by the planner of the multi-modal agent:

steps=[Step(value='Use Stocks Querying Tool to find closing stock prices of AAAA, WWW and DDD in year 2018.'),
Step(value='Use Portfolio Optimization Tool to build an optimized portfolio using the three stocks.'),
Step(value='Provide the closing stock prices and optimized portfolio allocation.')]

The agent translated the question prompt into the following SQL query:

SELECT date, aaaa, www, ddd
FROM "blog-stock-prices-db"."stock_prices"
WHERE date BETWEEN '2018-01-01' AND '2018-12-31'
ORDER BY date DESC

The following code shows the final answer by the multi-modal agent based on the question:

The closing stock prices for AAAA, WWW and DDD in 2018 are:

AAAA: $172.44
WWW: $85.91
DDD: $9.82

The optimized portfolio allocation for stocks AAAA, WWW and DDD is:

AAAA: 55%
WWW: 45%
DDD: 0%

Scenario 2: Questions by financial analyst related to unstructured data

In this scenario, a financial analyst asks multiple questions about the company’s financial health to the multi-modal agent. The agent invokes two tool agents, Amazon Kendra with unstructured data and a Python compute tool, to gather information and perform the necessary calculations. The agent then compiles the information from the tools using its language model and provides the full answer to the analyst. The key of this scenario is that the agent can remember the questions and answers of the previous questions, and incorporates the context into the conversation. The following diagram illustrates the workflow, which consists of the following steps:

  1. The financial analyst asks a financial question in English through the UI to the multi-modal agent.
  2. The agent identifies that it requires the Amazon Kendra tool to answer the question and invokes the Amazon Kendra agent tool.
  3. Amazon Kendra searches the unstructured data corpus in Amazon S3 and retrieves the relevant document containing the answer.
  4. Using Retrieval Augmented Generation (RAG), Amazon Kendra passes the answer from the relevant document to the LLM within the agent.
  5. The agent identifies that it requires a second tool to answer the second part of the question. It passes information from the first tool to the Python compute tool.
  6. The Python tool calculates the required ratio and passes the answer to the LLM within the agent.
  7. The agent compiles the required information and calculations using the LLM and answers the financial analyst via the UI.

The following are questions and answers between the analyst and multi-modal agent.

The agent answers the question “What is Amazon’s net loss for fiscal year ending in December 2022?” with “Amazon reported total net sales of $514.0 billion and a net loss of $2.7 billion for fiscal year 2022.” The source is the Amazon 10K report for the fiscal year ending December 2022, retrieved using the Retrive_Kendra Tool.

For the question “What is the net loss for the same year?” the agent answers “$2.7 billion.” The source is the Amazon 10K report for the fiscal year ending December 2022, retrieved using the Retrive_Kendra Tool. Note that even though this question is similar to the previous one, it didn’t specify the company name or the year. However, the agent was able to use the correct company name and year based on the context in the conversation memory.

For the question “What is the formula for Net Profit Margin Ratio, which is a type of profitability ratio?” the agent answers “Amazon reported total net sales of $514.0 billion and a net loss of $2.7 billion for fiscal year 2022. The Net Profit Margin Ratio for Amazon for fiscal year 2022 is -0.5%.” The sources are Retrieve_Kendra and PythonREPLTool().
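The ratio in the last answer can be reproduced directly from the figures the agent retrieved; a quick check in Python:

net_loss = -2.7     # billions USD, from the agent's answer
net_sales = 514.0   # billions USD, from the agent's answer

net_profit_margin = net_loss / net_sales * 100
print(f"Net Profit Margin Ratio: {net_profit_margin:.1f}%")  # -0.5%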

Dive deeper into the solution

To dive deeper into the solution and the code shown in this post, check out the GitHub repo.

In the GitHub repo, you will be able to find a Python notebook that has the end-to-end solution, an AWS CloudFormation template for provisioning the infrastructure, unstructured data (earnings reports PDF files, earnings call audio files), and structured data (stocks time series).

The appendix at the end tabulates the different questions asked by the financial analyst, the agent tools invoked, and the answers from the multi-modal agent.

Clean up

After you run the multi-modal agent, make sure to clean up any resources that won’t be utilized. Delete the databases used with Athena. In addition, delete the data in Amazon S3 and stop any SageMaker Studio notebook instances to avoid incurring further charges. Delete the Lambda functions and the DynamoDB tables used for long-term memory if they are no longer needed. If you used SageMaker JumpStart to deploy an LLM as a SageMaker real-time endpoint, delete the endpoint through either the SageMaker console or SageMaker Studio.

Conclusion

This post demonstrated the wide range of AWS storage, AI/ML, and compute services that you can use to build an advanced multi-modal AI solution along with the LangChain framework and generative AI. By integrating NLP, speech recognition, and ML technologies, the multi-modal agent developed in this scenario is able to analyze diverse financial data modalities (structured and unstructured) and respond to prompts in natural language (English). This boosts the productivity and efficiency of financial analysts, allowing them to focus on high-value, strategic work. Multi-modal AI solutions powered by AWS have the potential to transform financial markets by automating repetitive tasks, enhancing human capabilities, and generating data-driven insights efficiently. For structured data, a modern data architecture with multiple databases can be incorporated with generative AI and LLM. For more details, refer to Reinventing the data experience: Use generative AI and modern data architecture to unlock insights. Also, refer to Amazon Bedrock for use cases on generative AI, foundation models, and large language models.

Appendix

The following examples show the questions a financial analyst might ask the multi-modal agent about the structured and unstructured data, along with the agent tools invoked, the primary AWS services those tools use, and the agent’s answers.

Example 1: Prompt the LangChain agent to preprocess and extract information from the collected data
  • Question asked through the UI: Convert Amazon's recent quarters 10k and 10q files to text format
  • Agent tools invoked: Text Extraction Tool
  • Primary AWS services used: Amazon Textract, AWS Lambda, Amazon S3
  • Answer from the multi-modal agent: The text files containing Amazon's financial filings have been cleaned and are ready for analysis.

Example 2: Prompt the LangChain agent to transcribe the audio recordings to text
  • Question asked through the UI: Can you transcribe the most recent quarterly audio recording of the earnings call to text format?
  • Agent tools invoked: Transcribe Audio Tool
  • Primary AWS services used: Amazon Transcribe, AWS Lambda, Amazon S3
  • Answer from the multi-modal agent: I have the earnings call transcripts, now I can extract key phrases.

Example 3: Prompt the LangChain agent to analyze the company’s financial health and performance
  • Question asked through the UI: Analyze Amazon's financial health based on the 10k report. Calculate important financial ratios. Please limit your answer to 5 sentences.
  • Agent tools invoked: Financial Information Lookup Tool, Python Calculation Tool
  • Primary AWS services used: Amazon Kendra, AWS Lambda, Amazon S3
  • Answer from the multi-modal agent: Here is a summary of Amazon’s financial health in 5 sentences: Amazon has a low profit margin indicating low profitability relative to revenue. However, Amazon has a high return on assets indicating efficient use of assets to generate profits. Amazon has a low debt-to-equity ratio indicating low financial leverage and risk. Amazon has a high current ratio indicating good short-term liquidity position and ability to pay off short-term obligations. Overall, Amazon seems to be in a good financial position with efficient operations and low risk.

Example 4: Prompt the LangChain agent to analyze the company’s financial performance from the company’s earnings reports
  • Question asked through the UI: What were the main drivers of the company’s revenue growth in the quarter? Please limit your answers to 5 sentences.
  • Agent tools invoked: Detect Phrases Tool, Financial Information Lookup Tool
  • Primary AWS services used: Amazon Comprehend, Amazon Kendra, AWS Lambda, Amazon S3
  • Answer from the multi-modal agent: Here is a summary of the main drivers of revenue growth in 5 sentences: Amazon's revenue grew due to an increase in subscription and support revenue from new customers. Existing customer accounts also expanded, contributing to revenue growth. Higher net sales drove increased revenue for the company. Although I could not calculate the exact revenue growth percentage, the key phrases and information extracted point to revenue growth being driven primarily by new subscriptions, existing customer expansion, and greater net sales.

Example 5: Prompt the LangChain agent to analyze the company’s management team and their plans for the future from the company’s earnings reports
  • Question asked through the UI: What are the company's top priorities for the coming year? What are the biggest risks facing the company? Please limit your answers to 5 sentences.
  • Agent tools invoked: Sentiment Analysis Tool, Detect Phrases Tool, Financial Information Lookup Tool
  • Primary AWS services used: Amazon Comprehend, Amazon Kendra, AWS Lambda, Amazon S3
  • Answer from the multi-modal agent: Here is a summary response in 5 sentences: Amazon's priorities are investing in growth and expanding into new markets. However, risks like changes in consumer demand and competition threaten their business. By analyzing key phrases in financial reports, their focus on fulfillment, technology, and new products emerges, as do risks around suppliers, exchange rates, and retail competition. Though facing pressures, Amazon aims to innovate, improve customer experience, and strengthen their position. Continuous progress on price, speed, and selection will drive their competitive advantage.

Example 6: Prompt the LangChain agent to analyze the company’s strategy from the company’s earnings reports
  • Question asked through the UI: What are the company’s financial and business goals for the upcoming financial year/quarter? Please limit your answers to 5 sentences.
  • Agent tools invoked: Financial Information Lookup Tool, Detect Phrases Tool
  • Primary AWS services used: Amazon Comprehend, Amazon Kendra, AWS Lambda, Amazon S3
  • Answer from the multi-modal agent: Here is a summary of Amazon's main goals in 5 sentences: Amazon seeks to grow by improving infrastructure and entering new markets. They will invest in fulfillment and technology to enable expansion. By moving into groceries, healthcare, and entertainment, Amazon can diversify their offerings. Focusing on the customer through competitive pricing, selection, and innovation is key. If Amazon achieves operational excellence and continuous progress, they will strengthen their position.

Example 7: Prompt the LangChain agent to perform sentiment analysis on the earnings call recordings
  • Question asked through the UI: What is the sentiment around inflation in Amazon's earnings call?
  • Agent tools invoked: Sentiment Analysis Tool
  • Primary AWS services used: Amazon Comprehend, AWS Lambda, Amazon S3
  • Answer from the multi-modal agent: The sentiment of mentions of 'inflation' in the Amazon earnings call transcript is negative. Justification: Quote 1: "We're seeing inflationary pressures, especially in fuel and transportation costs." Quote 2: "Inflation is a concern and we're seeing cost inflation, which is an unusual thing for us."

Example 8: Prompt the LangChain agent to build an optimal portfolio using the collected data
  • Question asked through the UI: What are the closing prices of stocks AAAA, WWW, DDD in year 2018? Can you build an optimized portfolio using these three stocks?
  • Agent tools invoked: Portfolio Optimization Tool, Stocks Querying Tool
  • Primary AWS services used: Amazon SageMaker, Amazon Athena
  • Answer from the multi-modal agent: The closing stock prices for AAAA, WWW and DDD in 2018 are AAAA: $172.44, WWW: $85.91, DDD: $9.82. The optimized portfolio allocation for stocks AAAA, WWW and DDD is AAAA: 55%, WWW: 45%, DDD: 0%.

About the Authors

Sovik Kumar Nath is an AI/ML solution architect with AWS. He has extensive experience designing end-to-end machine learning and business analytics solutions in finance, operations, marketing, healthcare, supply chain management, and IoT. Sovik has published articles and holds a patent in ML model monitoring. He holds master’s degrees from the University of South Florida and the University of Fribourg, Switzerland, and a bachelor’s degree from the Indian Institute of Technology, Kharagpur. Outside of work, Sovik enjoys traveling, taking ferry rides, and watching movies.

Mohan Musti is a Senior Technical Account Manager based out of Dallas. Mohan helps customers architect and optimize applications on AWS. Mohan holds a degree in Computer Science and Engineering from JNT University, India. In his spare time, he enjoys spending time with his family and camping.

Jia (Vivian) Li is a Senior Solutions Architect at AWS, specializing in AI/ML. She currently supports customers in the financial industry. Prior to joining AWS in 2022, she spent 7 years helping enterprise customers use AI/ML in the cloud to drive business results. Vivian has a BS from Peking University and a PhD from the University of Southern California. In her spare time, she enjoys water activities and hiking in the beautiful mountains of her home state, Colorado.

Uchenna Egbe is an AI/ML Solutions Architect who enjoys building reusable AI/ML solutions. Uchenna has an MS from the University of Alaska Fairbanks. He spends his free time researching herbs, teas, and superfoods, and how to incorporate them into his daily diet.

Navneet Tuteja is a Data Specialist at Amazon Web Services. Before joining AWS, Navneet worked as a facilitator for organizations seeking to modernize their data architectures and implement comprehensive AI/ML solutions. She holds an engineering degree from Thapar University, as well as a master’s degree in statistics from Texas A&M University.

Praful Kava is a Sr. Specialist Solutions Architect at AWS. He guides customers to design and engineer Cloud scale Analytics pipelines on AWS. Outside work, he enjoys travelling with his family and exploring new hiking trails.

How VirtuSwap accelerates their pandas-based trading simulations with an Amazon SageMaker Studio custom container and AWS GPU instances

This post is written in collaboration with Dima Zadorozhny and Fuad Babaev from VirtuSwap.

VirtuSwap is a startup company developing innovative technology for decentralized exchange of assets on blockchains. VirtuSwap’s technology provides more efficient trading for assets that don’t have a direct pair between them. The absence of a direct pair leads to costly indirect trading, meaning that two or more trades are required to complete a desired swap, leading to double or triple trading costs. VirtuSwap’s Reserve-based Virtual Pools technology solves the problem by making every trade direct, saving up to 50% of trading costs. Read more at virtuswap.io.

In this post, we share how VirtuSwap used the bring-your-own-container feature in Amazon SageMaker Studio to build a robust environment to host their GPU-intensive simulations to solve linear optimization problems.

The challenge

The VirtuSwap Minerva engine creates recommendations for the optimal distribution of liquidity between different liquidity pools, while taking into account multiple parameters, such as trading volumes, current market liquidity, and volatilities of traded assets, constrained by the total amount of liquidity available for distribution. To provide these recommendations, VirtuSwap Minerva uses thousands of historical trading pairs to simulate their run through various liquidity configurations and find the optimal distribution of liquidity, pool fees, and more.

The initial implementation was coded using pandas DataFrames. However, as the simulation data grew, the runtime nearly quadrupled along with the size of the problem. As a result, iterations slowed down, and it became almost impossible to run higher-dimensionality tasks. VirtuSwap realized that they needed GPU instances for the simulation to get results faster.

VirtuSwap needed a GPU-compatible, pandas-like library to run their simulation and chose cuDF, the GPU DataFrame library from RAPIDS. cuDF is used for loading, joining, aggregating, filtering, and otherwise manipulating data through a pandas-like API, and it uses CUDA to deliver significantly faster performance than pandas.
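
As a generic illustration of that drop-in style (not VirtuSwap's actual simulation code, and with made-up column names), the same join and aggregation can be written with pandas on the CPU and cuDF on the GPU:

# Illustrative only: the same join and aggregation with pandas (CPU) and cuDF (GPU).
import pandas as pd
import cudf

trades = pd.DataFrame({"pool": ["A", "B", "A", "B"], "volume": [10.0, 5.0, 7.5, 2.5]})
fees = pd.DataFrame({"pool": ["A", "B"], "fee": [0.003, 0.001]})

# CPU: pandas
cpu_summary = (
    trades.merge(fees, on="pool")
    .groupby("pool", as_index=False)
    .agg({"volume": "sum", "fee": "mean"})
)

# GPU: cuDF exposes a pandas-like API, so the code stays almost identical
gpu_trades = cudf.from_pandas(trades)
gpu_fees = cudf.from_pandas(fees)
gpu_summary = (
    gpu_trades.merge(gpu_fees, on="pool")
    .groupby("pool", as_index=False)
    .agg({"volume": "sum", "fee": "mean"})
)

print(cpu_summary)
print(gpu_summary.to_pandas())  # move the result back to host memory for display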

Solution overview

VirtuSwap chose SageMaker Studio for end-to-end development, starting with iterative, interactive development in notebooks. Due to the flexibility of SageMaker Studio, they decided to use it for their simulation as well, taking advantage of Amazon SageMaker custom images, which allow VirtuSwap to bring their own custom libraries and software needed, such as cuDF. The following diagram illustrates the solution workflow.

In the following sections, we share the step-by-step instructions to build and use a Rapids cuDF image in SageMaker.

Prerequisites

To run this step-by-step guide, you need an AWS account with permissions to SageMaker, Amazon Elastic Container Registry (Amazon ECR), AWS Identity and Access Management (IAM), and AWS CodeBuild. In addition, you need to have a SageMaker domain ready.

Create IAM roles and policies

To build the custom SageMaker image, we used AWS CloudShell, which provides all the required packages. In CloudShell, we used SageMaker Docker Build, a CLI for building Docker images for and in SageMaker Studio. The CLI can create the repository in Amazon ECR and build the container using CodeBuild. For that, we need to provide the tool with an IAM role that has the proper permissions. Complete the following steps:

  1. Sign in to the AWS Management Console and open the IAM console.
  2. In the navigation pane on the left, choose Policies.
  3. Create a policy named sm-build-policy with the following permissions:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "codebuild:DeleteProject",
                    "codebuild:CreateProject",
                    "codebuild:BatchGetBuilds",
                    "codebuild:StartBuild"
                ],
                "Resource": "arn:aws:codebuild:*:*:project/sagemaker-studio*"
            },
            {
                "Effect": "Allow",
                "Action": "logs:CreateLogStream",
                "Resource": "arn:aws:logs:*:*:log-group:/aws/codebuild/sagemaker-studio*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:GetLogEvents",
                    "logs:PutLogEvents"
                ],
                "Resource": "arn:aws:logs:*:*:log-group:/aws/codebuild/sagemaker-studio*:log-stream:*"
            },
            {
                "Effect": "Allow",
                "Action": "logs:CreateLogGroup",
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ecr:CreateRepository",
                    "ecr:BatchGetImage",
                    "ecr:CompleteLayerUpload",
                    "ecr:DescribeImages",
                    "ecr:DescribeRepositories",
                    "ecr:UploadLayerPart",
                    "ecr:ListImages",
                    "ecr:InitiateLayerUpload",
                    "ecr:BatchCheckLayerAvailability",
                    "ecr:PutImage"
                ],
                "Resource": "arn:aws:ecr:*:*:repository/sagemaker-studio*"
            },
            {
                "Sid": "ReadAccessToPrebuiltAwsImages",
                "Effect": "Allow",
                "Action": [
                    "ecr:BatchGetImage",
                    "ecr:GetDownloadUrlForLayer"
                ],
                "Resource": [
                    "arn:aws:ecr:*:763104351884:repository/*",
                    "arn:aws:ecr:*:217643126080:repository/*",
                    "arn:aws:ecr:*:727897471807:repository/*",
                    "arn:aws:ecr:*:626614931356:repository/*",
                    "arn:aws:ecr:*:683313688378:repository/*",
                    "arn:aws:ecr:*:520713654638:repository/*",
                    "arn:aws:ecr:*:462105765813:repository/*"
                ]
            },
            {
                "Sid": "EcrAuthorizationTokenRetrieval",
                "Effect": "Allow",
                "Action": [
                    "ecr:GetAuthorizationToken"
                ],
                "Resource": [
                    "*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:DeleteObject",
                    "s3:PutObject"
                ],
                "Resource": "arn:aws:s3:::sagemaker-*/*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:CreateBucket"
                ],
                "Resource": "arn:aws:s3:::sagemaker*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:GetRole",
                    "iam:ListRoles"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": "arn:aws:iam::*:role/*",
                "Condition": {
                    "StringLikeIfExists": {
                        "iam:PassedToService": "codebuild.amazonaws.com"
                    }
                }
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ecr:CreateRepository",
                    "ecr:BatchGetImage",
                    "ecr:CompleteLayerUpload",
                    "ecr:DescribeImages",
                    "ecr:DescribeRepositories",
                    "ecr:UploadLayerPart",
                    "ecr:ListImages",
                    "ecr:InitiateLayerUpload",
                    "ecr:BatchCheckLayerAvailability",
                    "ecr:PutImage"
                ],
                "Resource": "arn:aws:ecr:*:*:repository/*"
            }
        ]
    }

These permissions let you use the CLI end to end: create repositories, create a CodeBuild job, use Amazon Simple Storage Service (Amazon S3), and send logs to Amazon CloudWatch.

  1. Create a role named sm-build-role with the following trust policy, and add the policy sm-build-policy that you created earlier:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "codebuild.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

Now, let’s review the steps in CloudShell.

Create a cuDF Docker image in CloudShell

For our purposes, we needed a Rapids CUDA image, which also includes an ipykernel, so that the image can be used in a SageMaker Studio notebook.

We use an existing CUDA image by RapidsAI that is available in the official Rapids AI Docker hub, and add the ipykernel installation.

In a CloudShell terminal, run the following command:

printf "FROM nvcr.io/nvidia/rapidsai/rapidsai:0.16-cuda10.1-base-ubuntu18.04
RUN pip install ipykernel && 
python -m ipykernel install --sys-prefix &&  
useradd --create-home --shell /bin/bash --gid 100 --uid 1000 sagemaker-user
USER sagemaker-user" > Dockerfile

This creates the Dockerfile that builds our custom Docker image for SageMaker (the trailing backslashes continue the RUN command across lines).

Build and push the image to a repository

As mentioned, we used the SageMaker Docker Build library, which allows data scientists and developers to easily build custom container images. For more information, refer to Using the Amazon SageMaker Studio Image Build CLI to build container images from your Studio notebooks.

The following command creates an ECR repository (if it doesn’t already exist), then builds the new Docker image and pushes it to that repository:

sm-docker build . --repository rapids:v1 --role sm-build-role

If sm-docker is not available in your CloudShell environment, install it with the following command:

pip3 install sagemaker-studio-image-build

On completion, the ECR image URI will be returned.

Create a SageMaker custom image

After you have created a custom Docker image and pushed it to your container repository (Amazon ECR), you can configure SageMaker to use that custom Docker image. Complete the following steps:

  1. On the SageMaker console, choose Images in the navigation pane.
  2. Choose Create image.
  3. Enter the image URI output from the previous section, then choose Next.
  4. For Image name and Image display name, enter rapids.
  5. For Description, enter a description.
  6. For IAM role, choose the proper IAM role for your SageMaker domain.
  7. For EFS mount path, enter /home/sagemaker-user (default).
  8. Expand Advanced configuration.
  9. For User ID, enter 1000.
  10. For Group ID, enter 100.

  1. In the Image type section, select SageMaker Studio Image.
  2. Choose Add kernel.
  3. For Kernel name, enter conda-env-rapids-py.
  4. For Kernel display name, enter rapids.
  5. Choose Submit to create the SageMaker image.

Attach the new image to your SageMaker Studio domain

Now that you have created the custom image, you need to make it available to use by attaching the image to your domain. Complete the following steps:

  1. On the SageMaker console, choose Domains in the navigation pane.
  2. Choose your domain. This step is optional; you can create and attach the custom image directly from the domain and skip this step.

  1. On the domain details page, choose the Environment tab, then choose Attach image.
  2. Select Existing image and select the new image (rapids) from the list.
  3. Choose Next.

  1. Review the custom image configuration and make sure to set Image type as SageMaker Studio Image, as in the previous step, with the same kernel name and kernel display name.
  2. Choose Submit.

The custom image is now available in SageMaker Studio and ready for use.

Create a new notebook with the image

For instructions to launch a new notebook, refer to Launch a custom SageMaker image in Amazon SageMaker Studio. Complete the following steps:

  1. On the SageMaker Studio console, choose Open launcher.
  2. Choose Change environment.

  1. For Image, choose the newly created image, rapids v1.
  2. For Kernel, choose rapids.
  3. For Instance type¸ choose your instance.

SageMaker Studio provides the option to customize your computing power by choosing an instance from the AWS accelerated computing, general purpose, compute optimized, or memory optimized families. This flexibility allows you to seamlessly transition between CPUs and GPUs, as well as dynamically scale instance sizes up or down as needed. For our notebook, we used the ml.g4dn.2xlarge instance type to test cuDF performance while utilizing the GPU accelerator.

  1. Choose Select.

  1. Select your environment and choose Create notebook, then wait until the notebook kernel becomes ready.

Validate your custom image

To validate that your custom image was launched and cuDF is ready to use, create a new cell, enter import cudf, and run it.
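
For example, a quick smoke test in a notebook cell could look like the following (a sketch, not part of the original walkthrough):

# Smoke test: confirm the rapids kernel loads cuDF and can run a GPU-backed operation.
import cudf

print(cudf.__version__)

gdf = cudf.DataFrame({"x": [1, 2, 3, 4], "y": [10.0, 20.0, 30.0, 40.0]})
print(gdf.describe())   # summary statistics computed on the GPU
print(gdf["y"].sum())   # simple aggregation on the GPU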

Clean up

Power off the Jupyter instance running the test notebook in SageMaker Studio by choosing Running Terminals and Kernels and powering off the running instance.

Runtime comparison results

We conducted a runtime comparison of our code using both CPU and GPU on SageMaker g4dn.2xlarge instances, with a time complexity of O(N). The results, as shown in the following figure, reveal the efficiency of using GPUs over CPUs.

The main advantage of GPUs lies in their ability to perform parallel processing. In our tests, as we increased the value of N, the runtime on CPUs grew at a rate of roughly 3N, whereas on GPUs it grew at roughly 2N, as illustrated in the preceding figure. The larger the problem size, the more efficient the GPU becomes; in our case, using a GPU was at least 20 times faster than using a CPU. This highlights the growing importance of GPUs in modern computing, especially for tasks that require large amounts of data to be processed quickly.
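
The following micro-benchmark is a generic sketch of how such a CPU versus GPU comparison can be run with pandas and cuDF; it is not VirtuSwap's simulation code, and the measured speedup will vary with the data size and the operations involved.

# Illustrative micro-benchmark: time the same groupby aggregation on CPU (pandas) and GPU (cuDF).
import time

import cudf
import numpy as np
import pandas as pd

n = 10_000_000
pdf = pd.DataFrame({
    "key": np.random.randint(0, 1_000, size=n),
    "value": np.random.rand(n),
})
gdf = cudf.from_pandas(pdf)

start = time.perf_counter()
pdf.groupby("key").mean()
cpu_seconds = time.perf_counter() - start

start = time.perf_counter()
gdf.groupby("key").mean()
gpu_seconds = time.perf_counter() - start

print(f"pandas: {cpu_seconds:.3f}s  cuDF: {gpu_seconds:.3f}s  speedup: {cpu_seconds / gpu_seconds:.1f}x")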

With SageMaker GPU instances, VirtuSwap is able to dramatically increase the dimensionality of the solved problems and find solutions faster.

Conclusion

In this post, we showed how VirtuSwap customized SageMaker Studio with a custom image to solve a complex problem. With the ability to easily change the run environment and switch between different instances, sizes, and kernels, VirtuSwap was able to experiment quickly, speed up their runtime by 15x, and deliver a scalable solution.

As a next step, VirtuSwap is considering broadening their usage of SageMaker and running their processing in Amazon SageMaker Processing to process the massive data they’re collecting from various blockchains into their platform.


About the Authors

Adir Sharabi is a Principal Solutions Architect with Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud. He is also passionate about Data and helping customers to get the most out of it.

Omer Haim is a Senior Startup Solutions Architect at Amazon Web Services. He helps startups with their cloud journey, and is passionate about containers and ML. In his spare time, Omer likes to travel, and occasionally game with his son.

Dmitry Zadorozhny is a data analyst at virtuswap.io. He is responsible for data mining, processing, and storage, as well as integrating cloud services such as AWS. Prior to joining virtuswap, he worked in the data science field and was an analytics ambassador lead at dydx foundation. Dima has an M.Sc. in Computer Science. Dima enjoys playing computer games in his spare time.

Fuad Babaev serves as a Data Science Specialist at Virtuswap (virtuswap.io). He brings expertise in tackling complex optimization challenges, crafting simulations, and architecting models for trade processes. Outside of his professional career Fuad has a passion in playing chess.

Unlock ML insights using the Amazon SageMaker Feature Store Feature Processor

Amazon SageMaker Feature Store provides an end-to-end solution to automate feature engineering for machine learning (ML). For many ML use cases, raw data like log files, sensor readings, or transaction records need to be transformed into meaningful features that are optimized for model training.

Feature quality is critical to ensure a highly accurate ML model. Transforming raw data into features using aggregation, encoding, normalization, and other operations is often needed and can require significant effort. Engineers must manually write custom data preprocessing and aggregation logic in Python or Spark for each use case.

This undifferentiated heavy lifting is cumbersome, repetitive, and error-prone. The SageMaker Feature Store Feature Processor reduces this burden by automatically transforming raw data into aggregated features suitable for batch training ML models. It lets engineers provide simple data transformation functions, then handles running them at scale on Spark and managing the underlying infrastructure. This enables data scientists and data engineers to focus on the feature engineering logic rather than implementation details.

In this post, we demonstrate how a car sales company can use the Feature Processor to transform raw sales transaction data into features in three steps:

  1. Local runs of data transformations.
  2. Remote runs at scale using Spark.
  3. Operationalization via pipelines.

We show how SageMaker Feature Store ingests the raw data, runs feature transformations remotely using Spark, and loads the resulting aggregated features into a feature group. These engineered features can then be used to train ML models.

For this use case, we see how SageMaker Feature Store helps convert the raw car sales data into structured features. These features are subsequently used to gain insights like:

  • Average and maximum price of red convertibles from 2010
  • Models with best mileage vs. price
  • Sales trends of new vs. used cars over the years
  • Differences in average MSRP across locations

We also see how SageMaker Feature Store pipelines keep the features updated as new data comes in, enabling the company to continually gain insights over time.

Solution overview

We work with the dataset car_data.csv, which contains specifications such as model, year, status, mileage, price, and MSRP for used and new cars sold by the company. The following screenshot shows an example of the dataset.

"Image displaying a table of car data, including car model, year, mileage, price, and MSRP for various vehicles."

The solution notebook feature_processor.ipynb contains the following main steps, which we explain in this post:

  1. Create two feature groups: one called car-data for raw car sales records and another called car-data-aggregated for aggregated car sales records.
  2. Use the @feature_processor decorator to load data into the car-data feature group from Amazon Simple Storage Service (Amazon S3).
  3. Run the @feature_processor code remotely as a Spark application to aggregate the data.
  4. Operationalize the feature processor via SageMaker pipelines and schedule runs.
  5. Explore the feature processing pipelines and lineage in Amazon SageMaker Studio.
  6. Use aggregated features to train an ML model.

Prerequisites

To follow this tutorial, you need the following:

For this post, we refer to the following notebook, which demonstrates how to get started with Feature Processor using the SageMaker Python SDK.

Create feature groups

To create the feature groups, complete the following steps:

  1. Create a feature group definition for car-data as follows:
    # Feature Group - Car Sales
    CAR_SALES_FG_NAME = "car-data"
    CAR_SALES_FG_ARN = f"arn:aws:sagemaker:{region}:{aws_account_id}:feature-group/{CAR_SALES_FG_NAME}"
    CAR_SALES_FG_ROLE_ARN = offline_store_role
    CAR_SALES_FG_OFFLINE_STORE_S3_URI = f"s3://{s3_bucket}/{s3_offline_store_prefix}"
    CAR_SALES_FG_FEATURE_DEFINITIONS = [
        FeatureDefinition(feature_name="id", feature_type=FeatureTypeEnum.STRING),
        FeatureDefinition(feature_name="model", feature_type=FeatureTypeEnum.STRING),
        FeatureDefinition(feature_name="model_year", feature_type=FeatureTypeEnum.STRING),
        FeatureDefinition(feature_name="status", feature_type=FeatureTypeEnum.STRING),
        FeatureDefinition(feature_name="mileage", feature_type=FeatureTypeEnum.STRING),
        FeatureDefinition(feature_name="price", feature_type=FeatureTypeEnum.STRING),
        FeatureDefinition(feature_name="msrp", feature_type=FeatureTypeEnum.STRING),
        FeatureDefinition(feature_name="ingest_time", feature_type=FeatureTypeEnum.FRACTIONAL),
    ]

The features correspond to each column in the car_data.csv dataset (Model, Year, Status, Mileage, Price, and MSRP).

  1. Add the record identifier id and event time ingest_time to the feature group:
CAR_SALES_FG_RECORD_IDENTIFIER_NAME = "id"
CAR_SALES_FG_EVENT_TIME_FEATURE_NAME = "ingest_time"
  1. Create a feature group definition for car-data-aggregated as follows:
# Feature Group - Aggregated Car Sales
AGG_CAR_SALES_FG_NAME = "car-data-aggregated"
AGG_CAR_SALES_FG_ARN = (
    f"arn:aws:sagemaker:{region}:{aws_account_id}:feature-group/{AGG_CAR_SALES_FG_NAME}"
)
AGG_CAR_SALES_FG_ROLE_ARN = offline_store_role
AGG_CAR_SALES_FG_OFFLINE_STORE_S3_URI = f"s3://{s3_bucket}/{s3_offline_store_prefix}"
AGG_CAR_SALES_FG_FEATURE_DEFINITIONS = [
    FeatureDefinition(feature_name="model_year_status", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="avg_mileage", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="max_mileage", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="avg_price", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="max_price", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="avg_msrp", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="max_msrp", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="ingest_time", feature_type=FeatureTypeEnum.FRACTIONAL),
]

For the aggregated feature group, the features are model year status, average mileage, max mileage, average price, max price, average MSRP, max MSRP, and ingest time. We add the record identifier model_year_status and event time ingest_time to this feature group.

  1. Now, create the car-data feature group:
# Create Feature Group - Car sale records.
car_sales_fg = FeatureGroup(
    name=CAR_SALES_FG_NAME,
    feature_definitions=CAR_SALES_FG_FEATURE_DEFINITIONS,
    sagemaker_session=sagemaker_session,
)

create_car_sales_fg_resp = car_sales_fg.create(
        record_identifier_name=CAR_SALES_FG_RECORD_IDENTIFIER_NAME,
        event_time_feature_name=CAR_SALES_FG_EVENT_TIME_FEATURE_NAME,
        s3_uri=CAR_SALES_FG_OFFLINE_STORE_S3_URI,
        enable_online_store=True,
        role_arn=CAR_SALES_FG_ROLE_ARN,
    )
  1. Create the car-data-aggregated feature group:
# Create Feature Group - Aggregated car sales records.
agg_car_sales_fg = FeatureGroup(
    name=AGG_CAR_SALES_FG_NAME,
    feature_definitions=AGG_CAR_SALES_FG_FEATURE_DEFINITIONS,
    sagemaker_session=sagemaker_session,
)

create_agg_car_sales_fg_resp = agg_car_sales_fg.create(
        record_identifier_name=AGG_CAR_SALES_FG_RECORD_IDENTIFIER_NAME,  
        event_time_feature_name=AGG_CAR_SALES_FG_EVENT_TIME_FEATURE_NAME,
        s3_uri=AGG_CAR_SALES_FG_OFFLINE_STORE_S3_URI,
        enable_online_store=True,
        role_arn=AGG_CAR_SALES_FG_ROLE_ARN,
    )

You can navigate to the SageMaker Feature Store option under Data on the SageMaker Studio Home menu to see the feature groups.

Image from Sagemaker Feature store with headers Feature group name and description

Use the @feature_processor decorator to load data

In this section, we locally transform the raw input data (car_data.csv) from Amazon S3 into the car-data feature group using the Feature Store Feature Processor. This initial local run allows us to develop and iterate before running remotely, and could be done on a sample of the data if desired for faster iteration.

With the @feature_processor decorator, your transformation function runs in a Spark runtime environment where the input arguments provided to your function and its return value are Spark DataFrames.

  1. Install the Feature Processor SDK from the SageMaker Python SDK and its extras using the following command:
pip install sagemaker[feature-processor]

The number of input parameters in your transformation function must match the number of inputs configured in the @feature_processor decorator. In this case, the @feature_processor decorator has car-data.csv as input and the car-data feature group as output, indicating this is a batch operation with the target_store as OfflineStore:

from sagemaker.feature_store.feature_processor import (
    feature_processor,
    FeatureGroupDataSource,
    CSVDataSource,
)

@feature_processor(
    inputs=[CSVDataSource(RAW_CAR_SALES_S3_URI)],
    output=CAR_SALES_FG_ARN,
    target_stores=["OfflineStore"],
)
  1. Define the transform() function to transform the data. This function performs the following actions:
    • Convert column names to lowercase.
    • Add the event time to the ingest_time column.
    • Remove punctuation and replace missing values with NA.
def transform(raw_s3_data_as_df):
    """Load data from S3, perform basic feature engineering, store it in a Feature Group"""
    from pyspark.sql.functions import regexp_replace
    from pyspark.sql.functions import lit
    import time

    transformed_df = (
        raw_s3_data_as_df.withColumn("Price", regexp_replace("Price", r"\$", ""))
        # Rename Columns
        .withColumnRenamed("Id", "id")
        .withColumnRenamed("Model", "model")
        .withColumnRenamed("Year", "model_year")
        .withColumnRenamed("Status", "status")
        .withColumnRenamed("Mileage", "mileage")
        .withColumnRenamed("Price", "price")
        .withColumnRenamed("MSRP", "msrp")
        # Add Event Time
        .withColumn("ingest_time", lit(int(time.time())))
        # Remove punctuation and fluff; replace with NA
        .withColumn("mileage", regexp_replace("mileage", r"(,)|(mi\.)", ""))
        .withColumn("mileage", regexp_replace("mileage", "Not available", "NA"))
        .withColumn("price", regexp_replace("price", ",", ""))
        .withColumn("msrp", regexp_replace("msrp", r"(^MSRP\s\$)|(,)", ""))
        .withColumn("msrp", regexp_replace("msrp", "Not specified", "NA"))
        .withColumn("msrp", regexp_replace("msrp", r"\$\d+[a-zA-Z\s]+", "NA"))
        .withColumn("model", regexp_replace("model", r"^\d\d\d\d\s", ""))
    )
    transformed_df.show()
    # Return the DataFrame so the Feature Processor can ingest it into the feature group
    return transformed_df
  1. Call the transform() function to store the data in the car-data feature group:
# Execute the FeatureProcessor
transform()

The output shows that the data is ingested successfully into the car-data feature group.

The output of transformed_df.show() is as follows:

INFO:sagemaker:Ingesting transformed data to arn:aws:sagemaker:us-west-2:416578662734:feature-group/car-data with target_stores: ['OfflineStore']

+---+--------------------+----------+------+-------+--------+-----+-----------+
| id|               model|model_year|status|mileage|   price| msrp|ingest_time|
+---+--------------------+----------+------+-------+--------+-----+-----------+
|  0|    Acura TLX A-Spec|      2022|   New|     NA|49445.00|49445| 1686627154|
|  1|    Acura RDX A-Spec|      2023|   New|     NA|50895.00|   NA| 1686627154|
|  2|    Acura TLX Type S|      2023|   New|     NA|57745.00|   NA| 1686627154|
|  3|    Acura TLX Type S|      2023|   New|     NA|57545.00|   NA| 1686627154|
|  4|Acura MDX Sport H...|      2019|  Used| 32675 |40990.00|   NA| 1686627154|
|  5|    Acura TLX A-Spec|      2023|   New|     NA|50195.00|50195| 1686627154|
|  6|    Acura TLX A-Spec|      2023|   New|     NA|50195.00|50195| 1686627154|
|  7|    Acura TLX Type S|      2023|   New|     NA|57745.00|   NA| 1686627154|
|  8|    Acura TLX A-Spec|      2023|   New|     NA|47995.00|   NA| 1686627154|
|  9|    Acura TLX A-Spec|      2022|   New|     NA|49545.00|   NA| 1686627154|
| 10|Acura Integra w/A...|      2023|   New|     NA|36895.00|36895| 1686627154|
| 11|    Acura TLX A-Spec|      2023|   New|     NA|48395.00|48395| 1686627154|
| 12|Acura MDX Type S ...|      2023|   New|     NA|75590.00|   NA| 1686627154|
| 13|Acura RDX A-Spec ...|      2023|   New|     NA|55345.00|   NA| 1686627154|
| 14|    Acura TLX A-Spec|      2023|   New|     NA|50195.00|50195| 1686627154|
| 15|Acura RDX A-Spec ...|      2023|   New|     NA|55045.00|   NA| 1686627154|
| 16|    Acura TLX Type S|      2023|   New|     NA|56445.00|   NA| 1686627154|
| 17|    Acura TLX A-Spec|      2023|   New|     NA|47495.00|47495| 1686627154|
| 18|   Acura TLX Advance|      2023|   New|     NA|52245.00|52245| 1686627154|
| 19|    Acura TLX A-Spec|      2023|   New|     NA|50595.00|50595| 1686627154|
+---+--------------------+----------+------+-------+--------+-----+-----------+
only showing top 20 rows

We have successfully transformed the input data and ingested it in the car-data feature group.
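
As an optional sanity check (a sketch, not part of the original notebook), you can query the car-data offline store with the feature group's built-in Athena helper. Note that the offline store is populated asynchronously, so records may take a few minutes to appear.

# Sketch: count the records that landed in the car-data offline store via Athena.
car_data_query = car_sales_fg.athena_query()
car_data_query.run(
    query_string=f'SELECT COUNT(*) AS record_count FROM "{car_data_query.table_name}"',
    output_location=f"s3://{s3_bucket}/athena-query-results/",
)
car_data_query.wait()
print(car_data_query.as_dataframe())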

Run the @feature_processor code remotely

In this section, we demonstrate running the feature processing code remotely as a Spark application using the @remote decorator described earlier. We run the feature processing remotely using Spark to scale to large datasets. Spark provides distributed processing on clusters to handle data that is too big for a single machine. The @remote decorator runs the local Python code as a single or multi-node SageMaker training job.

  1. Use the @remote decorator along with the @feature_processor decorator as follows:
@remote(spark_config=SparkConfig(), instance_type = "ml.m5.xlarge", ...)
@feature_processor(inputs=[FeatureGroupDataSource(CAR_SALES_FG_ARN)],
                   output=AGG_CAR_SALES_FG_ARN, target_stores=["OfflineStore"], enable_ingestion=False )

The spark_config parameter indicates this is run as a Spark application. The SparkConfig instance configures the Spark configuration and dependencies.
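
For reference, the decorated function with its imports might look like the following sketch. The import paths are assumed from the SageMaker Python SDK's remote function and feature processor modules and may differ between SDK versions, so verify them against the documentation.

# Sketch: assumed imports and decorator placement for the remote Spark run.
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.feature_store.feature_processor import (
    feature_processor,
    FeatureGroupDataSource,
)

@remote(spark_config=SparkConfig(), instance_type="ml.m5.xlarge")
@feature_processor(
    inputs=[FeatureGroupDataSource(CAR_SALES_FG_ARN)],
    output=AGG_CAR_SALES_FG_ARN,
    target_stores=["OfflineStore"],
    enable_ingestion=False,
)
def aggregate(source_feature_group, spark):
    ...  # aggregation logic shown in the next step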

  1. Define the aggregate() function to aggregate the data using PySpark SQL and user-defined functions (UDFs). This function performs the following actions:
    • Concatenate model, year, and status to create model_year_status.
    • Take the average of price to create avg_price.
    • Take the max value of price to create max_price.
    • Take the average of mileage to create avg_mileage.
    • Take the max value of mileage to create max_mileage.
    • Take the average of msrp to create avg_msrp.
    • Take the max value of msrp to create max_msrp.
    • Group by model_year_status.
def aggregate(source_feature_group, spark):
    """
    Aggregate the data using a SQL query and UDF.
    """
    import time
    from pyspark.sql.types import StringType
    from pyspark.sql.functions import udf

    @udf(returnType=StringType())
    def custom_concat(*cols, delimeter: str = ""):
        return delimeter.join(cols)

    spark.udf.register("custom_concat", custom_concat)

    # Execute SQL string.
    source_feature_group.createOrReplaceTempView("car_data")
    aggregated_car_data = spark.sql(
        f"""
        SELECT
            custom_concat(model, "_", model_year, "_", status) as model_year_status,
            AVG(price) as avg_price,
            MAX(price) as max_price,
            AVG(mileage) as avg_mileage,
            MAX(mileage) as max_mileage,
            AVG(msrp) as avg_msrp,
            MAX(msrp) as max_msrp,
            "{int(time.time())}" as ingest_time
        FROM car_data
        GROUP BY model_year_status
        """
    )

    aggregated_car_data.show()

    return aggregated_car_data
  1. Run the aggregate() function, which creates a SageMaker training job to run the Spark application:
# Execute the aggregate function
aggregate()

As a result, SageMaker creates a training job to run the Spark application defined earlier. It creates a Spark runtime environment using the sagemaker-spark-processing image.

We use SageMaker Training jobs here to run our Spark feature processing application. With SageMaker Training, you can reduce startup times to 1 minute or less by using warm pooling, which is unavailable in SageMaker Processing. This makes SageMaker Training better optimized for short batch jobs like feature processing where startup time is important.

  1. To view the details, on the SageMaker console, choose Training jobs under Training in the navigation pane, then choose the job with the name aggregate-<timestamp>.

Image shows the Sagemaker training job

The aggregate() run writes its output to the training job logs. Within that output, you will see the aggregated data as follows:

+--------------------+------------------+---------+------------------+-----------+--------+--------+-----------+
|   model_year_status|         avg_price|max_price|       avg_mileage|max_mileage|avg_msrp|max_msrp|ingest_time|
+--------------------+------------------+---------+------------------+-----------+--------+--------+-----------+
|Acura CL 3.0_1997...|            7950.0|  7950.00|          100934.0|    100934 |    null|      NA| 1686634807|
|Acura CL 3.2 Type...|            6795.0|  7591.00|          118692.5|    135760 |    null|      NA| 1686634807|
|Acura CL 3_1998_Used|            9899.0|  9899.00|           63000.0|     63000 |    null|      NA| 1686634807|
|Acura ILX 2.0L Te...|         14014.125| 18995.00|         95534.875|     89103 |    null|      NA| 1686634807|
|Acura ILX 2.0L Te...|           15008.2| 16998.00|           94935.0|     88449 |    null|      NA| 1686634807|
|Acura ILX 2.0L Te...|           16394.6| 19985.00|           97719.4|     80000 |    null|      NA| 1686634807|
|Acura ILX 2.0L w/...|14567.181818181818| 16999.00| 96624.72727272728|     98919 |    null|      NA| 1686634807|
|Acura ILX 2.0L w/...|           16673.4| 18995.00|           84848.6|     96637 |    null|      NA| 1686634807|
|Acura ILX 2.0L w/...|12580.333333333334| 14546.00|100207.33333333333|     95782 |    null|      NA| 1686634807|
|Acura ILX 2.0L_20...|         14565.375| 17590.00|         92941.125|     81842 |    null|      NA| 1686634807|
|Acura ILX 2.0L_20...|           14877.9|  9995.00|           99739.5|     89252 |    null|      NA| 1686634807|
|Acura ILX 2.0L_20...|           15659.5| 15660.00|           82136.0|     89942 |    null|      NA| 1686634807|
|Acura ILX 2.0L_20...|17121.785714285714| 20990.00| 78278.14285714286|     96067 |    null|      NA| 1686634807|
|Acura ILX 2.4L (A...|           17846.0| 21995.00|          101558.0|     85974 |    null|      NA| 1686634807|
|Acura ILX 2.4L Pr...|           16327.0| 16995.00|           85238.0|     95356 |    null|      NA| 1686634807|
|Acura ILX 2.4L w/...|           12846.0| 12846.00|           75209.0|     75209 |    null|      NA| 1686634807|
|Acura ILX 2.4L_20...|           18998.0| 18998.00|           51002.0|     51002 |    null|      NA| 1686634807|
|Acura ILX 2.4L_20...|17908.615384615383| 19316.00| 74325.38461538461|     89116 |    null|      NA| 1686634807|
|Acura ILX 4DR SDN...|           18995.0| 18995.00|           37017.0|     37017 |    null|      NA| 1686634807|
|Acura ILX 8-SPD_2...|           24995.0| 24995.00|           22334.0|     22334 |    null|      NA| 1686634807|
+--------------------+------------------+---------+------------------+-----------+--------+--------+-----------+
only showing top 20 rows

When the training job is complete, you should see the following output:

06-13 05:40 smspark-submit INFO     spark submit was successful. primary node exiting.
Training seconds: 153
Billable seconds: 153

Operationalize the feature processor via SageMaker pipelines

In this section, we demonstrate how to operationalize the feature processor by promoting it to a SageMaker pipeline and scheduling runs.

  1. First, upload the transformation code file car-data-ingestion.py, which contains the feature processing logic, to Amazon S3:
from sagemaker.s3 import S3Uploader, s3_path_join  # helpers from the SageMaker Python SDK

car_data_s3_uri = s3_path_join("s3://", sagemaker_session.default_bucket(),
                               'transformation_code', 'car-data-ingestion.py')
S3Uploader.upload(local_path='car-data-ingestion.py', desired_s3_uri=car_data_s3_uri)
print(car_data_s3_uri)
  1. Next, create a Feature Processor pipeline car_data_pipeline using the .to_pipeline() function:
car_data_pipeline_name = f"{CAR_SALES_FG_NAME}-ingestion-pipeline"
car_data_pipeline_arn = fp.to_pipeline(pipeline_name=car_data_pipeline_name,
                                      step=transform,
                                      transformation_code=TransformationCode(s3_uri=car_data_s3_uri) )
print(f"Created SageMaker Pipeline: {car_data_pipeline_arn}.")
  1. To run the pipeline, use the following code:
car_data_pipeline_execution_arn = fp.execute(pipeline_name=car_data_pipeline_name)
print(f"Started an execution with execution arn: {car_data_pipeline_execution_arn}")
  1. Similarly, you can create a pipeline for aggregated features called car_data_aggregated_pipeline and start a run.
  2. Schedule the car_data_aggregated_pipeline to run every 24 hours:
fp.schedule(pipeline_name=car_data_aggregated_pipeline_name,
           schedule_expression="rate(24 hours)", state="ENABLED")
print(f"Created a schedule.")

In the output, you will see the ARN of the pipeline, the pipeline execution role, and the schedule details:

{'pipeline_arn': 'arn:aws:sagemaker:us-west-2:416578662734:pipeline/car-data-aggregated-ingestion-pipeline',
 'pipeline_execution_role_arn': 'arn:aws:iam::416578662734:role/service-role/AmazonSageMaker-ExecutionRole-20230612T120731',
 'schedule_arn': 'arn:aws:scheduler:us-west-2:416578662734:schedule/default/car-data-aggregated-ingestion-pipeline',
 'schedule_expression': 'rate(24 hours)',
 'schedule_state': 'ENABLED',
 'schedule_start_date': '2023-06-13T06:05:17Z',
 'schedule_role': 'arn:aws:iam::416578662734:role/service-role/AmazonSageMaker-ExecutionRole-20230612T120731'}
  1. To get all the Feature Processor pipelines in this account, use the list_pipelines() function on the Feature Processor:
fp.list_pipelines()

The output will be as follows:

[{'pipeline_name': 'car-data-aggregated-ingestion-pipeline'},
 {'pipeline_name': 'car-data-ingestion-pipeline'}]

We have successfully created SageMaker Feature Processor pipelines.

Explore feature processing pipelines and ML lineage

In SageMaker Studio, complete the following steps:

  1. On the SageMaker Studio console, on the Home menu, choose Pipelines.

Image of Sagemaker Studio home tab highlighting pipelines option

You should see two pipelines created: car-data-ingestion-pipeline and car-data-aggregated-ingestion-pipeline.

Image of Sagemaker Studio pipelines with the list of pipelines

  1. Choose the car-data-ingestion-pipeline.

It shows the run details on the Executions tab.

Image of Sagemaker Studio of the car data ingestion pipeline

  1. To view the feature group populated by the pipeline, choose Feature Store under Data and choose car-data.

Image of Sagemaker Studio home highliting data

You will see the two feature groups we created in the previous steps.

Image of Sagemaker Studio with feature groups created

  1. Choose the car-data feature group.

You will see the features details on the Features tab.

Image of Sagemaker Studio with feature group and the features in the group

View pipeline runs

To view the pipeline runs, complete the following steps:

  1. On the Pipeline Executions tab, select car-data-ingestion-pipeline.

This will show all the runs.

Image shows the Sagemaker Feature group tab of the pipeline executions

  1. Choose one of the links to see the details of the run.

Image shows the sagemaker UI with the pipelines in execution

  1. To view lineage, choose Lineage.

The full lineage for car-data shows the input data source car_data.csv and upstream entities. The lineage for car-data-aggregated shows the input car-data feature group.

Image of Sagemaker UI of the feature group of car data

  1. Choose Load features and then choose Query upstream lineage on car-data and car-data-ingestion-pipeline to see all the upstream entities.

The full lineage for car-data feature group should look like the following screenshot.

Image shows the Sagemaker feature store with car lineage

Similarly, the lineage for the car-aggregated-data feature group should look like the following screenshot.

Image shoes the aggregated feature group from Sagemaker Feature Store UI

SageMaker Studio provides a single environment to track scheduled pipelines, view runs, explore lineage, and view the feature processing code.

The aggregated features such as average price, max price, average mileage, and more in the car-data-aggregated feature group provide insight into the nature of the data. You can also use these features as a dataset to train a model to predict car prices, or for other operations. However, training the model is out of scope for this post, which focuses on demonstrating the SageMaker Feature Store capabilities for feature engineering.
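
For example (a sketch that isn't part of the original notebook), you could pull the aggregated features from the offline store into a pandas DataFrame and explore them:

# Sketch: load the aggregated features from the offline store and explore them.
import pandas as pd

agg_query = agg_car_sales_fg.athena_query()
agg_query.run(
    query_string=f'SELECT * FROM "{agg_query.table_name}"',
    output_location=f"s3://{s3_bucket}/athena-query-results/",
)
agg_query.wait()
agg_df = agg_query.as_dataframe()

# The feature values were stored as strings, so cast them before analysis.
agg_df["avg_price"] = pd.to_numeric(agg_df["avg_price"], errors="coerce")
agg_df["avg_mileage"] = pd.to_numeric(agg_df["avg_mileage"], errors="coerce")

# Example explorations: lowest average prices and highest average mileage by model/year/status.
print(agg_df.sort_values("avg_price").head())
print(agg_df.sort_values("avg_mileage", ascending=False).head())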

Clean up

Don’t forget to clean up the resources created as part of this post to avoid incurring ongoing charges.

  1. Disable the scheduled pipeline via the fp.schedule() method with the state parameter as Disabled:
# Disable the scheduled pipeline
fp.schedule(
    pipeline_name=car_data_aggregated_pipeline_name,
    schedule_expression="rate(24 hours)",
    state="DISABLED",
)
  1. Delete both feature groups:
# Delete feature groups
car_sales_fg.delete()
agg_car_sales_fg.delete()

The data residing in the S3 bucket and offline feature store can incur costs, so you should delete them to avoid any charges.

  1. Delete the S3 objects.
  2. Delete the records from the feature store.

Conclusion

In this post, we demonstrated how a car sales company used SageMaker Feature Store Feature Processor to gain valuable insights from their raw sales data by:

  • Ingesting and transforming batch data at scale using Spark
  • Operationalizing feature engineering workflows via SageMaker pipelines
  • Providing lineage tracking and a single environment to monitor pipelines and explore features
  • Preparing aggregated features optimized for training ML models

By following these steps, the company was able to transform previously unusable data into structured features that could then be used to train a model to predict car prices. SageMaker Feature Store enabled them to focus on feature engineering rather than the underlying infrastructure.

We hope this post helps you unlock valuable ML insights from your own data using SageMaker Feature Store Feature Processor!

For more information on this, refer to Feature Processing and the SageMaker example on Amazon SageMaker Feature Store: Feature Processor Introduction.


About the Authors


Dhaval Shah is a Senior Solutions Architect at AWS, specializing in Machine Learning. With a strong focus on digital native businesses, he empowers customers to leverage AWS and drive their business growth. As an ML enthusiast, Dhaval is driven by his passion for creating impactful solutions that bring positive change. In his leisure time, he indulges in his love for travel and cherishes quality moments with his family.

Ninad Joshi is a Senior Solutions Architect at AWS, helping global AWS customers design secure, scalable, and cost effective solutions in cloud to solve their complex real-world business challenges. His work in Machine Learning (ML) covers a wide range of AI/ML use cases, with a primary focus on End-to-End ML, Natural Language Processing, and Computer Vision. Prior to joining AWS, Ninad worked as a software developer for 12+ years. Outside of his professional endeavors, Ninad enjoys playing chess and exploring different gambits.

Orchestrate Ray-based machine learning workflows using Amazon SageMaker

Machine learning (ML) is becoming increasingly complex as customers try to solve more and more challenging problems. This complexity often leads to the need for distributed ML, where multiple machines are used to train a single model. Although this enables parallelization of tasks across multiple nodes, leading to accelerated training times, enhanced scalability, and improved performance, there are significant challenges in effectively using distributed hardware. Data scientists have to address challenges like data partitioning, load balancing, fault tolerance, and scalability. ML engineers must handle parallelization, scheduling, faults, and retries manually, requiring complex infrastructure code.

In this post, we discuss the benefits of using Ray and Amazon SageMaker for distributed ML, and provide a step-by-step guide on how to use these frameworks to build and deploy a scalable ML workflow.

Ray, an open-source distributed computing framework, provides a flexible framework for distributed training and serving of ML models. It abstracts away low-level distributed system details through simple, scalable libraries for common ML tasks such as data preprocessing, distributed training, hyperparameter tuning, reinforcement learning, and model serving.

SageMaker is a fully managed service for building, training, and deploying ML models. Ray seamlessly integrates with SageMaker features to build and deploy complex ML workloads that are both efficient and reliable. The combination of Ray and SageMaker provides end-to-end capabilities for scalable ML workflows, and has the following highlighted features:

  • Distributed actors and parallelism constructs in Ray simplify developing distributed applications.
  • Ray AI Runtime (AIR) reduces friction of going from development to production. With Ray and AIR, the same Python code can scale seamlessly from a laptop to a large cluster.
  • The managed infrastructure of SageMaker and features like processing jobs, training jobs, and hyperparameter tuning jobs can use Ray libraries underneath for distributed computing.
  • Amazon SageMaker Experiments allows rapidly iterating and keeping track of trials.
  • Amazon SageMaker Feature Store provides a scalable repository for storing, retrieving, and sharing ML features for model training.
  • Trained models can be stored, versioned, and tracked in Amazon SageMaker Model Registry for governance and management.
  • Amazon SageMaker Pipelines allows orchestrating the end-to-end ML lifecycle from data preparation and training to model deployment as automated workflows.

Solution overview

This post focuses on the benefits of using Ray and SageMaker together. We set up an end-to-end Ray-based ML workflow, orchestrated using SageMaker Pipelines. The workflow includes parallel ingestion of data into the feature store using Ray actors, data preprocessing with Ray Data, training models and hyperparameter tuning at scale using Ray Train and hyperparameter optimization (HPO) tuning jobs, and finally model evaluation and registering the model into a model registry.

For our data, we use a synthetic housing dataset that consists of eight features (YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS, LOT_ACRES, GARAGE_SPACES, FRONT_PORCH, and DECK) and our model will predict the PRICE of the house.

Each stage in the ML workflow is broken into discrete steps, with its own script that takes input and output parameters. In the next section, we highlight key code snippets from each step. The full code can be found on the aws-samples-for-ray GitHub repository.

Prerequisites

To use the SageMaker Python SDK and run the code associated with this post, you need the following prerequisites:

Ingest data into SageMaker Feature Store

The first step in the ML workflow is to read the source data file from Amazon Simple Storage Service (Amazon S3) in CSV format and ingest it into SageMaker Feature Store. SageMaker Feature Store is a purpose-built repository that makes it easy for teams to create, share, and manage ML features. It simplifies feature discovery, reuse, and sharing, leading to faster development, increased collaboration within customer teams, and reduced costs.

Ingesting features into the feature store involves the following steps:

  1. Define a feature group and create the feature group in the feature store.
  2. Prepare the source data for the feature store by adding an event time and record ID for each row of data.
  3. Ingest the prepared data into the feature group by using the Boto3 SDK.

In this section, we only highlight Step 3, because this is the part that involves parallel processing of the ingestion task using Ray. You can review the full code for this process in the GitHub repo.
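
For orientation, a minimal sketch of Steps 1 and 2 (creating the feature group and preparing the source data) might look like the following; the bucket, prefix, and role variables are assumptions for illustration, and the full implementation in the repository may differ.

import time

import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

# Step 1: define the feature group (the name matches the pipeline default used later)
sagemaker_session = sagemaker.Session()
feature_group = FeatureGroup(
    name="fs-ray-synthetic-housing-data",
    sagemaker_session=sagemaker_session,
)

# Step 2: prepare the source data by adding a record ID and an event time for each row
df = pd.read_csv(s3_path)
df["record_id"] = df.index
df["event_time"] = time.time()

# Infer feature definitions from the DataFrame and create the offline feature group;
# bucket, prefix, and role_arn are assumed to be defined elsewhere
feature_group.load_feature_definitions(data_frame=df)
feature_group.create(
    s3_uri=f"s3://{bucket}/{prefix}",
    record_identifier_name="record_id",
    event_time_feature_name="event_time",
    role_arn=role_arn,
    enable_online_store=False,
)
# create() is asynchronous; wait for the group to become Active before ingesting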

The ingest_features method is defined inside a class called Featurestore. Note that the Featurestore class is decorated with @ray.remote. This indicates that an instance of this class is a Ray actor, a stateful and concurrent computational unit within Ray. It’s a programming model that allows you to create distributed objects that maintain an internal state and can be accessed concurrently by multiple tasks running on different nodes in a Ray cluster. Actors provide a way to manage and encapsulate the mutable state, making them valuable for building complex, stateful applications in a distributed setting. You can specify resource requirements in actors too. In this case, each instance of the Featurestore class will require 0.5 CPUs. See the following code:

@ray.remote(num_cpus=0.5)
class Featurestore:
    def ingest_features(self, feature_group_name, df, region):
        """
        Ingest features to Feature Store Group
        Args:
            feature_group_name (str): Feature Group Name
            df (pandas.DataFrame): Partition of the prepared data to ingest
            region (str): AWS Region of the feature group
        """
        
        ...

You can interact with the actor by calling the remote operator. In the following code, the desired number of actors is passed in as an input argument to the script. The data is then partitioned based on the number of actors and passed to the remote parallel processes to be ingested into the feature store. You can call get on the object ref to block the execution of the current task until the remote computation is complete and the result is available. When the result is available, ray.get will return the result, and the execution of the current task will continue.

import modin.pandas as pd
import numpy as np
import ray

df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# Split into partitions, one per actor, and place them in the Ray object store
partitions = [ray.put(part) for part in np.array_split(data, args.num_actors)]
# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = []

for actor, partition in zip(actors, partitions):
    results.append(actor.ingest_features.remote(
                        args.feature_group_name, 
                        partition, args.region
                      )
                )

ray.get(results)

Prepare data for training, validation, and testing

In this step, we use Ray Dataset to efficiently split, transform, and scale our dataset in preparation for machine learning. Ray Dataset provides a standard way to load distributed data into Ray, supporting various storage systems and file formats. It has APIs for common ML data preprocessing operations like parallel transformations, shuffling, grouping, and aggregations. Ray Dataset also handles operations needing stateful setup and GPU acceleration. It integrates smoothly with other data processing libraries like Spark, Pandas, NumPy, and more, as well as ML frameworks like TensorFlow and PyTorch. This allows building end-to-end data pipelines and ML workflows on top of Ray. The goal is to make distributed data processing and ML easier for practitioners and researchers.

Let’s look at sections of the scripts that perform this data preprocessing. We start by loading the data from the feature store:

def load_dataset(feature_group_name, region):
    """
    Loads the data as a ray dataset from the offline featurestore S3 location
    Args:
        feature_group_name (str): name of the feature group
        region (str): AWS Region of the feature group
    Returns:
        ds (ray.data.dataset): Ray dataset that contains the requested data from the feature store
    """
    session = sagemaker.Session(boto3.Session(region_name=region))
    fs_group = FeatureGroup(
        name=feature_group_name, 
        sagemaker_session=session
    )

    fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri")
    
    # Drop columns added by the feature store
    # Since these are not related to the ML problem at hand
    cols_to_drop = ["record_id", "event_time","write_time", 
                    "api_invocation_time", "is_deleted", 
                    "year", "month", "day", "hour"]           

    ds = ray.data.read_parquet(fs_data_loc)
    ds = ds.drop_columns(cols_to_drop)
    print(f"{fs_data_loc} count is {ds.count()}")
    return ds

We then split and scale data using the higher-level abstractions available from the ray.data library:

def split_dataset(dataset, train_size, val_size, test_size, random_state=None):
    """
    Split dataset into train, validation and test samples
    Args:
        dataset (ray.data.Dataset): input data
        train_size (float): ratio of data to use as training dataset
        val_size (float): ratio of data to use as validation dataset
        test_size (float): ratio of data to use as test dataset
        random_state (int): Pass an int for reproducible output across multiple function calls.
    Returns:
        train_set (ray.data.Dataset): train dataset
        val_set (ray.data.Dataset): validation dataset
        test_set (ray.data.Dataset): test dataset
    """
    # Shuffle this dataset with a fixed random seed.
    shuffled_ds = dataset.random_shuffle(seed=random_state)
    # Split the data into train, validation and test datasets
    train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size])
    return train_set, val_set, test_set

def scale_dataset(train_set, val_set, test_set, target_col):
    """
    Fit StandardScaler to train_set and apply it to val_set and test_set
    Args:
        train_set (ray.data.Dataset): train dataset
        val_set (ray.data.Dataset): validation dataset
        test_set (ray.data.Dataset): test dataset
        target_col (str): target col
    Returns:
        train_transformed (ray.data.Dataset): train data scaled
        val_transformed (ray.data.Dataset): val data scaled
        test_transformed (ray.data.Dataset): test data scaled
    """
    transform_cols = train_set.columns()
    # Remove the target column from being scaled
    transform_cols.remove(target_col)
    # set up a standard scaler
    standard_scaler = StandardScaler(transform_cols)
    # fit scaler to training dataset
    print("Fitting scaling to training data and transforming dataset...")
    train_set_transformed = standard_scaler.fit_transform(train_set)
    # apply scaler to validation and test datasets
    print("Transforming validation and test datasets...")
    val_set_transformed = standard_scaler.transform(val_set)
    test_set_transformed = standard_scaler.transform(test_set)
    return train_set_transformed, val_set_transformed, test_set_transformed

The processed train, validation, and test datasets are stored in Amazon S3 and will be passed as the input parameters to subsequent steps.
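
As a rough sketch of how that hand-off might look (the output S3 prefix below is an assumption for illustration), the transformed Ray datasets can be written out as CSV with Ray Data’s write_csv:

# Persist the transformed datasets as CSV so downstream training,
# tuning, and evaluation steps can consume them from Amazon S3
output_prefix = "s3://my-bucket/aws-ray-mlops-workshop/feature-store"  # illustrative

train_set_transformed.write_csv(f"{output_prefix}/train")
val_set_transformed.write_csv(f"{output_prefix}/validation")
test_set_transformed.write_csv(f"{output_prefix}/test")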

Perform model training and hyperparameter optimization

With our data preprocessed and ready for modeling, it’s time to train some ML models and fine-tune their hyperparameters to maximize predictive performance. We use XGBoost-Ray, a distributed backend for XGBoost built on Ray that enables training XGBoost models on large datasets by using multiple nodes and GPUs. It provides simple drop-in replacements for XGBoost’s train and predict APIs while handling the complexities of distributed data management and training under the hood.

To enable distribution of the training over multiple nodes, we utilize a helper class named RayHelper. As shown in the following code, we use the resource configuration of the training job and choose the first host as the head node:

class RayHelper():
    def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"):
        ....
        self.resource_config = self.get_resource_config()
        self.head_host = self.resource_config["hosts"][0]
        self.n_hosts = len(self.resource_config["hosts"])

We can use the host information to decide how to initialize Ray on each of the training job instances:

def start_ray(self): 
    head_ip = self._get_ip_from_host()
    # If the current host is the host chosen as the head node,
    # run `ray start` with the --head flag, making this the head node
    if self.resource_config["current_host"] == self.head_host:
        output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port', 
        self.ray_port, '--redis-password', self.redis_pass, 
        '--include-dashboard', 'false'], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        ray.init(address="auto", include_dashboard=False)
        self._wait_for_workers()
        print("All workers present and accounted for")
        print(ray.cluster_resources())

    else:
        # If the current host is not the head node, 
        # run `ray start`, specifying the head_host address to join the cluster
        time.sleep(10)
        output = subprocess.run(['ray', 'start', 
        f"--address={head_ip}:{self.ray_port}", 
        '--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE)
        print(output.stdout.decode("utf-8"))
        sys.exit(0)

When a training job is started, a Ray cluster can be initialized by calling the start_ray() method on an instance of RayHelper:

if __name__ == '__main__':
    ray_helper = RayHelper()
    ray_helper.start_ray()
    args = read_parameters()
    sess = sagemaker.Session(boto3.Session(region_name=args.region))

We then use the XGBoost trainer from XGBoost-Ray for training:

def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result:
    """
    Creates an XGBoost trainer, trains it, and returns the result.        
    Args:
        ds_train (ray.data.dataset): Training dataset
        ds_val (ray.data.dataset): Validation dataset
        params (dict): Hyperparameters
        num_workers (int): number of workers to distribute the training across
        target_col (str): target column
    Returns:
        result (ray.air.result.Result): Result of the training job
    """
    
    train_set = RayDMatrix(ds_train, 'PRICE')
    val_set = RayDMatrix(ds_val, 'PRICE')
    
    evals_result = {}
    
    trainer = train(
        params=params,
        dtrain=train_set,
        evals_result=evals_result,
        evals=[(val_set, "validation")],
        verbose_eval=False,
        num_boost_round=100,
        ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1),
    )
    
    output_path=os.path.join(args.model_dir, 'model.xgb')
    
    trainer.save_model(output_path)
    
    valMAE = evals_result["validation"]["mae"][-1]
    valRMSE = evals_result["validation"]["rmse"][-1]
 
    print('[3] #011validation-mae:{}'.format(valMAE))
    print('[4] #011validation-rmse:{}'.format(valRMSE))
    
    local_testing = False
    try:
        load_run(sagemaker_session=sess)
    except:
        local_testing = True
    if not local_testing: # Track experiment if using SageMaker Training
        with load_run(sagemaker_session=sess) as run:
            run.log_metric('validation-mae', valMAE)
            run.log_metric('validation-rmse', valRMSE)

Note that while instantiating the trainer, we pass RayParams, which takes the number of actors and the number of CPUs per actor. XGBoost-Ray uses this information to distribute the training across all the nodes attached to the Ray cluster.

We now create an XGBoost estimator object based on the SageMaker Python SDK and use that for the HPO job.
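
The following is a hedged sketch of what that estimator and tuner definition could look like; the entry point name, instance settings, and hyperparameter ranges are assumptions for illustration and may differ from the full example in the GitHub repository.

from sagemaker.xgboost.estimator import XGBoost
from sagemaker.tuner import ContinuousParameter, HyperparameterTuner, IntegerParameter

# Framework estimator that runs our Ray-based training script on multiple instances,
# which together form the Ray cluster started by RayHelper
xgb_estimator = XGBoost(
    entry_point="train_xgboost_ray.py",   # illustrative script name containing train_xgboost()
    framework_version="1.5-1",
    role=role_arn,
    instance_count=2,
    instance_type="ml.m5.2xlarge",
    hyperparameters={"num_workers": 4, "region": region},
)

# Tuner that minimizes the validation RMSE printed by the training script
tuner = HyperparameterTuner(
    estimator=xgb_estimator,
    objective_metric_name="validation-rmse",
    objective_type="Minimize",
    metric_definitions=[{"Name": "validation-rmse", "Regex": "validation-rmse:([0-9\\.]+)"}],
    hyperparameter_ranges={
        "eta": ContinuousParameter(0.01, 0.3),
        "max_depth": IntegerParameter(3, 10),
    },
    max_jobs=4,
    max_parallel_jobs=2,
)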

Orchestrate the preceding steps using SageMaker Pipelines

To build an end-to-end scalable and reusable ML workflow, we need to use a CI/CD tool to orchestrate the preceding steps into a pipeline. SageMaker Pipelines has direct integration with SageMaker, the SageMaker Python SDK, and SageMaker Studio. This integration allows you to create ML workflows with an easy-to-use Python SDK, and then visualize and manage your workflow using SageMaker Studio. You can also track the history of your data within the pipeline execution and designate steps for caching.

SageMaker Pipelines creates a Directed Acyclic Graph (DAG) that includes steps needed to build an ML workflow. Each pipeline is a series of interconnected steps orchestrated by data dependencies between steps, and can be parameterized, allowing you to provide input variables as parameters for each run of the pipeline. SageMaker Pipelines has four types of pipeline parameters: ParameterString, ParameterInteger, ParameterFloat, and ParameterBoolean. In this section, we parameterize some of the input variables and set up the step caching configuration:

processing_instance_count = ParameterInteger(
    name='ProcessingInstanceCount',
    default_value=1
)
feature_group_name = ParameterString(
    name='FeatureGroupName',
    default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString(
    name='Bucket_Prefix',
    default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0)
train_size = ParameterString(
    name='TrainSize',
    default_value="0.6"
)
val_size = ParameterString(
    name='ValidationSize',
    default_value="0.2"
)
test_size = ParameterString(
    name='TestSize',
    default_value="0.2"
)

cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

We define two processing steps: one for SageMaker Feature Store ingestion and the other for data preparation. These should look very similar to the processing code described earlier. The only new piece of code is the ProcessingStep after each step’s definition, which allows us to take the processing job configuration and include it as a pipeline step. We further specify the dependency of the data preparation step on the SageMaker Feature Store ingestion step. See the following code:

feature_store_ingestion_step = ProcessingStep(
    name='FeatureStoreIngestion',
    step_args=fs_processor_args,
    cache_config=cache_config
)

preprocess_dataset_step = ProcessingStep(
    name='PreprocessData',
    step_args=processor_args,
    cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

Similarly, to build a model training and tuning step, we need to add a definition of TuningStep after the model training step’s code to allow us to run SageMaker hyperparameter tuning as a step in the pipeline:

tuning_step = TuningStep(
    name="HPTuning",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

After the tuning step, we choose to register the best model into SageMaker Model Registry. To control the model quality, we implement a minimum quality gate that compares the best model’s objective metric (RMSE) against a threshold defined as the pipeline’s input parameter rmse_threshold. To do this evaluation, we create another processing step to run an evaluation script. The model evaluation result will be stored as a property file. Property files are particularly useful when analyzing the results of a processing step to decide how other steps should be run. See the following code:

# Specify where we'll store the model evaluation results so that other steps can access those results
evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='evaluation',
    path='evaluation.json',
)

# A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. 
# In this case, the top performing model is evaluated. 
evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=tuning_step.get_top_model_s3_uri(
                top_k=0, s3_bucket=bucket, prefix=s3_prefix
            ),
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation', source='/opt/ml/processing/evaluation'
        ),
    ],
    code='./pipeline_scripts/evaluate/script.py',
    property_files=[evaluation_report],
)

We define a ModelStep to register the best model into SageMaker Model Registry in our pipeline. In case the best model doesn’t pass our predetermined quality check, we additionally specify a FailStep to output an error message:

register_step = ModelStep(
    name='RegisterTrainedModel',
    step_args=model_registry_args
)

metrics_fail_step = FailStep(
    name="RMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

Next, we use a ConditionStep to evaluate whether the model registration step or failure step should be taken next in the pipeline. In our case, the best model will be registered if its RMSE score is lower than the threshold.

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path='regression_metrics.rmse.value',
    ),
    right=rmse_threshold,
)
condition_step = ConditionStep(
    name='CheckEvaluation',
    conditions=[cond_lte],
    if_steps=[register_step],
    else_steps=[metrics_fail_step],
)

Finally, we orchestrate all the defined steps into a pipeline:

pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [
             feature_store_ingestion_step,
             preprocess_dataset_step,
             tuning_step,
             evaluation_step,
             condition_step
            ]

training_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        feature_group_name,
        train_size,
        val_size,
        test_size,
        bucket_prefix,
        rmse_threshold
    ],
    steps=step_list
)

# Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

The preceding pipeline can be visualized and executed directly in SageMaker Studio, or be executed by calling execution = training_pipeline.start(). The following figure illustrates the pipeline flow.

SageMaker Pipeline DAG
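
For reference, a minimal sketch of starting an execution programmatically looks like the following; the parameter overrides are illustrative.

# Start a pipeline execution, optionally overriding the defined parameters,
# and wait for it to finish before inspecting its steps
execution = training_pipeline.start(
    parameters={"TrainSize": "0.7", "ValidationSize": "0.15", "TestSize": "0.15"}
)
execution.wait()
execution.list_steps()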

Additionally, we can review the lineage of artifacts generated by the pipeline execution.

import time

import sagemaker
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

Deploy the model

After the best model is registered in SageMaker Model Registry via a pipeline run, we deploy the model to a real-time endpoint by using the fully managed model deployment capabilities of SageMaker. SageMaker has other model deployment options to meet the needs of different use cases. For details, refer to Deploy models for inference when choosing the right option for your use case. First, let’s get the model registered in SageMaker Model Registry:

xgb_regressor_model = ModelPackage(
    role_arn,
    model_package_arn=model_package_arn,
    name=model_name
)

The model’s current status is PendingApproval. We need to set its status to Approved prior to deployment:

sagemaker_client.update_model_package(
    ModelPackageArn=xgb_regressor_model.model_package_arn,
    ModelApprovalStatus='Approved'
)

xgb_regressor_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    endpoint_name=endpoint_name
)
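
Once the endpoint is in service, it can be invoked with a CSV payload. The following is a sketch only; the feature values and their order are illustrative and must match the columns used during training.

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

predictor = Predictor(
    endpoint_name=endpoint_name,
    serializer=CSVSerializer(),
)

# Illustrative feature order: YEAR_BUILT, SQUARE_FEET, NUM_BEDROOM, NUM_BATHROOMS,
# LOT_ACRES, GARAGE_SPACES, FRONT_PORCH, DECK
sample = "1995,2500,4,2.5,0.3,2,1,1"
print(predictor.predict(sample))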

Clean up

After you are done experimenting, remember to clean up the resources to avoid unnecessary charges. To clean up, delete the real-time endpoint, model group, pipeline, and feature group by calling the APIs DeleteEndpoint, DeleteModelPackageGroup, DeletePipeline, and DeleteFeatureGroup, respectively, and shut down all SageMaker Studio notebook instances.
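
A minimal cleanup sketch using the Boto3 SageMaker client follows; the endpoint and model package group names are assumptions and should match what was actually created in your account.

import boto3

sm_client = boto3.client("sagemaker")

# Delete the resources created by the pipeline and deployment
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_model_package_group(ModelPackageGroupName=model_package_group_name)
sm_client.delete_pipeline(PipelineName="synthetic-housing-training-sm-pipeline-ray")
sm_client.delete_feature_group(FeatureGroupName="fs-ray-synthetic-housing-data")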

Conclusion

This post demonstrated a step-by-step walkthrough on how to use SageMaker Pipelines to orchestrate Ray-based ML workflows. We also demonstrated the capability of SageMaker Pipelines to integrate with third-party ML tools. There are various AWS services that support Ray workloads in a scalable and secure fashion to ensure performance excellence and operational efficiency. Now, it’s your turn to explore these powerful capabilities and start optimizing your machine learning workflows with Amazon SageMaker Pipelines and Ray. Take action today and unlock the full potential of your ML projects!


About the Author

Raju Rangan is a Senior Solutions Architect at Amazon Web Services (AWS). He works with government sponsored entities, helping them build AI/ML solutions using AWS. When not tinkering with cloud solutions, you’ll catch him hanging out with family or smashing birdies in a lively game of badminton with friends.

Sherry Ding is a senior AI/ML specialist solutions architect at Amazon Web Services (AWS). She has extensive experience in machine learning with a PhD degree in computer science. She mainly works with public sector customers on various AI/ML-related business challenges, helping them accelerate their machine learning journey on the AWS Cloud. When not helping customers, she enjoys outdoor activities.


Designing resilient cities at Arup using Amazon SageMaker geospatial capabilities

Designing resilient cities at Arup using Amazon SageMaker geospatial capabilities

This post is co-authored with Richard Alexander and Mark Hallows from Arup.

Arup is a global collective of designers, consultants, and experts dedicated to sustainable development. Data underpins Arup’s consultancy for clients, with world-class collection and analysis providing the insight needed to make an impact.

The solution presented here helps direct decision-making processes for resilient city design. Informing design decisions towards more sustainable choices reduces the overall urban heat island (UHI) effect and improves quality of life metrics for air quality, water quality, urban acoustics, biodiversity, and thermal comfort. Identifying key areas within an urban environment for intervention allows Arup to provide the best guidance in the industry and create better quality of life for citizens around the planet.

Urban heat islands describe the effect urban areas have on temperature compared to surrounding rural environments. Understanding how UHI affects our cities leads to improved designs that reduce the impact of urban heat on residents. The UHI effect impacts human health, greenhouse gas emissions, and water quality, and leads to increased energy usage. For city authorities, asset owners, and developers, understanding the impact on the population is key to improving quality of life and natural ecosystems. Modeling UHI accurately is a complex challenge, which Arup is now solving with earth observation data and Amazon SageMaker.

This post shows how Arup partnered with AWS to perform earth observation analysis with Amazon SageMaker geospatial capabilities to unlock UHI insights from satellite imagery. SageMaker geospatial capabilities make it easy for data scientists and machine learning (ML) engineers to build, train, and deploy models using geospatial data. SageMaker geospatial capabilities allow you to efficiently transform and enrich large-scale geospatial datasets, accelerate product development and time to insight with pre-trained ML models, and explore model predictions and geospatial data on an interactive map using 3D accelerated graphics and built-in visualization tools.

Overview of solution

The initial solution focuses on London, where during a heatwave in the summer of 2022, the UK Health Security Agency estimated that 2,803 excess deaths were caused by heat. Identifying areas within an urban environment where people may be more vulnerable to the UHI effect allows public services to direct assistance where it will have the greatest impact. This can even be forecast prior to high temperature events, reducing the impact of extreme weather and delivering a positive outcome for city dwellers.

Earth Observation (EO) data was used to perform the analysis at city scale. However, the total size poses challenges with traditional ways of storing, organizing, and querying data for large geographical areas. Arup addressed this challenge by partnering with AWS and using SageMaker geospatial capabilities to enable analysis at a city scale and beyond. As the geographic area grows to larger metropolitan areas like Los Angeles or Tokyo, more storage and compute are required for analysis. The elasticity of AWS infrastructure is ideal for UHI analyses of urban environments of any size.

The solution: UHeat

Arup used SageMaker to develop UHeat, a digital solution that analyzes huge areas of cities to identify particular buildings, structures, and materials that are causing temperatures to rise. UHeat uses a combination of satellite imagery and open-source climate data to perform the analysis.

A small team at Arup undertook the initial analysis, during which additional data scientists needed to be trained on the SageMaker tooling and workflows. Onboarding data scientists to a new project used to take weeks using in-house tools. This now takes a matter of hours with SageMaker.

The first step of any EO analysis is the collection and preparation of the data. With SageMaker, Arup can access data from a catalog of geospatial data providers, including Sentinel-2 data, which was used for the London analysis. Built-in geospatial dataset access saves weeks of effort otherwise lost to collecting and preparing data from various data providers and vendors. EO imagery is frequently made up of small tiles which, to cover an area the size of London, need to be combined. This is known as a geomosaic, which can be created automatically using the managed geospatial operations in a SageMaker Geomosaic Earth Observation job.

After the EO data for the area of interest is compiled, the key influencing parameters for the analysis can be extracted. For UHI, Arup needed to be able to derive data on parameters for building geometry, building materials, anthropogenic heat sources, and coverage of existing and planned green spaces. Using optical imagery such as Sentinel-2, land cover classes including buildings, roads, water, vegetation cover, bare ground, and the albedo (measure of reflectiveness) of each of these surfaces can be calculated.

Calculating the values from the different bands in the satellite imagery allows them to be used as inputs into the SUEWS model, which provides a rigorous way of calculating UHI effect. The results of SUEWS are then visualized, in this case with Arup’s existing geospatial data platform. By adjusting values such as the albedo of a specific location, Arup are able to test the effect of mitigation strategies. Albedo performance can be further refined in simulations by modeling different construction materials, cladding, or roofing. Arup found that in one area of London, increasing albedo from 0.1 to 0.9 could decrease ambient temperature by 1.1°C during peak conditions. Over larger areas of interest, this modeling can also be used to forecast the UHI effect alongside climate projections to quantify the scale of the UHI effect.

With historical data from sources such as Sentinel-2, temporal studies can be completed. This enables Arup to visualize the UHI effect during periods of interest, such as the London summer 2022 heatwave. The Urban Heat Snapshot research Arup has completed reveals how the UHI effect is pushing up temperatures in cities like London, Madrid, Mumbai, and Los Angeles.

Collecting data for an area of interest

SageMaker eliminates the complexities in manually collecting data for Earth Observation jobs (EOJs) by providing a catalog of geospatial data providers. As of this writing, USGS Landsat, Sentinel-1, Copernicus DEM, NAIP: National Agriculture Imagery Program, and Sentinel-2 data is available directly from the catalog. You can also bring your own Planet Labs data when imagery at a higher resolution and frequency is required. Built-in geospatial dataset access saves weeks of effort otherwise lost to collecting data from various data providers and vendors. Coordinates for the polygon area of interest need to be provided as well as the time range for when EO imagery was collected.

Arup’s next step was to combine these images into a larger single raster covering the entire area of interest. This is known as mosaicking and is supported by passing GeoMosaicConfig to the SageMaker StartEarthObservationJob API.
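
A minimal sketch of such a geomosaic job configuration is shown below; the algorithm and band names are assumptions for illustration, and the configuration is submitted with start_earth_observation_job in the same way as the jobs that follow.

geomosaic_config = {
    "JobConfig": {
        "GeoMosaicConfig": {
            "AlgorithmName": "NEAR",
            "TargetBands": ["red", "green", "blue", "nir", "swir16"],
        },
    }
}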

We have provided some code samples representative of the steps Arup took:

input_config = {
    'AreaOfInterest': {
        'AreaOfInterestGeometry': {
            'PolygonGeometry': {
                'Coordinates': [
                    [
                        [-0.10813482652250173,51.52037502928192],
                        [-0.10813482652250173, 51.50403627237003],
                        [-0.0789364331937179, 51.50403627237003],
                        [-0.0789364331937179, 51.52037502928192],
                        [-0.10813482652250173, 51.52037502928192]
                    ]
                ]
            }
        }
    },
    'TimeRangeFilter': {
        'StartTime': '2020-01-01T00:00:00',
        'EndTime': '2023-01-01T00:00:00'
    },
    'PropertyFilters': {
        'Properties': [
            {
                'Property': {
                    'EoCloudCover': {
                        'LowerBound': 0,
                        'UpperBound': 1
                    }
                }
            }
        ],
        'LogicalOperator': 'AND'
    },
    'RasterDataCollectionArn': 'arn:aws:sagemaker-geospatial:us-west-2:378778860802:raster-data-collection/public/nmqj48dcu3g7ayw8'
}


eoj_config = {
    "JobConfig": {
        "CloudRemovalConfig": {
            "AlgorithmName": "INTERPOLATION",
            "InterpolationValue": "-9999",
            "TargetBands": ["red", "green", "blue", "nir", "swir16"],
        },
    }
}


# Invoke the EOJ; this will run in the background for several minutes
eoj = sm_geo_client.start_earth_observation_job(
    Name="London-Observation-Job",
    ExecutionRoleArn=sm_exec_role,
    InputConfig={"RasterDataCollectionQuery":input_config},
    **eoj_config
)
print("EOJ started with... \nName: {} \nID: {}".format(eoj["Name"],eoj["Arn"]))

This can take a while to complete. You can check the status of your jobs like so:

eoj_arn = eoj["Arn"]
job_details = sm_geo_client.get_earth_observation_job(Arn=eoj_arn)
{k: v for k, v in job_details.items() if k in ["Arn", "Status", "DurationInSeconds"]}
# List all jobs in the account
sm_geo_client.list_earth_observation_jobs()["EarthObservationJobSummaries"]

Resampling

Next, the raster is resampled to normalize the pixel size across the collected images. You can use ResamplingConfig to achieve this by providing the value of the length of a side of the pixel:

eoj_config = {
    "JobConfig": {
        "ResamplingConfig": {
            "OutputResolution": {
                "UserDefined": {
                    "Value": 20, 
                    "Unit": "METERS"
                }
            },
            "AlgorithmName": "NEAR",
        },
    }
}

eojParams = {
    "Name": "Resample",
    "InputConfig": {
        "PreviousEarthObservationJobArn": eoj["Arn"]
    },
    **eoj_config,
    "ExecutionRoleArn": sm_exec_role,
}

eoj = sm_geo_client.start_earth_observation_job(**eojParams)
print("EOJ started with... nName: {} nID: {}".format(eoj["Name"],eoj["Arn"]))

Determining coverage

Determining land coverage such as vegetation is possible by applying a normalized difference vegetation index (NDVI). In practice, this can be calculated from the intensity of reflected red and near-infrared light. To apply such a calculation to EO data within SageMaker, the BandMathConfig can be supplied to the StartEarthObservationJob API:

job_config={
    "BandMathConfig": {
        'CustomIndices': {
            "Operations":[
                {
                    "Name": "NDVI",
                    "Equation": "(nir - red)/(nir+red)"
                }
            ]
        }
    }
}

eojParams = {
    "Name": "Bandmath",
    "InputConfig": {
        "PreviousEarthObservationJobArn": eoj["Arn"]
    },
    "JobConfig":job_config,
    "ExecutionRoleArn": sm_exec_role,
}

eoj = sm_geo_client.start_earth_observation_job(**eojParams)
print("EOJ started with... nName: {} nID: {}".format(eoj["Name"],eoj["Arn"]))

We can visualize the result of the band math job output within the SageMaker geospatial capabilities visualization tool. SageMaker geospatial capabilities can help you overlay model predictions on a base map and provide layered visualization to make collaboration easier. The GPU-powered interactive visualizer and Python notebooks provide a seamless way to explore millions of data points in a single window as well as collaborate on the insights and results.

SageMaker geospatial job visualisation of analysis showing central London

Preparing for visualization

As a final step, Arup prepares the various bands and calculated bands for visualization by combining them into a single GeoTIFF. For band stacking, SageMaker EOJs can be passed the StackConfig object, where the output resolution can be set based on the resolutions of the input images:

job_config={
    'StackConfig': {
        'OutputResolution': {
            'Predefined': 'HIGHEST'
        }
    }
}

eojParams = {
    "Name": "Stack",
    "InputConfig": {
        "PreviousEarthObservationJobArn": "arn:aws:sagemaker-geospatial:us-west-2:951737352731:earth-observation-job/8k2rfir84zb7"
    },
    "JobConfig":job_config,
    "ExecutionRoleArn": sm_exec_role,
}

eoj = sm_geo_client.start_earth_observation_job(**eojParams)
print("EOJ started with... nName: {} nID: {}".format(eoj["Name"],eoj["Arn"]))

Finally, the output GeoTIFF can be stored for later use in Amazon Simple Storage Service (Amazon S3) or visualized using SageMaker geospatial capabilities. By storing the output in Amazon S3, Arup can use the analysis in new projects and incorporate the data into new inference jobs. In Arup’s case, they used the processed GeoTIFF in their existing geographic information system visualization tooling to produce visualizations consistent with their product design themes.
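
As a sketch of how the result might be persisted (the S3 URI is illustrative, and the exact parameter structure should be checked against the ExportEarthObservationJob API reference), the stacked output can be exported to Amazon S3:

export = sm_geo_client.export_earth_observation_job(
    Arn=eoj["Arn"],
    ExecutionRoleArn=sm_exec_role,
    OutputConfig={"S3Data": {"S3Uri": "s3://my-bucket/uhi/london/stack/"}},
)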

London analysis visualised with Arup's existing product tooling

Conclusion

By utilizing the native functionality of SageMaker, Arup was able to conduct an analysis of the UHI effect at city scale in a few hours, something that previously took weeks. This helps Arup enable their own clients to meet their sustainability targets faster and narrows the areas of focus where UHI effect mitigation strategies should be applied, saving precious resources and optimizing mitigation tactics. The analysis can also be integrated into future earth observation tooling as part of larger risk analysis projects, and helps Arup’s customers forecast the effect of UHI in different scenarios.

Companies such as Arup are unlocking sustainability through the cloud with earth observation data. Unlock the possibilities of earth observation data in your sustainability projects by exploring the SageMaker geospatial capabilities on the SageMaker console today. To find out more, refer to Amazon SageMaker geospatial capabilities, or get hands on with a guidance solution.


About the Authors

Richard Alexander is an Associate Geospatial Data Scientist at Arup, based in Bristol. He has a proven track record of building successful teams and leading and delivering earth observation and data science-related projects across multiple environmental sectors.

Mark Hallows is a Remote Sensing Specialist at Arup, based in London. Mark provides expertise in earth observation and geospatial data analysis to a broad range of clients and delivers insights and thought leadership using both traditional machine learning and deep learning techniques.

Thomas Attree is a Senior Solutions Architect at Amazon Web Services based in London. Thomas currently helps customers in the power and utilities industry and applies his passion for sustainability to help customers architect applications for energy efficiency, as well as advise on using cloud technology to empower sustainability projects.

Tamara Herbert is a Senior Application Developer with AWS Professional Services in the UK. She specializes in building modern and scalable applications for a wide variety of customers, currently focusing on those within the public sector. She is actively involved in building solutions and driving conversations that enable organizations to meet their sustainability goals both in and through the cloud.

Anirudh Viswanathan is a Sr. Product Manager, Technical – External Services with the SageMaker geospatial ML team. He holds a Masters in Robotics from Carnegie Mellon University and an MBA from the Wharton School of Business, and is named inventor on over 50 patents. He enjoys long-distance running, visiting art galleries, and Broadway shows.


Learn how to build and deploy tool-using LLM agents using AWS SageMaker JumpStart Foundation Models

Learn how to build and deploy tool-using LLM agents using AWS SageMaker JumpStart Foundation Models

Large language model (LLM) agents are programs that extend the capabilities of standalone LLMs with 1) access to external tools (APIs, functions, webhooks, plugins, and so on), and 2) the ability to plan and execute tasks in a self-directed fashion. Often, LLMs need to interact with other software, databases, or APIs to accomplish complex tasks. For example, an administrative chatbot that schedules meetings would require access to employees’ calendars and email. With access to tools, LLM agents can become more powerful—at the cost of additional complexity.

In this post, we introduce LLM agents and demonstrate how to build and deploy an e-commerce LLM agent using Amazon SageMaker JumpStart and AWS Lambda. The agent will use tools to provide new capabilities, such as answering questions about returns (“Is my return rtn001 processed?”) and providing updates about orders (“Could you tell me if order 123456 has shipped?”). These new capabilities require LLMs to fetch data from multiple data sources (orders, returns) and perform retrieval augmented generation (RAG).

To power the LLM agent, we use a Flan-UL2 model deployed as a SageMaker endpoint and use data retrieval tools built with AWS Lambda. The agent can subsequently be integrated with Amazon Lex and used as a chatbot inside websites or Amazon Connect. We conclude the post with items to consider before deploying LLM agents to production. For a fully managed experience for building LLM agents, AWS also provides the agents for Amazon Bedrock feature (in preview).

A brief overview of LLM agent architectures

LLM agents are programs that use LLMs to decide when and how to use tools as necessary to complete complex tasks. With tools and task planning abilities, LLM agents can interact with outside systems and overcome traditional limitations of LLMs, such as knowledge cutoffs, hallucinations, and imprecise calculations. Tools can take a variety of forms, such as API calls, Python functions, or webhook-based plugins. For example, an LLM can use a “retrieval plugin” to fetch relevant context and perform RAG.

So what does it mean for an LLM to pick tools and plan tasks? There are numerous approaches (such as ReAct, MRKL, Toolformer, HuggingGPT, and Transformer Agents) to using LLMs with tools, and advancements are happening rapidly. But one simple way is to prompt an LLM with a list of tools and ask it to determine 1) if a tool is needed to satisfy the user query, and if so, 2) select the appropriate tool. Such a prompt typically looks like the following example and may include few-shot examples to improve the LLM’s reliability in picking the right tool.

'''
Your task is to select a tool to answer a user question. You have access to the following tools.

search: search for an answer in FAQs
order: order items
noop: no tool is needed

{few shot examples}

Question: {input}
Tool:
'''

More complex approaches involve using a specialized LLM that can directly decode “API calls” or “tool use,” such as GorillaLLM. Such finetuned LLMs are trained on API specification datasets to recognize and predict API calls based on instruction. Often, these LLMs require some metadata about available tools (descriptions, yaml, or JSON schema for their input parameters) in order to output tool invocations. This approach is taken by agents for Amazon Bedrock and OpenAI function calls. Note that LLMs generally need to be sufficiently large and complex in order to show tool selection ability.

Typical LLM Agent Architecture

Assuming task planning and tool selection mechanisms are chosen, a typical LLM agent program works in the following sequence:

  1. User request – The program takes a user input such as “Where is my order 123456?” from some client application.
  2. Plan next action(s) and select tool(s) to use – Next, the program uses a prompt to have the LLM generate the next action, for example, “Look up the orders table using OrdersAPI.” The LLM is prompted to suggest a tool name such as OrdersAPI from a predefined list of available tools and their descriptions. Alternatively, the LLM could be instructed to directly generate an API call with input parameters such as OrdersAPI(123456).
    1. Note that the next action may or may not involve using a tool or API. If not, the LLM would respond to user input without incorporating additional context from tools or simply return a canned response such as, “I cannot answer this question.”
  3. Parse tool request – Next, we need to parse out and validate the tool/action prediction suggested by the LLM. Validation is needed to ensure tool names, APIs, and request parameters aren’t hallucinated and that the tools are properly invoked according to specification. This parsing may require a separate LLM call.
  4. Invoke tool – Once valid tool name(s) and parameter(s) are ensured, we invoke the tool. This could be an HTTP request, function call, and so on.
  5. Parse output – The response from the tool may need additional processing. For example, an API call may result in a long JSON response, where only a subset of fields are of interest to the LLM. Extracting information in a clean, standardized format can help the LLM interpret the result more reliably.
  6. Interpret output – Given the output from the tool, the LLM is prompted again to make sense of it and decide whether it can generate the final answer back to the user or whether additional actions are required.
  7. Terminate or continue to step 2 – Either return a final answer or a default answer in the case of errors or timeouts.

Different agent frameworks execute the previous program flow differently. For example, ReAct combines tool selection and final answer generation into a single prompt, as opposed to using separate prompts for tool selection and answer generation. Also, this logic can be run in a single pass or run in a while statement (the “agent loop”), which terminates when the final answer is generated, an exception is thrown, or timeout occurs. What remains constant is that agents use the LLM as the centerpiece to orchestrate planning and tool invocations until the task terminates. Next, we show how to implement a simple agent loop using AWS services.

Solution overview

For this blog post, we implement an e-commerce support LLM agent that provides two functionalities powered by tools:

  • Return status retrieval tool – Answer questions about the status of returns such as, “What is happening to my return rtn001?”
  • Order status retrieval tool – Track the status of orders such as, “What’s the status of my order 123456?”

The agent effectively uses the LLM as a query router. Given a query (“What is the status of order 123456?”), the agent selects the appropriate retrieval tool to query across multiple data sources (that is, returns and orders). We accomplish query routing by having the LLM pick among multiple retrieval tools, which are responsible for interacting with a data source and fetching context. This extends the simple RAG pattern, which assumes a single data source.

Both retrieval tools are Lambda functions that take an ID (orderId or returnId) as input, fetch a JSON object from the data source, and convert the JSON into a human-friendly representation string that’s suitable for use by the LLM. The data source in a real-world scenario could be a highly scalable NoSQL database such as DynamoDB, but this solution employs a simple Python dict with sample data for demo purposes.
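
For illustration, a minimal sketch of the returns retrieval tool as a Lambda handler could look like the following; the sample records and response wording are assumptions, and the actual functions in the solution may differ.

# Sample in-memory "data source" for demo purposes
RETURNS_DB = {
    "rtn001": {"status": "processed", "refund_amount": "23.99"},
    "rtn003": {"status": "pending", "refund_amount": "14.50"},
}

def lambda_handler(event, context):
    return_id = event.get("returnId", "")
    record = RETURNS_DB.get(return_id)
    if record is None:
        return {"statusCode": 404, "body": f"Return {return_id} not found."}
    # Convert the JSON record into a human-friendly string for the LLM
    summary = (
        f"Return {return_id} is {record['status']} "
        f"with a refund amount of ${record['refund_amount']}."
    )
    return {"statusCode": 200, "body": summary}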

Additional functionalities can be added to the agent by adding retrieval tools and modifying prompts accordingly. This agent can be tested as a standalone service that integrates with any UI over HTTP, which can be done easily with Amazon Lex.

Solution Overview

Here are some additional details about the key components:

  1. LLM inference endpoint – The core of an agent program is an LLM. We will use the SageMaker JumpStart foundation model hub to easily deploy the Flan-UL2 model. SageMaker JumpStart makes it easy to deploy LLM inference endpoints to dedicated SageMaker instances (a minimal deployment sketch follows this list).
  2. Agent orchestrator – Agent orchestrator orchestrates the interactions among the LLM, tools, and the client app. For our solution, we use an AWS Lambda function to drive this flow and employ the following as helper functions.
    • Task (tool) planner – Task planner uses the LLM to suggest one of 1) returns inquiry, 2) order inquiry, or 3) no tool. We use prompt engineering only and the Flan-UL2 model as-is, without fine-tuning.
    • Tool parser – Tool parser ensures that the tool suggestion from task planner is valid. Notably, we ensure that a single orderId or returnId can be parsed. Otherwise, we respond with a default message.
    • Tool dispatcher – Tool dispatcher invokes tools (Lambda functions) using the valid parameters.
    • Output parser – Output parser cleans and extracts relevant items from JSON into a human-readable string. This task is done both by each retrieval tool as well as within the orchestrator.
    • Output interpreter – Output interpreter’s responsibility is to 1) interpret the output from tool invocation and 2) determine whether the user request can be satisfied or additional steps are needed. If the latter, a final response is generated separately and returned to the user.
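
The following is a hedged sketch of deploying Flan-UL2 with the SageMaker JumpStart SDK; the model ID is an assumption and should be verified against the JumpStart model hub.

from sagemaker.jumpstart.model import JumpStartModel

# Deploy Flan-UL2 from SageMaker JumpStart to a dedicated endpoint
model = JumpStartModel(model_id="huggingface-text2text-flan-ul2-bf16")  # assumed model ID
predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.g5.12xlarge",
)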

Now, let’s dive a bit deeper into the key components: agent orchestrator, task planner, and tool dispatcher.

Agent orchestrator

Below is an abbreviated version of the agent loop inside the agent orchestrator Lambda function. The loop uses helper functions such as task_planner and tool_parser to modularize the tasks. The loop here is designed to run at most two times to prevent the LLM from being stuck in a loop for an unnecessarily long time.

#.. imports ..
MAX_LOOP_COUNT = 2 # stop the agent loop after up to 2 iterations
# ... helper function definitions ...
def agent_handler(event):
    user_input = event["query"]
    print(f"user input: {user_input}") 
    
    final_generation = ""
    is_task_complete = False
    loop_count = 0 

    # start of agent loop
    while not is_task_complete and loop_count < MAX_LOOP_COUNT:
        tool_prediction = task_planner(user_input)
        print(f"tool_prediction: {tool_prediction}")  
        
        tool_name, tool_input, tool_output, error_msg = None, None, "", ""

        try:
            tool_name, tool_input = tool_parser(tool_prediction, user_input)
            print(f"tool name: {tool_name}") 
            print(f"tool input: {tool_input}") 
        except Exception as e:
            error_msg = str(e)
            print(f"tool parse error: {error_msg}")  
    
        if tool_name is not None: # if a valid tool is selected and parsed 
            raw_tool_output = tool_dispatch(tool_name, tool_input)
            tool_status, tool_output = output_parser(raw_tool_output)
            print(f"tool status: {tool_status}")  

            if tool_status == 200:
                is_task_complete, final_generation = output_interpreter(user_input, tool_output) 
            else:
                final_generation = tool_output
        else: # if no valid tool was selected and parsed, either return the default msg or error msg
            final_generation = DEFAULT_RESPONSES.NO_TOOL_FEEDBACK if error_msg == "" else error_msg
    
        loop_count += 1

    return {
        'statusCode': 200,
        'body': final_generation
    }

Task planner (tool prediction)

The agent orchestrator uses task planner to predict a retrieval tool based on user input. For our LLM agent, we will simply use prompt engineering and few shot prompting to teach the LLM this task in context. More sophisticated agents could use a fine-tuned LLM for tool prediction, which is beyond the scope of this post. The prompt is as follows:

tool_selection_prompt_template = """
Your task is to select appropriate tools to satisfy the user input. If no tool is required, then pick "no_tool"

Tools available are:

returns_inquiry: Database of information about a specific return's status, whether it's pending, processed, etc.
order_inquiry: Information about a specific order's status, such as shipping status, product, amount, etc.
no_tool: No tool is needed to answer the user input.

You can suggest multiple tools, separated by a comma.

Examples:
user: "What are your business hours?"
tool: no_tool

user: "Has order 12345 shipped?"
tool: order_inquiry

user: "Has return ret812 processed?"
tool: returns_inquiry

user: "How many days do I have until returning orders?"
tool: returns_inquiry

user: "What was the order total for order 38745?"
tool: order_inquiry

user: "Can I return my order 38756 based on store policy?"
tool: order_inquiry

user: "Hi"
tool: no_tool

user: "Are you an AI?"
tool: no_tool

user: "How's the weather?"
tool: no_tool

user: "What is the refund status of order 12347?"
tool: order_inquiry

user: "What is the refund status of return ret172?"
tool: returns_inquiry

user input: {}
tool:
"""

Tool dispatcher

The tool dispatch mechanism works via if/else logic to call appropriate Lambda functions depending on the tool’s name. The following is the tool_dispatch helper function’s implementation. It’s used inside the agent loop and returns the raw response from the tool Lambda function, which is then cleaned by an output_parser function.


def tool_dispatch(tool_name, tool_input):
    #...
     
    tool_response = None 

    if tool_name == "returns_inquiry":
        tool_response = lambda_client.invoke(
            FunctionName=RETURNS_DB_TOOL_LAMBDA,
            InvocationType="RequestResponse",
            Payload=json.dumps({
              "returnId": tool_input  
            })
        )
    elif tool_name == "order_inquiry":
        tool_response = lambda_client.invoke(
            FunctionName=ORDERS_DB_TOOL_LAMBDA,
            InvocationType="RequestResponse",
            Payload=json.dumps({
                "orderId": tool_input
            })
        )
    else:
        raise ValueError("Invalid tool invocation")
        
    return tool_response

Deploy the solution

Important prerequisites – To get started with the deployment, you need to fulfill the following prerequisites:

  • Access to the AWS Management Console via a user who can launch AWS CloudFormation stacks
  • Familiarity with navigating the AWS Lambda and Amazon Lex consoles
  • Flan-UL2 requires a single ml.g5.12xlarge for deployment, which may necessitate increasing resource limits via a support ticket. In our example, we use us-east-1 as the Region, so please make sure to increase the service quota (if needed) in us-east-1.

Deploy using CloudFormation – You can deploy the solution to us-east-1 by clicking the button below:

Launch stack

Deploying the solution will take about 20 minutes and will create an LLMAgentStack stack, which:

  • deploys the SageMaker endpoint using Flan-UL2 model from SageMaker JumpStart;
  • deploys three Lambda functions: LLMAgentOrchestrator, LLMAgentReturnsTool, LLMAgentOrdersTool; and
  • deploys an AWS Lex bot that can be used to test the agent: Sagemaker-Jumpstart-Flan-LLM-Agent-Fallback-Bot.

Test the solution

The stack deploys an Amazon Lex bot with the name Sagemaker-Jumpstart-Flan-LLM-Agent-Fallback-Bot. The bot can be used to test the agent end-to-end. Here’s an additional comprehensive guide for testing Amazon Lex bots with a Lambda integration and how the integration works at a high level. But in short, the Amazon Lex bot is a resource that provides a quick UI to chat with the LLM agent running inside a Lambda function that we built (LLMAgentOrchestrator).

The sample test cases to consider are as follows:

  • Valid order inquiry (for example, “Which item was ordered for 123456?”)
    • Order “123456” is a valid order, so we should expect a reasonable answer (e.g. “Herbal Handsoap”)
  • Valid return inquiry for a return (for example, “When is my return rtn003 processed?”)
    • We should expect a reasonable answer about the return’s status.
  • Irrelevant to both returns or orders (for example, “How is the weather in Scotland right now?”)
    • An irrelevant question to returns or orders, thus a default answer should be returned (“Sorry, I cannot answer that question.”)
  • Invalid order inquiry (for example, “Which item was ordered for 383833?”)
    • The id 383833 does not exist in the orders dataset and hence we should fail gracefully (for example, “Order not found. Please check your Order ID.”)
  • Invalid return inquiry (for example, “When is my return rtn123 processed?”)
    • Similarly, id rtn123 does not exist in the returns dataset, and hence should fail gracefully.
  • Irrelevant return inquiry (for example, “What is the impact of return rtn001 on world peace?”)
    • This question, while it seems to pertain to a valid return, is irrelevant. The LLM is used to filter questions with irrelevant context.

To run these tests yourself, here are the instructions.

  1. On the Amazon Lex console (AWS Console > Amazon Lex), navigate to the bot entitled Sagemaker-Jumpstart-Flan-LLM-Agent-Fallback-Bot. This bot has already been configured to call the LLMAgentOrchestrator Lambda function whenever the FallbackIntent is triggered.
  2. In the navigation pane, choose Intents.
    intent navigation
  3. Choose Build at the top right corner
    lex bot start build
  4. Wait for the build process to complete. When it’s done, you get a success message, as shown in the following screenshot.
    build complete status
  5. Test the bot by entering the test cases.

Cleanup

To avoid additional charges, delete the resources created by our solution by following these steps:

  • On the AWS CloudFormation console, select the stack named LLMAgentStack (or the custom name you picked).
  • Choose Delete.
  • Check that the stack is deleted from the CloudFormation console.

Important: double-check that the stack is successfully deleted by ensuring that the Flan-UL2 inference endpoint is removed.

  • To check, go to AWS console > Sagemaker > Endpoints > Inference page.
  • The page should list all active endpoints.
  • Make sure sm-jumpstart-flan-bot-endpoint does not exist, as shown in the following screenshot.

sagemaker clean up

Considerations for production

Deploying LLM agents to production requires taking extra steps to ensure reliability, performance, and maintainability. Here are some considerations prior to deploying agents in production:

  • Selecting the LLM model to power the agent loop: For the solution discussed in this post, we used a Flan-UL2 model without fine-tuning to perform task planning or tool selection. In practice, using an LLM that is fine-tuned to directly output tool or API requests can increase reliability and performance, as well as simplify development. We could fine-tune an LLM on tool selection tasks or use a model that directly decodes tool tokens like Toolformer.
    • Using fine-tuned models can also simplify adding, removing, and updating tools available to an agent. With prompt-only based approaches, updating tools requires modifying every prompt inside the agent orchestrator, such as those for task planning, tool parsing, and tool dispatch. This can be cumbersome, and the performance may degrade if too many tools are provided in context to the LLM.
  • Reliability and performance: LLM agents can be unreliable, especially for complex tasks that cannot be completed within a few loops. Adding output validations, retries, structuring outputs from LLMs into JSON or yaml, and enforcing timeouts to provide escape hatches for LLMs stuck in loops can enhance reliability.

Conclusion

In this post, we explored how to build an LLM agent that can utilize multiple tools from the ground up, using low-level prompt engineering, AWS Lambda functions, and SageMaker JumpStart as building blocks. We discussed the architecture of LLM agents and the agent loop in detail. The concepts and solution architecture introduced in this blog post may be appropriate for agents that use a small, predefined set of tools. We also discussed several strategies for using agents in production. Agents for Amazon Bedrock, which is in preview, also provides a managed experience for building agents with native support for agentic tool invocations.


About the Author

John Hwang is a Generative AI Architect at AWS with special focus on Large Language Model (LLM) applications, vector databases, and generative AI product strategy. He is passionate about helping companies with AI/ML product development, and the future of LLM agents and co-pilots. Prior to joining AWS, he was a Product Manager at Alexa, where he helped bring conversational AI to mobile devices, as well as a derivatives trader at Morgan Stanley. He holds a B.S. in computer science from Stanford University.
