'CMRESHandler' object has no attribute '_timer', socket.gaierror: [Errno 8] nodename nor servname pro
2022-06-12 07:06:00 【繁星蓝雨】
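1 Reproducing the error
While using Python's CMRESHandler on a Mac to write logs to Elasticsearch, I hit two errors: 'CMRESHandler' object has no attribute '_timer', and socket.gaierror: [Errno 8] nodename nor servname provided, or not known. At first I assumed my own code was wrong, but after some digging the problem turned out to be in the library's code. I could not find a solution anywhere online, so I am writing it down here for anyone who runs into the same thing.
After the fix, the log records were written to ES successfully.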

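2 Solution
The cause is the following statement in the library, which fails while looking up the host IP. The failing call is socket.gethostbyname(socket.gethostname()); on a Mac this typically happens when the machine's hostname has no entry in DNS or /etc/hosts. The '_timer' error is most likely a follow-on: the constructor aborts before self._timer is assigned, so a later flush() or close() finds the attribute missing.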
self.es_additional_fields.update({'host': socket.gethostname(),
                                  'host_ip': socket.gethostbyname(socket.gethostname())})
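The failing lookup can be isolated from the handler. The following is a minimal sketch, not library code: `host_ip_or_fallback` and `always_fails` are hypothetical names introduced here, and the `resolver` parameter exists only so the failure path can be exercised without depending on the local DNS setup.

```python
import socket


def host_ip_or_fallback(resolver=socket.gethostbyname, fallback=""):
    """Resolve this machine's hostname to an IP, or return `fallback`.

    Hypothetical helper for illustration; `resolver` is injectable so the
    failure path can be exercised without touching DNS.
    """
    hostname = socket.gethostname()
    try:
        return resolver(hostname)
    except socket.gaierror as exc:
        # This is the exception type reported in the article
        # ("[Errno 8] nodename nor servname provided, or not known").
        print("could not resolve {!r}: {}".format(hostname, exc))
        return fallback


def always_fails(name):
    # Stand-in resolver that simulates the error from the article.
    raise socket.gaierror(8, "nodename nor servname provided, or not known")


if __name__ == "__main__":
    # Simulated failure: the fallback value is returned instead of crashing.
    print(host_ip_or_fallback(resolver=always_fails, fallback="127.0.0.1"))
```

Catching the exception at the call site is the same idea the modified file applies through get_local_ip(): the handler's constructor must not abort just because the hostname cannot be resolved.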
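Since we cannot modify the installed library itself, the only option is to copy the library's code into a separate file, fix the failing statement there, and import that file instead.
The modified handlers.py is shown below (the other related files have been merged into it as well).
The change is to call get_local_ip() in place of the failing socket.gethostbyname(socket.gethostname()).
Note: the elasticsearch client version used here is 7.9.1. A version below 8 is required, otherwise RequestsHttpConnection cannot be imported (pip3 install "elasticsearch==7.9.1" -i https://pypi.tuna.tsinghua.edu.cn/simple).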
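The version constraint can be checked before importing the handler. This is a sketch under the assumption stated in the note above (client versions below 8 still provide RequestsHttpConnection); the helper names `parse_major`, `supports_requests_http_connection` and `installed_es_version` are made up for this example.

```python
from importlib import metadata


def parse_major(version_string):
    """Return the major version number from a string such as '7.9.1'."""
    return int(version_string.split(".")[0])


def supports_requests_http_connection(version_string):
    """True for client versions below 8, following the note in the article."""
    return parse_major(version_string) < 8


def installed_es_version():
    """Return the installed elasticsearch client version, or None if absent."""
    try:
        return metadata.version("elasticsearch")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_es_version()
    if found is None:
        print("elasticsearch client is not installed")
    elif supports_requests_http_connection(found):
        print("elasticsearch {} should work with this handler".format(found))
    else:
        print("elasticsearch {} is 8 or newer; pin a 7.x release".format(found))
```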
#!/usr/bin/env python3
# Connect to and initialise Elasticsearch
import logging
import datetime
import socket
from threading import Timer, Lock
from enum import Enum
from elasticsearch import helpers as eshelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection
# from CMRESSerializer import CMRESSerializer
# from getLocal_ip import get_local_ip
# import settings
from elasticsearch.serializer import JSONSerializer


class CMRESSerializer(JSONSerializer):
    def default(self, data):
        # Fall back to str() for objects the JSON serializer cannot handle
        try:
            return super(CMRESSerializer, self).default(data)
        except TypeError:
            return str(data)
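

def get_local_ip():
    """ Get the local IP address

    :return: the IP as a string, or '' if it cannot be determined
    """
    try:
        # connect() on a UDP socket sends no packets; it only makes the OS
        # choose the outgoing interface, whose address is then read back.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 80))
            return s.getsockname()[0]
    except Exception as e:
        print(e)
        return ''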


class CMRESHandler(logging.Handler):
    """ Elasticsearch log handler """

    class AuthType(Enum):
        """ Authentication types supported

        The handler supports
         - No authentication
         - Basic authentication
        """
        NO_AUTH = 0
        BASIC_AUTH = 1
        DEVOPS_AUTH = 2  # declared, but __get_es_client has no branch for it

    class IndexNameFrequency(Enum):
        """ Index type supported

        The handler supports
         - Daily indices
         - Weekly indices
         - Monthly indices
         - Yearly indices
        """
        DAILY = 0
        WEEKLY = 1
        MONTHLY = 2
        YEARLY = 3

    # Defaults for the class
    __DEFAULT_ELASTICSEARCH_HOST = [{'host': '10.97.138.194', 'port': 9200}]
    __DEFAULT_AUTH_USER = 'elastic'
    __DEFAULT_AUTH_PASSWD = '[email protected]'
    __DEFAULT_USE_SSL = False
    __DEFAULT_VERIFY_SSL = True
    __DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
    __DEFAULT_INDEX_FREQUENCY = IndexNameFrequency.DAILY
    __DEFAULT_BUFFER_SIZE = 1000
    __DEFAULT_FLUSH_FREQ_INSEC = 1
    __DEFAULT_ADDITIONAL_FIELDS = {}
    __DEFAULT_ES_INDEX_NAME = 'python_logger'
    __DEFAULT_ES_DOC_TYPE = '_doc'
    __DEFAULT_RAISE_ON_EXCEPTION = False
    __DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"
    __DEFAULT_ISO_TIMESTAMP_FIELD_NAME = "iso_timestamp"

    __LOGGING_FILTER_FIELDS = ['msecs',
                               'relativeCreated',
                               'levelno',
                               'created']

    # In this copy the date suffix is commented out in all four functions,
    # so every frequency setting returns the index name unchanged.
    @staticmethod
    def _get_daily_index_name(es_index_name):
        """ Returns elasticsearch index name

        :param: index_name the prefix to be used in the index
        :return: A string containing the elasticsearch index name
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m.%d'))
        return es_index_name

    @staticmethod
    def _get_weekly_index_name(es_index_name):
        """ Returns elasticsearch index name

        :param: index_name the prefix to be used in the index
        :return: A string containing the elasticsearch index name
        """
        # current_date = datetime.datetime.now()
        # start_of_the_week = current_date - datetime.timedelta(days=current_date.weekday())
        # return "{0!s}-{1!s}".format(es_index_name, start_of_the_week.strftime('%Y.%m.%d'))
        return es_index_name

    @staticmethod
    def _get_monthly_index_name(es_index_name):
        """ Returns elasticsearch index name

        :param: index_name the prefix to be used in the index
        :return: A string containing the elasticsearch index name
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m'))
        return es_index_name

    @staticmethod
    def _get_yearly_index_name(es_index_name):
        """ Returns elasticsearch index name

        :param: index_name the prefix to be used in the index
        :return: A string containing the elasticsearch index name
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y'))
        return es_index_name

    _INDEX_FREQUENCY_FUNCION_DICT = {
        IndexNameFrequency.DAILY: _get_daily_index_name,
        IndexNameFrequency.WEEKLY: _get_weekly_index_name,
        IndexNameFrequency.MONTHLY: _get_monthly_index_name,
        IndexNameFrequency.YEARLY: _get_yearly_index_name
    }

    def __init__(self,
                 hosts=__DEFAULT_ELASTICSEARCH_HOST,
                 auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
                 auth_type=__DEFAULT_AUTH_TYPE,
                 use_ssl=__DEFAULT_USE_SSL,
                 verify_ssl=__DEFAULT_VERIFY_SSL,
                 buffer_size=__DEFAULT_BUFFER_SIZE,
                 flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC,
                 es_index_name=__DEFAULT_ES_INDEX_NAME,
                 index_name_frequency=__DEFAULT_INDEX_FREQUENCY,
                 es_doc_type=__DEFAULT_ES_DOC_TYPE,
                 es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
                 raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
                 default_iso_timestamp_field_name=__DEFAULT_ISO_TIMESTAMP_FIELD_NAME,
                 default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME):
        """ Handler constructor

        :param hosts: The list of hosts that the elasticsearch client will connect to. The list can be
                    provided in the format ``[{'host':'host1','port':9200}, {'host':'host2','port':9200}]``
                    to make sure the client supports failover of one of the insertion nodes
        :param auth_details: When ``CMRESHandler.AuthType.BASIC_AUTH`` is used this argument must contain
                    a tuple of strings with the user and password that will be used to authenticate
                    against the Elasticsearch servers, for example ``('User','Password')``
        :param auth_type: The authentication type to be used in the connection ``CMRESHandler.AuthType``
                    Currently, NO_AUTH, BASIC_AUTH, DEVOPS_AUTH are supported
        :param use_ssl: A boolean that defines if the communications should use SSL encrypted communication
        :param verify_ssl: A boolean that defines if the SSL certificates are validated or not
        :param buffer_size: An int. Once this size is reached on the internal buffer, results are
                    flushed into ES
        :param flush_frequency_in_sec: A float representing how often and when the buffer will be flushed,
                    even if the buffer_size has not been reached yet
        :param es_index_name: A string with the prefix of the elasticsearch index that will be created.
                    Note a date with YYYY.MM.dd, ``python_logger`` used by default
        :param index_name_frequency: Defines what the date used in the postfix of the name would be.
                    Available values are selected from the IndexNameFrequency class
                    (IndexNameFrequency.DAILY, IndexNameFrequency.WEEKLY, IndexNameFrequency.MONTHLY,
                    IndexNameFrequency.YEARLY). By default it uses daily indices.
        :param es_doc_type: A string with the name of the document type that will be used,
                    ``_doc`` used by default
        :param es_additional_fields: A dictionary with all the additional fields that you would like
                    to add to the logs, such as the application, environment, etc.
        :param raise_on_indexing_exceptions: A boolean, True only for debugging purposes to raise
                    exceptions caused when
        :return: A ready to be used CMRESHandler.
        """
        logging.Handler.__init__(self)

        self.hosts = hosts
        self.auth_details = auth_details
        self.auth_type = auth_type
        self.use_ssl = use_ssl
        self.verify_certs = verify_ssl
        self.buffer_size = buffer_size
        self.flush_frequency_in_sec = flush_frequency_in_sec
        self.es_index_name = es_index_name
        self.index_name_frequency = index_name_frequency
        self.es_doc_type = es_doc_type
        self.es_additional_fields = es_additional_fields.copy()
        # Original code, which raised:
        # socket.gaierror: [Errno 8] nodename nor servname provided, or not known
        # self.es_additional_fields.update({'host': socket.gethostname(),
        #                                   'host_ip': socket.gethostbyname(socket.gethostname())})
        # Replaced with the following
        self.es_additional_fields.update({'host': socket.gethostname(),
                                          'host_ip': get_local_ip()})
        self.raise_on_indexing_exceptions = raise_on_indexing_exceptions
        self.default_iso_timestamp_field_name = default_iso_timestamp_field_name
        self.default_timestamp_field_name = default_timestamp_field_name

        self._client = None
        self._buffer = []
        self._buffer_lock = Lock()
        self._timer = None
        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[self.index_name_frequency]
        self.serializer = CMRESSerializer()
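
    def __schedule_flush(self):
        if self._timer is None:
            self._timer = Timer(self.flush_frequency_in_sec, self.flush)
            # Daemon thread, so a pending flush never blocks interpreter exit
            # (the daemon attribute replaces the deprecated setDaemon()).
            self._timer.daemon = True
            self._timer.start()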

    def __get_es_client(self):
        if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
            if self._client is None:
                self._client = Elasticsearch(hosts=self.hosts,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
            if self._client is None:
                # Cache the client like the NO_AUTH branch does; the copied code
                # returned a new, uncached client on every call here.
                self._client = Elasticsearch(hosts=self.hosts,
                                             http_auth=self.auth_details,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        raise ValueError("Authentication method not supported")

    def test_es_source(self):
        """ Returns True if the handler can ping the Elasticsearch servers

        :return: A boolean, True if the connection against elasticsearch host was successful
        """
        return self.__get_es_client().ping()

    @staticmethod
    def __get_es_datetime_str(timestamp):
        """ Returns elasticsearch formatted time for an epoch timestamp

        :param timestamp: epoch, including milliseconds
        :return: A string valid for elasticsearch time record
        """
        current_date = datetime.datetime.utcfromtimestamp(timestamp)
        # The time is shifted by +8 hours (UTC+8) in this copy, so the stored
        # value is local time rather than UTC. The fraction is zero-padded to
        # six digits; unpadded, 5000 microseconds would read as ".5000".
        return "{0!s}.{1:06d}".format(
            datetime.datetime.strftime(current_date + datetime.timedelta(hours=8), '%Y-%m-%dT%H:%M:%S'),
            current_date.microsecond)

    def flush(self):
        """ Flushes the buffer into ES

        :return: None
        """
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        if self._buffer:
            try:
                with self._buffer_lock:
                    logs_buffer = self._buffer
                    self._buffer = []
                actions = (
                    {
                        '_index': self._index_name_func.__func__(self.es_index_name),
                        '_type': self.es_doc_type,
                        '_source': log_record
                    }
                    for log_record in logs_buffer
                )
                eshelpers.bulk(
                    client=self.__get_es_client(),
                    actions=actions,
                    stats_only=True
                )
            except Exception as exception:
                if self.raise_on_indexing_exceptions:
                    raise exception

    def close(self):
        """ Flushes the buffer and releases any outstanding resource

        :return: None
        """
        if self._timer is not None:
            self.flush()
        self._timer = None

    def emit(self, record):
        """ Emit overrides the abstract logging.Handler logRecord emit method

        Formats and records the log

        :param record: A class of type ``logging.LogRecord``
        :return: None
        """
        self.format(record)
        rec = self.es_additional_fields.copy()
        for key, value in record.__dict__.items():
            if key not in CMRESHandler.__LOGGING_FILTER_FIELDS:
                rec[key] = "" if value is None else value
        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)
        with self._buffer_lock:
            self._buffer.append(rec)

        if len(self._buffer) >= self.buffer_size:
            self.flush()
        else:
            self.__schedule_flush()
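The buffering scheme in emit() and flush() (send immediately when the buffer is full, otherwise arm a one-shot timer) can be tried without an Elasticsearch server. The sketch below is a simplified stand-in, not the library's code: `BufferedListHandler` is a hypothetical class, and a plain Python list takes the place of the ES index.

```python
import logging
from threading import Lock, Timer


class BufferedListHandler(logging.Handler):
    """Hypothetical stand-in: buffers messages and flushes them into a list."""

    def __init__(self, buffer_size=3, flush_interval=1.0):
        super().__init__()
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        self.flushed = []      # stands in for the Elasticsearch index
        self._buffer = []
        self._lock = Lock()
        self._timer = None

    def _schedule_flush(self):
        # Arm a one-shot daemon timer unless one is already pending.
        if self._timer is None:
            self._timer = Timer(self.flush_interval, self.flush)
            self._timer.daemon = True
            self._timer.start()

    def flush(self):
        # Cancel any pending timer, then move the buffer out under the lock.
        if self._timer is not None:
            self._timer.cancel()
        self._timer = None
        with self._lock:
            batch, self._buffer = self._buffer, []
        self.flushed.extend(batch)

    def emit(self, record):
        with self._lock:
            self._buffer.append(record.getMessage())
            full = len(self._buffer) >= self.buffer_size
        if full:
            self.flush()            # size threshold reached: flush now
        else:
            self._schedule_flush()  # otherwise flush later, on the timer

    def close(self):
        self.flush()
        super().close()


if __name__ == "__main__":
    handler = BufferedListHandler(buffer_size=2, flush_interval=60)
    demo = logging.getLogger("buffer_demo")
    demo.setLevel(logging.DEBUG)
    demo.propagate = False
    demo.addHandler(handler)
    demo.info("first")    # buffered, timer armed
    demo.info("second")   # buffer full, flushed immediately
    print(handler.flushed)
```

The timer is what guarantees that a quiet application still delivers its last few records; without it, anything below buffer_size would stay in memory until close().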
The following optional imports are of little use here, so they were left out of the file above:
try:
    from requests_kerberos import HTTPKerberosAuth, DISABLED
    CMR_KERBEROS_SUPPORTED = True
except ImportError:
    CMR_KERBEROS_SUPPORTED = False

try:
    from requests_aws4auth import AWS4Auth
    AWS4AUTH_SUPPORTED = True
except ImportError:
    AWS4AUTH_SUPPORTED = False
3 Calling the handler to write to ES
The handler is used as follows:
import logging
from handlers import CMRESHandler

LOG_LEVEL = 'DEBUG'  # log level
LOG_FORMAT = '%(levelname)s - %(asctime)s - process: %(process)d - %(filename)s - %(name)s - %(lineno)d - %(module)s - %(message)s'  # output format of each log line
ELASTIC_SEARCH_HOST = 'localhost'  # Elasticsearch Host
ELASTIC_SEARCH_PORT = 9200  # Elasticsearch Port
ELASTIC_SEARCH_INDEX = 'test_log3'  # Elasticsearch Index Name # system_log_24172335668239631
APP_ENVIRONMENT = 'dev'  # runtime environment, e.g. test or production
ELASTICSEARCH_USER = 'admin'
ELASTICSEARCH_PASSWORD = 'admin'

es_handler = CMRESHandler(hosts=[{'host': ELASTIC_SEARCH_HOST, 'port': ELASTIC_SEARCH_PORT}],
                          # user name and password
                          auth_details=(ELASTICSEARCH_USER, ELASTICSEARCH_PASSWORD),
                          # the matching authentication type can be configured here
                          auth_type=CMRESHandler.AuthType.BASIC_AUTH,
                          # index name
                          es_index_name=ELASTIC_SEARCH_INDEX,
                          # extra field identifying the environment
                          es_additional_fields={'environment': APP_ENVIRONMENT}
                          )
# The commented-out formatter settings below had no effect
# es_handler.setLevel(level=LOG_LEVEL)
# formatter = logging.Formatter(LOG_FORMAT)
# es_handler.setFormatter(formatter)
logger = logging.getLogger('test')
logger.setLevel(LOG_LEVEL)
logger.addHandler(es_handler)
logger.debug('test write es2')

if __name__ == '__main__':
    pass
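One likely reason the commented-out formatter lines had no effect: emit() calls self.format(record) but discards the returned string and builds the document from record.__dict__ instead. The sketch below reproduces only that behaviour with the standard library; `DictCaptureHandler` and `capture_document` are hypothetical names used for illustration.

```python
import logging


class DictCaptureHandler(logging.Handler):
    """Hypothetical handler that builds documents the way emit() above does."""

    def __init__(self):
        super().__init__()
        self.documents = []

    def emit(self, record):
        self.format(record)  # return value discarded, as in the handler above
        self.documents.append(dict(record.__dict__))


def capture_document(fmt, message):
    """Log one DEBUG message through a formatter and return the stored dict."""
    handler = DictCaptureHandler()
    handler.setFormatter(logging.Formatter(fmt))
    logger = logging.getLogger("formatter_sketch")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    logger.addHandler(handler)
    try:
        logger.debug(message)
    finally:
        logger.removeHandler(handler)
    return handler.documents[0]


if __name__ == "__main__":
    doc = capture_document("%(levelname)s - %(message)s", "hello")
    # The raw fields are stored; the formatted line itself is not.
    print(doc["levelname"], doc["message"])
    print("DEBUG - hello" in doc.values())
```

The individual fields (levelname, filename, lineno and so on) still reach the document because they are attributes of the record, so the formatted line would be redundant in ES anyway.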
4 Extensions