今天小編給大家分享一下怎么使用Python語言實現消息傳遞的gRPC的相關知識點,內容詳細,邏輯清晰,相信大部分人都還不太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
10年積累的成都做網站、成都網站設計經驗,可以快速應對客戶對網站的新想法和需求。提供各種問題對應的解決方案。讓選擇我們的客戶得到更好、更有力的網絡服務。我雖然不認識你,你也不認識我。但先網站設計后付款的網站建設流程,更有前鋒免費網站建設讓你可以放心的選擇與我們合作。
# conda
$ conda create -n grpc_env python=3.9
# install grpc(PyPI 上的包名是 grpcio 和 grpcio-tools)
$ pip install grpcio -i https://pypi.doubanio.com/simple
$ pip install grpcio-tools -i https://pypi.doubanio.com/simple
# 有時proto生成py文件不對就是得換換grpc兩個包的版本
整體結構:client.py、server.py 和 proto 目錄下的 example.proto

1)在example.proto定義傳送體
// Syntax declaration
syntax = "proto3";
package proto;
// Service definition
service HelloService{
rpc Hello(Request) returns (Response) {} // unary call: one message in, one message out
}
// Request message; the numbers 1, 2 are the field numbers
message Request {
string data = 1;
}
// Response message
message Response {
int32 ret = 1; // return code
string data = 2;
}
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
2) 在虛擬環境里使用命令生成py文件
$ conda activate grpc_env
$ f:
$ cd F:\examples
$ python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
在proto目錄下會生成兩個py文件,如下圖所示:

3) 編輯client.py 和 server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Servicer that implements the RPCs declared by HelloService."""

    def Hello(self, request, context):
        """Print the request payload and echo it back prefixed with ``Response:``."""
        payload = request.data
        print(payload)
        return example_pb2.Response(ret=0, data="Response:" + payload)
def server(ip: str, port: int) -> None:
    """Start the HelloService gRPC server and block until it terminates.

    Args:
        ip: Address to bind, e.g. ``"127.0.0.1"``.
        port: TCP port to listen on.
    """
    # Named grpc_server so the local does not shadow this function's own name.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # pool of 10 worker threads
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl+C is not an Exception subclass, so it needs its own handler.
        print("server interrupted, shutting down")
    finally:
        # Always release the port, however the wait ended.
        grpc_server.stop(0)
# Script entry point: run the demo server on localhost.
if __name__ == "__main__":
    server("127.0.0.1", 8000)
# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
def client(ip: str, port: int) -> None:
    """Send one ``Hello`` request to the server and print the reply.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    target = f"{ip}:{port}"
    # The context manager closes the channel when the call is done.
    with grpc.insecure_channel(target) as channel:  # connect to the RPC server
        stub = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        request = example_pb2.Request(data="hello 123")
        res = stub.Hello(request)
        print(f"ret:{res.ret}, data:{res.data}")
if __name__ == '__main__':
    client("127.0.0.1", 8000)

默認情況下,gRPC 將傳入消息限制為 4 MB。傳出消息沒有限制。
1)example.proto定義不變
2)編輯client.py 和 server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Concrete implementation of the HelloService interface."""

    def Hello(self, request, context):
        """Log the incoming text and return it with a ``Response:`` prefix."""
        text = request.data
        print(text)
        reply_text = "Response:" + text
        reply = example_pb2.Response(ret=0, data=reply_text)
        return reply
def server(ip: str, port: int) -> None:
    """Start the HelloService server with raised message-size limits.

    Blocks until the server terminates or the process is interrupted.

    Args:
        ip: Address to bind, e.g. ``"127.0.0.1"``.
        port: TCP port to listen on.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # Named grpc_server so the local does not shadow this function's own name.
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),  # pool of 10 worker threads
        options=options,
    )
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl+C is not an Exception subclass, so it needs its own handler.
        print("server interrupted, shutting down")
    finally:
        # Always release the port, however the wait ended.
        grpc_server.stop(0)
# Script entry point: run the demo server on localhost.
if __name__ == "__main__":
    server("127.0.0.1", 8000)


# client.py
import grpc
from proto import example_pb2_grpc, example_pb2
def client(ip: str, port: int) -> None:
    """Send one large ``Hello`` request (about 9 MB) and print the reply.

    The channel is created with raised message-size limits so the payload
    exceeds gRPC's default 4 MB receive limit without being rejected.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    # The context manager closes the channel when the call is done.
    with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        stub = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        data = "hello 123" * 1024 * 1024
        res = stub.Hello(example_pb2.Request(data=data))
        print(f"ret:{res.ret}, data:{res.data}")
if __name__ == '__main__':
    client("127.0.0.1", 8000)

1)example.proto定義不變
2)編輯client.py 和 server.py
# server.py
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Servicer for HelloService whose handler is deliberately slow."""

    def Hello(self, request, context):
        """Print the payload, wait two seconds, then echo it back."""
        message = request.data
        print(message)
        time.sleep(2)  # artificial delay before replying
        return example_pb2.Response(ret=0, data="Response:" + message)
def server(ip: str, port: int) -> None:
    """Start the HelloService server and block until it terminates.

    Args:
        ip: Address to bind, e.g. ``"127.0.0.1"``.
        port: TCP port to listen on.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # Named grpc_server so the local does not shadow this function's own name.
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),  # pool of 10 worker threads
        options=options,
    )
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl+C is not an Exception subclass, so it needs its own handler.
        print("server interrupted, shutting down")
    finally:
        # Always release the port, however the wait ended.
        grpc_server.stop(0)
# Script entry point: run the demo server on localhost.
if __name__ == "__main__":
    server("127.0.0.1", 8000)


# client.py
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
def client(ip: str, port: int) -> None:
    """Call ``Hello`` with a one-second deadline and print the outcome.

    On a gRPC failure (for example an exceeded deadline) the error detail
    is printed and the process exits with status -1. On success the
    function returns normally.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    try:
        # The context manager closes the channel on every exit path.
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            stub = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            request = example_pb2.Request(data="hello 123")
            res = stub.Hello(request, timeout=1)  # timeout in seconds
            print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
        # Signal failure only when the call actually failed.
        sys.exit(-1)
if __name__ == '__main__':
    client("127.0.0.1", 8000)

運行結果:
grpc.RpcError Deadline Exceeded
1)在example.proto重新定義傳送體
// Syntax declaration
syntax = "proto3";
package proto;
// Service definition
service HelloService{
rpc Hello(Request) returns (Response) {} // unary call: one message in, one message out
rpc ClientTOServer(stream UpFileRequest) returns (Response) {} // client-streaming file upload
rpc ServerTOClient(Request) returns (stream UpFileRequest) {} // server-streaming file download
}
// Request message; the numbers 1, 2 are the field numbers
message Request {
string data = 1;
}
// Response message
message Response {
int32 ret = 1; // return code
string data = 2;
}
message UpFileRequest {
string filename = 1;
int64 sendsize = 2;
int64 totalsize = 3;
bytes data = 4;
}
//python -m grpc_tools.protoc -I ./ --python_out=./ --grpc_python_out=./ ./example.proto
2)在虛擬環境里使用命令生成py文件,參考2. 2)
3)編輯client.py 和 server.py
# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Servicer implementing the unary and streaming RPCs of HelloService."""

    def Hello(self, request, context):
        """Print the payload, wait two seconds, then echo it back."""
        data = request.data
        print(data)
        time.sleep(2)  # artificial delay before replying
        return example_pb2.Response(ret=0, data="Response:" + data)

    def ClientTOServer(self, request_iterator, context):
        """Receive a client-streamed upload and save it once complete.

        Chunks are accumulated in memory. The file is written to the
        hard-coded path ``242_copy.mp3`` only when the number of received
        bytes equals the announced total size.

        Returns:
            ``Response`` with ``ret=0`` on success or ``ret=-1`` on a size
            mismatch (including an empty stream); ``data`` holds the name.
        """
        received = bytearray()
        file_name = ""
        file_size = None  # unknown until the first message arrives
        for chunk in request_iterator:
            file_name = chunk.filename
            file_size = chunk.totalsize
            print(f"文件名稱:{file_name}, 文件總長度:{file_size}")
            received.extend(chunk.data)  # append this chunk's bytes
            print(f"已接收長度:{len(received)}")
        if file_size is not None and len(received) == file_size:
            with open("242_copy.mp3", "wb") as fw:
                fw.write(received)
            print(f"{file_name=} 下載完成")
            return example_pb2.Response(ret=0, data=file_name)
        print(f"{file_name=} 下載失敗")
        return example_pb2.Response(ret=-1, data=file_name)

    def ServerTOClient(self, request, context):
        """Stream the file named in ``request.data`` back to the client.

        The first message is a header (name and total size, empty payload);
        each following message carries up to 1 MB of file data. Errors while
        reading propagate instead of being swallowed, so a failed read can
        neither loop forever nor silently drop a chunk.
        """
        fp = request.data
        print(f"下載文件:{fp=}")
        # NOTE(security): the client-supplied path is opened verbatim, so any
        # file readable by the server process can be requested. Restrict it to
        # a known directory before exposing this beyond a demo.
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024  # read 1 MB per message
        with open(fp, "rb") as fr:
            # Header message announcing the file before any payload.
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            # The chunk gets its own name so the ``context`` parameter is not rebound.
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(
                    filename=file_name,
                    totalsize=file_size,
                    sendsize=len(chunk),
                    data=chunk,
                )
        print("發送完畢")
def server(ip: str, port: int) -> None:
    """Start the HelloService server and block until it terminates.

    Args:
        ip: Address to bind, e.g. ``"127.0.0.1"``.
        port: TCP port to listen on.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # Named grpc_server so the local does not shadow this function's own name.
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),  # pool of 10 worker threads
        options=options,
    )
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        # Ctrl+C is not an Exception subclass, so it needs its own handler.
        print("server interrupted, shutting down")
    finally:
        # Always release the port, however the wait ended.
        grpc_server.stop(0)
# Script entry point: run the demo server on localhost.
if __name__ == "__main__":
    server("127.0.0.1", 8000)


# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
def send_stream_data(fp: str):
    """Yield ``UpFileRequest`` messages that stream the file at *fp*.

    The first message is a header (file name and total size, empty
    payload); each following message carries up to 1 MB of file data.
    Errors while reading propagate to the caller instead of being printed
    and retried, which could loop forever or skip a chunk.

    Args:
        fp: Path of the file to send.
    """
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024  # read 1 MB per message
    with open(fp, "rb") as fr:
        # Header message announcing the file before any payload.
        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        while chunk := fr.read(part_size):
            yield example_pb2.UpFileRequest(
                filename=file_name,
                totalsize=file_size,
                sendsize=len(chunk),
                data=chunk,
            )
    print("發送完畢")
def client(ip: str, port: int) -> None:
    """Call ``Hello`` with a one-second deadline and print the outcome.

    On a gRPC failure the error detail is printed and the process exits
    with status -1. On success the function returns normally.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    try:
        # The context manager closes the channel on every exit path.
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            stub = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            request = example_pb2.Request(data="hello 123")
            res = stub.Hello(request, timeout=1)  # timeout in seconds
            print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
        # Signal failure only when the call actually failed.
        sys.exit(-1)
def client_to_server(ip: str, port: int, fp: str):
    """Upload the file at *fp* through the client-streaming RPC.

    Prints the server's response. On a gRPC failure the error detail is
    printed and the process exits with status -1.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the local file to upload.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    try:
        # The context manager closes the channel on every exit path.
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            stub = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            res = stub.ClientTOServer(send_stream_data(fp=fp), timeout=600)  # timeout in seconds
            print(f"ret:{res.ret}, data:{res.data}")
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
        # Signal failure only when the call actually failed.
        sys.exit(-1)
def server_to_client(ip: str, port: int, fp: str):
    """Download a file from the server through the server-streaming RPC.

    Received chunks are accumulated in memory and written to the
    hard-coded path ``242_1.mp3`` when the byte count matches the total
    size announced by the server. On a gRPC failure or a size mismatch the
    process exits with status -1.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the file on the server to download.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    complete = False
    filename = ""
    try:
        # The context manager closes the channel on every exit path.
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            stub = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            received = bytearray()
            total_size = None  # unknown until the first message arrives
            request = example_pb2.Request(data=fp)
            for res in stub.ServerTOClient(request, timeout=300):
                filename = res.filename
                total_size = res.totalsize
                received += res.data
            if total_size is not None and total_size == len(received):
                with open("242_1.mp3", "wb") as fw:
                    fw.write(received)
                print(f"{filename=} : {total_size=} 下載完成!")
                complete = True
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
        sys.exit(-1)
    if not complete:
        print(f"{filename=} 下載失敗!")
        sys.exit(-1)
# Script entry point. The commented calls are the alternative demos.
if __name__ == "__main__":
    # client("127.0.0.1", 8000)
    # client_to_server("127.0.0.1", 8000, "242.mp3")
    server_to_client("127.0.0.1", 8000, "242.mp3")


# server.py
import os
import time
import grpc
from concurrent import futures
from proto import example_pb2_grpc, example_pb2
import asyncio
class ServiceBack(example_pb2_grpc.HelloServiceServicer):
    """Servicer implementing the unary and streaming RPCs of HelloService.

    The handlers are ordinary (non-async) methods.
    """

    def Hello(self, request, context):
        """Print the payload, wait two seconds, then echo it back."""
        data = request.data
        print(data)
        time.sleep(2)  # artificial delay before replying
        ret_data = "Response:" + data
        return example_pb2.Response(ret=0, data=ret_data)

    def ClientTOServer(self, request_iterator, context):
        """Receive a client-streamed upload and save it once complete.

        Chunks are accumulated in memory. The file is written to the
        hard-coded path ``242_copy.mp3`` only when the number of received
        bytes equals the announced total size.

        Returns:
            ``Response`` with ``ret=0`` on success or ``ret=-1`` on a size
            mismatch (including an empty stream); ``data`` holds the name.
        """
        buffer = bytearray()
        file_name = ""
        file_size = None  # unknown until the first message arrives
        for message in request_iterator:
            file_name = message.filename
            file_size = message.totalsize
            print(f"文件名稱:{file_name}, 文件總長度:{file_size}")
            buffer.extend(message.data)  # append this chunk's bytes
            print(f"已接收長度:{len(buffer)}")
        if file_size is None or len(buffer) != file_size:
            print(f"{file_name=} 下載失敗")
            return example_pb2.Response(ret=-1, data=file_name)
        with open("242_copy.mp3", "wb") as fw:
            fw.write(buffer)
        print(f"{file_name=} 下載完成")
        return example_pb2.Response(ret=0, data=file_name)

    def ServerTOClient(self, request, context):
        """Stream the file named in ``request.data`` back to the client.

        The first message is a header (name and total size, empty payload);
        each following message carries up to 1 MB of file data. Errors while
        reading propagate instead of being swallowed, so a failed read can
        neither loop forever nor silently drop a chunk.
        """
        fp = request.data
        print(f"下載文件:{fp=}")
        # NOTE(security): the client-supplied path is opened verbatim, so any
        # file readable by the server process can be requested. Restrict it to
        # a known directory before exposing this beyond a demo.
        file_name = os.path.basename(fp)
        file_size = os.path.getsize(fp)
        part_size = 1024 * 1024  # read 1 MB per message
        with open(fp, "rb") as fr:
            # Header message announcing the file before any payload.
            yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
            # The chunk gets its own name so the ``context`` parameter is not rebound.
            while chunk := fr.read(part_size):
                yield example_pb2.UpFileRequest(
                    filename=file_name,
                    totalsize=file_size,
                    sendsize=len(chunk),
                    data=chunk,
                )
        print("發送完畢")
async def server(ip: str, port: int) -> None:
    """Run the HelloService server on the asyncio gRPC stack.

    Awaits until the server terminates. The server is stopped in a
    ``finally`` clause so that cancellation of this coroutine still shuts
    it down.

    Args:
        ip: Address to bind, e.g. ``"127.0.0.1"``.
        port: TCP port to listen on.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    # Named grpc_server so the local does not shadow this function's own name.
    # NOTE(review): the 10-thread pool is presumably what runs the non-async
    # handlers of ServiceBack; confirm against the grpc.aio.server docs.
    grpc_server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10), options=options)
    example_pb2_grpc.add_HelloServiceServicer_to_server(ServiceBack(), grpc_server)
    grpc_server.add_insecure_port(f"{ip}:{port}")
    await grpc_server.start()
    print(f"server is started! ip:{ip} port:{str(port)}")
    try:
        await grpc_server.wait_for_termination()
    finally:
        # CancelledError is not an Exception subclass, so an
        # ``except Exception`` handler would skip this cleanup.
        await grpc_server.stop(None)
# Script entry point. asyncio.run creates and closes the event loop itself;
# passing a bare coroutine to asyncio.wait is rejected on Python 3.11+.
if __name__ == '__main__':
    asyncio.run(server("127.0.0.1", 8000))


# client.py
import os
import sys
import grpc
from proto import example_pb2_grpc, example_pb2
import asyncio
def send_stream_data(fp: str):
    """Yield ``UpFileRequest`` messages that stream the file at *fp*.

    The first message is a header (file name and total size, empty
    payload); each following message carries up to 1 MB of file data.
    Errors while reading propagate to the caller instead of being printed
    and retried, which could loop forever or skip a chunk.

    Args:
        fp: Path of the file to send.
    """
    file_name = os.path.basename(fp)
    file_size = os.path.getsize(fp)
    part_size = 1024 * 1024  # read 1 MB per message
    with open(fp, "rb") as fr:
        # Header message announcing the file before any payload.
        yield example_pb2.UpFileRequest(filename=file_name, totalsize=file_size, sendsize=0, data=b"")
        while chunk := fr.read(part_size):
            yield example_pb2.UpFileRequest(
                filename=file_name,
                totalsize=file_size,
                sendsize=len(chunk),
                data=chunk,
            )
    print("發送完畢")
async def client(ip: str, port: int) -> None:
    """Call ``Hello`` over an asyncio channel with a three-second deadline.

    On a gRPC failure the error detail is printed and the process exits
    with status -1. On success the coroutine returns normally.

    Args:
        ip: Server address.
        port: Server TCP port.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    async with grpc.aio.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        stub = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = example_pb2.Request(data="hello 123")
            res = await stub.Hello(request, timeout=3)  # timeout in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            # Signal failure only when the call actually failed.
            sys.exit(-1)
async def client_to_server(ip: str, port: int, fp: str):
    """Upload the file at *fp* through the client-streaming RPC (asyncio).

    Prints the server's response. On a gRPC failure the error detail is
    printed and the process exits with status -1.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the local file to upload.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    async with grpc.aio.insecure_channel(target, options=options) as channel:  # connect to the RPC server
        stub = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
        try:
            request = send_stream_data(fp=fp)
            res = await stub.ClientTOServer(request, timeout=600)  # timeout in seconds
            print(f"ret:{res.ret}, data:{res.data}")
        except grpc.RpcError as rpc_error:
            print("grpc.RpcError", rpc_error.details())
            # Signal failure only when the call actually failed.
            sys.exit(-1)
def server_to_client(ip: str, port: int, fp: str):
    """Download a file from the server through the server-streaming RPC.

    Uses a blocking (non-asyncio) channel. Received chunks are accumulated
    in memory and written to the hard-coded path ``242_1.mp3`` when the
    byte count matches the total size announced by the server. On a gRPC
    failure or a size mismatch the process exits with status -1.

    Args:
        ip: Server address.
        port: Server TCP port.
        fp: Path of the file on the server to download.
    """
    # Message size limits
    max_message_length = 1024 * 1024 * 1024  # 1 GB
    options = [
        ('grpc.max_send_message_length', max_message_length),
        ('grpc.max_receive_message_length', max_message_length),
        ('grpc.enable_retries', 1),
    ]
    target = f"{ip}:{port}"
    succeeded = False
    filename = ""
    try:
        # The context manager closes the channel on every exit path.
        with grpc.insecure_channel(target, options=options) as channel:  # connect to the RPC server
            stub = example_pb2_grpc.HelloServiceStub(channel)  # create the stub
            payload = bytearray()
            total_size = None  # unknown until the first message arrives
            for res in stub.ServerTOClient(example_pb2.Request(data=fp), timeout=300):
                filename = res.filename
                total_size = res.totalsize
                payload += res.data
            if total_size is not None and total_size == len(payload):
                with open("242_1.mp3", "wb") as fw:
                    fw.write(payload)
                print(f"{filename=} : {total_size=} 下載完成!")
                succeeded = True
    except grpc.RpcError as rpc_error:
        print("grpc.RpcError", rpc_error.details())
        sys.exit(-1)
    if not succeeded:
        print(f"{filename=} 下載失敗!")
        sys.exit(-1)
# Script entry point: run the streaming-upload demo.
if __name__ == "__main__":
    # asyncio.run(client("127.0.0.1", 8000))
    asyncio.run(client_to_server("127.0.0.1", 8000, "242.mp3"))
    # server_to_client("127.0.0.1", 8000, "242.mp3")

以上就是“怎么使用Python語言實現消息傳遞的gRPC”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注創新互聯行業資訊頻道。
網站名稱:怎么使用Python語言實現消息傳遞的gRPC
新聞來源:http://chinadenli.net/article42/jgjchc.html
成都網站建設公司_創新互聯,為您提供定制網站、關鍵詞優化、定制開發、網站設計公司、動態網站、網頁設計公司
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯