Source code for mlx.warnings.warnings

#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0

import argparse
import errno
import glob
import json
import logging
import os
import subprocess
import sys
from importlib.metadata import distribution
from pathlib import Path

from ruamel.yaml import YAML

from .exceptions import WarningsConfigError
from .junit_checker import JUnitChecker
from .polyspace_checker import PolyspaceChecker
from .regex_checker import CoverityChecker, DoxyChecker, SphinxChecker, XMLRunnerChecker
from .robot_checker import RobotChecker

__version__ = distribution("mlx.warnings").version

LOGGER = logging.getLogger(__name__)


[docs] class WarningsPlugin: def __init__(self, cq_enabled=False): """ Function for initializing the parsers Args: cq_enabled (bool): optional - enable generation of Code Quality report """ self.activated_checkers = {} self.cq_enabled = cq_enabled self.public_checkers = (SphinxChecker, DoxyChecker, JUnitChecker, XMLRunnerChecker, CoverityChecker, RobotChecker, PolyspaceChecker) self._minimum = 0 self._maximum = 0 self.count = 0 self.printout = False
[docs] def activate_checker(self, checker_type, *logging_args): """ Activate additional checkers after initialization Args: checker_type (WarningsChecker): checker class Return: WarningsChecker: activated checker object """ checker = checker_type(*logging_args) checker.cq_enabled = self.cq_enabled and checker.name in ("doxygen", "sphinx", "xmlrunner", "polyspace", "coverity") self.activated_checkers[checker.name] = checker return checker
[docs] def activate_checker_name(self, name, *args): """ Activates checker by name Args: name (str): checker name Returns: WarningsChecker: activated checker object, or None when no checker with the given name exists """ for checker_type in self.public_checkers: if checker_type.name == name: checker = self.activate_checker(checker_type, *args) return checker else: LOGGER.error(f"Checker {name} does not exist")
[docs] def get_checker(self, name): """Get checker by name Args: name (str): checker name Return: checker object (WarningsChecker) """ return self.activated_checkers[name]
[docs] def check(self, content): """ Count the number of warnings in a specified content Args: content (str): The content to parse """ if self.printout: LOGGER.warning(content) if not self.activated_checkers: LOGGER.error("No checkers activated. Please use activate_checker function") else: for checker in self.activated_checkers.values(): if checker.name == "polyspace": raise WarningsConfigError("Function check() cannot be used with Polyspace checker.") else: checker.check(content)
[docs] def check_logfile(self, file): """ Count the number of warnings in a specified content Args: content (_io.TextIOWrapper): The open file to parse """ if not self.activated_checkers: LOGGER.error("No checkers activated. Please use activate_checker function") elif "polyspace" in self.activated_checkers: if len(self.activated_checkers) > 1: raise WarningsConfigError("Polyspace checker cannot be combined with other warnings checkers") self.activated_checkers["polyspace"].check(file) else: content = file.read() for checker in self.activated_checkers.values(): checker.check(content)
[docs] def configure_maximum(self, maximum): """Configure the maximum amount of warnings for each activated checker Args: maximum (int): maximum amount of warnings allowed """ for checker in self.activated_checkers.values(): checker.maximum = maximum
[docs] def configure_minimum(self, minimum): """Configure the minimum amount of warnings for each activated checker Args: minimum (int): minimum amount of warnings allowed """ for checker in self.activated_checkers.values(): checker.minimum = minimum
[docs] def return_count(self, name=None): """Getter function for the amount of found warnings If the name parameter is set, this function will return the amount of warnings found by that checker. If not, the function will return the sum of the warnings found by all registered checkers. Args: name (WarningsChecker): The checker for which to return the amount of warnings (if set) Returns: int: Amount of found warnings """ self.count = 0 if name is None: for checker in self.activated_checkers.values(): self.count += checker.return_count() else: self.count = self.activated_checkers[name].return_count() return self.count
[docs] def return_check_limits(self, name=None): """Function for determining the return value of the script If the name parameter is set, this function will check (and return) the return value of that checker. If not, this function checks whether the warnings for each registered checker are within the configured limits. Args: name (WarningsChecker): The checker for which to check the return value Return: int: 0 if the amount of warnings is within limits, the count of warnings otherwise (or 1 in case of a count of 0 warnings) """ if name is None: for checker in self.activated_checkers.values(): retval = checker.return_check_limits() if retval: return retval else: return self.activated_checkers[name].return_check_limits() return 0
[docs] def toggle_printout(self, printout): """Toggle printout of all the parsed content Useful for command input where we want to print content as well Args: printout (bool): True enables the printout, False provides more silent mode """ self.printout = printout
[docs] def config_parser(self, config, *logging_args): """Parsing configuration dict extracted by previously opened JSON or YAML file Args: config (dict/Path): Content or path of configuration file """ if isinstance(config, Path): with open(config, encoding="utf-8") as open_file: if config.suffix.lower().startswith(".y"): config = YAML().load(open_file) else: config = json.load(open_file) # activate checker for checker_type in self.public_checkers: if checker_type.name in config: checker_config = config[checker_type.name] try: if bool(checker_config["enabled"]): checker = self.activate_checker(checker_type, *logging_args) checker.parse_config(checker_config) LOGGER.info(f"{checker.name_repr}: Config parsing completed") except KeyError as err: raise WarningsConfigError(f"Incomplete config. Missing: {err}") from err
[docs] def write_code_quality_report(self, out_file): """Generates the Code Quality report artifact as a JSON file that implements a subset of the Code Climate spec Args: out_file (str): Location for the output file """ results = [] for checker in self.activated_checkers.values(): results.extend(checker.cq_findings) content = json.dumps(results, indent=4, sort_keys=False) Path(out_file).parent.mkdir(parents=True, exist_ok=True) with open(out_file, "w", encoding="utf-8", newline="\n") as open_file: open_file.write(f"{content}\n")
[docs] def warnings_wrapper(args): parser = argparse.ArgumentParser(prog="mlx-warnings") group1 = parser.add_argument_group("Configuration command line options") group1.add_argument("--coverity", dest="coverity", action="store_true") group1.add_argument("-d", "--doxygen", dest="doxygen", action="store_true") group1.add_argument("-j", "--junit", dest="junit", action="store_true") group1.add_argument("-r", "--robot", dest="robot", action="store_true") group1.add_argument("-s", "--sphinx", dest="sphinx", action="store_true") group1.add_argument("-x", "--xmlrunner", dest="xmlrunner", action="store_true") group1.add_argument("--name", default="", help="Name of the Robot Framework test suite to check results of") group1.add_argument("-m", "--maxwarnings", "--max-warnings", type=int, default=0, help="Maximum amount of warnings accepted") group1.add_argument("--minwarnings", "--min-warnings", type=int, default=0, help="Minimum amount of warnings accepted") group1.add_argument("--exact-warnings", type=int, default=0, help="Exact amount of warnings expected") group2 = parser.add_argument_group("Configuration file with options") group2.add_argument("--config", dest="configfile", action="store", required=False, type=Path, help="Config file in JSON or YAML format provides toggle of checkers and their limits") group2.add_argument("--include-sphinx-deprecation", dest="include_sphinx_deprecation", action="store_true", help="Sphinx checker will include warnings matching (RemovedInSphinx\\d+Warning) regex") parser.add_argument("-o", "--output", type=Path, help="Output file that contains all counted warnings") parser.add_argument("-C", "--code-quality", help="Output Code Quality report artifact for GitLab CI") parser.add_argument("-v", "--verbose", dest="verbose", action="store_true") parser.add_argument("--command", dest="command", action="store_true", help="Treat program arguments as command to execute to obtain data") parser.add_argument("--ignore-retval", dest="ignore", action="store_true", help="Ignore return value of the executed command") parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}") parser.add_argument("logfile", nargs="+", help="Logfile (or command) that might contain warnings") parser.add_argument("flags", nargs=argparse.REMAINDER, help="Possible not-used flags from above are considered as command flags") args = parser.parse_args(args) code_quality_enabled = bool(args.code_quality) if args.output is not None and args.output.exists(): os.remove(args.output) LOGGER.addHandler(logging.StreamHandler()) LOGGER.setLevel(logging.WARNING) if args.verbose: LOGGER.setLevel(logging.INFO) logging_args = [args.verbose, args.output] warnings = WarningsPlugin(cq_enabled=code_quality_enabled) # Read config file if args.configfile is not None: checker_flags = args.sphinx or args.doxygen or args.junit or args.coverity or args.xmlrunner or args.robot warning_args = args.maxwarnings or args.minwarnings or args.exact_warnings if checker_flags or warning_args: LOGGER.error("Configfile cannot be provided with other arguments") sys.exit(2) warnings.config_parser(args.configfile, *logging_args) else: if args.sphinx: warnings.activate_checker_name("sphinx", *logging_args) if args.doxygen: warnings.activate_checker_name("doxygen", *logging_args) if args.junit: warnings.activate_checker_name("junit", *logging_args) if args.xmlrunner: warnings.activate_checker_name("xmlrunner", *logging_args) if args.coverity: warnings.activate_checker_name("coverity", *logging_args) if args.robot: robot_checker = warnings.activate_checker_name("robot", *logging_args) if robot_checker is not None: robot_checker.parse_config({ "suites": [{"name": args.name, "min": 0, "max": 0}], "check_suite_names": True, }) if args.exact_warnings: if args.maxwarnings | args.minwarnings: LOGGER.error("expected-warnings cannot be provided with maxwarnings or minwarnings") sys.exit(2) warnings.configure_maximum(args.exact_warnings) warnings.configure_minimum(args.exact_warnings) else: warnings.configure_maximum(args.maxwarnings) warnings.configure_minimum(args.minwarnings) if args.include_sphinx_deprecation and "sphinx" in warnings.activated_checkers.keys(): warnings.get_checker("sphinx").include_sphinx_deprecation() if args.command: if "polyspace" in warnings.activated_checkers: raise WarningsConfigError("Input argument command cannot be combined with Polyspace checker enabled") cmd = args.logfile if args.flags: cmd.extend(args.flags) warnings.toggle_printout(True) retval = warnings_command(warnings, cmd) if (not args.ignore) and (retval != 0): return retval else: if args.flags: LOGGER.warning(f"Some keyword arguments have been ignored because they followed positional arguments: " f"{' '.join(args.flags)!r}") retval = warnings_logfile(warnings, args.logfile) if retval != 0: return retval warnings.return_count() if args.code_quality: warnings.write_code_quality_report(args.code_quality) return warnings.return_check_limits()
def warnings_command(warnings, cmd): """Execute command to obtain input for parsing for warnings Usually log files are output of the commands. To avoid this additional step this function runs a command instead and parses the stderr and stdout of the command for warnings. Args: warnings (WarningsPlugin): Object for warnings where errors should be logged cmd (list): List of commands (str), which should be executed to obtain input for parsing Return: int: Return value of executed command(s) Raises: OSError: When program is not installed. """ try: LOGGER.info(f"Executing: {cmd}") proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=1, universal_newlines=True) out, err = proc.communicate() # Check stdout if out: try: warnings.check(out.decode(encoding="utf-8")) except AttributeError: warnings.check(out) # Check stderr if err: try: warnings.check(err.decode(encoding="utf-8")) except AttributeError: warnings.check(err) return proc.returncode except OSError as err: if err.errno == errno.ENOENT: LOGGER.error("It seems like program " + str(cmd) + " is not installed.") raise def warnings_logfile(warnings, log): """Parse logfile for warnings Args: warnings (WarningsPlugin): Object for warnings where errors should be logged log: Logfile for parsing Return: 0: Log files existed and are parsed successfully 1: Log files did not exist """ # args.logfile doesn't necessarily contain wildcards, but just to be safe, we # assume it does, and try to expand them. # This mechanism is put in place to allow wildcards to be passed on even when # executing the script on windows (in that case there is no shell expansion of wildcards) # so that the script can be used in the exact same way even when moving from one # OS to another. for file_wildcard in log: if glob.glob(file_wildcard): for logfile in glob.glob(file_wildcard): with open(logfile) as file: warnings.check_logfile(file) else: LOGGER.error(f"FILE: {file_wildcard} does not exist") return 1 return 0 def main(): sys.exit(warnings_wrapper(sys.argv[1:])) if __name__ == "__main__": main()