Summary of Using the 2019 Alibaba Cluster Dataset
2022-07-06 10:05:00 【5-StarrySky】
Some details of the dataset
In the batch workload instance table (batch_instance), the instance start and end times are problematic: if you inspect the data carefully, you will find instances whose start time is actually larger than their end time, so the difference comes out negative. I debugged for a long time without finding the cause, until one day I noticed that my workload values were negative. For reference, here is how I define an instance's workload:
cloudlet_total = self.cpu_avg * (self.finished_time_data_set - self.start_time_data_set)
cloudmem_total = self.mem_avg * (self.finished_time_data_set - self.start_time_data_set)
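Before computing these quantities it is worth screening the table. Below is a minimal sketch of such a filter, assuming the batch_instance columns include start_time, end_time, cpu_avg, and mem_avg (verify against the schema file shipped with the trace); the file path is a placeholder:

import pandas as pd

# Placeholder path; adjust the column list to the actual batch_instance schema
df = pd.read_csv("batch_instance.csv", names=['instance_name', 'task_name', 'job_name',
                 'task_type', 'status', 'start_time', 'end_time', 'machine_id', 'seq_no',
                 'total_seq_no', 'cpu_avg', 'cpu_max', 'mem_avg', 'mem_max'])

# Drop the problematic rows whose start time is later than the end time
bad = df['start_time'] > df['end_time']
print("dropping", bad.sum(), "instances with negative duration")
df = df[~bad]

# Workload as defined above: average resource usage times duration
df['cloudlet_total'] = df['cpu_avg'] * (df['end_time'] - df['start_time'])
df['cloudmem_total'] = df['mem_avg'] * (df['end_time'] - df['start_time'])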
The second pitfall concerns the CPU figures mentioned above. For batch workloads, the CPU numbers in the instance table are all multiplied by 100, so you need to normalize them; a machine node's CPU count, by contrast, is not scaled and is simply 96. The same caution applies to the CPU figures in container_meta: values like 400 must likewise be divided by 100.
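A small helper illustrating this scaling rule. Treating the 100x factor as applying to batch-instance and container CPU figures but not to machine CPU counts is my reading of the trace, so check it against the official schema:

def normalize_cpu(raw_cpu, scaled_by_100=True):
    # batch_instance and container_meta CPU values are stored multiplied by 100
    # (e.g. 400 means 4 cores); machine_meta cpu_num is already in cores (e.g. 96)
    return raw_cpu / 100 if scaled_by_100 else raw_cpu

normalize_cpu(400)         # container cpu_request 400 -> 4.0 cores
normalize_cpu(96, False)   # machine cpu_num stays 96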
Creating the jobs and apps
# Assemble the apps: one app consists of multiple containers
def create_app(self):
    container_dao = ContainerDao(self.a)
    container_data_list = container_dao.get_container_list()
    container_data_groupby_list = container_dao.get_container_data_groupby()
    for app_du, count in container_data_groupby_list:
        self.app_num += 1
        num = 0
        app = Apps(app_du)
        index_temp = -1
        for container_data in container_data_list:
            if container_data.app_du == app_du:
                index_temp += 1
                container = Container(app_du)
                container.set_container(container_data)
                container.index = index_temp
                app.add_container(container)
                num += 1
                if num == count:
                    app.set_container_num(num)
                    self.App_list.append(app)
                    break
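A side note on performance: the nested loop above rescans container_data_list once per app. Grouping the containers by app_du into buckets first lets a single pass over the data suffice; a sketch of that variant, under the same class attributes as above:

from collections import defaultdict

def create_app_fast(self):
    container_dao = ContainerDao(self.a)
    # Bucket the containers by their deploy unit in one pass
    buckets = defaultdict(list)
    for container_data in container_dao.get_container_list():
        buckets[container_data.app_du].append(container_data)
    for app_du, group in buckets.items():
        self.app_num += 1
        app = Apps(app_du)
        for index, container_data in enumerate(group):
            container = Container(app_du)
            container.set_container(container_data)
            container.index = index
            app.add_container(container)
        app.set_container_num(len(group))
        self.App_list.append(app)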
# Create the jobs
def create_job(self):
    instance_dao = InstanceDao(self.b)
    task_dao = TaskDao(self.b)
    task_data_list = task_dao.get_task_list()
    job_group_by = task_dao.get_job_group_by()
    instance_data_list = instance_dao.get_instance_list()
    for job_name, count in job_group_by:
        self.job_num += 1
        job = Job(job_name)
        for task_data in task_data_list:
            if task_data.job_name == job_name:
                task = Task(job_name)
                task.set_task(task_data)
                for instance_data in instance_data_list:
                    if instance_data.job_name == job_name and instance_data.task_name == task_data.task_name:
                        instance = Instance(task_data.task_name, job_name)
                        instance.set_instance(instance_data)
                        task.add_instance(instance)
                task.prepared()
                job.add_task(task)
        # The following call can be removed; dropping it discards the DAG relationships
        job.DAG()
        self.job_list.append(job)
# Build the DAG relationships between tasks
def DAG(self):
    for task in self.task_list:
        # Strip the leading type letter, e.g. 'M2_1' -> '2_1'
        str_list = task.get_task_name().replace(task.get_task_name()[0], "")
        num_list = str_list.split('_')
        # The first field is this task's own sequence number, e.g. '2' for M2_1
        start = num_list[0]
        del num_list[0]
        # A name like M2_1 means 1 precedes 2: 1 is a predecessor of 2, 2 is a successor of 1
        for task_1 in self.task_list:
            str_list_1 = task_1.get_task_name().replace(task_1.get_task_name()[0], "")
            num_list_1 = str_list_1.split('_')
            del num_list_1[0]
            for num in num_list_1:
                if num == start:
                    # task_1 lists this task as a dependency, so record both directions
                    task.add_subsequent(task_1.get_task_name())
                    task_1.add_precursor(task.get_task_name())
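To make the naming convention concrete: a task name such as M2_1 means task 2 depends on task 1, and M3_1_2 would mean task 3 depends on tasks 1 and 2. A standalone sketch of the parsing step (using task_name[1:] rather than replace(), which would also strip later occurrences of the leading letter):

def parse_task_name(task_name):
    # Drop the leading type letter (e.g. 'M'), then split on '_':
    # the first field is this task's id, the rest are its predecessors
    fields = task_name[1:].split('_')
    return fields[0], fields[1:]

print(parse_task_name('M2_1'))    # ('2', ['1'])
print(parse_task_name('M3_1_2'))  # ('3', ['1', '2'])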
How to read the data and create the corresponding objects
import pandas as pd
from data.containerData import ContainerData

"""
Read container data from the dataset.
Attribute:
    a: which sub-dataset to use
       = 1: dataset 1, 124 containers in 20 groups
       = 2: dataset 2, 500 containers in 66 groups
       = 3: dataset 3, 1002 containers in 122 groups
"""
# Only the container DAO is shown here as an example
class ContainerDao:
    def __init__(self, a):
        # Which sub-dataset to read
        self.a = a

    # Read the container data from the dataset and collect it into container_data_list
    def get_container_list(self):
        # Columns extracted from the dataset: 0 container ID, 1 timestamp, 2 deploy unit,
        # 3 status, 4 requested CPU, 5 CPU limit, 6 memory size, 7 container type, 8 finish time
        columns = ['container_id', 'time_stamp', 'app_du', 'status', 'cpu_request',
                   'cpu_limit', 'mem_size', 'container_type', 'finished_time']
        filepath_or_buffer = "G:\\experiment\\data_set\\container_meta\\app_" + str(self.a) + ".csv"
        container_dao = pd.read_csv(filepath_or_buffer, names=columns)
        temp_list = container_dao.values.tolist()
        container_data_list = list()
        for data in temp_list:
            temp_container = ContainerData()
            temp_container.set_container_data(data)
            container_data_list.append(temp_container)
        return container_data_list

    def get_container_data_groupby(self):
        """
        The groupby file was precomputed roughly as follows:
            columns = ['container_id', 'machine_id', 'time_stamp', 'app_du', 'status',
                       'cpu_request', 'cpu_limit', 'mem_size']
            filepath_or_buffer = "D:\\experiment\\data_set\\container_meta\\app_" + str(self.a) + ".csv"
            container_dao = pd.read_csv(filepath_or_buffer, names=columns)
            temp_container_dao = container_dao
            temp_container_dao['count'] = 0
            temp_container_dao.groupby(['app_du']).count()['count']
        :return: list of (app_du, count) pairs
        """
        temp_filepath = "G:\\experiment\\data_set\\container_meta\\app_" + str(self.a) + "_groupby.csv"
        container_dao_groupby = pd.read_csv(temp_filepath)
        container_data_groupby_list = container_dao_groupby.values.tolist()
        return container_data_groupby_list
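For reference, the app_X_groupby.csv consumed above can be precomputed from the raw per-app file along the lines of the recipe quoted in the docstring; a sketch, with the output column order (app_du, count) assumed to match what create_app unpacks:

import pandas as pd

columns = ['container_id', 'time_stamp', 'app_du', 'status', 'cpu_request',
           'cpu_limit', 'mem_size', 'container_type', 'finished_time']
path = "G:\\experiment\\data_set\\container_meta\\app_1.csv"
df = pd.read_csv(path, names=columns)

# One row per deploy unit with its container count
groupby = df.groupby('app_du').size().reset_index(name='count')
groupby.to_csv(path.replace('.csv', '_groupby.csv'), index=False)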
The corresponding ContainerData class:
class ContainerData:
    def __init__(self):
        pass

    # Container ID from the dataset
    def set_container_id(self, container_id):
        self.container_id = container_id

    # Machine ID from the dataset
    def set_machine_id(self, machine_id):
        self.machine_id = machine_id

    # Deploy unit (container group) from the dataset, used to build the cluster
    def set_deploy_unit(self, app_du):
        self.app_du = app_du

    # Timestamp from the dataset
    def set_time_stamp(self, time_stamp):
        self.time_stamp = time_stamp

    # CPU limit from the dataset
    def set_cpu_limit(self, cpu_limit):
        self.cpu_limit = cpu_limit

    # Requested CPU from the dataset
    def set_cpu_request(self, cpu_request):
        self.cpu_request = cpu_request

    # Requested memory size from the dataset (note: stored as mem_request)
    def set_mem_size(self, mem_size):
        self.mem_request = mem_size

    # Container status from the dataset
    def set_state(self, state):
        self.state = state

    # Container type from the dataset
    def set_container_type(self, container_type):
        self.container_type = container_type

    # Finish time from the dataset
    def set_finished_time(self, finished_time):
        self.finished_time = finished_time

    # Initialize the object from one dataset row
    def set_container_data(self, data):
        # Container ID
        self.set_container_id(data[0])
        # machine_id is absent from this preprocessed file
        # self.set_machine_id(data[1])
        # Timestamp (arrival time)
        self.set_time_stamp(data[1])
        # Deploy unit
        self.set_deploy_unit(data[2])
        # Status
        self.set_state(data[3])
        # Requested CPU
        self.set_cpu_request(data[4])
        # CPU limit
        self.set_cpu_limit(data[5])
        # Memory size
        self.set_mem_size(data[6])
        # Container type
        self.set_container_type(data[7])
        # Finish time
        self.set_finished_time(data[8])
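A brief usage sketch tying the two classes together, assuming dataset 1 sits at the path hard-coded in ContainerDao:

dao = ContainerDao(1)
containers = dao.get_container_list()
first = containers[0]
# Each ContainerData now exposes the parsed fields as attributes
print(first.container_id, first.app_du, first.cpu_request, first.mem_request)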
Finally