当前位置:网站首页>sqlalchemy event listen Automatic generate CRUD excel
Using a SQLAlchemy event listener to automatically generate a CRUD Excel sheet
2022-06-26 12:33:00 【Edison Dont】
import re
from sqlalchemy import event, MetaData, create_engine
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter
from openpyxl.styles import Alignment
from openpyxl.styles.borders import Border, Side, BORDER_THIN
import models
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Log an executed SQL statement and record it in the CRUD workbook.

    Registered as a SQLAlchemy ``after_cursor_execute`` listener, so it is
    called with that event's standard arguments; only ``statement`` is used.
    Nothing happens when no request has been stored on
    ``common.thread_local`` for the current thread.
    """
    thread_state = common.thread_local
    if hasattr(thread_state, 'current_request'):
        active_request = thread_state.current_request
        # Same "path METHOD" form that the api_list entries of the
        # workbook writer use for their "url" field.
        api_url = "{} {}".format(active_request.path, active_request.method)
        print(
            "request:{}\r\nendpoint:{}\r\napi_url:{} {}\r\nstatement:{}\r\n".format(
                active_request,
                active_request.endpoint,
                active_request.path,
                active_request.method,
                statement,
            )
        )
        print("\r\n")
        innert_table_columns_to_excel(active_request.endpoint, api_url, statement)
@app.before_request
def before_request():
    """Store the incoming request per thread and start listening for SQL.

    The request object is stashed on ``common.thread_local`` so the
    ``after_cursor_execute`` listener can tell which endpoint issued a
    statement.
    """
    thread_state = common.thread_local
    thread_state.current_request = request
    event.listen(
        models.engine,
        "after_cursor_execute",
        receive_after_cursor_execute,
    )
@app.after_request
def after_request(response):
    """Stop listening for SQL once the request has been handled.

    Args:
        response: The response object Flask is about to send.

    Returns:
        The same ``response``, unchanged. Flask requires ``after_request``
        handlers to return a response object; returning ``None`` (as the
        previous version implicitly did) breaks every request.
    """
    listener_args = (models.engine, "after_cursor_execute", receive_after_cursor_execute)
    # Only remove the listener if it is registered; event.remove() raises
    # when asked to remove a listener that is not present.
    if event.contains(*listener_args):
        event.remove(*listener_args)
    return response
# Schema placed on the connection's search_path (see connect_args below).
schema = "public"

# NOTE(review): the URL looks like a template whose {...} placeholders are
# meant to be replaced with real connection details - confirm before running.
engine = create_engine(
    'postgresql://{db user name}:{password}@{host}:{port}/{db name}',
    connect_args=dict(options='-c search_path={}'.format(schema)),
    max_overflow=10,
    pool_size=5,
)
def _parse_crud_statement(statement):
    """Classify a SQL statement and collect the tables/columns it touches.

    Args:
        statement: SQL text as rendered for the DB driver.

    Returns:
        A ``(symbol, touched)`` tuple. ``symbol`` is ``"R"``, ``"C"``,
        ``"U"`` or ``"D"`` for SELECT / INSERT / UPDATE / DELETE and ``""``
        for anything else. ``touched`` maps each table name to the list of
        column names found, or to ``None`` when every column is affected
        (DELETE).
    """
    # Identifiers may contain digits (e.g. "table_name1"); the schema prefix
    # is optional and its dot is matched literally.
    ident = r'[a-z_][a-z0-9_]*'
    schema_prefix = r'(?:public\.)?'
    touched = {}

    if statement.startswith("SELECT "):
        # Matches ` table."column" AS label,` fragments of the select list.
        select_pattern = re.compile(
            r'\s' + schema_prefix + r'(' + ident + r')\."?(' + ident + r')"?'
            r'\sAS\s[a-z0-9_]+[,\s]'
        )
        for table_name, column_name in select_pattern.findall(statement):
            touched.setdefault(table_name, []).append(column_name)
            print("table_name:{}, column_name:{}".format(table_name, column_name))
        return "R", touched

    if statement.startswith("INSERT INTO"):
        insert_pattern = re.compile(
            r'INSERT\sINTO\s' + schema_prefix + r'(' + ident + r')'
            r'\s\(([a-z0-9_,\s"]+)\)\sVALUES\s'
        )
        for table_name, column_list in insert_pattern.findall(statement):
            print("table_name:{}, column_names:{}".format(table_name, column_list))
            column_names = touched.setdefault(table_name, [])
            for raw_column in column_list.split(","):
                column_name = raw_column.replace("\"", "").strip()
                column_names.append(column_name)
                print(column_name)
        return "C", touched

    if statement.startswith("UPDATE "):
        # The SET clause is taken as the longest run of characters that can
        # appear in `col=%(param)s, ...`; no trailing whitespace is required
        # so an UPDATE without a WHERE clause keeps its last assignment.
        update_pattern = re.compile(
            r'UPDATE\s' + schema_prefix + r'(' + ident + r')'
            r'\sSET\s([a-z0-9_=%(),\s"]+)'
        )
        # Optional quotes so `"order"=...` is recognised as column `order`.
        assignment_pattern = re.compile(r'"?(' + ident + r')"?=')
        for table_name, set_clause in update_pattern.findall(statement):
            print("table_name:{}".format(table_name))
            print("column_names:")
            column_names = touched.setdefault(table_name, [])
            for column_name in assignment_pattern.findall(set_clause):
                column_names.append(column_name)
                print(column_name)
        return "U", touched

    if statement.startswith("DELETE "):
        # `(?:\s|$)` also accepts a DELETE that ends right after the table.
        delete_pattern = re.compile(
            r'DELETE\sFROM\s' + schema_prefix + r'(' + ident + r')(?:\s|$)'
        )
        for table_name in delete_pattern.findall(statement):
            touched[table_name] = None  # None == every column of the table
            print("table name:{}".format(table_name))
        return "D", touched

    return "", touched


def _merge_crud_symbols(existing, symbol):
    """Return the CRUD letters of ``existing`` plus ``symbol`` in C-R-U-D order.

    Anything in ``existing`` that is not one of the four letters is dropped;
    a missing or non-text ``existing`` value is treated as empty.
    """
    current = existing if isinstance(existing, str) else ""
    return "".join(
        letter for letter in ("C", "R", "U", "D")
        if letter == symbol or letter in current
    )


def _write_table_header(sheet, tracked_tables, border,
                        table_row, table_col, column_row, column_col):
    """Write the merged table-name row and the rotated column-name row."""
    sheet.row_dimensions[column_row].height = 120
    offset = 0
    for table_name, columns in tracked_tables:
        title_cell = sheet.cell(row=table_row, column=table_col + offset)
        title_cell.border = border
        title_cell.alignment = Alignment(horizontal='center', vertical='center')
        title_cell.value = table_name
        # The table title spans all of the table's column cells.
        last_letter = get_column_letter(table_col + offset + len(columns) - 1)
        sheet.merge_cells(title_cell.coordinate + ":" + last_letter + str(table_row))

        for column in columns:
            name_cell = sheet.cell(row=column_row, column=column_col + offset)
            sheet.column_dimensions[get_column_letter(column_col + offset)].width = 5
            name_cell.border = border
            name_cell.alignment = Alignment(
                textRotation=90, horizontal='center', vertical='center')
            name_cell.value = column.name
            offset += 1


def innert_table_columns_to_excel(endpoint, api_url, statement,
                                  workbook_path='D:/CRUD.xlsx'):
    """Record which table columns ``statement`` touches in the CRUD workbook.

    The workbook's ``Sheet1`` is laid out as one row per API (from
    ``api_list``) and one column per column of each tracked table. For the
    API rows matching ``endpoint`` / ``api_url`` the letter C, R, U or D is
    added to the cells of the columns the statement inserts, selects,
    updates or deletes. Letters already in a cell are kept, so results
    accumulate across calls.

    Args:
        endpoint: Endpoint name, compared with the ``"endpoint"`` entries of
            ``api_list``.
        api_url: ``"<path> <METHOD>"`` string, compared with the ``"url"``
            entries of ``api_list``.
        statement: SQL text to analyse.
        workbook_path: Workbook that is loaded, updated and saved in place.

    NOTE(review): the workbook is loaded and saved on every call without any
    locking; concurrent calls may overwrite each other's changes.
    """
    thin_side = Side(border_style=BORDER_THIN, color='00000000')
    thin_border = Border(left=thin_side, right=thin_side, top=thin_side, bottom=thin_side)

    work_book = load_workbook(filename=workbook_path)
    sheet = work_book['Sheet1']

    meta = MetaData()
    meta.reflect(bind=engine)

    # Table names start at (row 6, column 9) -> I6.
    table_name_col_begin = 9
    table_name_row_begin = 6
    # Column names start at (row 7, column 9) -> I7.
    column_name_col_begin = 9
    column_name_row_begin = 7
    # Only these reflected tables get columns in the sheet.
    object_table_list = ["table_name1", "table_name2"]
    # API rows start at row 8: endpoint in column 2, URL in columns 3..8.
    endpoint_col_begin = 2
    api_col_begin = 3
    api_row_begin = 8
    api_list = [
        {"url": "/ GET", "endpoint": "login"},
        {"url": "/login GET", "endpoint": "login"},
        {"url": "/login POST", "endpoint": "login"},
    ]

    crud_symbol, touched = _parse_crud_statement(statement)

    tracked_tables = [
        (table_name, list(table.columns))
        for table_name, table in meta.tables.items()
        if table_name in object_table_list
    ]

    # The header does not depend on the API row, so write it once.
    _write_table_header(
        sheet, tracked_tables, thin_border,
        table_name_row_begin, table_name_col_begin,
        column_name_row_begin, column_name_col_begin,
    )

    for row_index, api in enumerate(api_list, start=api_row_begin):
        url_cell = sheet.cell(row=row_index, column=api_col_begin)
        url_cell.border = thin_border
        url_cell.alignment = Alignment(horizontal='left', vertical='center')
        url_cell.value = api["url"]
        url_range_end = get_column_letter(api_col_begin + 5) + str(row_index)
        sheet.merge_cells(url_cell.coordinate + ":" + url_range_end)
        sheet.cell(row=row_index, column=endpoint_col_begin).value = api["endpoint"]

        # "login" rows must match both URL and endpoint; any other endpoint
        # matches on the endpoint name alone.
        row_matches = api["endpoint"] == endpoint and (
            api["url"] == api_url or api["endpoint"] != "login"
        )

        column_index = column_name_col_begin
        for table_name, columns in tracked_tables:
            table_touched = row_matches and table_name in touched
            touched_columns = touched.get(table_name)
            for column in columns:
                cell = sheet.cell(row=row_index, column=column_index)
                cell.border = thin_border
                column_hit = table_touched and (
                    touched_columns is None or column.name in touched_columns
                )
                cell.value = _merge_crud_symbols(
                    cell.value, crud_symbol if column_hit else "")
                column_index += 1

    work_book.save(filename=workbook_path)
if __name__ == '__main__':
    # Start the app's built-in server on all interfaces, port 8080, with
    # threaded request handling enabled.
    app.run(
        host='0.0.0.0',
        port=8080,
        threaded=True,
    )
边栏推荐
- 7-16 货币系统Ⅰ
- 我想知道同花顺是炒股的么?在线开户安全么?
- PHP calculates excel coordinate values, starting with subscript 0
- 房租是由什么决定的
- 大智慧哪个开户更安全,更好点
- PHP returns false when calling redis method decrby less than 0
- How to do well in member marketing three steps to teach you to understand member management
- 2022 edition of China's cotton chemical fiber printing and dyeing Market Status Investigation and Prospect Forecast Analysis Report
- 2022 edition of China's medical robot industry investment status investigation and prospect dynamic analysis report
- [redis series] redis learning 16. Redis Dictionary (map) and its core coding structure
猜你喜欢

New routing file in laravel framework

SQL injection in Pikachu shooting range
![[solved] laravel completes the scheduled job task (delayed distribution task) [execute a user-defined task at a specified time]](/img/13/c2c63333a9e5ac08b339449ea17654.jpg)
[solved] laravel completes the scheduled job task (delayed distribution task) [execute a user-defined task at a specified time]

Introduction to the four major FPGA manufacturers abroad

Php+laravel5.7 use Alibaba oss+ Alibaba media to process and upload image / video files

Deep thinking from senior member managers

PHP uses laravel pay component to quickly access wechat jsapi payment (wechat official account payment)

Build Pikachu shooting range and introduction

HUST network attack and defense practice | 6_ IOT device firmware security experiment | Experiment 2 MPU based IOT device attack mitigation technology

Scala-day03- operators and loop control
随机推荐
HUST network attack and defense practice | 6_ IOT device firmware security experiment | Experiment 2 MPU based IOT device attack mitigation technology
Seven major trends deeply affecting the U.S. consumer goods industry in 2022
New routing file in laravel framework
One click deployment CEPH script
How can we reach members more effectively?
7-3 最低通行费
4. N queen problem
imagecopymerge
Lintcode 130 · 堆化
PHP calculates excel coordinate values, starting with subscript 0
File decryption in webgame development
Five trends of member management in 2022
"Pinduoduo and short video speed version", how can I roast!
MS17_ 010 utilization summary
Research on the current situation of China's modified engineering plastics market and demand forecast analysis report 2022-2028
I want to know whether flush is a stock market? Is online account opening safe?
Laravel uses find_ IN_ The set() native MySQL statement accurately queries whether a special string exists in the specified string to solve the problem that like cannot be accurately matched. (resolve
The transformation of enterprise customers' digital assets needs to suit the case
JS how to judge when data contains integer and floating-point types. Floating-point decimals retain two digits after the decimal point
Wechat applet wx Request request encapsulation