当前位置:网站首页>成品升级程序
成品升级程序
2022-08-04 19:23:00 【洪大宇】
import telnetlib
import time
from ftplib import FTP
import serial
import configparser
import signal
import os
import tarfile
from tqdm import tqdm
class TelnetClient():
    """Small Telnet wrapper used to run shell commands on the target device.

    The connection is opened by ``login_host`` (port 23), commands are sent
    with ``execute_some_command`` and ``exit`` is sent when the object is
    destroyed, provided a connection is still open.
    """

    # Seconds to wait for the server to answer the password. The verdict
    # arrives asynchronously, so reading it immediately would always miss it.
    LOGIN_RESULT_TIMEOUT = 2

    def __init__(self, host_ip, username, password):
        self.tn = telnetlib.Telnet()
        self.host_ip = host_ip
        self.username = username
        self.password = password

    def login_host(self):
        """Open the connection and authenticate.

        Returns:
            True when the server did not answer ``Login incorrect``,
            False when the connection could not be opened or the
            credentials were rejected.
        """
        # The same client may log in several times; drop any previous
        # connection first so its socket is not leaked.
        self.tn.close()
        try:
            self.tn.open(self.host_ip, port=23)
        except OSError:
            print('%s网络连接失败' % self.host_ip)
            return False
        self.tn.read_until(b'login: ', timeout=10)
        self.tn.write(self.username.encode('ascii') + b'\n')
        self.tn.read_until(b'Password: ', timeout=10)
        self.tn.write(self.password.encode('ascii') + b'\n')
        # Returns as soon as the rejection text shows up, otherwise after the
        # timeout with whatever the server has sent so far.
        command_result = self.tn.read_until(
            b'Login incorrect', timeout=self.LOGIN_RESULT_TIMEOUT
        ).decode('ascii', errors='replace')
        if 'Login incorrect' in command_result:
            print('%s登录失败,用户名或密码错误' % self.host_ip)
            return False
        print('%s登录成功' % self.host_ip)
        return True

    def execute_some_command(self, command, mode="nowait"):
        """Send ``command`` and print the output received from the device.

        Args:
            command: shell command line; must be ASCII-encodable.
            mode: ``"nowait"`` prints whatever output is available after a
                5 second pause; ``"wait"`` blocks until the output contains
                ``DONE``.

        Raises:
            ValueError: if ``mode`` is not one of the two supported values.
        """
        # Reject bad modes before anything is sent to the device.
        if mode not in ("nowait", "wait"):
            raise ValueError("unsupported mode: {!r}".format(mode))
        self.tn.write(command.encode('ascii') + b'\n')
        time.sleep(5)
        print("\n***************************************************\n")
        if mode == "nowait":
            raw_output = self.tn.read_very_eager()
        else:
            raw_output = self.tn.read_until(b"DONE")
        # Device output is not guaranteed to be pure ASCII; never crash on it.
        command_result = raw_output.decode('ascii', errors='replace')
        print('命令执行结果:\n%s' % command_result)
        print("\n***************************************************\n")

    def __del__(self):
        """Send ``exit`` if a connection is open, then close it."""
        tn = getattr(self, "tn", None)
        if tn is None:
            return
        try:
            # Without an open socket there is nobody to send ``exit`` to.
            if tn.get_socket() is not None:
                tn.write(b"exit\n")
        except OSError:
            pass
        finally:
            tn.close()
def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
    """Log in through ``tn`` and, when that succeeds, run ``cmd``.

    ``mode`` is handed unchanged to ``tn.execute_some_command``.
    Returns True when the login succeeded and the command was issued,
    False when the login failed (the command is then not sent).
    """
    logged_in = tn.login_host()
    if not logged_in:
        return False
    tn.execute_some_command(cmd, mode)
    return True
class FtpClient():
    """Best-effort FTP client that shows a progress bar during transfers.

    Failures are reported on stdout rather than raised.
    """

    def __init__(self, ip: str, username: str, password: str) -> None:
        self.ip = ip
        self.user = username
        self.password = password
        self.ftp = FTP()

    def connect(self):
        """Connect to port 21 of the server and log in; print a message on failure."""
        # ``Exception`` rather than a bare ``except`` so that SystemExit and
        # KeyboardInterrupt (Ctrl-C) still propagate.
        try:
            self.ftp.connect(self.ip, port=21)
            self.ftp.login(self.user, self.password)
        except Exception:
            print("ftp connect {} error ...".format(self.ip))

    def put(self, filename: str):
        """Upload the local file ``filename`` under the same name.

        Progress is displayed in KB. Any failure is reported on stdout.
        """
        try:
            size = os.path.getsize(filename)/1024
            # ``with`` guarantees the local file is closed even when the
            # transfer fails half way.
            with open(filename, "rb") as fp:
                print("Start Upload ... ...")
                with tqdm(total=size,desc="Upload {}".format(filename),ncols=150,unit="KB") as bar:
                    def call_(data):
                        bar.update(len(data)/1024)
                    self.ftp.storbinary("STOR "+filename, fp,callback=call_)
        except Exception:
            print("ftp upload {} error".format(filename))

    def get(self, filename: str):
        """Download ``filename`` from the server into a local file of the same name.

        Progress is displayed in KB. Any failure is reported on stdout.
        """
        try:
            size = self.ftp.size(filename)/1024
            with open(filename, "wb") as fp:
                print("Start Download ... ...")
                with tqdm(total=size,desc="Download {}".format(filename),ncols=150,unit="KB") as bar:
                    def call_(data):
                        bar.update(len(data)/1024)
                        fp.write(data)
                    self.ftp.retrbinary("RETR "+filename,callback=call_)
        except Exception:
            print("ftp get {} error".format(filename))

    def __del__(self):
        """Say goodbye to the server if a session is open."""
        ftp = getattr(self, "ftp", None)
        # Never connected (or already closed): QUIT would fail on a
        # missing socket.
        if ftp is None or getattr(ftp, "sock", None) is None:
            return
        try:
            ftp.quit()
        except Exception:
            # The server may already be gone; release the socket anyway.
            ftp.close()
class SerialClient:
    """Thin wrapper around a pyserial port for sending commands and echoing replies."""

    def __init__(self, port: str, bps: int) -> None:
        self.port = port
        self.bps = bps
        # Stays None when the port cannot be opened so the other methods
        # can detect that instead of failing on a missing attribute.
        self.ser = None
        try:
            self.ser = serial.Serial(port, bps)
        except (OSError, ValueError):
            # pyserial reports open failures as SerialException (an
            # IOError/OSError subclass) and bad parameters as ValueError.
            print("Open Serial Error {}".format(port))

    def write(self, cmd: str) -> bool:
        """Send ``cmd`` followed by a space and newline.

        Returns True when the data was written, False when the port is not open.
        """
        if self.ser is None or not self.ser.isOpen():
            return False
        print("Open Serial {}".format(self.port))
        payload = cmd + " \n"
        self.ser.write(payload.encode("utf8"))
        return True

    # Keep reading for as long as the terminal responds.
    def read(self):
        """Print lines from the port while it reports itself readable."""
        if self.ser is None or not self.ser.isOpen():
            return
        while self.ser.readable():
            line = self.ser.readline(4096)
            # Line noise is common on serial links; do not crash on it.
            print(line.decode('ascii', errors='replace'))

    def __del__(self):
        """Close the port if it was ever opened."""
        ser = getattr(self, "ser", None)
        if ser is not None:
            ser.close()
# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98
def dos2unix(dos: str, unix: str):
    """Rewrite the file ``dos`` into ``unix`` with every line ending in ``\\n``.

    The input is read completely before the output is opened, so both
    arguments may name the same file. The difference between input and
    output size is printed afterwards.
    """
    with open(dos, 'rb') as src:
        raw = src.read()
    rows = raw.splitlines()
    with open(unix, 'wb') as dst:
        dst.writelines(row + b'\n' for row in rows)
    written = sum(len(row) + 1 for row in rows)
    print("Done. Stripped %s bytes." % (len(raw) - written))
def Exit(signum, frame):
    """Signal handler that terminates the program by raising SystemExit.

    ``signum`` and ``frame`` are the standard handler arguments and are
    not used.
    """
    # ``exit`` is injected by the ``site`` module for interactive use and
    # may be missing (``python -S``, some frozen builds); ``sys.exit`` is
    # always available.
    import sys
    sys.exit()
def CompressFile(TarFile:str,File:str,module:str):
    """Pack ``File`` into the tar archive ``TarFile``.

    Args:
        TarFile: path of the archive to create.
        File: directory whose files are added recursively, or a single
            regular file.
        module: mode string passed to ``tarfile.open`` (for example
            ``"w"`` or ``"w:gz"``).

    Members are stored under the path they were found at; directories
    themselves are not added, so empty directories are omitted.
    """
    # The context manager closes the archive even when adding a member fails.
    with tarfile.open(TarFile, module) as tar:
        if os.path.isfile(File):
            # os.walk yields nothing for a plain file, which used to
            # produce an empty archive without any warning.
            tar.add(File)
            return
        for root, _subdirs, names in os.walk(File):
            for name in names:
                tar.add(os.path.join(root, name))
if __name__ == '__main__':
    # Upgrade flow: read config.ini, normalise the upgrade script's line
    # endings, pack the upgrade directory, upload both over FTP, run the
    # script over Telnet, then clean up on the device.
    import sys

    signal.signal(signal.SIGINT, Exit)
    Parser = configparser.ConfigParser()
    config = "config.ini"
    if not os.access(config, os.F_OK):
        # Report the file that is actually looked for, and fail with a
        # non-zero status so callers can detect the error.
        print("Error:Can't find {}, Please check your ini config".format(config))
        sys.exit(1)
    Parser.read(config)
    IP = Parser["BaseConfig"]["ServerIpAddr"]
    Scripts = Parser["BaseConfig"]["UpgradeScript"]
    UserName = Parser["UserConfig"]["UserName"]
    PassWord = Parser["UserConfig"]["UserPassword"]
    Filename = Parser["UserConfig"]["UpgradeFilename"]
    CompressedMode = Parser["BaseConfig"]["CompressedMode"]
    if not Filename.strip():
        # An empty name would turn the cleanup below into "rm -rf /run/*".
        print("Error:UpgradeFilename is empty, Please check your ini config")
        sys.exit(1)
    TarName = Filename + ".tar"
    dos2unix(Scripts, Scripts)
    # Remove a stale archive left over from an earlier run.
    if os.access(TarName, os.F_OK):
        os.remove(TarName)
    CompressFile(TarName, Filename, CompressedMode)
    ftp = FtpClient(IP, UserName, PassWord)
    ftp.connect()
    ftp.put(Scripts)
    ftp.put(TarName)
    os.remove(TarName)
    exec_upgrade = TelnetClient(IP, UserName, PassWord)
    CMD = "chmod +rwx /mnt/flash/" + Scripts + " && /mnt/flash/" + Scripts
    # "wait" blocks until the remote output contains DONE.
    TelnetExecCmd(exec_upgrade, CMD, "wait")
    TelnetExecCmd(exec_upgrade, "rm /mnt/flash/" + Scripts + " ; rm -rf /run/" + Filename + "*")
    # Keep the console window open (Windows "pause" command).
    os.system("pause")
更稳定的版本
from msilib.schema import Upgrade
import telnetlib
import time
from ftplib import FTP
import serial
import configparser
import signal
import os
import tarfile
from tqdm import tqdm
# Module-level switch read by TelnetClient.login_host: "en" means the device
# asks for a user name and password. The script entry point overwrites it
# from config.ini. A module-level ``global`` statement declares nothing, so a
# real default is assigned here; "en" is assumed to be the safe default
# (always authenticate) -- TODO confirm.
TelnetLoginEnable = "en"


class TelnetClient():
    """Small Telnet wrapper used to run shell commands on the target device.

    The connection is opened by ``login_host`` (port 23), commands are sent
    with ``execute_some_command`` and ``exit`` is sent when the object is
    destroyed, provided a connection is still open.
    """

    # Seconds to wait for the server to answer the password. The verdict
    # arrives asynchronously, so reading it immediately would always miss it.
    LOGIN_RESULT_TIMEOUT = 2

    def __init__(self, host_ip, username, password):
        self.tn = telnetlib.Telnet()
        self.host_ip = host_ip
        self.username = username
        self.password = password

    def login_host(self):
        """Open the connection and, when login is enabled, authenticate.

        Returns:
            True when the server did not answer ``Login incorrect``,
            False when the connection could not be opened or the
            credentials were rejected.
        """
        # The same client may log in several times; drop any previous
        # connection first so its socket is not leaked.
        self.tn.close()
        try:
            self.tn.open(self.host_ip, port=23)
        except OSError:
            print('%s网络连接失败' % self.host_ip)
            return False
        if TelnetLoginEnable == "en":
            self.tn.read_until(b'login: ', timeout=3)
            self.tn.write(self.username.encode('ascii') + b'\n')
            self.tn.read_until(b'Password: ', timeout=3)
            self.tn.write(self.password.encode('ascii') + b'\n')
            # Returns as soon as the rejection text shows up, otherwise after
            # the timeout with whatever the server has sent so far.
            raw_reply = self.tn.read_until(
                b'Login incorrect', timeout=self.LOGIN_RESULT_TIMEOUT
            )
        else:
            # No credentials are exchanged; just drain what is pending.
            raw_reply = self.tn.read_very_eager()
        command_result = raw_reply.decode('ascii', errors='replace')
        if 'Login incorrect' in command_result:
            print('%s登录失败,用户名或密码错误' % self.host_ip)
            return False
        print('%s登录成功' % self.host_ip)
        return True

    def execute_some_command(self, command, mode="nowait"):
        """Send ``command`` and print the output received from the device.

        Args:
            command: shell command line; must be ASCII-encodable.
            mode: ``"nowait"`` shows a 10 second progress bar and then prints
                whatever output is available; ``"wait"`` blocks until the
                output contains ``DONE``.

        Raises:
            ValueError: if ``mode`` is not one of the two supported values.
        """
        # Reject bad modes before anything is sent to the device.
        if mode not in ("nowait", "wait"):
            raise ValueError("unsupported mode: {!r}".format(mode))
        self.tn.write(command.encode('ascii') + b'\n')
        if mode == "nowait":
            print("Wait Command Exec... ...")
            for _ in tqdm(range(10), ncols=100):
                time.sleep(1)
            raw_output = self.tn.read_very_eager()
        else:
            raw_output = self.tn.read_until(b"DONE")
        # Device output is not guaranteed to be pure ASCII; never crash on it.
        command_result = raw_output.decode('ascii', errors='replace')
        print('命令执行结果:\n%s' % command_result)

    def __del__(self):
        """Send ``exit`` if a connection is open, then close it."""
        tn = getattr(self, "tn", None)
        if tn is None:
            return
        try:
            # Without an open socket there is nobody to send ``exit`` to.
            if tn.get_socket() is not None:
                tn.write(b"exit\n")
        except OSError:
            pass
        finally:
            tn.close()
def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
    """Run ``cmd`` through ``tn`` after a fresh login.

    ``mode`` is forwarded untouched to ``tn.execute_some_command``.
    The result is True when the login worked and the command was sent,
    and False when the login failed, in which case nothing is sent.
    """
    if not tn.login_host():
        return False
    tn.execute_some_command(cmd, mode)
    return True
class FtpClient():
    """FTP client that shows a progress bar during transfers.

    Transfer failures are reported on stdout rather than raised; a failed
    connection terminates the program.
    """

    def __init__(self, ip: str, username: str, password: str) -> None:
        self.ip = ip
        self.user = username
        self.password = password
        self.ftp = FTP()

    def connect(self):
        """Connect to port 21 (5 second timeout) and log in.

        On failure the server is pinged for diagnosis and the program exits
        with status 1.
        """
        # ``exit`` is a ``site`` helper that may be missing in frozen builds.
        import sys
        # ``Exception`` rather than a bare ``except`` so that SystemExit and
        # KeyboardInterrupt (Ctrl-C) still propagate.
        try:
            self.ftp.connect(self.ip, port=21,timeout=5)
            self.ftp.login(self.user, self.password)
        except Exception:
            print("ftp connect {} error ...".format(self.ip))
            self.ftp.set_debuglevel(2)
            print("Start Ping FtpServer ... ...")
            os.system("ping {}".format(self.ip))
            sys.exit(1)

    def put(self, filename: str):
        """Upload the local file ``filename`` under the same name.

        Progress is displayed in whole KB. Any failure is reported on stdout.
        """
        try:
            size = os.path.getsize(filename)/1024
            # ``with`` guarantees the local file is closed even when the
            # transfer fails half way.
            with open(filename, "rb") as fp:
                desc="Start Upload {}... ...".format(filename)
                print(desc)
                with tqdm(total=int(size),ncols=100,unit="KB") as bar:
                    def call_(data):
                        bar.update(int(len(data)/1024))
                    self.ftp.storbinary("STOR "+filename, fp,callback=call_)
        except Exception:
            print("ftp upload {} error".format(filename))

    def get(self, filename: str):
        """Download ``filename`` from the server into a local file of the same name.

        Progress is displayed in whole KB. Any failure is reported on stdout.
        """
        try:
            size = self.ftp.size(filename)/1024
            with open(filename, "wb") as fp:
                print("Start Download {} ... ...".format(filename))
                with tqdm(total=int(size),ncols=100,unit="KB") as bar:
                    def call_(data):
                        bar.update(int(len(data)/1024))
                        fp.write(data)
                    self.ftp.retrbinary("RETR "+filename,callback=call_)
        except Exception:
            print("ftp get {} error".format(filename))

    def __del__(self):
        """Say goodbye to the server if a session is open."""
        ftp = getattr(self, "ftp", None)
        # Never connected (or already closed): QUIT would fail on a
        # missing socket.
        if ftp is None or getattr(ftp, "sock", None) is None:
            return
        try:
            ftp.quit()
        except Exception:
            # The server may already be gone; release the socket anyway.
            ftp.close()
class SerialClient:
    """Thin wrapper around a pyserial port for sending commands and echoing replies."""

    def __init__(self, port: str, bps: int) -> None:
        self.port = port
        self.bps = bps
        # Stays None when the port cannot be opened so the other methods
        # can detect that instead of failing on a missing attribute.
        self.ser = None
        try:
            self.ser = serial.Serial(port, bps)
        except (OSError, ValueError):
            # pyserial reports open failures as SerialException (an
            # IOError/OSError subclass) and bad parameters as ValueError.
            print("Open Serial Error {}".format(port))

    def write(self, cmd: str) -> bool:
        """Send ``cmd`` followed by a space and newline.

        Returns True when the data was written, False when the port is not open.
        """
        if self.ser is None or not self.ser.isOpen():
            return False
        print("Open Serial {}".format(self.port))
        payload = cmd + " \n"
        self.ser.write(payload.encode("utf8"))
        return True

    # Keep reading for as long as the terminal responds.
    def read(self):
        """Print lines from the port while it reports itself readable."""
        if self.ser is None or not self.ser.isOpen():
            return
        while self.ser.readable():
            line = self.ser.readline(4096)
            # Line noise is common on serial links; do not crash on it.
            print(line.decode('ascii', errors='replace'))

    def __del__(self):
        """Close the port if it was ever opened."""
        ser = getattr(self, "ser", None)
        if ser is not None:
            ser.close()
# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98
def dos2unix(dos: str, unix: str):
    """Copy ``dos`` to ``unix`` while terminating every line with ``\\n``.

    All input is loaded before the output file is opened, which makes an
    in-place conversion (same path for both arguments) possible. The size
    difference between input and output is printed at the end.
    """
    with open(dos, 'rb') as source:
        data = source.read()
    pieces = [piece + b'\n' for piece in data.splitlines()]
    with open(unix, 'wb') as target:
        for piece in pieces:
            target.write(piece)
    emitted = sum(map(len, pieces))
    print("Done. Stripped %s bytes." % (len(data) - emitted))
def Exit(signum, frame):
    """Signal handler that terminates the program by raising SystemExit.

    ``signum`` and ``frame`` are the standard handler arguments and are
    not used.
    """
    # ``exit`` is injected by the ``site`` module for interactive use and
    # may be missing (``python -S``, some frozen builds); ``sys.exit`` is
    # always available.
    import sys
    sys.exit()
def CompressFile(TarFile:str,File:str,module:str):
    """Pack ``File`` into the tar archive ``TarFile``, printing start/end markers.

    Args:
        TarFile: path of the archive to create.
        File: directory whose files are added recursively, or a single
            regular file.
        module: mode string passed to ``tarfile.open`` (for example
            ``"x:gz"``).

    Members are stored under the path they were found at; directories
    themselves are not added, so empty directories are omitted.
    """
    print("Start Compressed Dir ... ...")
    # The context manager closes the archive even when adding a member fails.
    with tarfile.open(TarFile, module) as tar:
        if os.path.isfile(File):
            # os.walk yields nothing for a plain file, which used to
            # produce an empty archive without any warning.
            tar.add(File)
        else:
            for root, _subdirs, names in os.walk(File):
                for name in names:
                    tar.add(os.path.join(root, name))
    print("End Compressed Dir Done ... ...")
if __name__ == '__main__':
    # Upgrade flow: read config.ini, pack the upgrade directory, upload the
    # archive over FTP, then move/unpack/copy it on the device over Telnet
    # and finally run the configured upgrade command.
    import sys

    signal.signal(signal.SIGINT, Exit)
    Parser = configparser.ConfigParser()
    config = "config.ini"
    if not os.access(config, os.F_OK):
        # Report the file that is actually looked for, and fail with a
        # non-zero status so callers can detect the error.
        print("Error:Can't find {}, Please check your ini config".format(config))
        sys.exit(1)
    Parser.read(config)
    IP = Parser["BaseConfig"]["ServerIpAddr"]
    UserName = Parser["UserConfig"]["UserName"]
    PassWord = Parser["UserConfig"]["UserPassword"]
    Filename = Parser["UserConfig"]["UpgradeFilename"]
    CompressedMode = Parser["BaseConfig"]["CompressedMode"]
    TelnetLoginEnable = Parser["BaseConfig"]["TelnetLoginEnable"]
    UpgradeCommand = Parser["UserConfig"]["UpgradeCommand"]
    UpgradeRootPath = Parser["UserConfig"]["UpgradeRootPath"]
    FtpRootPath = Parser["BaseConfig"]["FtpRootPath"]

    # tarfile creation mode -> (remote "tar" extract flags, archive suffix).
    # xz archives need "J"; lower-case "j" selects bzip2.
    ArchiveModes = {
        "x": ("xf", ".tar"),
        "x:gz": ("xzf", ".tar.gz"),
        "x:xz": ("xJf", ".tar.xz"),
    }
    if not Filename.strip():
        print("Error:UpgradeFilename is empty, Please check your ini config")
        sys.exit(1)
    if CompressedMode not in ArchiveModes:
        # Without a suffix the archive name would equal the source name and
        # the clean-up below would try to delete the source itself.
        print("Error:Unsupported CompressedMode {}, Please check your ini config".format(CompressedMode))
        sys.exit(1)

    ftp = FtpClient(IP,UserName,PassWord)
    ftp.connect()
    SourceFilename = Filename
    uncompressed, suffix = ArchiveModes[CompressedMode]
    Filename += suffix
    exec_upgrade = TelnetClient(IP,UserName,PassWord)
    RootPath=UpgradeRootPath+Filename
    FtpPath=FtpRootPath+Filename
    # The "x" modes refuse to overwrite, so remove any stale archive first.
    if os.access(Filename,os.F_OK):
        os.remove(Filename)
    CompressFile(Filename,SourceFilename,CompressedMode)
    ftp.put(Filename)
    os.remove(Filename)
    cmd = "mv {} {}".format(FtpPath,UpgradeRootPath)
    TelnetExecCmd(exec_upgrade,cmd)
    cmd = "tar {} {} -C {}".format(uncompressed,RootPath,UpgradeRootPath)
    TelnetExecCmd(exec_upgrade,cmd)
    cmd = "cp -f {}/* {}".format((UpgradeRootPath+SourceFilename),FtpRootPath)
    TelnetExecCmd(exec_upgrade,cmd)
    TelnetExecCmd(exec_upgrade,UpgradeCommand)
    # Keep the console window open (Windows "pause" command).
    os.system("pause")
边栏推荐
猜你喜欢
随机推荐
PostgreSQL的 SPI_接口函数
按需视觉识别:愿景和初步方案
华为交换机:STP测试实验
致-.-- -..- -
Dragoma(DMA)元宇宙系统开发
正则表达式未完
网络运维管理从基础到实战-自用笔记(1)构建综合园区网、接入互联网
win10 uwp win2d 使用 Path 绘制界面
Zip4j使用
四维图新:子公司首款功能安全 MCU 芯片已陆续送样
Notepad++更改显示背景
Openharmony first experience (1)
Polygon zkEVM 基本概念
小波提取特征的VQ实现
从零开始实现一个简单的CycleGAN项目
使用.NET简单实现一个Redis的高性能克隆版(二)
如何进行自动化测试?【Eolink分享】
八一建军节 | 致敬中国人民解放军
《学会写作》粥佐罗著
WPF 多个 StylusPlugIn 的事件触发顺序