sys.path.insert(1, os.path.join(our_path, '../tools'))
from binman import elf
-from patman import tools
+from u_boot_pylib import tools
# A typical symbol looks like this:
# _u_boot_list_2_evspy_info_2_EVT_MISC_INIT_F_3_sandbox_misc_init_f
run_test "binman" ./tools/binman/binman --toolpath ${TOOLS_DIR} test
run_test "patman" ./tools/patman/patman test
+run_test "u_boot_pylib" ./tools/u_boot_pylib/u_boot_pylib
run_test "buildman" ./tools/buildman/buildman -t ${skip}
run_test "fdt" ./tools/dtoc/test_fdt -t
import tempfile
import urllib.error
-from patman import command
-from patman import terminal
-from patman import tools
-from patman import tout
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+from u_boot_pylib import tools
+from u_boot_pylib import tout
BINMAN_DIR = os.path.dirname(os.path.realpath(__file__))
from binman import bintool
from binman.bintool import Bintool
-from patman import command
-from patman import terminal
-from patman import test_util
-from patman import tools
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
# pylint: disable=R0904
class TestBintool(unittest.TestCase):
import tempfile
from binman import bintool
-from patman import tools
+from u_boot_pylib import tools
# pylint: disable=C0103
class Bintoollz4(bintool.Bintool):
import tempfile
from binman import bintool
-from patman import tools
+from u_boot_pylib import tools
# pylint: disable=C0103
class Bintoollzma_alone(bintool.Bintool):
from binman import bintool
from binman import elf
-from patman import command
-from patman import tools
+from u_boot_pylib import command
+from u_boot_pylib import tools
# Set to True to enable printing output while working
DEBUG = False
from binman import cbfs_util
from binman.cbfs_util import CbfsWriter
from binman import elf
-from patman import test_util
-from patman import tools
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
U_BOOT_DATA = b'1234'
U_BOOT_DTB_DATA = b'udtb'
import re
import sys
-from patman import tools
from binman import bintool
from binman import cbfs_util
-from patman import command
from binman import elf
from binman import entry
-from patman import tout
+from u_boot_pylib import command
+from u_boot_pylib import tools
+from u_boot_pylib import tout
# These are imported if needed since they import libfdt
state = None
import struct
import tempfile
-from patman import command
-from patman import tools
-from patman import tout
+from u_boot_pylib import command
+from u_boot_pylib import tools
+from u_boot_pylib import tout
ELF_TOOLS = True
try:
import unittest
from binman import elf
-from patman import command
-from patman import test_util
-from patman import tools
-from patman import tout
+from u_boot_pylib import command
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
+from u_boot_pylib import tout
binman_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
from binman import bintool
from binman import elf
from dtoc import fdt_util
-from patman import tools
-from patman.tools import to_hex, to_hex_size
-from patman import tout
+from u_boot_pylib import tools
+from u_boot_pylib.tools import to_hex, to_hex_size
+from u_boot_pylib import tout
modules = {}
from binman.etype.blob import Entry_blob
from dtoc import fdt
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class TestEntry(unittest.TestCase):
def setUp(self):
from binman.entry import Entry, EntryArg
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry__testing(Entry):
from binman.etype.section import Entry_section
from binman.fip_util import FIP_TYPES, FipReader, FipWriter, UUID_LEN
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry_atf_fip(Entry_section):
"""ARM Trusted Firmware's Firmware Image Package (FIP)
from binman.entry import Entry
from binman import state
from dtoc import fdt_util
-from patman import tools
-from patman import tout
+from u_boot_pylib import tools
+from u_boot_pylib import tout
class Entry_blob(Entry):
"""Arbitrary binary blob
from binman.etype.blob import Entry_blob
from dtoc import fdt_util
-from patman import tools
-from patman import tout
+from u_boot_pylib import tools
+from u_boot_pylib import tout
class Entry_blob_ext(Entry_blob):
"""Externally built binary blob
from binman.etype.blob import Entry_blob
from dtoc import fdt_util
-from patman import tools
-from patman import tout
+from u_boot_pylib import tools
+from u_boot_pylib import tout
class Entry_blob_ext_list(Entry_blob):
"""List of externally built binary blobs
"""
from binman.entry import Entry
-from patman import tools
-from patman import tout
+from u_boot_pylib import tools
+from u_boot_pylib import tout
FDTMAP_MAGIC = b'_FDTMAP_'
FDTMAP_HDR_LEN = 16
from binman.etype.section import Entry_section
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
# This is imported if needed
state = None
from binman.entry import Entry
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry_fill(Entry):
"""An entry which is filled to a particular byte value
from binman import elf
from dtoc import fdt_util
from dtoc.fdt import Fdt
-from patman import tools
+from u_boot_pylib import tools
# Supported operations, with the fit,operation property
OP_GEN_FDT_NODES, OP_SPLIT_ELF = range(2)
from binman.entry import Entry
from binman import fmap_util
-from patman import tools
-from patman.tools import to_hex_size
-from patman import tout
+from u_boot_pylib import tools
+from u_boot_pylib.tools import to_hex_size
+from u_boot_pylib import tout
class Entry_fmap(Entry):
from collections import OrderedDict
-from patman import command
+from u_boot_pylib import command
from binman.entry import Entry, EntryArg
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
# Build GBB flags.
# (src/platform/vboot_reference/firmware/include/gbb_header.h)
from binman.entry import Entry
from binman.etype.blob_ext import Entry_blob_ext
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry_intel_ifwi(Entry_blob_ext):
"""Intel Integrated Firmware Image (IFWI) file
from binman.entry import Entry
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry_mkimage(Entry):
"""Binary produced by mkimage
from binman.entry import Entry
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry_null(Entry):
"""An entry which has no contents of its own
import os
import struct
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
from binman.entry import Entry
from binman.etype.collection import Entry_collection
from binman.entry import Entry
from binman import state
from dtoc import fdt_util
-from patman import tools
-from patman import tout
-from patman.tools import to_hex_size
+from u_boot_pylib import tools
+from u_boot_pylib import tout
+from u_boot_pylib.tools import to_hex_size
class Entry_section(Entry):
from binman.entry import Entry, EntryArg
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry_text(Entry):
from binman.entry import Entry
from binman.etype.blob_dtb import Entry_blob_dtb
-from patman import tools
+from u_boot_pylib import tools
# This is imported if needed
state = None
from binman.etype.blob import Entry_blob
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry_u_boot_elf(Entry_blob):
"""U-Boot ELF image
from binman.etype.blob import Entry_blob
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry_u_boot_env(Entry_blob):
"""An entry which contains a U-Boot environment
from binman import elf
from binman.entry import Entry
from binman.etype.blob import Entry_blob
-from patman import tools
+from u_boot_pylib import tools
class Entry_u_boot_spl_bss_pad(Entry_blob):
"""U-Boot SPL binary padded with a BSS region
# Entry-type module for expanded U-Boot SPL binary
#
-from patman import tout
+from u_boot_pylib import tout
from binman import state
from binman.etype.blob_phase import Entry_blob_phase
from binman import elf
from binman.entry import Entry
from binman.etype.blob import Entry_blob
-from patman import tools
+from u_boot_pylib import tools
class Entry_u_boot_tpl_bss_pad(Entry_blob):
"""U-Boot TPL binary padded with a BSS region
# Entry-type module for expanded U-Boot TPL binary
#
-from patman import tout
+from u_boot_pylib import tout
from binman import state
from binman.etype.blob_phase import Entry_blob_phase
import struct
-from patman import command
from binman.entry import Entry
from binman.etype.blob import Entry_blob
from binman.etype.u_boot_with_ucode_ptr import Entry_u_boot_with_ucode_ptr
-from patman import tools
+from u_boot_pylib import command
+from u_boot_pylib import tools
class Entry_u_boot_tpl_with_ucode_ptr(Entry_u_boot_with_ucode_ptr):
"""U-Boot TPL with embedded microcode pointer
from binman.entry import Entry
from binman.etype.blob import Entry_blob
-from patman import tools
+from u_boot_pylib import tools
class Entry_u_boot_ucode(Entry_blob):
"""U-Boot microcode block
from binman import elf
from binman.entry import Entry
from binman.etype.blob import Entry_blob
-from patman import tools
+from u_boot_pylib import tools
class Entry_u_boot_vpl_bss_pad(Entry_blob):
"""U-Boot VPL binary padded with a BSS region
# Entry-type module for expanded U-Boot VPL binary
#
-from patman import tout
+from u_boot_pylib import tout
from binman import state
from binman.etype.blob_phase import Entry_blob_phase
from binman.entry import Entry
from binman.etype.blob import Entry_blob
from dtoc import fdt_util
-from patman import tools
-from patman import command
+from u_boot_pylib import tools
+from u_boot_pylib import command
class Entry_u_boot_with_ucode_ptr(Entry_blob):
"""U-Boot with embedded microcode pointer
from binman.etype.collection import Entry_collection
from dtoc import fdt_util
-from patman import tools
+from u_boot_pylib import tools
class Entry_vblock(Entry_collection):
"""An entry which contains a Chromium OS verified boot block
from dtoc import fdt
from dtoc import fdt_util
from dtoc.fdt import FdtScan
-from patman import tools
+from u_boot_pylib import tools
class TestFdt(unittest.TestCase):
@classmethod
sys.path.insert(2, os.path.join(OUR_PATH, '..'))
# pylint: disable=C0413
-from patman import command
-from patman import tools
+from u_boot_pylib import command
+from u_boot_pylib import tools
# The TOC header, at the start of the FIP
HEADER_FORMAT = '<IIQ'
sys.path.insert(2, os.path.join(OUR_PATH, '..'))
# pylint: disable=C0413
-from patman import test_util
-from patman import tools
from binman import bintool
from binman import fip_util
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
FIPTOOL = bintool.Bintool.create('fiptool')
HAVE_FIPTOOL = FIPTOOL.is_present()
import struct
import sys
-from patman import tools
+from u_boot_pylib import tools
# constants imported from lib/fmap.h
FMAP_SIGNATURE = b'__FMAP__'
from binman.etype import fdtmap
from binman.etype import image_header
from binman.image import Image
-from patman import command
-from patman import test_util
-from patman import tools
-from patman import tout
+from u_boot_pylib import command
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
+from u_boot_pylib import tout
# Contents of test files, corresponding to different entry types
U_BOOT_DATA = b'1234'
from binman.etype import section
from dtoc import fdt
from dtoc import fdt_util
-from patman import tools
-from patman import tout
+from u_boot_pylib import tools
+from u_boot_pylib import tout
class Image(section.Entry_section):
"""A Image, representing an output from binman
import unittest
from binman.image import Image
-from patman.test_util import capture_sys_output
+from u_boot_pylib.test_util import capture_sys_output
class TestImage(unittest.TestCase):
def testInvalidFormat(self):
sys.path.insert(2, our1_path)
from binman import bintool
-from patman import test_util
+from u_boot_pylib import test_util
# Bring in the libfdt module
sys.path.insert(2, 'scripts/dtc/pylibfdt')
from binman import cmdline
from binman import control
-from patman import test_util
+from u_boot_pylib import test_util
def RunTests(debug, verbosity, processes, test_preserve_dirs, args, toolpath):
"""Run the functional tests and any embedded doctests
for path in toolpath:
extra_args += ' --toolpath %s' % path
test_util.run_test_coverage('tools/binman/binman', None,
- ['*test*', '*main.py', 'tools/patman/*', 'tools/dtoc/*'],
+ ['*test*', '*main.py', 'tools/patman/*', 'tools/dtoc/*',
+ 'tools/u_boot_pylib/*'],
args.build_dir, all_set, extra_args or None)
def RunBinman(args):
from dtoc import fdt
import os
-from patman import tools
-from patman import tout
+from u_boot_pylib import tools
+from u_boot_pylib import tout
OUR_PATH = os.path.dirname(os.path.realpath(__file__))
from buildman import builderthread
from buildman import toolchain
-from patman import command
from patman import gitutil
-from patman import terminal
-from patman.terminal import tprint
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+from u_boot_pylib.terminal import tprint
# This indicates a new int or hex Kconfig property with no default
# It hangs the build since the 'conf' tool cannot proceed without valid input.
import threading
from buildman import cfgutil
-from patman import command
from patman import gitutil
+from u_boot_pylib import command
RETURN_CODE_RETRY = -1
BASE_ELF_FILENAMES = ['u-boot', 'spl/u-boot-spl', 'tpl/u-boot-tpl']
import re
-from patman import tools
+from u_boot_pylib import tools
RE_LINE = re.compile(r'(# )?CONFIG_([A-Z0-9_]+)(=(.*)| is not set)')
RE_CFG = re.compile(r'(~?)(CONFIG_)?([A-Z0-9_]+)(=.*)?')
from buildman import cfgutil
from buildman import toolchain
from buildman.builder import Builder
-from patman import command
from patman import gitutil
from patman import patchstream
-from patman import terminal
-from patman import tools
-from patman.terminal import tprint
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+from u_boot_pylib import tools
+from u_boot_pylib.terminal import tprint
def GetPlural(count):
"""Returns a plural 's' if count is not 1"""
from buildman import cmdline
from buildman import control
from buildman import toolchain
-from patman import command
from patman import gitutil
-from patman import terminal
-from patman import test_util
-from patman import tools
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
settings_data = '''
# Buildman settings file
from buildman import toolchain
from patman import patchstream
from patman import gitutil
-from patman import terminal
-from patman import test_util
+from u_boot_pylib import terminal
+from u_boot_pylib import test_util
def RunTests(skip_net_tests, verboose, args):
from buildman import func_test
from buildman import control
from buildman import toolchain
from patman import commit
-from patman import command
-from patman import terminal
-from patman import test_util
-from patman import tools
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
use_network = True
import urllib.request, urllib.error, urllib.parse
from buildman import bsettings
-from patman import command
-from patman import terminal
-from patman import tools
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+from u_boot_pylib import tools
(PRIORITY_FULL_PREFIX, PRIORITY_PREFIX_GCC, PRIORITY_PREFIX_GCC_PATH,
PRIORITY_CALC) = list(range(4))
from dtoc import fdt_util
import libfdt
from libfdt import QUIET_NOTFOUND
-from patman import tools
+from u_boot_pylib import tools
# This deals with a device tree, presenting it as an assortment of Node and
# Prop objects, representing nodes and properties, respectively. This file
import sys
import tempfile
-from patman import command
-from patman import tools
+from u_boot_pylib import command
+from u_boot_pylib import tools
def fdt32_to_cpu(val):
"""Convert a device tree cell to an integer
'../../build-sandbox_spl/scripts/dtc/pylibfdt'))
from dtoc import dtb_platdata
-from patman import test_util
+from u_boot_pylib import test_util
def run_tests(processes, args):
"""Run all the test we have for dtoc
"""Run the tests and check that we get 100% coverage"""
sys.argv = [sys.argv[0]]
test_util.run_test_coverage('tools/dtoc/dtoc', '/main.py',
- ['tools/patman/*.py', '*/fdt*', '*test*'], args.build_dir)
+ ['tools/patman/*.py', 'tools/u_boot_pylib/*', '*/fdt*', '*test*'],
+ args.build_dir)
if __name__ != '__main__':
from dtoc.dtb_platdata import tab_to
from dtoc.src_scan import conv_name_to_c
from dtoc.src_scan import get_compat_name
-from patman import test_util
-from patman import tools
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
OUR_PATH = os.path.dirname(os.path.realpath(__file__))
from dtoc.fdt_util import fdt32_to_cpu, fdt64_to_cpu
from dtoc.fdt import Type, BytesToValue
import libfdt
-from patman import test_util
-from patman import tools
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
#pylint: disable=protected-access
build_dir (str): Directory containing the build output
"""
test_util.run_test_coverage('tools/dtoc/test_fdt.py', None,
- ['tools/patman/*.py', '*test_fdt.py'], build_dir)
+ ['tools/patman/*.py', 'tools/u_boot_pylib/*', '*test_fdt.py'],
+ build_dir)
def run_tests(names, processes):
from unittest import mock
from dtoc import src_scan
-from patman import test_util
-from patman import tools
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
OUR_PATH = os.path.dirname(os.path.realpath(__file__))
# SPDX-License-Identifier: GPL-2.0+
-__all__ = ['checkpatch', 'command', 'commit', 'control', 'cros_subprocess',
- 'func_test', 'get_maintainer', 'gitutil', '__main__', 'patchstream',
- 'project', 'series', 'setup', 'settings', 'terminal',
- 'test_checkpatch', 'test_util', 'tools', 'tout']
+__all__ = ['checkpatch', 'commit', 'control', 'func_test', 'get_maintainer',
+ 'gitutil', '__main__', 'patchstream', 'project', 'series',
+ 'settings', 'setup', 'status', 'test_checkpatch', 'test_settings']
from patman import gitutil
from patman import project
from patman import settings
-from patman import terminal
-from patman import test_util
from patman import test_checkpatch
-from patman import tools
+from u_boot_pylib import terminal
+from u_boot_pylib import test_util
+from u_boot_pylib import tools
epilog = '''Create patches from commits in a branch, check them and email them
as specified by tags you place in the commits. Use -n to do a dry run first.'''
result = test_util.run_test_suites(
'patman', False, False, False, None, None, None,
[test_checkpatch.TestPatch, func_test.TestFunctional,
- 'gitutil', 'settings', 'terminal'])
+ 'gitutil', 'settings'])
sys.exit(0 if result.wasSuccessful() else 1)
import re
import sys
-from patman import command
from patman import gitutil
-from patman import terminal
+from u_boot_pylib import command
+from u_boot_pylib import terminal
EMACS_PREFIX = r'(?:[0-9]{4}.*\.patch:[0-9]+: )?'
TYPE_NAME = r'([A-Z_]+:)?'
+++ /dev/null
-# SPDX-License-Identifier: GPL-2.0+
-# Copyright (c) 2011 The Chromium OS Authors.
-#
-
-"""Shell command ease-ups for Python."""
-
-import os
-
-from patman import cros_subprocess
-
-class CommandResult:
- """A class which captures the result of executing a command.
-
- Members:
- stdout: stdout obtained from command, as a string
- stderr: stderr obtained from command, as a string
- return_code: Return code from command
- exception: Exception received, or None if all ok
- """
- def __init__(self, stdout='', stderr='', combined='', return_code=0,
- exception=None):
- self.stdout = stdout
- self.stderr = stderr
- self.combined = combined
- self.return_code = return_code
- self.exception = exception
-
- def to_output(self, binary):
- if not binary:
- self.stdout = self.stdout.decode('utf-8')
- self.stderr = self.stderr.decode('utf-8')
- self.combined = self.combined.decode('utf-8')
- return self
-
-
-# This permits interception of run_pipe() for test purposes. If it is set to
-# a function, then that function is called with the pipe list being
-# executed. Otherwise, it is assumed to be a CommandResult object, and is
-# returned as the result for every run_pipe() call.
-# When this value is None, commands are executed as normal.
-test_result = None
-
-def run_pipe(pipe_list, infile=None, outfile=None,
- capture=False, capture_stderr=False, oneline=False,
- raise_on_error=True, cwd=None, binary=False,
- output_func=None, **kwargs):
- """
- Perform a command pipeline, with optional input/output filenames.
-
- Args:
- pipe_list: List of command lines to execute. Each command line is
- piped into the next, and is itself a list of strings. For
- example [ ['ls', '.git'] ['wc'] ] will pipe the output of
- 'ls .git' into 'wc'.
- infile: File to provide stdin to the pipeline
- outfile: File to store stdout
- capture: True to capture output
- capture_stderr: True to capture stderr
- oneline: True to strip newline chars from output
- output_func: Output function to call with each output fragment
- (if it returns True the function terminates)
- kwargs: Additional keyword arguments to cros_subprocess.Popen()
- Returns:
- CommandResult object
- """
- if test_result:
- if hasattr(test_result, '__call__'):
- # pylint: disable=E1102
- result = test_result(pipe_list=pipe_list)
- if result:
- return result
- else:
- return test_result
- # No result: fall through to normal processing
- result = CommandResult(b'', b'', b'')
- last_pipe = None
- pipeline = list(pipe_list)
- user_pipestr = '|'.join([' '.join(pipe) for pipe in pipe_list])
- kwargs['stdout'] = None
- kwargs['stderr'] = None
- while pipeline:
- cmd = pipeline.pop(0)
- if last_pipe is not None:
- kwargs['stdin'] = last_pipe.stdout
- elif infile:
- kwargs['stdin'] = open(infile, 'rb')
- if pipeline or capture:
- kwargs['stdout'] = cros_subprocess.PIPE
- elif outfile:
- kwargs['stdout'] = open(outfile, 'wb')
- if capture_stderr:
- kwargs['stderr'] = cros_subprocess.PIPE
-
- try:
- last_pipe = cros_subprocess.Popen(cmd, cwd=cwd, **kwargs)
- except Exception as err:
- result.exception = err
- if raise_on_error:
- raise Exception("Error running '%s': %s" % (user_pipestr, str))
- result.return_code = 255
- return result.to_output(binary)
-
- if capture:
- result.stdout, result.stderr, result.combined = (
- last_pipe.communicate_filter(output_func))
- if result.stdout and oneline:
- result.output = result.stdout.rstrip(b'\r\n')
- result.return_code = last_pipe.wait()
- else:
- result.return_code = os.waitpid(last_pipe.pid, 0)[1]
- if raise_on_error and result.return_code:
- raise Exception("Error running '%s'" % user_pipestr)
- return result.to_output(binary)
-
-def output(*cmd, **kwargs):
- kwargs['raise_on_error'] = kwargs.get('raise_on_error', True)
- return run_pipe([cmd], capture=True, **kwargs).stdout
-
-def output_one_line(*cmd, **kwargs):
- """Run a command and output it as a single-line string
-
- The command is expected to produce a single line of output
-
- Returns:
- String containing output of command
- """
- raise_on_error = kwargs.pop('raise_on_error', True)
- result = run_pipe([cmd], capture=True, oneline=True,
- raise_on_error=raise_on_error, **kwargs).stdout.strip()
- return result
-
-def run(*cmd, **kwargs):
- return run_pipe([cmd], **kwargs).stdout
-
-def run_list(cmd):
- return run_pipe([cmd], capture=True).stdout
-
-def stop_all():
- cros_subprocess.stay_alive = False
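# Hedged usage sketch, not part of the patch: after this series the same
# helpers are imported from u_boot_pylib rather than patman. Only functions
# defined in the command module removed above are used; the commands run
# ('echo', 'wc') are illustrative and assume tools/u_boot_pylib is on sys.path.
from u_boot_pylib import command

# Capture the output of a single command as a string
text = command.output('echo', 'hello')

# Pipe one command into another and inspect the CommandResult
result = command.run_pipe([['echo', 'one', 'two'], ['wc', '-w']], capture=True)
print(result.return_code, result.stdout)

# Run a command expected to produce a single line, stripped of its newline
line = command.output_one_line('echo', 'single line')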
from patman import checkpatch
from patman import gitutil
from patman import patchstream
-from patman import terminal
+from u_boot_pylib import terminal
def setup():
"""Do required setup before doing anything"""
+++ /dev/null
-# Copyright (c) 2012 The Chromium OS Authors.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
-# Licensed to PSF under a Contributor Agreement.
-# See http://www.python.org/2.4/license for licensing details.
-
-"""Subprocess execution
-
-This module holds a subclass of subprocess.Popen with our own required
-features, mainly that we get access to the subprocess output while it
-is running rather than just at the end. This makes it easier to show
-progress information and filter output in real time.
-"""
-
-import errno
-import os
-import pty
-import select
-import subprocess
-import sys
-import unittest
-
-
-# Import these here so the caller does not need to import subprocess also.
-PIPE = subprocess.PIPE
-STDOUT = subprocess.STDOUT
-PIPE_PTY = -3 # Pipe output through a pty
-stay_alive = True
-
-
-class Popen(subprocess.Popen):
- """Like subprocess.Popen with ptys and incremental output
-
- This class deals with running a child process and filtering its output on
- both stdout and stderr while it is running. We do this so we can monitor
- progress, and possibly relay the output to the user if requested.
-
- The class is similar to subprocess.Popen, the equivalent is something like:
-
- Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
- But this class has many fewer features, and two enhancements:
-
- 1. Rather than getting the output data only at the end, this class sends it
- to a provided operation as it arrives.
- 2. We use pseudo terminals so that the child will hopefully flush its output
- to us as soon as it is produced, rather than waiting for the end of a
- line.
-
- Use communicate_filter() to handle output from the subprocess.
-
- """
-
- def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
- shell=False, cwd=None, env=None, **kwargs):
- """Cut-down constructor
-
- Args:
- args: Program and arguments for subprocess to execute.
- stdin: See subprocess.Popen()
- stdout: See subprocess.Popen(), except that we support the sentinel
- value of cros_subprocess.PIPE_PTY.
- stderr: See subprocess.Popen(), except that we support the sentinel
- value of cros_subprocess.PIPE_PTY.
- shell: See subprocess.Popen()
- cwd: Working directory to change to for subprocess, or None if none.
- env: Environment to use for this subprocess, or None to inherit parent.
- kwargs: No other arguments are supported at the moment. Passing other
- arguments will cause a ValueError to be raised.
- """
- stdout_pty = None
- stderr_pty = None
-
- if stdout == PIPE_PTY:
- stdout_pty = pty.openpty()
- stdout = os.fdopen(stdout_pty[1])
- if stderr == PIPE_PTY:
- stderr_pty = pty.openpty()
- stderr = os.fdopen(stderr_pty[1])
-
- super(Popen, self).__init__(args, stdin=stdin,
- stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
- **kwargs)
-
- # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
- # We want to use the master half on our end from now on. Setting this here
- # does make some assumptions about the implementation of subprocess, but
- # those assumptions are pretty minor.
-
- # Note that if stderr is STDOUT, then self.stderr will be set to None by
- # this constructor.
- if stdout_pty is not None:
- self.stdout = os.fdopen(stdout_pty[0])
- if stderr_pty is not None:
- self.stderr = os.fdopen(stderr_pty[0])
-
- # Insist that unit tests exist for other arguments we don't support.
- if kwargs:
- raise ValueError("Unit tests do not test extra args - please add tests")
-
- def convert_data(self, data):
- """Convert stdout/stderr data to the correct format for output
-
- Args:
- data: Data to convert, or None for ''
-
- Returns:
- Converted data, as bytes
- """
- if data is None:
- return b''
- return data
-
- def communicate_filter(self, output, input_buf=''):
- """Interact with process: Read data from stdout and stderr.
-
- This method runs until end-of-file is reached, then waits for the
- subprocess to terminate.
-
- The output function is sent all output from the subprocess and must be
- defined like this:
-
- def output([self,] stream, data)
- Args:
- stream: the stream the output was received on, which will be
- sys.stdout or sys.stderr.
- data: a string containing the data
-
- Returns:
- True to terminate the process
-
- Note: The data read is buffered in memory, so do not use this
- method if the data size is large or unlimited.
-
- Args:
- output: Function to call with each fragment of output.
-
- Returns:
- A tuple (stdout, stderr, combined) which is the data received on
- stdout, stderr and the combined data (interleaved stdout and stderr).
-
- Note that the interleaved output will only be sensible if you have
- set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
- the timing of the output in the subprocess. If a subprocess flips
- between stdout and stderr quickly in succession, by the time we come to
- read the output from each we may see several lines in each, and will read
- all the stdout lines, then all the stderr lines. So the interleaving
- may not be correct. In this case you might want to pass
- stderr=cros_subprocess.STDOUT to the constructor.
-
- This feature is still useful for subprocesses where stderr is
- rarely used and indicates an error.
-
- Note also that if you set stderr to STDOUT, then stderr will be empty
- and the combined output will just be the same as stdout.
- """
-
- read_set = []
- write_set = []
- stdout = None # Return
- stderr = None # Return
-
- if self.stdin:
- # Flush stdio buffer. This might block, if the user has
- # been writing to .stdin in an uncontrolled fashion.
- self.stdin.flush()
- if input_buf:
- write_set.append(self.stdin)
- else:
- self.stdin.close()
- if self.stdout:
- read_set.append(self.stdout)
- stdout = bytearray()
- if self.stderr and self.stderr != self.stdout:
- read_set.append(self.stderr)
- stderr = bytearray()
- combined = bytearray()
-
- stop_now = False
- input_offset = 0
- while read_set or write_set:
- try:
- rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
- except select.error as e:
- if e.args[0] == errno.EINTR:
- continue
- raise
-
- if not stay_alive:
- self.terminate()
-
- if self.stdin in wlist:
- # When select has indicated that the file is writable,
- # we can write up to PIPE_BUF bytes without risk of
- # blocking. POSIX defines PIPE_BUF >= 512
- chunk = input_buf[input_offset : input_offset + 512]
- bytes_written = os.write(self.stdin.fileno(), chunk)
- input_offset += bytes_written
- if input_offset >= len(input_buf):
- self.stdin.close()
- write_set.remove(self.stdin)
-
- if self.stdout in rlist:
- data = b''
- # We will get an error on read if the pty is closed
- try:
- data = os.read(self.stdout.fileno(), 1024)
- except OSError:
- pass
- if not len(data):
- self.stdout.close()
- read_set.remove(self.stdout)
- else:
- stdout += data
- combined += data
- if output:
- stop_now = output(sys.stdout, data)
- if self.stderr in rlist:
- data = b''
- # We will get an error on read if the pty is closed
- try:
- data = os.read(self.stderr.fileno(), 1024)
- except OSError:
- pass
- if not len(data):
- self.stderr.close()
- read_set.remove(self.stderr)
- else:
- stderr += data
- combined += data
- if output:
- stop_now = output(sys.stderr, data)
- if stop_now:
- self.terminate()
-
- # All data exchanged. Translate lists into strings.
- stdout = self.convert_data(stdout)
- stderr = self.convert_data(stderr)
- combined = self.convert_data(combined)
-
- self.wait()
- return (stdout, stderr, combined)
-
-
-# Just being a unittest.TestCase gives us 14 public methods. Unless we
-# disable this, we can only have 6 tests in a TestCase. That's not enough.
-#
-# pylint: disable=R0904
-
-class TestSubprocess(unittest.TestCase):
- """Our simple unit test for this module"""
-
- class MyOperation:
- """Provides a operation that we can pass to Popen"""
- def __init__(self, input_to_send=None):
- """Constructor to set up the operation and possible input.
-
- Args:
- input_to_send: a text string to send when we first get input. We will
- add \r\n to the string.
- """
- self.stdout_data = ''
- self.stderr_data = ''
- self.combined_data = ''
- self.stdin_pipe = None
- self._input_to_send = input_to_send
- if input_to_send:
- pipe = os.pipe()
- self.stdin_read_pipe = pipe[0]
- self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
-
- def output(self, stream, data):
- """Output handler for Popen. Stores the data for later comparison"""
- if stream == sys.stdout:
- self.stdout_data += data
- if stream == sys.stderr:
- self.stderr_data += data
- self.combined_data += data
-
- # Output the input string if we have one.
- if self._input_to_send:
- self._stdin_write_pipe.write(self._input_to_send + '\r\n')
- self._stdin_write_pipe.flush()
-
- def _basic_check(self, plist, oper):
- """Basic checks that the output looks sane."""
- self.assertEqual(plist[0], oper.stdout_data)
- self.assertEqual(plist[1], oper.stderr_data)
- self.assertEqual(plist[2], oper.combined_data)
-
- # The total length of stdout and stderr should equal the combined length
- self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
-
- def test_simple(self):
- """Simple redirection: Get process list"""
- oper = TestSubprocess.MyOperation()
- plist = Popen(['ps']).communicate_filter(oper.output)
- self._basic_check(plist, oper)
-
- def test_stderr(self):
- """Check stdout and stderr"""
- oper = TestSubprocess.MyOperation()
- cmd = 'echo fred >/dev/stderr && false || echo bad'
- plist = Popen([cmd], shell=True).communicate_filter(oper.output)
- self._basic_check(plist, oper)
- self.assertEqual(plist [0], 'bad\r\n')
- self.assertEqual(plist [1], 'fred\r\n')
-
- def test_shell(self):
- """Check with and without shell works"""
- oper = TestSubprocess.MyOperation()
- cmd = 'echo test >/dev/stderr'
- self.assertRaises(OSError, Popen, [cmd], shell=False)
- plist = Popen([cmd], shell=True).communicate_filter(oper.output)
- self._basic_check(plist, oper)
- self.assertEqual(len(plist [0]), 0)
- self.assertEqual(plist [1], 'test\r\n')
-
- def test_list_args(self):
- """Check with and without shell works using list arguments"""
- oper = TestSubprocess.MyOperation()
- cmd = ['echo', 'test', '>/dev/stderr']
- plist = Popen(cmd, shell=False).communicate_filter(oper.output)
- self._basic_check(plist, oper)
- self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
- self.assertEqual(len(plist [1]), 0)
-
- oper = TestSubprocess.MyOperation()
-
- # this should be interpreted as 'echo' with the other args dropped
- cmd = ['echo', 'test', '>/dev/stderr']
- plist = Popen(cmd, shell=True).communicate_filter(oper.output)
- self._basic_check(plist, oper)
- self.assertEqual(plist [0], '\r\n')
-
- def test_cwd(self):
- """Check we can change directory"""
- for shell in (False, True):
- oper = TestSubprocess.MyOperation()
- plist = Popen('pwd', shell=shell, cwd='/tmp').communicate_filter(
- oper.output)
- self._basic_check(plist, oper)
- self.assertEqual(plist [0], '/tmp\r\n')
-
- def test_env(self):
- """Check we can change environment"""
- for add in (False, True):
- oper = TestSubprocess.MyOperation()
- env = os.environ
- if add:
- env ['FRED'] = 'fred'
- cmd = 'echo $FRED'
- plist = Popen(cmd, shell=True, env=env).communicate_filter(oper.output)
- self._basic_check(plist, oper)
- self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
-
- def test_extra_args(self):
- """Check we can't add extra arguments"""
- self.assertRaises(ValueError, Popen, 'true', close_fds=False)
-
- def test_basic_input(self):
- """Check that incremental input works
-
- We set up a subprocess which will prompt for a name. When we see this prompt
- we send the name as input to the process. It should then print the name
- properly to stdout.
- """
- oper = TestSubprocess.MyOperation('Flash')
- prompt = 'What is your name?: '
- cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
- plist = Popen([cmd], stdin=oper.stdin_read_pipe,
- shell=True).communicate_filter(oper.output)
- self._basic_check(plist, oper)
- self.assertEqual(len(plist [1]), 0)
- self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
-
- def test_isatty(self):
- """Check that ptys appear as terminals to the subprocess"""
- oper = TestSubprocess.MyOperation()
- cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
- 'else echo "not %d" >&%d; fi;')
- both_cmds = ''
- for fd in (1, 2):
- both_cmds += cmd % (fd, fd, fd, fd, fd)
- plist = Popen(both_cmds, shell=True).communicate_filter(oper.output)
- self._basic_check(plist, oper)
- self.assertEqual(plist [0], 'terminal 1\r\n')
- self.assertEqual(plist [1], 'terminal 2\r\n')
-
- # Now try with PIPE and make sure it is not a terminal
- oper = TestSubprocess.MyOperation()
- plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- shell=True).communicate_filter(oper.output)
- self._basic_check(plist, oper)
- self.assertEqual(plist [0], 'not 1\n')
- self.assertEqual(plist [1], 'not 2\n')
-
-if __name__ == '__main__':
- unittest.main()
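# Hedged usage sketch, not part of the patch: cros_subprocess.Popen streams
# subprocess output through a callback while the child runs, which is what
# command.run_pipe() builds on. This assumes the module is importable from the
# new u_boot_pylib package; the callback and command below are illustrative.
import sys

from u_boot_pylib import cros_subprocess

def show(stream, data):
    """Relay each output fragment as it arrives; return True to stop early"""
    sys.stdout.write(data.decode('utf-8', errors='replace'))
    return False

proc = cros_subprocess.Popen(['ls', '/'])
stdout, stderr, combined = proc.communicate_filter(show)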
from patman.patchstream import PatchStream
from patman.series import Series
from patman import settings
-from patman import terminal
-from patman import tools
-from patman.test_util import capture_sys_output
+from u_boot_pylib import terminal
+from u_boot_pylib import tools
+from u_boot_pylib.test_util import capture_sys_output
import pygit2
from patman import status
import shlex
import shutil
-from patman import command
from patman import gitutil
+from u_boot_pylib import command
def find_get_maintainer(script_file_name):
import os
import sys
-from patman import command
from patman import settings
-from patman import terminal
+from u_boot_pylib import command
+from u_boot_pylib import terminal
# True to use --no-decorate - we check this in setup()
use_no_decorate = True
import shutil
import tempfile
-from patman import command
from patman import commit
from patman import gitutil
from patman.series import Series
+from u_boot_pylib import command
# Tags that we detect and remove
RE_REMOVE = re.compile(r'^BUG=|^TEST=|^BRANCH=|^Review URL:'
from patman import get_maintainer
from patman import gitutil
from patman import settings
-from patman import terminal
-from patman import tools
+from u_boot_pylib import terminal
+from u_boot_pylib import tools
# Series-xxx tags that we understand
valid_series = ['to', 'cc', 'version', 'changes', 'prefix', 'notes', 'name',
from patman import patchstream
from patman.patchstream import PatchStream
-from patman import terminal
-from patman import tout
+from u_boot_pylib import terminal
+from u_boot_pylib import tout
# Patches which are part of a multi-patch series are shown with a prefix like
# [prefix, version, sequence], for example '[RFC, v2, 3/5]'. All but the last
+++ /dev/null
-# SPDX-License-Identifier: GPL-2.0+
-# Copyright (c) 2011 The Chromium OS Authors.
-#
-
-"""Terminal utilities
-
-This module handles terminal interaction including ANSI color codes.
-"""
-
-import os
-import re
-import shutil
-import sys
-
-# Selection of when we want our output to be colored
-COLOR_IF_TERMINAL, COLOR_ALWAYS, COLOR_NEVER = range(3)
-
-# Initially, we are set up to print to the terminal
-print_test_mode = False
-print_test_list = []
-
-# The length of the last line printed without a newline
-last_print_len = None
-
-# credit:
-# stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python
-ansi_escape = re.compile(r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
-
-class PrintLine:
- """A line of text output
-
- Members:
- text: Text line that was printed
- newline: True to output a newline after the text
- colour: Text colour to use
- """
- def __init__(self, text, colour, newline=True, bright=True):
- self.text = text
- self.newline = newline
- self.colour = colour
- self.bright = bright
-
- def __eq__(self, other):
- return (self.text == other.text and
- self.newline == other.newline and
- self.colour == other.colour and
- self.bright == other.bright)
-
- def __str__(self):
- return ("newline=%s, colour=%s, bright=%d, text='%s'" %
- (self.newline, self.colour, self.bright, self.text))
-
-
-def calc_ascii_len(text):
- """Calculate the length of a string, ignoring any ANSI sequences
-
- When displayed on a terminal, ANSI sequences don't take any space, so we
- need to ignore them when calculating the length of a string.
-
- Args:
- text: Text to check
-
- Returns:
- Length of text, after skipping ANSI sequences
-
- >>> col = Color(COLOR_ALWAYS)
- >>> text = col.build(Color.RED, 'abc')
- >>> len(text)
- 14
- >>> calc_ascii_len(text)
- 3
- >>>
- >>> text += 'def'
- >>> calc_ascii_len(text)
- 6
- >>> text += col.build(Color.RED, 'abc')
- >>> calc_ascii_len(text)
- 9
- """
- result = ansi_escape.sub('', text)
- return len(result)
-
-def trim_ascii_len(text, size):
- """Trim a string containing ANSI sequences to the given ASCII length
-
- The string is trimmed with ANSI sequences being ignored for the length
- calculation.
-
- >>> col = Color(COLOR_ALWAYS)
- >>> text = col.build(Color.RED, 'abc')
- >>> len(text)
- 14
- >>> calc_ascii_len(trim_ascii_len(text, 4))
- 3
- >>> calc_ascii_len(trim_ascii_len(text, 2))
- 2
- >>> text += 'def'
- >>> calc_ascii_len(trim_ascii_len(text, 4))
- 4
- >>> text += col.build(Color.RED, 'ghi')
- >>> calc_ascii_len(trim_ascii_len(text, 7))
- 7
- """
- if calc_ascii_len(text) < size:
- return text
- pos = 0
- out = ''
- left = size
-
- # Work through each ANSI sequence in turn
- for m in ansi_escape.finditer(text):
- # Find the text before the sequence and add it to our string, making
- # sure it doesn't overflow
- before = text[pos:m.start()]
- toadd = before[:left]
- out += toadd
-
- # Figure out how much non-ANSI space we have left
- left -= len(toadd)
-
- # Add the ANSI sequence and move to the position immediately after it
- out += m.group()
- pos = m.start() + len(m.group())
-
- # Deal with text after the last ANSI sequence
- after = text[pos:]
- toadd = after[:left]
- out += toadd
-
- return out
-
-
-def tprint(text='', newline=True, colour=None, limit_to_line=False, bright=True):
- """Handle a line of output to the terminal.
-
- In test mode this is recorded in a list. Otherwise it is output to the
- terminal.
-
- Args:
- text: Text to print
- newline: True to add a new line at the end of the text
- colour: Colour to use for the text
- """
- global last_print_len
-
- if print_test_mode:
- print_test_list.append(PrintLine(text, colour, newline, bright))
- else:
- if colour:
- col = Color()
- text = col.build(colour, text, bright=bright)
- if newline:
- print(text)
- last_print_len = None
- else:
- if limit_to_line:
- cols = shutil.get_terminal_size().columns
- text = trim_ascii_len(text, cols)
- print(text, end='', flush=True)
- last_print_len = calc_ascii_len(text)
-
-def print_clear():
- """Clear a previously line that was printed with no newline"""
- global last_print_len
-
- if last_print_len:
- print('\r%s\r' % (' '* last_print_len), end='', flush=True)
- last_print_len = None
-
-def set_print_test_mode(enable=True):
- """Go into test mode, where all printing is recorded"""
- global print_test_mode
-
- print_test_mode = enable
- get_print_test_lines()
-
-def get_print_test_lines():
- """Get a list of all lines output through tprint()
-
- Returns:
- A list of PrintLine objects
- """
- global print_test_list
-
- ret = print_test_list
- print_test_list = []
- return ret
-
-def echo_print_test_lines():
- """Print out the text lines collected"""
- for line in print_test_list:
- if line.colour:
- col = Color()
- print(col.build(line.colour, line.text), end='')
- else:
- print(line.text, end='')
- if line.newline:
- print()
-
-
-class Color(object):
- """Conditionally wraps text in ANSI color escape sequences."""
- BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
- BOLD = -1
- BRIGHT_START = '\033[1;%dm'
- NORMAL_START = '\033[22;%dm'
- BOLD_START = '\033[1m'
- RESET = '\033[0m'
-
- def __init__(self, colored=COLOR_IF_TERMINAL):
- """Create a new Color object, optionally disabling color output.
-
- Args:
- colored: Selects when color codes are added: COLOR_ALWAYS,
- COLOR_NEVER or COLOR_IF_TERMINAL (only if stdout is a terminal)
- """
- try:
- self._enabled = (colored == COLOR_ALWAYS or
- (colored == COLOR_IF_TERMINAL and
- os.isatty(sys.stdout.fileno())))
- except:
- self._enabled = False
-
- def start(self, color, bright=True):
- """Returns a start color code.
-
- Args:
- color: Color to use, e.g. BLACK, RED, etc.
-
- Returns:
- If color is enabled, returns an ANSI sequence to start the given
- color, otherwise returns empty string
- """
- if self._enabled:
- base = self.BRIGHT_START if bright else self.NORMAL_START
- return base % (color + 30)
- return ''
-
- def stop(self):
- """Returns a stop color code.
-
- Returns:
- If color is enabled, returns an ANSI color reset sequence,
- otherwise returns empty string
- """
- if self._enabled:
- return self.RESET
- return ''
-
- def build(self, color, text, bright=True):
- """Returns text with conditionally added color escape sequences.
-
- Keyword arguments:
- color: Text color -- one of the color constants defined in this
- class.
- text: The text to color.
-
- Returns:
- If self._enabled is False, returns the original text. If it's True,
- returns text with color escape sequences based on the value of
- color.
- """
- if not self._enabled:
- return text
- if color == self.BOLD:
- start = self.BOLD_START
- else:
- base = self.BRIGHT_START if bright else self.NORMAL_START
- start = base % (color + 30)
- return start + text + self.RESET
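# Hedged usage sketch, not part of the patch: the terminal module removed above
# wraps text in ANSI color codes and can record output for tests. It assumes
# the module is importable from u_boot_pylib; the strings are illustrative.
from u_boot_pylib import terminal

col = terminal.Color(terminal.COLOR_IF_TERMINAL)
terminal.tprint('building...', newline=False, colour=terminal.Color.YELLOW)
terminal.print_clear()            # wipe the line printed without a newline
terminal.tprint(col.build(terminal.Color.GREEN, 'done'))

# In tests, printing is recorded instead of written to the terminal
terminal.set_print_test_mode()
terminal.tprint('hello', colour=terminal.Color.RED)
lines = terminal.get_print_test_lines()   # list of PrintLine objects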
import tempfile
from patman import settings
-from patman import tools
+from u_boot_pylib import tools
@contextlib.contextmanager
+++ /dev/null
-# SPDX-License-Identifier: GPL-2.0+
-#
-# Copyright (c) 2016 Google, Inc
-#
-
-from contextlib import contextmanager
-import doctest
-import glob
-import multiprocessing
-import os
-import sys
-import unittest
-
-from patman import command
-
-from io import StringIO
-
-use_concurrent = True
-try:
- from concurrencytest import ConcurrentTestSuite
- from concurrencytest import fork_for_tests
-except:
- use_concurrent = False
-
-
-def run_test_coverage(prog, filter_fname, exclude_list, build_dir, required=None,
- extra_args=None):
- """Run tests and check that we get 100% coverage
-
- Args:
- prog: Program to run (will be passed a '-t' argument to run tests)
- filter_fname: Normally all *.py files in the program's directory will
- be included. If this is not None, then it is used to filter the
- list so that only filenames that don't contain filter_fname are
- included.
- exclude_list: List of file patterns to exclude from the coverage
- calculation
- build_dir: Build directory, used to locate libfdt.py
- required: List of modules which must be in the coverage report
- extra_args (str): Extra arguments to pass to the tool before the -t/test
- arg
-
- Raises:
- ValueError if the code coverage is not 100%
- """
- # This uses the build output from sandbox_spl to get _libfdt.so
- path = os.path.dirname(prog)
- if filter_fname:
- glob_list = glob.glob(os.path.join(path, '*.py'))
- glob_list = [fname for fname in glob_list if filter_fname in fname]
- else:
- glob_list = []
- glob_list += exclude_list
- glob_list += ['*libfdt.py', '*site-packages*', '*dist-packages*']
- glob_list += ['*concurrencytest*']
- test_cmd = 'test' if 'binman' in prog or 'patman' in prog else '-t'
- prefix = ''
- if build_dir:
- prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir
- cmd = ('%spython3-coverage run '
- '--omit "%s" %s %s %s -P1' % (prefix, ','.join(glob_list),
- prog, extra_args or '', test_cmd))
- os.system(cmd)
- stdout = command.output('python3-coverage', 'report')
- lines = stdout.splitlines()
- ok = True
- if required:
- # Convert '/path/to/name.py' to just the module name 'name'
- test_set = set([os.path.splitext(os.path.basename(line.split()[0]))[0]
- for line in lines if '/etype/' in line])
- missing_list = set(required)
- missing_list.discard('__init__')
- missing_list.difference_update(test_set)
- if missing_list:
- print('Missing tests for %s' % (', '.join(missing_list)))
- print(stdout)
- ok = False
-
- coverage = lines[-1].split(' ')[-1]
- print(coverage)
- if coverage != '100%':
- print(stdout)
- print("To get a report in 'htmlcov/index.html', type: python3-coverage html")
- print('Coverage error: %s, but should be 100%%' % coverage)
- ok = False
- if not ok:
- raise ValueError('Test coverage failure')
-
-
-# Use this to suppress stdout/stderr output:
-# with capture_sys_output() as (stdout, stderr)
-# ...do something...
-@contextmanager
-def capture_sys_output():
- capture_out, capture_err = StringIO(), StringIO()
- old_out, old_err = sys.stdout, sys.stderr
- try:
- sys.stdout, sys.stderr = capture_out, capture_err
- yield capture_out, capture_err
- finally:
- sys.stdout, sys.stderr = old_out, old_err
-
-
-class FullTextTestResult(unittest.TextTestResult):
- """A test result class that can print extended text results to a stream
-
- This is meant to be used by a TestRunner as a result class. Like
- TextTestResult, this prints out the names of tests as they are run,
- errors as they occur, and a summary of the results at the end of the
- test run. Beyond those, this prints information about skipped tests,
- expected failures and unexpected successes.
-
- Args:
- stream: A file-like object to write results to
- descriptions (bool): True to print descriptions with test names
- verbosity (int): Detail of printed output per test as they run
- Test stdout and stderr always get printed when buffering
- them is disabled by the test runner. In addition to that,
- 0: Print nothing
- 1: Print a dot per test
- 2: Print test names
- """
- def __init__(self, stream, descriptions, verbosity):
- self.verbosity = verbosity
- super().__init__(stream, descriptions, verbosity)
-
- def printErrors(self):
- "Called by TestRunner after test run to summarize the tests"
- # The parent class doesn't keep unexpected successes in the same
- # format as the rest. Adapt it to what printErrorList expects.
- unexpected_successes = [
- (test, 'Test was expected to fail, but succeeded.\n')
- for test in self.unexpectedSuccesses
- ]
-
- super().printErrors() # FAIL and ERROR
- self.printErrorList('SKIP', self.skipped)
- self.printErrorList('XFAIL', self.expectedFailures)
- self.printErrorList('XPASS', unexpected_successes)
-
- def addSkip(self, test, reason):
- """Called when a test is skipped."""
- # Add empty line to keep spacing consistent with other results
- if not reason.endswith('\n'):
- reason += '\n'
- super().addSkip(test, reason)
-
-
-def run_test_suites(toolname, debug, verbosity, test_preserve_dirs, processes,
- test_name, toolpath, class_and_module_list):
- """Run a series of test suites and collect the results
-
- Args:
- toolname: Name of the tool that ran the tests
- debug: True to enable debugging, which shows a full stack trace on error
- verbosity: Verbosity level to use (0-4)
- test_preserve_dirs: True to preserve the input directory used by tests
- so that it can be examined afterwards (only useful for debugging
- tests). If a single test is selected (in args[0]) it also preserves
- the output directory for this test. Both directories are displayed
- on the command line.
- processes: Number of processes to use to run tests (None=same as #CPUs)
- test_name: Name of test to run, or None for all
- toolpath: List of paths to use for tools
- class_and_module_list: List of test classes (type class) and module
- names (type str) to run
- """
- sys.argv = [sys.argv[0]]
- if debug:
- sys.argv.append('-D')
- if verbosity:
- sys.argv.append('-v%d' % verbosity)
- if toolpath:
- for path in toolpath:
- sys.argv += ['--toolpath', path]
-
- suite = unittest.TestSuite()
- loader = unittest.TestLoader()
- runner = unittest.TextTestRunner(
- stream=sys.stdout,
- verbosity=(1 if verbosity is None else verbosity),
- resultclass=FullTextTestResult,
- )
-
- if use_concurrent and processes != 1:
- suite = ConcurrentTestSuite(suite,
- fork_for_tests(processes or multiprocessing.cpu_count()))
-
- for module in class_and_module_list:
- if isinstance(module, str) and (not test_name or test_name == module):
- suite.addTests(doctest.DocTestSuite(module))
-
- for module in class_and_module_list:
- if isinstance(module, str):
- continue
- # Tell the test module about our arguments, if it is interested
- if hasattr(module, 'setup_test_args'):
- setup_test_args = getattr(module, 'setup_test_args')
- setup_test_args(preserve_indir=test_preserve_dirs,
- preserve_outdirs=test_preserve_dirs and test_name is not None,
- toolpath=toolpath, verbosity=verbosity)
- if test_name:
- # Since Python v3.5 If an ImportError or AttributeError occurs
- # while traversing a name then a synthetic test that raises that
- # error when run will be returned. Check that the requested test
- # exists, otherwise these errors are included in the results.
- if test_name in loader.getTestCaseNames(module):
- suite.addTests(loader.loadTestsFromName(test_name, module))
- else:
- suite.addTests(loader.loadTestsFromTestCase(module))
-
- print(f" Running {toolname} tests ".center(70, "="))
- result = runner.run(suite)
- print()
-
- return result
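# Hedged usage sketch, not part of the patch: how a tool's test entry point
# might call the helpers removed above from their new u_boot_pylib home. The
# MyTests class is hypothetical; the run_test_suites() arguments mirror the
# call made by patman's __main__ shown earlier in this series.
import sys
import unittest

from u_boot_pylib import test_util

class MyTests(unittest.TestCase):
    def test_hello(self):
        with test_util.capture_sys_output() as (stdout, stderr):
            print('hello')
        self.assertEqual('hello\n', stdout.getvalue())

result = test_util.run_test_suites(
    'mytool', False, False, False, None, None, None, [MyTests])
sys.exit(0 if result.wasSuccessful() else 1)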
+++ /dev/null
-# SPDX-License-Identifier: GPL-2.0+
-#
-# Copyright (c) 2016 Google, Inc
-#
-
-import glob
-import os
-import shlex
-import shutil
-import sys
-import tempfile
-import urllib.request
-
-from patman import command
-from patman import tout
-
-# Output directory (generally this is temporary)
-outdir = None
-
-# True to keep the output directory around after exiting
-preserve_outdir = False
-
-# Path to the Chrome OS chroot, if we know it
-chroot_path = None
-
-# Search paths to use for filename(), used to find files
-search_paths = []
-
-tool_search_paths = []
-
-# Tools and the packages that contain them, on debian
-packages = {
- 'lz4': 'liblz4-tool',
- }
-
-# List of paths to use when looking for an input file
-indir = []
-
-def prepare_output_dir(dirname, preserve=False):
- """Select an output directory, ensuring it exists.
-
- This either creates a temporary directory or checks that the one supplied
- by the user is valid. For a temporary directory, it makes a note to
- remove it later if required.
-
- Args:
- dirname: a string, name of the output directory to use to store
- intermediate and output files. If is None - create a temporary
- directory.
- preserve: a Boolean. If outdir above is None and preserve is False, the
- created temporary directory will be destroyed on exit.
-
- Raises:
- OSError: If it cannot create the output directory.
- """
- global outdir, preserve_outdir
-
- preserve_outdir = dirname or preserve
- if dirname:
- outdir = dirname
- if not os.path.isdir(outdir):
- try:
- os.makedirs(outdir)
- except OSError as err:
- raise ValueError(
- f"Cannot make output directory 'outdir': 'err.strerror'")
- tout.debug("Using output directory '%s'" % outdir)
- else:
- outdir = tempfile.mkdtemp(prefix='binman.')
- tout.debug("Using temporary directory '%s'" % outdir)
-
-def _remove_output_dir():
- global outdir
-
- shutil.rmtree(outdir)
- tout.debug("Deleted temporary directory '%s'" % outdir)
- outdir = None
-
-def finalise_output_dir():
- """Tidy up: delete output directory if temporary and not preserved."""
- global outdir, preserve_outdir
-
- if outdir and not preserve_outdir:
- _remove_output_dir()
- outdir = None
-
-def get_output_filename(fname):
- """Return a filename within the output directory.
-
- Args:
- fname: Filename to use for new file
-
- Returns:
- The full path of the filename, within the output directory
- """
- return os.path.join(outdir, fname)
-
-def get_output_dir():
- """Return the current output directory
-
- Returns:
- str: The output directory
- """
- return outdir
-
-def _finalise_for_test():
- """Remove the output directory (for use by tests)"""
- global outdir
-
- if outdir:
- _remove_output_dir()
- outdir = None
-
-def set_input_dirs(dirname):
- """Add a list of input directories, where input files are kept.
-
- Args:
- dirname: a list of paths to input directories to use for obtaining
- files needed by binman to place in the image.
- """
- global indir
-
- indir = dirname
- tout.debug("Using input directories %s" % indir)
-
-def get_input_filename(fname, allow_missing=False):
- """Return a filename for use as input.
-
- Args:
- fname: Filename to use for new file
- allow_missing: True if the filename can be missing
-
- Returns:
- fname, if indir is None;
- full path of the filename, within the input directory;
- None, if file is missing and allow_missing is True
-
- Raises:
- ValueError if file is missing and allow_missing is False
- """
- if not indir or fname[:1] == '/':
- return fname
- for dirname in indir:
- pathname = os.path.join(dirname, fname)
- if os.path.exists(pathname):
- return pathname
-
- if allow_missing:
- return None
- raise ValueError("Filename '%s' not found in input path (%s) (cwd='%s')" %
- (fname, ','.join(indir), os.getcwd()))
-
-def get_input_filename_glob(pattern):
- """Return a list of filenames for use as input.
-
- Args:
- pattern: Filename pattern to search for
-
- Returns:
- A list of matching files in all input directories
- """
- if not indir:
- return glob.glob(pattern)
- files = []
- for dirname in indir:
- pathname = os.path.join(dirname, pattern)
- files += glob.glob(pathname)
- return sorted(files)
-
-def align(pos, align):
- if align:
- mask = align - 1
- pos = (pos + mask) & ~mask
- return pos
-
-def not_power_of_two(num):
- return num and (num & (num - 1))
-
-def set_tool_paths(toolpaths):
- """Set the path to search for tools
-
- Args:
- toolpaths: List of paths to search for tools executed by run()
- """
- global tool_search_paths
-
- tool_search_paths = toolpaths
-
-def path_has_file(path_spec, fname):
- """Check if a given filename is in the PATH
-
- Args:
- path_spec: Value of PATH variable to check
- fname: Filename to check
-
- Returns:
- True if found, False if not
- """
- for dir in path_spec.split(':'):
- if os.path.exists(os.path.join(dir, fname)):
- return True
- return False
-
-def get_host_compile_tool(env, name):
- """Get the host-specific version for a compile tool
-
- This checks the environment variables that specify which version of
- the tool should be used (e.g. ${HOSTCC}).
-
- The following table lists the host-specific versions of the tools
- this function resolves to:
-
- Compile Tool | Host version
- --------------+----------------
- as | ${HOSTAS}
- ld | ${HOSTLD}
- cc | ${HOSTCC}
- cpp | ${HOSTCPP}
- c++ | ${HOSTCXX}
- ar | ${HOSTAR}
- nm | ${HOSTNM}
- ldr | ${HOSTLDR}
- strip | ${HOSTSTRIP}
- objcopy | ${HOSTOBJCOPY}
- objdump | ${HOSTOBJDUMP}
- dtc | ${HOSTDTC}
-
- Args:
- name: Command name to run
-
- Returns:
- host_name: Exact command name to run instead
- extra_args: List of extra arguments to pass
- """
- host_name = None
- extra_args = []
- if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip',
- 'objcopy', 'objdump', 'dtc'):
- host_name, *host_args = env.get('HOST' + name.upper(), '').split(' ')
- elif name == 'c++':
- host_name, *host_args = env.get('HOSTCXX', '').split(' ')
-
- if host_name:
- return host_name, extra_args
- return name, []
-
-def get_target_compile_tool(name, cross_compile=None):
- """Get the target-specific version for a compile tool
-
- This first checks the environment variables that specify which
- version of the tool should be used (e.g. ${CC}). If those aren't
- specified, it checks the CROSS_COMPILE variable as a prefix for the
- tool with some substitutions (e.g. "${CROSS_COMPILE}gcc" for cc).
-
- The following table lists the target-specific versions of the tools
- this function resolves to:
-
- Compile Tool | First choice | Second choice
- --------------+----------------+----------------------------
- as | ${AS} | ${CROSS_COMPILE}as
- ld | ${LD} | ${CROSS_COMPILE}ld.bfd
- | | or ${CROSS_COMPILE}ld
- cc | ${CC} | ${CROSS_COMPILE}gcc
- cpp | ${CPP} | ${CROSS_COMPILE}gcc -E
- c++ | ${CXX} | ${CROSS_COMPILE}g++
- ar | ${AR} | ${CROSS_COMPILE}ar
- nm | ${NM} | ${CROSS_COMPILE}nm
- ldr | ${LDR} | ${CROSS_COMPILE}ldr
- strip | ${STRIP} | ${CROSS_COMPILE}strip
- objcopy | ${OBJCOPY} | ${CROSS_COMPILE}objcopy
- objdump | ${OBJDUMP} | ${CROSS_COMPILE}objdump
- dtc | ${DTC} | (no CROSS_COMPILE version)
-
- Args:
- name: Command name to run
-
- Returns:
- target_name: Exact command name to run instead
- extra_args: List of extra arguments to pass
- """
- env = dict(os.environ)
-
- target_name = None
- extra_args = []
- if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip',
- 'objcopy', 'objdump', 'dtc'):
- target_name, *extra_args = env.get(name.upper(), '').split(' ')
- elif name == 'c++':
- target_name, *extra_args = env.get('CXX', '').split(' ')
-
- if target_name:
- return target_name, extra_args
-
- if cross_compile is None:
- cross_compile = env.get('CROSS_COMPILE', '')
-
- if name in ('as', 'ar', 'nm', 'ldr', 'strip', 'objcopy', 'objdump'):
- target_name = cross_compile + name
- elif name == 'ld':
- try:
- if run(cross_compile + 'ld.bfd', '-v'):
- target_name = cross_compile + 'ld.bfd'
- except:
- target_name = cross_compile + 'ld'
- elif name == 'cc':
- target_name = cross_compile + 'gcc'
- elif name == 'cpp':
- target_name = cross_compile + 'gcc'
- extra_args = ['-E']
- elif name == 'c++':
- target_name = cross_compile + 'g++'
- else:
- target_name = name
- return target_name, extra_args
-
-def get_env_with_path():
- """Get an updated environment with the PATH variable set correctly
-
- If there are any search paths set, these need to come first in the PATH so
- that these override any other version of the tools.
-
- Returns:
- dict: New environment with PATH updated, or None if there are not search
- paths
- """
- if tool_search_paths:
- env = dict(os.environ)
- env['PATH'] = ':'.join(tool_search_paths) + ':' + env['PATH']
- return env
-
-def run_result(name, *args, **kwargs):
- """Run a tool with some arguments
-
- This runs a 'tool', which is a program used by binman to process files and
- perhaps produce some output. Tools can be located on the PATH or in a
- search path.
-
- Args:
- name: Command name to run
- args: Arguments to the tool
- for_host: True to resolve the command to the version for the host
- for_target: False to run the command as-is, without resolving it
- to the version for the compile target
- raise_on_error: Raise an error if the command fails (True by default)
-
- Returns:
- CommandResult object
- """
- try:
- binary = kwargs.get('binary')
- for_host = kwargs.get('for_host', False)
- for_target = kwargs.get('for_target', not for_host)
- raise_on_error = kwargs.get('raise_on_error', True)
- env = get_env_with_path()
- if for_target:
- name, extra_args = get_target_compile_tool(name)
- args = tuple(extra_args) + args
- elif for_host:
- name, extra_args = get_host_compile_tool(env, name)
- args = tuple(extra_args) + args
- name = os.path.expanduser(name) # Expand paths containing ~
- all_args = (name,) + args
- result = command.run_pipe([all_args], capture=True, capture_stderr=True,
- env=env, raise_on_error=False, binary=binary)
- if result.return_code:
- if raise_on_error:
- raise ValueError("Error %d running '%s': %s" %
- (result.return_code,' '.join(all_args),
- result.stderr or result.stdout))
- return result
- except ValueError:
- if env and not path_has_file(env['PATH'], name):
- msg = "Please install tool '%s'" % name
- package = packages.get(name)
- if package:
- msg += " (e.g. from package '%s')" % package
- raise ValueError(msg)
- raise
-
-def tool_find(name):
- """Search the current path for a tool
-
- This uses both PATH and any value from set_tool_paths() to search for a tool
-
- Args:
- name (str): Name of tool to locate
-
- Returns:
- str: Full path to tool if found, else None
- """
- name = os.path.expanduser(name) # Expand paths containing ~
- paths = []
- pathvar = os.environ.get('PATH')
- if pathvar:
- paths = pathvar.split(':')
- if tool_search_paths:
- paths += tool_search_paths
- for path in paths:
- fname = os.path.join(path, name)
- if os.path.isfile(fname) and os.access(fname, os.X_OK):
- return fname
-
-def run(name, *args, **kwargs):
- """Run a tool with some arguments
-
- This runs a 'tool', which is a program used by binman to process files and
- perhaps produce some output. Tools can be located on the PATH or in a
- search path.
-
- Args:
- name: Command name to run
- args: Arguments to the tool
- for_host: True to resolve the command to the version for the host
- for_target: False to run the command as-is, without resolving it
- to the version for the compile target
-
- Returns:
- CommandResult object
- """
- result = run_result(name, *args, **kwargs)
- if result is not None:
- return result.stdout
-
-def filename(fname):
- """Resolve a file path to an absolute path.
-
- If fname starts with ##/ and chroot is available, ##/ gets replaced with
- the chroot path. If chroot is not available, this file name can not be
- resolved, `None' is returned.
-
- If fname is not prepended with the above prefix, and is not an existing
- file, the actual file name is retrieved from the passed in string and the
- search_paths directories (if any) are searched to for the file. If found -
- the path to the found file is returned, `None' is returned otherwise.
-
- Args:
- fname: a string, the path to resolve.
-
- Returns:
- Absolute path to the file or None if not found.
- """
- if fname.startswith('##/'):
- if chroot_path:
- fname = os.path.join(chroot_path, fname[3:])
- else:
- return None
-
- # Search for a pathname that exists, and return it if found
- if fname and not os.path.exists(fname):
- for path in search_paths:
- pathname = os.path.join(path, os.path.basename(fname))
- if os.path.exists(pathname):
- return pathname
-
- # If not found, just return the standard, unchanged path
- return fname
-
-def read_file(fname, binary=True):
- """Read and return the contents of a file.
-
- Args:
- fname: path to filename to read, where ## signifiies the chroot.
-
- Returns:
- data read from file, as a string.
- """
- with open(filename(fname), binary and 'rb' or 'r') as fd:
- data = fd.read()
- #self._out.Info("Read file '%s' size %d (%#0x)" %
- #(fname, len(data), len(data)))
- return data
-
-def write_file(fname, data, binary=True):
- """Write data into a file.
-
- Args:
- fname: path to filename to write
- data: data to write to file, as a string
- """
- #self._out.Info("Write file '%s' size %d (%#0x)" %
- #(fname, len(data), len(data)))
- with open(filename(fname), binary and 'wb' or 'w') as fd:
- fd.write(data)
-
-def get_bytes(byte, size):
- """Get a string of bytes of a given size
-
- Args:
- byte: Numeric byte value to use
- size: Size of bytes/string to return
-
- Returns:
- A bytes type with 'byte' repeated 'size' times
- """
- return bytes([byte]) * size
-
-def to_bytes(string):
- """Convert a str type into a bytes type
-
- Args:
- string: string to convert
-
- Returns:
- A bytes type
- """
- return string.encode('utf-8')
-
-def to_string(bval):
- """Convert a bytes type into a str type
-
- Args:
- bval: bytes value to convert
-
- Returns:
- Python 3: A bytes type
- Python 2: A string type
- """
- return bval.decode('utf-8')
-
-def to_hex(val):
- """Convert an integer value (or None) to a string
-
- Returns:
- hex value, or 'None' if the value is None
- """
- return 'None' if val is None else '%#x' % val
-
-def to_hex_size(val):
- """Return the size of an object in hex
-
- Returns:
- hex value of size, or 'None' if the value is None
- """
- return 'None' if val is None else '%#x' % len(val)
-
-def print_full_help(fname):
- """Print the full help message for a tool using an appropriate pager.
-
- Args:
- fname: Path to a file containing the full help message
- """
- pager = shlex.split(os.getenv('PAGER', ''))
- if not pager:
- lesspath = shutil.which('less')
- pager = [lesspath] if lesspath else None
- if not pager:
- pager = ['more']
- command.run(*pager, fname)
-
-def download(url, tmpdir_pattern='.patman'):
- """Download a file to a temporary directory
-
- Args:
- url (str): URL to download
- tmpdir_pattern (str): pattern to use for the temporary directory
-
- Returns:
- Tuple:
- Full path to the downloaded archive file in that directory,
- or None if there was an error while downloading
- Temporary directory name
- """
- print('- downloading: %s' % url)
- leaf = url.split('/')[-1]
- tmpdir = tempfile.mkdtemp(tmpdir_pattern)
- response = urllib.request.urlopen(url)
- fname = os.path.join(tmpdir, leaf)
- fd = open(fname, 'wb')
- meta = response.info()
- size = int(meta.get('Content-Length'))
- done = 0
- block_size = 1 << 16
- status = ''
-
- # Read the file in chunks and show progress as we go
- while True:
- buffer = response.read(block_size)
- if not buffer:
- print(chr(8) * (len(status) + 1), '\r', end=' ')
- break
-
- done += len(buffer)
- fd.write(buffer)
- status = r'%10d MiB [%3d%%]' % (done // 1024 // 1024,
- done * 100 // size)
- status = status + chr(8) * (len(status) + 1)
- print(status, end=' ')
- sys.stdout.flush()
- print('\r', end='')
- sys.stdout.flush()
- fd.close()
- if done != size:
- print('Error, failed to download')
- os.remove(fname)
- fname = None
- return fname, tmpdir
+++ /dev/null
-# SPDX-License-Identifier: GPL-2.0+
-# Copyright (c) 2016 Google, Inc
-#
-# Terminal output logging.
-#
-
-import sys
-
-from patman import terminal
-
-# Output verbosity levels that we support
-ERROR, WARNING, NOTICE, INFO, DETAIL, DEBUG = range(6)
-
-in_progress = False
-
-"""
-This class handles output of progress and other useful information
-to the user. It provides for simple verbosity level control and can
-output nothing but errors at verbosity zero.
-
-The idea is that modules set up an Output object early in their years and pass
-it around to other modules that need it. This keeps the output under control
-of a single class.
-
-Public properties:
- verbose: Verbosity level: 0=silent, 1=progress, 3=full, 4=debug
-"""
-def __enter__():
- return
-
-def __exit__(unused1, unused2, unused3):
- """Clean up and remove any progress message."""
- clear_progress()
- return False
-
-def user_is_present():
- """This returns True if it is likely that a user is present.
-
- Sometimes we want to prompt the user, but if no one is there then this
- is a waste of time, and may lock a script which should otherwise fail.
-
- Returns:
- True if it thinks the user is there, and False otherwise
- """
- return stdout_is_tty and verbose > 0
-
-def clear_progress():
- """Clear any active progress message on the terminal."""
- global in_progress
- if verbose > 0 and stdout_is_tty and in_progress:
- _stdout.write('\r%s\r' % (" " * len (_progress)))
- _stdout.flush()
- in_progress = False
-
-def progress(msg, warning=False, trailer='...'):
- """Display progress information.
-
- Args:
- msg: Message to display.
- warning: True if this is a warning."""
- global in_progress
- clear_progress()
- if verbose > 0:
- _progress = msg + trailer
- if stdout_is_tty:
- col = _color.YELLOW if warning else _color.GREEN
- _stdout.write('\r' + _color.build(col, _progress))
- _stdout.flush()
- in_progress = True
- else:
- _stdout.write(_progress + '\n')
-
-def _output(level, msg, color=None):
- """Output a message to the terminal.
-
- Args:
- level: Verbosity level for this message. It will only be displayed if
- this as high as the currently selected level.
- msg; Message to display.
- error: True if this is an error message, else False.
- """
- if verbose >= level:
- clear_progress()
- if color:
- msg = _color.build(color, msg)
- if level < NOTICE:
- print(msg, file=sys.stderr)
- else:
- print(msg)
-
-def do_output(level, msg):
- """Output a message to the terminal.
-
- Args:
- level: Verbosity level for this message. It will only be displayed if
- this as high as the currently selected level.
- msg; Message to display.
- """
- _output(level, msg)
-
-def error(msg):
- """Display an error message
-
- Args:
- msg; Message to display.
- """
- _output(ERROR, msg, _color.RED)
-
-def warning(msg):
- """Display a warning message
-
- Args:
- msg; Message to display.
- """
- _output(WARNING, msg, _color.YELLOW)
-
-def notice(msg):
- """Display an important infomation message
-
- Args:
- msg; Message to display.
- """
- _output(NOTICE, msg)
-
-def info(msg):
- """Display an infomation message
-
- Args:
- msg; Message to display.
- """
- _output(INFO, msg)
-
-def detail(msg):
- """Display a detailed message
-
- Args:
- msg; Message to display.
- """
- _output(DETAIL, msg)
-
-def debug(msg):
- """Display a debug message
-
- Args:
- msg; Message to display.
- """
- _output(DEBUG, msg)
-
-def user_output(msg):
- """Display a message regardless of the current output level.
-
- This is used when the output was specifically requested by the user.
- Args:
- msg; Message to display.
- """
- _output(0, msg)
-
-def init(_verbose=WARNING, stdout=sys.stdout):
- """Initialize a new output object.
-
- Args:
- verbose: Verbosity level (0-4).
- stdout: File to use for stdout.
- """
- global verbose, _progress, _color, _stdout, stdout_is_tty
-
- verbose = _verbose
- _progress = '' # Our last progress message
- _color = terminal.Color()
- _stdout = stdout
-
- # TODO(sjg): Move this into Chromite libraries when we have them
- stdout_is_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
- stderr_is_tty = hasattr(sys.stderr, 'isatty') and sys.stderr.isatty()
-
-def uninit():
- clear_progress()
-
-init()
import re
import sys
-from patman import command
+from u_boot_pylib import command
def rm_kconfig_include(path):
"""Remove a path from Kconfig files
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0+
+
+__all__ = ['command', 'cros_subprocess', 'terminal', 'test_util', 'tools',
+ 'tout']
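Since this __init__.py only defines __all__, a wildcard import is what pulls in the listed submodules. A minimal sketch, assuming the package's parent directory is on sys.path (explicit imports remain the usual style in the tools themselves):

# Wildcard import driven by __all__ above; calc_ascii_len() comes from the
# terminal module added later in this patch.
from u_boot_pylib import *

print(terminal.calc_ascii_len('abc'))   # 3: length ignoring ANSI sequences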
--- /dev/null
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Copyright 2023 Google LLC
+#
+
+import os
+import sys
+
+if __name__ == "__main__":
+    # Allow 'from u_boot_pylib import xxx' to work
+ our_path = os.path.dirname(os.path.realpath(__file__))
+ sys.path.append(os.path.join(our_path, '..'))
+
+ # Run tests
+ from u_boot_pylib import terminal
+ from u_boot_pylib import test_util
+
+ result = test_util.run_test_suites(
+ 'u_boot_pylib', False, False, False, None, None, None,
+ ['terminal'])
+
+ sys.exit(0 if result.wasSuccessful() else 1)
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright (c) 2011 The Chromium OS Authors.
+#
+
+"""Shell command ease-ups for Python."""
+
+import os
+
+from u_boot_pylib import cros_subprocess
+
+class CommandResult:
+ """A class which captures the result of executing a command.
+
+ Members:
+ stdout: stdout obtained from command, as a string
+        stderr: stderr obtained from command, as a string
+        combined: interleaved stdout and stderr, as a string
+ return_code: Return code from command
+ exception: Exception received, or None if all ok
+ """
+ def __init__(self, stdout='', stderr='', combined='', return_code=0,
+ exception=None):
+ self.stdout = stdout
+ self.stderr = stderr
+ self.combined = combined
+ self.return_code = return_code
+ self.exception = exception
+
+ def to_output(self, binary):
+ if not binary:
+ self.stdout = self.stdout.decode('utf-8')
+ self.stderr = self.stderr.decode('utf-8')
+ self.combined = self.combined.decode('utf-8')
+ return self
+
+
+# This permits interception of run_pipe() for test purposes. If it is set to
+# a function, then that function is called with the pipe list being
+# executed. Otherwise, it is assumed to be a CommandResult object, and is
+# returned as the result for every run_pipe() call.
+# When this value is None, commands are executed as normal.
+test_result = None
+
+def run_pipe(pipe_list, infile=None, outfile=None,
+ capture=False, capture_stderr=False, oneline=False,
+ raise_on_error=True, cwd=None, binary=False,
+ output_func=None, **kwargs):
+ """
+ Perform a command pipeline, with optional input/output filenames.
+
+ Args:
+ pipe_list: List of command lines to execute. Each command line is
+ piped into the next, and is itself a list of strings. For
+            example [ ['ls', '.git'], ['wc'] ] will pipe the output of
+ 'ls .git' into 'wc'.
+ infile: File to provide stdin to the pipeline
+ outfile: File to store stdout
+ capture: True to capture output
+ capture_stderr: True to capture stderr
+        oneline: True to strip newline chars from output
+        raise_on_error: True to raise an Exception if the command fails
+        cwd: Directory to run the command in, or None for the current one
+        binary: True to return output as bytes; False to decode it to str
+ output_func: Output function to call with each output fragment
+ (if it returns True the function terminates)
+ kwargs: Additional keyword arguments to cros_subprocess.Popen()
+ Returns:
+ CommandResult object
+ """
+ if test_result:
+ if hasattr(test_result, '__call__'):
+ # pylint: disable=E1102
+ result = test_result(pipe_list=pipe_list)
+ if result:
+ return result
+ else:
+ return test_result
+ # No result: fall through to normal processing
+ result = CommandResult(b'', b'', b'')
+ last_pipe = None
+ pipeline = list(pipe_list)
+ user_pipestr = '|'.join([' '.join(pipe) for pipe in pipe_list])
+ kwargs['stdout'] = None
+ kwargs['stderr'] = None
+ while pipeline:
+ cmd = pipeline.pop(0)
+ if last_pipe is not None:
+ kwargs['stdin'] = last_pipe.stdout
+ elif infile:
+ kwargs['stdin'] = open(infile, 'rb')
+ if pipeline or capture:
+ kwargs['stdout'] = cros_subprocess.PIPE
+ elif outfile:
+ kwargs['stdout'] = open(outfile, 'wb')
+ if capture_stderr:
+ kwargs['stderr'] = cros_subprocess.PIPE
+
+ try:
+ last_pipe = cros_subprocess.Popen(cmd, cwd=cwd, **kwargs)
+ except Exception as err:
+ result.exception = err
+ if raise_on_error:
+ raise Exception("Error running '%s': %s" % (user_pipestr, str))
+ result.return_code = 255
+ return result.to_output(binary)
+
+ if capture:
+ result.stdout, result.stderr, result.combined = (
+ last_pipe.communicate_filter(output_func))
+ if result.stdout and oneline:
+ result.output = result.stdout.rstrip(b'\r\n')
+ result.return_code = last_pipe.wait()
+ else:
+ result.return_code = os.waitpid(last_pipe.pid, 0)[1]
+ if raise_on_error and result.return_code:
+ raise Exception("Error running '%s'" % user_pipestr)
+ return result.to_output(binary)
+
+def output(*cmd, **kwargs):
+    """Run a command and return its captured stdout"""
+    kwargs['raise_on_error'] = kwargs.get('raise_on_error', True)
+    return run_pipe([cmd], capture=True, **kwargs).stdout
+
+def output_one_line(*cmd, **kwargs):
+ """Run a command and output it as a single-line string
+
+    The command is expected to produce a single line of output
+
+ Returns:
+ String containing output of command
+ """
+ raise_on_error = kwargs.pop('raise_on_error', True)
+ result = run_pipe([cmd], capture=True, oneline=True,
+ raise_on_error=raise_on_error, **kwargs).stdout.strip()
+ return result
+
+def run(*cmd, **kwargs):
+    """Run a single command, passing keyword arguments through to run_pipe()"""
+    return run_pipe([cmd], **kwargs).stdout
+
+def run_list(cmd):
+    """Run a command (given as an argv list) and return its captured stdout"""
+    return run_pipe([cmd], capture=True).stdout
+
+def stop_all():
+    """Ask any running subprocesses to stop (clears cros_subprocess.stay_alive)"""
+    cros_subprocess.stay_alive = False
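A minimal usage sketch of the module above; the commands shown ('ls', 'wc', 'echo') are illustrative only and the package is assumed to be importable:

from u_boot_pylib import command

# Each element of pipe_list is itself an argv list; run_pipe() chains them
result = command.run_pipe([['ls'], ['wc', '-l']], capture=True)
print(result.return_code, result.stdout.strip())

# Convenience wrapper: run one command and return its stdout as a string
print(command.output('echo', 'hello'))

# Tests can intercept execution by setting test_result instead of running
# anything real
command.test_result = command.CommandResult(stdout='fake', return_code=0)
assert command.output('anything') == 'fake'
command.test_result = None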
--- /dev/null
+# Copyright (c) 2012 The Chromium OS Authors.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+#
+# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
+# Licensed to PSF under a Contributor Agreement.
+# See http://www.python.org/2.4/license for licensing details.
+
+"""Subprocess execution
+
+This module holds a subclass of subprocess.Popen with our own required
+features, mainly that we get access to the subprocess output while it
+is running rather than just at the end. This makes it easier to show
+progress information and filter output in real time.
+"""
+
+import errno
+import os
+import pty
+import select
+import subprocess
+import sys
+import unittest
+
+
+# Import these here so the caller does not need to import subprocess also.
+PIPE = subprocess.PIPE
+STDOUT = subprocess.STDOUT
+PIPE_PTY = -3 # Pipe output through a pty
+stay_alive = True
+
+
+class Popen(subprocess.Popen):
+ """Like subprocess.Popen with ptys and incremental output
+
+ This class deals with running a child process and filtering its output on
+ both stdout and stderr while it is running. We do this so we can monitor
+ progress, and possibly relay the output to the user if requested.
+
+ The class is similar to subprocess.Popen, the equivalent is something like:
+
+ Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    But this class has many fewer features, and two enhancements:
+
+ 1. Rather than getting the output data only at the end, this class sends it
+ to a provided operation as it arrives.
+ 2. We use pseudo terminals so that the child will hopefully flush its output
+ to us as soon as it is produced, rather than waiting for the end of a
+ line.
+
+ Use communicate_filter() to handle output from the subprocess.
+
+ """
+
+ def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
+ shell=False, cwd=None, env=None, **kwargs):
+ """Cut-down constructor
+
+ Args:
+ args: Program and arguments for subprocess to execute.
+ stdin: See subprocess.Popen()
+ stdout: See subprocess.Popen(), except that we support the sentinel
+ value of cros_subprocess.PIPE_PTY.
+ stderr: See subprocess.Popen(), except that we support the sentinel
+ value of cros_subprocess.PIPE_PTY.
+ shell: See subprocess.Popen()
+ cwd: Working directory to change to for subprocess, or None if none.
+ env: Environment to use for this subprocess, or None to inherit parent.
+ kwargs: No other arguments are supported at the moment. Passing other
+ arguments will cause a ValueError to be raised.
+ """
+ stdout_pty = None
+ stderr_pty = None
+
+ if stdout == PIPE_PTY:
+ stdout_pty = pty.openpty()
+ stdout = os.fdopen(stdout_pty[1])
+ if stderr == PIPE_PTY:
+ stderr_pty = pty.openpty()
+ stderr = os.fdopen(stderr_pty[1])
+
+ super(Popen, self).__init__(args, stdin=stdin,
+ stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
+ **kwargs)
+
+ # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
+ # We want to use the master half on our end from now on. Setting this here
+ # does make some assumptions about the implementation of subprocess, but
+ # those assumptions are pretty minor.
+
+ # Note that if stderr is STDOUT, then self.stderr will be set to None by
+ # this constructor.
+ if stdout_pty is not None:
+ self.stdout = os.fdopen(stdout_pty[0])
+ if stderr_pty is not None:
+ self.stderr = os.fdopen(stderr_pty[0])
+
+ # Insist that unit tests exist for other arguments we don't support.
+ if kwargs:
+ raise ValueError("Unit tests do not test extra args - please add tests")
+
+ def convert_data(self, data):
+ """Convert stdout/stderr data to the correct format for output
+
+ Args:
+ data: Data to convert, or None for ''
+
+ Returns:
+ Converted data, as bytes
+ """
+ if data is None:
+ return b''
+ return data
+
+ def communicate_filter(self, output, input_buf=''):
+ """Interact with process: Read data from stdout and stderr.
+
+ This method runs until end-of-file is reached, then waits for the
+ subprocess to terminate.
+
+ The output function is sent all output from the subprocess and must be
+ defined like this:
+
+ def output([self,] stream, data)
+ Args:
+ stream: the stream the output was received on, which will be
+ sys.stdout or sys.stderr.
+ data: a string containing the data
+
+ Returns:
+ True to terminate the process
+
+ Note: The data read is buffered in memory, so do not use this
+ method if the data size is large or unlimited.
+
+ Args:
+ output: Function to call with each fragment of output.
+
+ Returns:
+ A tuple (stdout, stderr, combined) which is the data received on
+ stdout, stderr and the combined data (interleaved stdout and stderr).
+
+ Note that the interleaved output will only be sensible if you have
+ set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
+ the timing of the output in the subprocess. If a subprocess flips
+ between stdout and stderr quickly in succession, by the time we come to
+ read the output from each we may see several lines in each, and will read
+ all the stdout lines, then all the stderr lines. So the interleaving
+ may not be correct. In this case you might want to pass
+ stderr=cros_subprocess.STDOUT to the constructor.
+
+ This feature is still useful for subprocesses where stderr is
+ rarely used and indicates an error.
+
+ Note also that if you set stderr to STDOUT, then stderr will be empty
+ and the combined output will just be the same as stdout.
+ """
+
+ read_set = []
+ write_set = []
+ stdout = None # Return
+ stderr = None # Return
+
+ if self.stdin:
+ # Flush stdio buffer. This might block, if the user has
+ # been writing to .stdin in an uncontrolled fashion.
+ self.stdin.flush()
+ if input_buf:
+ write_set.append(self.stdin)
+ else:
+ self.stdin.close()
+ if self.stdout:
+ read_set.append(self.stdout)
+ stdout = bytearray()
+ if self.stderr and self.stderr != self.stdout:
+ read_set.append(self.stderr)
+ stderr = bytearray()
+ combined = bytearray()
+
+ stop_now = False
+ input_offset = 0
+ while read_set or write_set:
+ try:
+ rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
+ except select.error as e:
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+
+ if not stay_alive:
+ self.terminate()
+
+ if self.stdin in wlist:
+ # When select has indicated that the file is writable,
+ # we can write up to PIPE_BUF bytes without risk
+ # blocking. POSIX defines PIPE_BUF >= 512
+ chunk = input_buf[input_offset : input_offset + 512]
+ bytes_written = os.write(self.stdin.fileno(), chunk)
+ input_offset += bytes_written
+ if input_offset >= len(input_buf):
+ self.stdin.close()
+ write_set.remove(self.stdin)
+
+ if self.stdout in rlist:
+ data = b''
+ # We will get an error on read if the pty is closed
+ try:
+ data = os.read(self.stdout.fileno(), 1024)
+ except OSError:
+ pass
+ if not len(data):
+ self.stdout.close()
+ read_set.remove(self.stdout)
+ else:
+ stdout += data
+ combined += data
+ if output:
+ stop_now = output(sys.stdout, data)
+ if self.stderr in rlist:
+ data = b''
+ # We will get an error on read if the pty is closed
+ try:
+ data = os.read(self.stderr.fileno(), 1024)
+ except OSError:
+ pass
+ if not len(data):
+ self.stderr.close()
+ read_set.remove(self.stderr)
+ else:
+ stderr += data
+ combined += data
+ if output:
+ stop_now = output(sys.stderr, data)
+ if stop_now:
+ self.terminate()
+
+ # All data exchanged. Translate lists into strings.
+ stdout = self.convert_data(stdout)
+ stderr = self.convert_data(stderr)
+ combined = self.convert_data(combined)
+
+ self.wait()
+ return (stdout, stderr, combined)
+
+
+# Just being a unittest.TestCase gives us 14 public methods. Unless we
+# disable this, we can only have 6 tests in a TestCase. That's not enough.
+#
+# pylint: disable=R0904
+
+class TestSubprocess(unittest.TestCase):
+ """Our simple unit test for this module"""
+
+ class MyOperation:
+ """Provides a operation that we can pass to Popen"""
+ def __init__(self, input_to_send=None):
+ """Constructor to set up the operation and possible input.
+
+ Args:
+ input_to_send: a text string to send when we first get input. We will
+ add \r\n to the string.
+ """
+ self.stdout_data = ''
+ self.stderr_data = ''
+ self.combined_data = ''
+ self.stdin_pipe = None
+ self._input_to_send = input_to_send
+ if input_to_send:
+ pipe = os.pipe()
+ self.stdin_read_pipe = pipe[0]
+ self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
+
+ def output(self, stream, data):
+ """Output handler for Popen. Stores the data for later comparison"""
+ if stream == sys.stdout:
+ self.stdout_data += data
+ if stream == sys.stderr:
+ self.stderr_data += data
+ self.combined_data += data
+
+ # Output the input string if we have one.
+ if self._input_to_send:
+ self._stdin_write_pipe.write(self._input_to_send + '\r\n')
+ self._stdin_write_pipe.flush()
+
+ def _basic_check(self, plist, oper):
+ """Basic checks that the output looks sane."""
+ self.assertEqual(plist[0], oper.stdout_data)
+ self.assertEqual(plist[1], oper.stderr_data)
+ self.assertEqual(plist[2], oper.combined_data)
+
+ # The total length of stdout and stderr should equal the combined length
+ self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
+
+ def test_simple(self):
+ """Simple redirection: Get process list"""
+ oper = TestSubprocess.MyOperation()
+ plist = Popen(['ps']).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+
+ def test_stderr(self):
+ """Check stdout and stderr"""
+ oper = TestSubprocess.MyOperation()
+ cmd = 'echo fred >/dev/stderr && false || echo bad'
+ plist = Popen([cmd], shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], 'bad\r\n')
+ self.assertEqual(plist [1], 'fred\r\n')
+
+ def test_shell(self):
+ """Check with and without shell works"""
+ oper = TestSubprocess.MyOperation()
+ cmd = 'echo test >/dev/stderr'
+ self.assertRaises(OSError, Popen, [cmd], shell=False)
+ plist = Popen([cmd], shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(len(plist [0]), 0)
+ self.assertEqual(plist [1], 'test\r\n')
+
+ def test_list_args(self):
+ """Check with and without shell works using list arguments"""
+ oper = TestSubprocess.MyOperation()
+ cmd = ['echo', 'test', '>/dev/stderr']
+ plist = Popen(cmd, shell=False).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
+ self.assertEqual(len(plist [1]), 0)
+
+ oper = TestSubprocess.MyOperation()
+
+ # this should be interpreted as 'echo' with the other args dropped
+ cmd = ['echo', 'test', '>/dev/stderr']
+ plist = Popen(cmd, shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], '\r\n')
+
+ def test_cwd(self):
+ """Check we can change directory"""
+ for shell in (False, True):
+ oper = TestSubprocess.MyOperation()
+ plist = Popen('pwd', shell=shell, cwd='/tmp').communicate_filter(
+ oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], '/tmp\r\n')
+
+ def test_env(self):
+ """Check we can change environment"""
+ for add in (False, True):
+ oper = TestSubprocess.MyOperation()
+ env = os.environ
+ if add:
+ env ['FRED'] = 'fred'
+ cmd = 'echo $FRED'
+ plist = Popen(cmd, shell=True, env=env).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
+
+ def test_extra_args(self):
+ """Check we can't add extra arguments"""
+ self.assertRaises(ValueError, Popen, 'true', close_fds=False)
+
+ def test_basic_input(self):
+ """Check that incremental input works
+
+ We set up a subprocess which will prompt for name. When we see this prompt
+ we send the name as input to the process. It should then print the name
+ properly to stdout.
+ """
+ oper = TestSubprocess.MyOperation('Flash')
+ prompt = 'What is your name?: '
+ cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
+ plist = Popen([cmd], stdin=oper.stdin_read_pipe,
+ shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(len(plist [1]), 0)
+ self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
+
+ def test_isatty(self):
+ """Check that ptys appear as terminals to the subprocess"""
+ oper = TestSubprocess.MyOperation()
+ cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
+ 'else echo "not %d" >&%d; fi;')
+ both_cmds = ''
+ for fd in (1, 2):
+ both_cmds += cmd % (fd, fd, fd, fd, fd)
+ plist = Popen(both_cmds, shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], 'terminal 1\r\n')
+ self.assertEqual(plist [1], 'terminal 2\r\n')
+
+ # Now try with PIPE and make sure it is not a terminal
+ oper = TestSubprocess.MyOperation()
+ plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], 'not 1\n')
+ self.assertEqual(plist [1], 'not 2\n')
+
+if __name__ == '__main__':
+ unittest.main()
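A short sketch of the incremental-output interface described above; the command is illustrative and the relay callback simply forwards each fragment as it arrives:

import sys

from u_boot_pylib import cros_subprocess

def relay(stream, data):
    # data is bytes read from the pty; stream is sys.stdout or sys.stderr
    stream.buffer.write(data)
    stream.flush()
    # returning a false value keeps the child running

proc = cros_subprocess.Popen(['ls', '-l'])
stdout, stderr, combined = proc.communicate_filter(relay)
print('exit code:', proc.returncode)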
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright (c) 2011 The Chromium OS Authors.
+#
+
+"""Terminal utilities
+
+This module handles terminal interaction including ANSI color codes.
+"""
+
+import os
+import re
+import shutil
+import sys
+
+# Selection of when we want our output to be colored
+COLOR_IF_TERMINAL, COLOR_ALWAYS, COLOR_NEVER = range(3)
+
+# Initially, we are set up to print to the terminal
+print_test_mode = False
+print_test_list = []
+
+# The length of the last line printed without a newline
+last_print_len = None
+
+# credit:
+# stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python
+ansi_escape = re.compile(r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+
+class PrintLine:
+ """A line of text output
+
+ Members:
+ text: Text line that was printed
+ newline: True to output a newline after the text
+ colour: Text colour to use
+ """
+ def __init__(self, text, colour, newline=True, bright=True):
+ self.text = text
+ self.newline = newline
+ self.colour = colour
+ self.bright = bright
+
+ def __eq__(self, other):
+ return (self.text == other.text and
+ self.newline == other.newline and
+ self.colour == other.colour and
+ self.bright == other.bright)
+
+ def __str__(self):
+ return ("newline=%s, colour=%s, bright=%d, text='%s'" %
+ (self.newline, self.colour, self.bright, self.text))
+
+
+def calc_ascii_len(text):
+ """Calculate the length of a string, ignoring any ANSI sequences
+
+ When displayed on a terminal, ANSI sequences don't take any space, so we
+ need to ignore them when calculating the length of a string.
+
+ Args:
+ text: Text to check
+
+ Returns:
+ Length of text, after skipping ANSI sequences
+
+ >>> col = Color(COLOR_ALWAYS)
+ >>> text = col.build(Color.RED, 'abc')
+ >>> len(text)
+ 14
+ >>> calc_ascii_len(text)
+ 3
+ >>>
+ >>> text += 'def'
+ >>> calc_ascii_len(text)
+ 6
+ >>> text += col.build(Color.RED, 'abc')
+ >>> calc_ascii_len(text)
+ 9
+ """
+ result = ansi_escape.sub('', text)
+ return len(result)
+
+def trim_ascii_len(text, size):
+ """Trim a string containing ANSI sequences to the given ASCII length
+
+ The string is trimmed with ANSI sequences being ignored for the length
+ calculation.
+
+ >>> col = Color(COLOR_ALWAYS)
+ >>> text = col.build(Color.RED, 'abc')
+ >>> len(text)
+ 14
+ >>> calc_ascii_len(trim_ascii_len(text, 4))
+ 3
+ >>> calc_ascii_len(trim_ascii_len(text, 2))
+ 2
+ >>> text += 'def'
+ >>> calc_ascii_len(trim_ascii_len(text, 4))
+ 4
+ >>> text += col.build(Color.RED, 'ghi')
+ >>> calc_ascii_len(trim_ascii_len(text, 7))
+ 7
+ """
+ if calc_ascii_len(text) < size:
+ return text
+ pos = 0
+ out = ''
+ left = size
+
+ # Work through each ANSI sequence in turn
+ for m in ansi_escape.finditer(text):
+ # Find the text before the sequence and add it to our string, making
+ # sure it doesn't overflow
+ before = text[pos:m.start()]
+ toadd = before[:left]
+ out += toadd
+
+ # Figure out how much non-ANSI space we have left
+ left -= len(toadd)
+
+ # Add the ANSI sequence and move to the position immediately after it
+ out += m.group()
+ pos = m.start() + len(m.group())
+
+ # Deal with text after the last ANSI sequence
+ after = text[pos:]
+ toadd = after[:left]
+ out += toadd
+
+ return out
+
+
+def tprint(text='', newline=True, colour=None, limit_to_line=False, bright=True):
+ """Handle a line of output to the terminal.
+
+ In test mode this is recorded in a list. Otherwise it is output to the
+ terminal.
+
+    Args:
+        text: Text to print
+        newline: True to add a new line at the end of the text
+        colour: Colour to use for the text
+        limit_to_line: True to trim the text to the terminal width
+        bright: True to use the bright variant of the colour
+    """
+ global last_print_len
+
+ if print_test_mode:
+ print_test_list.append(PrintLine(text, colour, newline, bright))
+ else:
+ if colour:
+ col = Color()
+ text = col.build(colour, text, bright=bright)
+ if newline:
+ print(text)
+ last_print_len = None
+ else:
+ if limit_to_line:
+ cols = shutil.get_terminal_size().columns
+ text = trim_ascii_len(text, cols)
+ print(text, end='', flush=True)
+ last_print_len = calc_ascii_len(text)
+
+def print_clear():
+ """Clear a previously line that was printed with no newline"""
+ global last_print_len
+
+ if last_print_len:
+ print('\r%s\r' % (' '* last_print_len), end='', flush=True)
+ last_print_len = None
+
+def set_print_test_mode(enable=True):
+ """Go into test mode, where all printing is recorded"""
+ global print_test_mode
+
+ print_test_mode = enable
+ get_print_test_lines()
+
+def get_print_test_lines():
+ """Get a list of all lines output through tprint()
+
+ Returns:
+ A list of PrintLine objects
+ """
+ global print_test_list
+
+ ret = print_test_list
+ print_test_list = []
+ return ret
+
+def echo_print_test_lines():
+ """Print out the text lines collected"""
+ for line in print_test_list:
+ if line.colour:
+ col = Color()
+ print(col.build(line.colour, line.text), end='')
+ else:
+ print(line.text, end='')
+ if line.newline:
+ print()
+
+
+class Color(object):
+ """Conditionally wraps text in ANSI color escape sequences."""
+ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+ BOLD = -1
+ BRIGHT_START = '\033[1;%dm'
+ NORMAL_START = '\033[22;%dm'
+ BOLD_START = '\033[1m'
+ RESET = '\033[0m'
+
+ def __init__(self, colored=COLOR_IF_TERMINAL):
+ """Create a new Color object, optionally disabling color output.
+
+        Args:
+            colored: Whether to colour the output: COLOR_ALWAYS, COLOR_NEVER
+                or COLOR_IF_TERMINAL (colour only if stdout is a terminal).
+                When colour is disabled this class adds no colour codes at all.
+        """
+ try:
+ self._enabled = (colored == COLOR_ALWAYS or
+ (colored == COLOR_IF_TERMINAL and
+ os.isatty(sys.stdout.fileno())))
+ except:
+ self._enabled = False
+
+ def start(self, color, bright=True):
+ """Returns a start color code.
+
+ Args:
+ color: Color to use, .e.g BLACK, RED, etc.
+
+ Returns:
+ If color is enabled, returns an ANSI sequence to start the given
+ color, otherwise returns empty string
+ """
+ if self._enabled:
+ base = self.BRIGHT_START if bright else self.NORMAL_START
+ return base % (color + 30)
+ return ''
+
+ def stop(self):
+ """Returns a stop color code.
+
+ Returns:
+ If color is enabled, returns an ANSI color reset sequence,
+ otherwise returns empty string
+ """
+ if self._enabled:
+ return self.RESET
+ return ''
+
+ def build(self, color, text, bright=True):
+ """Returns text with conditionally added color escape sequences.
+
+ Keyword arguments:
+ color: Text color -- one of the color constants defined in this
+ class.
+ text: The text to color.
+
+ Returns:
+ If self._enabled is False, returns the original text. If it's True,
+ returns text with color escape sequences based on the value of
+ color.
+ """
+ if not self._enabled:
+ return text
+ if color == self.BOLD:
+ start = self.BOLD_START
+ else:
+ base = self.BRIGHT_START if bright else self.NORMAL_START
+ start = base % (color + 30)
+ return start + text + self.RESET
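A brief sketch of the terminal helpers above: building a coloured string, printing a transient progress line, and capturing output in the test mode used by the unit tests. The strings are illustrative only:

from u_boot_pylib import terminal

col = terminal.Color(terminal.COLOR_ALWAYS)
print(col.build(terminal.Color.GREEN, 'OK'))

# Progress-style output: no newline, trimmed to the terminal width, then
# cleared again
terminal.tprint('building...', newline=False, limit_to_line=True)
terminal.print_clear()

# In test mode tprint() records PrintLine objects instead of writing
terminal.set_print_test_mode()
terminal.tprint('hello', colour=terminal.Color.RED)
assert terminal.get_print_test_lines()[0].text == 'hello'
terminal.set_print_test_mode(False)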
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Copyright (c) 2016 Google, Inc
+#
+
+from contextlib import contextmanager
+import doctest
+import glob
+import multiprocessing
+import os
+import sys
+import unittest
+
+from u_boot_pylib import command
+
+from io import StringIO
+
+use_concurrent = True
+try:
+ from concurrencytest import ConcurrentTestSuite
+ from concurrencytest import fork_for_tests
+except:
+ use_concurrent = False
+
+
+def run_test_coverage(prog, filter_fname, exclude_list, build_dir, required=None,
+ extra_args=None):
+ """Run tests and check that we get 100% coverage
+
+ Args:
+        prog: Program to run (it will be passed a '-t' argument to run tests)
+ filter_fname: Normally all *.py files in the program's directory will
+ be included. If this is not None, then it is used to filter the
+ list so that only filenames that don't contain filter_fname are
+ included.
+ exclude_list: List of file patterns to exclude from the coverage
+ calculation
+ build_dir: Build directory, used to locate libfdt.py
+        required: Set of modules which must be in the coverage report
+ extra_args (str): Extra arguments to pass to the tool before the -t/test
+ arg
+
+ Raises:
+ ValueError if the code coverage is not 100%
+ """
+ # This uses the build output from sandbox_spl to get _libfdt.so
+ path = os.path.dirname(prog)
+ if filter_fname:
+ glob_list = glob.glob(os.path.join(path, '*.py'))
+ glob_list = [fname for fname in glob_list if filter_fname in fname]
+ else:
+ glob_list = []
+ glob_list += exclude_list
+ glob_list += ['*libfdt.py', '*site-packages*', '*dist-packages*']
+ glob_list += ['*concurrencytest*']
+ test_cmd = 'test' if 'binman' in prog or 'patman' in prog else '-t'
+ prefix = ''
+ if build_dir:
+ prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir
+ cmd = ('%spython3-coverage run '
+ '--omit "%s" %s %s %s -P1' % (prefix, ','.join(glob_list),
+ prog, extra_args or '', test_cmd))
+ os.system(cmd)
+ stdout = command.output('python3-coverage', 'report')
+ lines = stdout.splitlines()
+    ok = True
+    if required:
+        # Convert '/path/to/name.py' to just the module name 'name'
+        test_set = set([os.path.splitext(os.path.basename(line.split()[0]))[0]
+                        for line in lines if '/etype/' in line])
+        missing_list = required
+        missing_list.discard('__init__')
+        missing_list.difference_update(test_set)
+        if missing_list:
+            print('Missing tests for %s' % (', '.join(missing_list)))
+            print(stdout)
+            ok = False
+
+    coverage = lines[-1].split(' ')[-1]
+ print(coverage)
+ if coverage != '100%':
+ print(stdout)
+ print("To get a report in 'htmlcov/index.html', type: python3-coverage html")
+ print('Coverage error: %s, but should be 100%%' % coverage)
+ ok = False
+ if not ok:
+ raise ValueError('Test coverage failure')
+
+
+# Use this to suppress stdout/stderr output:
+# with capture_sys_output() as (stdout, stderr)
+# ...do something...
+@contextmanager
+def capture_sys_output():
+ capture_out, capture_err = StringIO(), StringIO()
+ old_out, old_err = sys.stdout, sys.stderr
+ try:
+ sys.stdout, sys.stderr = capture_out, capture_err
+ yield capture_out, capture_err
+ finally:
+ sys.stdout, sys.stderr = old_out, old_err
+
+
+class FullTextTestResult(unittest.TextTestResult):
+ """A test result class that can print extended text results to a stream
+
+ This is meant to be used by a TestRunner as a result class. Like
+ TextTestResult, this prints out the names of tests as they are run,
+ errors as they occur, and a summary of the results at the end of the
+ test run. Beyond those, this prints information about skipped tests,
+ expected failures and unexpected successes.
+
+ Args:
+ stream: A file-like object to write results to
+ descriptions (bool): True to print descriptions with test names
+ verbosity (int): Detail of printed output per test as they run
+ Test stdout and stderr always get printed when buffering
+ them is disabled by the test runner. In addition to that,
+ 0: Print nothing
+ 1: Print a dot per test
+ 2: Print test names
+ """
+ def __init__(self, stream, descriptions, verbosity):
+ self.verbosity = verbosity
+ super().__init__(stream, descriptions, verbosity)
+
+ def printErrors(self):
+ "Called by TestRunner after test run to summarize the tests"
+ # The parent class doesn't keep unexpected successes in the same
+ # format as the rest. Adapt it to what printErrorList expects.
+ unexpected_successes = [
+ (test, 'Test was expected to fail, but succeeded.\n')
+ for test in self.unexpectedSuccesses
+ ]
+
+ super().printErrors() # FAIL and ERROR
+ self.printErrorList('SKIP', self.skipped)
+ self.printErrorList('XFAIL', self.expectedFailures)
+ self.printErrorList('XPASS', unexpected_successes)
+
+ def addSkip(self, test, reason):
+ """Called when a test is skipped."""
+ # Add empty line to keep spacing consistent with other results
+ if not reason.endswith('\n'):
+ reason += '\n'
+ super().addSkip(test, reason)
+
+
+def run_test_suites(toolname, debug, verbosity, test_preserve_dirs, processes,
+ test_name, toolpath, class_and_module_list):
+ """Run a series of test suites and collect the results
+
+ Args:
+ toolname: Name of the tool that ran the tests
+ debug: True to enable debugging, which shows a full stack trace on error
+ verbosity: Verbosity level to use (0-4)
+ test_preserve_dirs: True to preserve the input directory used by tests
+ so that it can be examined afterwards (only useful for debugging
+ tests). If a single test is selected (in args[0]) it also preserves
+ the output directory for this test. Both directories are displayed
+ on the command line.
+ processes: Number of processes to use to run tests (None=same as #CPUs)
+ test_name: Name of test to run, or None for all
+ toolpath: List of paths to use for tools
+ class_and_module_list: List of test classes (type class) and module
+            names (type str) to run
+
+    Returns:
+        unittest.TestResult: Accumulated results of running the suites
+    """
+ sys.argv = [sys.argv[0]]
+ if debug:
+ sys.argv.append('-D')
+ if verbosity:
+ sys.argv.append('-v%d' % verbosity)
+ if toolpath:
+ for path in toolpath:
+ sys.argv += ['--toolpath', path]
+
+ suite = unittest.TestSuite()
+ loader = unittest.TestLoader()
+ runner = unittest.TextTestRunner(
+ stream=sys.stdout,
+ verbosity=(1 if verbosity is None else verbosity),
+ resultclass=FullTextTestResult,
+ )
+
+ if use_concurrent and processes != 1:
+ suite = ConcurrentTestSuite(suite,
+ fork_for_tests(processes or multiprocessing.cpu_count()))
+
+ for module in class_and_module_list:
+ if isinstance(module, str) and (not test_name or test_name == module):
+ suite.addTests(doctest.DocTestSuite(module))
+
+ for module in class_and_module_list:
+ if isinstance(module, str):
+ continue
+        # Tell the test module about our arguments, if it is interested
+ if hasattr(module, 'setup_test_args'):
+ setup_test_args = getattr(module, 'setup_test_args')
+ setup_test_args(preserve_indir=test_preserve_dirs,
+ preserve_outdirs=test_preserve_dirs and test_name is not None,
+ toolpath=toolpath, verbosity=verbosity)
+ if test_name:
+            # Since Python v3.5, if an ImportError or AttributeError occurs
+ # while traversing a name then a synthetic test that raises that
+ # error when run will be returned. Check that the requested test
+ # exists, otherwise these errors are included in the results.
+ if test_name in loader.getTestCaseNames(module):
+ suite.addTests(loader.loadTestsFromName(test_name, module))
+ else:
+ suite.addTests(loader.loadTestsFromTestCase(module))
+
+ print(f" Running {toolname} tests ".center(70, "="))
+ result = runner.run(suite)
+ print()
+
+ return result
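A sketch of how a tool's tests might use the helpers above; the test class is hypothetical, and run_test_suites() is shown with the same positional arguments used by the u_boot_pylib entry point earlier in this series:

import sys
import unittest

from u_boot_pylib import test_util

class TestExample(unittest.TestCase):
    """Hypothetical test case for a tool built on u_boot_pylib"""
    def test_captures_stdout(self):
        # capture_sys_output() redirects stdout/stderr to StringIO buffers
        with test_util.capture_sys_output() as (stdout, stderr):
            print('quiet please')
        self.assertIn('quiet please', stdout.getvalue())
        self.assertEqual('', stderr.getvalue())

if __name__ == '__main__':
    # processes=1 keeps everything in one process (no concurrencytest fork)
    result = test_util.run_test_suites(
        'example', False, 1, False, 1, None, None, [TestExample])
    sys.exit(0 if result.wasSuccessful() else 1)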
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Copyright (c) 2016 Google, Inc
+#
+
+import glob
+import os
+import shlex
+import shutil
+import sys
+import tempfile
+import urllib.request
+
+from u_boot_pylib import command
+from u_boot_pylib import tout
+
+# Output directly (generally this is temporary)
+outdir = None
+
+# True to keep the output directory around after exiting
+preserve_outdir = False
+
+# Path to the Chrome OS chroot, if we know it
+chroot_path = None
+
+# Search paths to use for filename(), used to find files
+search_paths = []
+
+tool_search_paths = []
+
+# Tools and the packages that contain them, on debian
+packages = {
+ 'lz4': 'liblz4-tool',
+ }
+
+# List of paths to use when looking for an input file
+indir = []
+
+def prepare_output_dir(dirname, preserve=False):
+ """Select an output directory, ensuring it exists.
+
+ This either creates a temporary directory or checks that the one supplied
+ by the user is valid. For a temporary directory, it makes a note to
+ remove it later if required.
+
+ Args:
+ dirname: a string, name of the output directory to use to store
+ intermediate and output files. If is None - create a temporary
+ directory.
+ preserve: a Boolean. If outdir above is None and preserve is False, the
+ created temporary directory will be destroyed on exit.
+
+ Raises:
+ OSError: If it cannot create the output directory.
+ """
+ global outdir, preserve_outdir
+
+ preserve_outdir = dirname or preserve
+ if dirname:
+ outdir = dirname
+ if not os.path.isdir(outdir):
+ try:
+ os.makedirs(outdir)
+ except OSError as err:
+ raise ValueError(
+ f"Cannot make output directory 'outdir': 'err.strerror'")
+ tout.debug("Using output directory '%s'" % outdir)
+ else:
+ outdir = tempfile.mkdtemp(prefix='binman.')
+ tout.debug("Using temporary directory '%s'" % outdir)
+
+def _remove_output_dir():
+ global outdir
+
+ shutil.rmtree(outdir)
+ tout.debug("Deleted temporary directory '%s'" % outdir)
+ outdir = None
+
+def finalise_output_dir():
+    """Tidy up: delete output directory if temporary and not preserved."""
+    global outdir, preserve_outdir
+
+ if outdir and not preserve_outdir:
+ _remove_output_dir()
+ outdir = None
+
+def get_output_filename(fname):
+ """Return a filename within the output directory.
+
+ Args:
+ fname: Filename to use for new file
+
+ Returns:
+ The full path of the filename, within the output directory
+ """
+ return os.path.join(outdir, fname)
+
+def get_output_dir():
+ """Return the current output directory
+
+ Returns:
+ str: The output directory
+ """
+ return outdir
+
+def _finalise_for_test():
+ """Remove the output directory (for use by tests)"""
+ global outdir
+
+ if outdir:
+ _remove_output_dir()
+ outdir = None
+
+def set_input_dirs(dirname):
+ """Add a list of input directories, where input files are kept.
+
+ Args:
+ dirname: a list of paths to input directories to use for obtaining
+ files needed by binman to place in the image.
+ """
+ global indir
+
+ indir = dirname
+ tout.debug("Using input directories %s" % indir)
+
+def get_input_filename(fname, allow_missing=False):
+ """Return a filename for use as input.
+
+ Args:
+ fname: Filename to use for new file
+ allow_missing: True if the filename can be missing
+
+ Returns:
+ fname, if indir is None;
+ full path of the filename, within the input directory;
+ None, if file is missing and allow_missing is True
+
+ Raises:
+ ValueError if file is missing and allow_missing is False
+ """
+ if not indir or fname[:1] == '/':
+ return fname
+ for dirname in indir:
+ pathname = os.path.join(dirname, fname)
+ if os.path.exists(pathname):
+ return pathname
+
+ if allow_missing:
+ return None
+ raise ValueError("Filename '%s' not found in input path (%s) (cwd='%s')" %
+ (fname, ','.join(indir), os.getcwd()))
+
+def get_input_filename_glob(pattern):
+ """Return a list of filenames for use as input.
+
+ Args:
+ pattern: Filename pattern to search for
+
+ Returns:
+ A list of matching files in all input directories
+ """
+ if not indir:
+ return glob.glob(pattern)
+ files = []
+ for dirname in indir:
+ pathname = os.path.join(dirname, pattern)
+ files += glob.glob(pathname)
+ return sorted(files)
+
+def align(pos, align):
+ if align:
+ mask = align - 1
+ pos = (pos + mask) & ~mask
+ return pos
+
+def not_power_of_two(num):
+ return num and (num & (num - 1))
+
+def set_tool_paths(toolpaths):
+ """Set the path to search for tools
+
+ Args:
+ toolpaths: List of paths to search for tools executed by run()
+ """
+ global tool_search_paths
+
+ tool_search_paths = toolpaths
+
+def path_has_file(path_spec, fname):
+ """Check if a given filename is in the PATH
+
+ Args:
+ path_spec: Value of PATH variable to check
+ fname: Filename to check
+
+ Returns:
+ True if found, False if not
+ """
+ for dir in path_spec.split(':'):
+ if os.path.exists(os.path.join(dir, fname)):
+ return True
+ return False
+
+def get_host_compile_tool(env, name):
+ """Get the host-specific version for a compile tool
+
+ This checks the environment variables that specify which version of
+ the tool should be used (e.g. ${HOSTCC}).
+
+ The following table lists the host-specific versions of the tools
+ this function resolves to:
+
+ Compile Tool | Host version
+ --------------+----------------
+ as | ${HOSTAS}
+ ld | ${HOSTLD}
+ cc | ${HOSTCC}
+ cpp | ${HOSTCPP}
+ c++ | ${HOSTCXX}
+ ar | ${HOSTAR}
+ nm | ${HOSTNM}
+ ldr | ${HOSTLDR}
+ strip | ${HOSTSTRIP}
+ objcopy | ${HOSTOBJCOPY}
+ objdump | ${HOSTOBJDUMP}
+ dtc | ${HOSTDTC}
+
+ Args:
+ name: Command name to run
+
+ Returns:
+ host_name: Exact command name to run instead
+ extra_args: List of extra arguments to pass
+ """
+    host_name = None
+    host_args = []
+    if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip',
+                'objcopy', 'objdump', 'dtc'):
+        host_name, *host_args = env.get('HOST' + name.upper(), '').split(' ')
+    elif name == 'c++':
+        host_name, *host_args = env.get('HOSTCXX', '').split(' ')
+
+    if host_name:
+        return host_name, host_args
+ return name, []
+
+def get_target_compile_tool(name, cross_compile=None):
+ """Get the target-specific version for a compile tool
+
+ This first checks the environment variables that specify which
+ version of the tool should be used (e.g. ${CC}). If those aren't
+ specified, it checks the CROSS_COMPILE variable as a prefix for the
+ tool with some substitutions (e.g. "${CROSS_COMPILE}gcc" for cc).
+
+ The following table lists the target-specific versions of the tools
+ this function resolves to:
+
+ Compile Tool | First choice | Second choice
+ --------------+----------------+----------------------------
+ as | ${AS} | ${CROSS_COMPILE}as
+ ld | ${LD} | ${CROSS_COMPILE}ld.bfd
+ | | or ${CROSS_COMPILE}ld
+ cc | ${CC} | ${CROSS_COMPILE}gcc
+ cpp | ${CPP} | ${CROSS_COMPILE}gcc -E
+ c++ | ${CXX} | ${CROSS_COMPILE}g++
+ ar | ${AR} | ${CROSS_COMPILE}ar
+ nm | ${NM} | ${CROSS_COMPILE}nm
+ ldr | ${LDR} | ${CROSS_COMPILE}ldr
+ strip | ${STRIP} | ${CROSS_COMPILE}strip
+ objcopy | ${OBJCOPY} | ${CROSS_COMPILE}objcopy
+ objdump | ${OBJDUMP} | ${CROSS_COMPILE}objdump
+ dtc | ${DTC} | (no CROSS_COMPILE version)
+
+ Args:
+ name: Command name to run
+
+ Returns:
+ target_name: Exact command name to run instead
+ extra_args: List of extra arguments to pass
+ """
+ env = dict(os.environ)
+
+ target_name = None
+ extra_args = []
+ if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip',
+ 'objcopy', 'objdump', 'dtc'):
+ target_name, *extra_args = env.get(name.upper(), '').split(' ')
+ elif name == 'c++':
+ target_name, *extra_args = env.get('CXX', '').split(' ')
+
+ if target_name:
+ return target_name, extra_args
+
+ if cross_compile is None:
+ cross_compile = env.get('CROSS_COMPILE', '')
+
+ if name in ('as', 'ar', 'nm', 'ldr', 'strip', 'objcopy', 'objdump'):
+ target_name = cross_compile + name
+ elif name == 'ld':
+ try:
+ if run(cross_compile + 'ld.bfd', '-v'):
+ target_name = cross_compile + 'ld.bfd'
+ except:
+ target_name = cross_compile + 'ld'
+ elif name == 'cc':
+ target_name = cross_compile + 'gcc'
+ elif name == 'cpp':
+ target_name = cross_compile + 'gcc'
+ extra_args = ['-E']
+ elif name == 'c++':
+ target_name = cross_compile + 'g++'
+ else:
+ target_name = name
+ return target_name, extra_args
+
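+# For illustration, assuming CROSS_COMPILE='aarch64-linux-gnu-' (an example
+# prefix) and no CPP override in the environment, the target preprocessor
+# resolves to the cross gcc with '-E' appended:
+#
+#     name, args = get_target_compile_tool('cpp', 'aarch64-linux-gnu-')
+#     # name == 'aarch64-linux-gnu-gcc', args == ['-E']
+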
+def get_env_with_path():
+ """Get an updated environment with the PATH variable set correctly
+
+ If there are any search paths set, these need to come first in the PATH so
+ that these override any other version of the tools.
+
+ Returns:
+ dict: New environment with PATH updated, or None if there are no
+ search paths
+ """
+ if tool_search_paths:
+ env = dict(os.environ)
+ env['PATH'] = ':'.join(tool_search_paths) + ':' + env['PATH']
+ return env
+
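+# For illustration, with a tool search path registered the returned
+# environment puts that directory first (the directory is an example only):
+#
+#     set_tool_paths(['/opt/uboot-tools/bin'])
+#     env = get_env_with_path()
+#     # env['PATH'] now starts with '/opt/uboot-tools/bin:'
+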
+def run_result(name, *args, **kwargs):
+ """Run a tool with some arguments
+
+ This runs a 'tool', which is a program used by binman to process files and
+ perhaps produce some output. Tools can be located on the PATH or in a
+ search path.
+
+ Args:
+ name: Command name to run
+ args: Arguments to the tool
+ for_host: True to resolve the command to the version for the host
+ for_target: True (the default) to resolve the command to the version
+ for the compile target; False to run the command as-is
+ raise_on_error: Raise an error if the command fails (True by default)
+ binary: True to return the output as binary data (bytes) rather than str
+
+ Returns:
+ CommandResult object
+ """
+ try:
+ binary = kwargs.get('binary')
+ for_host = kwargs.get('for_host', False)
+ for_target = kwargs.get('for_target', not for_host)
+ raise_on_error = kwargs.get('raise_on_error', True)
+ env = get_env_with_path()
+ if for_target:
+ name, extra_args = get_target_compile_tool(name)
+ args = tuple(extra_args) + args
+ elif for_host:
+ name, extra_args = get_host_compile_tool(env, name)
+ args = tuple(extra_args) + args
+ name = os.path.expanduser(name) # Expand paths containing ~
+ all_args = (name,) + args
+ result = command.run_pipe([all_args], capture=True, capture_stderr=True,
+ env=env, raise_on_error=False, binary=binary)
+ if result.return_code:
+ if raise_on_error:
+ raise ValueError("Error %d running '%s': %s" %
+ (result.return_code, ' '.join(all_args),
+ result.stderr or result.stdout))
+ return result
+ except ValueError:
+ if env and not path_has_file(env['PATH'], name):
+ msg = "Please install tool '%s'" % name
+ package = packages.get(name)
+ if package:
+ msg += " (e.g. from package '%s')" % package
+ raise ValueError(msg)
+ raise
+
+def tool_find(name):
+ """Search the current path for a tool
+
+ This uses both PATH and any value from set_tool_paths() to search for a tool
+
+ Args:
+ name (str): Name of tool to locate
+
+ Returns:
+ str: Full path to tool if found, else None
+ """
+ name = os.path.expanduser(name) # Expand paths containing ~
+ paths = []
+ pathvar = os.environ.get('PATH')
+ if pathvar:
+ paths = pathvar.split(':')
+ if tool_search_paths:
+ paths += tool_search_paths
+ for path in paths:
+ fname = os.path.join(path, name)
+ if os.path.isfile(fname) and os.access(fname, os.X_OK):
+ return fname
+
+def run(name, *args, **kwargs):
+ """Run a tool with some arguments
+
+ This runs a 'tool', which is a program used by binman to process files and
+ perhaps produce some output. Tools can be located on the PATH or in a
+ search path.
+
+ Args:
+ name: Command name to run
+ args: Arguments to the tool
+ for_host: True to resolve the command to the version for the host
+ for_target: True (the default) to resolve the command to the version
+ for the compile target; False to run the command as-is
+
+ Returns:
+ stdout output from the tool, or None if no result was produced
+ """
+ result = run_result(name, *args, **kwargs)
+ if result is not None:
+ return result.stdout
+
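+# For illustration, typical invocations (the tool names and arguments here
+# are examples only):
+#
+#     stdout = run('objcopy', '-O', 'binary', 'u-boot', 'u-boot.bin')
+#     version = run('lz4', '--version', for_host=True)
+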
+def filename(fname):
+ """Resolve a file path to an absolute path.
+
+ If fname starts with ##/ and chroot is available, ##/ gets replaced with
+ the chroot path. If chroot is not available, this file name can not be
+ resolved, `None' is returned.
+
+ If fname is not prepended with the above prefix, and is not an existing
+ file, the leaf name is taken from the passed-in string and the
+ search_paths directories (if any) are searched for that file. If found,
+ the path to the found file is returned; otherwise the original fname is
+ returned unchanged.
+
+ Args:
+ fname: a string, the path to resolve.
+
+ Returns:
+ Absolute path to the file or None if not found.
+ """
+ if fname.startswith('##/'):
+ if chroot_path:
+ fname = os.path.join(chroot_path, fname[3:])
+ else:
+ return None
+
+ # Search for a pathname that exists, and return it if found
+ if fname and not os.path.exists(fname):
+ for path in search_paths:
+ pathname = os.path.join(path, os.path.basename(fname))
+ if os.path.exists(pathname):
+ return pathname
+
+ # If not found, just return the standard, unchanged path
+ return fname
+
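+# For illustration (the file name is an example only):
+#
+#     fname = filename('u-boot.dtb')
+#     # returns 'u-boot.dtb' if it exists, otherwise the first match found in
+#     # the search_paths directories, or the original name unchanged
+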
+def read_file(fname, binary=True):
+ """Read and return the contents of a file.
+
+ Args:
+ fname: path to filename to read, where ## signifies the chroot.
+ binary: True to read the file as binary data (bytes), False for text (str)
+
+ Returns:
+ data read from file, as bytes or str depending on 'binary'
+ """
+ with open(filename(fname), binary and 'rb' or 'r') as fd:
+ data = fd.read()
+ #self._out.Info("Read file '%s' size %d (%#0x)" %
+ #(fname, len(data), len(data)))
+ return data
+
+def write_file(fname, data, binary=True):
+ """Write data into a file.
+
+ Args:
+ fname: path to filename to write
+ data: data to write to the file (bytes if binary=True, str otherwise)
+ binary: True to write the file in binary mode, False for text
+ """
+ #self._out.Info("Write file '%s' size %d (%#0x)" %
+ #(fname, len(data), len(data)))
+ with open(filename(fname), binary and 'wb' or 'w') as fd:
+ fd.write(data)
+
+def get_bytes(byte, size):
+ """Get a string of bytes of a given size
+
+ Args:
+ byte: Numeric byte value to use
+ size: Size of bytes/string to return
+
+ Returns:
+ A bytes type with 'byte' repeated 'size' times
+ """
+ return bytes([byte]) * size
+
+def to_bytes(string):
+ """Convert a str type into a bytes type
+
+ Args:
+ string: string to convert
+
+ Returns:
+ A bytes type
+ """
+ return string.encode('utf-8')
+
+def to_string(bval):
+ """Convert a bytes type into a str type
+
+ Args:
+ bval: bytes value to convert
+
+ Returns:
+ A str type
+ """
+ return bval.decode('utf-8')
+
+def to_hex(val):
+ """Convert an integer value (or None) to a string
+
+ Args:
+ val: Integer value to convert, or None
+
+ Returns:
+ hex value, or 'None' if the value is None
+ """
+ return 'None' if val is None else '%#x' % val
+
+def to_hex_size(val):
+ """Return the size of an object in hex
+
+ Args:
+ val: Object whose length is to be reported, or None
+
+ Returns:
+ hex value of size, or 'None' if the value is None
+ """
+ return 'None' if val is None else '%#x' % len(val)
+
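+# For illustration:
+#
+#     to_hex(1024)          # -> '0x400'
+#     to_hex(None)          # -> 'None'
+#     to_hex_size(b'abc')   # -> '0x3'
+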
+def print_full_help(fname):
+ """Print the full help message for a tool using an appropriate pager.
+
+ Args:
+ fname: Path to a file containing the full help message
+ """
+ pager = shlex.split(os.getenv('PAGER', ''))
+ if not pager:
+ lesspath = shutil.which('less')
+ pager = [lesspath] if lesspath else None
+ if not pager:
+ pager = ['more']
+ command.run(*pager, fname)
+
+def download(url, tmpdir_pattern='.patman'):
+ """Download a file to a temporary directory
+
+ Args:
+ url (str): URL to download
+ tmpdir_pattern (str): pattern to use for the temporary directory
+
+ Returns:
+ Tuple:
+ Full path to the downloaded archive file in that directory,
+ or None if there was an error while downloading
+ Temporary directory name
+ """
+ print('- downloading: %s' % url)
+ leaf = url.split('/')[-1]
+ tmpdir = tempfile.mkdtemp(tmpdir_pattern)
+ response = urllib.request.urlopen(url)
+ fname = os.path.join(tmpdir, leaf)
+ fd = open(fname, 'wb')
+ meta = response.info()
+ size = int(meta.get('Content-Length'))
+ done = 0
+ block_size = 1 << 16
+ status = ''
+
+ # Read the file in chunks and show progress as we go
+ while True:
+ buffer = response.read(block_size)
+ if not buffer:
+ print(chr(8) * (len(status) + 1), '\r', end=' ')
+ break
+
+ done += len(buffer)
+ fd.write(buffer)
+ status = r'%10d MiB [%3d%%]' % (done // 1024 // 1024,
+ done * 100 // size)
+ status = status + chr(8) * (len(status) + 1)
+ print(status, end=' ')
+ sys.stdout.flush()
+ print('\r', end='')
+ sys.stdout.flush()
+ fd.close()
+ if done != size:
+ print('Error, failed to download')
+ os.remove(fname)
+ fname = None
+ return fname, tmpdir
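+
+# Usage sketch (the URL is a placeholder, not a real location): download an
+# archive, process it, then remove the temporary directory:
+#
+#     fname, tmpdir = download('https://example.com/toolchain.tar.xz')
+#     if fname:
+#         run('tar', 'xf', fname, for_target=False)
+#     shutil.rmtree(tmpdir)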
--- /dev/null
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright (c) 2016 Google, Inc
+#
+# Terminal output logging.
+#
+
+import sys
+
+from u_boot_pylib import terminal
+
+# Output verbosity levels that we support
+ERROR, WARNING, NOTICE, INFO, DETAIL, DEBUG = range(6)
+
+in_progress = False
+
+"""
+This class handles output of progress and other useful information
+to the user. It provides for simple verbosity level control and can
+output nothing but errors at verbosity zero.
+
+The idea is that modules set up an Output object early in their years and pass
+it around to other modules that need it. This keeps the output under control
+of a single class.
+
+Public properties:
+ verbose: Verbosity level: 0=silent, 1=progress, 3=full, 4=debug
+"""
+def __enter__():
+ return
+
+def __exit__(unused1, unused2, unused3):
+ """Clean up and remove any progress message."""
+ clear_progress()
+ return False
+
+def user_is_present():
+ """This returns True if it is likely that a user is present.
+
+ Sometimes we want to prompt the user, but if no one is there then this
+ is a waste of time, and may lock a script which should otherwise fail.
+
+ Returns:
+ True if it thinks the user is there, and False otherwise
+ """
+ return stdout_is_tty and verbose > 0
+
+def clear_progress():
+ """Clear any active progress message on the terminal."""
+ global in_progress
+ if verbose > 0 and stdout_is_tty and in_progress:
+ _stdout.write('\r%s\r' % (" " * len(_progress)))
+ _stdout.flush()
+ in_progress = False
+
+def progress(msg, warning=False, trailer='...'):
+ """Display progress information.
+
+ Args:
+ msg: Message to display.
+ warning: True if this is a warning."""
+ global in_progress
+ clear_progress()
+ if verbose > 0:
+ _progress = msg + trailer
+ if stdout_is_tty:
+ col = _color.YELLOW if warning else _color.GREEN
+ _stdout.write('\r' + _color.build(col, _progress))
+ _stdout.flush()
+ in_progress = True
+ else:
+ _stdout.write(_progress + '\n')
+
+def _output(level, msg, color=None):
+ """Output a message to the terminal.
+
+ Args:
+ level: Verbosity level for this message. It will only be displayed if
+ this is as high as the currently selected level.
+ msg: Message to display.
+ color: Colour to use for the message, or None for the default.
+ """
+ if verbose >= level:
+ clear_progress()
+ if color:
+ msg = _color.build(color, msg)
+ if level < NOTICE:
+ print(msg, file=sys.stderr)
+ else:
+ print(msg)
+
+def do_output(level, msg):
+ """Output a message to the terminal.
+
+ Args:
+ level: Verbosity level for this message. It will only be displayed if
+ this is as high as the currently selected level.
+ msg: Message to display.
+ """
+ _output(level, msg)
+
+def error(msg):
+ """Display an error message
+
+ Args:
+ msg: Message to display.
+ """
+ _output(ERROR, msg, _color.RED)
+
+def warning(msg):
+ """Display a warning message
+
+ Args:
+ msg: Message to display.
+ """
+ _output(WARNING, msg, _color.YELLOW)
+
+def notice(msg):
+ """Display an important infomation message
+
+ Args:
+ msg; Message to display.
+ """
+ _output(NOTICE, msg)
+
+def info(msg):
+ """Display an infomation message
+
+ Args:
+ msg; Message to display.
+ """
+ _output(INFO, msg)
+
+def detail(msg):
+ """Display a detailed message
+
+ Args:
+ msg: Message to display.
+ """
+ _output(DETAIL, msg)
+
+def debug(msg):
+ """Display a debug message
+
+ Args:
+ msg: Message to display.
+ """
+ _output(DEBUG, msg)
+
+def user_output(msg):
+ """Display a message regardless of the current output level.
+
+ This is used when the output was specifically requested by the user.
+
+ Args:
+ msg: Message to display.
+ """
+ _output(0, msg)
+
+def init(_verbose=WARNING, stdout=sys.stdout):
+ """Initialize a new output object.
+
+ Args:
+ _verbose: Verbosity level (0-5).
+ stdout: File to use for stdout.
+ """
+ global verbose, _progress, _color, _stdout, stdout_is_tty
+
+ verbose = _verbose
+ _progress = '' # Our last progress message
+ _color = terminal.Color()
+ _stdout = stdout
+
+ # TODO(sjg): Move this into Chromite libraries when we have them
+ stdout_is_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
+ stderr_is_tty = hasattr(sys.stderr, 'isatty') and sys.stderr.isatty()
+
+def uninit():
+ clear_progress()
+
+init()
--- /dev/null
+__main__.py
\ No newline at end of file