## 前言

这两天搞到一个 18GB 大的 txt 文本文件,于是就想尝试各种读取方式,并对比速度。

使用语言:Python

## 正文

### 单线程篇

最简单最粗暴的方法,直接读!

#### 单线程 V1 代码

```python
def open_file():
    with open('F:\\Downloads\\6.9更新总库.txt', 'rb') as f:
        for line in f:
            yield str(line, encoding="utf-8")


def find(findQQ):
    datas = open_file()
    for data in datas:
        # print(data)
        qq = data.split("----")[0]
        # mobile = data.split("----")[1]
        if qq==findQQ:
            print(data)
            break
```

HDD 下读取文件速度:

![image-20201125163341981](https://cdn.jsdelivr.net/gh/Quan666/CDN/pic/image-20201125163341981.png)

SSD 下读取文件速度:

![image-20201125163742389](https://cdn.jsdelivr.net/gh/Quan666/CDN/pic/image-20201125163742389.png)

居然没有速度 ???

![image-20201125164021764](https://cdn.jsdelivr.net/gh/Quan666/CDN/pic/image-20201125164021764.png)

过一会后速度上来了,HDD、SSD 速度相差不大,看来瓶颈不在存储介质。

#### 单线程 V2 代码

优化后的版本

```python
def open_file2(QQ,path):
    length = len(QQ)
    with open(path, 'rb') as f:
        for line in f:
            # print(line)
            count=0
            for i in range(0,length):
                if line[i]!=QQ[i]:
                    break
                else:
                    count+=1
            if count==length:
                # print(line[length+1])
                # print(line[length])
                # 45 is ord('-'): the QQ must be followed by the '----' separator
                if line[length]==45:
                    print(line)
                    return line
```

HDD 下的读取速度:

![image-20201125164501364](https://cdn.jsdelivr.net/gh/Quan666/CDN/pic/image-20201125164501364.png)

SSD 下的读取速度:

![image-20201125164340525](https://cdn.jsdelivr.net/gh/Quan666/CDN/pic/image-20201125164340525.png)

优化后的代码比原来快了一倍左右。

### 多线程篇

#### 多线程 V1 代码

```python
# First compute where each thread's chunk starts, i.e. find a '\n' character by
# searching the 10+4+11+2 bytes (QQ + '----' + phone + '\r\n') around each split point,
# then assign the chunks accordingly.
def findOffsets(ThreadSize,FilePath):
    List_StartPiece_Offset = []
    # The first chunk starts at offset 0
    List_StartPiece_Offset.append(0)
    file_size = os.path.getsize(FilePath)
    PieceSize = int(file_size/ThreadSize)
    # print(PieceSize)
    with open(FilePath,'rb') as f:
        for piece_number in range(1,ThreadSize):
            f.seek(piece_number*PieceSize)
            # Search forward
            for offset in range(0,27):
                if f.read(1)==b'\n':
                    # print(f.tell())
                    List_StartPiece_Offset.append(f.tell())
                    # Stop at the first newline: lines can be shorter than 27 bytes,
                    # and a second '\n' in the window would add a bogus extra boundary.
                    break
            # NOTE: if no '\n' appears within the 27 bytes, no boundary is recorded
            # for this piece, so fewer chunks than ThreadSize are produced.
            # # Search backward
            # for offset in range(0,27):
            #     f.seek(-2,1)
            #     if f.read(1)==b'\n':
            #         print(f.tell())
            #         List_StartPiece_Offset.append(f.tell())
        f.close()
    file_size = os.path.getsize(FilePath)
    List_StartPiece_Offset.append(file_size)
    return List_StartPiece_Offset


def findPhone(start,end,qq,path):
    length = len(qq)
    with open(path, 'rb') as f:
        f.seek(start)
        for line in f:
            # print(line)
            # f.tell() is already past this line, so test where the line *starts*;
            # the old f.tell()>=end check skipped the last line of every chunk.
            if f.tell()-len(line)>=end:
                break
            count=0
            for i in range(0,length):
                if line[i]!=qq[i]:
                    break
                else:
                    count+=1
            if count==length:
                # print(line[length+1])
                # print(line[length])
                # 45 is ord('-'): the QQ must be followed by the '----' separator
                if line[length]==45:
                    print(line)
                    global Flag
                    Flag=False
                    f.close()
                    return line
                    # break
        f.close()
    return b''


def findMain(QQ,FilePath,ThreadSize):
    list = findOffsets(ThreadSize,FilePath)
    print(list)
    try:
        for tmp in range(0,len(list)-1):
            _thread.start_new_thread( findPhone, (list[tmp],list[tmp+1],QQ,FilePath) )
    except BaseException as e:
        print ("Error: 无法启动线程 "+str(e))
    # print(threading.activeCount())
    # NOTE: this busy-waits until some worker clears Flag; if the QQ is not in
    # the file, nothing ever clears it and the loop never ends.
    while Flag:
        # print(Flag)
        pass
```

读取速度 HDD、SSD 差不多:

![image-20201125164928168](https://cdn.jsdelivr.net/gh/Quan666/CDN/pic/image-20201125164928168.png)

两个线程、5 线程,都没速度。很奇怪,暂时没找到原因。

可能原因如下:

![img](https://cdn.jsdelivr.net/gh/Quan666/CDN/pic/3_UAWLV6ETTF@2~_R%7DFMWCU.png)

#### 多线程 V2 代码

经过查阅资料放弃了。。。(原因在于 CPython 的 GIL:逐字节比较是 CPU 密集型的纯 Python 代码,多个线程无法真正并行执行,只会轮流抢锁;要想并行需要改用 multiprocessing 多进程。)

### 大文件分块

既然整个读不现实,那就分块来读。

#### V1 代码

```python
# Split the big text file into chunks
def cutPiece(size,path):
    list = findOffsets(size,path)
    with open(path,'rb') as big_f:
        for i in range(0,len(list)-1):
            tmp_piece = big_f.read(list[i+1]-list[i])
            with open('piece\\qq_'+str(i)+'.txt','wb') as small_f:
                small_f.write(tmp_piece)
                small_f.close()
        big_f.close()
```

这只是按份数等大分块,对实际查询帮助不大(查询时仍然不知道目标 QQ 落在哪一块,只能逐块扫描)。

#### 按余数分块 V1 代码

```python
# Split by remainder
# Split the big text file; time taken: 0:02:08.998875
def cutRemainderPiece(size,path):
    list = findOffsets(size,path)
    with open(path,'rb') as big_f:
        for i in range(0,len(list)-1):
            tmp_piece = big_f.read(list[i+1]-list[i])
            # print(type(tmp_piece))
            str_piece = str(tmp_piece, encoding = "utf-8")
            list_piece = str_piece.split('\r\n')
            for tmp in list_piece :
                tmpp=tmp.split("----")
                if len(tmpp[0])<=0:
                    continue
                m = int(int(tmpp[0])%size)
                # NOTE: opens, appends to and closes a small file for every single line
                with open('piece2\\qq_'+str(m)+'.txt','a') as small_f:
                    small_f.write(tmp+'\n')
                    small_f.close()
            # NOTE: stops after the first chunk (presumably a timing test run)
            break
        big_f.close()
```

#### 按余数分块 V2 代码

```python
# Split the big text file V2; time taken: 0:00:02.572516
def cutRemainderPiece_V2(size,path):
    list = findOffsets(size,path)
    allPiece=[]
    for i in range(0,1000):
        allPiece.append('')
    for i in range(0,len(list)-1):
        # NOTE: this inner loop reuses i, so list[i+1]-list[i] below no longer
        # refers to the current chunk -- one of the bugs V3 fixes.
        for i in range(0,1000):
            allPiece[i]=''
        # NOTE: the file is reopened for every chunk, so each read starts at offset 0
        with open(path,'rb') as big_f:
            # big_f.flush()
            tmp_piece = big_f.read(list[i+1]-list[i])
            # print(type(tmp_piece))
            str_piece = str(tmp_piece, encoding = "utf-8")
            list_piece = str_piece.split('\r\n')
            for tmp in list_piece :
                tmpp=tmp.split("----")
                if len(tmpp[0])<=0:
                    continue
                m = int(int(tmpp[0])%size)
                allPiece[m]+=tmp+'\n'
            for i in range(0,1000):
                with open('piece2\\qq_'+str(i)+'.txt','a') as small_f:
                    small_f.write(allPiece[i])
                    small_f.close()
            # break
            big_f.close()
```

对比 v1、v2,两者时间差距巨大,原因是 v1 的 IO 过于密集(每写一行都要打开、关闭一次小文件),于是 v2 改为先把数据暂存在内存中,等一个块处理完了再统一写入磁盘,速度由原先的 129s 提升到 3s。

#### 按余数分块 V3 代码

v1、v2 其实都是有 bug 的,并不能很好地分割,因此 v3 诞生了!

```python
def cutRemainderPiece_V2(size,path):
    list = findOffsets(size,path)
    logging.debug(list)
    allPiece=[]
    for i in range(0,size):
        allPiece.append('')
    # Open the big file once so that consecutive reads walk through the chunks in order
    with open(path,'rb') as big_f:
        for i in range(0,len(list)-1):
            for j in range(0,size):
                allPiece[j]=''
            # big_f.flush()
            print(str(list[i])+'----'+str(i))
            tmp_piece = big_f.read(list[i+1]-list[i])
            # logging.debug(type(tmp_piece))
            str_piece = str(tmp_piece, encoding = "utf-8")
            list_piece = str_piece.split('\r\n')
            for tmp in list_piece :
                # Expected line shape is QQ----phone; anything longer is malformed
                if len(tmp)>25:
                    logging.debug('错误行1:'+str(tmp))
                    continue
                tmpp=tmp.split("----")
                if len(tmpp[0])<5 or len(tmpp)<2:
                    logging.debug('错误行2:'+str(tmp))
                    continue
                try:
                    m = int(int(tmpp[0])%size)
                    allPiece[m]+=tmp+'\n'
                except ValueError:
                    # the QQ field is not a number
                    logging.debug('错误行3:'+str(tmp))
            # Write one file per remainder bucket. Use size (not a hard-coded 1000)
            # and a separate loop variable so the outer chunk index i is untouched.
            for k in range(0,size):
                with open('piece\\qq_'+str(k)+'.txt','a') as small_f: # 0:00:02.259968
                # with open('\\\\Desktop-93nkdgi\\d\\piece\\qq_'+str(k)+'.txt','a') as small_f:
                    small_f.write(allPiece[k])
                    small_f.close()
            # print(allPiece[0])
            # 1375309000----15070084341
            # break
        big_f.close()
```

大概 1 个小时分完,分完后查询不到 1s。

### 按余数查询

#### 异步查询
````python
# Asynchronous remainder-based query
async def findQQPiece(findQQ,path):
    result = []
    async with aiofiles.open(path, 'rb') as f:
        bytes_all= await f.read()
        str_all = str(bytes_all,encoding='UTF-8')
        # splitlines() handles both '\r\n' (pieces written in text mode on Windows)
        # and plain '\n' line endings; split('\r\n') only handled the former.
        list_all = str_all.splitlines()
        for tmp in list_all:
            qq = tmp.split('----')[0]
            if qq == findQQ:
                phone = tmp.split('----')[1]
                result.append(phone)
    return result
````

#### 同步查询

````python
# Synchronous remainder-based query
def findQQPiece2(findQQ,path):
    findQQ=bytes(findQQ, encoding = "utf8")
    # A match is the QQ immediately followed by its '----' separator.
    # startswith() also avoids the IndexError the byte-by-byte loop raised on
    # lines shorter than the QQ (e.g. a trailing blank line).
    prefix = findQQ + b'-'
    result = []
    with open(path, 'rb') as f:
        for line in f:
            # print(line)
            if line.startswith(prefix):
                logging.debug(line)
                # print(line)
                tmp = str(line,encoding='UTF-8')
                result.append(tmp.split('----')[1].strip())
    return result
````

### 完整代码

```python
import datetime
import time
import aiofiles
import asyncio
import os
import _thread
import threading
import pymysql
import logging

Flag = True


def open_file():
    # with open('F:\\Downloads\\6.9更新总库.txt', 'rb') as f:
    with open('C:\\Users\\14470\\Desktop\\6.9更新总库.txt', 'rb') as f:
        for line in f:
            yield str(line, encoding="utf-8")


def find(findQQ):
    datas = open_file()
    for data in datas:
        # logging.debug(data)
        qq = data.split("----")[0]
        # mobile = data.split("----")[1]
        if qq==findQQ:
            logging.debug(data)
            break


def open_file2(QQ,path):
    # QQ must be bytes. A match is the QQ followed by '-' (start of '----');
    # startswith() replaces the byte-by-byte loop, which raised IndexError on
    # lines shorter than the QQ.
    prefix = QQ + b'-'
    with open(path, 'rb') as f:
        for line in f:
            if len(line)<20:
                logging.debug(line)
            if line.startswith(prefix):
                logging.debug(line)
                # return line
                # break


async def open_file3(findQQ, offset=0):
    # findQQ must be bytes; offset is where the scan starts.
    # This used to be defined twice (with and without offset) and the second
    # definition silently replaced the first; offset now defaults to 0 instead.
    prefix = findQQ + b'-'
    async with aiofiles.open('F:\\Downloads\\6.9更新总库.txt', 'rb') as f:
        # aiofiles' seek() is a coroutine and must be awaited, and the underlying
        # seek() takes positional arguments only; the old
        # f.seek(offset=offset,whence=0) was never awaited, so it never seeked.
        await f.seek(offset, 0)
        async for line in f:
            # logging.debug(line)
            if line.startswith(prefix):
                # logging.debug(line)
                return line
    return b''


# First compute where each thread's chunk starts: the start of the first full line
# after each split point (a line is 10+4+11+2 bytes: QQ + '----' + phone + '\r\n'),
# then assign the chunks accordingly.
def findOffsets(ThreadSize,FilePath):
    List_StartPiece_Offset = []
    # The first chunk starts at offset 0
    List_StartPiece_Offset.append(0)
    file_size = os.path.getsize(FilePath)
    PieceSize = int(file_size/ThreadSize)
    # logging.debug(PieceSize)
    with open(FilePath,'rb') as f:
        for piece_number in range(1,ThreadSize):
            f.seek(piece_number*PieceSize)
            # Skip the rest of the (partial) line the seek landed in. The old
            # logging.debug(f.readline()) also consumed a whole extra line just to
            # log it; log the offset instead.
            f.readline()
            List_StartPiece_Offset.append(f.tell())
            logging.debug(f.tell())
            # Search forward
            # for offset in range(0,60):
            #     if f.read(1)==b'\r':
            #         if f.read(1)==b'\n':
            #             curros = f.tell()
            #             List_StartPiece_Offset.append(curros)
            #             break
            # # Search backward
            # for offset in range(0,27):
            #     f.seek(-2,1)
            #     if f.read(1)==b'\n':
            #         logging.debug(f.tell())
            #         List_StartPiece_Offset.append(f.tell())
        f.close()
    file_size = os.path.getsize(FilePath)
    List_StartPiece_Offset.append(file_size)
    return List_StartPiece_Offset


def findPhone(start,end,qq,path):
    # Flag is shared by all workers and is cleared once any of them finds the QQ.
    # The global declaration must come before every use of Flag in this function.
    global Flag
    prefix = qq + b'-'
    with open(path, 'rb') as f:
        f.seek(start)
        for line in f:
            # logging.debug(line)
            # f.tell() is already past this line, so test where the line *starts*;
            # the old f.tell()>=end check skipped the last line of every chunk.
            if f.tell()-len(line)>=end:
                break
            # Another worker already found the QQ: stop scanning this chunk
            if not Flag:
                break
            if line.startswith(prefix):
                logging.debug(line)
                Flag=False
                return line
    return b''


def findMain(QQ,FilePath,ThreadSize):
    global Flag
    # Reset the shared "still searching" flag so repeated calls work
    Flag = True
    list = findOffsets(ThreadSize,FilePath)
    logging.debug(list)
    threads = []
    try:
        for tmp in range(0,len(list)-1):
            t = threading.Thread(target=findPhone, args=(list[tmp],list[tmp+1],QQ,FilePath))
            t.start()
            threads.append(t)
    except BaseException as e:
        logging.debug ("Error: 无法启动线程 "+str(e))
    # logging.debug(threading.activeCount())
    # Join instead of spinning on `while Flag: pass`: the spin burned a core and
    # never ended when the QQ was not in the file. Workers stop early once any
    # of them clears Flag.
    for t in threads:
        t.join()


def findMain2(QQ,FilePath,ThreadSize):
    global Flag
    # Reset the shared "still searching" flag so repeated calls work
    Flag = True
    list = findOffsets(ThreadSize,FilePath)
    logging.debug(list)
    FilePathHDD = 'F:\\Downloads\\6.9更新总库.txt' # 0:07:38.443549
    FilePathSSD = 'C:\\Users\\14470\\Desktop\\6.9更新总库.txt' # 0:07:12.953106
    # Only the first two chunks are scanned (first from the HDD copy, second from
    # the SSD copy), so this is meant for ThreadSize == 2.
    threads = []
    try:
        for args in ((list[0],list[1],QQ,FilePathHDD), (list[1],list[2],QQ,FilePathSSD)):
            t = threading.Thread(target=findPhone, args=args)
            t.start()
            threads.append(t)
    except BaseException as e:
        logging.debug ("Error: 无法启动线程 "+str(e))
    # logging.debug(threading.activeCount())
    # Wait for the workers instead of spinning on Flag (which never ended on a miss)
    for t in threads:
        t.join()


# Split the big text file into chunks
def cutPiece(size,path):
    list = findOffsets(size,path)
    with open(path,'rb') as big_f:
        for i in range(0,len(list)-1):
            tmp_piece = big_f.read(list[i+1]-list[i])
            with open('piece\\qq_'+str(i)+'.txt','wb') as small_f:
                small_f.write(tmp_piece)
                small_f.close()
        big_f.close()


# Insert into the database
def open_piece_file(piece):
    with open('piece\\qq_'+str(piece)+'.txt', 'rb') as f:
        for line in f:
            yield str(line, encoding="utf-8")


def ini_sql():
    # PyMySQL 1.0+ accepts keyword arguments only; the positional form raises TypeError
    db = pymysql.connect(host="localhost", user="root", password="123456", database="qq")
    return db


# Create the tables
def createTable():
    db = ini_sql()
    for i in range(0,1000):
        sql = 'create table qq_' + str(i) +' (qq char(10),phone char(11)) '
        # logging.debug(sql)
        # One cursor per statement, closed by the with-block (before, only the
        # last cursor created was ever closed)
        with db.cursor() as cursor:
            try:
                cursor.execute(sql)
                db.commit()
            except pymysql.MySQLError as e:
                # e.g. the table already exists
                db.rollback()
                logging.debug(e)
    db.close()


# Insert the data
def insterDB(startPiece,endPiece):
    starttime = datetime.datetime.now()
    logging.debug('线程开始:startPiece-endPiece: '+str(startPiece)+'-'+str(endPiece)+' 时间:'+str(starttime))
    db = ini_sql()
    for i in range(startPiece,endPiece):
        datas = open_piece_file(i)
        for data in datas:
            fields = data.split("----")
            qq = fields[0]
            # strip() drops the trailing '\r\n', which would not fit into phone char(11)
            mobile = fields[1].strip()
            table = int(qq)%1000
            # Values are passed as query parameters instead of being formatted into
            # the SQL text, so quotes in the data cannot break or inject into the
            # statement. The table name is built from an int and is safe.
            sql = "insert into qq_"+str(table)+"(qq, phone) values(%s,%s)"
            with db.cursor() as cursor:
                try:
                    cursor.execute(sql, (qq, mobile))
                    db.commit()
                except pymysql.MySQLError as e:
                    db.rollback()
                    logging.debug(e)
    db.close()
    endtime = datetime.datetime.now()
    logging.debug (endtime - starttime)
    logging.debug('线程结束:startPiece-endPiece: '+str(startPiece)+'-'+str(endPiece)+' 消耗时间:'+str(endtime - starttime))


# Multi-threaded insert
def insterMain(ThreadSize):
    size = int(1000 / ThreadSize)
    threads = []
    # One thread per range of `size` pieces. The old loop ran `size` times with a
    # fixed width of 10, so e.g. insterMain(100) only covered pieces 0-99.
    for i in range(0,ThreadSize):
        start = i*size
        # The last thread also takes the remainder when 1000 % ThreadSize != 0
        end = 1000 if i == ThreadSize-1 else (i+1)*size
        try:
            t = threading.Thread(target=insterDB, args=(start,end))
            t.start()
            threads.append(t)
        except BaseException as e:
            logging.debug ("Error: 无法启动线程 "+str(e))
    # logging.debug(threading.activeCount())
    # Wait for all inserts to finish instead of spinning forever in `while True: pass`
    for t in threads:
        t.join()


# Split by remainder
# Split the big text file; time taken: 0:02:08.998875
def cutRemainderPiece(size,path):
    list = findOffsets(size,path)
    with open(path,'rb') as big_f:
        for i in range(0,len(list)-1):
            tmp_piece = big_f.read(list[i+1]-list[i])
            # logging.debug(type(tmp_piece))
            str_piece = str(tmp_piece, encoding = "utf-8")
            list_piece = str_piece.split('\r\n')
            for tmp in list_piece :
                tmpp=tmp.split("----")
                if len(tmpp[0])<=0:
                    continue
                m = int(int(tmpp[0])%size)
                with open('piece2\\qq_'+str(m)+'.txt','a') as small_f:
                    small_f.write(tmp+'\n')
                    small_f.close()
            # NOTE: stops after the first chunk (presumably a timing test run)
            break
        big_f.close()


# Split the big text file V2; time taken: 0:00:02.572516 | all 0:46:59.691256
def cutRemainderPiece_V2(size,path):
    list = findOffsets(size,path)
    logging.debug(list)
    allPiece=[]
    for i in range(0,size):
        allPiece.append('')
    # Open the big file once so that consecutive reads walk through the chunks in order
    with open(path,'rb') as big_f:
        for i in range(0,len(list)-1):
            for j in range(0,size):
                allPiece[j]=''
            # big_f.flush()
            print(str(list[i])+'----'+str(i))
            tmp_piece = big_f.read(list[i+1]-list[i])
            # logging.debug(type(tmp_piece))
            str_piece = str(tmp_piece, encoding = "utf-8")
            list_piece = str_piece.split('\r\n')
            for tmp in list_piece :
                # Expected line shape is QQ----phone; anything longer is malformed
                if len(tmp)>25:
                    logging.debug('错误行1:'+str(tmp))
                    continue
                tmpp=tmp.split("----")
                if len(tmpp[0])<5 or len(tmpp)<2:
                    logging.debug('错误行2:'+str(tmp))
                    continue
                try:
                    m = int(int(tmpp[0])%size)
                    allPiece[m]+=tmp+'\n'
                except ValueError:
                    # the QQ field is not a number
                    logging.debug('错误行3:'+str(tmp))
            # Write one file per remainder bucket. Use size (not a hard-coded 1000)
            # and a separate loop variable so the outer chunk index i is untouched.
            for k in range(0,size):
                with open('piece\\qq_'+str(k)+'.txt','a') as small_f: # 0:00:02.259968
                # with open('\\\\Desktop-93nkdgi\\d\\piece\\qq_'+str(k)+'.txt','a') as small_f:
                    small_f.write(allPiece[k])
                    small_f.close()
            # print(allPiece[0])
            # 1375309000----15070084341
            # break
        big_f.close()


# Synchronous remainder-based query
def findQQPiece2(findQQ,path):
    findQQ=bytes(findQQ, encoding = "utf8")
    # A match is the QQ immediately followed by its '----' separator; startswith()
    # also avoids the IndexError the byte-by-byte loop raised on short lines.
    prefix = findQQ + b'-'
    result = []
    with open(path, 'rb') as f:
        for line in f:
            # print(line)
            if line.startswith(prefix):
                logging.debug(line)
                # print(line)
                tmp = str(line,encoding='UTF-8')
                result.append(tmp.split('----')[1].strip())
    return result


# Asynchronous remainder-based query
async def findQQPiece(findQQ,path):
    result = []
    async with aiofiles.open(path, 'rb') as f:
        bytes_all= await f.read()
        str_all = str(bytes_all,encoding='UTF-8')
        # splitlines() handles both '\r\n' and '\n' line endings
        list_all = str_all.splitlines()
        for tmp in list_all:
            qq = tmp.split('----')[0]
            if qq == findQQ:
                phone = tmp.split('----')[1]
                result.append(phone)
    return result


async def testpiece():
    QQ='2679445157'
    m = int(int(QQ)%1000)
    result = findQQPiece2(QQ,str('F:\\Downloads\\piece\\qq_'+str(m)+'.txt'))
    # result = await findQQPiece(QQ,str('F:\\Downloads\\piece\\qq_'+str(m)+'.txt'))
    print(result)


if __name__ == "__main__":
    # logging.basicConfig(level='DEBUG')
    QQ=b'2679445157'
    FilePath = 'F:\\Downloads\\6.9更新总库.txt' # 0:07:38.443549
    # FilePath = 'C:\\Users\\14470\\Desktop\\6.9更新总库.txt' # 0:07:12.953106
    ThreadSize = 2
    starttime = datetime.datetime.now()
    # find(QQ)
    # asyncio.run(open_file3(findQQ))
    # open_file2(findQQ)
    # str = '13076120000'
    # logging.debug(str[0:3])
    # logging.debug(str[-2:])
    file_size = os.path.getsize(FilePath)
    print(float(file_size)/(1024.0*1024.0*1024.0),end='GB\n')
    # Multi-threaded version
    # findMain(QQ,FilePath,ThreadSize)
    # Multi-threaded version, two files
    # findMain2(QQ,FilePath,ThreadSize)
    # Single-threaded
    # open_file2(QQ=QQ,path='F:\\Downloads\\piece\\qq_0.txt')
    # open_file2(QQ=QQ,path=FilePath)
    # Split the file
    # cutPiece(1000,FilePath)
    # Create the tables
    # createTable()
    # Insert test data
    # insterDB(0,1)
    # Multi-threaded insert
    # insterMain(100)
    # Remainder-based split
    # cutRemainderPiece_V2(1000,FilePath)
    # Async query. asyncio.get_event_loop() is deprecated when no loop is running
    # and raises RuntimeError on Python 3.14; asyncio.run() creates and closes
    # the loop itself.
    asyncio.run(testpiece())
    endtime = datetime.datetime.now()
    print (endtime - starttime)
```

Last modification: November 27th, 2020 at 11:02 am

© 允许规范转载

Support If you think my article is useful to you, please feel free to appreciate ×Close Appreciate the author Sweeping payments
大佬来一份 Java 或者 Go 的代码
Quan姐姐太棒啦
大佬数据给我一份试试呗