当前位置:网站首页>成品升级程序
成品升级程序
2022-08-04 19:23:00 【洪大宇】
import telnetlib
import time
from ftplib import FTP
import serial
import configparser
import signal
import os
import tarfile
from tqdm import tqdm
class TelnetClient():
def __init__(self, host_ip, username, password):
self.tn = telnetlib.Telnet()
self.host_ip = host_ip
self.username = username
self.password = password
def login_host(self):
try:
self.tn.open(self.host_ip, port=23)
except:
print('%s网络连接失败' % self.host_ip)
return False
self.tn.read_until(b'login: ', timeout=10)
self.tn.write(self.username.encode('ascii') + b'\n')
self.tn.read_until(b'Password: ', timeout=10)
self.tn.write(self.password.encode('ascii') + b'\n')
command_result = self.tn.read_very_eager().decode('ascii')
if 'Login incorrect' not in command_result:
print('%s登录成功' % self.host_ip)
return True
else:
print('%s登录失败,用户名或密码错误' % self.host_ip)
return False
def execute_some_command(self, command,mode="nowait"):
self.tn.write(command.encode('ascii')+b'\n')
time.sleep(5)
print("\n***************************************************\n")
if mode == "nowait":
command_result = self.tn.read_very_eager().decode('ascii')
elif mode == "wait":
command_result = self.tn.read_until("DONE".encode('ascii')).decode('ascii')
print('命令执行结果:\n%s' % command_result)
print("\n***************************************************\n")
def __del__(self):
self.tn.write(b"exit\n")
def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
if tn.login_host():
tn.execute_some_command(cmd,mode)
return True
else:
return False
class FtpClient():
def __init__(self, ip: str, username: str, password: str) -> None:
self.ip = ip
self.user = username
self.password = password
self.ftp = FTP()
def connect(self):
try:
# self.ftp.set_debuglevel(2)
self.ftp.connect(self.ip, port=21)
self.ftp.login(self.user, self.password)
#print(self.ftp.getwelcome())
except:
print("ftp connect {} error ...".format(self.ip))
def put(self, filename: str):
try:
size = os.path.getsize(filename)/1024
fp = open(filename, "rb")
print("Start Upload ... ...")
with tqdm(total=size,desc="Upload {}".format(filename),ncols=150,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(l)
self.ftp.storbinary("STOR "+filename, fp,callback=call_)
except:
print("ftp upload {} error".format(filename))
def get(self, filename: str):
try:
size = self.ftp.size(filename)/1024
fp = open(filename, "wb")
print("Start Download ... ...")
with tqdm(total=size,desc="Download {}".format(filename),ncols=150,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(l)
fp.write(data)
self.ftp.retrbinary("RETR "+filename,callback=call_)
except:
print("ftp get {} error".format(filename))
def __del__(self):
# self.ftp.set_debuglevel(0)
self.ftp.quit()
class SerialClient:
def __init__(self, port: str, bps: int) -> None:
try:
self.port = port
self.bps = bps
self.ser = serial.Serial(port, bps)
except:
print("Open Serial Error {}".format(port))
def write(self, cmd: str) -> bool:
if self.ser.isOpen():
print("Open Serial {}".format(self.port))
cmd = cmd+" \n"
self.ser.write(cmd.encode("utf8"))
return True
else:
return False
#如果终端有响应一直读取数据
def read(self):
if self.ser.isOpen():
while self.ser.readable():
line = self.ser.readline(4096)
print(line.decode('ascii'))
def __del__(self):
self.ser.close()
# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98
def dos2unix(dos: str, unix: str):
content = ''
outsize = 0
with open(dos, 'rb') as infile:
content = infile.read()
with open(unix, 'wb') as output:
for line in content.splitlines():
outsize += len(line) + 1
output.write(line + b'\n')
print("Done. Stripped %s bytes." % (len(content)-outsize))
def Exit(signum, frame):
exit()
def CompressFile(TarFile:str,File:str,module:str):
tar = tarfile.open(TarFile,module)
for root,dir,files in os.walk(File):
for file in files:
fullpath = os.path.join(root,file)
tar.add(fullpath)
tar.close()
if __name__ == '__main__':
signal.signal(signal.SIGINT, Exit)
Parser = configparser.ConfigParser()
config = "config.ini"
if os.access(config,os.F_OK) == False:
print("Error:Can't find upgrade.ini, Please check your ini config")
exit()
Parser.read(config)
Parser.sections()
IP = Parser["BaseConfig"]["ServerIpAddr"]
Scripts = Parser["BaseConfig"]["UpgradeScript"]
UserName = Parser["UserConfig"]["UserName"]
PassWord = Parser["UserConfig"]["UserPassword"]
Filename = Parser["UserConfig"]["UpgradeFilename"]
CompressedMode = Parser["BaseConfig"]["CompressedMode"]
dos2unix(Scripts,Scripts)
if os.access(Filename+".tar",os.F_OK):
os.remove(Filename+".tar")
CompressFile(Filename+".tar",Filename,CompressedMode)
ftp = FtpClient(IP,UserName,PassWord)
ftp.connect()
ftp.put(Scripts)
ftp.put(Filename+".tar")
os.remove(Filename+".tar")
exec_upgrade = TelnetClient(IP,UserName,PassWord)
CMD = "chmod +rwx "+ "/mnt/flash/"+Scripts+" && /mnt/flash/"+Scripts
TelnetExecCmd(exec_upgrade,CMD,"wait")
TelnetExecCmd(exec_upgrade,"rm /mnt/flash/"+Scripts+" ; rm -rf /run/"+Filename+"*")
os.system("pause")
更稳定的版本
from msilib.schema import Upgrade
import telnetlib
import time
from ftplib import FTP
import serial
import configparser
import signal
import os
import tarfile
from tqdm import tqdm
global TelnetLoginEnable
class TelnetClient():
def __init__(self, host_ip, username, password):
self.tn = telnetlib.Telnet()
self.host_ip = host_ip
self.username = username
self.password = password
def login_host(self):
try:
self.tn.open(self.host_ip, port=23)
except:
print('%s网络连接失败' % self.host_ip)
return False
if TelnetLoginEnable == "en":
login = self.tn.read_until(b'login: ', timeout=3).decode('ascii')
self.tn.write(self.username.encode('ascii') + b'\n')
self.tn.read_until(b'Password: ', timeout=3)
self.tn.write(self.password.encode('ascii') + b'\n')
command_result = self.tn.read_very_eager().decode('ascii')
if 'Login incorrect' not in command_result:
print('%s登录成功' % self.host_ip)
return True
else:
print('%s登录失败,用户名或密码错误' % self.host_ip)
return False
def execute_some_command(self, command,mode="nowait"):
if mode == "nowait":
self.tn.write(command.encode('ascii')+b'\n')
print("Wait Command Exec... ...")
for i in tqdm(range(10),ncols=100):
time.sleep(1)
command_result = self.tn.read_very_eager().decode('ascii')
elif mode == "wait":
self.tn.write(command.encode('ascii')+b'\n')
command_result = self.tn.read_until("DONE".encode('ascii')).decode('ascii')
print('命令执行结果:\n%s' % command_result)
def __del__(self):
self.tn.write(b"exit\n")
def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
if tn.login_host():
tn.execute_some_command(cmd,mode)
return True
else:
return False
class FtpClient():
def __init__(self, ip: str, username: str, password: str) -> None:
self.ip = ip
self.user = username
self.password = password
self.ftp = FTP()
def connect(self):
try:
self.ftp.connect(self.ip, port=21,timeout=5)
self.ftp.login(self.user, self.password)
#print(self.ftp.getwelcome())
except:
print("ftp connect {} error ...".format(self.ip))
self.ftp.set_debuglevel(2)
print("Start Ping FtpServer ... ...")
os.system("ping {}".format(self.ip))
exit(1)
def put(self, filename: str):
try:
size = os.path.getsize(filename)/1024
fp = open(filename, "rb")
desc="Start Upload {}... ...".format(filename)
print(desc)
with tqdm(total=int(size),ncols=100,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(int(l))
self.ftp.storbinary("STOR "+filename, fp,callback=call_)
except:
print("ftp upload {} error".format(filename))
def get(self, filename: str):
try:
size = self.ftp.size(filename)/1024
fp = open(filename, "wb")
print("Start Download {} ... ...".format(filename))
with tqdm(total=int(size),ncols=100,unit="KB") as bar:
def call_(data):
l = len(data)/1024
bar.update(int(l))
fp.write(data)
self.ftp.retrbinary("RETR "+filename,callback=call_)
except:
print("ftp get {} error".format(filename))
def __del__(self):
# self.ftp.set_debuglevel(0)
self.ftp.quit()
class SerialClient:
def __init__(self, port: str, bps: int) -> None:
try:
self.port = port
self.bps = bps
self.ser = serial.Serial(port, bps)
except:
print("Open Serial Error {}".format(port))
def write(self, cmd: str) -> bool:
if self.ser.isOpen():
print("Open Serial {}".format(self.port))
cmd = cmd+" \n"
self.ser.write(cmd.encode("utf8"))
return True
else:
return False
#如果终端有响应一直读取数据
def read(self):
if self.ser.isOpen():
while self.ser.readable():
line = self.ser.readline(4096)
print(line.decode('ascii'))
def __del__(self):
self.ser.close()
# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98
def dos2unix(dos: str, unix: str):
content = ''
outsize = 0
with open(dos, 'rb') as infile:
content = infile.read()
with open(unix, 'wb') as output:
for line in content.splitlines():
outsize += len(line) + 1
output.write(line + b'\n')
print("Done. Stripped %s bytes." % (len(content)-outsize))
def Exit(signum, frame):
exit()
def CompressFile(TarFile:str,File:str,module:str):
print("Start Compressed Dir ... ...")
tar = tarfile.open(TarFile,module)
for root,dir,files in os.walk(File):
for file in files:
fullpath = os.path.join(root,file)
tar.add(fullpath)
tar.close()
print("End Compressed Dir Done ... ...")
if __name__ == '__main__':
signal.signal(signal.SIGINT, Exit)
Parser = configparser.ConfigParser()
config = "config.ini"
if os.access(config,os.F_OK) == False:
print("Error:Can't find upgrade.ini, Please check your ini config")
exit()
Parser.read(config)
Parser.sections()
IP = Parser["BaseConfig"]["ServerIpAddr"]
UserName = Parser["UserConfig"]["UserName"]
PassWord = Parser["UserConfig"]["UserPassword"]
Filename = Parser["UserConfig"]["UpgradeFilename"]
CompressedMode = Parser["BaseConfig"]["CompressedMode"]
TelnetLoginEnable = Parser["BaseConfig"]["TelnetLoginEnable"]
UpgradeCommand = Parser["UserConfig"]["UpgradeCommand"]
UpgradeRootPath = Parser["UserConfig"]["UpgradeRootPath"]
FtpRootPath = Parser["BaseConfig"]["FtpRootPath"]
ftp = FtpClient(IP,UserName,PassWord)
ftp.connect()
SourceFilename = Filename
uncompressed=""
if CompressedMode == "x":
uncompressed = "xf"
Filename +=".tar"
elif CompressedMode == "x:gz":
uncompressed = "xzf"
Filename +=".tar.gz"
elif CompressedMode == "x:xz":
uncompressed == "xjf"
Filename += ".tar.xz"
exec_upgrade = TelnetClient(IP,UserName,PassWord)
RootPath=UpgradeRootPath+Filename
FtpPath=FtpRootPath+Filename
if os.access(Filename,os.F_OK):
os.remove(Filename)
CompressFile(Filename,SourceFilename,CompressedMode)
ftp.put(Filename)
os.remove(Filename)
cmd = "mv {} {}".format(FtpPath,UpgradeRootPath)
TelnetExecCmd(exec_upgrade,cmd)
cmd = "tar {} {} -C {}".format(uncompressed,RootPath,UpgradeRootPath)
TelnetExecCmd(exec_upgrade,cmd)
cmd = "cp -f {}/* {}".format((UpgradeRootPath+SourceFilename),FtpRootPath)
TelnetExecCmd(exec_upgrade,cmd)
TelnetExecCmd(exec_upgrade,UpgradeCommand)
os.system("pause")
边栏推荐
- JS手写JSON.stringify() (面试)
- The CPU suddenly soars and the system responds slowly, what is the cause?Is there any way to check?
- Yuanguo chain game system development
- Homework 8.3 Thread Synchronization Mutex Condition Variables
- Exploration and Practice of Database Governance
- STP实验
- Polygon zkEVM 基本概念
- 机器学习之支持向量机实例,线性核函数 多项式核函数 RBF高斯核函数 sigmoid核函数
- JS: 数组和树的相互转换
- MMDetection 使用示例:从入门到出门
猜你喜欢
随机推荐
Day018 Inheritance
密码学系列之:PEM和PKCS7,PKCS8,PKCS12
Highlights of some performance tests
c语言进阶篇:自定义类型--结构体
TritonVM——基于Recursive STARK的虚拟机
从零开始实现一个简单的CycleGAN项目
致-.-- -..- -
Defaced Fingerprint Recovery and Identification
win10 uwp json
Jmeter - Heap配置原因报错Invalid initial heap size: -Xms1024m -Xmx2048mError
Dragoma(DMA)元宇宙系统开发
奥拉时钟芯片生成配置文件脚本
win10 uwp 修改Pivot Header 颜色
小波提取特征的VQ实现
完善的交叉编译环境记录 peta 生成的shell 脚本
目标检测的发展与现状
如何进行自动化测试?
Storage resource activation system to help new infrastructure
哈佛架构 VS 冯·诺依曼架构
前3名突然变了,揭秘 7 月编程语言最新排行榜