HEX
Server: Apache/2.4.6 () PHP/7.4.33
System: Linux chile-dev-app-1 5.4.17-2136.315.5.el7uek.x86_64 #2 SMP Wed Dec 21 19:57:57 PST 2022 x86_64
User: apache (48)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //usr/libexec/oled-tools/lkce
#!/usr/bin/env python3
#
# Copyright (c) 2023, Oracle and/or its affiliates.
# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
#
# This code is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 2 only, as
# published by the Free Software Foundation.
#
# This code is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# version 2 for more details (a copy is included in the LICENSE file that
# accompanied this code).
#
# You should have received a copy of the GNU General Public License version
# 2 along with this work; if not, see <https://www.gnu.org/licenses/>.
#
# Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
# or visit www.oracle.com if you need additional information or have any
# questions.

"""
Program for user-interaction
"""

import fcntl
import glob
import os
import platform
import re
import shutil
import stat
import subprocess  # nosec
import sys
from typing import Sequence, Mapping, Optional, List


def update_key_values_file(
        path: str,
        key_values: Mapping[str, Optional[str]],
        sep: str) -> None:
    """Update key-values in a file.

    For all (key, new_value) pairs in key_values, if value is not None, update
    the value of that key in the file to new_value; otherwise remove the
    key-value pair from the file.  The file is assumed to have a key-value per
    line, with sep as separator.  A line might contain only a key, even without
    a separator, in which case the value is assumed to be empty.
    """
    with open(path) as fdesc:
        data = fdesc.read().splitlines()

    with open(path, "w") as fdesc:
        for line in data:
            key, *_ = line.split("=", maxsplit=1)
            key = key.strip()

            if key in key_values:
                new_value = key_values[key]

                # If new_value is not None, update the value; otherwise remove
                # it (i.e. don't write key-new_value back to the file).
                if new_value is not None:
                    fdesc.write(f"{key}{sep}{new_value}\n")
            else:
                # line doesn't match lines to update; write it back as is
                fdesc.write(f"{line}\n")


class Lkce:
    """Class to include user interaction related functionality"""
    # pylint: disable=too-many-instance-attributes

    def __init__(self) -> None:
        """Constructor for Lkce class"""
        self.lkce_home = "/etc/oled/lkce"
        self.lkce_bindir = "/usr/lib/oled-tools"
        self.lkce_config_file = self.lkce_home + "/lkce.conf"
        self.kdump_kernel_ver = platform.uname().release
        self.vmcore = "yes"

        self.set_defaults()

        # set values from config file
        if os.path.exists(self.lkce_config_file):
            self.read_config(self.lkce_config_file)

        # lkce as a kdump_pre hook to kexec-tools
        self.lkce_kdump_sh = self.lkce_home + "/lkce_kdump.sh"
        self.lkce_kdump_dir = self.lkce_home + "/lkce_kdump.d"
        self.kdump_conf = "/etc/kdump.conf"

        tmp = shutil.which("timeout")
        if tmp is None:
            print("timeout command not found.")
            sys.exit(1)

        self.timeout_path = tmp
    # def __init__

    # default values
    def set_defaults(self) -> None:
        """set default values"""
        self.enable_kexec = "no"
        self.vmlinux_path = "/usr/lib/debug/lib/modules/" + \
            self.kdump_kernel_ver + "/vmlinux"
        self.crash_cmds_file = self.lkce_home + "/crash_cmds_file"
        self.vmcore = "yes"
        self.max_out_files = "50"
        self.lkce_outdir = "/var/oled/lkce"
    # def set_default

    def configure_default(self) -> None:
        """Configure lkce with default values

        Creates self.crash_cmds_file and self.lkce_config_file
        """
        os.makedirs(self.lkce_home, exist_ok=True)

        if self.enable_kexec == "yes":
            print("trying to disable lkce")
            self.disable_lkce_kexec()

        self.set_defaults()

        # crash_cmds_file
        filename = self.crash_cmds_file
        content = """#
# This is the input file for crash utility. You can edit this manually
# Add your own list of crash commands one per line.
#
bt
bt -a
bt -FF
dev
kmem -s
foreach bt
log
mod
mount
net
ps -m
ps -S
runq
quit
"""
        try:
            file = open(filename, "w")
            file.write(content)
            file.close()
        except OSError:
            print(f"Unable to operate on file: {filename}")
            return

        # config file
        filename = self.lkce_config_file
        content = """##
# This is configuration file for lkce
# Use 'oled lkce configure' command to change values
##

#enable lkce in kexec kernel
enable_kexec=""" + self.enable_kexec + """

#debuginfo vmlinux path. Need to install debuginfo kernel to get it
vmlinux_path=""" + self.vmlinux_path + """

#path to file containing crash commands to execute
crash_cmds_file=""" + self.crash_cmds_file + """

#lkce output directory path
lkce_outdir=""" + self.lkce_outdir + """

#enable vmcore generation post kdump_report
vmcore=""" + self.vmcore + """

#maximum number of outputfiles to retain. Older file gets deleted
max_out_files=""" + self.max_out_files

        try:
            file = open(filename, "w")
            file.write(content)
            file.close()
        except OSError:
            print("Unable to operate on file: {filename}")
            return

        print("configured with default values")
    # def configure_default

    def read_config(self, filename: str) -> None:
        """Read config file and update the class variables"""
        if not os.path.exists(filename):
            return

        try:
            file = open(filename, "r")
        except OSError:
            print("Unable to open file: {filename}")
            return

        for line in file.readlines():
            if re.search("^#", line):  # ignore lines starting with '#'
                continue

            # trim space/tab/newline from the line
            line = re.sub(r"\s+", "", line)

            entry = re.split("=", line)
            if "enable_kexec" in entry[0] and entry[1]:
                self.enable_kexec = entry[1]

            elif "vmlinux_path" in entry[0] and entry[1]:
                self.vmlinux_path = entry[1]

            elif "crash_cmds_file" in entry[0] and entry[1]:
                self.crash_cmds_file = entry[1]

            elif "lkce_outdir" in entry[0] and entry[1]:
                self.lkce_outdir = entry[1]

            elif "vmcore" in entry[0] and entry[1]:
                self.vmcore = entry[1]

            elif "max_out_files" in entry[0] and entry[1]:
                self.max_out_files = entry[1]
    # def read_config

    def create_lkce_kdump(self) -> None:
        """create lkce_kdump.sh script.

        lkce_kdump.sh is attached as kdump_pre hook in /etc/kdump.conf
        """
        filename = self.lkce_kdump_sh
        os.makedirs(self.lkce_kdump_dir, exist_ok=True)

        mount_cmd = "mount -o bind /sysroot"  # OL7 and above

        # create lkce_kdump.sh script
        content = """#!/bin/sh
# This is a kdump_pre script
# /etc/kdump.conf is used to configure kdump_pre script

# Generate vmcore post lkce_kdump scripts execution
LKCE_VMCORE=""" + self.vmcore + """

# Timeout for lkce_kdump scripts in seconds
LKCE_TIMEOUT="120"

# Temporary directory to mount the actual root partition
LKCE_DIR="/lkce_kdump"

mkdir $LKCE_DIR
""" + mount_cmd + """ $LKCE_DIR
mount -o bind /proc $LKCE_DIR/proc
mount -o bind /dev $LKCE_DIR/dev

LKCE_KDUMP_SCRIPTS=$LKCE_DIR""" + self.lkce_kdump_dir + """/*

#get back control after $LKCE_TIMEOUT to proceed
export LKCE_KDUMP_SCRIPTS
export LKCE_DIR
""" + self.timeout_path + """ $LKCE_TIMEOUT /bin/sh -c '
echo "LKCE_KDUMP_SCRIPTS=$LKCE_KDUMP_SCRIPTS";
for file in $LKCE_KDUMP_SCRIPTS;
do
    cmd=${file#$LKCE_DIR};
    echo "Executing $cmd";
    chroot $LKCE_DIR $cmd;
done;'

umount $LKCE_DIR/dev
umount $LKCE_DIR/proc
umount $LKCE_DIR

unset LKCE_KDUMP_SCRIPTS
unset LKCE_DIR

if [ "$LKCE_VMCORE" == "no" ]; then
    echo "lkce_kdump.sh: vmcore generation is disabled"
    exit 1
fi

exit 0
"""
        try:
            file = open(filename, "w")
            file.write(content)
            file.close()
        except OSError:
            print("Unable to operate on file: {filename}")
            return

        mode = os.stat(filename).st_mode
        os.chmod(filename, mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
    # def create_lkce_kdump

    def remove_lkce_kdump(self) -> int:
        """Remove lkce_kdump.sh as kdump_pre hook in /etc/kdump.conf"""
        return self.update_kdump_conf("--remove")
    # def remove_lkce_kdump()

    def update_kdump_conf(self, arg: str) -> int:
        """Add/remove /etc/kdump.conf with kdump_pre hook"""
        if not os.path.exists(self.kdump_conf):
            print(f"error: can not find {self.kdump_conf}. "
                  "Please retry after installing kexec-tools")
            return 1

        kdump_pre_line = f"kdump_pre {self.lkce_kdump_sh}"
        kdump_timeout_line = f"extra_bins {self.timeout_path}"

        with open(self.kdump_conf) as conf_fd:
            conf_lines = conf_fd.read().splitlines()

        kdump_pre_value = None
        for line in conf_lines:
            if line.startswith("kdump_pre "):
                kdump_pre_value = line

        if arg == "--remove":
            if kdump_pre_value != kdump_pre_line:
                print(f"lkce_kdump entry not set in {self.kdump_conf}")
                return 1

            # remove lkce_kdump config
            with open(self.kdump_conf, "w") as conf_fd:
                for line in conf_lines:
                    if line not in (kdump_pre_line, kdump_timeout_line):
                        conf_fd.write(f"{line}\n")

            restart_kdump_service()
            return 0

        # arg == "--add"
        if kdump_pre_value == kdump_pre_line:
            print("lkce_kdump is already enabled to run lkce scripts")
        elif kdump_pre_value:
            # kdump_pre is enabled, but it is not our lkce_kdump script
            print(f"lkce_kdump entry not set in {self.kdump_conf} "
                  "(manual setting needed)\n"
                  f"present entry in kdump.conf:\n{kdump_pre_value}\n"
                  "Hint: edit the present kdump_pre script and make it run"
                  f" {self.lkce_kdump_sh}")
            return 1
        else:
            # add lkce_kdump config
            with open(self.kdump_conf, "a") as conf_fd:
                conf_fd.write(
                    f"{kdump_pre_line}\n{kdump_timeout_line}\n")
            restart_kdump_service()

        return 0
    # def update_kdump_conf

    def report(self, subargs: List[str]) -> None:
        """Generate report from vmcore"""
        if not subargs:
            print("error: report option need additional arguments "
                  "[oled lkce help]")
            return

        d_subargs = {}
        for subarg in subargs:
            subarg.strip()
            entry = subarg.split("=", 1)

            if len(entry) < 2:
                print("error: unknown report option %s" % subarg)
                continue

            if entry[0] not in ("--vmcore", "--vmlinux",
                                "--crash_cmds", "--outfile"):

                print("error: unknown report option %s" % subarg)
                break

            d_subargs[entry[0].strip()] = entry[1].strip()
        # for

        vmcore = d_subargs.get("--vmcore", None)
        vmlinux = d_subargs.get("--vmlinux", None)
        crash_cmds = d_subargs.get("--crash_cmds", None)
        outfile = d_subargs.get("--outfile", None)

        if vmcore is None:
            print("error: vmcore not specified")
            return

        if vmlinux is None:
            vmlinux = self.vmlinux_path
        if not (os.path.exists(vmlinux) and os.path.isfile(vmlinux)):
            print("error: vmlinux '%s' not found" % vmlinux)
            return

        if crash_cmds is None:
            # use configured crash commands file
            if not (os.path.exists(self.crash_cmds_file) and
                    os.path.isfile(self.crash_cmds_file)):
                print(f"crash_cmds_file '{self.crash_cmds_file}' not found")
                return

            cmd: Sequence[str] = ("crash", vmcore, vmlinux, "-i",
                                  self.crash_cmds_file)
            cmd_input = None
        else:
            # use specified crash commands
            cmd = ("crash", vmcore, vmlinux)
            crash_cmds = crash_cmds + "," + "quit"
            cmd_input = "\n".join(crash_cmds.split(",")).encode("utf-8")

        print("lkce: executing '{}'".format(" ".join(cmd)))

        if outfile:
            with open(outfile, "w") as output_fd:
                subprocess.run(
                    cmd, input=cmd_input, stdout=output_fd, check=True,
                    shell=False)  # nosec
        else:
            subprocess.run(
                cmd, input=cmd_input, stdout=sys.stdout, check=True,
                shell=False)  # nosec
    # def report

    def configure(self, subargs: List[str]) -> None:
        """Configure lkce

        Based on subarg, you can configure with default values, show the values
        and also set to given values
        """
        if not subargs:  # default
            subargs = ["--show"]

        values_to_update = {}
        filename = self.lkce_config_file
        for subarg in subargs:
            subarg.strip()
            if subarg == "--default":
                self.configure_default()
            elif subarg == "--show":
                if not os.path.exists(filename):
                    print("config file not found")
                    return

                print("%15s : %s" % ("vmlinux path", self.vmlinux_path))
                print("%15s : %s" % ("crash_cmds_file", self.crash_cmds_file))
                print("%15s : %s" % ("vmcore", self.vmcore))
                print("%15s : %s" % ("lkce_outdir", self.lkce_outdir))
                print("%15s : %s" % ("lkce_in_kexec", self.enable_kexec))
                print("%15s : %s" % ("max_out_files", self.max_out_files))
            else:
                entry = subarg.split("=", 1)

                if len(entry) < 2:
                    print("error: unknown configure option %s" % subarg)
                    return

                if entry[0] not in ("--vmlinux_path", "--crash_cmds_file",
                                    "--lkce_outdir", "--vmcore",
                                    "--max_out_files"):
                    print("error: unknown configure option %s" % subarg)
                    return

                values_to_update[entry[0].strip("-")] = entry[1].strip()
        # for

        vmcore = values_to_update.get("vmcore", None)
        if values_to_update:
            if vmcore and self.config_vmcore(vmcore):
                return

            update_key_values_file(filename, values_to_update, sep="=")
    # def configure

    def config_vmcore(self, value: str) -> int:
        """Configure vmcore value in lkce_config_file.

        Called from configure()
        """
        if value not in ['yes', 'no']:
            print(f"error: invalid option '{value}'")
            return 1

        filename = self.lkce_kdump_sh
        if not os.path.exists(filename):
            print("error: Please enable lkce first, using 'oled lkce enable'")
            return 1

        self.vmcore = value
        self.create_lkce_kdump()
        restart_kdump_service()
        return 0
    # def config_vmcore

    def enable_lkce_kexec(self) -> None:
        """Enable lkce to generate report on crashed kernel in kexec mode"""
        if not os.path.exists(self.lkce_kdump_sh):
            self.create_lkce_kdump()

        if self.update_kdump_conf("--add") == 1:
            return

        update_key_values_file(
            self.lkce_config_file, {"enable_kexec": "yes"}, sep="=")
        print("enabled_kexec mode")
    # def enable_lkce_kexec

    def disable_lkce_kexec(self) -> None:
        """Disable lkce to generate report on crashed kernel in kexec mode"""
        if not os.path.exists(self.lkce_config_file):
            print(f"config file '{self.lkce_config_file}' not found")
            return

        if self.update_kdump_conf("--remove") == 1:
            return

        update_key_values_file(
            self.lkce_config_file, {"enable_kexec": "no"}, sep="=")

        try:
            os.remove(self.lkce_kdump_sh)
        except OSError:
            pass
        print("disabled kexec mode")
    # def disable kexec

    def status(self) -> None:
        """Show current configuration values"""
        self.configure(subargs=["--show"])

        if not os.path.exists(self.vmlinux_path):
            print(f"NOTE: {self.vmlinux_path}: file not found")
            print("kernel debuginfo rpm installed?")

        if subprocess.run(
                ("rpm", "-q", "crash"), shell=False,  # nosec
                check=False).returncode != 0:
            print("NOTE: crash is not installed")
    # def status

    def clean(self, subarg: List[str]) -> None:
        """Clean up old crash reports to save space"""
        if "--all" in subarg:
            val = input(f"lkce removes all the files in {self.lkce_outdir} "
                        "dir. do you want to proceed(yes/no)? [no]:")
            if "yes" in val:
                for file in glob.glob(f"{self.lkce_outdir}/crash*out"):
                    try:
                        os.remove(file)
                    except OSError:
                        pass
            # if "yes"
        else:
            val = input("lkce deletes all but last three "
                        f"{self.lkce_outdir}/crash*out files. "
                        "do you want to proceed(yes/no)? [no]:")
            if "yes" in val:
                crash_files = glob.glob(f"{self.lkce_outdir}/crash*out")

                # remove all crash files but the 3 newest ones
                for file in sorted(crash_files, reverse=True)[3:]:
                    try:
                        os.remove(file)
                    except OSError:
                        pass
    # def clean

    def listfiles(self) -> None:
        """List crash reports already generated"""
        dirname = self.lkce_outdir
        os.makedirs(dirname, exist_ok=True)

        print("Followings are the crash*out found in %s dir:" % dirname)
        for filename in os.listdir(dirname):
            if re.search("crash.*out", filename):
                print("%s/%s" % (dirname, filename))
        # for
    # def listfiles

# class LKCE


def restart_kdump_service() -> None:
    """Restart kdump service"""
    cmd = ("systemctl", "restart", "kdump")  # OL7 and above

    print("Restarting kdump service...")
    subprocess.run(cmd, shell=False, check=True)  # nosec
    print("done!")
# def restart_kdump_service


def usage() -> int:
    """Print usage"""
    usage_ = """Usage: """ + os.path.basename(sys.argv[0]) + """ <options>
options:
    report <report-options> -- Generate a report from vmcore
    report-options:
        --vmcore=/path/to/vmcore 		- path to vmcore
        [--vmlinux=/path/to/vmlinux] 		- path to vmlinux
        [--crash_cmds=cmd1,cmd2,cmd3,..]	- crash commands to include
        [--outfile=/path/to/outfile] 		- write output to a file

    configure [--default] 	-- configure lkce with default values
    configure [--show] 	-- show lkce configuration -- default
    configure [config-options]
    config-options:
        [--vmlinux_path=/path/to/vmlinux] 	- set vmlinux_path
        [--crash_cmds_file=/path/to/file] 	- set crash_cmds_file
        [--vmcore=yes/no]			- set vmcore generation in kdump kernel
        [--lkce_outdir=/path/to/directory] 	- set lkce output directory
        [--max_out_files=<number>] 		- set max_out_files

    enable_kexec    -- enable lkce in kdump kernel
    disable_kexec   -- disable lkce in kdump kernel
    status          -- status of lkce

    clean [--all]   -- clear crash report files
    list            -- list crash report files
"""
    print(usage_)
    sys.exit(0)
# def usage


def main() -> int:
    """Main routine"""
    lkce = Lkce()

    if len(sys.argv) < 2:
        usage()

    arg = sys.argv[1]
    if arg == "report":
        lkce.report(sys.argv[2:])

    elif arg == "configure":
        lkce.configure(sys.argv[2:])

    elif arg == "enable_kexec":
        lkce.enable_lkce_kexec()

    elif arg == "disable_kexec":
        lkce.disable_lkce_kexec()

    elif arg == "status":
        lkce.status()

    elif arg == "clean":
        lkce.clean(sys.argv[2:])

    elif arg == "list":
        lkce.listfiles()

    elif arg in ("help", "-help", "--help"):
        usage()

    else:
        print(f"Invalid option: {arg}")
        print("Try 'oled lkce help' for more information")

    return 0
# def main


if __name__ == '__main__':
    if not os.geteuid() == 0:
        print("Please run lkce as root user.")
        sys.exit(1)

    LOCK_DIR = "/var/run/oled-tools"
    try:
        if not os.path.isdir(LOCK_DIR):
            os.makedirs(LOCK_DIR)
        LOCK_FILE = LOCK_DIR + "/lkce.lock"

        FH = open(LOCK_FILE, "w")
    except OSError:
        print(f"Unable to open file: {LOCK_FILE}")
        sys.exit()

    # try lock
    try:
        fcntl.flock(FH, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:  # no lock
        print("another instance of lkce is running.")
        sys.exit()

    try:
        main()
        FH.close()
        os.remove(LOCK_FILE)
    except KeyboardInterrupt:
        print("\nInterrupted by user ctrl+c")