Openvidu Server 的WebRTC通讯实现 VI
Openvidu 与 Kurento javaclient中的各种session
openvidu和kurent jsonrpc client 中都有定义session, 刚开始的时候很容无法区分,下图是一个openvidu和kurent个包中涉及的session及关系。
image.png
可以看到openvidu中的session与kurento jsonrpc中的session 还是有有很大区别的,它们之间没有明显的继承关系,表达不同的意思。
- jsonrpc client包中的JsonRpcRequestSender 与 session 接口
JsonRpcRequestSender 接口定义了 jsonRpc协议的各种发送接口, 而jsonRpc协议是Kurento定义的客户端与服务间通讯的规范。
可以看到JsonRpcRequestSender 定义了两类消息, request和notification,具体的协议描述可以参考 kurento 说明文档 Kurento Protocol
public interface JsonRpcRequestSender {
<R> R sendRequest(String method, Class<R> resultClass) throws IOException;
<R> R sendRequest(String method, Object params, Class<R> resultClass) throws IOException;
JsonElement sendRequest(String method) throws IOException;
JsonElement sendRequest(String method, Object params) throws IOException;
Response<JsonElement> sendRequest(Request<JsonObject> request) throws IOException;
Response<JsonElement> sendRequestHonorId(Request<JsonObject> request) throws IOException;
void sendNotification(String method, Object params) throws IOException;
void sendNotification(String method) throws IOException;
void sendRequest(String method, JsonObject params, Continuation<JsonElement> continuation);
void sendRequest(Request<JsonObject> request, Continuation<Response<JsonElement>> continuation)
throws IOException;
void sendRequestHonorId(Request<JsonObject> request,
Continuation<Response<JsonElement>> continuation) throws IOException;
void sendNotification(String method, Object params, Continuation<JsonElement> continuation)
throws IOException;
而session 接口只是简单的增加了几个方法,这里仍旧看不出来session的用意,它们是很抽象的一些方法。但从集成关系可以看到,session有两个重要的实现:clientSession, webSocketServerSession。 从字面上理解,它们分别表示client会话,server端会话。 session接口只是它们需要的一些公共属性的抽取。 但是如果看看Session引用,它对于抽象服务器端和client的请求处理有着重要的作用。
public interface Session extends JsonRpcRequestSender {
public String getSessionId();
public Object getRegisterInfo();
public boolean isNew();
public void close() throws IOException;
void setReconnectionTimeout(long millis);
public Map<String, Object> getAttributes();
}
AbstarctSession 在session的基础上增加了公共属性和对session定义的接口的默认实现,JsonRpcRequestSender定义的接口留给了AbstarctSession的具体实现类。
- Client session
ClientSession 继承自 AbstractSession , AbstractSession 提供了Session接口定义的默认实现, 而ClientSession 实现了 所有剩余的方法,也就是JsonRpcRequestSender 中定义的方法, 但是ClientSession 并没有直接实现发送消息的代码,如下图,它把这部分工作留给了requestSender属性,自己仅仅在requestSender上做了一层包装。所以具体怎么发消息,还是得看ClientSession 获得的实际requestSender的实例。
public class ClientSession extends AbstractSession {
private JsonRpcRequestSender requestSender;
private volatile ConcurrentMap<String, Object> attributes;
想要了解它到底干了什么,一个直接得办法就是看谁使用了它,下图就是引用它的相关类。
image.png
有两个重要的类在引用它,其它可暂时忽略:AbstractJsonRpcClientWebsocket, JsonRpcClient.
AbstractJsonRpcClientWebsocket 继承自JsonRpcClient, 在JsonRpcClient中定义了一个ClientSession属性:
public abstract class JsonRpcClient implements JsonRpcRequestSender, Closeable {
protected ClientSession session;
而这个属性的创建是放在了AbstractJsonRpcClientWebsocket 中的,新建的session没有sessionid, 而requestSender就是AbstractJsonRpcClientWebsocket 自己,也就是说一个ClientWebsocket只会有一个Session属性,ClientSession 里所有发送请求的方法最终是由AbstractJsonRpcClientWebsocket 的实现类来发送的,而这个实现类就是JsonRpcClientNettyWebSocket。
private void updateSession() throws IOException {
if (session == null) {
session = new ClientSession(null, null, this);
configureResponseSender();
}
从这里虽我们看到了ClientSession如何实现发送消息接口的, 但是我们发现,ClientSession初始化时并没有ID,那么sessionid代表什么,什么时候被赋值的?为此,我们需要再看看ClientSession的setSessionid被谁调用了。有三个地方调用,如下图:
image.png
分别检查这三处的代码实现,会发现setSesionID()都是在AbstractJsonRpcClientWebsocket 处理response相应时完成的。
protected void handleResponseFromServer(JsonObject message) {
Response<JsonElement> response = fromJsonResponse(message, JsonElement.class);
setSessionId(response.getSessionId());
pendingRequests.handleResponse(response);
}
Response<R> response = MessageUtils.convertResponse(responseJson, resultClass);
if (response.getSessionId() != null) {
session.setSessionId(response.getSessionId());
}
这样看来,sessionID并不是client创建的,而是由server创建。 查看kurento 协议, 在客户端发送Server创建MediaPipeline的请求时不会有sessionid.
{
"jsonrpc": "2.0",
"id": 1,
"method": "create",
"params": {
"type": "MediaPipeline",
"constructorParams": {},
"properties": {}
}
}
但是当server返回这个请求的响应时,会带上sessionid, 而后续的请求都需要带上这个sessionid。
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"value": "c4a84b47-1acd-4930-9f6d-008c10782dfe_MediaPipeline",
"sessionId": "ba4be2a1-2b09-444e-a368-f81825a6168c"
}
}
所以这里的sessionid 类似于http中的sessionid, clientsession也类似于httpsession; 它用于jsonrpc请求中需要有状态的请求时,识别与客户端的会话。例如,在kurento服务器上创建mediaElement,需要先创建pipleLine,后续的请求都要和这次请求关联起来,而这个请求就是由sessionid来完成的。也就说,只有发送了创建pipleLine的这个请求的客户端,才能在这个pipleLine下创建mediaElement.
- Server session , webSocketServerSession wsSession
ServerSession 扩展了AbstractionSession, 相应增加了部分的新的属性。
public abstract class ServerSession extends AbstractSession {
public static final String SESSION_RECONNECTION_TIME_PROP = "ws.sessionReconnectionTime";
private static final int SESSION_RECONNECTION_TIME_DEFAULT = 10;
private final SessionsManager sessionsManager;
private JsonRpcRequestSenderHelper rsHelper;
private String transportId;
private ScheduledFuture<?> closeTimerTask;
private ExecutorService sessionExecutor;
private volatile ConcurrentMap<String, Object> attributes;
private long reconnectionTimeoutInMillis = PropertiesManager.getProperty(
SESSION_RECONNECTION_TIME_PROP, SESSION_RECONNECTION_TIME_DEFAULT) * 1000;
private boolean gracefullyClosed;
rsHelper与clientSession中的requestSender的作用一样,提供一个stub属性用于真正的发送请求。
sessionsManager 属性是对一个session管理器的引用,在ServerSession中,仅在两个地方使用到,一处是在在构造函数时赋值,另一处是在close函数时使用到。
@Override
public void close() throws IOException {
this.sessionsManager.remove(this.getSessionId());
this.sessionExecutor.shutdownNow();
}
可以这么理解,作为server端的session,需要被统一的管理,这个工作由sessionManager来做。
transportId在serversession初始化的时候被赋值,它对应的方法setTransportId也仅在处理ReconnectMessage时被调用。
在serversession的实施类WebSocketServerSession里,我们可以看到transportId被设置为了WebSocketSession的id(对应spring库中的websocket)。所以,可以理解transportID对应一个websocket连接。而当追溯setTransportId()方法时,可以发现在protocolManager中,它也被设置为websockeSession的ID。
public WebSocketServerSession(String sessionId, Object registerInfo,
SessionsManager sessionsManager, WebSocketSession wsSession) {
super(sessionId, registerInfo, sessionsManager, wsSession.getId());
this.wsSession = wsSession;
this.setRsHelper(new JsonRpcRequestSenderHelper(sessionId) {...
}
webSocketServerSession是ServerSession的一个子类,从名字就可以知道它对应的是处理websocket请求。在这个类里,新增一个重要的属性wsSession。
private WebSocketSession wsSession;
它对应spring框架中websocketSession. 这个wssession一个重要工作就是帮助向客户端发送消息,在webSocketServerSession中可以看到rsHelper的主要工作就是由wssession来完成的。
this.setRsHelper(new JsonRpcRequestSenderHelper(sessionId) {
@Override
public <P, R> Response<R> internalSendRequest(Request<P> request, Class<R> resultClass)
throws IOException {
return sendRequestWebSocket(request, resultClass);
}
@Override
protected void internalSendRequest(Request<? extends Object> request,
Class<JsonElement> resultClass, Continuation<Response<JsonElement>> continuation) {
sendRequestWebSocket(request, resultClass, continuation);
}
});
所以在Jsonrpc处理中,使用serverSession处理向客户端发送消息,实际上就是使用websocket的wssession向客户端发送消息。
再看sessionMananger,无论是通过serverSession的ID或者是TransportID获取的serversession实例对应的都是一个websocket连接。
回想一下前面谈到的Transaction, 可以发现,Transaction定义的是发送jsonrpc response, Session定义的是发送jsonrpc Request,同时Transaction定义了一个getSession的接口,用户获取Session。所以我们再来看Openvidu中的RpcHandler的处理消息的签名,就能明白为什么需要transaction了。
public void handleRequest(Transaction transaction, Request<JsonObject> request) {
4。Openvidu session 与kurentoSession
Openvidu Server中也定义了两个session类, 分别是Session和KurentoSession。
session类继承自SessionInterface, 从sessionInterface的接口定义可以看到,它的方法主要涉及到会议业务。
public interface SessionInterface {
String getSessionId();
SessionProperties getSessionProperties();
void join(Participant participant);
void leave(String participantPrivateId, EndReason reason);
boolean close(EndReason reason);
boolean isClosed();
Set<Participant> getParticipants();
Participant getParticipantByPrivateId(String participantPrivateId);
Participant getParticipantByPublicId(String participantPublicId);
int getActivePublishers();
String getMediaNodeId();
JsonObject toJson(boolean withPendingConnections, boolean withWebrtcStats);
Long getStartTime();
}
加入会议、离开会议,获取参会人,关闭会议等,这些方法都和业务有关系。Sesson类里新定义了几个属性:
protected OpenviduConfig openviduConfig; //openvidu配置属性
protected RecordingManager recordingManager; //录播管理器
protected ConcurrentMap<String, Token> tokens = new ConcurrentHashMap<>(); // 参会令牌集合
protected final ConcurrentMap<String, Participant> participants = new ConcurrentHashMap<>(); //参会者集合
protected String sessionId;
protected SessionProperties sessionProperties;
protected Long startTime;
protected volatile boolean closed = false;
protected AtomicInteger activePublishers = new AtomicInteger(0); //发布音视频的结合
protected AtomicInteger activeIndividualRecordedPublishers = new AtomicInteger(0);
可以看到也都和会议有关。所以openvidu的session实际就是对应的会议。但是sessioninterface中定义的加入会议、离开会议、关闭会议等实际操作方法都是空的。Session有两个构造方法,public Session(Session previousSession)尽在kurentoSession初始化的时候被调用,而public Session(String sessionId,...)在两个地方被调用:
一个调用的地方是在在sessionManager中,在 处理客户端"/sessions"的post的请求时,会新建一个Session,并把它存储到storeSessionNotActive中,可以这么理解,他代表了一个只创建了会议号(sessionid)了会议,还没有被正式的激活。
//SessionManager
public Session storeSessionNotActive(String sessionId, SessionProperties sessionProperties) {
Session sessionNotActive = new Session(sessionId, sessionProperties, openviduConfig, recordingManager);
return this.storeSessionNotActive(sessionNotActive);
}
另外一个调用的地方是在KurentoSessionManager中,在加入会议的时候,如果是首个用户连入,并且找不到sessionNotActive 则新建一个Session.
//kurentoSessionManager
public void joinRoom(Participant participant, String sessionId, Integer transactionId) {
Set<Participant> existingParticipants = null;
try {
KurentoSession kSession = (KurentoSession) sessions.get(sessionId);
if (kSession == null) {
// First user connecting to the session
Session sessionNotActive = sessionsNotActive.get(sessionId);
if (sessionNotActive == null && this.isInsecureParticipant(participant.getParticipantPrivateId())) {
// Insecure user directly call joinRoom RPC method, without REST API use
sessionNotActive = new Session(sessionId,
new SessionProperties.Builder().mediaMode(MediaMode.ROUTED)
.recordingMode(RecordingMode.ALWAYS)
.defaultRecordingLayout(RecordingLayout.BEST_FIT).build(),
openviduConfig, recordingManager);
}
从以上的代码中可以看到Session只表示创建了一个没有任何人加入的会议号(notactive),当首次有人加入的时候才会真正的创建会议。
而这个真正的会议就是KurentoSession, KurentoSession继承自Session,它定义了一些新的属性:
private MediaPipeline pipeline; //对应kurento server上的pipeline
private Kms kms; //对应创建会议的kurento server实例
private KurentoSessionEventsHandler kurentoSessionHandler; //KMS 事件处理器
private KurentoParticipantEndpointConfig kurentoEndpointConfig;//会议配置
用户从openvidu客户端创建的会议,最终对应到某个具体Kms Server的pipleLine, 而一个会议在kms表示为N*N个被pipleLine编排的媒体元素。
在kurentoSession中真正实现了加入、离开、关闭会议的方法,同时定义并实现了以下与会议有关的方法:
public void sendIceCandidate()//创建webrtc连接时用
private void removeParticipant() //移除参会者,移除对应的mediaElement
public void cancelPublisher()// 取消publisher取消相应的MediaElement
private void createPipeline()//创建pipleline
可以看到,KrentoSession里出来的方法都跟KMS上的远程调用有关。在KurentoSessionManager的joinRoom方法里,获得Session实例后(从sessionsNotActive集合获取),如果检查到KurentoSessionManager中没有存储对应的krentoSession, 就会根据负载获取一个指定的kms,并在其上创建一个kurentoSession,然后从sessionsNotActive中移除这个Session对象。
可以这么说,Openviduserver中 Session和kurentoSession都代表的会议, session代表的是一个没有激活仅有成功创建会议号的会议。 KurentoSession代表的是激活的会议, 当第一个参会者加入会议的时候,openvidu服务器会把没有激活的session实例转化为已经激活的session实例。
没有激活的session实例只有sessionid 会议号,激活的时候才会给该会议号分配真正的kurento 服务器实例,并在其上创建pipleLine, 一个pipleLine对应一个会议。 美一个参会方加入会议,都会创建对应的mediaElement, N个参会人最终会创建一个N*N的数量的mediaElement, 它们同属于一个pipleLine.
在openvidu中,Session对应的管理器是SessionMananger, KurentoSession对应的管理器是KurentoSessionManager; SessionMananger 是虚类, KurentoSessionManager继承自它,实现了必要的方法。 所以无论是创建Session还是kurentoSession, 最终都是由KurentoSessionManager管理的。从程序入口的配置可以看到,系统启动时就会创建一个KurentoSessionManager的实例,
@Bean
@ConditionalOnMissingBean
@DependsOn("openviduConfig")
public SessionManager sessionManager() {
return new KurentoSessionManager();
}
系统创建的KurentoSessionManager实例会被注入给其它实例,最主要的是两个:SessionRestController, RpcHandler:
SessionRestController 是openvidu项目Http restful请求的入口,在这里系统会创建session, 并把它保存给KurentoSessionManager, 当查询是否有没有被激活的session的时候在查询。
而RpcHandler是openvidu webSocket的请求处理器,一方面用来获取session,另一方面用来直接通过sessionManager进行会议操作(实际由kurentoSession实例完成)。
SessionManager定义了大量的会议方法,如果某方法需要和kurentoServer进行交互,它就标记为abstract,由kurentoSessionManager来实现。 如果是不需要kurento的交互,就直接实现了。
//加入会议,需要创建pipleLine
public abstract void joinRoom(Participant participant, String sessionId, Integer transactionId);
//离开会议,需要清理自己的meidaElement
public abstract boolean leaveRoom(Participant participant, Integer transactionId, EndReason reason,
boolean closeWebSocket);
//发布视频需要创建webrtcMediaElement
public abstract void publishVideo(Participant participant, MediaOptions mediaOptions, Integer transactionId);
//需要取消webrtcMediaElement
public abstract void unpublishVideo(Participant participant, Participant moderator, Integer transactionId,
EndReason reason);
//需要创建接收用的mediaElement
public abstract void subscribe(Participant participant, String senderName, String sdpOffer, Integer transactionId);
//需要删除接收用的mediaElement
public abstract void unsubscribe(Participant participant, String senderName, Integer transactionId);
//需要变更webrtc媒体流连接属性
public abstract void streamPropertyChanged(Participant participant, Integer transactionId, String streamId,
String property, JsonElement newValue, String changeReason);
//需要通知kurento新的webrtc连接用candidate
public abstract void onIceCandidate(Participant participant, String endpointName, String candidate,
int sdpMLineIndex, String sdpMid, Integer transactionId);
//需要删除接收用meidaElement
public abstract boolean unpublishStream(Session session, String streamId, Participant moderator,
Integer transactionId, EndReason reason);
//驱离参与用户时,需要同时删除该用户的mediaElement资源
public abstract boolean evictParticipant(Participant evictedParticipant, Participant moderator,
Integer transactionId, EndReason reason);
// 创建一个filter类型的mediaElement, 例如人脸视频
public abstract void applyFilter(Session session, String streamId, String filterType, JsonObject filterOptions,
Participant moderator, Integer transactionId, String reason);
//需要对filter进行相关操作
public abstract void execFilterMethod(Session session, String streamId, String filterMethod,
JsonObject filterParams, Participant moderator, Integer transactionId, String reason);
//删除filter
public abstract void removeFilter(Session session, String streamId, Participant moderator, Integer transactionId,
String reason);
//添加事件监听器,监听来自kurento filter的事件
public abstract void addFilterEventListener(Session session, Participant subscriber, String streamId,
String eventType);
//删除事件监听器
public abstract void removeFilterEventListener(Session session, Participant subscriber, String streamId,
String eventType);
//创建一个ipcam类型的mediaElement
public abstract Participant publishIpcam(Session session, MediaOptions mediaOptions,
ConnectionProperties connectionProperties) throws Exception;
//发送spdOffer给Kurento Server用于webrtc连接
public abstract void reconnectStream(Participant participant, String streamId, String sdpOffer,
Integer transactionId);
//
public abstract String getParticipantPrivateIdFromStreamId(String sessionId, String streamId)
throws OpenViduException;
//
public abstract void onVideoData(Participant participant, Integer transactionId, Integer height, Integer width,
Boolean videoActive, Boolean audioActive);
上面是sessionManager标记的虚类,可以看到都和kurento操作有关。而下面的接口不需要操作kurento或者由上面一定义的虚类接口来完成。
// 通过sessionid获取已经激活的Session, 实际返回的是KurentoSession实例
public Session getSession(String sessionId)
//获取未激活的Session, 实际返回Session实例。
public Session getSessionNotActive(String sessionId)
//返回已经激活的session, 如果没有则返回未激活的session
public Session getSessionWithNotActive(String sessionId)
//返回一个会议里的所有参会者
public Set<Participant> getParticipants(String sessionId)
//返回具体参会者
public Participant getParticipant(String sessionId, String participantPrivateId)
public Participant getParticipant(String participantPrivateId) throws OpenViduException
//返回最终用户,由htttpSessionID确定
public Map<String, FinalUser> getFinalUsers(String sessionId)
//移除最终用户
public Map<String, FinalUser> removeFinalUsers(String sessionId)
//保存未激活的session
public Session storeSessionNotActive(String sessionId, SessionProperties sessionProperties)
public Session storeSessionNotActive(Session sessionNotActive)
//为客户端新建token, 参会方的标识
public Token newToken(Session session, OpenViduRole role, String serverMetadata, boolean record,
KurentoOptions kurentoOptions)
public Token newTokenForInsecureUser(Session session, String token, ConnectionProperties connectionProperties)
//判断参会者
public boolean isPublisherInSession(String sessionId, Participant participant)
public boolean isModeratorInSession(String sessionId, Participant participant)
public boolean isInsecureParticipant(String participantPrivateId)
//黄健参会者
public void newInsecureParticipant(String participantPrivateId)
public Participant newParticipant(String sessionId, String participantPrivatetId, Token token,...
public Participant newIpcamParticipant(String sessionId, String ipcamId, Token token, GeoLocation location,
//关闭会议
public void close()
public void closeSession(String sessionId, EndReason reason)
public void closeNonActiveSessions(Function<Session, Boolean> conditionToRemove)
public void closeSessionAndEmptyCollections(Session session, EndReason reason, boolean stopRecording)
可以看到所有的会议操作都是由sessionMananger来完成,Openvidu中初始化一个kurentoSessionManager的实例。所有与kurento有关的操作都是在kurentoSessionManager里操作的。
当一个会议被创建时会创建一个未激活的Session实例,并保存在sessionMananger的未激活会议集合中,当有人加入会议时,会根据未激活的Session新建KurentoSession实例,并移除未激活实例。 当用户进行加入会议,离开会议,发布视频等操作,Openvidu在解析完jsonrpc请求后都调用SessionManager的接口来处理,SessionManager是进行会议操作(包括webrtc连接需要的信号处理方法)的系统入口。