Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: support livekit group call. #1551

Closed
wants to merge 11 commits into from
2 changes: 2 additions & 0 deletions lib/matrix.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ export 'src/voip/voip.dart';
export 'src/voip/voip_content.dart';
export 'src/voip/conn_tester.dart';
export 'src/voip/utils.dart';
export 'src/voip/types.dart';
export 'src/voip/voip_room_extension.dart';
export 'src/voip/sframe_key_provider.dart';
export 'src/room.dart';
export 'src/timeline.dart';
export 'src/user.dart';
Expand Down
16 changes: 16 additions & 0 deletions lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import 'dart:typed_data';
import 'package:async/async.dart';
import 'package:collection/collection.dart' show IterableExtension;
import 'package:http/http.dart' as http;
import 'package:matrix/src/voip/types.dart';
import 'package:mime/mime.dart';
import 'package:olm/olm.dart' as olm;
import 'package:random_string/random_string.dart';
Expand Down Expand Up @@ -88,6 +89,12 @@ class Client extends MatrixApi {

bool shareKeysWithUnverifiedDevices;

bool useLivekitForGroupCalls = false;

bool useE2eForGroupCall = true;

String? livekitServiceURL;

// For CommandsClientExtension
final Map<String, FutureOr<String?> Function(CommandArgs)> commands = {};
final Filter syncFilter;
Expand Down Expand Up @@ -177,6 +184,9 @@ class Client extends MatrixApi {
this.shareKeysWithUnverifiedDevices = true,
this.enableDehydratedDevices = false,
this.receiptsPublicByDefault = true,
this.useE2eForGroupCall = true,
this.useLivekitForGroupCalls = false,
this.livekitServiceURL,
}) : syncFilter = syncFilter ??
Filter(
room: RoomFilter(
Expand Down Expand Up @@ -1229,6 +1239,10 @@ class Client extends MatrixApi {
final CachedStreamController<Event> onSDPStreamMetadataChangedReceived =
CachedStreamController();

/// Will be called on sframe keys received.
final CachedStreamController<Event> onEncryptionKeysReceived =
CachedStreamController();

/// Will be called when another device is requesting session keys for a room.
final CachedStreamController<RoomKeyRequest> onRoomKeyRequest =
CachedStreamController();
Expand Down Expand Up @@ -2190,6 +2204,8 @@ class Client extends MatrixApi {
onSDPStreamMetadataChangedReceived.add(event);
// TODO(duan): Only used (org.matrix.msc3401.call) during the current test,
// need to add GroupCallPrefix in matrix_api_lite
} else if (event.type == VoipEventTypes.EncryptionKeysPrefix) {
onEncryptionKeysReceived.add(event);
} else if (event.type == EventTypes.GroupCallPrefix) {
onGroupCallRequest.add(event);
}
Expand Down
227 changes: 223 additions & 4 deletions lib/src/voip/group_call.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/

import 'dart:async';
import 'dart:convert';
import 'dart:core';
import 'dart:math';
import 'dart:typed_data';

import 'package:collection/collection.dart';
import 'package:webrtc_interface/webrtc_interface.dart';
Expand All @@ -28,6 +31,15 @@ import 'package:matrix/src/utils/cached_stream_controller.dart';
/// TODO(@duan): Need to add voice activity detection mechanism
/// const int SPEAKING_THRESHOLD = -60; // dB

// A delay after a member leaves before we create and publish a new key, because people
// tend to leave calls at the same time
const MAKE_KEY_DELAY = 3000;
// The delay between creating and sending a new key and starting to encrypt with it. This gives others
// a chance to receive the new key to minimise the chance they don't get media they can't decrypt.
// The total time between a member leaving and the call switching to new keys is therefore
// MAKE_KEY_DELAY + SEND_KEY_DELAY
const USE_KEY_DELAY = 5000;

class GroupCallIntent {
static String Ring = 'm.ring';
static String Prompt = 'm.prompt';
Expand Down Expand Up @@ -185,6 +197,8 @@ class GroupCall {
final Room room;
final String intent;
final String type;
bool useLivekit = false;
String? _livekitServiceURL;
String state = GroupCallState.LocalCallFeedUninitialized;
StreamSubscription<CallSession>? _callSubscription;
final Map<String, double> audioLevelsMap = {};
Expand All @@ -206,6 +220,12 @@ class GroupCall {

Timer? resendMemberStateEventTimer;

Timeline timeLine;

Map<String, Map<int, Uint8List>> encryptionKeys = {};

List<Timer> setNewKeyTimeouts = [];

final CachedStreamController<GroupCall> onGroupCallFeedsChanged =
CachedStreamController();

Expand All @@ -228,8 +248,12 @@ class GroupCall {
required this.room,
required this.type,
required this.intent,
required this.timeLine,
this.useLivekit = false,
String? livekitServiceURL,
}) {
this.groupCallId = groupCallId ?? genCallID();
_livekitServiceURL = livekitServiceURL;
}

Future<GroupCall> create() async {
Expand All @@ -243,6 +267,8 @@ class GroupCall {
{
'm.intent': intent,
'm.type': type,
if (_livekitServiceURL != null)
'io.element.livekit_service_url': _livekitServiceURL,
},
);

Expand All @@ -265,6 +291,12 @@ class GroupCall {
return room.unsafeGetUserFromMemoryOrFallback(client.userID!);
}

String? get livekitServiceURL => _livekitServiceURL;

Future<void> updateLivekitServiceURL(String url) async {
_livekitServiceURL = url;
}

Event? getMemberStateEvent(String userId) {
final event = room.getState(EventTypes.GroupCallMemberPrefix, userId);
if (event != null) {
Expand Down Expand Up @@ -306,6 +338,175 @@ class GroupCall {
return feeds;
}

Map<int, Uint8List>? getKeysForParticipant(String userId, String deviceId) {
return encryptionKeys[getParticipantId(userId, deviceId)];
}

int getNewEncryptionKeyIndex() {
final userId = client.userID;
final deviceId = client.deviceID;

if (userId == null) throw Exception('No userId');
if (deviceId == null) throw Exception('No deviceId');

return (getKeysForParticipant(userId, deviceId)?.length ?? 0) % 16;
}

String generateRandomKey(int len) {
final r = Random();
const chars =
'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz1234567890';
return List.generate(len, (index) => chars[r.nextInt(chars.length)]).join();
}

void makeNewSenderKey(bool delayBeforeUse) {
final userId = client.userID;
final deviceId = client.deviceID;

if (userId == null) throw Exception('No userId');
if (deviceId == null) throw Exception('No deviceId');

final encryptionKey = generateRandomKey(16);
final encryptionKeyIndex = getNewEncryptionKeyIndex();
Logs().i('Generated new key at index $encryptionKeyIndex');

setEncryptionKey(userId, deviceId, encryptionKeyIndex, encryptionKey,
delayBeforeuse: delayBeforeUse);
}

Future<void> sendEncryptionKeysEvent() async {
Logs().i('Sending encryption keys event');

if (state != GroupCallState.Entered) return;

final userId = client.userID;
final deviceId = client.deviceID;

if (userId == null) throw Exception('No userId');
if (deviceId == null) throw Exception('No deviceId');

final myKeys = getKeysForParticipant(userId, deviceId);

if (myKeys == null) {
Logs().w('Tried to send encryption keys event but no keys found!');
return;
}

try {
final List<EncryptionKeyEntry> keys = [];
for (int i = 0; i < myKeys.length; i++) {
if (myKeys[i] != null) {
keys.add(EncryptionKeyEntry(i, base64UrlEncode(myKeys[i]!)));
}
}
final content = EncryptionKeysEventContent(
keys,
deviceId,
'',
);
final txid = client.generateUniqueTransactionId();
await client.sendMessage(
room.id, VoipEventTypes.EncryptionKeysPrefix, txid, content.toJson());

Logs().d(
'E2EE: updateEncryptionKeyEvent participantId=$userId:$deviceId numSent=${myKeys.length}');
} catch (error) {
// TODO: resend keys.
}
}

Future<void> onTimeLineUpdate() async {
final events = timeLine.events;
for (final event in events) {
if (event.type != VoipEventTypes.EncryptionKeysPrefix) {
continue;
}
onCallEncryption(event);
}
}

void onCallEncryption(Event event) {
final userId = event.senderId;
final content = EncryptionKeysEventContent.fromJson(event.content);

final deviceId = content.deviceId;
final callId = content.callId;

if (userId == '') {
Logs()
.w('Received m.call.encryption_keys with no userId: callId=$callId');
return;
}

// We currently only handle callId = ""
if (callId != '') {
Logs().w(
'Received m.call.encryption_keys with unsupported callId: userId=$userId, deviceId=$deviceId, callId=$callId');
return;
}

if (content.keys.isEmpty) {
Logs().w(
'Received m.call.encryption_keys where keys is empty: callId=$callId');
return;
}

if (userId == client.userID && deviceId == client.deviceID) {
// We store our own sender key in the same set along with keys from others, so it's
// important we don't allow our own keys to be set by one of these events (apart from
// the fact that we don't need it anyway because we already know our own keys).
Logs().i('Ignoring our own keys event');
return;
}

for (final key in content.keys) {
final encryptionKey = key.key;
final encryptionKeyIndex = key.index;
Logs().d(
'E2EE: onCallEncryption userId=$userId:$deviceId encryptionKeyIndex=$encryptionKeyIndex');
setEncryptionKey(userId, deviceId, encryptionKeyIndex, encryptionKey);
}
}

String getParticipantId(String userId, String deviceId) =>
'$userId:$deviceId';

void setEncryptionKey(String userId, String deviceId, int encryptionKeyIndex,
String encryptionKeyString,
{bool delayBeforeuse = false}) {
final keyBin = base64.decode(encryptionKeyString);

final participantId = getParticipantId(userId, deviceId);
final encryptionKeys =
this.encryptionKeys[participantId] ?? <int, Uint8List>{};

if (encryptionKeys[encryptionKeyIndex] != null &&
listEquals(encryptionKeys[encryptionKeyIndex]!, keyBin)) {
Logs().i('Ignoring duplicate key');
return;
}

encryptionKeys[encryptionKeyIndex] = keyBin;

this.encryptionKeys[participantId] = encryptionKeys;

if (delayBeforeuse) {
final useKeyTimeout =
Timer.periodic(Duration(milliseconds: USE_KEY_DELAY), (Timer timer) {
setNewKeyTimeouts.remove(timer);
timer.cancel();
Logs().i(
'Delayed-emitting key changed event for $participantId idx $encryptionKeyIndex');
voip.delegate.keyProvider?.onSetEncryptionKey(
participantId, encryptionKeyString, encryptionKeyIndex);
});
setNewKeyTimeouts.add(useKeyTimeout);
} else {
voip.delegate.keyProvider?.onSetEncryptionKey(
participantId, encryptionKeyString, encryptionKeyIndex);
}
}

bool hasLocalParticipant() {
final userId = client.userID;
return participants.indexWhere((member) => member.id == userId) != -1;
Expand Down Expand Up @@ -353,8 +554,12 @@ class GroupCall {
/// you can pass that `stream` on to this function.
/// This allows you to configure the camera before joining the call without
/// having to reopen the stream and possibly losing settings.
Future<WrappedMediaStream> initLocalStream(
Future<WrappedMediaStream?> initLocalStream(
{WrappedMediaStream? stream}) async {
if (useLivekit) {
Logs().i('Livekit group call: not starting local call feed.');
return null;
}
if (state != GroupCallState.LocalCallFeedUninitialized) {
throw Exception('Cannot initialize local call feed in the $state state.');
}
Expand Down Expand Up @@ -454,7 +659,14 @@ class GroupCall {
await onMemberStateChanged(memberState);
}

onActiveSpeakerLoop();
if (useLivekit) {
makeNewSenderKey(false);
await sendEncryptionKeysEvent();
}

if (!useLivekit) {
onActiveSpeakerLoop();
}

voip.currentGroupCID = groupCallId;

Expand Down Expand Up @@ -673,6 +885,12 @@ class GroupCall {
return;
}

if (useLivekit) {
Logs()
.i('Received incoming call whilst in signaling-only mode! Ignoring.');
return;
}

final opponentMemberId = newCall.remoteUser!.id;
final existingCall = getCallByUserId(opponentMemberId);

Expand Down Expand Up @@ -851,7 +1069,7 @@ class GroupCall {
return;
}

if (state != GroupCallState.Entered) {
if (state != GroupCallState.Entered || useLivekit) {
return;
}

Expand Down Expand Up @@ -1153,7 +1371,8 @@ class GroupCall {
userMediaStreams.indexWhere((stream) => stream.userId == stream.userId);

if (streamIndex == -1) {
throw Exception('Couldn\'t find user media stream to remove');
Logs().w('Couldn\'t find user media stream to remove');
return;
}

userMediaStreams.removeWhere((element) => element.userId == stream.userId);
Expand Down
Loading
Loading