HEX
Server: Apache
System: Linux vps28526 6.8.0-79-generic #79~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Fri Aug 15 16:54:53 UTC 2 x86_64
User: heewonvps_17 (6705002)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //usr/share/dput/dput/dcut.py
#! /usr/bin/python3
#
# dput/dcut.py
# Part of ‘dput’, a Debian package upload toolkit.
#
# This is free software, and you are welcome to redistribute it under
# certain conditions; see the end of this file for copyright
# information, grant of license, and disclaimer of warranty.

""" dcut — Debian command upload tool. """

import os
import pwd
import shutil
import string
import subprocess
import sys
import tempfile
import textwrap
import time

from . import dput
from .helper import dputhelper


validcommands = ('rm', 'cancel', 'reschedule')


def make_usage_message():
    """ Make the program usage help message. """
    text = textwrap.dedent("""\
        Usage: %s [options] [host] command [, command]
         Supported options (see man page for long forms):
           -c file Config file to parse.
           -d      Enable debug messages.
           -h      Display this help message.
           -s      Simulate the commands file creation only.
           -v      Display version information.
           -m maintaineraddress
                   Use maintainer information in "Uploader:" field.
           -k keyid
                   Use this keyid for signing.
           -O file Write commands to file.
           -U file Upload specified commands file (presently no checks).
           -i changes
                   Upload a commands file to remove files listed in .changes.
         Supported commands: mv, rm
           (No paths or command-line options allowed on ftp-master.)
        """) % (dputhelper.get_progname(sys.argv))
    return text


def get_uploader_from_system(options):
    """ Get the system value for the ‘uploader’ option. """
    uploader = None

    # check environment for maintainer
    if options['debug']:
        sys.stdout.write(
                "D: trying to get maintainer email from environment\n")

    if 'DEBEMAIL' in os.environ:
        if os.environ['DEBEMAIL'].find('<') < 0:
            uploader = os.environ.get("DEBFULLNAME", '')
            if uploader:
                uploader += ' '
            uploader += '<%s>' % (os.environ['DEBEMAIL'])
        else:
            uploader = os.environ['DEBEMAIL']
        if options['debug']:
            sys.stdout.write(
                    "D: Uploader from env: %s\n" % (uploader))
    elif 'EMAIL' in os.environ:
        if os.environ['EMAIL'].find('<') < 0:
            uploader = os.environ.get("DEBFULLNAME", '')
            if uploader:
                uploader += ' '
            uploader += '<%s>' % (os.environ['EMAIL'])
        else:
            uploader = os.environ['EMAIL']
        if options['debug']:
            sys.stdout.write(
                    "D: Uploader from env: %s\n" % (uploader))
    else:
        if options['debug']:
            sys.stdout.write("D: Guessing uploader\n")
        pwrec = pwd.getpwuid(os.getuid())
        username = pwrec[0]
        fullname = pwrec[4].split(',')[0]
        try:
            hostname = open('/etc/mailname').read().strip()
        except IOError:
            hostname = ''
        if not hostname:
            if options['debug']:
                sys.stdout.write(
                        "D: Guessing uploader: /etc/mailname was a failure\n")
            hostname_subprocess = subprocess.Popen(
                    "/bin/hostname --fqdn",
                    shell=True, stdout=subprocess.PIPE)
            hostname_stdout = dputhelper.make_text_stream(
                    hostname_subprocess.stdout)
            hostname = hostname_stdout.read().strip()
        if hostname:
            uploader = (
                    "%s <%s@%s>" % (fullname, username, hostname))
            if options['debug']:
                sys.stdout.write(
                        "D: Guessed uploader: %s\n" % (uploader))
        else:
            if options['debug']:
                sys.stdout.write("D: Couldn't guess uploader\n")

    return uploader


def getoptions():
    # seed some defaults
    options = {
            'debug': 0, 'simulate': 0, 'config': None, 'host': None,
            'uploader': None, 'keyid': None, 'passive': 0,
            'filetocreate': None, 'filetoupload': None, 'changes': None}
    progname = dputhelper.get_progname(sys.argv)
    version = dputhelper.get_distribution_version()

    # enable debugging very early
    if ('-d' in sys.argv[1:] or '--debug' in sys.argv[1:]):
        options['debug'] = 1
        sys.stdout.write("D: %s %s\n" % (progname, version))

    # parse command line arguments
    (opts, arguments) = dputhelper.getopt(
            sys.argv[1:],
            'c:dDhsvm:k:PU:O:i:', [
                'config=', 'debug',
                'help', 'simulate', 'version', 'host=',
                'maintainteraddress=', 'keyid=',
                'passive', 'upload=', 'output=', 'input='
                ])

    for (option, arg) in opts:
        if options['debug']:
            sys.stdout.write(
                    'D: processing arg "%s", option "%s"\n' % (option, arg))
        if option in ('-h', '--help'):
            sys.stdout.write(make_usage_message())
            sys.exit(os.EX_OK)
        elif option in ('-v', '--version'):
            sys.stdout.write("%s %s\n" % (progname, version))
            sys.exit(os.EX_OK)
        elif option in ('-d', '--debug'):
            options['debug'] = 1
        elif option in ('-c', '--config'):
            options['config'] = arg
        elif option in ('-m', '--maintaineraddress'):
            options['uploader'] = arg
        elif option in ('-k', '--keyid'):
            options['keyid'] = arg
        elif option in ('-s', '--simulate'):
            options['simulate'] = 1
        elif option in ('-P', '--passive'):
            options['passive'] = 1
        elif option in ('-U', '--upload'):
            options['filetoupload'] = arg
        elif option in ('-O', '--output'):
            options['filetocreate'] = arg
        elif option == '--host':
            options['host'] = arg
        elif option in ('-i', '--input'):
            options['changes'] = arg
        else:
            sys.stderr.write(
                    "%s internal error: Option %s, argument %s unknown\n"
                    % (progname, option, arg))
            sys.exit(1)

    if not options['host'] and arguments and arguments[0] not in validcommands:
        options['host'] = arguments[0]
        if options['debug']:
            sys.stdout.write(
                    'D: first argument "%s" treated as host\n'
                    % (options['host']))
        del arguments[0]

    if not options['uploader']:
        options['uploader'] = get_uploader_from_system(options)

    # we don't create command files without uploader
    if (
            not options['uploader']
            and (options['filetoupload'] or options['changes'])):
        sys.stderr.write(
                "%s error: command file cannot be created"
                " without maintainer email\n"
                % progname)
        sys.stderr.write(
                '%s        please set $DEBEMAIL, $EMAIL'
                ' or use the "-m" option\n'
                % (len(progname) * ' '))
        sys.exit(1)

    return options, arguments


def parse_queuecommands(arguments, options, config):
    commands = []
    # want to consume a copy of arguments
    arguments = arguments[:]
    arguments.append(0)
    curarg = []
    while arguments:
        if arguments[0] in validcommands:
            curarg = [arguments[0]]
            if arguments[0] == 'rm':
                if len(arguments) > 1 and arguments[1] == '--nosearchdirs':
                    del arguments[1]
                else:
                    curarg.append('--searchdirs')
        else:
            if not curarg and arguments[0] != 0:
                sys.stderr.write(
                        'Error: Could not parse commands at "%s"\n'
                        % (arguments[0]))
                sys.exit(1)
            if str(arguments[0])[-1] in (',', ';', 0):
                curarg.append(arguments[0][0:-1])
                arguments[0] = ','
            if arguments[0] in (',', ';', 0) and curarg:
                # TV-TODO: syntax check for #args etc.
                if options['debug']:
                    sys.stdout.write(
                            'D: Successfully parsed command "%s"\n'
                            % (' '.join(curarg)))
                commands.append(' '.join(curarg))
                curarg = []
            else:
                # TV-TODO: maybe syntax check the arguments here
                curarg.append(arguments[0])
        del arguments[0]
    if not commands:
        sys.stderr.write("Error: no arguments given, see dcut -h\n")
        sys.exit(1)
    return commands


def write_commands(commands, options, config, tempdir):
    """ Write a file of commands for the upload queue daemon.

        :param commands: Commands to write, as a sequence of text
            strings.
        :param options: Program configuration, as a mapping of options
            `{name: value}`.
        :param config: `ConfigParser` instance for this application.
        :param tempdir: Filesystem path to directory for temporary files.
        :return: Filesystem path of file which was written.

        Write the specified sequence of commands to a file, in the
        format required for the Debian upload queue management daemon.

        Once writing is finished, the file is signed using the
        'debsign' command.

        If not specified in the configuration option 'filetocreate', a
        default filename is generated. In either case, the resulting
        filename is returned.

        """
    progname = dputhelper.get_progname(sys.argv)
    if options['filetocreate']:
        filename = options['filetocreate']
    else:
        translationorig = (
                str('').join(map(chr, range(256)))
                + string.ascii_letters + string.digits)
        translationdest = 256 * '_' + string.ascii_letters + string.digits
        translationmap = str.maketrans(translationorig, translationdest)
        uploadpartforname = options['uploader'].translate(translationmap)
        filename = (
                progname + '.%s.%d.%d.commands' %
                (uploadpartforname, int(time.time()), os.getpid()))
        if tempdir:
            filename = os.path.join(tempdir, filename)
    f = open(filename, "w")
    f.write("Uploader: %s\n" % options['uploader'])
    f.write("Commands:\n %s\n\n" % ('\n '.join(commands)))
    f.close()
    debsign_cmdline = ['debsign']
    debsign_cmdline.append('-m%s' % options['uploader'])
    if options['keyid']:
        debsign_cmdline.append('-k%s' % options['keyid'])
    debsign_cmdline.append('%s' % filename)
    if options['debug']:
        sys.stdout.write("D: calling debsign: %s\n" % debsign_cmdline)
    try:
        subprocess.check_call(debsign_cmdline)
    except subprocess.CalledProcessError:
        sys.stderr.write("Error: debsign failed.\n")
        sys.exit(1)
    return filename


def upload_stolen_from_dput_main(
        host, upload_methods, config, debug, simulate,
        files_to_upload, ftp_passive_mode):
    """ Upload files to the host.

        :param host: Configuration host name.
        :param upload_methods: Mapping of {method_name: callable}.
        :param config: `ConfigParser` instance for this application.
        :param debug: If true, enable debugging output.
        :param simulate: If true, simulate the upload only.
        :param files_to_upload: Collection of file names to upload.
        :param ftp_passive_mode: If true, enable FTP passive mode.
        :return: ``None``.

        Upload the specified files to the host, using the method and
        credentials from the configuration for the host.

        """
    # Messy, yes. But it isn't referenced by the upload method anyway.
    if config.get(host, 'method') == 'local':
        fqdn = 'localhost'
    else:
        fqdn = config.get(host, 'fqdn')

    # Check the upload methods that we have as default and per host
    if debug:
        sys.stdout.write(
                "D: Default Method: %s\n" % config.get('DEFAULT', 'method'))
    if config.get('DEFAULT', 'method') not in upload_methods:
        sys.stderr.write(
                "Unknown upload method: %s\n"
                % config.get('DEFAULT', 'method'))
        sys.exit(1)
    if debug:
        sys.stdout.write("D: Host Method: %s\n" % config.get(host, 'method'))
    if config.get(host, 'method') not in upload_methods:
        sys.stderr.write(
                "Unknown upload method: %s\n" % config.get(host, 'method'))
        sys.exit(1)

    # Inspect the Config and set appropriate upload method
    if not config.get(host, 'method'):
        method = config.get('DEFAULT', 'method')
    else:
        method = config.get(host, 'method')

    # Check now the login and redefine it if needed
    if (
            config.has_option(host, 'login')
            and config.get(host, 'login') != 'username'):
        login = config.get(host, 'login')
    elif (
            config.has_option('DEFAULT', 'login')
            and config.get('DEFAULT', 'login') != 'username'):
        login = config.get('DEFAULT', 'login')
    else:
        # Try to get the login from the enviroment
        if 'USER' in os.environ:
            login = os.environ['USER']
        else:
            sys.stdout.write("$USER not set, will use login information.\n")
            # Else use the current username
            login = pwd.getpwuid(os.getuid())[0]
            if debug:
                sys.stdout.write("D: User-ID: %s\n" % os.getuid())
        if debug:
            sys.stdout.write(
                    "D: Neither host %s nor default login used. Using %s\n"
                    % (host, login))
    if debug:
        sys.stdout.write("D: Login to use: %s\n" % login)

    incoming = config.get(host, 'incoming')
    # Do the actual upload
    if not simulate:
        if debug:
            sys.stdout.write("D: FQDN: %s\n" % fqdn)
            sys.stdout.write("D: Login: %s\n" % login)
            sys.stdout.write("D: Incoming: %s\n" % incoming)
        if method == 'ftp':
            ftp_mode = config.getboolean(host, 'passive_ftp')
            if ftp_passive_mode == 1:
                ftp_mode = 1
            if ftp_mode == 1:
                if debug:
                    if ftp_passive_mode == 1:
                        sys.stdout.write("D: Using passive ftp\n")
                    else:
                        sys.stdout.write("D: Using active ftp\n")
            upload_methods[method](
                    fqdn, login, incoming,
                    files_to_upload, debug, ftp_mode)
        elif method == 'scp':
            if debug and config.getboolean(host, 'scp_compress'):
                sys.stdout.write("D: Setting compression for scp\n")
            scp_compress = config.getboolean(host, 'scp_compress')
            ssh_config_options = [
                    y for y in (
                        x.strip() for x in
                        config.get(host, 'ssh_config_options').split('\n'))
                    if y]
            upload_methods[method](
                    fqdn, login, incoming,
                    files_to_upload, debug, scp_compress, ssh_config_options)
        else:
            upload_methods[method](
                    fqdn, login, incoming,
                    files_to_upload, debug, 0)
    # Or just simulate it.
    else:
        for file_path in files_to_upload:
            sys.stderr.write(
                    "Uploading with %s: %s to %s:%s\n"
                    % (method, file_path, fqdn, incoming))
            subprocess.call("cat %s" % file_path, shell=True)


def dcut():
    options, arguments = getoptions()
    if options['debug']:
        sys.stdout.write('D: calling dput.read_configs\n')
    config = dput.read_configs(options['config'], options['debug'])
    if (
            not options['host']
            and config.has_option('DEFAULT', 'default_host_main')):
        options['host'] = config.get('DEFAULT', 'default_host_main')
        if options['debug']:
            sys.stdout.write(
                    'D: Using host "%s" (default_host_main)\n'
                    % (options['host']))
        if not options['host']:
            options['host'] = 'ftp-master'
            if options['debug']:
                sys.stdout.write(
                        'D: Using host "%s" (hardcoded)\n'
                        % (options['host']))
    tempdir = None
    filename = None
    progname = dputhelper.get_progname(sys.argv)
    try:
        if not (options['filetoupload'] or options['filetocreate']):
            tempdir = tempfile.mkdtemp(prefix=progname + '.')
        if not options['filetocreate']:
            if not options['host']:
                sys.stdout.write(
                        "Error: No host specified"
                        " and no default found in config\n")
                sys.exit(1)
            if not config.has_section(options['host']):
                sys.stdout.write(
                        "No host %s found in config\n" % (options['host']))
                sys.exit(1)
            else:
                if config.has_option(options['host'], 'allow_dcut'):
                    dcut_allowed = config.getboolean(
                            options['host'], 'allow_dcut')
                else:
                    dcut_allowed = config.getboolean('DEFAULT', 'allow_dcut')
                if not dcut_allowed:
                    sys.stdout.write(
                            'Error: dcut is not supported'
                            ' for this upload queue.\n')
                    sys.exit(1)
        if options['filetoupload']:
            if arguments:
                sys.stdout.write(
                        'Error: cannot take commands'
                        ' when uploading existing file,\n'
                        '       "%s" found\n' % (' '.join(arguments)))
                sys.exit(1)
            commands = None
            filename = options['filetoupload']
            if not filename.endswith(".commands"):
                sys.stdout.write(
                        'Error: I\'m insisting on the .commands extension,'
                        ' which\n'
                        '       "%s" doesnt seem to have.\n' % filename)
            # TV-TODO: check file to be readable?
        elif options['changes']:
            parse_changes = dput.parse_changes
            removecommands = create_commands(options, config, parse_changes)
            filename = write_commands(removecommands, options, config, tempdir)
        else:
            commands = parse_queuecommands(arguments, options, config)
            filename = write_commands(commands, options, config, tempdir)
        if not options['filetocreate']:
            dput.import_upload_functions()
            upload_methods = dput.import_upload_functions()
            upload_stolen_from_dput_main(
                    options['host'], upload_methods, config,
                    options['debug'], options['simulate'],
                    [filename], options['passive'])
    finally:
        # we use sys.exit, so we need to clean up here
        if tempdir:
            shutil.rmtree(tempdir)


def create_commands(options, config, parse_changes):
    """ Get the removal commands from a package changes file.

        Parse the specified ‘foo.changes’ file and returns commands to
        remove files named in it.

        """
    changes_file_path = options['changes']
    if options['debug']:
        sys.stdout.write(
                "D: Parsing changes file (%s) for files to remove\n"
                % changes_file_path)
    try:
        changes_file = open(changes_file_path, 'r')
    except IOError:
        sys.stdout.write("Can't open changes file: %s\n" % changes_file_path)
        sys.exit(1)
    changes = parse_changes(changes_file)
    changes_file.close
    removecommands = ['rm --searchdirs ' + os.path.basename(changes_file_path)]
    for file_spec in changes['files'].strip().split('\n'):
        # Filename only.
        file_path = file_spec.split()[4]
        rm = 'rm --searchdirs ' + file_path
        if options['debug']:
            sys.stdout.write("D: Will remove %s with '%s'\n" % (file_path, rm))
        removecommands.append(rm)
    return removecommands


# Copyright © 2015–2021 Ben Finney <bignose@debian.org>
# Copyright © 2008–2013 Y Giridhar Appaji Nag <appaji@debian.org>
# Copyright © 2004–2009 Thomas Viehmann <tv@beamnet.de>
# Copyright © 2000–2004 Christian Kurz <shorty@debian.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


# Local variables:
# coding: utf-8
# mode: python
# End:
# vim: fileencoding=utf-8 filetype=python :