当前位置:网站首页>2019阿里集群数据集使用总结

2019阿里集群数据集使用总结

2022-07-06 10:05:00 5-StarrySky

2019阿里集群数据集使用总结

数据集中的一些细节

  • 在离线负载的实例表格(batch_instance)中,实例的开始时间和结束时间有问题,如果你仔细去查,就会发现实例的开始时间居然会比实例的结束时间要大,所以减出来是负数,我一开始的时候调BUG怎么都没发现,知道有一天注意到我的任务量是负数,我才发现怎么回事,这里也给出我的任务量的定义:

    cloudlet_total = self.cpu_avg * (self.finished_time_data_set - self.start_time_data_set) 
    cloudmem_total = self.mem_avg * (self.finished_time_data_set - self.start_time_data_set)
    
  • 第二个坑,就是上面的CPU的数量,对于离线负载来说,instance里面的CPU数量都是乘以了100的数量,所以你需要做到标准化,因为机器节点中CPU的数量是不要除以100的,就是96,同理container_mate的CPU数量也需要注意,如果是400之类的,就需要再除以100

创建出一个JOB和APP

    # 拼接出APP 一个APP包括多个容器
    def create_app(self):
        container_dao = ContainerDao(self.a)
        container_data_list = container_dao.get_container_list()
        container_data_groupby_list = container_dao.get_container_data_groupby()
        for app_du,count in container_data_groupby_list:
            self.app_num += 1
            num = 0
            app = Apps(app_du)
            index_temp = -1
            for container_data in container_data_list:
                if container_data.app_du == app_du:
                    index_temp += 1
                    container = Container(app_du)
                    container.set_container(container_data)
                    container.index = index_temp
                    app.add_container(container)
                    num += 1
                if num == count:
                    # print(app.app_du + "")
                    # print(num)
                    app.set_container_num(num)
                    self.App_list.append(app)
                    break
     	
        
        # 创建出JOB
        def create_job(self):
        instance_dao = InstanceDao(self.b)
        task_dao = TaskDao(self.b)
        task_data_list = task_dao.get_task_list()
        job_group_by = task_dao.get_job_group_by()
        instance_data_list = instance_dao.get_instance_list()
        # instance_data_list = instance_dao.get_instance_list()
        for job_name,count in job_group_by:
            self.job_num += 1
            job = Job(job_name)
            for task_data in task_data_list:
                if task_data.job_name == job_name:
                    task = Task(job_name)
                    task.set_task(task_data)
                    for instance_data in instance_data_list:
                        if instance_data.job_name == job_name and instance_data.task_name == task_data.task_name:
                            instance = Instance(task_data.task_name,job_name)
                            instance.set_instance(instance_data)
                            task.add_instance(instance)
                            pass
                        pass
                    task.prepared()
                    job.add_task(task)
            pass
            # 以下语句可以去掉,去掉就表示去掉了DAG关系
            job.DAG()
            self.job_list.append(job)
            
	# 创建DAG关系
    def DAG(self):
        for task in self.task_list:
            str_list = task.get_task_name().replace(task.get_task_name()[0],"")
            num_list = str_list.split('_')
            # 当前任务的序列号 M1
            start = num_list[0]
            del num_list[0]
            # 后面指向前面 M2_1 1指向2 1是2的前驱 2是1的后继
            for task_1 in self.task_list:
                str_list_1 = task_1.get_task_name().replace(task_1.get_task_name()[0], "")
                num_list_1 = str_list_1.split('_')
                del num_list_1[0]
                # 当前任务
                for num in num_list_1:
                    if num == start:
                        # 保存后继
                        task.add_subsequent(task_1.get_task_name())
                        # 保存前驱
                        task_1.add_precursor(task.get_task_name())
                        # task_1.add_subsequent(task.get_task_name())
                        pass
                    pass
                pass
            pass
        pass

怎么读取数据并创建对应实例

import pandas as pd

from data.containerData import ContainerData

""" 数据集中读取容器数据 属性: a:选择哪一个数据集, = 1 :表示数据集1 容器124个 有20组 = 2 :表示数据集2 容器500个 有66组 = 3 :表示数据集3 容器1002个 有122组 """
# 这里仅以contaienr为例
class ContainerDao:
    def __init__(self, a):
    # 行数
        self.a = a
    # 从数据集中把容器数据读取出来,并存入contaianer_list
    def get_container_list(self):
        # 从数据集中提取数据: 0容器ID 1机器ID 2时间戳 3部署域 4状态 5需要的cpu数量 6cpu限制数量 7内存大小
        columns = ['container_id','time_stamp','app_du','status','cpu_request','cpu_limit','mem_size','container_type','finished_time']
        filepath_or_buffer = "G:\\experiment\\data_set\\container_meta\\app_"+str(self.a)+".csv"
        container_dao = pd.read_csv(filepath_or_buffer,  names=columns)
        temp_list = container_dao.values.tolist()
        # print(temp_list)
        container_data_list = list()
        for data in temp_list:
            # print(data)
            temp_container = ContainerData()
            temp_container.set_container_data(data)
            container_data_list.append(temp_container)
            # print(temp_container)
        return container_data_list
    def get_container_data_groupby(self):
        """ columns = ['container_id', 'machine_id ', 'time_stamp', 'app_du', 'status', 'cpu_request', 'cpu_limit', 'mem_size'] filepath_or_buffer = "D:\\experiment\\data_set\\container_meta\\app_" + str(self.a) + ".csv" container_dao = pd.read_csv(filepath_or_buffer, names=columns) temp_container_dao = container_dao temp_container_dao['count'] = 0 temp_container_dao.groupby(['app_du']).count()['count'] :return: """
        temp_filepath = "G:\\experiment\\data_set\\container_meta\\app_" + str(self.a) + "_groupby.csv"
        container_dao_groupby = pd.read_csv(temp_filepath)
        container_data_groupby_list = container_dao_groupby.values.tolist()
        return container_data_groupby_list

对应的container_data类:

class ContainerData:
    def __init__(self):
        pass
    # 数据集中的容器ID
    def set_container_id(self, container_id):
        self.container_id = container_id

    # 数据集中的机器ID
    def set_machine_id(self, machine_id):
        self.machine_id = machine_id

    # 数据集中的部署容器组(用于构建集群)
    def set_deploy_unit(self, app_du):
        self.app_du = app_du

    # 数据集中的时间戳
    def set_time_stamp(self, time_stamp):
        self.time_stamp = time_stamp

    # 数据集中的cpu限制请求
    def set_cpu_limit(self, cpu_limit):
        self.cpu_limit = cpu_limit

    # 数据集中的cpu请求
    def set_cpu_request(self, cpu_request):
        self.cpu_request = cpu_request

    # 数据集中请求的内存大小
    def set_mem_size(self, mem_size):
        self.mem_request = mem_size

    # 数据集中的容器状态
    def set_state(self, state):
        self.state = state
        pass

    def set_container_type(self,container_type):
        self.container_type = container_type
        pass

    def set_finished_time(self,finished_time):
        self.finished_time = finished_time

    # 用数据集初始化对象
    def set_container_data(self, data):
        # 容器ID
        self.set_container_id(data[0])
        # self.set_machine_id(data[1])
        # 容器时间戳(到达时间)
        self.set_time_stamp(data[1])
        # 容器组
        self.set_deploy_unit(data[2])
        # 状态
        self.set_state(data[3])
        # cpu需求
        self.set_cpu_request(data[4])
        # cpu限制
        self.set_cpu_limit(data[5])
        # 内存大小
        self.set_mem_size(data[6])
        # 容器类型
        self.set_container_type(data[7])
        # 完成时间
        self.set_finished_time(data[8])

最后

在这里插入图片描述

原网站

版权声明
本文为[5-StarrySky]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_37431461/article/details/125524164