当前位置:网站首页>Finished product upgrade program
Finished product upgrade program
2022-08-04 19:32:00 【HongDaYu】
import telnetlib
import time
from ftplib import FTP
import serial
import configparser
import signal
import os
import tarfile
from tqdm import tqdm
class TelnetClient():
def __init__(self, host_ip, username, password):
self.tn = telnetlib.Telnet()
self.host_ip = host_ip
self.username = username
self.password = password
def login_host(self):
try:
self.tn.open(self.host_ip, port=23)
except:
print('%s网络连接失败' % self.host_ip)
return False
self.tn.read_until(b'login: ', timeout=10)
self.tn.write(self.username.encode('ascii') + b'\n')
self.tn.read_until(b'Password: ', timeout=10)
self.tn.write(self.password.encode('ascii') + b'\n')
command_result = self.tn.read_very_eager().decode('ascii')
if 'Login incorrect' not in command_result:
print('%s登录成功' % self.host_ip)
return True
else:
print('%s登录失败,用户名或密码错误' % self.host_ip)
return False
def execute_some_command(self, command,mode="nowait"):
self.tn.write(command.encode('ascii')+b'\n')
time.sleep(5)
print("\n***************************************************\n")
if mode == "nowait":
command_result = self.tn.read_very_eager().decode('ascii')
elif mode == "wait":
command_result = self.tn.read_until("DONE".encode('ascii')).decode('ascii')
print('命令执行结果:\n%s' % command_result)
print("\n***************************************************\n")
def __del__(self):
self.tn.write(b"exit\n")
def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
if tn.login_host():
tn.execute_some_command(cmd,mode)
return True
else:
return False
class FtpClient():
def __init__(self, ip: str, username: str, password: str) -> None:
self.ip = ip
self.user = username
self.password = password
self.ftp = FTP()
def connect(self):
try:
# self.ftp.set_debuglevel(2)
self.ftp.connect(self.ip, port=21)
self.ftp.login(self.user, self.password)
#print(self.ftp.getwelcome())
except:
print("ftp connect {} error ...".format(self.ip))
def put(self, filename: str):
try:
size = os.path.getsize(filename)/1024
fp = open(filename, "rb")
print("Start Upload ... ...")
with tqdm(total=size,desc="Upload {}".format(filename),ncols=150,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(l)
self.ftp.storbinary("STOR "+filename, fp,callback=call_)
except:
print("ftp upload {} error".format(filename))
def get(self, filename: str):
try:
size = self.ftp.size(filename)/1024
fp = open(filename, "wb")
print("Start Download ... ...")
with tqdm(total=size,desc="Download {}".format(filename),ncols=150,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(l)
fp.write(data)
self.ftp.retrbinary("RETR "+filename,callback=call_)
except:
print("ftp get {} error".format(filename))
def __del__(self):
# self.ftp.set_debuglevel(0)
self.ftp.quit()
class SerialClient:
def __init__(self, port: str, bps: int) -> None:
try:
self.port = port
self.bps = bps
self.ser = serial.Serial(port, bps)
except:
print("Open Serial Error {}".format(port))
def write(self, cmd: str) -> bool:
if self.ser.isOpen():
print("Open Serial {}".format(self.port))
cmd = cmd+" \n"
self.ser.write(cmd.encode("utf8"))
return True
else:
return False
#如果终端有响应一直读取数据
def read(self):
if self.ser.isOpen():
while self.ser.readable():
line = self.ser.readline(4096)
print(line.decode('ascii'))
def __del__(self):
self.ser.close()
# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98
def dos2unix(dos: str, unix: str):
content = ''
outsize = 0
with open(dos, 'rb') as infile:
content = infile.read()
with open(unix, 'wb') as output:
for line in content.splitlines():
outsize += len(line) + 1
output.write(line + b'\n')
print("Done. Stripped %s bytes." % (len(content)-outsize))
def Exit(signum, frame):
exit()
def CompressFile(TarFile:str,File:str,module:str):
tar = tarfile.open(TarFile,module)
for root,dir,files in os.walk(File):
for file in files:
fullpath = os.path.join(root,file)
tar.add(fullpath)
tar.close()
if __name__ == '__main__':
signal.signal(signal.SIGINT, Exit)
Parser = configparser.ConfigParser()
config = "config.ini"
if os.access(config,os.F_OK) == False:
print("Error:Can't find upgrade.ini, Please check your ini config")
exit()
Parser.read(config)
Parser.sections()
IP = Parser["BaseConfig"]["ServerIpAddr"]
Scripts = Parser["BaseConfig"]["UpgradeScript"]
UserName = Parser["UserConfig"]["UserName"]
PassWord = Parser["UserConfig"]["UserPassword"]
Filename = Parser["UserConfig"]["UpgradeFilename"]
CompressedMode = Parser["BaseConfig"]["CompressedMode"]
dos2unix(Scripts,Scripts)
if os.access(Filename+".tar",os.F_OK):
os.remove(Filename+".tar")
CompressFile(Filename+".tar",Filename,CompressedMode)
ftp = FtpClient(IP,UserName,PassWord)
ftp.connect()
ftp.put(Scripts)
ftp.put(Filename+".tar")
os.remove(Filename+".tar")
exec_upgrade = TelnetClient(IP,UserName,PassWord)
CMD = "chmod +rwx "+ "/mnt/flash/"+Scripts+" && /mnt/flash/"+Scripts
TelnetExecCmd(exec_upgrade,CMD,"wait")
TelnetExecCmd(exec_upgrade,"rm /mnt/flash/"+Scripts+" ; rm -rf /run/"+Filename+"*")
os.system("pause")
更稳定的版本
from msilib.schema import Upgrade
import telnetlib
import time
from ftplib import FTP
import serial
import configparser
import signal
import os
import tarfile
from tqdm import tqdm
global TelnetLoginEnable
class TelnetClient():
def __init__(self, host_ip, username, password):
self.tn = telnetlib.Telnet()
self.host_ip = host_ip
self.username = username
self.password = password
def login_host(self):
try:
self.tn.open(self.host_ip, port=23)
except:
print('%s网络连接失败' % self.host_ip)
return False
if TelnetLoginEnable == "en":
login = self.tn.read_until(b'login: ', timeout=3).decode('ascii')
self.tn.write(self.username.encode('ascii') + b'\n')
self.tn.read_until(b'Password: ', timeout=3)
self.tn.write(self.password.encode('ascii') + b'\n')
command_result = self.tn.read_very_eager().decode('ascii')
if 'Login incorrect' not in command_result:
print('%s登录成功' % self.host_ip)
return True
else:
print('%s登录失败,用户名或密码错误' % self.host_ip)
return False
def execute_some_command(self, command,mode="nowait"):
if mode == "nowait":
self.tn.write(command.encode('ascii')+b'\n')
print("Wait Command Exec... ...")
for i in tqdm(range(10),ncols=100):
time.sleep(1)
command_result = self.tn.read_very_eager().decode('ascii')
elif mode == "wait":
self.tn.write(command.encode('ascii')+b'\n')
command_result = self.tn.read_until("DONE".encode('ascii')).decode('ascii')
print('命令执行结果:\n%s' % command_result)
def __del__(self):
self.tn.write(b"exit\n")
def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
if tn.login_host():
tn.execute_some_command(cmd,mode)
return True
else:
return False
class FtpClient():
def __init__(self, ip: str, username: str, password: str) -> None:
self.ip = ip
self.user = username
self.password = password
self.ftp = FTP()
def connect(self):
try:
self.ftp.connect(self.ip, port=21,timeout=5)
self.ftp.login(self.user, self.password)
#print(self.ftp.getwelcome())
except:
print("ftp connect {} error ...".format(self.ip))
self.ftp.set_debuglevel(2)
print("Start Ping FtpServer ... ...")
os.system("ping {}".format(self.ip))
exit(1)
def put(self, filename: str):
try:
size = os.path.getsize(filename)/1024
fp = open(filename, "rb")
desc="Start Upload {}... ...".format(filename)
print(desc)
with tqdm(total=int(size),ncols=100,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(int(l))
self.ftp.storbinary("STOR "+filename, fp,callback=call_)
except:
print("ftp upload {} error".format(filename))
def get(self, filename: str):
try:
size = self.ftp.size(filename)/1024
fp = open(filename, "wb")
print("Start Download {} ... ...".format(filename))
with tqdm(total=int(size),ncols=100,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(int(l))
fp.write(data)
self.ftp.retrbinary("RETR "+filename,callback=call_)
except:
print("ftp get {} error".format(filename))
def __del__(self):
# self.ftp.set_debuglevel(0)
self.ftp.quit()
class SerialClient:
def __init__(self, port: str, bps: int) -> None:
try:
self.port = port
self.bps = bps
self.ser = serial.Serial(port, bps)
except:
print("Open Serial Error {}".format(port))
def write(self, cmd: str) -> bool:
if self.ser.isOpen():
print("Open Serial {}".format(self.port))
cmd = cmd+" \n"
self.ser.write(cmd.encode("utf8"))
return True
else:
return False
#如果终端有响应一直读取数据
def read(self):
if self.ser.isOpen():
while self.ser.readable():
line = self.ser.readline(4096)
print(line.decode('ascii'))
def __del__(self):
self.ser.close()
# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98
def dos2unix(dos: str, unix: str):
content = ''
outsize = 0
with open(dos, 'rb') as infile:
content = infile.read()
with open(unix, 'wb') as output:
for line in content.splitlines():
outsize += len(line) + 1
output.write(line + b'\n')
print("Done. Stripped %s bytes." % (len(content)-outsize))
def Exit(signum, frame):
exit()
def CompressFile(TarFile:str,File:str,module:str):
print("Start Compressed Dir ... ...")
tar = tarfile.open(TarFile,module)
for root,dir,files in os.walk(File):
for file in files:
fullpath = os.path.join(root,file)
tar.add(fullpath)
tar.close()
print("End Compressed Dir Done ... ...")
if __name__ == '__main__':
signal.signal(signal.SIGINT, Exit)
Parser = configparser.ConfigParser()
config = "config.ini"
if os.access(config,os.F_OK) == False:
print("Error:Can't find upgrade.ini, Please check your ini config")
exit()
Parser.read(config)
Parser.sections()
IP = Parser["BaseConfig"]["ServerIpAddr"]
UserName = Parser["UserConfig"]["UserName"]
PassWord = Parser["UserConfig"]["UserPassword"]
Filename = Parser["UserConfig"]["UpgradeFilename"]
CompressedMode = Parser["BaseConfig"]["CompressedMode"]
TelnetLoginEnable = Parser["BaseConfig"]["TelnetLoginEnable"]
UpgradeCommand = Parser["UserConfig"]["UpgradeCommand"]
UpgradeRootPath = Parser["UserConfig"]["UpgradeRootPath"]
FtpRootPath = Parser["BaseConfig"]["FtpRootPath"]
ftp = FtpClient(IP,UserName,PassWord)
ftp.connect()
SourceFilename = Filename
uncompressed=""
if CompressedMode == "x":
uncompressed = "xf"
Filename +=".tar"
elif CompressedMode == "x:gz":
uncompressed = "xzf"
Filename +=".tar.gz"
elif CompressedMode == "x:xz":
uncompressed == "xjf"
Filename += ".tar.xz"
exec_upgrade = TelnetClient(IP,UserName,PassWord)
RootPath=UpgradeRootPath+Filename
FtpPath=FtpRootPath+Filename
if os.access(Filename,os.F_OK):
os.remove(Filename)
CompressFile(Filename,SourceFilename,CompressedMode)
ftp.put(Filename)
os.remove(Filename)
cmd = "mv {} {}".format(FtpPath,UpgradeRootPath)
TelnetExecCmd(exec_upgrade,cmd)
cmd = "tar {} {} -C {}".format(uncompressed,RootPath,UpgradeRootPath)
TelnetExecCmd(exec_upgrade,cmd)
cmd = "cp -f {}/* {}".format((UpgradeRootPath+SourceFilename),FtpRootPath)
TelnetExecCmd(exec_upgrade,cmd)
TelnetExecCmd(exec_upgrade,UpgradeCommand)
os.system("pause")
边栏推荐
猜你喜欢
随机推荐
win10 uwp json
【着色器实现Glitch单项故障闪烁效果(与Television效果不同)_Shader效果第十四篇】
手把手教你CSP系列之script-src
seata源码解析:seata server各种消息处理流程
按需视觉识别:愿景和初步方案
nr部分计算
Orthodontic MIA micro-implant anchorage technology China 10th anniversary exchange meeting was held in Shenyang
T+Cloud:构建新型生意社交网络和营销关系的“智公司”
量化交易机器人系统开发
指静脉识别-matlab
How to add custom syntax to MySQL?
WPF 元素裁剪 Clip 属性
Dragoma (DMA) Metaverse System Development
Exploration and Practice of Database Governance
openharmony代码框架初识(2)
Quantitative trading robot system development
ERC20转账压缩
如何让远在的老板看到你!----------来自财富中国网
JS: 数组和树的相互转换
华为交换机:STP测试实验