当前位置:网站首页>成品升级程序
成品升级程序
2022-06-30 06:41:00 【洪大宇】
import configparser
import os
import signal
import tarfile
import telnetlib
import time
from ftplib import FTP
from ftplib import all_errors

import serial
from tqdm import tqdm
class TelnetClient():
    """Small Telnet session wrapper used to log in and run shell commands.

    Attributes:
        tn: The underlying ``telnetlib.Telnet`` object.
        host_ip: Address passed to ``Telnet.open``.
        username: Name sent after the ``login: `` prompt.
        password: Password sent after the ``Password: `` prompt.
        login_wait: Seconds to wait for the login reply before reading it.
    """

    # Pause between sending the password and reading the reply; without it
    # the non-blocking read returns before the server has answered.
    login_wait = 2

    def __init__(self, host_ip, username, password):
        self.tn = telnetlib.Telnet()
        self.host_ip = host_ip
        self.username = username
        self.password = password

    def login_host(self):
        """Open a connection on port 23 and authenticate.

        Returns:
            True when the reply does not contain ``Login incorrect``;
            False when the connection fails or the login is rejected.
        """
        # login_host() may be called more than once on the same client, and
        # telnetlib must not reopen a connected instance, so drop any
        # previous session first (close() is harmless when not connected).
        self.tn.close()
        try:
            self.tn.open(self.host_ip, port=23)
            self.tn.read_until(b'login: ', timeout=10)
            self.tn.write(self.username.encode('ascii') + b'\n')
            self.tn.read_until(b'Password: ', timeout=10)
            self.tn.write(self.password.encode('ascii') + b'\n')
            # Give the server time to answer before the non-blocking read.
            time.sleep(self.login_wait)
            raw_reply = self.tn.read_very_eager()
        except (OSError, EOFError):
            print('%s网络连接失败' % self.host_ip)
            return False
        # Device output is not guaranteed to be pure ASCII; never crash on it.
        command_result = raw_reply.decode('ascii', errors='replace')
        if 'Login incorrect' in command_result:
            print('%s登录失败,用户名或密码错误' % self.host_ip)
            return False
        print('%s登录成功' % self.host_ip)
        return True

    def execute_some_command(self, command, mode="nowait", delay=5, timeout=None):
        """Send *command* and print whatever the remote side answers.

        Args:
            command: Shell command line (ASCII) to send, newline appended.
            mode: ``"nowait"`` reads what is already available;
                ``"wait"`` blocks until the text ``DONE`` is received.
            delay: Seconds to sleep after sending, before reading.
            timeout: Optional limit in seconds for the ``"wait"`` read;
                ``None`` waits indefinitely.

        Returns:
            The decoded output that was printed.

        Raises:
            ValueError: If *mode* is not ``"nowait"`` or ``"wait"``. The
                check happens before anything is sent to the device.
        """
        if mode not in ("nowait", "wait"):
            raise ValueError("mode must be 'nowait' or 'wait', got %r" % (mode,))
        banner = "\n***************************************************\n"
        self.tn.write(command.encode('ascii') + b'\n')
        time.sleep(delay)
        print(banner)
        if mode == "nowait":
            raw_output = self.tn.read_very_eager()
        else:
            raw_output = self.tn.read_until("DONE".encode('ascii'), timeout=timeout)
        command_result = raw_output.decode('ascii', errors='replace')
        print('命令执行结果:\n%s' % command_result)
        print(banner)
        return command_result

    def __del__(self):
        """Best-effort logout when the client object is destroyed."""
        tn = getattr(self, "tn", None)
        if tn is None:
            return
        try:
            tn.write(b"exit\n")
            tn.close()
        except (OSError, AttributeError):
            # Never connected or already gone: nothing to log out of.
            pass
def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
    """Log in through *tn* and, if that succeeds, run *cmd*.

    Args:
        tn: Client whose ``login_host`` and ``execute_some_command`` are used.
        cmd: Command line handed to ``execute_some_command``.
        mode: Read mode forwarded unchanged to ``execute_some_command``.

    Returns:
        True when the login succeeded and the command was issued,
        False when the login failed (the command is then not sent).
    """
    logged_in = tn.login_host()
    if not logged_in:
        return False
    tn.execute_some_command(cmd, mode)
    return True
class FtpClient():
    """FTP helper that uploads and downloads files with a progress bar.

    All operations are best-effort: failures are reported on stdout and
    signalled through a ``False`` return value instead of an exception.
    """

    def __init__(self, ip: str, username: str, password: str) -> None:
        self.ip = ip
        self.user = username
        self.password = password
        self.ftp = FTP()

    def _connected(self) -> bool:
        """Return True when the underlying FTP object holds an open socket."""
        return getattr(self.ftp, "sock", None) is not None

    def connect(self):
        """Connect to port 21 and log in.

        Returns:
            True on success, False when connecting or logging in failed.
        """
        try:
            self.ftp.connect(self.ip, port=21)
            self.ftp.login(self.user, self.password)
        except all_errors as err:
            print("ftp connect {} error ...".format(self.ip))
            print("reason: {}".format(err))
            return False
        return True

    def put(self, filename: str):
        """Upload the local file *filename* under the same remote name.

        Returns:
            True when the transfer completed, False otherwise.
        """
        if not self._connected():
            # Keep the original best-effort behaviour for a failed connect().
            print("ftp upload {} error".format(filename))
            return False
        try:
            size = os.path.getsize(filename) / 1024
            # The context manager closes the local file even on failure.
            with open(filename, "rb") as fp:
                print("Start Upload ... ...")
                with tqdm(total=size, desc="Upload {}".format(filename), ncols=150, unit="KB") as bar:
                    def on_block(data):
                        # Progress is tracked in KB to match the bar's unit.
                        bar.update(len(data) / 1024)
                    self.ftp.storbinary("STOR " + filename, fp, callback=on_block)
        except all_errors as err:
            print("ftp upload {} error".format(filename))
            print("reason: {}".format(err))
            return False
        return True

    def get(self, filename: str):
        """Download remote *filename* into a local file of the same name.

        Returns:
            True when the transfer completed, False otherwise.
        """
        if not self._connected():
            print("ftp get {} error".format(filename))
            return False
        try:
            remote_size = self.ftp.size(filename)
            # size() may return None; the bar then runs without a total.
            total = remote_size / 1024 if remote_size is not None else None
            with open(filename, "wb") as fp:
                print("Start Download ... ...")
                with tqdm(total=total, desc="Download {}".format(filename), ncols=150, unit="KB") as bar:
                    def on_block(data):
                        bar.update(len(data) / 1024)
                        fp.write(data)
                    self.ftp.retrbinary("RETR " + filename, callback=on_block)
        except all_errors as err:
            print("ftp get {} error".format(filename))
            print("reason: {}".format(err))
            return False
        return True

    def __del__(self):
        """Politely end the session if one was ever established."""
        ftp = getattr(self, "ftp", None)
        if ftp is None or getattr(ftp, "sock", None) is None:
            return
        try:
            ftp.quit()
        except all_errors:
            # The server is gone or refused QUIT; just drop the socket.
            ftp.close()
class SerialClient:
    """Line-oriented wrapper around a ``serial.Serial`` port.

    Attributes:
        port: Port name passed to ``serial.Serial``.
        bps: Baud rate passed to ``serial.Serial``.
        ser: The open port object, or ``None`` when opening failed.
    """

    def __init__(self, port: str, bps: int) -> None:
        self.port = port
        self.bps = bps
        # Always define the attribute so later calls can test it safely
        # even when opening the port fails.
        self.ser = None
        try:
            self.ser = serial.Serial(port, bps)
        except (serial.SerialException, OSError, ValueError):
            print("Open Serial Error {}".format(port))

    def _is_open(self) -> bool:
        """Return True when a port object exists and reports itself open."""
        return self.ser is not None and self.ser.isOpen()

    def write(self, cmd: str) -> bool:
        """Send *cmd* followed by a space and a newline, UTF-8 encoded.

        Returns:
            True when the data was handed to the port, False when the
            port is not open.
        """
        if not self._is_open():
            return False
        print("Open Serial {}".format(self.port))
        payload = cmd + " \n"
        self.ser.write(payload.encode("utf8"))
        return True

    # Keep reading for as long as the port reports itself readable.
    def read(self):
        """Print incoming lines until the port stops being readable."""
        if not self._is_open():
            return
        while self.ser.readable():
            line = self.ser.readline(4096)
            # Serial noise may contain non-ASCII bytes; never crash on it.
            print(line.decode('ascii', errors='replace'))

    def __del__(self):
        """Close the port if it was ever opened."""
        ser = getattr(self, "ser", None)
        if ser is not None:
            ser.close()
# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98
def dos2unix(dos: str, unix: str):
    """Copy *dos* to *unix* with every line terminated by a single LF.

    The input is read completely before the output is opened, so both
    arguments may name the same file. The number of bytes saved (input
    size minus output size) is printed when done.

    Args:
        dos: Path of the file to read, opened in binary mode.
        unix: Path of the file to write, opened in binary mode.
    """
    with open(dos, 'rb') as source:
        raw = source.read()
    # bytes.splitlines() drops the terminator of each line; re-add a bare LF.
    converted = [row + b'\n' for row in raw.splitlines()]
    with open(unix, 'wb') as target:
        target.writelines(converted)
    written = sum(len(row) for row in converted)
    print("Done. Stripped %s bytes." % (len(raw) - written))
def Exit(signum, frame):
    """Signal handler that terminates the program.

    Raises ``SystemExit`` directly instead of calling the ``exit()``
    helper, which is injected by the ``site`` module and is not
    guaranteed to exist (for example under ``python -S``).

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: Always, with no exit status argument.
    """
    raise SystemExit
def CompressFile(TarFile:str,File:str,module:str):
    """Pack *File* into the tar archive *TarFile*.

    When *File* is a directory, every regular file found beneath it is
    added (empty directories are not). When *File* is a single file, that
    file is added. Members are stored under the path they were found at.

    Args:
        TarFile: Path of the archive to create.
        File: Directory tree or single file to pack.
        module: Mode string passed to ``tarfile.open`` (for example ``"w"``).
    """
    # The context manager closes the archive even if adding a member fails.
    with tarfile.open(TarFile, module) as archive:
        if os.path.isfile(File):
            # os.walk() yields nothing for a plain file, which would
            # otherwise produce an empty archive.
            archive.add(File)
            return
        for root, _subdirs, names in os.walk(File):
            for name in names:
                archive.add(os.path.join(root, name))
if __name__ == '__main__':
    # Upgrade flow: read config.ini, normalise the upgrade script's line
    # endings, pack the upgrade payload, upload both over FTP, then run the
    # script on the device through Telnet and clean up afterwards.
    signal.signal(signal.SIGINT, Exit)

    config = "config.ini"
    if not os.path.isfile(config):
        # Report the file that is actually looked up, and exit non-zero so
        # a calling shell can detect the failure.
        print("Error:Can't find {}, Please check your ini config".format(config))
        raise SystemExit(1)

    Parser = configparser.ConfigParser()
    Parser.read(config)
    IP = Parser["BaseConfig"]["ServerIpAddr"]
    Scripts = Parser["BaseConfig"]["UpgradeScript"]
    CompressedMode = Parser["BaseConfig"]["CompressedMode"]
    UserName = Parser["UserConfig"]["UserName"]
    PassWord = Parser["UserConfig"]["UserPassword"]
    Filename = Parser["UserConfig"]["UpgradeFilename"]

    # Convert the script in place so it runs on the device.
    dos2unix(Scripts,Scripts)

    Archive = Filename+".tar"
    if os.path.exists(Archive):
        os.remove(Archive)
    CompressFile(Archive,Filename,CompressedMode)
    try:
        ftp = FtpClient(IP,UserName,PassWord)
        ftp.connect()
        ftp.put(Scripts)
        ftp.put(Archive)
    finally:
        # The archive is only a transfer artefact; never leave it behind.
        os.remove(Archive)

    exec_upgrade = TelnetClient(IP,UserName,PassWord)
    # Make the uploaded script executable and run it; "wait" blocks until
    # the device prints DONE.
    CMD = "chmod +rwx /mnt/flash/{0} && /mnt/flash/{0}".format(Scripts)
    TelnetExecCmd(exec_upgrade,CMD,"wait")
    # Remove the script and the payload from the device.
    TelnetExecCmd(exec_upgrade,"rm /mnt/flash/{} ; rm -rf /run/{}*".format(Scripts, Filename))
    # Keep the console window open (Windows "pause" command).
    os.system("pause")
边栏推荐
- Gazebo model modification
- Software tools_ Shortcut_ Operation summary
- Connect to remote server
- Bat 使用细节2
- Thread safe solutions, communication between threads (classic examples of producers and consumers)
- New project folder based on PIO plug-in in vscode -- Interpretation
- 01. regular expression overview
- Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'
- Never forget the original intention, and be lazy if you can: C # operate word files
- 华泰炒股安全吗?我想网上开户。
猜你喜欢

Pay attention to this live broadcast and learn about the path to achieve the dual carbon goal of the energy industry

Wechat applet mall project
![[untitled]](/img/6c/df2ebb3e39d1e47b8dd74cfdddbb06.gif)
[untitled]

Using C language pure for loop to implement ilovey

Traitement d'images 7 - amélioration d'images

【微信小程序:单选、多选样式,背景色,圆角】

Px4 control mode summary

SOC_AHB_SD_IF

Never forget the original intention, and be lazy if you can: C # operate word files

1.6 - CPU组成
随机推荐
RT thread migration to s5p4418 (I): scheduler
Pay attention to this live broadcast and learn about the path to achieve the dual carbon goal of the energy industry
KEIL - 下载调试出现“TRACE HW not present”
基础刷题(一)
Bat 使用细节2
1.5 - 逻辑运算
First experience of Galaxy Kirin
Software tools_ Shortcut_ Operation summary
写一个C程序判断系统是大端字节序还是小端字节序
Connect to remote server
MySQL中的InnoDB引擎
Four ways to create multithreads
深度学习---三好学生各成绩所占权重问题(3)
Several C language implementations
Analysis of startup process of gazebo multi computer simulation
List in set (2)
Gazebo installation, uninstall and upgrade
华泰炒股安全吗?我想网上开户。
关注这场直播,了解能源行业双碳目标实现路径
图像处理7-图像增强