Python重写Logstash,把NginxAccessLog清洗后汇入ElasticDB
Step1. 修改Nginx的log格式(改为JSON格式)
把nginx的log_format改为以下的参数(修改/etc/nginx/nginx.conf):
log_format main '{"@timestamp":"$time_iso8601","host":"$server_addr","clientip":"$remote_addr","size":$body_bytes_sent,"responsetime":$request_time,"upstreamtime":"$upstream_response_time","upstreamhost":"$upstream_addr","http_host":"$host","url":"$uri","xff":"$http_x_forwarded_for","referer":"$http_referer","agent":"$http_user_agent","status":"$status"}';
reload nginx后,看到access.log的格式如下:
{"@timestamp":"2017-12-13T17:29:49+08:00","host":"120.76.XX.XX","clientip":"120.76.XX.XXX","size":26963,"responsetime":0.000,"upstreamtime":"0.000","upstreamhost":"127.0.0.1:8080","http_host":"weixin.XXX.com","url":"/XXXXXXX/haowanyihao/thumb.png","xff":"111.22.65.171","referer":"-","agent":"WeChat/6.6.0.32 CFNetwork/811.4.18 Darwin/16.5.0","status":"200"}
Step2. 编写python程式
创新互联公司主要从事成都做网站、成都网站制作、网页设计、企业做网站、公司建网站等业务。立足成都服务井研,10多年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18982081108
# -- coding: utf-8 --
'''
By Willson Luo at 2017/11/23 v1.0
'''
import pandas as pd
import json,time,datetime,iso8601
from elasticsearch import Elasticsearch
from geoip import geolite2
# connect to elasticsearch database
es = Elasticsearch( "localhost:9200" )
es = Elasticsearch(hosts=[{'host': 'localhost', 'port': '9200'}],httpauth=('elastic', 'xxxxx'))
# nginx column name
#title = ['@timestamp','host','clientip','size','responsetime','upstreamtime','upstreamhost','httphost','url','xff','referer','agent','status']
# nginx access log
ngxlog = 'access.log'
ngxdata = open(ngxlog).readlines()
# nginx data(json format)
ngxjson = {}
for a1 in range(len(ngxdata)):
step1 = ngxdata[a1].strip().split("\"")
abc = iso8601.parsedate(step1[3])
bcd = abc.strftime('%Y-%m-%dT%H:%M:%S%Z')
cde = abc.strftime('%Y%m%d')
ngxindex = 'logstash-weixin-nginx-access-'+ cde
ngxjson['@timestamp'] = bcd
ngxjson['host'] = step1[7]
ngxjson['size'] = step1[14].replace(":","").replace(",","")
ngxjson['responsetime'] = step1[16].replace(":","").replace(",","")
ngxjson['upstreamtime'] = step1[19]
ngxjson['upstreamhost'] = step1[23]
if step1[35] == "-":
ngxjson['clientip'] = step1[11]
ngxjson['httphost'] = step1[27]
ipaddr = step1[11]
else:
ngxjson['clientip'] = step1[35].split(",")[0]
ngxjson['httphost'] = step1[39]
ipaddr = step1[35].split(",")[0]
if "Apple" in step1[43]:
ngxjson['agent']="Apple"
elif "WeChat" in step1[43]:
ngxjson['agent']="WeChat"
elif "curl" in step1[43]:
ngxjson['agent']="Linux"
elif "Alibaba" in step1[43]:
ngxjson['agent']="Aliyun"
elif "Android" in step1[43]:
ngxjson['agent']="Android"
elif "MSIE" in step1[43]:
ngxjson['agent']="IE"
elif "Firefox" in step1[43]:
ngxjson['agent']="Firefox"
elif "Windows" in step1[43]:
ngxjson['agent']="Windows"
elif "Apache-Http" in step1[43]:
ngxjson['agent']="Apache"
else:
ngxjson['agent']= step1[43]
ngxjson['status']= step1[47]
location = geolite2.lookup(ipaddr).location
match = geolite2.lookup(ipaddr).getinfodict()
location = []
location.append(match['location']['longitude'])
location.append(match['location']['latitude'])
geoip = {}
geoip['location'] = location
if match.haskey('city'):
city = match['city']['names']['en']
else:
city = "-"
if match.haskey('country'):
country = match['country']['names']['en']
else:
country = "-"
if match.haskey('subdivisions'):
subdivisions = match['subdivisions'][0]['names']['en']
else:
subdivisions = "-"
ngxjson['geoip'] = geoip
ngxjson['country'] = country
ngxjson['subdivisions'] = subdivisions
ngxjson['city'] = city
ngxjson['possition'] = country+"-"+subdivisions+"-"+city
print a1,ngxjson
es.index( index=ngxindex, doctype="logs", body=ngxjson )
Step3. 通过Kibana把数据呈现处理
1> 先在Kibana把index汇入(一般第一步就让你建立这个东西了)Kibana-->Management-->Kibana(Index Patterns)
2> 构建可用视图Kibana--> Visualize(这个东西比较见人见智)
Step4. 构建Dashboard(就是把Visualize的内容拖进来)
第一次写Blog,估计错漏不少,麻烦指正,谢谢
名称栏目:Python重写Logstash,把NginxAccessLog清洗后汇入ElasticDB
文章来源:http://myzitong.com/article/jpeooe.html