HEX
Server: Apache/2.4.6 () PHP/7.4.33
System: Linux chile-dev-app-1 5.4.17-2136.315.5.el7uek.x86_64 #2 SMP Wed Dec 21 19:57:57 PST 2022 x86_64
User: apache (48)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //lib/python3.6/site-packages/oci_utils/impl/virt/virt_check.py
# oci-utils
#
# Copyright (c) 2018, 2022 Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at
# http://oss.oracle.com/licenses/upl.

"""
Verify if we have valid KVM environment on a BM with the following
   - Verify IOMMU is enabled
   - Verify it is a BM
   - Verify SR-IOV is enabled on primary ethernet device
   - Verify the bridge on primary ethernet device has VEPA link mode set
"""

import logging
import os

from oci_utils import lsblk
from . import virt_utils
from .. import BRIDGE_CMD
from .. import print_choices
from .. import sudo_utils
from ..platform_helpers import (get_block_devices, get_phys_device)

_logger = logging.getLogger('oci-utils.virt.virt-check')


def iommu_check():
    """
    Verify if the IOMMU is enabled. Linux kernel logs the message "IOMMU:
    enabled" at boot time.

    Returns
    -------
        bool
            True on success, False otherwise.
    """
    iommu_on_exp = "iommu=on"
    output = sudo_utils.call_popen_output(
        ['cat /proc/cmdline', '|', 'egrep', '"{}"'.format(iommu_on_exp)])
    if not output:
        _logger.debug('IOMMU flag not set on kernel')
        return False
    return True


def sriov_numvfs_check(phys_dev):
    """
    Verify if the primary ethernet device has SR-IOV enabled.

    Parameters
    ----------
    phys_dev : str
        The primary ethernet device.

    Returns
    -------
       bool
            True on success, False otherwise.
    """
    sriov_fpath = '/sys/class/net/{}/device/sriov_numvfs'.format(phys_dev)
    try:
        f = open(sriov_fpath, "r")
        output = f.read()
        f.close()
        if int(output) > 0:
            return True
        _logger.debug('sriov_numvfs_check return false')
        return False
    except IOError as e:
        _logger.debug('error checking sriov_numvfs_check: %s' % str(e))
        return False


def br_link_mode_check(phys_dev):
    """
    Verify the bridge on primary ethernet device has VEPA link mode.

    Parameters
    ----------
    phys_dev : str
        The primary ethernet device.

    Returns
    -------
      bool
            True on success, False otherwise.
    """
    vepa = "VEPA"
    res = sudo_utils.call_popen_output(
        [BRIDGE_CMD, 'link', 'show', 'dev', '"{}"'.format(phys_dev)])
    if res and vepa == res.split()[-1].decode().upper():
        return True
    _logger.debug('br_link_mode_check, return False, server_type: %s ' % vepa)
    return False


def validate_kvm_env(check_for_bm_shape=True):
    """
    Verify if we have valid KVM environment on a BM with the following:
      - Verify IOMMU is enabled;
      - (for BM shapes) Verify SR-IOV is enabled on primary ethernet device;
      - (for BM shapes) Verify the bridge on primary ethernet device has VEPA link mode set.
    Parameters
    ----------
    check_for_bm_shape: boolean
        check for BM shape types ?
    Returns
    -------
        bool
            True on success, False otherwise.
    """
    phys_dev = get_phys_device()
    if phys_dev is None:
        _logger.debug('No physical device found')
        return False

    if not iommu_check():
        _logger.debug('iommu_check failed')
        return False

    if check_for_bm_shape:
        if not sriov_numvfs_check(phys_dev):
            _logger.debug('sriov_numvfs_check failed')
            return False
        if not br_link_mode_check(phys_dev):
            _logger.debug('br_link_mode_check failed')
            return False
    return True


def validate_domain_name(domain):
    """
    Checks if a domain name is already in use.  Returns False
    if it is, and True if not.

    Parameters
    ----------
    domain: str
        The domain name.

    Returns
    -------
        bool
             True on succes, False otherwise.
    """
    if domain in virt_utils.get_domains_name():
        return False
    return True


def validate_block_device(dev_orig):
    """
    Given a path, ensure that the path actually represents
    a device and that device is not currently assigned to
    a domain.

    Parameters
    ----------
    dev_orig: str
        The path to the block device.

    Returns
    -------
        bool
            True on success, False otherwise.
    """
    path_prefix = '/dev/disk/by-path'
    devices = lsblk.list_blk_dev()
    domains = virt_utils.get_domains_name()
    domain_disks = \
        {d: virt_utils.get_disks_from_domain(virt_utils.get_domain_xml(d))
         for d in domains}
    unused_devices = virt_utils.get_unused_block_devices(devices, domain_disks)
    dev = dev_orig

    if not dev_orig:
        # If not block device was provided, find one
        # that is not in use
        try:
            dev_orig = unused_devices.pop()
        except KeyError:
            _print_available_block_devices(unused_devices)
            return False
        dev = dev_orig

    try:
        os.stat(dev)
    except Exception:
        _logger.error("{} does not exist.".format(dev))
        _print_available_block_devices(unused_devices)
        return False

    # If the device is not already pointing to a consistent name,
    # convert it to one.
    if not dev.startswith(path_prefix) or dev == path_prefix:
        dev_orig = dev
        visited = []
        while True:
            try:

                dev = os.path.abspath(os.path.join(os.path.dirname(dev), os.readlink(dev)))
            except OSError as e:
                # Found  a real path.
                if e.errno == 22:
                    break
                _logger.error(
                        "Unexpected error occured while resolving {}.  Error reading {}: {}", dev_orig, dev, e)
                return False

            # Prevent infinite loops
            if dev in visited:
                _logger.error("Infinite loop encountered trying to resolve {}.", dev_orig)
                print_choices("Path:", visited + [dev])
                return False

            visited.append(dev)

        # Convert the resolved device into by-path format
        dev_map = get_block_devices()
        try:
            dev = dev_map[dev]
        except Exception:
            _logger.error("{} does not point to a block device.", dev_orig)
            _print_available_block_devices(unused_devices)
            return False

    # At this point, dev points to a file in /dev/disk/by-path'
    # and has been confirmed to exist.  It can be assumed
    # that the path is also a symlink
    dev_path = os.readlink(dev)
    dev_name = dev_path[dev_path.rfind('/') + 1:]
    if dev_name not in devices:
        _logger.error("{} is not a valid device", dev_orig)
        _print_available_block_devices(unused_devices)
        return False
    if virt_utils.block_device_has_mounts(devices[dev_name]):
        _logger.error("{} is in use by the host system", dev_orig)
        _print_available_block_devices(unused_devices)
        return False
    if not devices[dev_name].get('size'):
        _logger.error("{} is not a disk", dev_orig)
        _print_available_block_devices(unused_devices)
        return False

    for domain, disks in domain_disks.items():
        if dev in disks:
            _logger.error("{} is in use by \"{}\"", dev_orig, domain)
            _print_available_block_devices(unused_devices)
            return False

    return dev


def _print_available_block_devices(devices):
    """
    Display all available block devices.

    Parameters
    ----------
    devices: list
        List of available block devices.
    Returns
    -------
        No return value.
    """
    if not devices or len(devices) == 0:
        _logger.error("All block devices are currently in use.  Please attach a new block device via the OCI console.")
    else:
        print_choices("Available Block Devices:", devices)