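Introduction
kurento-group-call is an official Kurento tutorial that resembles a video conference. N endpoints are created for each client, which makes it somewhat more complex than one2one-call and one2many-call.
It connects multiple clients to one another to form a video conference. As with the other tutorials, Kurento Media Server must be installed before running it. First clone the project, then run the main class: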
git clone https://github.com/Kurento/kurento-tutorial-java.git
cd kurento-tutorial-java/kurento-group-call
Run the project
mvn -U clean spring-boot:run \
-Dspring-boot.run.jvmArguments="-Dkms.url=ws://{KMS_HOST}:8888/kurento"
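Functional analysis
- When a user enters the room, a new media endpoint is created for that user and the other users are notified that a new user has connected. All existing participants then ask the server to receive the new participant's media.
- The new user, in turn, gets the list of all participants already connected and asks the server to receive the media of every client in the room.
- Each client sends its own media and receives media from all the other users. With n clients, each client therefore has n endpoints (one sending, n-1 receiving), and the room has n*n endpoints in total.
- When a user leaves the room, the server notifies all clients. The client code then asks the server to cancel all media elements related to the client that left.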
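The endpoint count in the list above can be checked with a few lines of arithmetic. This is only an illustrative sketch; the function names `endpointsPerClient` and `endpointsInRoom` are made up for this example and are not part of the tutorial.

```javascript
// Counts endpoints in a room of n clients, following the description above:
// each client sends once and receives from the other n - 1 clients.
function endpointsPerClient(n) {
  if (!Number.isInteger(n) || n < 0) {
    throw new RangeError('n must be a non-negative integer');
  }
  if (n === 0) return 0;
  const sending = 1;
  const receiving = n - 1;
  return sending + receiving; // always n
}

function endpointsInRoom(n) {
  return n * endpointsPerClient(n); // n * n
}

console.log(endpointsInRoom(4)); // 16
```

The quadratic growth is the reason this layout is usually only practical for small rooms.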
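Server side
Once the Kurento client has been instantiated, you can communicate with Kurento Media Server and control its multimedia capabilities. GroupCallApp implements the WebSocketConfigurer interface and registers a WebSocketHandler that processes WebSocket requests on the /groupcall path: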
@EnableWebSocket
@SpringBootApplication
public class GroupCallApp implements WebSocketConfigurer {
@Bean
public UserRegistry registry() {
return new UserRegistry();
}
@Bean
public RoomManager roomManager() {
return new RoomManager();
}
@Bean
public CallHandler groupCallHandler() {
return new CallHandler();
}
@Bean
public KurentoClient kurentoClient() {
return KurentoClient.create();
}
public static void main(String[] args) throws Exception {
SpringApplication.run(GroupCallApp.class, args);
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(groupCallHandler(), "/groupcall");
}
}
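The CallHandler class extends TextWebSocketHandler to handle text WebSocket requests. The core of this class is the handleTextMessage method, which performs the requested action and returns responses over the WebSocket: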
public class CallHandler extends TextWebSocketHandler {
private static final Logger log = LoggerFactory.getLogger(CallHandler.class);
private static final Gson gson = new GsonBuilder().create();
@Autowired
private RoomManager roomManager;
@Autowired
private UserRegistry registry;
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
final JsonObject jsonMessage = gson.fromJson(message.getPayload(), JsonObject.class);
final UserSession user = registry.getBySession(session);
if (user != null) {
log.debug("Incoming message from user '{}': {}", user.getName(), jsonMessage);
} else {
log.debug("Incoming message from new user: {}", jsonMessage);
}
switch (jsonMessage.get("id").getAsString()) {
case "joinRoom":
joinRoom(jsonMessage, session);
break;
case "receiveVideoFrom":
final String senderName = jsonMessage.get("sender").getAsString();
final UserSession sender = registry.getByName(senderName);
final String sdpOffer = jsonMessage.get("sdpOffer").getAsString();
user.receiveVideoFrom(sender, sdpOffer);
break;
case "leaveRoom":
leaveRoom(user);
break;
case "onIceCandidate":
JsonObject candidate = jsonMessage.get("candidate").getAsJsonObject();
if (user != null) {
IceCandidate cand = new IceCandidate(candidate.get("candidate").getAsString(),
candidate.get("sdpMid").getAsString(), candidate.get("sdpMLineIndex").getAsInt());
user.addCandidate(cand, jsonMessage.get("name").getAsString());
}
break;
default:
break;
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
...
}
private void joinRoom(JsonObject params, WebSocketSession session) throws IOException {
...
}
private void leaveRoom(UserSession user) throws IOException {
...
}
}
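The switch in handleTextMessage is a dispatch on the `id` field of each JSON message. The sketch below models that dispatch in plain JavaScript so it can be run without a server. The names `dispatch` and `handlers` are hypothetical, and plain objects stand in for the user session. It also adds one thing the listing above does not have: a guard that ignores messages that need a joined user when no user is registered for the session.

```javascript
// Hypothetical model of the dispatch in handleTextMessage.
// `user` is undefined until the session has joined a room.
function dispatch(rawPayload, user, handlers) {
  const msg = JSON.parse(rawPayload);
  const needsUser = ['receiveVideoFrom', 'leaveRoom', 'onIceCandidate'];
  if (needsUser.includes(msg.id) && !user) {
    // Guard added for this sketch; not present in the original listing.
    return 'ignored: ' + msg.id + ' before joinRoom';
  }
  if (!Object.prototype.hasOwnProperty.call(handlers, msg.id)) {
    return 'ignored: unknown id'; // mirrors the empty default branch
  }
  return handlers[msg.id](msg, user);
}

const handlers = {
  joinRoom: (msg) => 'join ' + msg.name + ' -> ' + msg.room,
  receiveVideoFrom: (msg, user) => user.name + ' receives ' + msg.sender,
  leaveRoom: (msg, user) => user.name + ' leaves',
  onIceCandidate: (msg, user) => user.name + ' candidate for ' + msg.name,
};

console.log(dispatch('{"id":"joinRoom","name":"alice","room":"demo"}', undefined, handlers));
```

A lookup table keeps each message type in its own function, which may be easier to test than one long switch.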
The afterConnectionClosed method removes the user's UserSession from the registry and evicts the user from the room:
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
UserSession user = registry.removeBySession(session);
roomManager.getRoom(user.getRoomName()).leave(user);
}
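In the listing above, the result of removeBySession is used without a null check, so a connection that closes before the user ever joined a room may need special handling. The following sketch models the cleanup with in-memory stand-ins. `Registry`, `onConnectionClosed`, and the `Map`/`Set` room structure are assumptions for illustration, not the tutorial's classes.

```javascript
// Hypothetical in-memory stand-ins for UserRegistry and the set of rooms.
class Registry {
  constructor() {
    this.bySession = new Map(); // session id -> user
  }
  register(user) {
    this.bySession.set(user.sessionId, user);
  }
  // Returns the removed user, or undefined if the session never joined.
  removeBySession(sessionId) {
    const user = this.bySession.get(sessionId);
    this.bySession.delete(sessionId);
    return user;
  }
}

function onConnectionClosed(registry, rooms, sessionId) {
  const user = registry.removeBySession(sessionId);
  if (!user) return false;          // nothing to clean up
  const room = rooms.get(user.roomName);
  if (room) room.delete(user.name); // evict the user from the room
  return true;
}

const registry = new Registry();
const rooms = new Map([['demo', new Set(['alice'])]]);
registry.register({ sessionId: 's1', name: 'alice', roomName: 'demo' });
console.log(onConnectionClosed(registry, rooms, 's1')); // true
console.log(onConnectionClosed(registry, rooms, 's2')); // false
```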
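In the joinRoom method, the server checks whether a room with the given name is already registered, adds the user to that room, and registers the user: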
private void joinRoom(JsonObject params, WebSocketSession session) throws IOException {
final String roomName = params.get("room").getAsString();
final String name = params.get("name").getAsString();
log.info("PARTICIPANT {}: trying to join room {}", name, roomName);
Room room = roomManager.getRoom(roomName);
final UserSession user = room.join(name, session);
registry.register(user);
}
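The listing delegates the actual work to room.join, which is not shown here. The sketch below is one plausible model of it, assuming that the room is created on first use, that existing participants receive `newParticipantArrived`, and that the newcomer receives `existingParticipants`. The message ids and the `name`/`data` fields match what the browser client in this article reads; everything else (`rooms`, the `send` callbacks) is hypothetical.

```javascript
// Hypothetical room bookkeeping; `send` stands in for writing to a WebSocket.
const rooms = new Map(); // room name -> Map(participant name -> send function)

function joinRoom(roomName, name, send) {
  if (!rooms.has(roomName)) rooms.set(roomName, new Map()); // assumed get-or-create
  const room = rooms.get(roomName);
  const existing = Array.from(room.keys());
  // Tell everyone already in the room about the newcomer.
  for (const notify of room.values()) {
    notify({ id: 'newParticipantArrived', name: name });
  }
  room.set(name, send);
  // Tell the newcomer who is already there.
  send({ id: 'existingParticipants', data: existing });
}

const aliceInbox = [];
const bobInbox = [];
joinRoom('demo', 'alice', (m) => aliceInbox.push(m));
joinRoom('demo', 'bob', (m) => bobInbox.push(m));
console.log(JSON.stringify(aliceInbox));
console.log(JSON.stringify(bobInbox));
```

Here alice first learns that the room is empty and is later told that bob arrived, while bob is told that alice is already present.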
The leaveRoom method ends a user's video call and removes the room once it has no participants left:
private void leaveRoom(UserSession user) throws IOException {
final Room room = roomManager.getRoom(user.getRoomName());
room.leave(user);
if (room.getParticipants().isEmpty()) {
roomManager.removeRoom(room);
}
}
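The same flow can be modelled without Kurento. In this sketch the remaining participants receive `participantLeft` (the id the browser client handles), and the room is dropped once it is empty. The data structure and the `leaveRoom` signature are assumptions made for the example.

```javascript
// Hypothetical bookkeeping: room name -> Map(participant name -> send function).
function leaveRoom(rooms, roomName, name) {
  const room = rooms.get(roomName);
  if (!room || !room.delete(name)) return false; // unknown room or participant
  for (const notify of room.values()) {
    notify({ id: 'participantLeft', name: name });
  }
  if (room.size === 0) rooms.delete(roomName);   // drop the empty room
  return true;
}

const inbox = [];
const demoRooms = new Map([
  ['demo', new Map([['alice', (m) => inbox.push(m)], ['bob', () => {}]])],
]);
leaveRoom(demoRooms, 'demo', 'bob');   // alice is notified
leaveRoom(demoRooms, 'demo', 'alice'); // room becomes empty and is removed
console.log(inbox.length, demoRooms.has('demo')); // 1 false
```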
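Client side
The client creates a WebSocket and handles the JSON signaling protocol in its onmessage callback. Five different messages can arrive at the client: existingParticipants, newParticipantArrived, participantLeft, receiveVideoAnswer, and iceCandidate.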
var ws = new WebSocket('wss://' + location.host + '/groupcall');
var participants = {};
var name;
window.onbeforeunload = function() {
ws.close();
};
ws.onmessage = function(message) {
var parsedMessage = JSON.parse(message.data);
console.info('Received message: ' + message.data);
switch (parsedMessage.id) {
case 'existingParticipants':
onExistingParticipants(parsedMessage);
break;
case 'newParticipantArrived':
onNewParticipant(parsedMessage);
break;
case 'participantLeft':
onParticipantLeft(parsedMessage);
break;
case 'receiveVideoAnswer':
receiveVideoResponse(parsedMessage);
break;
case 'iceCandidate':
participants[parsedMessage.name].rtcPeer.addIceCandidate(parsedMessage.candidate, function (error) {
if (error) {
console.error("Error adding candidate: " + error);
return;
}
});
break;
default:
console.error('Unrecognized message', parsedMessage);
}
}
function register() {
name = document.getElementById('name').value;
var room = document.getElementById('roomName').value;
document.getElementById('room-header').innerText = 'ROOM ' + room;
document.getElementById('join').style.display = 'none';
document.getElementById('room').style.display = 'block';
var message = {
id : 'joinRoom',
name : name,
room : room,
}
sendMessage(message);
}
function onNewParticipant(request) {
receiveVideo(request.name);
}
function receiveVideoResponse(result) {
participants[result.name].rtcPeer.processAnswer (result.sdpAnswer, function (error) {
if (error) return console.error (error);
});
}
function callResponse(message) {
if (message.response != 'accepted') {
console.info('Call not accepted by peer. Closing call');
stop();
} else {
webRtcPeer.processAnswer(message.sdpAnswer, function (error) {
if (error) return console.error (error);
});
}
}
function onExistingParticipants(msg) {
var constraints = {
audio : true,
video : {
mandatory : {
maxWidth : 320,
maxFrameRate : 15,
minFrameRate : 15
}
}
};
console.log(name + " registered in room " + document.getElementById('roomName').value);
var participant = new Participant(name);
participants[name] = participant;
var video = participant.getVideoElement();
var options = {
localVideo: video,
mediaConstraints: constraints,
onicecandidate: participant.onIceCandidate.bind(participant)
}
participant.rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(options,
function (error) {
if(error) {
return console.error(error);
}
this.generateOffer (participant.offerToReceiveVideo.bind(participant));
});
msg.data.forEach(receiveVideo);
}
function leaveRoom() {
sendMessage({
id : 'leaveRoom'
});
for ( var key in participants) {
participants[key].dispose();
}
document.getElementById('join').style.display = 'block';
document.getElementById('room').style.display = 'none';
ws.close();
}
function receiveVideo(sender) {
var participant = new Participant(sender);
participants[sender] = participant;
var video = participant.getVideoElement();
var options = {
remoteVideo: video,
onicecandidate: participant.onIceCandidate.bind(participant)
}
participant.rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly(options,
function (error) {
if(error) {
return console.error(error);
}
this.generateOffer (participant.offerToReceiveVideo.bind(participant));
});
}
function onParticipantLeft(request) {
console.log('Participant ' + request.name + ' left');
var participant = participants[request.name];
participant.dispose();
delete participants[request.name];
}
function sendMessage(message) {
var jsonMessage = JSON.stringify(message);
console.log('Sending message: ' + jsonMessage);
ws.send(jsonMessage);
}
The client relies mainly on the kurento-utils library for the WebRTC work.
Android
The sections above analyze the official code. Next, the client is implemented on Android. As in the one2one example, it depends on org.webrtc:
implementation 'org.webrtc:google-webrtc:1.0.32006'
implementation 'org.java-websocket:Java-WebSocket:1.5.3'
implementation "com.google.code.gson:gson:2.+"
Initialize the WebSocket and handle the JSON signaling messages in onMessage, with one handler for each of the five messages: existingParticipants, newParticipantArrived, participantLeft, receiveVideoAnswer, and iceCandidate:
when (id) {
"existingParticipants" -> {
onExistingParticipants(jsonObject)
}
"newParticipantArrived" -> {
onNewParticipantArrived(jsonObject)
}
"participantLeft" -> {
onParticipantLeft(jsonObject)
}
"receiveVideoAnswer" -> {
onReceiveVideoAnswer(jsonObject)
}
"iceCandidate" -> {
iceCandidate(jsonObject)
}
}
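- existingParticipants: received after joining a room; lists the participants that are already there
- newParticipantArrived: received when a new participant joins
- participantLeft: received when a participant leaves
- receiveVideoAnswer: carries the SDP answer sent by the server
- iceCandidate: carries an ICE candidate sent by the server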
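The fields each of these messages carries can be read off the browser client shown earlier (`msg.data`, `request.name`, `result.sdpAnswer`, `parsedMessage.candidate`). As a small sketch, a client could check an incoming message against that list before acting on it. The `incomingFields` table and the `missingFields` helper are hypothetical names for this example.

```javascript
// Fields each server-to-client message is expected to carry, taken from how
// the browser client in this article reads them.
const incomingFields = {
  existingParticipants: ['data'],
  newParticipantArrived: ['name'],
  participantLeft: ['name'],
  receiveVideoAnswer: ['name', 'sdpAnswer'],
  iceCandidate: ['name', 'candidate'],
};

// Hypothetical helper: returns the names of missing fields,
// or null when the message id is not one of the five known ones.
function missingFields(message) {
  if (!Object.prototype.hasOwnProperty.call(incomingFields, message.id)) return null;
  return incomingFields[message.id].filter((field) => !(field in message));
}

console.log(missingFields({ id: 'receiveVideoAnswer', name: 'alice' })); // [ 'sdpAnswer' ]
```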
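Requests sent to the server:
- joinRoom: request to join a room
- receiveVideoFrom: request to start receiving video from a given participant
- leaveRoom: request to leave the room
- onIceCandidate: sends a local ICE candidate to the server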
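The payload of each request can be inferred from the fields CallHandler reads (`room`, `name`, `sender`, `sdpOffer`, and the nested `candidate` object). The builder functions below are a hypothetical sketch of those payloads, not code from the tutorial; an Android client would send the same JSON shapes.

```javascript
// Hypothetical builders for the four client-to-server requests.
// Field names follow what CallHandler reads from each message.
function joinRoom(name, room) {
  return { id: 'joinRoom', name: name, room: room };
}

function receiveVideoFrom(sender, sdpOffer) {
  return { id: 'receiveVideoFrom', sender: sender, sdpOffer: sdpOffer };
}

function leaveRoom() {
  return { id: 'leaveRoom' };
}

function onIceCandidate(name, candidate) {
  return {
    id: 'onIceCandidate',
    name: name, // the name CallHandler passes on to user.addCandidate
    candidate: {
      candidate: candidate.candidate,
      sdpMid: candidate.sdpMid,
      sdpMLineIndex: candidate.sdpMLineIndex,
    },
  };
}

console.log(JSON.stringify(joinRoom('alice', 'demo')));
// {"id":"joinRoom","name":"alice","room":"demo"}
```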