当前位置:网站首页>成品升级程序
成品升级程序
2022-08-04 19:23:00 【洪大宇】
import telnetlib
import time
from ftplib import FTP
import serial
import configparser
import signal
import os
import tarfile
from tqdm import tqdm
class TelnetClient():
def __init__(self, host_ip, username, password):
self.tn = telnetlib.Telnet()
self.host_ip = host_ip
self.username = username
self.password = password
def login_host(self):
try:
self.tn.open(self.host_ip, port=23)
except:
print('%s网络连接失败' % self.host_ip)
return False
self.tn.read_until(b'login: ', timeout=10)
self.tn.write(self.username.encode('ascii') + b'\n')
self.tn.read_until(b'Password: ', timeout=10)
self.tn.write(self.password.encode('ascii') + b'\n')
command_result = self.tn.read_very_eager().decode('ascii')
if 'Login incorrect' not in command_result:
print('%s登录成功' % self.host_ip)
return True
else:
print('%s登录失败,用户名或密码错误' % self.host_ip)
return False
def execute_some_command(self, command,mode="nowait"):
self.tn.write(command.encode('ascii')+b'\n')
time.sleep(5)
print("\n***************************************************\n")
if mode == "nowait":
command_result = self.tn.read_very_eager().decode('ascii')
elif mode == "wait":
command_result = self.tn.read_until("DONE".encode('ascii')).decode('ascii')
print('命令执行结果:\n%s' % command_result)
print("\n***************************************************\n")
def __del__(self):
self.tn.write(b"exit\n")
def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
if tn.login_host():
tn.execute_some_command(cmd,mode)
return True
else:
return False
class FtpClient():
def __init__(self, ip: str, username: str, password: str) -> None:
self.ip = ip
self.user = username
self.password = password
self.ftp = FTP()
def connect(self):
try:
# self.ftp.set_debuglevel(2)
self.ftp.connect(self.ip, port=21)
self.ftp.login(self.user, self.password)
#print(self.ftp.getwelcome())
except:
print("ftp connect {} error ...".format(self.ip))
def put(self, filename: str):
try:
size = os.path.getsize(filename)/1024
fp = open(filename, "rb")
print("Start Upload ... ...")
with tqdm(total=size,desc="Upload {}".format(filename),ncols=150,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(l)
self.ftp.storbinary("STOR "+filename, fp,callback=call_)
except:
print("ftp upload {} error".format(filename))
def get(self, filename: str):
try:
size = self.ftp.size(filename)/1024
fp = open(filename, "wb")
print("Start Download ... ...")
with tqdm(total=size,desc="Download {}".format(filename),ncols=150,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(l)
fp.write(data)
self.ftp.retrbinary("RETR "+filename,callback=call_)
except:
print("ftp get {} error".format(filename))
def __del__(self):
# self.ftp.set_debuglevel(0)
self.ftp.quit()
class SerialClient:
def __init__(self, port: str, bps: int) -> None:
try:
self.port = port
self.bps = bps
self.ser = serial.Serial(port, bps)
except:
print("Open Serial Error {}".format(port))
def write(self, cmd: str) -> bool:
if self.ser.isOpen():
print("Open Serial {}".format(self.port))
cmd = cmd+" \n"
self.ser.write(cmd.encode("utf8"))
return True
else:
return False
#如果终端有响应一直读取数据
def read(self):
if self.ser.isOpen():
while self.ser.readable():
line = self.ser.readline(4096)
print(line.decode('ascii'))
def __del__(self):
self.ser.close()
# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98
def dos2unix(dos: str, unix: str):
content = ''
outsize = 0
with open(dos, 'rb') as infile:
content = infile.read()
with open(unix, 'wb') as output:
for line in content.splitlines():
outsize += len(line) + 1
output.write(line + b'\n')
print("Done. Stripped %s bytes." % (len(content)-outsize))
def Exit(signum, frame):
exit()
def CompressFile(TarFile:str,File:str,module:str):
tar = tarfile.open(TarFile,module)
for root,dir,files in os.walk(File):
for file in files:
fullpath = os.path.join(root,file)
tar.add(fullpath)
tar.close()
if __name__ == '__main__':
signal.signal(signal.SIGINT, Exit)
Parser = configparser.ConfigParser()
config = "config.ini"
if os.access(config,os.F_OK) == False:
print("Error:Can't find upgrade.ini, Please check your ini config")
exit()
Parser.read(config)
Parser.sections()
IP = Parser["BaseConfig"]["ServerIpAddr"]
Scripts = Parser["BaseConfig"]["UpgradeScript"]
UserName = Parser["UserConfig"]["UserName"]
PassWord = Parser["UserConfig"]["UserPassword"]
Filename = Parser["UserConfig"]["UpgradeFilename"]
CompressedMode = Parser["BaseConfig"]["CompressedMode"]
dos2unix(Scripts,Scripts)
if os.access(Filename+".tar",os.F_OK):
os.remove(Filename+".tar")
CompressFile(Filename+".tar",Filename,CompressedMode)
ftp = FtpClient(IP,UserName,PassWord)
ftp.connect()
ftp.put(Scripts)
ftp.put(Filename+".tar")
os.remove(Filename+".tar")
exec_upgrade = TelnetClient(IP,UserName,PassWord)
CMD = "chmod +rwx "+ "/mnt/flash/"+Scripts+" && /mnt/flash/"+Scripts
TelnetExecCmd(exec_upgrade,CMD,"wait")
TelnetExecCmd(exec_upgrade,"rm /mnt/flash/"+Scripts+" ; rm -rf /run/"+Filename+"*")
os.system("pause")
更稳定的版本
from msilib.schema import Upgrade
import telnetlib
import time
from ftplib import FTP
import serial
import configparser
import signal
import os
import tarfile
from tqdm import tqdm
global TelnetLoginEnable
class TelnetClient():
def __init__(self, host_ip, username, password):
self.tn = telnetlib.Telnet()
self.host_ip = host_ip
self.username = username
self.password = password
def login_host(self):
try:
self.tn.open(self.host_ip, port=23)
except:
print('%s网络连接失败' % self.host_ip)
return False
if TelnetLoginEnable == "en":
login = self.tn.read_until(b'login: ', timeout=3).decode('ascii')
self.tn.write(self.username.encode('ascii') + b'\n')
self.tn.read_until(b'Password: ', timeout=3)
self.tn.write(self.password.encode('ascii') + b'\n')
command_result = self.tn.read_very_eager().decode('ascii')
if 'Login incorrect' not in command_result:
print('%s登录成功' % self.host_ip)
return True
else:
print('%s登录失败,用户名或密码错误' % self.host_ip)
return False
def execute_some_command(self, command,mode="nowait"):
if mode == "nowait":
self.tn.write(command.encode('ascii')+b'\n')
print("Wait Command Exec... ...")
for i in tqdm(range(10),ncols=100):
time.sleep(1)
command_result = self.tn.read_very_eager().decode('ascii')
elif mode == "wait":
self.tn.write(command.encode('ascii')+b'\n')
command_result = self.tn.read_until("DONE".encode('ascii')).decode('ascii')
print('命令执行结果:\n%s' % command_result)
def __del__(self):
self.tn.write(b"exit\n")
def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
if tn.login_host():
tn.execute_some_command(cmd,mode)
return True
else:
return False
class FtpClient():
def __init__(self, ip: str, username: str, password: str) -> None:
self.ip = ip
self.user = username
self.password = password
self.ftp = FTP()
def connect(self):
try:
self.ftp.connect(self.ip, port=21,timeout=5)
self.ftp.login(self.user, self.password)
#print(self.ftp.getwelcome())
except:
print("ftp connect {} error ...".format(self.ip))
self.ftp.set_debuglevel(2)
print("Start Ping FtpServer ... ...")
os.system("ping {}".format(self.ip))
exit(1)
def put(self, filename: str):
try:
size = os.path.getsize(filename)/1024
fp = open(filename, "rb")
desc="Start Upload {}... ...".format(filename)
print(desc)
with tqdm(total=int(size),ncols=100,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(int(l))
self.ftp.storbinary("STOR "+filename, fp,callback=call_)
except:
print("ftp upload {} error".format(filename))
def get(self, filename: str):
try:
size = self.ftp.size(filename)/1024
fp = open(filename, "wb")
print("Start Download {} ... ...".format(filename))
with tqdm(total=int(size),ncols=100,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(int(l))
fp.write(data)
self.ftp.retrbinary("RETR "+filename,callback=call_)
except:
print("ftp get {} error".format(filename))
def __del__(self):
# self.ftp.set_debuglevel(0)
self.ftp.quit()
class SerialClient:
def __init__(self, port: str, bps: int) -> None:
try:
self.port = port
self.bps = bps
self.ser = serial.Serial(port, bps)
except:
print("Open Serial Error {}".format(port))
def write(self, cmd: str) -> bool:
if self.ser.isOpen():
print("Open Serial {}".format(self.port))
cmd = cmd+" \n"
self.ser.write(cmd.encode("utf8"))
return True
else:
return False
#如果终端有响应一直读取数据
def read(self):
if self.ser.isOpen():
while self.ser.readable():
line = self.ser.readline(4096)
print(line.decode('ascii'))
def __del__(self):
self.ser.close()
# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98
def dos2unix(dos: str, unix: str):
content = ''
outsize = 0
with open(dos, 'rb') as infile:
content = infile.read()
with open(unix, 'wb') as output:
for line in content.splitlines():
outsize += len(line) + 1
output.write(line + b'\n')
print("Done. Stripped %s bytes." % (len(content)-outsize))
def Exit(signum, frame):
exit()
def CompressFile(TarFile:str,File:str,module:str):
print("Start Compressed Dir ... ...")
tar = tarfile.open(TarFile,module)
for root,dir,files in os.walk(File):
for file in files:
fullpath = os.path.join(root,file)
tar.add(fullpath)
tar.close()
print("End Compressed Dir Done ... ...")
if __name__ == '__main__':
signal.signal(signal.SIGINT, Exit)
Parser = configparser.ConfigParser()
config = "config.ini"
if os.access(config,os.F_OK) == False:
print("Error:Can't find upgrade.ini, Please check your ini config")
exit()
Parser.read(config)
Parser.sections()
IP = Parser["BaseConfig"]["ServerIpAddr"]
UserName = Parser["UserConfig"]["UserName"]
PassWord = Parser["UserConfig"]["UserPassword"]
Filename = Parser["UserConfig"]["UpgradeFilename"]
CompressedMode = Parser["BaseConfig"]["CompressedMode"]
TelnetLoginEnable = Parser["BaseConfig"]["TelnetLoginEnable"]
UpgradeCommand = Parser["UserConfig"]["UpgradeCommand"]
UpgradeRootPath = Parser["UserConfig"]["UpgradeRootPath"]
FtpRootPath = Parser["BaseConfig"]["FtpRootPath"]
ftp = FtpClient(IP,UserName,PassWord)
ftp.connect()
SourceFilename = Filename
uncompressed=""
if CompressedMode == "x":
uncompressed = "xf"
Filename +=".tar"
elif CompressedMode == "x:gz":
uncompressed = "xzf"
Filename +=".tar.gz"
elif CompressedMode == "x:xz":
uncompressed == "xjf"
Filename += ".tar.xz"
exec_upgrade = TelnetClient(IP,UserName,PassWord)
RootPath=UpgradeRootPath+Filename
FtpPath=FtpRootPath+Filename
if os.access(Filename,os.F_OK):
os.remove(Filename)
CompressFile(Filename,SourceFilename,CompressedMode)
ftp.put(Filename)
os.remove(Filename)
cmd = "mv {} {}".format(FtpPath,UpgradeRootPath)
TelnetExecCmd(exec_upgrade,cmd)
cmd = "tar {} {} -C {}".format(uncompressed,RootPath,UpgradeRootPath)
TelnetExecCmd(exec_upgrade,cmd)
cmd = "cp -f {}/* {}".format((UpgradeRootPath+SourceFilename),FtpRootPath)
TelnetExecCmd(exec_upgrade,cmd)
TelnetExecCmd(exec_upgrade,UpgradeCommand)
os.system("pause")
边栏推荐
猜你喜欢
随机推荐
[Sql刷题篇] 查询信息数据--Day1
ACP-Cloud Computing By Wakin自用笔记(2)CPU和内存虚拟化
致-.-- -..- -
Industrial CCD and CMOS camera
译文推荐|Apache Pulsar 隔离系列(四):单集群隔离策略
对比几类主流的跨端技术方案
JS: 数组和树的相互转换
EuROC dataset format and related codes
HCIA-R&S自用笔记(22)STP状态与计时器、STP拓扑变化、STP配置及实验
编译optimize源码实现过程
Pedestrian fall detection experiment based on YOLOV5
Finger Vein Recognition-matlab
企业应当实施的5个云安全管理策略
win10 uwp win2d 使用 Path 绘制界面
基于YOLOV5行人跌倒检测实验
TritonVM——基于Recursive STARK的虚拟机
零基础做出高端堆叠极环图
openharmony初体验(1)
前3名突然变了,揭秘 7 月编程语言最新排行榜
NLP技术为何在工业界这么卷?前沿案例解析来了








