ZTP supports Python script intermediate files that store device and version file information. A ZTP-enabled device can execute the Python script to download version files.
The file name extension of a Python script file must be .py, in the format shown in Example of a Python Script File. Use the Python 3.7 syntax to compile or modify the script file. For details about the fields in a Python script file, see Python Script Description.
The following preconfiguration script file is only an example and needs to be modified based on deployment requirements.
The SHA256 checksum in the following file is only an example.
#sha256sum="126b05cb7ed99956281edef93f72c0f0ab517eb025edfd9cc4f31a37f123c4fc" #!/usr/bin/env python # coding=utf-8 # # Copyright (C) Huawei Technologies Co., Ltd. 2008-2013. All rights reserved. # ---------------------------------------------------------------------------------------------------------------------- # History: # Date Author Modification # 20180122 Author created file. # ---------------------------------------------------------------------------------------------------------------------- """ Zero Touch Provisioning (ZTP) enables devices to automatically load version files including system software, patch files, configuration files when the device starts up, the devices to be configured must be new devices or have no configuration files. This is a sample of Zero Touch Provisioning user script. You can customize it to meet the requirements of your network environment. """ import hashlib import http.client import logging import os import re import string import traceback import xml.etree.ElementTree as etree from time import sleep from urllib.parse import urlparse import ops # error code OK = 0 ERR = 1 # File server in which stores the necessary system software, configuration and patch files: # 1) Specify the file server which supports the following format. # tftp://hostname # ftp://[username[:password]@]hostname # sftp://[username[:password]@]hostname[:port] # 2) Do not add a trailing slash at the end of file server path. FILE_SERVER = 'ftp://username:password@hostname/path/' # Remote file paths: # 1) The path may include directory name and file name. # 2) If file name is not specified, indicate the procedure can be skipped. # 3) If you do not want image, please set it as REMOTE_PATH_IMAGE = {} or REMOTE_PATH_IMAGE = {'DEVICETYPE': ''} # File paths of system software on file server, filename extension is '.cc'. REMOTE_PATH_IMAGE = { 'NetEngine 8000 F': 'V800R021C00SPC100.cc' } # File path of configuration file on file server, filename extension is '.cfg', '.zip' or '.dat'. REMOTE_PATH_CONFIG = 'conf_%s.cfg' # If you do not want patch, please set it as REMOTE_PATH_PATCH = {} or REMOTE_PATH_PATCH = {'DEVICETYPE': ''} # File path of patch file on file server, filename extension is '.pat' REMOTE_PATH_PATCH = { 'NetEngine 8000 F': 'V800R021C00SPC100SPH001.PAT' } # File path of sha256 file, contains sha256 value of image / patch / configuration, file extension is '.txt' REMOTE_PATH_SHA256 = 'sha256.txt' # constant # autoconfig HTTP_OK = 200 HTTP_BAD_REQUEST = 400 HTTP_BAD_RESPONSE = -1 CONFLICT_RETRY_INTERVAL = 5 POST_METHOD = 'POST' GET_METHOD = 'GET' DELETE_METHOD = 'DELETE' PUT_METHOD = 'PUT' MAX_TIMES_GET_STARTUP = 120 GET_STARTUP_INTERVAL = 15 MAX_TIMES_CHECK_STARTUP = 205 MAX_TIMES_CHECK_STARTUP_SLAVE = 265 CHECK_STARTUP_INTERVAL = 5 FILE_DELETE_DELAY_TIME = 3 # ztplib LAST_STATE_MAP = {'true': 'enable', 'false': 'disable'} # DNS DNS_STATE_MAP = {'true': 'enable', 'false': 'disable'} # download FILE_TRANSFER_RETRY_TIMES = 3 FILE_DOWNLOAD_INTERVAL_TIME = 5 DISK_SPACE_NOT_ENOUGH = 48 IPV4 = 'ipv4' IPV6 = 'ipv6' OPS_CLIENT = None # exception class PNPStopError(Exception): """Stop by pnp""" class OPIExecError(Exception): """OPS Connection Exception""" class NoNeedZTP2PNPError(Exception): """No need start ztp""" class SysRebootError(Exception): """Device reboot error""" class ZTPDisableError(Exception): """ZTP set disable error""" # opslib class OPSConnection: """Make an OPS connection instance.""" __slots__ = ['host', 'port', 'headers', 'conn'] def __init__(self, host, port=80): self.host = host self.port = port self.headers = { 'Content-type': 'application/xml', 'Accept': 'application/xml' } self.conn = http.client.HTTPConnection(self.host, self.port) def close(self): """Close the connection""" self.conn.close() def create(self, uri, req_data, need_retry=True): """Create a resource on the server""" ret = self._rest_call(POST_METHOD, uri, req_data) if ret[0] != HTTP_OK and need_retry: sleep(CONFLICT_RETRY_INTERVAL) ret = self._rest_call(POST_METHOD, uri, req_data) return ret def delete(self, uri, req_data, need_retry=True): """Delete a resource on the server""" ret = self._rest_call(DELETE_METHOD, uri, req_data) if ret[0] != HTTP_OK and need_retry: sleep(CONFLICT_RETRY_INTERVAL) ret = self._rest_call(DELETE_METHOD, uri, req_data) return ret def get(self, uri, req_data=None, need_retry=True): """Retrieve a resource from the server""" ret = self._rest_call(GET_METHOD, uri, req_data) if (ret[0] != HTTP_OK or ret[2] == '') and need_retry: sleep(CONFLICT_RETRY_INTERVAL) ret = self._rest_call(GET_METHOD, uri, req_data) return ret def set(self, uri, req_data, need_retry=True): """Update a resource on the server""" ret = self._rest_call(PUT_METHOD, uri, req_data) if ret[0] != HTTP_OK and need_retry: sleep(CONFLICT_RETRY_INTERVAL) ret = self._rest_call(PUT_METHOD, uri, req_data) return ret def _rest_call(self, method, uri, req_data): """REST call""" body = '' if req_data is None else req_data try: self.conn.request(method, uri, body, self.headers) except http.client.CannotSendRequest: logging.warning('An error occurred during http request, try to send request again') self.close() self.conn = http.client.HTTPConnection(self.host, self.port) self.conn.request(method, uri, body, self.headers) except http.client.InvalidURL: logging.warning('Failed to find url: %s in OPS whitelist', uri) return HTTP_BAD_REQUEST, '', '' try: response = self.conn.getresponse() except AttributeError: logging.warning('An error occurred during http response, try again') return HTTP_BAD_RESPONSE, '', '' rest_message = response.read() if isinstance(rest_message, bytes): rest_message = str(rest_message, 'iso-8859-1') # logging.debug('uri = %s ret = %s \n %s \n %s', uri, response.status, req_data, rest_message) ret = (response.status, response.reason, rest_message) return ret OPS_CLIENT = OPSConnection("localhost") # pnplib def dhcp_stop(): """Stop DHCP client, include dhcpv4 and dhcpv6.""" logging.info('Stopping dhcp client') uri = '/pnp/stopPnp' req_data = '''<?xml version="1.0" encoding="UTF-8"?> <stopPnp/>''' ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) if ret != HTTP_OK: # ignore stop pnp err logging.warning('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) logging.warning('Failed to stop dhcp client') return logging.info('DHCP client has stopped') return # commlib def get_cwd(): """Get the full filename of the current working directory""" logging.info("Get the current working directory...") namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'} uri = "/vfm/pwds/pwd" req_data = '''<?xml version="1.0" encoding="UTF-8"?> <pwd> <dictionaryName/> </pwd> ''' ret, _, rsp_data = OPS_CLIENT.get(uri, req_data) if ret != http.client.OK or rsp_data is '': raise OPIExecError('Failed to get the current working directory') logging.info("pwd rsp_data: {}".format(rsp_data)) root_elem = etree.fromstring(rsp_data) uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:dictionaryName' elem = root_elem.find(uri, namespaces) if elem is None: raise OPIExecError('Failed to get the current working directory for no "directoryName" element') return elem.text def file_exist(file_name, dir_path=None): """Returns True if file_path refers to an existing file, otherwise returns False""" uri = '/vfm/dirs/dir' str_temp_1 = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <dir> <fileName>$fileName</fileName> </dir>''') str_temp_2 = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <dir> <dirName>$dirName</dirName> <fileName>$fileName</fileName> </dir>''') if dir_path: req_data = str_temp_2.substitute(dirName=dir_path, fileName=file_name) else: req_data = str_temp_1.substitute(fileName=file_name) ret, _, rsp_data = OPS_CLIENT.get(uri, req_data) if ret != HTTP_OK or rsp_data == '': return False root_elem = etree.fromstring(rsp_data) namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'} uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:fileName' elem = root_elem.find(uri, namespaces) if elem is None: return False return True def copy_file(src_path, dest_path): """Copy a file""" logging.info('Copy file %s to %s', src_path, dest_path) if 'slave' in dest_path: file_name = dest_path.split(':/')[1] if file_exist(file_name, 'slave#cfcard:/'): logging.info('Detect dest file exist, delete it first') delete_file(dest_path) uri = '/vfm/copyFile' str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <copyFile> <srcFileName>$src</srcFileName> <desFileName>$dest</desFileName> </copyFile>''') req_data = str_temp.substitute(src=src_path, dest=dest_path) ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data, False) if ret != HTTP_OK: file_name = dest_path.split(':/')[1] if file_exist(file_name, "slave#cfcard:/"): logging.info('Exists file copy fragment, delete it') delete_file(dest_path) logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) logging.error('Failed to copy %s to %s', src_path, dest_path) return False logging.info('succeed to copy') return True def delete_file(file_path): """Delete a file permanently""" if file_path is None or file_path == '': return logging.info('Delete file %s permanently', file_path) uri = '/vfm/deleteFileUnRes' str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <deleteFileUnRes> <fileName>$filePath</fileName> </deleteFileUnRes> ''') req_data = str_temp.substitute(filePath=file_path) ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) logging.error('Failed to delete the file %s permanently', file_path) def delete_file_all(file_path, slave, protect_file_list=None): """Delete a file permanently on all main boards""" if not file_path: return if protect_file_list: for protect_file in protect_file_list: if file_path == protect_file: return file_name = os.path.basename(file_path) file_path_t = file_path[:len(file_path)-len(file_name)] if file_exist(file_name, file_path_t): delete_file(file_path) if slave and file_exist(file_name, 'slave#'+file_path_t): delete_file('slave#' + file_path) def has_slave_mpu(): """Whether device has slave MPU, returns a bool value :raise OPIExecError """ logging.info("Test whether device has slave MPU") uri = '/devm/phyEntitys' req_data = '''<?xml version="1.0" encoding="UTF-8"?> <phyEntitys> <phyEntity> <entClass>mpuModule</entClass> <entStandbyState/> <position/> </phyEntity> </phyEntitys>''' has_slave = False mpu_slot = {}.fromkeys(('master', 'slave')) ret, err_code, rsp_data = OPS_CLIENT.get(uri, req_data) if ret != HTTP_OK or rsp_data == '': logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to get the device slave information') root_elem = etree.fromstring(rsp_data) namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'} uri = 'data{0}/vrp:phyEntity'.format(uri.replace('/', '/vrp:')) for entity in root_elem.findall(uri, namespaces): elem = entity.find("vrp:entStandbyState", namespaces) if elem is not None and elem.text.lower().find('slave') >= 0: has_slave = True elem = entity.find("vrp:position", namespaces) if elem is not None: mpu_slot['slave'] = elem.text if elem is not None and elem.text.lower().find('master') >= 0: elem = entity.find("vrp:position", namespaces) if elem is not None: mpu_slot['master'] = elem.text logging.info('Device has slave: %s', has_slave) return has_slave, mpu_slot def get_system_info(): """Get device product esn mac :raise: OPIExecError """ logging.info("Get the system information...") uri = "/system/systemInfo" req_data = '''<?xml version="1.0" encoding="UTF-8"?> <systemInfo> <productName/> <esn/> <mac/> </systemInfo> ''' sys_info = {}.fromkeys(('productName', 'esn', 'mac'), '') ret, err_code, rsp_data = OPS_CLIENT.get(uri, req_data) if ret != HTTP_OK or rsp_data == '': logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to get the system information') root_elem = etree.fromstring(rsp_data) namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'} uri = 'data' + uri.replace('/', '/vrp:') nslen = len(namespaces['vrp']) elem = root_elem.find(uri, namespaces) if elem is not None: for child in elem: tag = child.tag[nslen + 2:] if tag in list(sys_info.keys()): sys_info[tag] = child.text return sys_info def reboot_system(save_config='false'): """Reboot system.""" logging.info('System will reboot to make the configuration take effect') if save_config not in ['true', 'false']: return sleep(10) uri = "/devm/reboot" str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <reboot> <saveConfig>$saveConfig</saveConfig> </reboot>''') req_data = str_temp.substitute(saveConfig=save_config) ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) logging.info("/devm/reboot/: rep_data[{}]".format(str(rsp_data))) if ret != HTTP_OK or rsp_data == '': logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to execute the reboot system operation') def check_file_type_valid(image, config, patch, sha256_file): """Test whether argument paths are valid.""" logging.info("Test whether argument paths are valid...") # check image file path file_name = os.path.basename(image) if file_name is not '' and not file_name.lower().endswith('.cc'): logging.error('Error: Invalid filename extension of system software') return False # check config file path file_name = os.path.basename(config) file_name = file_name.lower() _, ext = os.path.splitext(file_name) if file_name is not '' and ext not in ['.cfg', '.zip', '.dat']: logging.error('Error: Invalid filename extension of configuration file') return False # check patch file path file_name = os.path.basename(patch) if file_name is not '' and not file_name.lower().endswith('.pat'): logging.error('Error: Invalid filename extension of patch file') return False # check sha256 file path file_name = os.path.basename(sha256_file) if file_name is not '' and not file_name.lower().endswith('.txt'): logging.error('Error: Invalid filename extension of %s file', sha256_file) return False return True # startuplib class StartupInfo: """Startup configuration information image: startup system software config: startup saved-configuration file patch: startup patch package """ def __init__(self, image=None, config=None, patch=None): self.image = image self.config = config self.patch = patch class Startup: """Startup configuration information current: current startup configuration next: current next startup configuration """ def __init__(self): self.current, self.next = self._get_startup_info() self.startup_info_from_ini_or_cfg = {} self.startup_info_before_set = StartupInfo() @staticmethod def _get_startup_info(retry=True): """Get device startup information :raise opslib.OPIExecError """ uri = '/cfg/startupInfos/startupInfo' req_data = '''<?xml version="1.0" encoding="UTF-8"?> <startupInfo> <position/> <configedSysSoft/> <curSysSoft/> <nextSysSoft/> <curStartupFile/> <nextStartupFile/> <curPatchFile/> <nextPatchFile/> </startupInfo>''' if retry is True: retry_time = MAX_TIMES_GET_STARTUP else: retry_time = 1 cnt = 0 elem = None namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'} ns_len = len(namespaces['vrp']) path = 'data' + uri.replace('/', '/vrp:') # match path while cnt < retry_time: ret, _, rsp_data = OPS_CLIENT.get(uri, req_data) if ret != HTTP_OK or rsp_data == '': cnt += 1 logging.warning('Failed to get the startup information') # sleep to wait for system ready when no query result sleep(GET_STARTUP_INTERVAL) continue root_elem = etree.fromstring(rsp_data) elem = root_elem.find(path, namespaces) if elem is not None: break logging.warning('No query result while getting startup info') # sleep to wait for system ready when no query result sleep(GET_STARTUP_INTERVAL) cnt += 1 if elem is None: raise OPIExecError('Failed to get the startup information') current = StartupInfo() # current startup info curnext = StartupInfo() # next startup info for child in elem: # skip the namespace, '{namespace}text' tag = child.tag[ns_len + 2:] if tag == 'curSysSoft': current.image = child.text elif tag == 'nextSysSoft': curnext.image = child.text elif tag == 'curStartupFile' and child.text != 'NULL': current.config = child.text elif tag == 'nextStartupFile' and child.text != 'NULL': curnext.config = child.text elif tag == 'curPatchFile' and child.text != 'NULL': current.patch = child.text elif tag == 'nextPatchFile' and child.text != 'NULL': curnext.patch = child.text else: continue return current, curnext @staticmethod def _set_startup_image_file(file_path, slave=True): """Set the next startup system software""" file_name = os.path.basename(file_path) logging.info('Set the next startup system software to %s, please wait a moment', file_name) uri = '/sum/startupbymode' str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <startupbymode> <softwareName>$fileName</softwareName> <mode>$startupMode</mode> </startupbymode>''') if slave: startup_mode = 'STARTUP_MODE_ALL' else: startup_mode = 'STARTUP_MODE_PRIMARY' req_data = str_temp.substitute(fileName=file_name, startupMode=startup_mode) # it is a action operation, so use create for HTTP POST ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to set startup system software') @staticmethod def _set_startup_config_file(file_path): """Set the next startup saved-configuration file""" file_name = os.path.basename(file_path) logging.info('Set the next startup saved-configuration file to %s', file_name) uri = '/cfg/setStartup' str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <setStartup> <fileName>$fileName</fileName> </setStartup>''') req_data = str_temp.substitute(fileName=file_name) # it is a action operation, so use create for HTTP POST ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to set startup configuration file') @staticmethod def _del_startup_config_file(): """Delete startup config file""" logging.info('Delete the next startup config file') uri = '/cfg/clearStartup' req_data = '''<?xml version="1.0" encoding="UTF-8"?> <clearStartup> </clearStartup>''' # it is a action operation, so use create for HTTP POST ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to delete startup configuration file') @staticmethod def _set_startup_patch_file(file_path): """Set the next startup patch file""" file_name = os.path.basename(file_path) logging.info('Set the next startup patch file to %s', file_name) uri = "/patch/startup" str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <startup> <packageName>$fileName</packageName> </startup>''') req_data = str_temp.substitute(fileName=file_name) # it is a action operation, so use create for HTTP POST ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to set startup patch file') @staticmethod def _reset_startup_patch_file(): """Reset patch file for system to startup""" logging.info('Reset the next startup patch file') uri = '/patch/resetpatch' req_data = '''<?xml version="1.0" encoding="UTF-8"?> <resetpatch/>''' # it is a action operation, so use create for HTTP POST ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to reset startup patch file') def _check_next_startup_file(self, file_name, check_item, slave): """Check next startup file ready check_item: [image, config, patch] """ if check_item not in ['image', 'config', 'patch']: return True logging.info('Check the next startup %s information', check_item) if slave: check_time = MAX_TIMES_CHECK_STARTUP_SLAVE else: check_time = MAX_TIMES_CHECK_STARTUP cnt = 0 while cnt < check_time: _, next_startup = self._get_startup_info() startup_file_name = getattr(next_startup, check_item) if startup_file_name == file_name: sleep(CHECK_STARTUP_INTERVAL) logging.info('The next system %s check successfully', check_item) return True # sleep to wait for system ready when no query result sleep(CHECK_STARTUP_INTERVAL) if cnt % 12 == 0: # logging every minute logging.info('Checking the next startup %s, please wait a moment', check_item) cnt += 1 logging.warning('The next system %s is not ready', check_item) return False def set_startup_info(self, image_file, config_file, patch_file, slave): """Set the next startup information.""" # backup startup_info set by user cur_startup, cur_next_startup = self._get_startup_info() self.startup_info_before_set.image = cur_next_startup.image self.startup_info_before_set.patch = cur_next_startup.patch self.startup_info_before_set.config = cur_next_startup.config logging.info("save startup config before ztp setting") logging.info('Start to set next startup information') # 1. Set next startup system software if image_file is not None: try: self._set_startup_image_file(image_file) if self._check_next_startup_file(image_file, 'image', slave) is False: raise OPIExecError('Failed to check the next startup image file') except OPIExecError as reason: logging.error(reason) delete_file_all(image_file, slave, [cur_startup.image, cur_next_startup.image]) self.reset_startup_info(slave) raise # 2. Set next startup patch file if patch_file is not None: try: self._set_startup_patch_file(patch_file) if self._check_next_startup_file(patch_file, 'patch', slave) is False: raise OPIExecError('Failed to check the next startup patch file') except OPIExecError as reason: logging.error(reason) delete_file_all(patch_file, slave, [cur_startup.patch, cur_next_startup.patch]) self.reset_startup_info(slave) raise # 3. Set next startup config file if config_file is not None: try: self._set_startup_config_file(config_file) if self._check_next_startup_file(config_file, 'config', slave) is False: raise OPIExecError('Failed to check the next startup config file') except OPIExecError as reason: logging.error(reason) delete_file_all(config_file, slave, [cur_startup.config, cur_next_startup.config]) self.reset_startup_info(slave) raise def reset_startup_info(self, slave): """Reset startup info and delete the downloaded files""" logging.info('Start to reset next startup information') if not self.startup_info_before_set.image: logging.error('image of roll back point is None') return cur_startup, next_startup = self._get_startup_info() # 1. Reset next startup config file and delete it try: # user configure startup info after ZTP if next_startup.config != self.startup_info_from_ini_or_cfg.get("SYSTEM-CONFIG"): logging.info("no need to reset startup config") if self.startup_info_from_ini_or_cfg.get("SYSTEM-CONFIG"): sleep(FILE_DELETE_DELAY_TIME) delete_file_all(self.startup_info_from_ini_or_cfg.get("SYSTEM-CONFIG"), slave, [cur_startup.config, next_startup.config]) # user do not configure startup info elif next_startup.config != self.startup_info_before_set.config: logging.info("reset startup config to the beginning") if self.startup_info_before_set.config is None: self._del_startup_config_file() else: self._set_startup_config_file(self.startup_info_before_set.config) if self._check_next_startup_file(self.startup_info_before_set.config, 'config', slave) is not True: raise OPIExecError('Failed to check the next startup config file') if next_startup.config: sleep(FILE_DELETE_DELAY_TIME) delete_file_all(next_startup.config, slave, [cur_startup.config, self.startup_info_before_set.config]) except Exception as reason: logging.error(reason) # 2. Reset next startup patch file and delete it try: # user configure startup info after ZTP if next_startup.patch != self.startup_info_from_ini_or_cfg.get("SYSTEM-PAT"): logging.info("no need to reset startup patch") if self.startup_info_from_ini_or_cfg.get("SYSTEM-PAT"): sleep(FILE_DELETE_DELAY_TIME) delete_file_all(self.startup_info_from_ini_or_cfg.get("SYSTEM-PAT"), slave, [cur_startup.patch, next_startup.patch]) # user do not configure startup info elif next_startup.patch != self.startup_info_before_set.patch: logging.info("reset startup patch to the beginning") if self.startup_info_before_set.patch is None: self._reset_startup_patch_file() else: self._set_startup_patch_file(self.startup_info_before_set.patch) if self._check_next_startup_file(self.startup_info_before_set.patch, 'patch', slave) is not True: raise OPIExecError('Failed to check the next startup patch file') if next_startup.patch: sleep(FILE_DELETE_DELAY_TIME) delete_file_all(next_startup.patch, slave, [cur_startup.patch, self.startup_info_before_set.patch]) except Exception as reason: logging.error(reason) # 3. Reset next startup system software and delete it try: # user configure startup info after ZTP if next_startup.image != self.startup_info_from_ini_or_cfg.get("SYSTEM-SOFTWARE"): logging.info("no need to reset startup image") if self.startup_info_from_ini_or_cfg.get("SYSTEM-SOFTWARE"): sleep(FILE_DELETE_DELAY_TIME) delete_file_all(self.startup_info_from_ini_or_cfg.get("SYSTEM-SOFTWARE"), slave, [cur_startup.image, next_startup.image]) # user do not configure startup info elif next_startup.image != self.startup_info_before_set.image: logging.info("reset startup config to the beginning") self._set_startup_image_file(self.startup_info_before_set.image) if self._check_next_startup_file(self.startup_info_before_set.image, 'image', slave) is not True: raise OPIExecError('Failed to check the next startup image file') if next_startup.image: sleep(FILE_DELETE_DELAY_TIME) delete_file_all(next_startup.image, slave, [cur_startup.image, self.startup_info_before_set.image]) except Exception as reason: logging.error(reason) def set_startup_info_from_ini_or_cfg(self, startup_info): for item_key in ['SYSTEM-SOFTWARE', 'SYSTEM-CONFIG', 'SYSTEM-PAT']: if not startup_info[item_key]: self.startup_info_from_ini_or_cfg[item_key] = startup_info[item_key] else: self.startup_info_from_ini_or_cfg[item_key] = 'cfcard:/' + startup_info[item_key] def convert_byte_to_str(data): result = data if not isinstance(data, str): result = str(data, "iso-8859-1") return result def sha256sum(fname, need_skip_first_line=False): """ Calculate sha256 num for this file. """ def read_chunks(fhdl): '''read chunks''' chunk = fhdl.read(8096) while chunk: yield chunk chunk = fhdl.read(8096) else: fhdl.seek(0) sha256_obj = hashlib.sha256() if isinstance(fname, str): with open(fname, "rb") as fhdl: # skip the first line fhdl.seek(0) if need_skip_first_line: fhdl.readline() for chunk in read_chunks(fhdl): sha256_obj.update(chunk) elif fname.__class__.__name__ in ["StringIO", "StringO"]: for chunk in read_chunks(fname): sha256_obj.update(chunk) else: pass return sha256_obj.hexdigest() def sha256_get_from_file(fname): """Get sha256 num form file, stored in first line""" with open(fname, "rb") as fhdl: fhdl.seek(0) line_first = convert_byte_to_str(fhdl.readline()) # if not match pattern, the format of this file is not supported if not re.match('^#sha256sum="[\\w]{64}"[\r\n]+$', line_first): return 'None' return line_first[12:76] def sha256_check_with_first_line(fname): """Validate sha256 for this file""" work_fname = os.path.join("ztp", fname) sha256_calc = sha256sum(work_fname, True) sha256_file = sha256_get_from_file(work_fname) if sha256_file.lower() != sha256_calc: logging.warning('SHA256 check failed, file %s', fname) logging.warning('SHA256 checksum of the file "%s" is %s', fname, sha256_calc) logging.warning('SHA256 checksum received from the file "%s" is %s', fname, sha256_file) return False return True def parse_sha256_file(fname): """parse sha256 file""" def read_line(fhdl): """read a line by loop""" line = fhdl.readline() while line: yield line line = fhdl.readline() else: fhdl.seek(0) sha256_dic = {} work_fname = os.path.join("ztp", fname) with open(work_fname, "rb") as fhdl: for line in read_line(fhdl): line_spilt = convert_byte_to_str(line).split() if 2 != len(line_spilt): continue dic_tmp = {line_spilt[0]: line_spilt[1]} sha256_dic.update(dic_tmp) return sha256_dic def verify_and_parse_sha256_file(fname): """ verify data integrity of sha256 file and parse this file format of this file is like: ------------------------------------------------------------------ file-name sha256 conf_5618642831132.cfg 1254b2e49d3347c4147a90858fa5f59aa2594b7294304f34e7da328bf3cdfbae ------------------------------------------------------------------ """ if not sha256_check_with_first_line(fname): return ERR, None return OK, parse_sha256_file(fname) def sha256_check_with_dic(sha256_dic, fname): """sha256 check with dic""" if fname not in sha256_dic: logging.info('sha256_dic does not has key %s, no need to do sha256 verification', fname) return True sha256sum_result = sha256sum(fname, False) if sha256_dic[fname].lower() == sha256sum_result: logging.info('SHA256 check %s successfully', fname) return True logging.warning('SHA256 check failed, file %s', fname) logging.warning('SHA256 checksum of the file "%s" is %s', fname, sha256sum_result) logging.warning('SHA256 checksum received for the file "%s" is %s', fname, sha256_dic[fname]) return False def check_parameter(aset): seq = ['&', '>', '<', '"', "'"] if aset: for c in seq: if c in aset: return True return False def check_filename(): sys_info = get_system_info() url_tuple = urlparse(FILE_SERVER) if check_parameter(url_tuple.username) or check_parameter(url_tuple.password): logging.error('Invalid username or password, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.") return ERR file_name = os.path.basename(REMOTE_PATH_IMAGE.get(sys_info['productName'], '')) if file_name is not '' and check_parameter(file_name): logging.error( 'Invalid filename of system software, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.") return ERR file_name = os.path.basename(REMOTE_PATH_CONFIG) if file_name is not '' and check_parameter(file_name): logging.error( 'Invalid filename of configuration file, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.") return ERR file_name = os.path.basename(REMOTE_PATH_PATCH.get(sys_info['productName'], '')) if file_name is not '' and check_parameter(file_name): logging.error( 'Invalid filename of patch file, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.") return ERR try: file_name = os.path.basename(REMOTE_PATH_SHA256) except NameError: file_name = '' if file_name is not '' and check_parameter(file_name): logging.error( 'Invalid filename of sha256 file, the name should not contain: ' + '&' + ' >' + ' <' + ' "' + " '.") return ERR return OK def download_cfg_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic): """ Download configuration file """ url = os.path.join(startup_info['FILESERVER'], startup_info['SYSTEM-CONFIG']) local_path_config = os.path.join('cfcard:', os.path.basename(startup_info['SYSTEM-CONFIG'])) delete_file_all(local_path_config, slave) ret = download_file(url, os.path.basename(local_path_config), ip_protocol, vpn_instance) if ret == ERR or not file_exist(os.path.basename(url)): logging.error('%s download fail', local_path_config) return False, local_path_config if sha256_val_dic is not None: if not startup_info['SYSTEM-CONFIG']: return False, local_path_config file_name = os.path.basename(startup_info['SYSTEM-CONFIG']) if not sha256_check_with_dic(sha256_val_dic, file_name): logging.error('Error: SHA256 check failed, file "%s"' % file_name) return False, local_path_config if slave: ret = copy_file(local_path_config, 'slave#' + local_path_config) if ret is False: logging.error('%s copy fail', local_path_config) return False, local_path_config return True, local_path_config def download_patch_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic): """ Download patch file """ file_name = os.path.basename(startup_info['SYSTEM-PAT']) url = os.path.join(startup_info['FILESERVER'], startup_info['SYSTEM-PAT']) local_path_patch = os.path.join('cfcard:', file_name) delete_file_all(local_path_patch, slave) # Delete the software package with the same name as the non-startup software package from the disk to avoid space insufficiency. ret = download_file(url, file_name, ip_protocol, vpn_instance) if ret not in [OK, DISK_SPACE_NOT_ENOUGH] or not file_exist(file_name): logging.error('%s download fail', local_path_patch) return ERR, local_path_patch if ret == DISK_SPACE_NOT_ENOUGH: logging.error('The space of disk is not enough') return DISK_SPACE_NOT_ENOUGH, local_path_patch if not sha256_check_with_dic(sha256_val_dic, file_name): logging.error('Error: SHA256 check failed, file "%s"' % file_name) return ERR, local_path_patch if slave: ret = copy_file(local_path_patch, 'slave#' + local_path_patch) if ret is False: logging.error('%s copy fail', local_path_patch) return ERR, local_path_patch return OK, local_path_patch def download_image_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic): """ Download system software """ file_name = os.path.basename(startup_info['SYSTEM-SOFTWARE']) url = startup_info['FILESERVER'] + '/' + startup_info['SYSTEM-SOFTWARE'] local_path_image = os.path.join('cfcard:', file_name) delete_file_all(local_path_image, slave) # Delete the software package with the same name as the non-startup software package from the disk to avoid space insufficiency. ret = download_file(url, file_name, ip_protocol, vpn_instance) if ret not in [OK, DISK_SPACE_NOT_ENOUGH] or not file_exist(file_name): logging.error('%s download fail', local_path_image) return ERR, local_path_image if ret == DISK_SPACE_NOT_ENOUGH: logging.error('The space of disk is not enough') return DISK_SPACE_NOT_ENOUGH, local_path_image if not sha256_check_with_dic(sha256_val_dic, file_name): logging.error('Error: SHA256 check failed, file "%s"' % file_name) return ERR, local_path_image if slave: ret = copy_file(local_path_image, 'slave#' + local_path_image) if ret is False: logging.error('%s copy fail', local_path_image) return ERR, local_path_image return OK, local_path_image def download_startup_file(startup_info, slave, ip_protocol, vpn_instance): """Download startup file""" # init here local_path_config = None local_path_patch = None local_path_image = None # current STARTUP_INFO cur_startup, next_startup = STARTUP._get_startup_info() cur_config = None if not cur_startup.config else os.path.basename(cur_startup.config) cur_patch = None if not cur_startup.patch else os.path.basename(cur_startup.patch) cur_image = None if not cur_startup.image else os.path.basename(cur_startup.image) next_config = None if not next_startup.config else os.path.basename(next_startup.config) next_patch = None if not next_startup.patch else os.path.basename(next_startup.patch) next_image = None if not next_startup.image else os.path.basename(next_startup.image) # download sha256 file first, used to verify data integrity of files which will be downloaded next try: cwd = get_cwd() file_path = REMOTE_PATH_SHA256 if not file_path.startswith('/'): file_path = '/' + file_path file_name = os.path.basename(file_path) if file_name: url = FILE_SERVER + file_path local_path = os.path.join(cwd, "ztp", file_name) ret = download_file(url, local_path, ip_protocol, vpn_instance) if ret is ERR: logging.error('Error: Failed to download sha256 file "%s"' % file_name) return ERR, None, None, None logging.info('Info: Download sha256 file successfully') ret, sha256_val_dic = verify_and_parse_sha256_file(file_name) # delete the file immediately os.remove(os.path.join("ztp", file_name)) if ret is ERR: logging.error('Error: sha256 check failed, file "%s"' % file_name) return ERR, None, None, None else: sha256_val_dic = {} except NameError: sha256_val_dic = {} logging.info('no need sha256 to check download file') # if user change the startup to the name in ini/cfg, ztp will not download # 1. Download configuration file if startup_info['SYSTEM-CONFIG'] and startup_info['SYSTEM-CONFIG'] not in [cur_config, next_config]: ret, local_path_config = download_cfg_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic) if ret is False: logging.info('delete startup file [cfg]') delete_startup_file(local_path_image, local_path_config, local_path_patch, slave) return ERR, local_path_image, local_path_config, local_path_patch logging.info('succeed to download config file') elif startup_info['SYSTEM-CONFIG'] and startup_info['SYSTEM-CONFIG'] in [cur_config, next_config]: logging.warning('The configured config version is the same as the current device version') # 2. Download patch file if startup_info['SYSTEM-PAT'] and startup_info['SYSTEM-PAT'] not in [cur_patch, next_patch]: ret, local_path_patch = download_patch_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic) if ret is ERR: delete_startup_file(local_path_image, local_path_config, local_path_patch, slave) return ERR, local_path_image, local_path_config, local_path_patch if ret == DISK_SPACE_NOT_ENOUGH: delete_startup_file(local_path_image, None, local_path_patch, slave) logging.info('disk space not enough, delete patch') return OK, None, local_path_config, None elif startup_info['SYSTEM-PAT'] and startup_info['SYSTEM-PAT'] in [cur_patch, next_patch]: logging.warning('The configured patch version is the same as the current device version') # 3. Download system software if startup_info['SYSTEM-SOFTWARE'] and startup_info['SYSTEM-SOFTWARE'] not in [cur_image, next_image]: ret, local_path_image = download_image_file(startup_info, slave, ip_protocol, vpn_instance, sha256_val_dic) if ret is ERR: delete_startup_file(local_path_image, local_path_config, local_path_patch, slave) return ERR, local_path_image, local_path_config, local_path_patch if ret == DISK_SPACE_NOT_ENOUGH: delete_startup_file(local_path_image, None, local_path_patch, slave) logging.info('disk space not enough, delete image and patch') return OK, None, local_path_config, None elif startup_info['SYSTEM-SOFTWARE'] and startup_info['SYSTEM-SOFTWARE'] in [cur_image, next_image]: logging.warning('The configured image version is the same as the current device version') return OK, local_path_image, local_path_config, local_path_patch def set_startup_file(image_file, config_file, patch_file, slave): """Set startup file""" try: STARTUP.set_startup_info(image_file, config_file, patch_file, slave) except OPIExecError: return ERR logging.info('Set startup info ready %s %s %s', image_file, config_file, patch_file) return OK def delete_startup_file(image_file, config_file, patch_file, slave): """Delete all system file""" delete_file_all(image_file, slave) delete_file_all(config_file, slave) delete_file_all(patch_file, slave) # ztplib def set_ztp_last_status(state): """Set ztp last status.""" uri = '/ztpops/ztpStatus/ztpLastStatus' str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <ztpLastStatus>$ztpLastStatus</ztpLastStatus>''') req_data = str_temp.substitute(ztpLastStatus=state) ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) logging.error('Failed to set ztp last status to %s', LAST_STATE_MAP[state]) return logging.info('Succeed to set ztp last status to %s', LAST_STATE_MAP[state]) def get_ztp_enable_status(): """Get ztp enable status :raise: OPIExecError """ uri = '/ztpops/ztpStatus/ztpEnable' req_data = '''<?xml version="1.0" encoding="UTF-8"?> <ztpEnable/>''' ret, err_code, rsp_data = OPS_CLIENT.get(uri, req_data) if ret != HTTP_OK or rsp_data == '': logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to get ztp enable status') root_elem = etree.fromstring(rsp_data) namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'} uri = 'data' + uri.replace('/', '/vrp:') elem = root_elem.find(uri, namespaces) if elem is None: raise OPIExecError('Failed to read ztp enable status') return elem.text def parse_environment(env): lines = re.split(r'\r\n', env) for line in lines[3:-2]: item = re.split(r'[ ][ ]*', line) if item[1] == 'ztp_exit_flag': logging.info('parse environment, ztp_exit_flag: ' + item[2]) return item[2] return None def get_ztp_exit_environment(): _ops = ops.ops() handle, err_desp = _ops.cli.open() ret = _ops.cli.execute(handle, "display ops environment") if ret[2] == 'Success' and ret[0]: return parse_environment(ret[0]) return None def check_ztp_continue(): """Check if ztp can continue to run""" res = True try: enable_state = get_ztp_enable_status() ztp_exit_flag = get_ztp_exit_environment() if enable_state == 'false' or ztp_exit_flag == 'true': res = False except OPIExecError as ex: logging.warning(ex) return res # DNS class DNSServer: """Dns protocol service""" __slots__ = ['dns_servers', 'enable_state', 'vpn_instance'] def __init__(self): self.dns_servers = [] self.enable_state = 'false' self.vpn_instance = {} def _set_dns_enable_switch(self, switch): """Set DNS global switch.""" if switch not in ['true', 'false']: return if self.enable_state == switch: logging.info('The current enable state of dns is %s, no need to set', DNS_STATE_MAP.get(switch)) return uri = '/dns/dnsGlobalCfgs/dnsGlobalCfg' str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <dnsGlobalCfg> <dnsEnable>$dnsEnable</dnsEnable> </dnsGlobalCfg>''') req_data = str_temp.substitute(dnsEnable=switch) ret, err_code, rsp_data = OPS_CLIENT.set(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to %s DNS' % DNS_STATE_MAP.get(switch)) self.enable_state = switch return def add_dns_servers_ipv4(self, dns_servers, vpn_instance): """Add IPv4 DNS servers configuration. :raise: OPIExecError """ while '255.255.255.255' in dns_servers: dns_servers.remove('255.255.255.255') # only configure new dns servers new_dns_servers = list(set(dns_servers).difference(set(self.dns_servers))) if not new_dns_servers: return self._set_dns_enable_switch('true') logging.info('Add DNS IPv4 servers') uri = '/dns/dnsIpv4Servers' root_elem = etree.Element('dnsIpv4Servers') for server_addr in new_dns_servers: dns_server = etree.SubElement(root_elem, 'dnsIpv4Server') etree.SubElement(dns_server, 'ipv4Addr').text = server_addr etree.SubElement(dns_server, 'vrfName').text = vpn_instance req_data = etree.tostring(root_elem, 'UTF-8') ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to config DNS IPv4 server') # configure success self.dns_servers.extend(new_dns_servers) self.vpn_instance.update(dict.fromkeys(new_dns_servers, vpn_instance)) def del_dns_servers_ipv4(self): """Delete IPv4 DNS servers configuration. :raise: OPIExecError """ if not self.dns_servers: logging.info('Current dns server is empty, no need to delete') return logging.info('Delete DNS IPv4 servers') uri = '/dns/dnsIpv4Servers' root_elem = etree.Element('dnsIpv4Servers') for server_addr in self.dns_servers: dns_server = etree.SubElement(root_elem, 'dnsIpv4Server') etree.SubElement(dns_server, 'ipv4Addr').text = server_addr etree.SubElement(dns_server, 'vrfName').text = self.vpn_instance.get(server_addr) req_data = etree.tostring(root_elem, 'UTF-8') ret, err_code, rsp_data = OPS_CLIENT.delete(uri, req_data) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to delete DNS IPv4 server') # delete all dns server success self.vpn_instance = {} self.dns_servers = [] self._set_dns_enable_switch('false') @staticmethod def get_addr_by_hostname(host, vpn_instance, addr_type='1'): """Translate a host name to IPv4 address format. The IPv4 address is returned as a string. :raise: OPIExecError """ logging.info('Get ipv4 address by host name %s', host) uri = '/dns/dnsNameResolution' root_elem = etree.Element('dnsNameResolution') etree.SubElement(root_elem, 'host').text = host etree.SubElement(root_elem, 'addrType').text = addr_type etree.SubElement(root_elem, 'vrfName').text = vpn_instance req_data = etree.tostring(root_elem, "UTF-8") logging.warning(req_data) ret, err_code, rsp_data = OPS_CLIENT.get(uri, req_data) if ret != HTTP_OK or rsp_data == '': logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) raise OPIExecError('Failed to get ipv4 address by host name') logging.warning(rsp_data) root_elem = etree.fromstring(rsp_data) namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'} uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:' elem = root_elem.find(uri + 'ipv4Addr', namespaces) if elem is None: raise OPIExecError('Failed to read IP address by host name') return elem.text # download def download_file(url, local_path, ip_protocol, vpn_instance): """ Description: Download file, support TFTP, FTP, SFTP. Args: url: URL of remote file tftp://hostname/path ftp://[username[:password]@]hostname/path sftp://[username[:password]@]hostname[:port]/path local_path: local path to put the file cfcard:/xxx ip_protocol: ipv4 or ipv6 vpn_instance: vpn_instance Returns: ERR[1]: download fail OK[0]: download success """ url_tuple = urlparse(url) func_dict = { 'tftp': { IPV4: TFTPv4, IPV6: TFTPv6, }, 'ftp': { IPV4: FTPv4, IPV6: FTPv6, }, 'sftp': { IPV4: SFTPv4, IPV6: SFTPv6, } } scheme = url_tuple.scheme if scheme not in func_dict.keys(): logging.error('Unknown file transfer scheme %s', scheme) return ERR if ip_protocol == IPV4: if not re.match(r'\d+\.\d+\.\d+\.\d+', url_tuple.hostname): # get server ip by hostname from dns try: dns_vpn = '_public_' if vpn_instance in [None, ''] else vpn_instance server_ip = DNS.get_addr_by_hostname(url_tuple.hostname, dns_vpn) logging.info("server ip: " + server_ip) except OPIExecError as ex: logging.error(ex) return ERR url = url.replace(url_tuple.hostname, server_ip) vpn_instance = '' if vpn_instance in [None, '_public_'] else vpn_instance logging.info('Start to download file %s using %s', os.path.basename(local_path), scheme) ret = ERR cnt = 0 while cnt < 1 + FILE_TRANSFER_RETRY_TIMES: if cnt: logging.info('Try downloading again, please wait a moment') try: ret = func_dict[scheme][ip_protocol](url, local_path, vpn_instance).start() if ret in [OK, DISK_SPACE_NOT_ENOUGH]: logging.info('download file %s using %s, ret:%d', os.path.basename(local_path), scheme, ret) break logging.error('Failed to download file %s using %s', os.path.basename(local_path), scheme) sleep(FILE_DOWNLOAD_INTERVAL_TIME) except OPIExecError as ex: logging.error(ex) except Exception as ex: logging.exception(ex) cnt += 1 return ret class Download: """File download base class""" def start(self): """Start to download file""" uri = self.get_uri() req_data = self.get_req_data() self.pre_download() ret, err_code, rsp_data = OPS_CLIENT.create(uri, req_data, False) if ret != HTTP_OK: logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret, err_code, rsp_data) root = etree.fromstring(rsp_data) rpc_error = root.find('rpc-error') if rpc_error and rpc_error.find('error-app-tag') is not None: ret = int(rpc_error.find('error-app-tag').text) else: ret = ERR else: ret = OK self.after_download() return ret def get_uri(self): """Return download request uri""" raise NotImplementedError def get_req_data(self): """Return download request xml message""" raise NotImplementedError def pre_download(self): """Do some actions before download file""" raise NotImplementedError def after_download(self): """Do some actions after download file""" raise NotImplementedError class FTP(Download): """FTP download class""" def get_uri(self): """Return ftp download request uri""" return '/ftpc/ftpcTransferFiles/ftpcTransferFile' def get_req_data(self): """Implemented by subclasses""" raise NotImplementedError def pre_download(self): """FTP not care""" def after_download(self): """FTP not care""" class FTPv4(FTP): """FTPv4 download class""" def __init__(self, url, local_path, vpn_instance): self.url = url self.local_path = local_path self.vpn_instance = vpn_instance def get_req_data(self): """Return ftpv4 download request xml message""" str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <ftpcTransferFile> <serverIpv4Address>$serverIp</serverIpv4Address> <commandType>get</commandType> <userName>$username</userName> <password>$password</password> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <vpnInstanceName>$vpnInstance</vpnInstanceName> </ftpcTransferFile>''') url_tuple = urlparse(self.url) req_data = str_temp.substitute(serverIp=url_tuple.hostname, username=url_tuple.username, password=url_tuple.password, remotePath=url_tuple.path[1:], localPath=self.local_path, vpnInstance=self.vpn_instance) return req_data class FTPv6(FTP): """FTPv6 download class""" def __init__(self, url, local_path, vpn_instance): self.url = url self.local_path = local_path self.vpn_instance = vpn_instance def get_req_data(self): """Return ftpv6 download request xml message""" str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <ftpcTransferFile> <serverIpv6Address>$serverIp</serverIpv6Address> <commandType>get</commandType> <userName>$username</userName> <password>$password</password> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <ipv6VpnName>$vpnInstance</ipv6VpnName> </ftpcTransferFile>''') url_tuple = urlparse(self.url) idx = url_tuple.netloc.rfind('@') server_ip = url_tuple.netloc[idx + 1:] req_data = str_temp.substitute(serverIp=server_ip, username=url_tuple.username, password=url_tuple.password, remotePath=url_tuple.path[1:], localPath=self.local_path, vpnInstance=self.vpn_instance) return req_data class TFTP(Download): """TFTP download class""" def get_uri(self): """Return ftp download request uri""" return '/tftpc/tftpcTransferFiles/tftpcTransferFile' def get_req_data(self): """Implemented by subclasses""" raise NotImplementedError def pre_download(self): """TFTP not case""" def after_download(self): """TFTP not case""" class TFTPv4(TFTP): """TFTPv4 download class""" def __init__(self, url, local_path, vpn_instance): self.url = url self.local_path = local_path self.vpn_instance = vpn_instance def get_req_data(self): """Return tftpv4 download request xml message""" str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <tftpcTransferFile> <serverIpv4Address>$serverIp</serverIpv4Address> <commandType>get_cmd</commandType> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <vpnInstanceName>$vpnInstance</vpnInstanceName> </tftpcTransferFile>''') url_tuple = urlparse(self.url) req_data = str_temp.substitute(serverIp=url_tuple.hostname, remotePath=url_tuple.path[1:], localPath=self.local_path, vpnInstance=self.vpn_instance) return req_data class TFTPv6(TFTP): """TFTPv6 download class""" def __init__(self, url, local_path, vpn_instance): self.url = url self.local_path = local_path self.vpn_instance = vpn_instance def get_req_data(self): """Return tftpv4 download request xml message""" str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <tftpcTransferFile> <serverIpv6Address>$serverIp</serverIpv6Address> <commandType>get_cmd</commandType> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <ipv6VpnName>$vpnInstance</ipv6VpnName> </tftpcTransferFile>''') url_tuple = urlparse(self.url) idx = url_tuple.netloc.rfind('@') server_ip = url_tuple.netloc[idx + 1:] req_data = str_temp.substitute(serverIp=server_ip, remotePath=url_tuple.path[1:], localPath=self.local_path, vpnInstance=self.vpn_instance) return req_data class SFTP(Download): """SFTP download class""" def get_uri(self): """Return ftp download request uri""" return '/sshc/sshcConnects/sshcConnect' def get_req_data(self): """Implemented by subclasses""" raise NotImplementedError def pre_download(self, ): self._set_sshc_first_time('Enable') def after_download(self): self._del_sshc_rsa_key() self._set_sshc_first_time('Disable') @classmethod def _set_sshc_first_time(cls, switch): """Set SSH client attribute of authenticating user for the first time access""" if switch not in ['Enable', 'Disable']: return ERR logging.info('Set SSH client first-time enable switch = %s', switch) uri = "/sshc/sshClient" str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <sshClient> <firstTimeEnable>$enable</firstTimeEnable> </sshClient>''') req_data = str_temp.substitute(enable=switch) ret, _, _ = OPS_CLIENT.set(uri, req_data) if ret != HTTP_OK: if switch == 'Enable': reason = 'Failed to enable SSH client first-time' else: reason = 'Failed to disable SSH client first-time' raise OPIExecError(reason) return OK def _del_rsa_peer_key(self): """Delete RSA peer key configuration""" logging.info('Delete RSA peer key') uri = '/rsa/rsaPeerKeys/rsaPeerKey' root_elem = etree.Element('rsaPeerKey') etree.SubElement(root_elem, 'keyName').text = self.get_key_name() req_data = etree.tostring(root_elem, 'UTF-8') ret, _, _ = OPS_CLIENT.delete(uri, req_data) if ret != HTTP_OK: logging.error('Failed to delete RSA peer key') def _del_sshc_rsa_key(self, key_type='RSA'): """Delete SSH client RSA key configuration""" logging.info('Delete SSH client RSA key') uri = '/sshc/sshCliKeyCfgs/sshCliKeyCfg' root_elem = etree.Element('sshCliKeyCfg') etree.SubElement(root_elem, 'serverName').text = self.get_key_name() etree.SubElement(root_elem, 'pubKeyType').text = key_type req_data = etree.tostring(root_elem, 'UTF-8') ret, _, _ = OPS_CLIENT.delete(uri, req_data) if ret != HTTP_OK: logging.error('Failed to delete SSH client RSA key') self._del_rsa_peer_key() def get_key_name(self): """Get sftp server ip""" raise NotImplementedError class SFTPv4(SFTP): """SFTPv4 download class""" def __init__(self, url, local_path, vpn_instance): self.url = url self.local_path = local_path self.vpn_instance = vpn_instance def get_key_name(self): url_tuple = urlparse(self.url) return url_tuple.hostname def get_req_data(self): """Return sftpv4 download request xml message""" str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <sshcConnect> <HostAddrIPv4>$serverIp</HostAddrIPv4> <commandType>get</commandType> <userName>$username</userName> <password>$password</password> <serverPort>$port</serverPort> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <vpnInstanceName>$vpnInstance</vpnInstanceName> <identityKey>ssh-rsa</identityKey> <transferType>SFTP</transferType> </sshcConnect>''') url_tuple = urlparse(self.url) try: if url_tuple.port is None: port = 22 else: port = url_tuple.port except ValueError: port = 22 logging.info('Sftp download file using port:%s', port) req_data = str_temp.substitute(serverIp=url_tuple.hostname, username=url_tuple.username, password=url_tuple.password, port=port, remotePath=url_tuple.path[1:], localPath=self.local_path, vpnInstance=self.vpn_instance) return req_data class SFTPv6(SFTP): """SFTPv6 download class""" def __init__(self, url, local_path, vpn_instance): self.url = url self.local_path = local_path self.vpn_instance = vpn_instance def get_key_name(self): url_tuple = urlparse(self.url) idx = url_tuple.netloc.find('@') return url_tuple.netloc[idx + 1:] def get_req_data(self): """Return sftpv4 download request xml message""" str_temp = string.Template('''<?xml version="1.0" encoding="UTF-8"?> <sshcConnect> <HostAddrIPv6>$serverIp</HostAddrIPv6> <commandType>get</commandType> <userName>$username</userName> <password>$password</password> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <ipv6VpnName>$vpnInstance</ipv6VpnName> <identityKey>ssh-rsa</identityKey> <transferType>SFTP</transferType> </sshcConnect>''') url_tuple = urlparse(self.url) server_ip = self.get_key_name() req_data = str_temp.substitute(serverIp=server_ip, username=url_tuple.username, password=url_tuple.password, remotePath=url_tuple.path[1:], localPath=self.local_path, vpnInstance=self.vpn_instance) return req_data def _is_startup_info_valid(startup_info): """Does startup info valid FILESERVER, SOFTWARE, CONFIG, PATCH, not None """ return startup_info.get('SYSTEM-CONFIG', None) and startup_info.get('FILESERVER', None) def main_proc(vpn_instance, ip_protocol): """ :param vpn_instance: :param ip_protocol: :return: """ global REMOTE_PATH_CONFIG sys_info = get_system_info() slave, _ = has_slave_mpu() # Check whether slave MPU board exists or not logging.info('Get devicetype=%s, esn=%s, mac=%s from the current system', sys_info['productName'], sys_info['esn'], sys_info['mac']) if not REMOTE_PATH_IMAGE.get(sys_info['productName']): logging.warning( "The product name of the current device [{}] not in REMOTE_PATH_IMAGE".format(sys_info['productName'])) if not REMOTE_PATH_PATCH.get(sys_info['productName']): logging.warning( "The product name of the current device [{}] not in REMOTE_PATH_PATCH".format(sys_info['productName'])) if '%s' in REMOTE_PATH_CONFIG: REMOTE_PATH_CONFIG = REMOTE_PATH_CONFIG % sys_info['esn'] startup_info = {'FILESERVER': FILE_SERVER, 'SYSTEM-SOFTWARE': REMOTE_PATH_IMAGE.get(sys_info['productName'], ''), 'SYSTEM-CONFIG': REMOTE_PATH_CONFIG, 'SYSTEM-PAT': REMOTE_PATH_PATCH.get(sys_info['productName'], '')} STARTUP.set_startup_info_from_ini_or_cfg(startup_info) if not _is_startup_info_valid(startup_info): logging.warning('FILESERVER is None or SYSTEM-CONFIG is None, no need download and ' 'set system startup file') return ERR ret = check_filename() if ret == ERR: return ERR # check remote file paths try: remote_path_sha256 = REMOTE_PATH_SHA256 except NameError: remote_path_sha256 = '' if not check_file_type_valid(REMOTE_PATH_IMAGE.get(sys_info['productName'], ''), REMOTE_PATH_CONFIG, REMOTE_PATH_PATCH.get(sys_info['productName'], ''), remote_path_sha256): return ERR ret, image_file, config_file, patch_file = download_startup_file(startup_info, slave, ip_protocol, vpn_instance) if ret == ERR: logging.info('failed to download file') return ERR if check_ztp_continue() is False: logging.info('user stop ztp before setting, ztp will reset startup') delete_startup_file(image_file, config_file, patch_file, slave) return ERR ret = set_startup_file(image_file, config_file, patch_file, slave) if ret == ERR: return ERR if not check_ztp_continue(): logging.info('user stop ztp after setting, ztp will reset startup') STARTUP.reset_startup_info(slave) return ERR set_ztp_last_status('true') dhcp_stop() try: reboot_system() except OPIExecError as reason: logging.error("reboot failed: {}".format(reason)) set_ztp_last_status('false') STARTUP.reset_startup_info(slave) return ERR return OK def main(vpn_instance='', ip_protocol=IPV4): """The main function of user script. It is called by ZTP frame, so do not remove or change this function. Args: Raises: Returns: user script processing result """ ip_protocol = ip_protocol.lower() try: ret = main_proc(vpn_instance, ip_protocol) except Exception as reason: logging.error(reason) trace_info = traceback.format_exc() logging.error(trace_info) ret = ERR finally: # Close the OPS connection OPS_CLIENT.close() return ret while True: try: STARTUP = Startup() break except OPIExecError as ex: logging.warning(ex) sleep(CHECK_STARTUP_INTERVAL) DNS = DNSServer() if __name__ == "__main__": main()
Specify an SHA256 checksum for the script file.
#sha256sum="126b05cb7ed99956281edef93f72c0f0ab517eb025edfd9cc4f31a37f123c4fc"
The SHA256 checksum is used to check the integrity of the script file.
You can use either of the following methods to generate an SHA256 checksum for a script file:
The SHA256 checksum is calculated based on the content following #sha256sum=. In practice, you need to delete the first line in the file, move the following part one line above, calculate the SHA256 checksum, and write #sha256sum= plus the generated SHA256 checksum at the beginning of the file.
The SHA256 algorithm can be used to verify the integrity of files. This algorithm has high security.
Specify the file obtaining mode.
FILE_SERVER = 'ftp://username:password@hostname/path/'
You can obtain version files from a TFTP, FTP, or SFTP server. Based on the server used, the path can be any of the following:
username, password, and port are optional.
Specify the path and file name of the system software.
REMOTE_PATH_IMAGE = {
'NetEngine 8000 F': 'V800R021C00SPC100.cc'
}
NetEngine 8000 F indicates the device model.
V800R021C00SPC100.cc indicates the file name of the system software obtained for the device model.
REMOTE_PATH_IMAGE = {
'NetEngine 8000 F' : ''
}
REMOTE_PATH_IMAGE = {}
If the device model entered here is inconsistent with the actual device model, the device skips this check and continues the ZTP process. That is, the system considers that this item does not need to be set, and only logs are recorded.
Specify the path and name of the configuration file.
REMOTE_PATH_CONFIG = 'conf_%s.cfg'
%s indicates a device ESN, based on which you can obtain a configuration file. This field cannot be edited.
Specify the path and name of the patch file.
REMOTE_PATH_PATCH = {
'NetEngine 8000 F': 'V800R021C00SPC100SPH001.PAT'
}
NetEngine 8000 F indicates the device model.
V800R021C00SPC100SPH001.PAT indicates the file name of the patch software obtained for the device model.
If no patch file needs to be loaded, leave this parameter blank or do not specify the device type. For example:
REMOTE_PATH_PATCH = {
'NetEngine 8000 F' : ''
}
REMOTE_PATH_PATCH = {}
Specify the path and name of the SHA256 verification file.
REMOTE_PATH_SHA256 = 'sha256.txt'
You can use the SHA256 verification file to check the integrity of the files downloaded by the device.
For details about the format of the SHA256 verification file, see Version File Integrity Check.
If the downloaded files do not need to be checked, set this field to ".
HTTP_OK = 200 HTTP_BAD_REQUEST = 400 HTTP_BAD_RESPONSE = -1
You do not need to edit this field.
CONFLICT_RETRY_INTERVAL = 5
POST_METHOD = 'POST' GET_METHOD = 'GET' DELETE_METHOD = 'DELETE' PUT_METHOD = 'PUT'
You do not need to edit this field.
Specify the maximum number of retries allowed when the startup information fails to be obtained.
MAX_TIMES_GET_STARTUP = 120
Specify the interval for obtaining device startup information.
GET_STARTUP_INTERVAL = 15
Specify the maximum number of retries allowed when the check boot items fail to be configured for a device equipped with a single main control board.
MAX_TIMES_CHECK_STARTUP = 205
Specify the maximum number of retries allowed when the check boot items fail to be configured for a device equipped with two main control boards.
MAX_TIMES_CHECK_STARTUP_SLAVE = 265
Specify the interval for checking whether the system software is successfully set.
CHECK_STARTUP_INTERVAL = 5
Specify the waiting time before a file is deleted.
FILE_DELETE_DELAY_TIME = 3
LAST_STATE_MAP = {'true': 'enable', 'false': 'disable'}
DNS_STATE_MAP = {'true': 'enable', 'false': 'disable'}
FILE_TRANSFER_RETRY_TIMES = 3
FILE_DOWNLOAD_INTERVAL_TIME = 5
DISK_SPACE_NOT_ENOUGH = 48
You do not need to edit this field.
class PNPStopError()
You do not need to edit this field.
Define an OPS execution error.
class OPIExecError()
You do not need to edit this field.
class NoNeedZTP2PNPError()
You do not need to edit this field.
class SysRebootError()
You do not need to edit this field.
class ZTPDisableError()
You do not need to edit this field.
Define the OPS connection class.
class OPSConnection()
You do not need to edit this field.
Encapsulate the OPS connection.
self.conn = http.client.HTTPConnection()
You do not need to edit this field.
Invoke the underlying interface of the platform.
def close() def create() def delete() def get() def set()
You do not need to edit this field.
Define the REST standard for requests.
def _rest_call()
You do not need to edit this field.
def dhcp_stop()
You do not need to edit this field.
Obtain the working directory of the user.
def get_cwd()
You do not need to edit this field.
Check whether the files to be downloaded exist.
def file_exist()
You do not need to edit this field.
def copy_file()
You do not need to edit this field.
Delete files after an operation failure.
def delete_file()
If a file fails to be loaded, all files downloaded by the device must be deleted to roll the device back to the state before ZTP is performed.
You do not need to edit this field.
def delete_file_all()
You do not need to edit this field.
Check whether the device has a standby main control board.
def has_slave_mpu()
You do not need to edit this field.
Obtain the device's system information.
def get_system_info()
You do not need to edit this field.
def reboot_system()
You do not need to edit this field.
def check_file_type_valid()
You do not need to edit this field.
Obtain information about the next startup of the device.
def _get_startup_info()
You do not need to edit this field.
def _set_startup_image_file()
You do not need to edit this field.
def _set_startup_config_file()
You do not need to edit this field.
def _del_startup_config_file()
You do not need to edit this field.
def _set_startup_patch_file()
You do not need to edit this field.
def _reset_startup_patch_file()
You do not need to edit this field.
def _check_next_startup_file()
You do not need to edit this field.
def set_startup_info()
You do not need to edit this field.
def reset_startup_info()
You do not need to edit this field.
Enable SHA256 check for files.
def sha256sum() def sha256_get_from_file() def sha256_check_with_first_line() def sha256_check_with_dic() def parse_sha256_file() def verify_and_parse_sha256_file()
You do not need to edit this field.
Check whether the username, password, and file name contain special characters.
def check_parameter() def check_filename()
You do not need to edit this field.
def download_cfg_file()
You do not need to edit this field.
def download_patch_file()
You do not need to edit this field.
def download_image_file()
You do not need to edit this field.
def download_startup_file()
You do not need to edit this field.
def set_startup_file()
You do not need to edit this field.
def delete_startup_file()
You do not need to edit this field.
def set_ztp_last_status()
You do not need to edit this field.
def get_ztp_enable_status()
You do not need to edit this field.
def parse_environment() def get_ztp_exit_environment()
You do not need to edit this field.
def check_ztp_continue()
You do not need to edit this field.
def _set_dns_enable_switch()
You do not need to edit this field.
def add_dns_servers_ipv4()
You do not need to edit this field.
def del_dns_servers_ipv4()
You do not need to edit this field.
def get_addr_by_hostname()
You do not need to edit this field.
Define the file download parameters.
def download_file()
You do not need to edit this field.
def start()
You do not need to edit this field.
def get_uri()
You do not need to edit this field.
def get_req_data()
You do not need to edit this field.
def pre_download()
You do not need to edit this field.
def after_download()
You do not need to edit this field.
Set the attributes for first-time authentication on an SSH client.
def _set_sshc_first_time()
You do not need to edit this field.
def _del_rsa_peer_key()
You do not need to edit this field.
Delete the SSH server address and RSA key.
def _del_sshc_rsa_key()
You do not need to edit this field.
Obtain the SFTP server address.
def get_key_name()
You do not need to edit this field.
Define the overall ZTP process.
def main_proc() def main() if __name__ == "__main__": main()
You do not need to edit this field.
The main function is mandatory. If the main function is unavailable, the script cannot be executed.