Python3-InfluxDB 연동
Python3로 InfluxDB에서 데이터를 읽고/쓰는 방법 공유
2022.04.22
Influx DB는 NoSQL 데이터베이스로, 오픈소스 시계열 데이터베이스 (TSDB)이다.
📚 1. InfluxDB의 특징
influxdb는 다음과 같은 장점이 있다.
- 많은 쓰기 작업과 쿼리 부하를 처리하기 위해 2013년에 Go 언어로 개발된 오픈소스 TSDB
- Tick Stack의 필수 컴포넌트 중 하나
- 가장 유명하고, 많이 사용되는 TSDB
- Distributed, Scale horizontally하게 설계되어 scale-out이 쉬우며, Restful API를 제공하고 있어 API 통신이 가능
📌 출처: https://mangkyu.tistory.com/190
TSDB, NoSQL 데이터베이스인 influxdb는 기존 RDBMS와 기본 용어에서 부터도 차이를 보이는데, 대응되는 용어는 다음과 같다.
RDBMS (ex.Mysql) | TBMS (ex.InfluxDB) |
---|---|
Database | Database |
Table | Measurement |
Column | Key |
Indexed Column | Tag Key (String) |
Unindexed Column | Field Key |
Row | Point |
또한, 다른 주목할만한 특징은 아래 3가지 정도가 있는 것 같다.
⭐ 1-1. 빠른 데이터 읽기 / 쓰기 속도
시계열 데이터를 다루기에 적합하게 설계된 DB인 만큼,
실제 데이터를 의미하는 Point는 DB에 넣을 때 자동으로 시간이 기록되어 넣어지게 된다.
이 Point를 이용한 DB 운용은 읽기/쓰기 속도를 크게 향상시켜준다.
⭐ 1-2. INSERT와 SELECT 쿼리 성능을을 위해 DELETE, UPDATE는 고려하지 않는다.
시계열 데이터는 데이터 특성상 수정되거나 삭제하는 일이 거의 없기 때문에,
DELETE와 UPDATE는 거의 발생하지 않아 이런 기능은 거의 지원하지 않는다.
InfluxDB의 도큐먼트에 따르면, InfluxDB는 CRUD가 아닌 CR-ud를 지원한다고 한다.
⭐ 1-3. 데이터를 자동으로 삭제해주는 정책이 있다.
데이터가 방대해지고 양이 많아지면, 문제가 생길 수 있기 때문에,
마치 식품의 유통기한처럼 일정 시간이 지난 이후에 오래된 데이터부터
순서대로 삭제를 진행하는 기능이 있다.
이를 보존 정책 (Retention Policy)이라고 하는데,
각 데이터베이스마다 이 정책을 정해 줄 수 있다.
📚 2. Python과 연동
시작하기 전에, 이미 Python과, InfluxDB가 설치되어있다고 생각하고,
Python과 InfluxDB를 연동해 보도록 한다.
시작하기전, influxdb
Python package를 설치한다.
sudo pip3 install influxdb
혹시 이 과정에서 AttributeError: _DistInfoDistribution__dep_map
문제가 발생한다면,
pip3가 최신버전이 아니어서그렇다.
sudo pip3 install --upgrade pip && sudo pip3 install --upgrade setuptools
를 입력해주면 된다.
⭐ 2-1. InfluxDB 접속
첫 번째 단계는, Python에서 influxdb의 Database에 접속하는 것이다.
아래와 같은 함수를 통해 client를 return 받을 수 있다.
from influxdb import InfluxDBClient
def get_influxdb(database_name, host='localhost', port=8086):
client = InfluxDBClient(host, port)
try:
client.switch_database(database_name) # 이용할 Client 데이터베이스 설정
print('switch ' + database_name)
except Exception as e:
print('switch except')
try:
client.create_database(database_name) # InfluxDB내 데이터베이스 생성
print('create ' + database_name)
except Exception as e:
print('create except', e)
pass
return None
return client
InfluxDB는 보통 8086포트를 사용한다.
InfluxDB에 원하는 이름의 database에 접속하고, 없으면 새로 만드는 내용이다.
아래 처럼 사용한다.
client = get_influxdb(database_name='test_db')
client
를 통해, 다른 데이터베이스나 client를 검색 할 수 있다.
client.get_list_database() # 데이터베이스 리스트 조회
client.get_list_measurements() # 현재 선택된 데이터베이스 measurements 리스트 조회
⭐ 2-2. Data Write
DB에 데이터를 넣는 방법에 대해 알아본다.
Data를 넣는 방법은, client.write_points()
를 통해
Dict List
자료형을, JSON으로 변환하여 전달한다.
points = [
{'measurement':'sensors',
'tags':{'sensor_id': 'S001'},
'fields':{'value': 100, 'isWarning': False} },
{'measurement':'sensors',
'tags':{'sensor_id': 'S001'},
'fields':{'value': 200, 'isWarning': False} },
{'measurement':'sensors',
'tags':{'sensor_id': 'S001'},
'fields':{'value': 300, 'isWarning': True} }
]
client.write_points(points=points, protocol='json')
# [결과] True
이 경우, 정상적으로 데이터가 등록 되지만,
Point들은 따로 시간을 입력해주지 않으면 현 시간을 기준으로 입력되게 된다.
만약, 특정 시간으로 입력 해 주고 싶다면,
points = [
{'measurement':'sensors',
'tags':{'sensor_id': 'S001'},
'fields':{'value': 100, 'isWarning': False},
'time': '2022-04-21 16:00:00+09:00'}, # 한국시간 16시
{'measurement':'sensors',
'tags':{'sensor_id': 'S001'},
'fields':{'value': 200, 'isWarning': False},
'time': '2022-04-21 17:00:00+09:00'}, # 한국시간 17시
{'measurement':'sensors',
'tags':{'sensor_id': 'S001'},
'fields':{'value': 300, 'isWarning': True},
'time': '2022-04-21 18:00:00+09:00'} # 한국시간 18시
]
client.write_points(points=points, protocol='json')
# [결과] True
처럼 해주면 된다. UTC를 기준으로 하므로, 한국에서는 + 09:00
를 해 주어야 한다.
⭐ 2-3. Data Read
DB에서 값을 가져오는 방법에 대해 알아본다.
client 객체에 query를 보낸 후, return된 객체에서 dict
형 데이터를 뽑아낼 수 있다.
points = client.query("SELECT * FROM sensors WHERE sensor_id = 'S001'")
for point in points.get_points():
print(point)
그럼 다음과 같은 결과를 볼 수 있다.
{'time': '2022-04-21T07:00:00Z', 'isWarning': False, 'sensor_id': 'S001', 'value': 100}
{'time': '2022-04-21T08:00:00Z', 'isWarning': False, 'sensor_id': 'S001', 'value': 200}
{'time': '2022-04-21T09:00:00Z', 'isWarning': True, 'sensor_id': 'S001', 'value': 300}
👓 참고 문헌
- InfluxDB 1.8 documentation
- [Python] InfluxDB 소개 & Grafana 연동 실습
- 130. [InfluxDB] InfluxDB Quickstart 및 간단한 사용 방법 정리
- [공식 문서 정리] InfluxDB와 SQL 데이터베이스의 비교
- InfluxDB와 Python3 연동하기 (For Windows & Ubuntu)
- [TSDB] 시계열 데이터베이스(TSDB, Time Series Database) 란? - (1/2)