试用时间序列数据库InfluxDB-创新互联

Hadoop集群监控需要使用时间序列数据库,今天花了半天时间调研使用了一下最近比较火的InfluxDB,发现还真是不错,记录一下学习心得。

创新互联公司是专业的安定网站建设公司,安定接单;提供网站设计、成都网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行安定网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!

Influx是用Go语言写的,专为时间序列数据持久化所开发的,由于使用Go语言,所以各平台基本都支持。类似的时间序列数据库还有OpenTSDB,Prometheus等。

OpenTSDB很有名,性能也不错,但是基于HBase,要用那个还得先搭一套HBase,有点为了吃红烧肉自己得先去杀猪,烫皮,拔毛的感觉。Prometheus相关文档和讨论太少,而InfluxDB项目活跃,使用者多,文档也比较丰富,所以先看看这个。Influx可以说是LevelDB的Go语言修改版实现,LevelDB采用LSM引擎,效率很高,Influx是基于LSM引擎再修改的TSM引擎,专为时间序列设计。

InfluxDB 架构原理介绍

LevelDB 架构原理介绍

下午跟七牛的CrazyJVM聊了一下,因为七牛都是用Go,所以也大量部署了Influx给大型企业级用户使用,据说是全球大的InfluxDB集群,七牛也给Influx提交了大量的Patch,结果Influx通过早期开源弄得差不多稳定了,突然就闭源了,这也太不局气了,然后搞Cluster功能收费,单机功能免费使用。

昨天看了一会文档,今天试用了一下,感觉很不错,值得推荐。把学习了解的内容记录下来,供爱好者参考,也省的自己时间长忘了。

InfluxDB其实不能说像哪个数据库,初上手感觉更像Mongo类型的NoSQL,但是有意思的是,它提供了类SQL接口,对开发人员十分友好。命令行查询结果的界面又有点像MySQL,挺有意思的。

不写安装部署和CLI接口,因为实在没得可写,直接yum或者apt就装了。service一启动,再influx命令就进命令行了,网上一大堆安装教程

InfluxDB有几个关键概念是需要了解的。

database:相当于RDBMS里面的库名。创建数据库的语句也十分相似。一进去就可以先创建一个数据库玩,加不加分号都行。

CREATE DATABASE 'hadoop'

然后需要创建一个用户,我省事点,直接创建一个高权限,就看了一天,然后直接写REST接口去了,权限管理慢慢再细看。

CREATE USER "xianglei" WITH PASSWORD 'password' WITH ALL PRIVILEGES

使用查询语句插入一条数据

INSERT hdfs,hdfs=adh,path=/ free=2341234,used=51234123,nonhdfs=1234

Influx没有先建立schema的概念,因为Influx允许存储的数据是schemeless的,表在这里叫做measurement,数据在插入时如果没有表,则自动创建该表。

measurement: 相当于RDBMS里面的数据表名。

在上面的INSERT语句中,跟在insert后面的第一个hdfs就是measurement,如果不存在一个叫做hdfs的,就自动创建一个叫做hdfs的表,否则直接插入数据。

然后是tags,tags的概念类似于RDBMS里面的查询索引名,这里的tags是hdfs=adh和path=/,等于我建立了两个tags。

free往后统称叫fields,tags和fields之间用空格分开,tags和fields内部自己用逗号分开。tags和fields名称可以随意填写,主要是一开始设计好就行。

所以,对以上插入语句做一个注释的话,就是这样。

INSERT [hdfs(measurement)],[hdfs=adh,path=/(tags)] [free=2341234,used=51234123,nonhdfs=1234(fields)]

然后即可查询该数据

SELECT free FROM hdfs WHERE hdfs='adh' and path='/'name: hdfs time                    free ----                    ---- 1485251656036494252     425234 1485251673348104714     425234
SELECT * FROM hdfs LIMIT 2name: hdfs time                  free    hdfs    nonhdfs  path   used ----                  ----    ----    -------  ----   ---- 1485251656036494252   425234  adh     1341     /      23412 1485251673348104714   425234  adh     1341     /      23412

这里的where条件,即是上面tags里面的hdfs=adh和path=/,所以tags可以随意添加,但是在插入第一条数据的时候,最好先设计好你的查询条件。当然,你插入的任何数据,都会自动添加time列,数了数,应该是纳秒级的时间戳。


上面是Influx的基本概念和基本使用的记录,下面是接口开发的使用。以Tornado示例Restful查询接口。

Influx本身支持restful的HTTP API,python有直接封装的接口可以调用,直接 pip install influxdb即可

influxdb-python文档

Talk is cheap, show me your code.

Models  Influx模块,用于连接influxdb

class InfluxClient:     def __init__(self):         self._conf = ParseConfig()         self._config = self._conf.load()         self._server = self._config['influxdb']['server']         self._port = self._config['influxdb']['port']         self._user = self._config['influxdb']['username']         self._pass = self._config['influxdb']['password']         self._db = self._config['influxdb']['db']         self._retention_days = self._config['influxdb']['retention']['days']         self._retention_replica = self._config['influxdb']['retention']['replica']         self._retention_name = self._config['influxdb']['retention']['name']         self._client = InfluxDBClient(self._server, self._port, self._user, self._pass, self._db)     def _create_database(self):         try:             self._client.create_database(self._db)         except Exception, e:             print e.message     def _create_retention_policy(self):         try:             self._client.create_retention_policy(self._retention_name,                                                  self._retention_days,                                                  self._retention_replica,                                                  default=True)         except Exception, e:             print e.message     def _switch_user(self):         try:             self._client.switch_user(self._user, self._pass)         except Exception, e:             print e.message     def write_points(self, data):         self._create_database()         self._create_retention_policy()         if self._client.write_points(data):             return True         else:             return False     def query(self, qry):         try:             result = self._client.query(qry)             return result         except Exception, e:             return e.message

连接influxdb的配置从项目的配置文件里读取,自己写也行。

Controller模块InfluxController

class InfluxRestController(tornado.web.RequestHandler):     '''     "GET"         op=query&qry=select+used+from+hdfs+where+hdfs=adh     '''     查询方法,使用HTTP GET     def get(self, *args, **kwargs):         op = self.get_argument('op')         #自己实现的python switch case,网上一大堆         for case in switch(op):             if case('query'):                 #查询语句从url参数获取                 qry = self.get_argument('qry')                 #实例化Models里面的class                 influx = InfluxClient()                 result = influx.query(qry)                 #返回结果为对象,通过raw属性获取对象中的字典。                 self.write(json.dumps(result.raw, ensure_ascii=False))                 break             if case():                 self.write('No argument found')     #写入数据,使用HTTP PUT     def put(self):         op = self.get_argument('op')         for case in switch(op):             if case('write'):                 #data should urldecode first and then turn into json                 data = json.loads(urllib.unquote(self.get_argument('data')))                 influx = InfluxClient()                 #写入成功或失败判断                 if influx.write_points(data):                     self.write('{"result":true}')                 else:                     self.write('{"result":false}')                 break             if case():                 self.write('No argument found')

Tornado配置路由

applications = tornado.web.Application(     [         (r'/', IndexController),         (r'/ws/api/influx', InfluxRestController)     ],     **settings )

JSON项目配置文件

{   "http_port": 19998,   "influxdb":{     "server": "47.88.6.247",     "port": "8086",     "username": "root",     "password": "root",     "db": "hadoop",     "retention": {       "days": "365d",       "replica": 3,       "name": "hound_policy"     },     "replica": 3   },   "copyright": "CopyLeft 2017 Xianglei" }

插入测试

def test_write():     base_url = 'http://localhost:19998/ws/api/influx'     #data = '[{"measurement": "hdfs"},"tags":{"hdfs": "adh","path":"/user"},"fields":{"used": 234123412343423423,"free": 425234523462546546,"nonhdfs": 1341453452345}]'     #构造插入数据     body = dict()     body['measurement'] = 'hdfs'     body['tags'] = dict()     body['tags']['hdfs'] = 'adh'     body['tags']['path'] = '/'     body['fields'] = dict()     body['fields']['used'] = 234123     body['fields']['free'] = 425234     body['fields']['nonhdfs'] = 13414     tmp = list()     tmp.append(body)     op = 'write'     # dict data to json and urlencode     data = urllib.urlencode({'op': op, 'data': json.dumps(tmp)})     headers = {'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'}     try:         http = tornado.httpclient.HTTPClient()         response = http.fetch(             tornado.httpclient.HTTPRequest(                 url=base_url,                 method='PUT',                 headers=headers,                 body=data             )         )         print response.body     except tornado.httpclient.HTTPError, e:         print e test_write()

插入数据后通过访问http连接获取插入结果

curl -i "http://localhost:19998/ws/api/influx?op=query&qry=select%20*%20from%20hdfs" HTTP/1.1 200 OK Date: Tue, 24 Jan 2017 15:47:42 GMT Content-Length: 1055 Etag: "7a2b1af6edd4f6d11f8b000de64050a729e8621e" Content-Type: text/html; charset=UTF-8 Server: TornadoServer/4.4.2 {"values": [["2017-01-24T09:54:16.036494252Z", 425234, "adh", 13414, "/", 234123]], "name": "hdfs", "columns": ["time", "free", "hdfs", "nonhdfs", "path", "used"]}

收工,明天用React写监控前端

另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


文章题目:试用时间序列数据库InfluxDB-创新互联
本文URL:http://myzitong.com/article/cdpcco.html