dapeng invocation
2018-02-06
偶像本人
The dapeng invocation process
Tags (space-separated): dapeng
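1. Server startup (this section covers three plugins: Spring, ZooKeeper, and Netty; once these three are running, the server can communicate with clients). After the container has loaded each classloader, it starts the plugins in turn.
- SpringPlugin: reads service.xml and loads it through Spring's ClassPathXmlApplicationContext to obtain the corresponding processor (the service's details: its interface, the implementation, the methods, and whether each is sync or async). The parsing itself lives in the dapeng-spring module of the dapeng project. Once loaded, the processor is registered with the container.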
@Override
@SuppressWarnings("unchecked")
public SoaServiceDefinition<?> getObject() throws Exception {
final Class<?> aClass = serviceRef.getClass();
final List<Class<?>> interfaces = Arrays.asList(aClass.getInterfaces());
List<Class<?>> filterInterfaces = interfaces.stream()
.filter(anInterface -> anInterface.isAnnotationPresent(Service.class) && anInterface.isAnnotationPresent(Processor.class))
.collect(toList());
if (filterInterfaces.isEmpty()) {
throw new RuntimeException("not config @Service & @Processor in " + refId);
}
Class<?> interfaceClass = filterInterfaces.get(filterInterfaces.size() - 1);
Processor processor = interfaceClass.getAnnotation(Processor.class);
Class<?> processorClass = Class.forName(processor.className(), true, interfaceClass.getClassLoader());
Constructor<?> constructor = processorClass.getConstructor(interfaceClass,Class.class);
SoaServiceDefinition tProcessor = (SoaServiceDefinition) constructor.newInstance(serviceRef,interfaceClass);
return tProcessor;
}
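The lookup above can be tried in isolation. The following is a minimal sketch, not dapeng's own code: `DemoService`, `DemoProcessor`, `GreetService`, `GreetServiceImpl`, `GreetProcessor`, and `ProcessorLookup` are hypothetical names that stand in for `@Service`, `@Processor`, a generated service interface, its implementation, and the generated `Codec$Processor`. It assumes all classes live in the default package so that `Class.forName("GreetProcessor")` resolves.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;

// Hypothetical stand-ins for dapeng's @Service and @Processor annotations.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoService { String name(); String version(); }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoProcessor { String className(); }

@DemoService(name = "demo.GreetService", version = "1.0.0")
@DemoProcessor(className = "GreetProcessor") // assumes the default package
interface GreetService { String greet(String who); }

class GreetServiceImpl implements GreetService {
    public String greet(String who) { return "hello " + who; }
}

// Plays the role of the generated processor: it wraps the implementation bean.
class GreetProcessor {
    final GreetService iface;
    final Class<?> ifaceClass;

    public GreetProcessor(GreetService iface, Class<?> ifaceClass) {
        this.iface = iface;
        this.ifaceClass = ifaceClass;
    }
}

class ProcessorLookup {
    // Finds the last interface carrying both annotations and instantiates
    // the processor class that the interface names.
    static Object createProcessor(Object serviceRef) {
        Class<?> match = null;
        for (Class<?> candidate : serviceRef.getClass().getInterfaces()) {
            if (candidate.isAnnotationPresent(DemoService.class)
                    && candidate.isAnnotationPresent(DemoProcessor.class)) {
                match = candidate;
            }
        }
        if (match == null) {
            throw new IllegalStateException("no annotated interface on " + serviceRef.getClass());
        }
        try {
            DemoProcessor meta = match.getAnnotation(DemoProcessor.class);
            Class<?> processorClass = Class.forName(meta.className(), true, match.getClassLoader());
            Constructor<?> ctor = processorClass.getConstructor(match, Class.class);
            return ctor.newInstance(serviceRef, match);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create processor for " + match.getName(), e);
        }
    }
}
```

Calling `ProcessorLookup.createProcessor(new GreetServiceImpl())` returns a `GreetProcessor` that holds the bean and its annotated interface, which is the shape the container needs in order to route calls later.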
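- ZooKeeper:
The plugin connects to ZooKeeper, fetches the service information from the container, and registers each service under /soa/runtime/services/, for example /soa/runtime/services/com.isuwang.soa.order.service.EnquiryPriceService with the instance 10.117.17.97:9084:1.0.0 (ip:port:version). In the code below, that triple is appended to the path as a child node name and the node data is left empty.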
@Override
public void registerService(String serverName, String versionName) {
try {
// register the service under the runtime node
String path = "/soa/runtime/services/" + serverName + "/" + SoaSystemEnvProperties.SOA_CONTAINER_IP + ":" + SoaSystemEnvProperties.SOA_CONTAINER_PORT + ":" + versionName;
String data = "";
zooKeeperHelper.addOrUpdateServerInfo(path, data);
// register the service under the master node and run the master election
// TODO: improve the election mechanism later
if (SoaSystemEnvProperties.SOA_ZOOKEEPER_MASTER_ISCONFIG) {
zooKeeperMasterHelper.createCurrentNode(ZookeeperHelper.generateKey(serverName, versionName));
}
else {
zooKeeperHelper.createCurrentNode(ZookeeperHelper.generateKey(serverName, versionName));
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
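To make the node naming concrete, here is a small sketch that builds the runtime path in the same way as `registerService` and splits it back into its parts. `RuntimePath`, `instancePath`, and `parseInstance` are hypothetical helper names, not part of dapeng, and the sketch does not talk to ZooKeeper at all.

```java
class RuntimePath {
    static final String ROOT = "/soa/runtime/services/";

    // Builds the child path for one running instance of a service.
    static String instancePath(String serviceName, String ip, int port, String version) {
        return ROOT + serviceName + "/" + ip + ":" + port + ":" + version;
    }

    // Splits the last path segment back into {ip, port, version}.
    static String[] parseInstance(String path) {
        String node = path.substring(path.lastIndexOf('/') + 1);
        String[] parts = node.split(":", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected node name: " + node);
        }
        return parts;
    }
}
```

A client that lists the children of the service node could use something like `parseInstance` to recover the address and version of each instance before choosing one to connect to.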
- NettyPlugin: listens on the same port that was registered in ZooKeeper; the main processing happens in the channel pipeline.
@Override
public void start() {
LOGGER.warn("Plugin::NettyPlugin start");
LOGGER.info("Bind Local Port {} [Netty]", port);
new Thread("NettyContainer-Thread") {
@Override
public void run() {
try {
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(15, 0, 0), // read-idle timeout (15 seconds)
new SoaDecoder(), // handles sticky and half packets
new SoaIdleHandler(), // heartbeat handling
new SoaServerHandler(container)); // invokes the service
}
})
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // reuse previously allocated memory (PooledByteBuf -> ByteBuf)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// Start the server.
ChannelFuture f = bootstrap.bind(port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}.start();
}
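The pipeline comment says that SoaDecoder deals with sticky and half packets. The sketch below illustrates that general technique in plain Java, without Netty. It is not SoaDecoder itself: `LengthPrefixedSplitter` is a hypothetical class, and it assumes a 4-byte big-endian length prefix that counts only the bytes following it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixedSplitter {
    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    // Feeds newly received bytes and returns every complete frame body now available.
    List<byte[]> feed(byte[] chunk) {
        byte[] merged = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, merged, 0, pending.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);

        ByteBuffer buf = ByteBuffer.wrap(merged);
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length");
            }
            if (buf.remaining() < length) { // half packet: wait for more bytes
                buf.reset();
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);                  // sticky packets: keep looping for the next frame
            frames.add(body);
        }
        pending = new byte[buf.remaining()];
        buf.get(pending);
        return frames;
    }

    // Helper for the demo: prefixes a body with its length.
    static byte[] frame(byte[] body) {
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Feeding the splitter one and a half frames yields a single frame and keeps the remainder buffered; the next call completes the second frame. In Netty the same job is typically done inside a `ByteToMessageDecoder` subclass.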
2. Structure of the generated code (domain, enums, service, sync and async clients, codec, and XML files)
- domain: the defined entity classes
- enums: the defined enums
- client:
public class AdminCacheServiceClient implements AdminCacheService {
private final String serviceName;
private final String version;
private SoaConnectionPool pool;
public AdminCacheServiceClient() {
this.serviceName = "com.isuwang.soa.admin.service.AdminCacheService";
this.version = "1.0.0";
ServiceLoader<SoaConnectionPoolFactory> factories = ServiceLoader.load(SoaConnectionPoolFactory.class);
for (SoaConnectionPoolFactory factory : factories) {
this.pool = factory.getPool();
break;
}
this.pool.registerClientInfo(serviceName, version);
}
public java.util.Map<Integer, String> findStaffNamesByIds(java.util.Set<Integer> staffIds) throws SoaException {
String methodName = "findStaffNamesByIds";
findStaffNamesByIds_args findStaffNamesByIds_args = new findStaffNamesByIds_args();
findStaffNamesByIds_args.setStaffIds(staffIds);
findStaffNamesByIds_result response = pool.send(serviceName, version, "findStaffNamesByIds", findStaffNamesByIds_args, new FindStaffNamesByIds_argsSerializer(), new FindStaffNamesByIds_resultSerializer());
return response.getSuccess();
}
// other generated methods omitted
}
- service: the defined interface
@Service(name = "com.isuwang.soa.admin.service.AdminCacheService", version = "1.0.0")
@Processor(className = "com.isuwang.soa.admin.AdminCacheServiceCodec$Processor")
public interface AdminCacheService {
java.util.Map<Integer, String> findStaffNamesByIds(java.util.Set<Integer> staffIds) throws com.github.dapeng.core.SoaException;
String findStaffName(Integer staffId) throws com.github.dapeng.core.SoaException;
com.isuwang.soa.admin.domain.TStaffCache getEntity(Integer staffId) throws com.github.dapeng.core.SoaException;
java.util.Set<Integer> getDescendantStaffs(Integer orgId) throws com.github.dapeng.core.SoaException;
}
- codec: contains the argument objects, the result objects, their serializers, and the processors
public static class findStaffNamesByIds<I extends com.isuwang.soa.admin.service.AdminCacheService> extends SoaFunctionDefinition.Sync<I, findStaffNamesByIds_args, findStaffNamesByIds_result> {
public findStaffNamesByIds() {
super("findStaffNamesByIds", new FindStaffNamesByIds_argsSerializer(), new FindStaffNamesByIds_resultSerializer());
}
@Override
public findStaffNamesByIds_result apply(I iface, findStaffNamesByIds_args findStaffNamesByIds_args) throws SoaException {
findStaffNamesByIds_result result = new findStaffNamesByIds_result();
result.success = iface.findStaffNamesByIds(findStaffNamesByIds_args.staffIds);
return result;
}
}
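3. The client sends a request (calling a service requires the dapeng-netty-client dependency; in older versions it was dapeng-remoting-client)
request -> establish connection -> serialize the request into bytes -> send (channel.writeAndFlush(request))
- Establish the connection: the client obtains a SoaConnectionPool, connects to ZooKeeper, looks up the connection info for the service name and version (ip and port, possibly several), and picks one through a load-balancing strategy (random, round-robin, least active calls, or consistent hashing)
- Process the request (it passes through the filters)
- Serialize (frame layout; the first row gives each field's size in bytes):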
| bytes | 4 | 1 | 1 | 1 | 4 | n | n | 1 |
|---|---|---|---|---|---|---|---|---|
| field | length | stx(0x02) | version(1) | codec protocol(1) | seq(1) | soaHeader | request | etx(3) |
public ByteBuf build() throws TException {
InvocationContext invocationCtx = InvocationContextImpl.Factory.getCurrentInstance();
// build header
protocol = protocol == null ? (invocationCtx.getCodecProtocol() == null ? CodecProtocol.CompressedBinary
: invocationCtx.getCodecProtocol()) : protocol;
TSoaTransport transport = new TSoaTransport(buffer);
TBinaryProtocol headerProtocol = new TBinaryProtocol(transport);
headerProtocol.writeByte(STX);
headerProtocol.writeByte(VERSION);
headerProtocol.writeByte(protocol.getCode());
headerProtocol.writeI32(seqid);
new SoaHeaderSerializer().write(header, headerProtocol);
// write body
TProtocol bodyProtocol = null;
switch (protocol) {
case Binary:
bodyProtocol = new TBinaryProtocol(transport);
break;
case CompressedBinary:
bodyProtocol = new TCompactProtocol(transport);
break;
case Json:
bodyProtocol = new TJSONProtocol(transport);
break;
default:
throw new TException("通讯协议不正确(包体协议)"); // "invalid codec protocol (body protocol)"
}
bodySerializer.write(body, bodyProtocol);
headerProtocol.writeByte(ETX);
transport.flush();
return this.buffer;
}
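The frame layout from the table can be reproduced with a plain `ByteBuffer`. This is only a sketch under stated assumptions: `SoaFrameSketch` is a hypothetical class, the header and body are treated as opaque byte arrays rather than Thrift-serialized structures, the length field is assumed to count every byte after itself, and the constant values for STX, VERSION, and ETX are read off the table (0x02, 1, and 3).

```java
import java.nio.ByteBuffer;

class SoaFrameSketch {
    static final byte STX = 0x02;
    static final byte VERSION = 1;
    static final byte ETX = 0x03;

    // Lays out: length | stx | version | codec protocol | seq | header | body | etx
    static byte[] encode(byte protocolCode, int seqId, byte[] header, byte[] body) {
        // Assumption: length covers everything that follows the length field.
        int length = 1 + 1 + 1 + 4 + header.length + body.length + 1;
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        buf.put(STX).put(VERSION).put(protocolCode);
        buf.putInt(seqId);
        buf.put(header).put(body);
        buf.put(ETX);
        return buf.array();
    }

    // Reads the sequence id back out of an encoded frame.
    static int readSeqId(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        buf.getInt();                       // skip length
        if (buf.get() != STX) {
            throw new IllegalArgumentException("missing STX");
        }
        buf.get();                          // skip version
        buf.get();                          // skip codec protocol
        return buf.getInt();
    }
}
```

Keeping the sequence id at a fixed offset lets the receiver match a response to its pending request before it deserializes the header or body.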
- Send the request
- Wait for the server to process it
- Handle the result (deserialize the returned bytes)
@Override
public void onEntry(FilterContext ctx, FilterChain next) throws SoaException {
ByteBuf requestBuf = buildRequestBuf(service, version, method, seqid, request, requestSerializer);
// TODO filter
checkChannel();
ByteBuf responseBuf = client.send(channel, seqid, requestBuf); // send the request and get the response
Result<RESP> result = processResponse(responseBuf, responseSerializer);
ctx.setAttribute("result", result);
onExit(ctx, getPrevChain(ctx));
}
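The connection step in this section picks one of several ip:port candidates through a load-balancing strategy. Two of the listed strategies, random and round-robin, can be sketched as follows. The `LoadBalancer` interface and both implementations are hypothetical illustrations and are not taken from dapeng's connection pool.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical strategy interface: picks one endpoint out of the candidates.
interface LoadBalancer {
    String select(List<String> endpoints);
}

class RandomBalancer implements LoadBalancer {
    public String select(List<String> endpoints) {
        return endpoints.get(ThreadLocalRandom.current().nextInt(endpoints.size()));
    }
}

class RoundRobinBalancer implements LoadBalancer {
    private final AtomicInteger next = new AtomicInteger();

    public String select(List<String> endpoints) {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), endpoints.size());
        return endpoints.get(index);
    }
}
```

Round-robin spreads calls evenly when instances are equally capable, while random needs no shared state. Least-active and consistent-hash strategies additionally need per-endpoint call counters or a hash ring and are left out of this sketch.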
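4. Server-side handling
- SoaServerHandler channelRead(ChannelHandlerContext ctx, Object msg)
msg is the byte stream of the incoming request. The server deserializes msg to obtain the SoaHeader, looks up the registered service in the container, invokes the matching method, and gets the result.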
private <I, REQ, RESP> void processRequest(ChannelHandlerContext channelHandlerContext, TProtocol contentProtocol, SoaServiceDefinition<I> serviceDef,
ByteBuf reqMessage, TransactionContext context) throws TException {
try {
SoaHeader soaHeader = context.getHeader();
Application application = container.getApplication(new ProcessorKey(soaHeader.getServiceName(), soaHeader.getVersionName()));
SoaFunctionDefinition<I, REQ, RESP> soaFunction = (SoaFunctionDefinition<I, REQ, RESP>) serviceDef.functions.get(soaHeader.getMethodName());
REQ args = soaFunction.reqSerializer.read(contentProtocol);
contentProtocol.readMessageEnd();
//
I iface = serviceDef.iface;
//log request
application.info(this.getClass(), "{} {} {} operatorId:{} operatorName:{} request body:{}", soaHeader.getServiceName(), soaHeader.getVersionName(), soaHeader.getMethodName(), soaHeader.getOperatorId(), soaHeader.getOperatorName(), formatToString(soaFunction.reqSerializer.toString(args)));
HeadFilter headFilter = new HeadFilter();
Filter dispatchFilter = new Filter() {
private FilterChain getPrevChain(FilterContext ctx) {
SharedChain chain = (SharedChain) ctx.getAttach(this, "chain");
return new SharedChain(chain.head, chain.shared, chain.tail, chain.size() - 2);
}
@Override
public void onEntry(FilterContext ctx, FilterChain next) {
try {
if (serviceDef.isAsync) {
SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args);
future.whenComplete((realResult, ex) -> {
TransactionContext.Factory.setCurrentInstance(context);
processResult(channelHandlerContext, soaFunction, context, realResult, application, ctx);
onExit(ctx, getPrevChain(ctx));
});
} else {
SoaFunctionDefinition.Sync syncFunction = (SoaFunctionDefinition.Sync) soaFunction;
RESP result = (RESP) syncFunction.apply(iface, args);
processResult(channelHandlerContext, soaFunction, context, result, application, ctx);
onExit(ctx, getPrevChain(ctx));
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
writeErrorMessage(channelHandlerContext, context, new SoaException(SoaCode.UnKnown, e.getMessage()));
}
}
@Override
public void onExit(FilterContext ctx, FilterChain prev) {
try {
prev.onExit(ctx);
} catch (TException e) {
LOGGER.error(e.getMessage(), e);
}
}
};
SharedChain sharedChain = new SharedChain(headFilter, container.getFilters(), dispatchFilter, 0);
FilterContextImpl filterContext = new FilterContextImpl();
filterContext.setAttach(dispatchFilter, "chain", sharedChain);
sharedChain.onEntry(filterContext);
} finally {
reqMessage.release();
}
}
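The core of processRequest is a lookup by method name followed by either a direct call or a future callback. The sketch below isolates that idea. `MethodDispatcher` is a hypothetical class; it leaves out the filter chain, serialization, and logging. Unlike the excerpt, it also forwards a failed future to an error callback, which is an addition of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;

class MethodDispatcher<I> {
    private final I iface;
    private final Map<String, BiFunction<I, Object, Object>> sync = new HashMap<>();
    private final Map<String, BiFunction<I, Object, CompletableFuture<Object>>> async = new HashMap<>();

    MethodDispatcher(I iface) {
        this.iface = iface;
    }

    void registerSync(String name, BiFunction<I, Object, Object> fn) {
        sync.put(name, fn);
    }

    void registerAsync(String name, BiFunction<I, Object, CompletableFuture<Object>> fn) {
        async.put(name, fn);
    }

    // Looks the method up by name and delivers either the result or the failure.
    void dispatch(String method, Object args, Consumer<Object> onResult, Consumer<Throwable> onError) {
        try {
            if (async.containsKey(method)) {
                // Async: the result arrives later, on whichever thread completes the future.
                async.get(method).apply(iface, args).whenComplete((result, ex) -> {
                    if (ex != null) {
                        onError.accept(ex);
                    } else {
                        onResult.accept(result);
                    }
                });
            } else if (sync.containsKey(method)) {
                // Sync: call the implementation directly on the current thread.
                onResult.accept(sync.get(method).apply(iface, args));
            } else {
                onError.accept(new IllegalArgumentException("unknown method: " + method));
            }
        } catch (RuntimeException e) {
            onError.accept(e);
        }
    }
}
```

In the excerpt the async branch also restores the TransactionContext inside the callback; a real implementation needs that kind of step because the callback may run on a different thread than the one that read the request.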