這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)基于akka怎樣實(shí)現(xiàn)RPC,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

10年積累的成都網(wǎng)站制作、成都做網(wǎng)站、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站設(shè)計(jì)后付款的網(wǎng)站建設(shè)流程,更有靈壽免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
目前的工作在基于akka實(shí)現(xiàn)數(shù)據(jù)服務(wù)總線,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很簡(jiǎn)單的寫出一個(gè)大型的分布式集群的架構(gòu)。里面的一塊功能就是RPC(遠(yuǎn)程過程調(diào)用),這篇文章將會(huì)介紹一種實(shí)現(xiàn)方式。
akka rpc java
目錄[-]
akka-rpc(基于akka的rpc的實(shí)現(xiàn))
RPC
實(shí)現(xiàn)原理
Server端核心代碼
Client端核心代碼
Demo
akka-rpc(基于akka的rpc的實(shí)現(xiàn))
代碼:http://git.oschina.net/for-1988/Simples
目前的工作在基于akka(java)實(shí)現(xiàn)數(shù)據(jù)服務(wù)總線,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很簡(jiǎn)單的寫出一個(gè)大型的分布式集群的架構(gòu)。里面的一塊功能就是RPC(遠(yuǎn)程過程調(diào)用)。
RPC
遠(yuǎn)程過程調(diào)用(Remote Procedure Call,RPC)是一個(gè)計(jì)算機(jī)通信協(xié)議。該協(xié)議允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一臺(tái)計(jì)算機(jī)的子程序,而程序員無需額外地為這個(gè)交互作用編程。如果涉及的軟件采用面向?qū)ο缶幊蹋敲催h(yuǎn)程過程調(diào)用亦可稱作遠(yuǎn)程調(diào)用或遠(yuǎn)程方法調(diào)用,例:Java RMI。
實(shí)現(xiàn)原理
整個(gè)RPC的調(diào)用過程完全基于akka來傳遞對(duì)象,因?yàn)樾枰M(jìn)行網(wǎng)絡(luò)通信,所以我們的接口實(shí)現(xiàn)類、調(diào)用參數(shù)以及返回值都需要實(shí)現(xiàn)java序列化接口。客戶端跟服務(wù)端其實(shí)都是在一個(gè)Akka 集群關(guān)系中,Client跟Server都是集群中的一個(gè)節(jié)點(diǎn)。首先Client需要初始化RpcClient對(duì)象,在初始化的過程中,我們啟動(dòng)了AkkaSystem,加入到整個(gè)集群中,并創(chuàng)建了負(fù)責(zé)與Server進(jìn)行通信的Actor。然后通過RpcClient中的getBean(Class<T> clz)方法獲取Server端的接口實(shí)現(xiàn)類的實(shí)例對(duì)象,然后通過動(dòng)態(tài)代理攔截這個(gè)對(duì)象的所有方法。最后,在執(zhí)行方法的時(shí)候,在RpcBeanProxy中向Server發(fā)送CallMethod事件,執(zhí)行遠(yuǎn)程實(shí)現(xiàn)類的方法,獲取返回值給Client。
Server端核心代碼
public class RpcServer extends UntypedActor {
private Map<String, Object> proxyBeans;
public RpcServer(Map<Class<?>, Object> beans) {
proxyBeans = new HashMap<String, Object>();
for (Iterator<Class<?>> iterator = beans.keySet().iterator(); iterator
.hasNext();) {
Class<?> inface = iterator.next();
proxyBeans.put(inface.getName(), beans.get(inface));
}
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof RpcEvent.CallBean) { //返回Server端的接口實(shí)現(xiàn)類的實(shí)例
CallBean event = (CallBean) message;
ReturnBean bean = new ReturnBean(
proxyBeans.get(event.getBeanName()), getSelf());
getSender().tell(bean, getSelf());
} else if (message instanceof RpcEvent.CallMethod) {
CallMethod event = (CallMethod) message;
Object bean = proxyBeans.get(event.getBeanName());
Object[] params = event.getParams();
List<Class<?>> paraTypes = new ArrayList<Class<?>>();
Class<?>[] paramerTypes = new Class<?>[] {};
if (params != null) {
for (Object param : params) {
paraTypes.add(param.getClass());
}
}
Method method = bean.getClass().getMethod(event.getMethodName(),
paraTypes.toArray(paramerTypes));
Object o = method.invoke(bean, params);
getSender().tell(o, getSelf());
}
}
}
啟動(dòng)Server
public static void main(String[] args) {
final Config config = ConfigFactory
.parseString("akka.remote.netty.tcp.port=" + 2551)
.withFallback(
ConfigFactory
.parseString("akka.cluster.roles = [RpcServer]"))
.withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("EsbSystem", config);
// Server 加入發(fā)布的服務(wù)
Map<Class<?>, Object> beans = new HashMap<Class<?>, Object>();
beans.put(ExampleInterface.class, new ExampleInterfaceImpl());
system.actorOf(Props.create(RpcServer.class, beans), "rpcServer");
}
Client端核心代碼
RpcClient類型集成了Thread,為了解決一個(gè)問題:因?yàn)锳kkaSystem在加入集群中的時(shí)候是異步的,所以我們?cè)诘谝淮蝞ew RpcClient對(duì)象的時(shí)候需要等待加入集群成功以后,才可以執(zhí)行下面的方法,不然獲取的 /user/rpcServer Route中沒有Server的Actor,請(qǐng)求會(huì)失敗。
public class RpcClient extends Thread {
private ActorSystem system;
private ActorRef rpc;
private ActorRef clientServer;
private static RpcClient instance = null;
public RpcClient() {
this.start();
final Config config = ConfigFactory
.parseString("akka.remote.netty.tcp.port=" + 2552)
.withFallback(
ConfigFactory
.parseString("akka.cluster.roles = [RpcClient]"))
.withFallback(ConfigFactory.load());
system = ActorSystem.create("EsbSystem", config);
int totalInstances = 100;
Iterable<String> routeesPaths = Arrays.asList("/user/rpcServer");
boolean allowLocalRoutees = false;
ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(
new AdaptiveLoadBalancingGroup(
HeapMetricsSelector.getInstance(),
Collections.<String> emptyList()),
new ClusterRouterGroupSettings(totalInstances, routeesPaths,
allowLocalRoutees, "RpcServer"));
rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall");
clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc),
"client");
Cluster.get(system).registerOnMemberUp(new Runnable() { //加入集群成功后的回調(diào)事件,恢復(fù)當(dāng)前線程的中斷
@Override
public void run() {
synchronized (instance) {
System.out.println("notify");
instance.notify();
}
}
});
}
public static RpcClient getInstance() {
if (instance == null) {
instance = new RpcClient();
synchronized (instance) {
try { //中斷當(dāng)前線程,等待加入集群成功后,恢復(fù)
System.out.println("wait");
instance.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return instance;
}
public <T> T getBean(Class<T> clz) {
Future<Object> future = Patterns.ask(clientServer,
new RpcEvent.CallBean(clz.getName(), clientServer),
new Timeout(Duration.create(5, TimeUnit.SECONDS)));
try {
Object o = Await.result(future,
Duration.create(5, TimeUnit.SECONDS));
if (o != null) {
ReturnBean returnBean = (ReturnBean) o;
return (T) new RpcBeanProxy().proxy(returnBean.getObj(),
clientServer, clz);
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
RpcClientServer
public class RpcClientServer extends UntypedActor {
private ActorRef rpc;
public RpcClientServer(ActorRef rpc) {
this.rpc = rpc;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof RpcEvent.CallBean) { //向Server發(fā)送CallBean請(qǐng)求
CallBean event = (CallBean) message;
Future<Object> future = Patterns.ask(rpc, event, new Timeout(
Duration.create(5, TimeUnit.SECONDS)));
Object o = Await.result(future,
Duration.create(5, TimeUnit.SECONDS));
getSender().tell(o, getSelf());
} else if (message instanceof RpcEvent.CallMethod) { //向Server發(fā)送方法調(diào)用請(qǐng)求
Future<Object> future = Patterns.ask(rpc, message, new Timeout(
Duration.create(5, TimeUnit.SECONDS)));
Object o = Await.result(future,
Duration.create(5, TimeUnit.SECONDS));
getSender().tell(o, getSelf());
}
}
}
RpcBeanProxy,客戶端的動(dòng)態(tài)代理類
public class RpcBeanProxy implements InvocationHandler {
private ActorRef rpcClientServer;
private Class<?> clz;
public Object proxy(Object target, ActorRef rpcClientServer, Class<?> clz) {
this.rpcClientServer = rpcClientServer;
this.clz = clz;
return Proxy.newProxyInstance(target.getClass().getClassLoader(),
target.getClass().getInterfaces(), this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
Object result = null;
RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod(
method.getName(), args, clz.getName());
Future<Object> future = Patterns.ask(rpcClientServer, callMethod,
new Timeout(Duration.create(5, TimeUnit.SECONDS)));
Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
result = o;
return result;
}
}
Demo
Interface,Client和Server都需要這個(gè)類,必須實(shí)現(xiàn)序列化
public interface ExampleInterface extends Serializable{
public String sayHello(String name);
}
實(shí)現(xiàn)類,只需要Server端存在這個(gè)類。
public class ExampleInterfaceImpl implements ExampleInterface {
@Override
public String sayHello(String name) {
System.out.println("Be Called !");
return "Hello " + name;
}
}
Client調(diào)用
public static void main(String[] args) {
RpcClient client = RpcClient.getInstance();
long start = System.currentTimeMillis();
ExampleInterface example = client.getBean(ExampleInterface.class);
System.out.println(example.sayHello("rpc"));
long time = System.currentTimeMillis() - start;
System.out.println("time :" + time);
}
這里第一次調(diào)用耗時(shí)比較長(zhǎng)需要46毫秒,akka會(huì)對(duì)消息進(jìn)行優(yōu)化,調(diào)用多次以后時(shí)間為 1~2毫秒。上述就是小編為大家分享的基于akka怎樣實(shí)現(xiàn)RPC了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
新聞名稱:基于akka怎樣實(shí)現(xiàn)RPC
本文鏈接:http://chinadenli.net/article44/ipdiee.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App設(shè)計(jì)、網(wǎng)站收錄、企業(yè)建站、靜態(tài)網(wǎng)站、微信小程序、電子商務(wù)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)