Intermediate Files in the Python Format

ZTP supports Python script intermediate files that store device and version file information. A ZTP-enabled device can execute the Python script to download version files.

The file name extension of a Python script file must be .py, in the format shown in Example of a Python Script File. Use the Python 3.7 syntax to compile or modify the script file. For details about the fields in a Python script file, see Python Script Description.

Example of a Python Script File

The following preconfiguration script file is only an example and needs to be modified based on deployment requirements.

The SHA256 checksum in the following file is only an example.

#sha256sum="126b05cb7ed99956281edef93f72c0f0ab517eb025edfd9cc4f31a37f123c4fc"
#!/usr/bin/env python
# coding=utf-8
#
# Copyright (C) Huawei Technologies Co., Ltd. 2008-2013. All rights reserved.
# ----------------------------------------------------------------------------------------------------------------------
# History:
# Date                Author                      Modification
# 20180122            Author                      created file.
# ----------------------------------------------------------------------------------------------------------------------

"""
Zero Touch Provisioning (ZTP) enables devices to automatically load version files including system software,
patch files, configuration files when the device starts up, the devices to be configured must be new devices
or have no configuration files.

This is a sample of Zero Touch Provisioning user script. You can customize it to meet the requirements of
your network environment.
"""

import hashlib
import http.client
import logging
import os
import re
import string
import traceback
import xml.etree.ElementTree as etree
from time import sleep
from urllib.parse import urlparse

import ops


# error code
OK = 0
ERR = 1

# File server in which stores the necessary system software, configuration and patch files:
#   1) Specify the file server which supports the following format.
#      tftp://hostname
#      ftp://[username[:password]@]hostname
#      sftp://[username[:password]@]hostname[:port]
#   2) Do not add a trailing slash at the end of file server path.
FILE_SERVER = 'ftp://username:password@hostname/path/'

# Remote file paths:
#   1) The path may include directory name and file name.
#   2) If file name is not specified, indicate the procedure can be skipped.
#   3) If you do not want image, please set it as REMOTE_PATH_IMAGE = {} or REMOTE_PATH_IMAGE = {'DEVICETYPE': ''}
# File paths of system software on file server, filename extension is '.cc'.
REMOTE_PATH_IMAGE = {
    'NetEngine 8000 F': 'V800R021C00SPC100.cc'
}
# File path of configuration file on file server, filename extension is '.cfg', '.zip' or '.dat'.
REMOTE_PATH_CONFIG = 'conf_%s.cfg'
# If you do not want patch, please set it as REMOTE_PATH_PATCH = {} or REMOTE_PATH_PATCH = {'DEVICETYPE': ''}
# File path of patch file on file server, filename extension is '.pat'
REMOTE_PATH_PATCH = {
    'NetEngine 8000 F': 'V800R021C00SPC100SPH001.PAT'
}
# File path of sha256 file, contains sha256 value of image / patch / configuration, file extension is '.txt'
REMOTE_PATH_SHA256 = 'sha256.txt'

# constant
# autoconfig
HTTP_OK = 200
HTTP_BAD_REQUEST = 400
HTTP_BAD_RESPONSE = -1
CONFLICT_RETRY_INTERVAL = 5

POST_METHOD = 'POST'
GET_METHOD = 'GET'
DELETE_METHOD = 'DELETE'
PUT_METHOD = 'PUT'

MAX_TIMES_GET_STARTUP = 120
GET_STARTUP_INTERVAL = 15

MAX_TIMES_CHECK_STARTUP = 205
MAX_TIMES_CHECK_STARTUP_SLAVE = 265
CHECK_STARTUP_INTERVAL = 5

FILE_DELETE_DELAY_TIME = 3

# ztplib
LAST_STATE_MAP = {'true': 'enable', 'false': 'disable'}

# DNS
DNS_STATE_MAP = {'true': 'enable', 'false': 'disable'}

# download
FILE_TRANSFER_RETRY_TIMES = 3
FILE_DOWNLOAD_INTERVAL_TIME = 5

DISK_SPACE_NOT_ENOUGH = 48

IPV4 = 'ipv4'
IPV6 = 'ipv6'

OPS_CLIENT = None


# exception
class PNPStopError(Exception):
    """Stop by pnp"""


class OPIExecError(Exception):
    """OPS Connection Exception"""


class NoNeedZTP2PNPError(Exception):
    """No need start ztp"""


class SysRebootError(Exception):
    """Device reboot error"""


class ZTPDisableError(Exception):
    """ZTP set disable error"""


# opslib
class OPSConnection:
    """Make an OPS connection instance."""
    __slots__ = ['host', 'port', 'headers', 'conn']

    def __init__(self, host, port=80):
        self.host = host
        self.port = port
        self.headers = {
            'Content-type': 'application/xml',
            'Accept': 'application/xml'
        }

        self.conn = http.client.HTTPConnection(self.host, self.port)

    def close(self):
        """Close the connection"""
        self.conn.close()

    def create(self, uri, req_data, need_retry=True):
        """Create a resource on the server"""
        ret = self._rest_call(POST_METHOD, uri, req_data)
        if ret[0] != HTTP_OK and need_retry:
            sleep(CONFLICT_RETRY_INTERVAL)
            ret = self._rest_call(POST_METHOD, uri, req_data)
        return ret

    def delete(self, uri, req_data, need_retry=True):
        """Delete a resource on the server"""
        ret = self._rest_call(DELETE_METHOD, uri, req_data)
        if ret[0] != HTTP_OK and need_retry:
            sleep(CONFLICT_RETRY_INTERVAL)
            ret = self._rest_call(DELETE_METHOD, uri, req_data)
        return ret

    def get(self, uri, req_data=None, need_retry=True):
        """Retrieve a resource from the server"""
        ret = self._rest_call(GET_METHOD, uri, req_data)
        if (ret[0] != HTTP_OK or ret[2] == '') and need_retry:
            sleep(CONFLICT_RETRY_INTERVAL)
            ret = self._rest_call(GET_METHOD, uri, req_data)
        return ret

    def set(self, uri, req_data, need_retry=True):
        """Update a resource on the server"""
        ret = self._rest_call(PUT_METHOD, uri, req_data)
        if ret[0] != HTTP_OK and need_retry:
            sleep(CONFLICT_RETRY_INTERVAL)
            ret = self._rest_call(PUT_METHOD, uri, req_data)
        return ret

    def _rest_call(self, method, uri, req_data):
        """REST call"""
        body = '' if req_data is None else req_data

        try:
            self.conn.request(method, uri, body, self.headers)
        except http.client.CannotSendRequest:
            logging.warning('An error occurred during http request, try to send request again')
            self.close()
            self.conn = http.client.HTTPConnection(self.host, self.port)
            self.conn.request(method, uri, body, self.headers)
        except http.client.InvalidURL:
            logging.warning('Failed to find url: %s in OPS whitelist', uri)
            return HTTP_BAD_REQUEST, '', ''

        try:
            response = self.conn.getresponse()
        except AttributeError:
            logging.warning('An error occurred during http response, try again')
            return HTTP_BAD_RESPONSE, '', ''

        rest_message = response.read()
        if isinstance(rest_message, bytes):
            rest_message = str(rest_message, 'iso-8859-1')
        # logging.debug('uri = %s ret = %s \n %s \n %s', uri, response.status, req_data, rest_message)

        ret = (response.status, response.reason, rest_message)
        return ret


OPS_CLIENT = OPSConnection("localhost")


# pnplib
def dhcp_stop():
    """Stop DHCP client, include dhcpv4 and dhcpv6."""
    logging.info('Stopping dhcp client')

    uri = '/pnp/stopPnp'
    req_data = '''<?xml version="1.0" encoding="UTF-8"?>
        <stopPnp/>'''
    ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
    if ret != HTTP_OK:
        # ignore stop pnp err
        logging.warning('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
        logging.warning('Failed to stop dhcp client')
        return

    logging.info('DHCP client has stopped')
    return


# commlib
def get_cwd():
    """Get the full filename of the current working directory"""
    logging.info("Get the current working directory...")
    namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'}
    uri = "/vfm/pwds/pwd"
    req_data = '''<?xml version="1.0" encoding="UTF-8"?>
                      <pwd>
                          <dictionaryName/>
                      </pwd>
               '''
    ret, _, rsp_data = OPS_CLIENT.get(uri, req_data)
    if ret != http.client.OK or rsp_data is '':
        raise OPIExecError('Failed to get the current working directory')

    logging.info("pwd rsp_data: {}".format(rsp_data))

    root_elem = etree.fromstring(rsp_data)
    uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:dictionaryName'
    elem = root_elem.find(uri, namespaces)
    if elem is None:
        raise OPIExecError('Failed to get the current working directory for no "directoryName" element')

    return elem.text


def file_exist(file_name, dir_path=None):
    """Returns True if file_path refers to an existing file, otherwise returns False"""
    uri = '/vfm/dirs/dir'
    str_temp_1 = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
        <dir>
            <fileName>$fileName</fileName>
        </dir>''')
    str_temp_2 = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
        <dir>
            <dirName>$dirName</dirName>
            <fileName>$fileName</fileName>
        </dir>''')

    if dir_path:
        req_data = str_temp_2.substitute(dirName=dir_path, fileName=file_name)
    else:
        req_data = str_temp_1.substitute(fileName=file_name)
    ret, _, rsp_data = OPS_CLIENT.get(uri, req_data)
    if ret != HTTP_OK or rsp_data == '':
        return False

    root_elem = etree.fromstring(rsp_data)
    namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'}
    uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:fileName'
    elem = root_elem.find(uri, namespaces)
    if elem is None:
        return False

    return True


def copy_file(src_path, dest_path):
    """Copy a file"""
    logging.info('Copy file %s to %s', src_path, dest_path)

    if 'slave' in dest_path:
        file_name = dest_path.split(':/')[1]
        if file_exist(file_name, 'slave#cfcard:/'):
            logging.info('Detect dest file exist, delete it first')
            delete_file(dest_path)

    uri = '/vfm/copyFile'
    str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
        <copyFile>
            <srcFileName>$src</srcFileName>
            <desFileName>$dest</desFileName>
        </copyFile>''')
    req_data = str_temp.substitute(src=src_path, dest=dest_path)

    ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data, False)
    if ret != HTTP_OK:
        file_name = dest_path.split(':/')[1]
        if file_exist(file_name, "slave#cfcard:/"):
            logging.info('Exists file copy fragment, delete it')
            delete_file(dest_path)
        logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
        logging.error('Failed to copy %s to %s', src_path, dest_path)
        return False

    logging.info('succeed to copy')
    return True


def delete_file(file_path):
    """Delete a file permanently"""
    if file_path is None or file_path == '':
        return

    logging.info('Delete file %s permanently', file_path)
    uri = '/vfm/deleteFileUnRes'
    str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
        <deleteFileUnRes>
            <fileName>$filePath</fileName>
        </deleteFileUnRes>
        ''')
    req_data = str_temp.substitute(filePath=file_path)
    ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
    if ret != HTTP_OK:
        logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
        logging.error('Failed to delete the file %s permanently', file_path)


def delete_file_all(file_path, slave, protect_file_list=None):
    """Delete a file permanently on all main boards"""
    if not file_path:
        return
    if protect_file_list:
        for protect_file in protect_file_list:
            if file_path == protect_file:
                return
    file_name = os.path.basename(file_path)
    file_path_t = file_path[:len(file_path)-len(file_name)]
    if file_exist(file_name, file_path_t):
        delete_file(file_path)
    if slave and file_exist(file_name, 'slave#'+file_path_t):
        delete_file('slave#' + file_path)


def has_slave_mpu():
    """Whether device has slave MPU, returns a bool value
    :raise OPIExecError
    """
    logging.info("Test whether device has slave MPU")
    uri = '/devm/phyEntitys'
    req_data = '''<?xml version="1.0" encoding="UTF-8"?>
        <phyEntitys>
            <phyEntity>
                <entClass>mpuModule</entClass>
                <entStandbyState/>
                <position/>
            </phyEntity>
        </phyEntitys>'''

    has_slave = False
    mpu_slot = {}.fromkeys(('master', 'slave'))
    ret, err_code, rsp_data = OPS_CLIENT.get(uri, req_data)
    if ret != HTTP_OK or rsp_data == '':
        logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
        raise OPIExecError('Failed to get the device slave information')

    root_elem = etree.fromstring(rsp_data)
    namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'}
    uri = 'data{0}/vrp:phyEntity'.format(uri.replace('/', '/vrp:'))
    for entity in root_elem.findall(uri, namespaces):
        elem = entity.find("vrp:entStandbyState", namespaces)
        if elem is not None and elem.text.lower().find('slave') >= 0:
            has_slave = True
            elem = entity.find("vrp:position", namespaces)
            if elem is not None:
                mpu_slot['slave'] = elem.text
        if elem is not None and elem.text.lower().find('master') >= 0:
            elem = entity.find("vrp:position", namespaces)
            if elem is not None:
                mpu_slot['master'] = elem.text

    logging.info('Device has slave: %s', has_slave)
    return has_slave, mpu_slot


def get_system_info():
    """Get device product esn mac
    :raise: OPIExecError
    """
    logging.info("Get the system information...")
    uri = "/system/systemInfo"
    req_data = '''<?xml version="1.0" encoding="UTF-8"?>
    <systemInfo>
        <productName/>
        <esn/>
        <mac/>
    </systemInfo>
    '''

    sys_info = {}.fromkeys(('productName', 'esn', 'mac'), '')
    ret, err_code, rsp_data = OPS_CLIENT.get(uri, req_data)
    if ret != HTTP_OK or rsp_data == '':
        logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
        raise OPIExecError('Failed to get the system information')

    root_elem = etree.fromstring(rsp_data)
    namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'}
    uri = 'data' + uri.replace('/', '/vrp:')
    nslen = len(namespaces['vrp'])
    elem = root_elem.find(uri, namespaces)
    if elem is not None:
        for child in elem:
            tag = child.tag[nslen + 2:]
            if tag in list(sys_info.keys()):
                sys_info[tag] = child.text

    return sys_info


def reboot_system(save_config='false'):
    """Reboot system."""
    logging.info('System will reboot to make the configuration take effect')

    if save_config not in ['true', 'false']:
        return

    sleep(10)

    uri = "/devm/reboot"
    str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
        <reboot>
            <saveConfig>$saveConfig</saveConfig>
        </reboot>''')
    req_data = str_temp.substitute(saveConfig=save_config)
    ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
    logging.info("/devm/reboot/: rep_data[{}]".format(str(rsp_data)))
    if ret != HTTP_OK or rsp_data == '':
        logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
        raise OPIExecError('Failed to execute the reboot system operation')


def check_file_type_valid(image, config, patch, sha256_file):
    """Test whether argument paths are valid."""
    logging.info("Test whether argument paths are valid...")
    # check image file path
    file_name = os.path.basename(image)
    if file_name is not '' and not file_name.lower().endswith('.cc'):
        logging.error('Error: Invalid filename extension of system software')
        return False

    # check config file path
    file_name = os.path.basename(config)
    file_name = file_name.lower()
    _, ext = os.path.splitext(file_name)
    if file_name is not '' and ext not in ['.cfg', '.zip', '.dat']:
        logging.error('Error: Invalid filename extension of configuration file')
        return False

    # check patch file path
    file_name = os.path.basename(patch)
    if file_name is not '' and not file_name.lower().endswith('.pat'):
        logging.error('Error: Invalid filename extension of patch file')
        return False

    # check sha256 file path
    file_name = os.path.basename(sha256_file)
    if file_name is not '' and not file_name.lower().endswith('.txt'):
        logging.error('Error: Invalid filename extension of %s file', sha256_file)
        return False

    return True


# startuplib
class StartupInfo:
    """Startup configuration information

    image: startup system software
    config: startup saved-configuration file
    patch: startup patch package
    """

    def __init__(self, image=None, config=None, patch=None):
        self.image = image
        self.config = config
        self.patch = patch


class Startup:
    """Startup configuration information

    current: current startup configuration
    next: current next startup configuration
    """

    def __init__(self):
        self.current, self.next = self._get_startup_info()
        self.startup_info_from_ini_or_cfg = {}
        self.startup_info_before_set = StartupInfo()

    @staticmethod
    def _get_startup_info(retry=True):
        """Get device startup information
            :raise
                opslib.OPIExecError
        """
        uri = '/cfg/startupInfos/startupInfo'
        req_data = '''<?xml version="1.0" encoding="UTF-8"?>
            <startupInfo>
                <position/>
                <configedSysSoft/>
                <curSysSoft/>
                <nextSysSoft/>
                <curStartupFile/>
                <nextStartupFile/>
                <curPatchFile/>
                <nextPatchFile/>
            </startupInfo>'''

        if retry is True:
            retry_time = MAX_TIMES_GET_STARTUP
        else:
            retry_time = 1

        cnt = 0
        elem = None
        namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'}
        ns_len = len(namespaces['vrp'])
        path = 'data' + uri.replace('/', '/vrp:')  # match path
        while cnt < retry_time:
            ret, _, rsp_data = OPS_CLIENT.get(uri, req_data)
            if ret != HTTP_OK or rsp_data == '':
                cnt += 1
                logging.warning('Failed to get the startup information')
                # sleep to wait for system ready when no query result
                sleep(GET_STARTUP_INTERVAL)
                continue

            root_elem = etree.fromstring(rsp_data)
            elem = root_elem.find(path, namespaces)
            if elem is not None:
                break
            logging.warning('No query result while getting startup info')
            # sleep to wait for system ready when no query result
            sleep(GET_STARTUP_INTERVAL)
            cnt += 1

        if elem is None:
            raise OPIExecError('Failed to get the startup information')

        current = StartupInfo()  # current startup info
        curnext = StartupInfo()  # next startup info
        for child in elem:
            # skip the namespace, '{namespace}text'
            tag = child.tag[ns_len + 2:]
            if tag == 'curSysSoft':
                current.image = child.text
            elif tag == 'nextSysSoft':
                curnext.image = child.text
            elif tag == 'curStartupFile' and child.text != 'NULL':
                current.config = child.text
            elif tag == 'nextStartupFile' and child.text != 'NULL':
                curnext.config = child.text
            elif tag == 'curPatchFile' and child.text != 'NULL':
                current.patch = child.text
            elif tag == 'nextPatchFile' and child.text != 'NULL':
                curnext.patch = child.text
            else:
                continue

        return current, curnext

    @staticmethod
    def _set_startup_image_file(file_path, slave=True):
        """Set the next startup system software"""
        file_name = os.path.basename(file_path)
        logging.info('Set the next startup system software to %s, please wait a moment', file_name)
        uri = '/sum/startupbymode'
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <startupbymode>
                <softwareName>$fileName</softwareName>
                <mode>$startupMode</mode>
            </startupbymode>''')

        if slave:
            startup_mode = 'STARTUP_MODE_ALL'
        else:
            startup_mode = 'STARTUP_MODE_PRIMARY'

        req_data = str_temp.substitute(fileName=file_name, startupMode=startup_mode)
        # it is a action operation, so use create for HTTP POST
        ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
        if ret != HTTP_OK:
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            raise OPIExecError('Failed to set startup system software')

    @staticmethod
    def _set_startup_config_file(file_path):
        """Set the next startup saved-configuration file"""
        file_name = os.path.basename(file_path)
        logging.info('Set the next startup saved-configuration file to %s', file_name)
        uri = '/cfg/setStartup'
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <setStartup>
                <fileName>$fileName</fileName>
            </setStartup>''')

        req_data = str_temp.substitute(fileName=file_name)
        # it is a action operation, so use create for HTTP POST
        ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
        if ret != HTTP_OK:
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            raise OPIExecError('Failed to set startup configuration file')

    @staticmethod
    def _del_startup_config_file():
        """Delete startup config file"""
        logging.info('Delete the next startup config file')
        uri = '/cfg/clearStartup'
        req_data = '''<?xml version="1.0" encoding="UTF-8"?>
            <clearStartup>
            </clearStartup>'''
        # it is a action operation, so use create for HTTP POST
        ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
        if ret != HTTP_OK:
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            raise OPIExecError('Failed to delete startup configuration file')

    @staticmethod
    def _set_startup_patch_file(file_path):
        """Set the next startup patch file"""
        file_name = os.path.basename(file_path)
        logging.info('Set the next startup patch file to %s', file_name)
        uri = "/patch/startup"
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <startup>
                <packageName>$fileName</packageName>
            </startup>''')
        req_data = str_temp.substitute(fileName=file_name)
        # it is a action operation, so use create for HTTP POST
        ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
        if ret != HTTP_OK:
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            raise OPIExecError('Failed to set startup patch file')

    @staticmethod
    def _reset_startup_patch_file():
        """Reset patch file for system to startup"""
        logging.info('Reset the next startup patch file')
        uri = '/patch/resetpatch'
        req_data = '''<?xml version="1.0" encoding="UTF-8"?>
            <resetpatch/>'''
        # it is a action operation, so use create for HTTP POST
        ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
        if ret != HTTP_OK:
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            raise OPIExecError('Failed to reset startup patch file')

    def _check_next_startup_file(self, file_name, check_item, slave):
        """Check next startup file ready
            check_item: [image, config, patch]
        """
        if check_item not in ['image', 'config', 'patch']:
            return True

        logging.info('Check the next startup %s information', check_item)
        if slave:
            check_time = MAX_TIMES_CHECK_STARTUP_SLAVE
        else:
            check_time = MAX_TIMES_CHECK_STARTUP
        cnt = 0
        while cnt < check_time:
            _, next_startup = self._get_startup_info()
            startup_file_name = getattr(next_startup, check_item)
            if startup_file_name == file_name:
                sleep(CHECK_STARTUP_INTERVAL)
                logging.info('The next system %s check successfully', check_item)
                return True
            # sleep to wait for system ready when no query result
            sleep(CHECK_STARTUP_INTERVAL)
            if cnt % 12 == 0:
                # logging every minute
                logging.info('Checking the next startup %s, please wait a moment', check_item)
            cnt += 1
        logging.warning('The next system %s is not ready', check_item)
        return False

    def set_startup_info(self, image_file, config_file, patch_file, slave):
        """Set the next startup information."""
        # backup startup_info set by user
        cur_startup, cur_next_startup = self._get_startup_info()
        self.startup_info_before_set.image = cur_next_startup.image
        self.startup_info_before_set.patch = cur_next_startup.patch
        self.startup_info_before_set.config = cur_next_startup.config
        logging.info("save startup config before ztp setting")

        logging.info('Start to set next startup information')
        # 1. Set next startup system software
        if image_file is not None:
            try:
                self._set_startup_image_file(image_file)
                if self._check_next_startup_file(image_file, 'image', slave) is False:
                    raise OPIExecError('Failed to check the next startup image file')
            except OPIExecError as reason:
                logging.error(reason)
                delete_file_all(image_file, slave, [cur_startup.image, cur_next_startup.image])
                self.reset_startup_info(slave)
                raise

        # 2. Set next startup patch file
        if patch_file is not None:
            try:
                self._set_startup_patch_file(patch_file)
                if self._check_next_startup_file(patch_file, 'patch', slave) is False:
                    raise OPIExecError('Failed to check the next startup patch file')
            except OPIExecError as reason:
                logging.error(reason)
                delete_file_all(patch_file, slave, [cur_startup.patch, cur_next_startup.patch])
                self.reset_startup_info(slave)
                raise

        # 3. Set next startup config file
        if config_file is not None:
            try:
                self._set_startup_config_file(config_file)
                if self._check_next_startup_file(config_file, 'config', slave) is False:
                    raise OPIExecError('Failed to check the next startup config file')
            except OPIExecError as reason:
                logging.error(reason)
                delete_file_all(config_file, slave, [cur_startup.config, cur_next_startup.config])
                self.reset_startup_info(slave)
                raise

    def reset_startup_info(self, slave):
        """Reset startup info and delete the downloaded files"""
        logging.info('Start to reset next startup information')
        if not self.startup_info_before_set.image:
            logging.error('image of roll back point is None')
            return
        cur_startup, next_startup = self._get_startup_info()

        # 1. Reset next startup config file and delete it
        try:
            # user configure startup info after ZTP
            if next_startup.config != self.startup_info_from_ini_or_cfg.get("SYSTEM-CONFIG"):
                logging.info("no need to reset startup config")
                if self.startup_info_from_ini_or_cfg.get("SYSTEM-CONFIG"):
                    sleep(FILE_DELETE_DELAY_TIME)
                    delete_file_all(self.startup_info_from_ini_or_cfg.get("SYSTEM-CONFIG"), slave,
                                    [cur_startup.config, next_startup.config])
            # user do not configure startup info
            elif next_startup.config != self.startup_info_before_set.config:
                logging.info("reset startup config to the beginning")
                if self.startup_info_before_set.config is None:
                    self._del_startup_config_file()
                else:
                    self._set_startup_config_file(self.startup_info_before_set.config)
                    if self._check_next_startup_file(self.startup_info_before_set.config, 'config', slave) is not True:
                        raise OPIExecError('Failed to check the next startup config file')
                if next_startup.config:
                    sleep(FILE_DELETE_DELAY_TIME)
                    delete_file_all(next_startup.config, slave,
                                    [cur_startup.config, self.startup_info_before_set.config])
        except Exception as reason:
            logging.error(reason)

        # 2. Reset next startup patch file and delete it
        try:
            # user configure startup info after ZTP
            if next_startup.patch != self.startup_info_from_ini_or_cfg.get("SYSTEM-PAT"):
                logging.info("no need to reset startup patch")
                if self.startup_info_from_ini_or_cfg.get("SYSTEM-PAT"):
                    sleep(FILE_DELETE_DELAY_TIME)
                    delete_file_all(self.startup_info_from_ini_or_cfg.get("SYSTEM-PAT"), slave,
                                    [cur_startup.patch, next_startup.patch])
            # user do not configure startup info
            elif next_startup.patch != self.startup_info_before_set.patch:
                logging.info("reset startup patch to the beginning")
                if self.startup_info_before_set.patch is None:
                    self._reset_startup_patch_file()
                else:
                    self._set_startup_patch_file(self.startup_info_before_set.patch)
                    if self._check_next_startup_file(self.startup_info_before_set.patch, 'patch', slave) is not True:
                        raise OPIExecError('Failed to check the next startup patch file')
                if next_startup.patch:
                    sleep(FILE_DELETE_DELAY_TIME)
                    delete_file_all(next_startup.patch, slave,
                                    [cur_startup.patch, self.startup_info_before_set.patch])
        except Exception as reason:
            logging.error(reason)

        # 3. Reset next startup system software and delete it
        try:
            # user configure startup info after ZTP
            if next_startup.image != self.startup_info_from_ini_or_cfg.get("SYSTEM-SOFTWARE"):
                logging.info("no need to reset startup image")
                if self.startup_info_from_ini_or_cfg.get("SYSTEM-SOFTWARE"):
                    sleep(FILE_DELETE_DELAY_TIME)
                    delete_file_all(self.startup_info_from_ini_or_cfg.get("SYSTEM-SOFTWARE"), slave,
                                    [cur_startup.image, next_startup.image])
            # user do not configure startup info
            elif next_startup.image != self.startup_info_before_set.image:
                logging.info("reset startup config to the beginning")
                self._set_startup_image_file(self.startup_info_before_set.image)
                if self._check_next_startup_file(self.startup_info_before_set.image, 'image', slave) is not True:
                    raise OPIExecError('Failed to check the next startup image file')
                if next_startup.image:
                    sleep(FILE_DELETE_DELAY_TIME)
                    delete_file_all(next_startup.image, slave,
                                    [cur_startup.image, self.startup_info_before_set.image])
        except Exception as reason:
            logging.error(reason)

    def set_startup_info_from_ini_or_cfg(self, startup_info):
        for item_key in ['SYSTEM-SOFTWARE', 'SYSTEM-CONFIG', 'SYSTEM-PAT']:
            if not startup_info[item_key]:
                self.startup_info_from_ini_or_cfg[item_key] = startup_info[item_key]
            else:
                self.startup_info_from_ini_or_cfg[item_key] = 'cfcard:/' + startup_info[item_key]


def convert_byte_to_str(data):
    result = data
    if not isinstance(data, str):
        result = str(data, "iso-8859-1")
    return result


def sha256sum(fname, need_skip_first_line=False):
    """
    Calculate sha256 num for this file.
    """

    def read_chunks(fhdl):
        '''read chunks'''
        chunk = fhdl.read(8096)
        while chunk:
            yield chunk
            chunk = fhdl.read(8096)
        else:
            fhdl.seek(0)

    sha256_obj = hashlib.sha256()
    if isinstance(fname, str):
        with open(fname, "rb") as fhdl:
            # skip the first line
            fhdl.seek(0)
            if need_skip_first_line:
                fhdl.readline()
            for chunk in read_chunks(fhdl):
                sha256_obj.update(chunk)
    elif fname.__class__.__name__ in ["StringIO", "StringO"]:
        for chunk in read_chunks(fname):
            sha256_obj.update(chunk)
    else:
        pass
    return sha256_obj.hexdigest()


def sha256_get_from_file(fname):
    """Get sha256 num form file, stored in first line"""
    with open(fname, "rb") as fhdl:
        fhdl.seek(0)
        line_first = convert_byte_to_str(fhdl.readline())
    # if not match pattern, the format of this file is not supported
    if not re.match('^#sha256sum="[\\w]{64}"[\r\n]+$', line_first):
        return 'None'

    return line_first[12:76]


def sha256_check_with_first_line(fname):
    """Validate sha256 for this file"""

    work_fname = os.path.join("ztp", fname)
    sha256_calc = sha256sum(work_fname, True)
    sha256_file = sha256_get_from_file(work_fname)

    if sha256_file.lower() != sha256_calc:
        logging.warning('SHA256 check failed, file %s', fname)
        logging.warning('SHA256 checksum of the file "%s" is %s', fname, sha256_calc)
        logging.warning('SHA256 checksum received from the file "%s" is %s', fname, sha256_file)
        return False

    return True


def parse_sha256_file(fname):
    """parse sha256 file"""

    def read_line(fhdl):
        """read a line by loop"""
        line = fhdl.readline()
        while line:
            yield line
            line = fhdl.readline()
        else:
            fhdl.seek(0)

    sha256_dic = {}
    work_fname = os.path.join("ztp", fname)
    with open(work_fname, "rb") as fhdl:
        for line in read_line(fhdl):
            line_spilt = convert_byte_to_str(line).split()
            if 2 != len(line_spilt):
                continue
            dic_tmp = {line_spilt[0]: line_spilt[1]}
            sha256_dic.update(dic_tmp)
    return sha256_dic


def verify_and_parse_sha256_file(fname):
    """
    verify data integrity of sha256 file and parse this file

    format of this file is like:
    ------------------------------------------------------------------

    file-name              sha256
    conf_5618642831132.cfg 1254b2e49d3347c4147a90858fa5f59aa2594b7294304f34e7da328bf3cdfbae
    ------------------------------------------------------------------
    """
    if not sha256_check_with_first_line(fname):
        return ERR, None
    return OK, parse_sha256_file(fname)


def sha256_check_with_dic(sha256_dic, fname):
    """sha256 check with dic"""
    if fname not in sha256_dic:
        logging.info('sha256_dic does not has key %s, no need to do sha256 verification', fname)
        return True

    sha256sum_result = sha256sum(fname, False)
    if sha256_dic[fname].lower() == sha256sum_result:
        logging.info('SHA256 check %s successfully', fname)
        return True

    logging.warning('SHA256 check failed, file %s', fname)
    logging.warning('SHA256 checksum of the file "%s" is %s', fname, sha256sum_result)
    logging.warning('SHA256 checksum received for the file "%s" is %s', fname, sha256_dic[fname])

    return False


def check_parameter(aset):
    seq = ['&', '>', '<', '"', "'"]
    if aset:
        for c in seq:
            if c in aset:
                return True
    return False


def check_filename():
    sys_info = get_system_info()
    url_tuple = urlparse(FILE_SERVER)
    if check_parameter(url_tuple.username) or check_parameter(url_tuple.password):
        logging.error('Invalid username or password, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.")
        return ERR
    file_name = os.path.basename(REMOTE_PATH_IMAGE.get(sys_info['productName'], ''))
    if file_name is not '' and check_parameter(file_name):
        logging.error(
            'Invalid filename of system software, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.")
        return ERR
    file_name = os.path.basename(REMOTE_PATH_CONFIG)
    if file_name is not '' and check_parameter(file_name):
        logging.error(
            'Invalid filename of configuration file, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.")
        return ERR
    file_name = os.path.basename(REMOTE_PATH_PATCH.get(sys_info['productName'], ''))
    if file_name is not '' and check_parameter(file_name):
        logging.error(
            'Invalid filename of patch file, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.")
        return ERR
    try:
        file_name = os.path.basename(REMOTE_PATH_SHA256)
    except NameError:
        file_name = ''
    if file_name is not '' and check_parameter(file_name):
        logging.error(
            'Invalid filename of sha256 file, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.")
        return ERR

    return OK


def download_cfg_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic):
    """ Download configuration file """
    url = os.path.join(startup_info['FILESERVER'], startup_info['SYSTEM-CONFIG'])
    local_path_config = os.path.join('cfcard:', os.path.basename(startup_info['SYSTEM-CONFIG']))
    delete_file_all(local_path_config, slave)
    ret = download_file(url, os.path.basename(local_path_config), ip_protocol, vpn_instance)

    if ret == ERR or not file_exist(os.path.basename(url)):
        logging.error('%s download fail', local_path_config)
        return False, local_path_config

    if sha256_val_dic is not None:
        if not startup_info['SYSTEM-CONFIG']:
            return False, local_path_config
        file_name = os.path.basename(startup_info['SYSTEM-CONFIG'])
        if not sha256_check_with_dic(sha256_val_dic, file_name):
            logging.error('Error: SHA256 check failed, file "%s"' % file_name)
            return False, local_path_config

    if slave:
        ret = copy_file(local_path_config, 'slave#' + local_path_config)
        if ret is False:
            logging.error('%s copy fail', local_path_config)
            return False, local_path_config

    return True, local_path_config


def download_patch_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic):
    """ Download patch file """
    file_name = os.path.basename(startup_info['SYSTEM-PAT'])
    url = os.path.join(startup_info['FILESERVER'], startup_info['SYSTEM-PAT'])
    local_path_patch = os.path.join('cfcard:', file_name)
    delete_file_all(local_path_patch, slave)  # Delete the software package with the same name as the non-startup software package from the disk to avoid space insufficiency.
    ret = download_file(url, file_name, ip_protocol, vpn_instance)
    if ret not in [OK, DISK_SPACE_NOT_ENOUGH] or not file_exist(file_name):
        logging.error('%s download fail', local_path_patch)
        return ERR, local_path_patch

    if ret == DISK_SPACE_NOT_ENOUGH:
        logging.error('The space of disk is not enough')
        return DISK_SPACE_NOT_ENOUGH, local_path_patch

    if not sha256_check_with_dic(sha256_val_dic, file_name):
        logging.error('Error: SHA256 check failed, file "%s"' % file_name)
        return ERR, local_path_patch

    if slave:
        ret = copy_file(local_path_patch, 'slave#' + local_path_patch)
        if ret is False:
            logging.error('%s copy fail', local_path_patch)
            return ERR, local_path_patch

    return OK, local_path_patch


def download_image_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic):
    """ Download system software """
    file_name = os.path.basename(startup_info['SYSTEM-SOFTWARE'])
    url = startup_info['FILESERVER'] + '/' + startup_info['SYSTEM-SOFTWARE']
    local_path_image = os.path.join('cfcard:', file_name)
   delete_file_all(local_path_image, slave)  # Delete the software package with the same name as the non-startup software package from the disk to avoid space insufficiency.
    ret = download_file(url, file_name, ip_protocol, vpn_instance)
    if ret not in [OK, DISK_SPACE_NOT_ENOUGH] or not file_exist(file_name):
        logging.error('%s download fail', local_path_image)
        return ERR, local_path_image

    if ret == DISK_SPACE_NOT_ENOUGH:
        logging.error('The space of disk is not enough')
        return DISK_SPACE_NOT_ENOUGH, local_path_image

    if not sha256_check_with_dic(sha256_val_dic, file_name):
        logging.error('Error: SHA256 check failed, file "%s"' % file_name)
        return ERR, local_path_image

    if slave:
        ret = copy_file(local_path_image, 'slave#' + local_path_image)
        if ret is False:
            logging.error('%s copy fail', local_path_image)
            return ERR, local_path_image

    return OK, local_path_image


def download_startup_file(startup_info, slave, ip_protocol, vpn_instance):
    """Download startup file"""
    # init here
    local_path_config = None
    local_path_patch = None
    local_path_image = None

    # current STARTUP_INFO
    cur_startup, next_startup = STARTUP._get_startup_info()
    cur_config = None if not cur_startup.config else os.path.basename(cur_startup.config)
    cur_patch = None if not cur_startup.patch else os.path.basename(cur_startup.patch)
    cur_image = None if not cur_startup.image else os.path.basename(cur_startup.image)
    next_config = None if not next_startup.config else os.path.basename(next_startup.config)
    next_patch = None if not next_startup.patch else os.path.basename(next_startup.patch)
    next_image = None if not next_startup.image else os.path.basename(next_startup.image)

    # download sha256 file first, used to verify data integrity of files which will be downloaded next
    try:
        cwd = get_cwd()
        file_path = REMOTE_PATH_SHA256
        if not file_path.startswith('/'):
            file_path = '/' + file_path
        file_name = os.path.basename(file_path)
        if file_name:
            url = FILE_SERVER + file_path
            local_path = os.path.join(cwd, "ztp", file_name)
            ret = download_file(url, local_path, ip_protocol, vpn_instance)
            if ret is ERR:
                logging.error('Error: Failed to download sha256 file "%s"' % file_name)
                return ERR, None, None, None
            logging.info('Info: Download sha256 file successfully')
            ret, sha256_val_dic = verify_and_parse_sha256_file(file_name)
            # delete the file immediately
            os.remove(os.path.join("ztp", file_name))
            if ret is ERR:
                logging.error('Error: sha256 check failed, file "%s"' % file_name)
                return ERR, None, None, None
        else:
            sha256_val_dic = {}
    except NameError:
        sha256_val_dic = {}
        logging.info('no need sha256 to check download file')

    # if user change the startup to the name in ini/cfg, ztp will not download
    # 1. Download configuration file
    if startup_info['SYSTEM-CONFIG'] and startup_info['SYSTEM-CONFIG'] not in [cur_config, next_config]:
        ret, local_path_config = download_cfg_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic)
        if ret is False:
            logging.info('delete startup file [cfg]')
            delete_startup_file(local_path_image, local_path_config, local_path_patch, slave)
            return ERR, local_path_image, local_path_config, local_path_patch
        logging.info('succeed to download config file')
    elif startup_info['SYSTEM-CONFIG'] and startup_info['SYSTEM-CONFIG'] in [cur_config, next_config]:
        logging.warning('The configured config version is the same as the current device version')

    # 2. Download patch file
    if startup_info['SYSTEM-PAT'] and startup_info['SYSTEM-PAT'] not in [cur_patch, next_patch]:
        ret, local_path_patch = download_patch_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic)
        if ret is ERR:
            delete_startup_file(local_path_image, local_path_config, local_path_patch, slave)
            return ERR, local_path_image, local_path_config, local_path_patch
        if ret == DISK_SPACE_NOT_ENOUGH:
            delete_startup_file(local_path_image, None, local_path_patch, slave)
            logging.info('disk space not enough, delete patch')
            return OK, None, local_path_config, None
    elif startup_info['SYSTEM-PAT'] and startup_info['SYSTEM-PAT'] in [cur_patch, next_patch]:
        logging.warning('The configured patch version is the same as the current device version')

    # 3. Download system software
    if startup_info['SYSTEM-SOFTWARE'] and startup_info['SYSTEM-SOFTWARE'] not in [cur_image, next_image]:
        ret, local_path_image = download_image_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic)
        if ret is ERR:
            delete_startup_file(local_path_image, local_path_config, local_path_patch, slave)
            return ERR, local_path_image, local_path_config, local_path_patch
        if ret == DISK_SPACE_NOT_ENOUGH:
            delete_startup_file(local_path_image, None, local_path_patch, slave)
            logging.info('disk space not enough, delete image and patch')
            return OK, None, local_path_config, None
    elif startup_info['SYSTEM-SOFTWARE'] and startup_info['SYSTEM-SOFTWARE'] in [cur_image, next_image]:
        logging.warning('The configured image version is the same as the current device version')

    return OK, local_path_image, local_path_config, local_path_patch


def set_startup_file(image_file, config_file, patch_file, slave):
    """Set startup file"""
    try:
        STARTUP.set_startup_info(image_file, config_file, patch_file, slave)
    except OPIExecError:
        return ERR

    logging.info('Set startup info ready %s %s %s', image_file, config_file, patch_file)
    return OK


def delete_startup_file(image_file, config_file, patch_file, slave):
    """Delete all system file"""
    delete_file_all(image_file, slave)
    delete_file_all(config_file, slave)
    delete_file_all(patch_file, slave)


# ztplib
def set_ztp_last_status(state):
    """Set ztp last status."""
    uri = '/ztpops/ztpStatus/ztpLastStatus'
    str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
        <ztpLastStatus>$ztpLastStatus</ztpLastStatus>''')
    req_data = str_temp.substitute(ztpLastStatus=state)
    ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
    if ret != HTTP_OK:
        logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
        logging.error('Failed to set ztp last status to %s', LAST_STATE_MAP[state])
        return

    logging.info('Succeed to set ztp last status to %s', LAST_STATE_MAP[state])


def get_ztp_enable_status():
    """Get ztp enable status
    :raise: OPIExecError
    """
    uri = '/ztpops/ztpStatus/ztpEnable'
    req_data = '''<?xml version="1.0" encoding="UTF-8"?>
        <ztpEnable/>'''
    ret, err_code, rsp_data = OPS_CLIENT.get(uri, req_data)
    if ret != HTTP_OK or rsp_data == '':
        logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
        raise OPIExecError('Failed to get ztp enable status')

    root_elem = etree.fromstring(rsp_data)
    namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'}
    uri = 'data' + uri.replace('/', '/vrp:')
    elem = root_elem.find(uri, namespaces)
    if elem is None:
        raise OPIExecError('Failed to read ztp enable status')

    return elem.text


def parse_environment(env):
    lines = re.split(r'\r\n', env)
    for line in lines[3:-2]:
        item = re.split(r'[ ][ ]*', line)
        if item[1] == 'ztp_exit_flag':
            logging.info('parse environment, ztp_exit_flag: ' + item[2])
            return item[2]

    return None


def get_ztp_exit_environment():
    _ops = ops.ops()
    handle, err_desp = _ops.cli.open()
    ret = _ops.cli.execute(handle, "display ops environment")
    if ret[2] == 'Success' and ret[0]:
        return parse_environment(ret[0])

    return None


def check_ztp_continue():
    """Check if ztp can continue to run"""
    res = True
    try:
        enable_state = get_ztp_enable_status()
        ztp_exit_flag = get_ztp_exit_environment()
        if enable_state == 'false' or ztp_exit_flag == 'true':
            res = False
    except OPIExecError as ex:
        logging.warning(ex)

    return res


# DNS
class DNSServer:
    """Dns protocol service"""
    __slots__ = ['dns_servers', 'enable_state', 'vpn_instance']

    def __init__(self):
        self.dns_servers = []
        self.enable_state = 'false'
        self.vpn_instance = {}

    def _set_dns_enable_switch(self, switch):
        """Set DNS global switch."""
        if switch not in ['true', 'false']:
            return

        if self.enable_state == switch:
            logging.info('The current enable state of dns is %s, no need to set', DNS_STATE_MAP.get(switch))
            return

        uri = '/dns/dnsGlobalCfgs/dnsGlobalCfg'
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <dnsGlobalCfg>
                <dnsEnable>$dnsEnable</dnsEnable>
            </dnsGlobalCfg>''')
        req_data = str_temp.substitute(dnsEnable=switch)
        ret, err_code, rsp_data = OPS_CLIENT.set(uri, req_data)
        if ret != HTTP_OK:
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            raise OPIExecError('Failed to %s DNS' % DNS_STATE_MAP.get(switch))

        self.enable_state = switch
        return

    def add_dns_servers_ipv4(self, dns_servers, vpn_instance):
        """Add IPv4 DNS servers configuration.
        :raise: OPIExecError
        """
        while '255.255.255.255' in dns_servers:
            dns_servers.remove('255.255.255.255')

        # only configure new dns servers
        new_dns_servers = list(set(dns_servers).difference(set(self.dns_servers)))
        if not new_dns_servers:
            return

        self._set_dns_enable_switch('true')
        logging.info('Add DNS IPv4 servers')

        uri = '/dns/dnsIpv4Servers'
        root_elem = etree.Element('dnsIpv4Servers')
        for server_addr in new_dns_servers:
            dns_server = etree.SubElement(root_elem, 'dnsIpv4Server')
            etree.SubElement(dns_server, 'ipv4Addr').text = server_addr
            etree.SubElement(dns_server, 'vrfName').text = vpn_instance
        req_data = etree.tostring(root_elem, 'UTF-8')
        ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data)
        if ret != HTTP_OK:
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            raise OPIExecError('Failed to config DNS IPv4 server')

        # configure success
        self.dns_servers.extend(new_dns_servers)
        self.vpn_instance.update(dict.fromkeys(new_dns_servers, vpn_instance))

    def del_dns_servers_ipv4(self):
        """Delete IPv4 DNS servers configuration.
        :raise: OPIExecError
        """
        if not self.dns_servers:
            logging.info('Current dns server is empty, no need to delete')
            return

        logging.info('Delete DNS IPv4 servers')

        uri = '/dns/dnsIpv4Servers'
        root_elem = etree.Element('dnsIpv4Servers')
        for server_addr in self.dns_servers:
            dns_server = etree.SubElement(root_elem, 'dnsIpv4Server')
            etree.SubElement(dns_server, 'ipv4Addr').text = server_addr
            etree.SubElement(dns_server, 'vrfName').text = self.vpn_instance.get(server_addr)
        req_data = etree.tostring(root_elem, 'UTF-8')

        ret, err_code, rsp_data = OPS_CLIENT.delete(uri, req_data)
        if ret != HTTP_OK:
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            raise OPIExecError('Failed to delete DNS IPv4 server')

        # delete all dns server success
        self.vpn_instance = {}
        self.dns_servers = []

        self._set_dns_enable_switch('false')

    @staticmethod
    def get_addr_by_hostname(host, vpn_instance, addr_type='1'):
        """Translate a host name to IPv4 address format. The IPv4 address is returned as a string.
        :raise: OPIExecError
        """
        logging.info('Get ipv4 address by host name %s', host)
        uri = '/dns/dnsNameResolution'
        root_elem = etree.Element('dnsNameResolution')
        etree.SubElement(root_elem, 'host').text = host
        etree.SubElement(root_elem, 'addrType').text = addr_type
        etree.SubElement(root_elem, 'vrfName').text = vpn_instance
        req_data = etree.tostring(root_elem, "UTF-8")
        logging.warning(req_data)
        ret, err_code, rsp_data = OPS_CLIENT.get(uri, req_data)
        if ret != HTTP_OK or rsp_data == '':
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            raise OPIExecError('Failed to get ipv4 address by host name')

        logging.warning(rsp_data)
        root_elem = etree.fromstring(rsp_data)
        namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'}
        uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:'
        elem = root_elem.find(uri + 'ipv4Addr', namespaces)
        if elem is None:
            raise OPIExecError('Failed to read IP address by host name')

        return elem.text


# download
def download_file(url, local_path, ip_protocol, vpn_instance):
    """
    Description:
        Download file, support TFTP, FTP, SFTP.
    Args:
      url: URL of remote file
        tftp://hostname/path
        ftp://[username[:password]@]hostname/path
        sftp://[username[:password]@]hostname[:port]/path

      local_path: local path to put the file
        cfcard:/xxx

      ip_protocol: ipv4 or ipv6
      vpn_instance: vpn_instance
    Returns:
        ERR[1]: download fail
        OK[0]:  download success
    """

    url_tuple = urlparse(url)
    func_dict = {
        'tftp': {
            IPV4: TFTPv4,
            IPV6: TFTPv6,
        },
        'ftp': {
            IPV4: FTPv4,
            IPV6: FTPv6,
        },
        'sftp': {
            IPV4: SFTPv4,
            IPV6: SFTPv6,
        }
    }

    scheme = url_tuple.scheme
    if scheme not in func_dict.keys():
        logging.error('Unknown file transfer scheme %s', scheme)
        return ERR

    if ip_protocol == IPV4:
        if not re.match(r'\d+\.\d+\.\d+\.\d+', url_tuple.hostname):
            # get server ip by hostname from dns
            try:
                dns_vpn = '_public_' if vpn_instance in [None, ''] else vpn_instance
                server_ip = DNS.get_addr_by_hostname(url_tuple.hostname, dns_vpn)
                logging.info("server ip: " + server_ip)
            except OPIExecError as ex:
                logging.error(ex)
                return ERR

            url = url.replace(url_tuple.hostname, server_ip)

    vpn_instance = '' if vpn_instance in [None, '_public_'] else vpn_instance
    logging.info('Start to download file %s using %s', os.path.basename(local_path), scheme)

    ret = ERR
    cnt = 0
    while cnt < 1 + FILE_TRANSFER_RETRY_TIMES:
        if cnt:
            logging.info('Try downloading again, please wait a moment')
        try:
            ret = func_dict[scheme][ip_protocol](url, local_path, vpn_instance).start()
            if ret in [OK, DISK_SPACE_NOT_ENOUGH]:
                logging.info('download file %s using %s, ret:%d', os.path.basename(local_path), scheme, ret)
                break
            logging.error('Failed to download file %s using %s', os.path.basename(local_path), scheme)
            sleep(FILE_DOWNLOAD_INTERVAL_TIME)
        except OPIExecError as ex:
            logging.error(ex)
        except Exception as ex:
            logging.exception(ex)
        cnt += 1
    return ret


class Download:
    """File download base class"""

    def start(self):
        """Start to download file"""
        uri = self.get_uri()
        req_data = self.get_req_data()
        self.pre_download()
        ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data, False)
        if ret != HTTP_OK:
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data)
            root = etree.fromstring(rsp_data)
            rpc_error = root.find('rpc-error')
            if rpc_error and rpc_error.find('error-app-tag') is not None:
                ret = int(rpc_error.find('error-app-tag').text)
            else:
                ret = ERR
        else:
            ret = OK
        self.after_download()
        return ret

    def get_uri(self):
        """Return download request uri"""
        raise NotImplementedError

    def get_req_data(self):
        """Return download request xml message"""
        raise NotImplementedError

    def pre_download(self):
        """Do some actions before download file"""
        raise NotImplementedError

    def after_download(self):
        """Do some actions after download file"""
        raise NotImplementedError


class FTP(Download):
    """FTP download class"""

    def get_uri(self):
        """Return ftp download request uri"""
        return '/ftpc/ftpcTransferFiles/ftpcTransferFile'

    def get_req_data(self):
        """Implemented by subclasses"""
        raise NotImplementedError

    def pre_download(self):
        """FTP not care"""

    def after_download(self):
        """FTP not care"""


class FTPv4(FTP):
    """FTPv4 download class"""

    def __init__(self, url, local_path, vpn_instance):
        self.url = url
        self.local_path = local_path
        self.vpn_instance = vpn_instance

    def get_req_data(self):
        """Return ftpv4 download request xml message"""
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <ftpcTransferFile>
                <serverIpv4Address>$serverIp</serverIpv4Address>
                <commandType>get</commandType>
                <userName>$username</userName>
                <password>$password</password>
                <localFileName>$localPath</localFileName>
                <remoteFileName>$remotePath</remoteFileName>
                <vpnInstanceName>$vpnInstance</vpnInstanceName>
            </ftpcTransferFile>''')
        url_tuple = urlparse(self.url)
        req_data = str_temp.substitute(serverIp=url_tuple.hostname,
                                       username=url_tuple.username,
                                       password=url_tuple.password,
                                       remotePath=url_tuple.path[1:],
                                       localPath=self.local_path,
                                       vpnInstance=self.vpn_instance)
        return req_data


class FTPv6(FTP):
    """FTPv6 download class"""

    def __init__(self, url, local_path, vpn_instance):
        self.url = url
        self.local_path = local_path
        self.vpn_instance = vpn_instance

    def get_req_data(self):
        """Return ftpv6 download request xml message"""
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <ftpcTransferFile>
                <serverIpv6Address>$serverIp</serverIpv6Address>
                <commandType>get</commandType>
                <userName>$username</userName>
                <password>$password</password>
                <localFileName>$localPath</localFileName>
                <remoteFileName>$remotePath</remoteFileName>
                <ipv6VpnName>$vpnInstance</ipv6VpnName>
            </ftpcTransferFile>''')
        url_tuple = urlparse(self.url)
        idx = url_tuple.netloc.rfind('@')
        server_ip = url_tuple.netloc[idx + 1:]
        req_data = str_temp.substitute(serverIp=server_ip,
                                       username=url_tuple.username,
                                       password=url_tuple.password,
                                       remotePath=url_tuple.path[1:],
                                       localPath=self.local_path,
                                       vpnInstance=self.vpn_instance)
        return req_data


class TFTP(Download):
    """TFTP download class"""

    def get_uri(self):
        """Return ftp download request uri"""
        return '/tftpc/tftpcTransferFiles/tftpcTransferFile'

    def get_req_data(self):
        """Implemented by subclasses"""
        raise NotImplementedError

    def pre_download(self):
        """TFTP not case"""

    def after_download(self):
        """TFTP not case"""


class TFTPv4(TFTP):
    """TFTPv4 download class"""

    def __init__(self, url, local_path, vpn_instance):
        self.url = url
        self.local_path = local_path
        self.vpn_instance = vpn_instance

    def get_req_data(self):
        """Return tftpv4 download request xml message"""
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <tftpcTransferFile>
                <serverIpv4Address>$serverIp</serverIpv4Address>
                <commandType>get_cmd</commandType>
                <localFileName>$localPath</localFileName>
                <remoteFileName>$remotePath</remoteFileName>
                <vpnInstanceName>$vpnInstance</vpnInstanceName>
            </tftpcTransferFile>''')
        url_tuple = urlparse(self.url)
        req_data = str_temp.substitute(serverIp=url_tuple.hostname,
                                       remotePath=url_tuple.path[1:],
                                       localPath=self.local_path,
                                       vpnInstance=self.vpn_instance)
        return req_data


class TFTPv6(TFTP):
    """TFTPv6 download class"""

    def __init__(self, url, local_path, vpn_instance):
        self.url = url
        self.local_path = local_path
        self.vpn_instance = vpn_instance

    def get_req_data(self):
        """Return tftpv4 download request xml message"""
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <tftpcTransferFile>
                <serverIpv6Address>$serverIp</serverIpv6Address>
                <commandType>get_cmd</commandType>
                <localFileName>$localPath</localFileName>
                <remoteFileName>$remotePath</remoteFileName>
                <ipv6VpnName>$vpnInstance</ipv6VpnName>
            </tftpcTransferFile>''')
        url_tuple = urlparse(self.url)
        idx = url_tuple.netloc.rfind('@')
        server_ip = url_tuple.netloc[idx + 1:]
        req_data = str_temp.substitute(serverIp=server_ip,
                                       remotePath=url_tuple.path[1:],
                                       localPath=self.local_path,
                                       vpnInstance=self.vpn_instance)
        return req_data


class SFTP(Download):
    """SFTP download class"""

    def get_uri(self):
        """Return ftp download request uri"""
        return '/sshc/sshcConnects/sshcConnect'

    def get_req_data(self):
        """Implemented by subclasses"""
        raise NotImplementedError

    def pre_download(self, ):
        self._set_sshc_first_time('Enable')

    def after_download(self):
        self._del_sshc_rsa_key()
        self._set_sshc_first_time('Disable')

    @classmethod
    def _set_sshc_first_time(cls, switch):
        """Set SSH client attribute of authenticating user for the first time access"""
        if switch not in ['Enable', 'Disable']:
            return ERR

        logging.info('Set SSH client first-time enable switch = %s', switch)
        uri = "/sshc/sshClient"
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <sshClient>
                <firstTimeEnable>$enable</firstTimeEnable>
            </sshClient>''')
        req_data = str_temp.substitute(enable=switch)
        ret, _, _ = OPS_CLIENT.set(uri, req_data)
        if ret != HTTP_OK:
            if switch == 'Enable':
                reason = 'Failed to enable SSH client first-time'
            else:
                reason = 'Failed to disable SSH client first-time'

            raise OPIExecError(reason)

        return OK

    def _del_rsa_peer_key(self):
        """Delete RSA peer key configuration"""
        logging.info('Delete RSA peer key')
        uri = '/rsa/rsaPeerKeys/rsaPeerKey'
        root_elem = etree.Element('rsaPeerKey')
        etree.SubElement(root_elem, 'keyName').text = self.get_key_name()
        req_data = etree.tostring(root_elem, 'UTF-8')
        ret, _, _ = OPS_CLIENT.delete(uri, req_data)
        if ret != HTTP_OK:
            logging.error('Failed to delete RSA peer key')

    def _del_sshc_rsa_key(self, key_type='RSA'):
        """Delete SSH client RSA key configuration"""
        logging.info('Delete SSH client RSA key')
        uri = '/sshc/sshCliKeyCfgs/sshCliKeyCfg'
        root_elem = etree.Element('sshCliKeyCfg')
        etree.SubElement(root_elem, 'serverName').text = self.get_key_name()
        etree.SubElement(root_elem, 'pubKeyType').text = key_type
        req_data = etree.tostring(root_elem, 'UTF-8')
        ret, _, _ = OPS_CLIENT.delete(uri, req_data)
        if ret != HTTP_OK:
            logging.error('Failed to delete SSH client RSA key')

        self._del_rsa_peer_key()

    def get_key_name(self):
        """Get sftp server ip"""
        raise NotImplementedError


class SFTPv4(SFTP):
    """SFTPv4 download class"""

    def __init__(self, url, local_path, vpn_instance):
        self.url = url
        self.local_path = local_path
        self.vpn_instance = vpn_instance

    def get_key_name(self):
        url_tuple = urlparse(self.url)
        return url_tuple.hostname

    def get_req_data(self):
        """Return sftpv4 download request xml message"""
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <sshcConnect>
                <HostAddrIPv4>$serverIp</HostAddrIPv4>
                <commandType>get</commandType>
                <userName>$username</userName>
                <password>$password</password>
                <serverPort>$port</serverPort>
                <localFileName>$localPath</localFileName>
                <remoteFileName>$remotePath</remoteFileName>
                <vpnInstanceName>$vpnInstance</vpnInstanceName>
                <identityKey>ssh-rsa</identityKey>
                <transferType>SFTP</transferType>
            </sshcConnect>''')
        url_tuple = urlparse(self.url)
        try:
            if url_tuple.port is None:
                port = 22
            else:
                port = url_tuple.port
        except ValueError:
            port = 22

        logging.info('Sftp download file using port:%s', port)
        req_data = str_temp.substitute(serverIp=url_tuple.hostname,
                                       username=url_tuple.username,
                                       password=url_tuple.password,
                                       port=port,
                                       remotePath=url_tuple.path[1:],
                                       localPath=self.local_path,
                                       vpnInstance=self.vpn_instance)
        return req_data


class SFTPv6(SFTP):
    """SFTPv6 download class"""

    def __init__(self, url, local_path, vpn_instance):
        self.url = url
        self.local_path = local_path
        self.vpn_instance = vpn_instance

    def get_key_name(self):
        url_tuple = urlparse(self.url)
        idx = url_tuple.netloc.find('@')
        return url_tuple.netloc[idx + 1:]

    def get_req_data(self):
        """Return sftpv4 download request xml message"""
        str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?>
            <sshcConnect>
                <HostAddrIPv6>$serverIp</HostAddrIPv6>
                <commandType>get</commandType>
                <userName>$username</userName>
                <password>$password</password>
                <localFileName>$localPath</localFileName>
                <remoteFileName>$remotePath</remoteFileName>
                <ipv6VpnName>$vpnInstance</ipv6VpnName>
                <identityKey>ssh-rsa</identityKey>
                <transferType>SFTP</transferType>
            </sshcConnect>''')
        url_tuple = urlparse(self.url)
        server_ip = self.get_key_name()
        req_data = str_temp.substitute(serverIp=server_ip,
                                       username=url_tuple.username,
                                       password=url_tuple.password,
                                       remotePath=url_tuple.path[1:],
                                       localPath=self.local_path,
                                       vpnInstance=self.vpn_instance)
        return req_data


def _is_startup_info_valid(startup_info):
    """Does startup info valid
        FILESERVER, SOFTWARE, CONFIG, PATCH, not None
    """
    return startup_info.get('SYSTEM-CONFIG', None) and startup_info.get('FILESERVER', None)


def main_proc(vpn_instance, ip_protocol):
    """
    :param vpn_instance:
    :param ip_protocol:
    :return:
    """
    global REMOTE_PATH_CONFIG

    sys_info = get_system_info()
    slave, _ = has_slave_mpu()  # Check whether slave MPU board exists or not
    logging.info('Get devicetype=%s, esn=%s, mac=%s from the current system', sys_info['productName'],
                 sys_info['esn'], sys_info['mac'])
    if not REMOTE_PATH_IMAGE.get(sys_info['productName']):
        logging.warning(
            "The product name of the current device [{}] not in REMOTE_PATH_IMAGE".format(sys_info['productName']))
    if not REMOTE_PATH_PATCH.get(sys_info['productName']):
        logging.warning(
            "The product name of the current device [{}] not in REMOTE_PATH_PATCH".format(sys_info['productName']))
    if '%s' in REMOTE_PATH_CONFIG:
        REMOTE_PATH_CONFIG = REMOTE_PATH_CONFIG % sys_info['esn']
    startup_info = {'FILESERVER': FILE_SERVER,
                    'SYSTEM-SOFTWARE': REMOTE_PATH_IMAGE.get(sys_info['productName'], ''),
                    'SYSTEM-CONFIG': REMOTE_PATH_CONFIG,
                    'SYSTEM-PAT': REMOTE_PATH_PATCH.get(sys_info['productName'], '')}
    STARTUP.set_startup_info_from_ini_or_cfg(startup_info)
    if not _is_startup_info_valid(startup_info):
        logging.warning('FILESERVER is None or SYSTEM-CONFIG is None, no need download and '
                        'set system startup file')
        return ERR

    ret = check_filename()
    if ret == ERR:
        return ERR

    # check remote file paths
    try:
        remote_path_sha256 = REMOTE_PATH_SHA256
    except NameError:
        remote_path_sha256 = ''
    if not check_file_type_valid(REMOTE_PATH_IMAGE.get(sys_info['productName'], ''), REMOTE_PATH_CONFIG,
                                 REMOTE_PATH_PATCH.get(sys_info['productName'], ''), remote_path_sha256):
        return ERR

    ret, image_file, config_file, patch_file = download_startup_file(startup_info, slave,
                                                                     ip_protocol, vpn_instance)
    if ret == ERR:
        logging.info('failed to download file')
        return ERR

    if check_ztp_continue() is False:
        logging.info('user stop ztp before setting, ztp will reset startup')
        delete_startup_file(image_file, config_file, patch_file, slave)
        return ERR

    ret = set_startup_file(image_file, config_file, patch_file, slave)
    if ret == ERR:
        return ERR

    if not check_ztp_continue():
        logging.info('user stop ztp after setting, ztp will reset startup')
        STARTUP.reset_startup_info(slave)
        return ERR

    set_ztp_last_status('true')
    dhcp_stop()
    try:
        reboot_system()
    except OPIExecError as reason:
        logging.error("reboot failed: {}".format(reason))
        set_ztp_last_status('false')
        STARTUP.reset_startup_info(slave)
        return ERR

    return OK


def main(vpn_instance='', ip_protocol=IPV4):
    """The main function of user script. It is called by ZTP frame, so do not remove or change this function.

    Args:
    Raises:
    Returns: user script processing result
    """
    ip_protocol = ip_protocol.lower()
    try:
        ret = main_proc(vpn_instance, ip_protocol)
    except Exception as reason:
        logging.error(reason)
        trace_info = traceback.format_exc()
        logging.error(trace_info)
        ret = ERR

    finally:
        # Close the OPS connection
        OPS_CLIENT.close()

    return ret


while True:
    try:
        STARTUP = Startup()
        break
    except OPIExecError as ex:
        logging.warning(ex)
    sleep(CHECK_STARTUP_INTERVAL)

DNS = DNSServer()

if __name__ == "__main__":
    main()

Python Script Description

  • The content in bold in this example can be modified based on actual requirements.
  • Do not modify the content that is not in bold in this example. Otherwise, the ZTP function may be unavailable.
  • Do not modify the script logic. Otherwise, an infinite loop may occur during script execution or the script fails to be executed, causing the ZTP function to be unavailable.
  • If the preceding examples do not meet the requirements, contact Huawei engineers.
  • Specify an SHA256 checksum for the script file.

    #sha256sum="126b05cb7ed99956281edef93f72c0f0ab517eb025edfd9cc4f31a37f123c4fc"

    The SHA256 checksum is used to check the integrity of the script file.

    You can use either of the following methods to generate an SHA256 checksum for a script file:

    1. SHA256 calculation tool (such as HashMyFiles)
    2. Run the certutil -hashfile filename SHA256 command provided by the Windows operating system.

    The SHA256 checksum is calculated based on the content following #sha256sum=. In practice, you need to delete the first line in the file, move the following part one line above, calculate the SHA256 checksum, and write #sha256sum= plus the generated SHA256 checksum at the beginning of the file.

    The SHA256 algorithm can be used to verify the integrity of files. This algorithm has high security.

  • Specify the file obtaining mode.

    FILE_SERVER = 'ftp://username:password@hostname/path/'

    You can obtain version files from a TFTP, FTP, or SFTP server. Based on the server used, the path can be any of the following:

    • tftp://hostname
    • ftp://[username[:password]@]hostname
    • sftp://[username[:password]@]hostname[:port]

    username, password, and port are optional.

  • Specify the path and file name of the system software.

    REMOTE_PATH_IMAGE = {
        'NetEngine 8000 F': 'V800R021C00SPC100.cc'
    }

    NetEngine 8000 F indicates the device model.

    V800R021C00SPC100.cc indicates the file name of the system software obtained for the device model.

    If no system software needs to be loaded, leave this parameter blank or do not specify the device type. For example:
    REMOTE_PATH_IMAGE = {
        'NetEngine 8000 F' : ''
    }
    Or
    REMOTE_PATH_IMAGE = {}

    If the device model entered here is inconsistent with the actual device model, the device skips this check and continues the ZTP process. That is, the system considers that this item does not need to be set, and only logs are recorded.

  • Specify the path and name of the configuration file.

    REMOTE_PATH_CONFIG = 'conf_%s.cfg' 

    %s indicates a device ESN, based on which you can obtain a configuration file. This field cannot be edited.

    • You are advised to use the ESN to specify the configuration file of a specific device. Do not use a configuration file that does not contain the ESN for batch configuration.
    • The ESN is case-sensitive and must be the same as that on the device.
    • If the conf_%s.cfg file does not exist on the file server, a message is displayed indicating that the configuration file fails to be downloaded. For example, if the ESN of the device is 2102351HLD10J2000012 and the conf_2102351HLD10J2000012.cfg file does not exist on the file server, an error message is displayed.
  • Specify the path and name of the patch file.

    REMOTE_PATH_PATCH = {
        'NetEngine 8000 F': 'V800R021C00SPC100SPH001.PAT'
    }

    NetEngine 8000 F indicates the device model.

    V800R021C00SPC100SPH001.PAT indicates the file name of the patch software obtained for the device model.

    If no patch file needs to be loaded, leave this parameter blank or do not specify the device type. For example:

    REMOTE_PATH_PATCH = {
        'NetEngine 8000 F' : ''
    }
    Or
    REMOTE_PATH_PATCH = {}
  • Specify the path and name of the SHA256 verification file.

    REMOTE_PATH_SHA256 = 'sha256.txt'

    You can use the SHA256 verification file to check the integrity of the files downloaded by the device.

    For details about the format of the SHA256 verification file, see Version File Integrity Check.

    If the downloaded files do not need to be checked, set this field to ".

  • HTTP message status.
    HTTP_OK = 200
    HTTP_BAD_REQUEST = 400
    HTTP_BAD_RESPONSE = -1

    You do not need to edit this field.

  • Specify the waiting time for initiating a second request after a request failure.
    CONFLICT_RETRY_INTERVAL = 5
  • HTTP message type.
    POST_METHOD = 'POST'
    GET_METHOD = 'GET'
    DELETE_METHOD = 'DELETE'
    PUT_METHOD = 'PUT'

    You do not need to edit this field.

  • Specify the maximum number of retries allowed when the startup information fails to be obtained.

    MAX_TIMES_GET_STARTUP = 120
  • Specify the interval for obtaining device startup information.

    GET_STARTUP_INTERVAL = 15
  • Specify the maximum number of retries allowed when the check boot items fail to be configured for a device equipped with a single main control board.

    MAX_TIMES_CHECK_STARTUP = 205
  • Specify the maximum number of retries allowed when the check boot items fail to be configured for a device equipped with two main control boards.

    MAX_TIMES_CHECK_STARTUP_SLAVE = 265
  • Specify the interval for checking whether the system software is successfully set.

    CHECK_STARTUP_INTERVAL = 5
  • Specify the waiting time before a file is deleted.

    FILE_DELETE_DELAY_TIME = 3
  • Specify the ZTP status value mapping, which is used for logs.
    LAST_STATE_MAP = {'true': 'enable', 'false': 'disable'}
  • Specify the DNS status value mapping, which is used for logs.
    DNS_STATE_MAP = {'true': 'enable', 'false': 'disable'}
  • Specify the maximum number of retries allowed for a download failure.
    FILE_TRANSFER_RETRY_TIMES = 3
  • Specify the waiting time for the next download after a download failure.
    FILE_DOWNLOAD_INTERVAL_TIME = 5
  • Status code of space insufficiency.
    DISK_SPACE_NOT_ENOUGH = 48

    You do not need to edit this field.

  • Define a PNP stop error.
    class PNPStopError()

    You do not need to edit this field.

  • Define an OPS execution error.

    class OPIExecError()

    You do not need to edit this field.

  • Define an error indicating that ZTP is not started.
    class NoNeedZTP2PNPError()

    You do not need to edit this field.

  • Define a device reboot error.
    class SysRebootError()

    You do not need to edit this field.

  • Define a ZTP disabling error.
    class ZTPDisableError()

    You do not need to edit this field.

  • Define the OPS connection class.

    class OPSConnection()

    You do not need to edit this field.

  • Encapsulate the OPS connection.

    self.conn = http.client.HTTPConnection()

    You do not need to edit this field.

  • Invoke the underlying interface of the platform.

    def close()
    def create()
    def delete()
    def get()
    def set()

    You do not need to edit this field.

  • Define the REST standard for requests.

    def _rest_call()

    You do not need to edit this field.

  • Disable DHCP clients, including DHCPv4 and DHCPv6 clients.
    def dhcp_stop()

    You do not need to edit this field.

  • Obtain the working directory of the user.

    def get_cwd()

    You do not need to edit this field.

  • Check whether the files to be downloaded exist.

    def file_exist()

    You do not need to edit this field.

  • Copy files.

    def copy_file()

    You do not need to edit this field.

  • Delete files after an operation failure.

    def delete_file()

    If a file fails to be loaded, all files downloaded by the device must be deleted to roll the device back to the state before ZTP is performed.

    You do not need to edit this field.

  • Deletes files on all main control boards.
    def delete_file_all()

    You do not need to edit this field.

  • Check whether the device has a standby main control board.

    def has_slave_mpu()

    You do not need to edit this field.

  • Obtain the device's system information.

    def get_system_info()

    You do not need to edit this field.

  • Reboot the system.
    def reboot_system()

    You do not need to edit this field.

  • Check whether the parameter path is valid.
    def check_file_type_valid()

    You do not need to edit this field.

  • Obtain information about the next startup of the device.

    def _get_startup_info()

    You do not need to edit this field.

  • Specify the system software for the next startup.
    def _set_startup_image_file()

    You do not need to edit this field.

  • Specify the configuration file for the next startup.
    def _set_startup_config_file()

    You do not need to edit this field.

  • Delete the configuration file for the next startup.
    def _del_startup_config_file()

    You do not need to edit this field.

  • Specify the patch file for the next startup.
    def _set_startup_patch_file()

    You do not need to edit this field.

  • Reset the patch file for the next startup.
    def _reset_startup_patch_file()

    You do not need to edit this field.

  • Check whether the files for the next startup are ready.
    def _check_next_startup_file()

    You do not need to edit this field.

  • Configure information about the next startup.
    def set_startup_info()

    You do not need to edit this field.

  • Reset information about the next startup and delete the downloaded files.
    def reset_startup_info()

    You do not need to edit this field.

  • Enable SHA256 check for files.

    def sha256sum()
    def sha256_get_from_file()
    def sha256_check_with_first_line()
    def sha256_check_with_dic()
    def parse_sha256_file()
    def verify_and_parse_sha256_file()

    You do not need to edit this field.

  • Check whether the username, password, and file name contain special characters.

    def check_parameter()
    def check_filename()

    You do not need to edit this field.

  • Download the configuration file.
    def download_cfg_file()

    You do not need to edit this field.

  • Download the patch file.
    def download_patch_file()

    You do not need to edit this field.

  • Download the system software.
    def download_image_file()

    You do not need to edit this field.

  • Download the files for the next startup.
    def download_startup_file()

    You do not need to edit this field.

  • Configure the files for the next startup.
    def set_startup_file()

    You do not need to edit this field.

  • Delete the files for the next startup.
    def delete_startup_file()

    You do not need to edit this field.

  • Set the ZTP execution status.
    def set_ztp_last_status()

    You do not need to edit this field.

  • Obtain the ZTP enabling status.
    def get_ztp_enable_status()

    You do not need to edit this field.

  • Parse the ZTP execution environment.
    def parse_environment()
    def get_ztp_exit_environment()

    You do not need to edit this field.

  • Check whether the ZTP process can continue.
    def check_ztp_continue()

    You do not need to edit this field.

  • Configure whether to globally enable DNS.
    def _set_dns_enable_switch()

    You do not need to edit this field.

  • Add the DNS IPv4 server configuration.
    def add_dns_servers_ipv4()

    You do not need to edit this field.

  • Delete the DNS IPv4 server configuration.
    def del_dns_servers_ipv4()

    You do not need to edit this field.

  • Resolve the domain name.

    def get_addr_by_hostname()

    You do not need to edit this field.

  • Define the file download parameters.

    def download_file()

    You do not need to edit this field.

  • Start downloading files.
    def start()

    You do not need to edit this field.

  • Return the URI for download requests.
    def get_uri()

    You do not need to edit this field.

  • Return the XML messages for download requests.
    def get_req_data()

    You do not need to edit this field.

  • Specify the operations to be performed before files are downloaded.
    def pre_download()

    You do not need to edit this field.

  • Specify the operations to be performed after files are downloaded.
    def after_download()

    You do not need to edit this field.

  • Set the attributes for first-time authentication on an SSH client.

    def _set_sshc_first_time()

    You do not need to edit this field.

  • Delete the RSA key.

    def _del_rsa_peer_key()

    You do not need to edit this field.

  • Delete the SSH server address and RSA key.

    def _del_sshc_rsa_key()

    You do not need to edit this field.

  • Obtain the SFTP server address.

    def get_key_name()

    You do not need to edit this field.

  • Define the overall ZTP process.

    def main_proc()
    def main()
    if __name__ == "__main__":
        main()

    You do not need to edit this field.

    The main function is mandatory. If the main function is unavailable, the script cannot be executed.

Copyright © Huawei Technologies Co., Ltd.
Copyright © Huawei Technologies Co., Ltd.
< Previous topic Next topic >