当前位置:网站首页>持续交付(一)JenkinsAPI接口调用

持续交付(一)JenkinsAPI接口调用

2022-08-02 16:27:00 M1kasal

前言

本章主要讲述:Jenkins_API接口调用

这里会简单讲下:
	1、自己使用request调用JenkinsAPI
	2、利用已有的jekinsapi库,简单封装了一个jenkins调用的类


一、Jenkins_API简要介绍

  • Jenkins_API:即Jenkins对外暴露的动作交互入口,为外部程序提供入口,可以控制Jenkins

    支持协议:Http

    常用功能:运行Job,查看任务状态,返回任务编号



二、request调用JenkinsAPI

  • 这里简单介绍下request进行调用JenkinsAPI,详细看下注释,其实很简单
"""
    该类调用jenkins_api接口
        1、获取任务的最新编号
        2、获取任务的详细信息
"""
import json

import requests

# 注意:这个地址前面部分
# mikasa:yy1998123 是你的jenkins用户名和密码
# 127.0.0.1:8080 是本地jenkins域名+端口号
url = "http://mikasa:[email protected]:8080/jenkins/job/"


def get_jenkins_url(job_name):
    """
    拼接url+任务job
    :return:
    """
    # print("拼接url为:", url + job_name)
    return url + job_name


def send_api(req, tools="requests"):
    """
    对发送接口测试的工具进行封装(可以使用urlib3/requests)
    :param tools:
    :param req:
    :return:
    """
    if tools == "requests":
        return requests.request(**req)


def get_latest_job_number():
    """
    1、获取最新任务编号
    :return:
    """
    req = {
    
        "method": "GET",
        "url": get_jenkins_url("mikasa_demo001") + "/lastBuild/buildNumber",
    }
    res = send_api(req)
    print("最新任务编号:", res.json())


def get_job_info():
    """
    2、获取job详细信息
    :return:
    """
    req = {
    
        "method": "GET",
        "url": get_jenkins_url("mikasa_demo001") + "/api/json",
    }
    res = send_api(req)
    res_json = json.dumps(res.json(), indent=2)
    print("返回结果:", res_json)


get_latest_job_number()
get_job_info()

在这里插入图片描述



三、使用jenkins api库调用

  • 这里介绍下利用jenkins api库去调用,上面是我们自己去写request请求去调用,而其实目前已经存在了轮子,我们直接使用Jenkins api即可

1、下载jenkinsapi库

在这里插入图片描述


2、封装jenkins调用

2.1、jenkins_api.py

在这里插入图片描述

"""
    封装jenkins调用类
"""
import configparser
import datetime
import logging
import os
import re
from jenkinsapi.jenkins import Jenkins

logging.basicConfig(level=logging.INFO, format='[%(asctime)s] - [%(name)s] - [%(levelname)s] - [%(message)s]')
log = logging.getLogger(__name__)


def get_jenkins_config(chose):
    """
    读取Jenkins配置:从配置文件中jenkins_server.ini
    :param chose:
    :return:
    """
    config = configparser.ConfigParser()
    # 读取配置
    config.read(os.path.join(os.getcwd(), 'jenkins_server.ini'))
    username = config.get(chose, 'username')
    password = config.get(chose, 'password')
    host = config.get(chose, 'host')
    port = config.get(chose, 'port')
    url = "http://" + host + ":" + port
    return url, username, password


class JenkinsDemo:
    def __init__(self, job_name, chose='jenkins'):
        """
        初始化,拿到jenkins配置
        :param job_name:
        :param chose:
        """
        self.job_name = job_name
        config = get_jenkins_config(chose)
        print("config:", config)
        # 解包元祖
        self.jk = Jenkins(*config, useCrumb=True)

    def __get_job_from_keys(self):
        """
        拿到所有的job名称
        :return: 返回一个列表
        """
        choose_list = []
        print(self.jk.keys())
        for my_job_name in self.jk.keys():
            # 遍历拿到所有的job,判断当前job是否在job列表里面,在的话添加到自定义列表
            if self.job_name in my_job_name:
                choose_list.append(my_job_name)
        return choose_list

    def __job_build(self, my_job_name):
        """
        构建job
        :param my_job_name:
        :return:
        """
        if self.jk.has_job(my_job_name):
            # 如果有这个job拿到他里面的job对象
            my_job = self.jk.get_job(my_job_name)
            if not my_job.is_queued_or_running():
                # 如果job当前没有在运行的话,就运行
                try:
                    # 若当前没有在跑的话,拿到最后一次构建数
                    last_build = my_job.get_last_buildnumber()
                except:
                    # 若没有获取到最后一次构建数的话,默认置为0
                    last_build = 0
                # 最新构建数+1
                build_num = last_build + 1
                try:
                    # 开始打包
                    self.jk.build_job(my_job_name)
                except Exception as e:
                    log.error(str(e))

                # 循环判断Jenkins是否打包完成
                while True:
                    # 若当前任务没有运行才获取信息
                    if not my_job.is_queued_or_running():
                        # 拿到最新一次的大奥信息
                        count_build = my_job.get_build(build_num)
                        # 获取打包开始时间
                        start_time = count_build.get_timestamp() + datetime.timedelta(hours=8)
                        # 获取打包日志
                        console_out = count_build.get_console()
                        # 获取打包状态
                        status = count_build.get_status()
                        # 获取变更内容
                        change = count_build.get_changeset_items()
                        log.info(" " + str(start_time) + " 发起的" + my_job_name + "构建已经完成,构建的状态为:" + status)
                        p2 = re.compile(r".*ERROR.*")
                        err_list = p2.findall(console_out)
                        log.info("打包日志为:" + str(console_out))
                        if status == "SUCCESS":
                            if len(change) > 0:
                                for data in change:
                                    for file_list in data["affectedPaths"]:
                                        log.info("发起的" + my_job_name + "变更的类:" + file_list)
                                    log.info("发起的" + my_job_name + "变更的备注:" + data["msg"])
                                    log.info("发起的" + my_job_name + "变更的提交人:" + data["author"]["fullName"])
                            else:
                                log.info("发起的" + my_job_name + "构建没有变更内容")

                            if len(err_list) > 0:
                                log.warning("构建的" + my_job_name + "构建状态为成功,但包含了以下错误:")
                                for error in err_list:
                                    log.error(error)
                        else:
                            if len(err_list) > 0:
                                log.warning("构建的" + my_job_name + "包含了以下错误:")
                                for error in err_list:
                                    log.error(error)
                        break
            else:
                log.warning("发起的" + my_job_name + "Jenkins is running")
        else:
            log.warning("发起的" + my_job_name + "没有该服务")

    def run(self):
        my_job_name = self.__get_job_from_keys()
        if len(my_job_name) == 1:
            self.__job_build(my_job_name[0])
        elif len(my_job_name) == 0:
            log.error("输入的job名称不正确!")


if __name__ == '__main__':
    jk = JenkinsDemo("mikasa_demo001")
    jk.run()

2.2、jenkins_server.ini

在这里插入图片描述

[jenkins]
username=mikasa
password=yy1998123
host=127.0.0.1
port=8080
原网站

版权声明
本文为[M1kasal]所创,转载请带上原文链接,感谢
https://blog.csdn.net/Makasa/article/details/126119348