Azure On Prem workload Setup

Modified on Tue, 12 Nov at 10:55 AM



Information Collected by Workload Scans:

CloudDefense collects the following information per workload:
- Port information
- Running/Installed packages
- Check of VM security best practices per security standards
- VM identity information

Note: No data is scanned in CWPP. PII and database related scans are covered separately in our DSPM module



Steps to install on-prem workload scans:


Architecture:



1. Install an Azure Function at the Customer’s End:

Create an Azure Function with permissions to trigger jobs for AKS (Azure Kubernetes Services) in their environment.

Permissions required for the Azure Function

To trigger AKS jobs, the Azure Function requires the following roles:

  • Azure Kubernetes Service Cluster User Role
  • Azure Kubernetes Service Contributor Role
  • Virtual Machine Contributor


These roles will be added from identity page using system assigned as "On"


2. Allow CloudDefense to invoke the Azure Function with minimum access:

This Azure Function will be triggered by CloudDefense’s backend. Therefore, the customer should provide CloudDefense with the privilege to invoke the Azure Function by adding the following information into our environment page (access to invoke function):


  • Function Name
  • URL: Default (Function Key)



Azure Function code (python):

import logging
import base64
import json
import os
from azure.identity import DefaultAzureCredential
from azure.mgmt.containerservice import ContainerServiceClient
from kubernetes import client, config
from kubernetes.client import V1Job, V1JobSpec, V1PodSpec, V1Container, V1EnvVar, V1ResourceRequirements, V1ObjectMeta, V1PodTemplateSpec
import uuid
import yaml  
import azure.functions as func
logging.basicConfig(level=logging.DEBUG)



def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Processing Azure Function request to trigger a Kubernetes job')

    # Parse request body
    try:
        req_body = req.get_json()
    except ValueError:
        return func.HttpResponse("Invalid request body", status_code=400)

    # Required fields for the Kubernetes job
    required_fields = [
        'AzureVmName', 'InstanceScanInfoID', 'ScanRequestSecret',
        'HostAddress', 'InstancePublicIP', 'InstanceSnapshotInfoID', 'AzureResourceGroup'
    ]
    missing_fields = [field for field in required_fields if field not in req_body]
    if missing_fields:
        return func.HttpResponse(f"Missing required fields: {', '.join(missing_fields)}", status_code=400)

    # Use managed identity to authenticate with Azure
    credential = DefaultAzureCredential()
    subscription_id = os.environ.get('AZURE_SUBSCRIPTION_ID')


    # Initialize AKS client and retrieve kubeconfig using managed identity
    aks_client = ContainerServiceClient(credential, subscription_id)
    kubeconfig_yaml = get_kubeconfig(aks_client, os.environ.get('AZURE_CLUSTER_NAME'), os.environ.get('AZURE_CLUSTER_RESOURCE_GROUP'))

    if not kubeconfig_yaml:
        return func.HttpResponse(f"Failed to retrieve kubeconfig {subscription_id}", status_code=500)

    # Load Kubernetes config into the client
    try:
        logging.info("Loading Kubernetes config from kubeconfig YAML")
        # Parse the YAML string into a Python dictionary
        kubeconfig_dict = yaml.safe_load(kubeconfig_yaml)
        logging.info(f"Kubeconfig dictionary: {kubeconfig_dict}")
        # Load the kubeconfig from the dictionary into the Kubernetes client
        config.load_kube_config_from_dict(kubeconfig_dict)
    except Exception as e:
        return func.HttpResponse(f"Failed to load Kubernetes config: {str(e)}", status_code=500)

    # Create Kubernetes job
    job = create_k8s_job(req_body, os.environ.get('AZURE_SUBSCRIPTION_ID'), os.environ.get('AZURE_TENANT_ID_SCAN'), os.environ.get('AZURE_CLIENT_ID_SCAN'), os.environ.get('AZURE_CLIENT_SECRET_SCAN'))
    batch_v1 = client.BatchV1Api()
    try:
        api_response = batch_v1.create_namespaced_job(body=job, namespace='default')
        logging.info(f"Job created: {api_response.metadata.name}")
        return func.HttpResponse(f"Job {api_response.metadata.name} created successfully", status_code=200)

    except Exception as e:
        logging.error(f"Failed to create Kubernetes job: {str(e)}")
        return func.HttpResponse(f"Failed to create job: {str(e)}", status_code=500)


def fix_base64_padding(base64_str):
    # Add padding if necessary to avoid "Incorrect padding" error
    return base64_str + '=' * (-len(base64_str) % 4)


def get_kubeconfig(aks_client, cluster_name, resource_group):
    """Retrieve the Kubernetes kubeconfig for the AKS cluster and return it as YAML or plain text."""
    try:
        # Fetch kubeconfig for the AKS cluster
        creds = aks_client.managed_clusters.list_cluster_admin_credentials(resource_group, cluster_name)
        logging.info(f"Kubernetes creds: {creds}")

        # Extract the base64-encoded kubeconfig
        if creds.kubeconfigs and len(creds.kubeconfigs) > 0:
            kubeconfig_b64 = creds.kubeconfigs[0].value

            # Convert bytearray to string if necessary
            if isinstance(kubeconfig_b64, bytearray):
                kubeconfig_b64 = kubeconfig_b64.decode('utf-8')  # Convert bytearray to string

            # Log the kubeconfig in Base64 format
            logging.info(f"Kubeconfig Base64 (before decoding): {kubeconfig_b64}")

            # Try to decode the Base64-encoded kubeconfig to YAML
            try:
                
                kubeconfig_yaml = base64.b64decode(fix_base64_padding(kubeconfig_b64)).decode('utf-8')
                logging.info(f"Successfully decoded Kubeconfig YAML")
                return kubeconfig_yaml  

            except (base64.binascii.Error, UnicodeDecodeError) as decode_error:
                
                logging.info(f"Base64 decoding failed, assuming plain-text kubeconfig: {decode_error}")
                return kubeconfig_b64  # Returning the plain-text kubeconfig directly
        else:
            logging.error("No kubeconfig found in the response.")
            return None

    except Exception as e:
        logging.error(f"Error retrieving kubeconfig: {str(e)}")
        return None



def create_k8s_job(request_body, azuresubscriptionid, azuretenentid, azureclientid, azureclientsecret):
    logging.info(f"Creating Kubernetes Job for Azure VM: {request_body['AzureVmName']}")
    """Create the Kubernetes Job Spec with ttlSecondsAfterFinished."""
    container = V1Container(
        name="workload-scanner",
        image="cdefense/workload-azure-run-command-cli:prod",
        image_pull_policy="Always",
        env=[
            V1EnvVar(name="INSTANCE_SCAN_INFO_ID", value=request_body['InstanceScanInfoID']),
            V1EnvVar(name="SCAN_REQUEST_SECRET", value=request_body['ScanRequestSecret']),
            V1EnvVar(name="HOST_ADDRESS", value=request_body['HostAddress']),
            V1EnvVar(name="INSTANCE_PUBLIC_IP", value=request_body['InstancePublicIP']),
            V1EnvVar(name="INSTANCE_SNAPSHOT_INFO_ID", value=request_body['InstanceSnapshotInfoID']),
            V1EnvVar(name="VM_NAME", value=request_body['AzureVmName']),
            V1EnvVar(name="RESOURCE_GROUP_NAME", value=request_body['AzureResourceGroup']),
            V1EnvVar(name="WORKLOAD-GOLANG-BACKEND", value="https://acs-cwpp-prod.clouddefenseai.com/"),
            V1EnvVar(name="AZURE_SUBSCRIPTION_ID", value=azuresubscriptionid),
            V1EnvVar(name="AZURE_TENANT_ID", value=azuretenentid),
            V1EnvVar(name="AZURE_CLIENT_ID", value=azureclientid),
            V1EnvVar(name="AZURE_CLIENT_SECRET", value=azureclientsecret),
        ],
        resources=V1ResourceRequirements(
            limits={"cpu": "900m", "memory": "912Mi"},
            requests={"cpu": "250m", "memory": "256Mi"}
        )
    )
    logging.info(f"Container created: {container.name}")
    pod_spec = V1PodSpec(containers=[container], restart_policy="Never")
    template = V1PodTemplateSpec(metadata=V1ObjectMeta(labels={"app": "workload-scanner"}), spec=pod_spec)
    job_spec = V1JobSpec(
        template=template, 
        backoff_limit=0,
        ttl_seconds_after_finished=60  # Automatically delete the job 1 minute after completion/failure
    )

    job_name = f"workloadscan-{uuid.uuid4()}".lower()
    job = V1Job(
        metadata=V1ObjectMeta(name=job_name),
        spec=job_spec
    )
    logging.info("Kubernetes Job created")

    return job


Once azure function is ready and deployed. Customer's needs to set few environment keys which would be

  •  Name of the resource group for AKS
  •  Name of the AKS cluster
  •   Azure client secret, clientid, tenantid, subscriptionid,  which can access to Virtual Machine Contributor role
Note: We need Azure clientid, clientsecret, tenanatid, subsscriptionid set as env variable because these credentials will be used by AKS jobs to run Azure Run command on the target workload machine.



3. AKS at the Customer’s End


Once the Azure Function is set up with sufficient permissions, the customer should create a Kubernetes cluster in their Azure environment, which can launch jobs using the following image:

cdefense/workload-azure-run-command-cli:prod


The purpose of this job is to launch Azure Run Commands on the targeted VMs to collect:

  • Vulnerable packages
  • Open ports on the target VM
  • End-of-standard support for the OS version
  • Security benchmark checks


4. WhiteList our load balancer


Once AKS and Azure function setup is done . To send data from customer network to CloudDefense, customers need to whitelist our Ingress DNS. 


Our Ingress DNS is:


k8s-prod-acsingre-2967d5e127-442314495.me-central-1.elb.amazonaws.com 


Once these setups are ready. Customers can directly come to our ACS platform and run the workload scan from the UI. Once scan is completed the results would looks something like












Was this article helpful?

That’s Great!

Thank you for your feedback

Sorry! We couldn't be helpful

Thank you for your feedback

Let us know how can we improve this article!

Select at least one of the reasons

Feedback sent

We appreciate your effort and will try to fix the article