文件的上传校验

使用线程池开多线程会出错

服务端.py

import hashlib
import random
import socketserver
import struct
import json
import os
import threading
from configparser import ConfigParser

class Myserver(socketserver.BaseRequestHandler):

def get_md5(self, file_size, file_path):
md = hashlib.md5()
read_size = 0
f = open(file_path, 'rb')
while read_size < file_size:
if file_size - read_size < 8192:
total_data = f.read(file_size - read_size)
else:
total_data = f.read(8192)
read_size += len(total_data)
md.update(total_data)
f.close()
return md.hexdigest()

# 接收文件
def receive(self, addr,filesize):
file_path = addr
file_size = filesize
# print("接收文件","file_path:",file_path,"file_size:",file_size)
f = open(file_path, 'wb')
received_size = 0
while received_size < file_size:
temp = random.randint(12000, 200000)
if file_size - received_size < 65535:
total_data = self.request.recv(file_size - received_size)
else:
total_data = self.request.recv(temp)
received_size += len(total_data)
f.write(total_data)
# print("接收完成.......")
f.close()

def send_msg(self, data):
head_data = json.dumps(data).encode()
head_size = struct.pack('i', len(head_data))
self.request.send(head_size)
self.request.send(head_data)

def recv_msg(self):
head_size = self.request.recv(4)
st_data = struct.unpack('i', head_size)[0]
head_data = self.request.recv(st_data).decode()
return json.loads(head_data)

def handle(self):
global flag
try:
cur_thread = threading.current_thread()
print('----------------------------------')
print('cur_thread.name:', cur_thread.name)
print("---------------------------------")
try:
if flag:
global dir
dir = self.recv_msg()
os.makedirs(dirs + dir)
# self.send_msg("文件创建成功")
print(dir+"目录创建成功")
pass
except Exception as e:
# self.send_msg("文件已创建")
flag = False
pass
header_dir = self.recv_msg()
filename = header_dir['filename']
filesize = header_dir['filesize']
md5 = header_dir['md5']
addr = dirs + dir + "\\" + filename
self.receive( addr, filesize)
print("%s文件上传成功" % filename)
if md5 == self.get_md5(filesize, dirs + dir + "\\" + filename):
self.send_msg("1")
print("%s验证成功" % filename)
else:
self.send_msg("0")
print("%s验证失败" % filename)
self.request.close()
except Exception as e:
pass



if __name__ == '__main__':
dir = ""
float = True
cp = ConfigParser()
cp.read('system.conf', encoding="utf-8")
ip = cp.get("server", 'ip')
port = cp.getint("server", 'port')
dirs = cp.get("server", 'dirs')
print("-" * 30)
print("用户ip地址:", ip)
print("用户端口:", port)
print("文件存放目录:", dirs)
print("-" * 30)
ip_port = (ip, port)
server = socketserver.ThreadingTCPServer(ip_port, Myserver)
server.serve_forever()

客户端.py

import hashlib
import socket
import struct
import json
import os
import time
from multiprocessing.dummy import Pool
from configparser import ConfigParser
class Client(object):
def __init__(self):
self.host = ip
self.port = port

#发送文件
def sendfile(self, addr,dirsize):
file_path = addr
file_size = dirsize
f = open(file_path, 'rb')
send_size = 0
while send_size < file_size:
if file_size - send_size < 8192: # 最后一次
total_data = f.read(file_size - send_size)
self.request.send(total_data)
else:
total_data = f.read(8192)
self.request.send(total_data)
send_size += len(total_data)
# print("发送完成")
f.close()
# 发送目录
def send_table(self):
lis = []
pool = Pool(thread)
for parent, dirname, filters in os.walk(dir):
for i in filters:
diradrr = parent + '\\' + i
lis.append(diradrr)
# print(lis)
pool.map(self.main, lis)
time.sleep(2)
# 加密
def get_md5(self, file_path=None, file_size=None):
if file_size:
md = hashlib.md5()
read_size = 0
f = open(file_path, 'rb')
while read_size < file_size:
if file_size - read_size < 8192:
total_data = f.read(file_size - read_size)
else:
total_data = f.read(8192)
read_size += len(total_data)
md.update(total_data)
f.close()
return md.hexdigest()
else:
return hashlib.md5().hexdigest()

# 发送信息
def send_msg(self, data):
head_data = json.dumps(data).encode()
head_size = struct.pack('i', len(head_data))
self.request.send(head_size)
self.request.send(head_data)

# 接收信息
def recv_msg(self):
head_size = self.request.recv(4)
st_data = struct.unpack('i', head_size)[0]
head_data = self.request.recv(st_data).decode()
return json.loads(head_data)

# 连接
def connect_socket(self):
self.request = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.request.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.request.connect((self.host, self.port), )
#运行
def run(self):
self.send_table()

def main(self,addr=None):
global flag
self.connect_socket()
if flag:
self.send_msg(mkdir)
flag = False
else:
pass
# data = self.recv_msg()
# print(data)
dirsize = os.path.getsize(addr)
dirname = addr.split("\\")[-1]
header_dir = {
"filename": dirname,
"filesize": dirsize,
"md5": self.get_md5(addr,dirsize)
}
self.send_msg(header_dir)
self.sendfile(addr,dirsize)
data = self.recv_msg()
if data == "1":
print("%s验证成功" %dirname)
os.remove(addr)
print("%s文件已删除"%dirname )
elif data == "0":
print("%s验证失败"%dirname)

if __name__ == '__main__':
flag = True
cp = ConfigParser()
cp.read('system.conf', encoding="utf-8")
mkdir = cp.get("client", 'mkdir')
ip = cp.get("client", 'ip')
port = cp.getint("client", 'port')
dir = cp.get("client", 'dir')
thread = cp.getint('client', 'thread')
print("-" * 30)
print("用户ip地址:", ip)
print("用户端口:", port)
print("文件发送的目录:", dir)
print("创建的文件夹目录:", mkdir)
print("打开的线程数目:", thread)
print("-" * 30)
while 1:
try:
Client().run()
except Exception as e:
continue

配置文件

system.conf

[client]
ip = 127.0.0.1
port = 8000
dir = E:\文件目录\\
mkdir = 它的目录
thread = 1

[server]
ip = 127.0.0.1
port = 8000
dirs = E:\\


开线程池急用使用以下方法,但传大文件不合适

使用线程池开多线程上传文件

服务端.py

import hashlib
import socketserver
import struct
import json
import os
from configparser import ConfigParser

class Myserver(socketserver.BaseRequestHandler):
def send_msg(self, data):
head_data = json.dumps(data).encode()
head_size = struct.pack('i', len(head_data))
self.request.send(head_size)
self.request.send(head_data)

def recv_msg(self):
head_size = self.request.recv(4)
st_data = struct.unpack('i', head_size)[0]
head_data = self.request.recv(st_data).decode()
return json.loads(head_data)

def get_md5(self,data=None):
if data:
m5 = hashlib.md5()
file = open(data, 'rb')
m5.update(file.read(4096))
file.close()
return m5.hexdigest()

def handle(self):
try:
dir = self.recv_msg()
try:
os.makedirs(dirs + dir)
except Exception as e:
pass
obj = self.request.recv(4)
header_size = struct.unpack('i', obj)[0]
header_bytes = self.request.recv(header_size)
header_json = header_bytes.decode("utf-8")
header_dic = json.loads(header_json)
filename = header_dic['filename']
filesize = header_dic['filesize']
md5 = header_dic['md5']
with open(dirs + dir + "\\" + filename, 'wb') as f:
recv_size = 0
while recv_size < filesize:
res = self.request.recv(1024)
f.write(res)
recv_size += len(res)
print("%s文件上传成功" % filename)

if md5 == self.get_md5(dirs + dir + "\\" + filename):
self.request.send("1".encode("utf-8"))
print("%s验证成功" % filename)
else:
self.request.send("0".encode("utf-8"))
print("%s验证失败" % filename)
self.request.close()
except Exception as e:
pass


if __name__ == '__main__':
cp = ConfigParser()
cp.read('system.conf', encoding="utf-8")
ip = cp.get("server", 'ip')
port = cp.getint("server", 'port')
dirs = cp.get("server", 'dirs')
print("-" * 30)
print("用户ip地址:", ip)
print("用户端口:", port)
print("文件存放目录:", dirs)
print("-" * 30)
ip_port = (ip, port)
server = socketserver.ThreadingTCPServer(ip_port, Myserver)
server.serve_forever()

客户端.py

import hashlib
import os
import json
import socket
import struct
import time
from multiprocessing.dummy import Pool
from configparser import ConfigParser

def send_msg(client,mkdirs):
head_data = json.dumps(mkdirs).encode()
head_size = struct.pack('i', len(head_data))
client.send(head_size)
client.request.send(head_data)


def recv_msg(client):
head_size = client.recv(4)
st_data = struct.unpack('i', head_size)[0]
head_data = client.recv(st_data).decode()
return json.loads(head_data)

def get_md5(data=None):
if data:
m5 = hashlib.md5()
file = open(data, 'rb')
m5.update(file.read(4096))
file.close()
return m5.hexdigest()

cp = ConfigParser()
cp.read('system.conf',encoding="utf-8")
mkdir = cp.get("client",'mkdir')
ip = cp.get("client",'ip')
port = cp.getint("client",'port')
dir = cp.get("client",'dir')
thread = cp.getint('client','thread')
print("-"*30)
print("用户ip地址:",ip)
print("用户端口:",port)
print("文件发送的目录:",dir)
print("创建的文件夹目录:",mkdir)
print("打开的线程数目:",thread)
print("-"*30)

while 1:
try:
def run(addr):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
client.connect((ip, port))

head_data = json.dumps(mkdir).encode()
head_size = struct.pack('i', len(head_data))
client.send(head_size)
client.send(head_data)

dirsize = os.path.getsize(addr)
dirname = addr.split("\\")[-1]

header_dir = {
"filename":dirname,
"filesize":dirsize,
"md5":get_md5(addr)
}
header_join = json.dumps(header_dir)
header_bytes = header_join.encode("utf-8")
client.send(struct.pack('i',len(header_bytes)))
client.send(header_bytes)
with open(addr,"rb") as f:
client.sendall(f.read())
data = client.recv(1024).decode("utf-8")
if int(data) == 1:
print("%s校验成功"%dirname)
os.remove(addr)
print("%s文件已删除" %dirname)
if int(data) == 0:
print("%s文件传输异常"%dirname)
client.close()
lis = []
pool = Pool(thread)
for parent, dirname, filters in os.walk(dir):
for i in filters:
diradrr = parent+'\\'+i
lis.append(diradrr)
pool.map(run,lis)
time.sleep(2)
except Exception as e:
continue

配置文件

system.conf

[client]
ip = 127.0.0.1
port = 8000
dir = E:\文件目录\\
mkdir = 它的目录
thread = 1

[server]
ip = 127.0.0.1
port = 8000
dirs = E:\\


  上一篇:python文件打包 下一篇:github超时解决  

湘ICP备19016894号 © 2019 小钱