mysql千万级数据如何快速导出
今天给大家讲解如何快速的导出千万级MySQL中的数据,大家平时在进行MySQL数据导出的时候,如何数据量不大(万级记录)可能不会遇到这样那样的问题,下面就我前段事件导出MySQL千万级(目前量级8千万,已快到一亿)数据遇到问题的一个回放和代码优化。
查询优化
当你接到需求,可能第一时间想到,直接全量查询不就好了,如果数据记录在几万条还好,当MySQL一个表的数据大于200W的时候,这个时候去查询已经非常吃力了,即使在添加索引的情况下。
查询需求
收到的需求是,为满足算法团队模型训练用,需要满足根据网元名称和时间范围动态快速的查询出数据,由于后台每天产生的数据是惊人的,该数据由kafka生产,MySQL存储。MySQL表中的下列字段
nename
、starttime
、endtime
均加有索引,查询时我们需要将产生逻辑运算的字段,放在后面,不然会引起索引失效的情况。
select * from workflow_hugealarm where nename = {nename} and eventtime > {starttime} and eventtime < {endtime};
多线程+分页查
由于上面的方案查询速度和效率都非常慢,因此对程序进行了改进,使用多线程+分页的方式进行查询。具体思路是:批量开启多个线程,然后用数量行数除以每页查询的条数,得到总页数,最后交给线程池进行批量执行。
- 获取数据行数
select max(id) as counts from workflow_hugealarm;
- 数据等分
def thread_pool(self, **kwargs):
pages = self.get_count() / self.step
with ThreadPoolExecutor(100) as execute:
for step in range(1, math.ceil(pages) + 1):
try:
execute.submit(self._query, step, **kwargs)
except Exception as e:
logging.error(e)
continue
return pd.DataFrame(self.data)
多线程+MySQL连接池查
上面的查询方案运行一段时间后会发现程序报错,经过多方定位发现是MySQL连接数超限,很快就到了MySQL的最大连接数,即1024个连接数。最后想到的解决方案是维护一个MySQL的连接池,这里我们使用Python字典类型进行存储维护。
- 报错原因
AttributeError: 'NoneType' object has no attribute 'settimeout'
- 连接池
def get_instance(self):
"""
创建数据库连接池
:return:
"""
name = threading.current_thread().name
if name not in self.pool:
conn = pymysql.connect(**self.db_info)
self.pool[name] = conn
return self.pool[name]
多进程+多线程查
上面的方案其实已经满足大批量查询了,为了使导出速度变的更快,我们使用多进程+多线程组合的方式进行查询。这也是处理IO密集型业务的最佳实践方案,使用该方案,可以极大的规避GIL所带来的弊端。
- 多进程+多线程
# Processing pool (see below for initiazation)
pool = None
# Performs a large calculation (CPU bound)
def some_work(args):
...
return result
# A thread that calls the above function
def some_thread():
while True:
...
r = pool.apply(some_work, (args))
...
# Initiaze the pool
if __name__ == '__main__':
import multiprocessing
pool = multiprocessing.Pool()
总结
由于公司每天产生的数据是惊人的,因此经过我们商讨已经不适合用MySQL来进行存储了,我们使用了PostgreSql数据库集群来进行了存储,每天分别建表来存储。