From ab01d5f6a3194af660c02bf6e840d2a5810c7dc3 Mon Sep 17 00:00:00 2001 From: sepfy Date: Mon, 21 Oct 2024 09:37:46 +0800 Subject: [PATCH 1/8] Works with aiortc --- src/agent.c | 9 ++++- src/config.h | 7 ++++ src/dtls_srtp.c | 6 ++- src/ice.c | 105 +++++++++++++++++++++--------------------------- src/ice.h | 9 +---- 5 files changed, 67 insertions(+), 69 deletions(-) diff --git a/src/agent.c b/src/agent.c index c0942ac..0e50ab8 100644 --- a/src/agent.c +++ b/src/agent.c @@ -415,7 +415,14 @@ void agent_set_remote_description(Agent* agent, char* description) { } else if (strncmp(line_start, "a=candidate:", strlen("a=candidate:")) == 0) { if (ice_candidate_from_description(&agent->remote_candidates[agent->remote_candidates_count], line_start, line_end) == 0) { - agent->remote_candidates_count++; + for (i = 0; i < agent->remote_candidates_count; i++) { + if (strcmp(agent->remote_candidates[i].foundation, agent->remote_candidates[agent->remote_candidates_count].foundation) == 0) { + break; + } + } + if (i == agent->remote_candidates_count) { + agent->remote_candidates_count++; + } } } diff --git a/src/config.h b/src/config.h index 4de3495..c8b681e 100644 --- a/src/config.h +++ b/src/config.h @@ -1,6 +1,9 @@ #ifndef CONFIG_H_ #define CONFIG_H_ +// uncomment this if you want to handshake with a aiortc +#define CONFIG_DTLS_USE_ECDSA 1 + #define SCTP_MTU (1200) #define CONFIG_MTU (1300) @@ -22,6 +25,10 @@ #define RSA_KEY_LENGTH 1024 #endif +#ifndef CONFIG_DTLS_USE_ECDSA +#define CONFIG_DTLS_USE_ECDSA 0 +#endif + #ifndef CONFIG_USE_USRSCTP #define CONFIG_USE_USRSCTP 1 #endif diff --git a/src/dtls_srtp.c b/src/dtls_srtp.c index a233fe8..c0d8c97 100644 --- a/src/dtls_srtp.c +++ b/src/dtls_srtp.c @@ -87,9 +87,13 @@ static int dtls_srtp_selfsign_cert(DtlsSrtp* dtls_srtp) { mbedtls_ctr_drbg_seed(&dtls_srtp->ctr_drbg, mbedtls_entropy_func, &dtls_srtp->entropy, (const unsigned char*)pers, strlen(pers)); +#if CONFIG_DTLS_USE_ECDSA + mbedtls_pk_setup(&dtls_srtp->pkey, mbedtls_pk_info_from_type(MBEDTLS_PK_ECKEY)); + mbedtls_ecp_gen_key(MBEDTLS_ECP_DP_SECP256R1, mbedtls_pk_ec(dtls_srtp->pkey), mbedtls_ctr_drbg_random, &dtls_srtp->ctr_drbg); +#else mbedtls_pk_setup(&dtls_srtp->pkey, mbedtls_pk_info_from_type(MBEDTLS_PK_RSA)); - mbedtls_rsa_gen_key(mbedtls_pk_rsa(dtls_srtp->pkey), mbedtls_ctr_drbg_random, &dtls_srtp->ctr_drbg, RSA_KEY_LENGTH, 65537); +#endif mbedtls_x509write_crt_init(&crt); diff --git a/src/ice.c b/src/ice.c index 4e33c46..5a6ba98 100644 --- a/src/ice.c +++ b/src/ice.c @@ -36,7 +36,8 @@ static void ice_candidate_priority(IceCandidate* candidate) { void ice_candidate_create(IceCandidate* candidate, int foundation, IceCandidateType type, Address* addr) { memcpy(&candidate->addr, addr, sizeof(Address)); candidate->type = type; - candidate->foundation = foundation; + + snprintf(candidate->foundation, sizeof(candidate->foundation), "%d", foundation); // 1: RTP, 2: RTCP candidate->component = 1; @@ -77,70 +78,56 @@ void ice_candidate_to_description(IceCandidate* candidate, char* description, in } int ice_candidate_from_description(IceCandidate* candidate, char* description, char* end) { - char* split_start = description; - char* split_end = NULL; - int index = 0; - char buf[64]; + char* candidate_start = description; + uint32_t port; + char type[16]; + char addrstring[ADDRSTRLEN]; - if (strncmp("a=", split_start, strlen("a=")) == 0) { - split_start += strlen("a="); + if (strncmp("a=", candidate_start, strlen("a=")) == 0) { + candidate_start += strlen("a="); } - split_start += strlen("candidate:"); + candidate_start += strlen("candidate:"); + printf("candidate_start: %s\n", candidate_start); // a=candidate:448736988 1 udp 2122260223 172.17.0.1 49250 typ host generation 0 network-id 1 network-cost 50 // a=candidate:udpcandidate 1 udp 120 192.168.1.102 8000 typ host - while ((split_end = strstr(split_start, " ")) != NULL && split_start < end) { - memset(buf, 0, sizeof(buf)); - strncpy(buf, split_start, split_end - split_start); - switch (index) { - case 0: - candidate->foundation = atoi(buf); - break; - case 1: - candidate->component = atoi(buf); - break; - case 2: - if (strstr(buf, "UDP") == 0 && strstr(buf, "udp") == 0) { - // Only accept UDP candidates - return -1; - } - strncpy(candidate->transport, buf, strlen(buf)); - break; - case 3: - candidate->priority = atoi(buf); - break; - case 4: - if (strstr(buf, "local") != 0) { - if (mdns_resolve_addr(buf, &candidate->addr) == 0) { - return -1; - } - } else if (addr_from_string(buf, &candidate->addr) == 0) { - return -1; - } - break; - case 5: - addr_set_port(&candidate->addr, atoi(buf)); - break; - case 7: - - if (strncmp(buf, "host", 4) == 0) { - candidate->type = ICE_CANDIDATE_TYPE_HOST; - } else if (strncmp(buf, "srflx", 5) == 0) { - candidate->type = ICE_CANDIDATE_TYPE_SRFLX; - } else if (strncmp(buf, "relay", 5) == 0) { - candidate->type = ICE_CANDIDATE_TYPE_RELAY; - } else { - LOGE("Unknown candidate type: %s", buf); - return -1; - } - // End of description - return 0; - default: - break; - } + if (sscanf(candidate_start, "%s %d %s %" PRIu32 " %s %" PRIu32 " typ %s", + candidate->foundation, + &candidate->component, + candidate->transport, + &candidate->priority, + addrstring, + &port, + type) != 7) { + LOGE("Failed to parse ICE candidate description"); + return -1; + } + + if (strncmp(candidate->transport, "UDP", 3) != 0 && strncmp(candidate->transport, "udp", 3) != 0) { + LOGE("Only UDP transport is supported"); + return -1; + } - split_start = split_end + 1; - index++; + if (strncmp(type, "host", 4) == 0) { + candidate->type = ICE_CANDIDATE_TYPE_HOST; + } else if (strncmp(type, "srflx", 5) == 0) { + candidate->type = ICE_CANDIDATE_TYPE_SRFLX; + } else if (strncmp(type, "relay", 5) == 0) { + candidate->type = ICE_CANDIDATE_TYPE_RELAY; + } else { + LOGE("Unknown candidate type: %s", type); + return -1; + } + + addr_set_port(&candidate->addr, port); + + if (strstr(addrstring, "local") != NULL) { + if (mdns_resolve_addr(addrstring, &candidate->addr) == 0) { + LOGW("Failed to resolve mDNS address"); + return -1; + } + } else if (addr_from_string(addrstring, &candidate->addr) == 0) { + return -1; } return 0; diff --git a/src/ice.h b/src/ice.h index 7e8350e..2628a54 100644 --- a/src/ice.h +++ b/src/ice.h @@ -32,20 +32,13 @@ typedef enum IceCandidateType { typedef struct IceCandidate IceCandidate; struct IceCandidate { - int foundation; - + char foundation[32 + 1]; int component; - uint32_t priority; - char transport[32 + 1]; - IceCandidateType type; - IceCandidateState state; - Address addr; - Address raddr; }; From 9b32da306fb8284982631ed65186b7727948093e Mon Sep 17 00:00:00 2001 From: sepfy Date: Sun, 10 Nov 2024 22:41:23 +0800 Subject: [PATCH 2/8] [pico] Support i2s mic --- examples/pico/CMakeLists.txt | 3 ++ examples/pico/main.c | 53 +++++++++++++++++++++++++++++++++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/examples/pico/CMakeLists.txt b/examples/pico/CMakeLists.txt index 52082ab..a117033 100644 --- a/examples/pico/CMakeLists.txt +++ b/examples/pico/CMakeLists.txt @@ -96,6 +96,7 @@ target_compile_definitions(peer PRIVATE CONFIG_MBEDTLS_2_X=1 CONFIG_DATA_BUFFER_SIZE=512 CONFIG_AUDIO_BUFFER_SIZE=2048 + CONFIG_HTTP_BUFFER_SIZE=1024 CONFIG_SDP_BUFFER_SIZE=4096 HTTP_DO_NOT_USE_CUSTOM_CONFIG MQTT_DO_NOT_USE_CUSTOM_CONFIG @@ -110,6 +111,8 @@ target_link_libraries(peer # Build pico_peer add_executable(pico_peer main.c + rp2040_i2s_example/i2s.c + pcm-g711/pcm-g711/g711.c ) target_compile_definitions(pico_peer PRIVATE diff --git a/examples/pico/main.c b/examples/pico/main.c index cf966d6..7e7319f 100644 --- a/examples/pico/main.c +++ b/examples/pico/main.c @@ -11,8 +11,14 @@ #include "pico/stdlib.h" #include "task.h" +#include "hardware/dma.h" + +#include "pcm-g711/pcm-g711/g711.h" +#include "rp2040_i2s_example/i2s.h" #include "peer.h" +static __attribute__((aligned(8))) pio_i2s i2s; + #define TEST_TASK_PRIORITY (tskIDLE_PRIORITY + 1UL) TaskHandle_t xPcTaskHandle; @@ -38,6 +44,46 @@ void onopen(void* userdata) { static void onclose(void* userdata) { } +#if 1 +uint32_t get_epoch_time() { + struct timeval tv; + gettimeofday(&tv, NULL); + return (uint32_t)tv.tv_sec * 1000 + tv.tv_usec / 1000; +} +#endif +static void dma_i2s_in_handler(void) { + + int8_t alaw[AUDIO_BUFFER_FRAMES]; + int16_t pcm[AUDIO_BUFFER_FRAMES]; + int32_t* input_buffer; + if (*(int32_t**)dma_hw->ch[i2s.dma_ch_in_ctrl].read_addr == i2s.input_buffer) { + input_buffer = i2s.input_buffer; + } else { + input_buffer = i2s.input_buffer + STEREO_BUFFER_SIZE; + } + for (int i = 0; i < AUDIO_BUFFER_FRAMES; i++) { + pcm[i] = (int16_t)(input_buffer[2*i+1] >> 16); + alaw[i] = ALaw_Encode(pcm[i]); + } + +#if 1 +static uint32_t total_bytes = 0; +static uint32_t last_time = 0; +total_bytes += AUDIO_BUFFER_FRAMES; +uint32_t current_time = get_epoch_time(); +if (current_time - last_time > 1000) { +printf("AUDIO_BUFFER_FRAMES: %d, bps: %d\n", AUDIO_BUFFER_FRAMES, 1000*total_bytes * 8 / (current_time - last_time)); +total_bytes = 0; +last_time = current_time; +} +#endif + + if (eState == PEER_CONNECTION_COMPLETED) { + peer_connection_send_audio(g_pc, alaw, AUDIO_BUFFER_FRAMES); + } + + dma_hw->ints0 = 1u << i2s.dma_ch_in_data; // clear the IRQ +} void peer_connection_task() { printf("Run peer connection task on the core: %d\n", portGET_CORE_ID()); @@ -48,6 +94,7 @@ void peer_connection_task() { } void main_task(__unused void* params) { + if (cyw43_arch_init()) { printf("failed to initialise\n"); vTaskDelete(NULL); @@ -68,6 +115,7 @@ void main_task(__unused void* params) { PeerConfiguration config = { .ice_servers = { {.urls = "stun:stun.l.google.com:19302"}}, + .audio_codec = CODEC_PCMA, .datachannel = DATA_CHANNEL_STRING, }; @@ -84,6 +132,8 @@ void main_task(__unused void* params) { xTaskCreate(peer_connection_task, "PeerConnectionTask", 4096, NULL, TEST_TASK_PRIORITY, &xPcTaskHandle); + i2s_program_start_synched(pio0, &i2s_config_default, dma_i2s_in_handler, &i2s); + printf("Run main task on the core: %d\n", portGET_CORE_ID()); printf("open https://sepfy.github.io/webrtc?deviceId=mypico\n"); while (true) { @@ -102,8 +152,9 @@ void vLaunch(void) { } int main(void) { + stdio_init_all(); - // set_sys_clock_khz(200000, true); + //set_sys_clock_khz(132000, true); vLaunch(); return 0; } From da7cfcb7da8cddf8b128251a667134998d0dddea Mon Sep 17 00:00:00 2001 From: sepfy Date: Mon, 18 Nov 2024 22:33:31 +0800 Subject: [PATCH 3/8] Fix build failed --- examples/pico/CMakeLists.txt | 2 ++ examples/pico/main.c | 27 ++++++++++++--------------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/examples/pico/CMakeLists.txt b/examples/pico/CMakeLists.txt index a117033..cbf80a1 100644 --- a/examples/pico/CMakeLists.txt +++ b/examples/pico/CMakeLists.txt @@ -108,6 +108,8 @@ target_link_libraries(peer cjson ) +pico_generate_pio_header(pico_peer ${CMAKE_CURRENT_LIST_DIR}/rp2040_i2s_example/i2s.pio) + # Build pico_peer add_executable(pico_peer main.c diff --git a/examples/pico/main.c b/examples/pico/main.c index 7e7319f..299d64c 100644 --- a/examples/pico/main.c +++ b/examples/pico/main.c @@ -14,8 +14,8 @@ #include "hardware/dma.h" #include "pcm-g711/pcm-g711/g711.h" -#include "rp2040_i2s_example/i2s.h" #include "peer.h" +#include "rp2040_i2s_example/i2s.h" static __attribute__((aligned(8))) pio_i2s i2s; @@ -52,7 +52,6 @@ uint32_t get_epoch_time() { } #endif static void dma_i2s_in_handler(void) { - int8_t alaw[AUDIO_BUFFER_FRAMES]; int16_t pcm[AUDIO_BUFFER_FRAMES]; int32_t* input_buffer; @@ -62,20 +61,20 @@ static void dma_i2s_in_handler(void) { input_buffer = i2s.input_buffer + STEREO_BUFFER_SIZE; } for (int i = 0; i < AUDIO_BUFFER_FRAMES; i++) { - pcm[i] = (int16_t)(input_buffer[2*i+1] >> 16); + pcm[i] = (int16_t)(input_buffer[2 * i + 1] >> 16); alaw[i] = ALaw_Encode(pcm[i]); } #if 1 -static uint32_t total_bytes = 0; -static uint32_t last_time = 0; -total_bytes += AUDIO_BUFFER_FRAMES; -uint32_t current_time = get_epoch_time(); -if (current_time - last_time > 1000) { -printf("AUDIO_BUFFER_FRAMES: %d, bps: %d\n", AUDIO_BUFFER_FRAMES, 1000*total_bytes * 8 / (current_time - last_time)); -total_bytes = 0; -last_time = current_time; -} + static uint32_t total_bytes = 0; + static uint32_t last_time = 0; + total_bytes += AUDIO_BUFFER_FRAMES; + uint32_t current_time = get_epoch_time(); + if (current_time - last_time > 1000) { + printf("AUDIO_BUFFER_FRAMES: %d, bps: %d\n", AUDIO_BUFFER_FRAMES, 1000 * total_bytes * 8 / (current_time - last_time)); + total_bytes = 0; + last_time = current_time; + } #endif if (eState == PEER_CONNECTION_COMPLETED) { @@ -94,7 +93,6 @@ void peer_connection_task() { } void main_task(__unused void* params) { - if (cyw43_arch_init()) { printf("failed to initialise\n"); vTaskDelete(NULL); @@ -152,9 +150,8 @@ void vLaunch(void) { } int main(void) { - stdio_init_all(); - //set_sys_clock_khz(132000, true); + // set_sys_clock_khz(132000, true); vLaunch(); return 0; } From c75499af1138c777fa8226f062912c8ae99ee0a0 Mon Sep 17 00:00:00 2001 From: sepfy Date: Mon, 18 Nov 2024 22:34:11 +0800 Subject: [PATCH 4/8] Fix aiortc datachannel not open issue --- src/peer_connection.c | 4 +- src/sctp.c | 139 +++++++++++++++++++++++++++--------------- src/sctp.h | 23 +++++-- 3 files changed, 112 insertions(+), 54 deletions(-) diff --git a/src/peer_connection.c b/src/peer_connection.c index 56ec655..608f672 100644 --- a/src/peer_connection.c +++ b/src/peer_connection.c @@ -208,7 +208,7 @@ PeerConnection* peer_connection_create(PeerConfiguration* config) { void peer_connection_destroy(PeerConnection* pc) { if (pc) { - sctp_destroy_socket(&pc->sctp); + sctp_destroy_association(&pc->sctp); dtls_srtp_deinit(&pc->dtls_srtp); agent_destroy(&pc->agent); buffer_free(pc->data_rb); @@ -386,7 +386,7 @@ int peer_connection_loop(PeerConnection* pc) { if (pc->config.datachannel) { LOGI("SCTP create socket"); - sctp_create_socket(&pc->sctp, &pc->dtls_srtp); + sctp_create_association(&pc->sctp, &pc->dtls_srtp); pc->sctp.userdata = pc->config.user_data; } diff --git a/src/sctp.c b/src/sctp.c index 76ebeaa..a30c364 100644 --- a/src/sctp.c +++ b/src/sctp.c @@ -1,21 +1,13 @@ #include #include +#include "dtls_srtp.h" #include "sctp.h" +#include "utils.h" #if CONFIG_USE_USRSCTP #include #endif -#include "dtls_srtp.h" -#include "utils.h" - -#define DATA_CHANNEL_PPID_CONTROL 50 -#define DATA_CHANNEL_PPID_DOMSTRING 51 -#define DATA_CHANNEL_PPID_BINARY_PARTIAL 52 -#define DATA_CHANNEL_PPID_BINARY 53 -#define DATA_CHANNEL_PPID_DOMSTRING_PARTIAL 54 -#define DATA_CHANNEL_OPEN 0x03 - static const uint32_t crc32c_table[256] = { 0x00000000L, 0xF26B8303L, 0xE13B70F7L, 0x1350F3F4L, 0xC79A971FL, 0x35F1141CL, 0x26A1E7E8L, 0xD4CA64EBL, @@ -98,7 +90,6 @@ static int sctp_outgoing_data_cb(void* userdata, void* buf, size_t len, uint8_t Sctp* sctp = (Sctp*)userdata; dtls_srtp_write(sctp->dtls_srtp, buf, len); - return 0; } @@ -114,8 +105,9 @@ int sctp_outgoing_data(Sctp* sctp, char* buf, size_t len, SctpDataPpid ppid, uin spa.sendv_sndinfo.snd_ppid = htonl(ppid); res = usrsctp_sendv(sctp->sock, buf, len, NULL, 0, &spa, sizeof(spa), SCTP_SENDV_SPA, 0); - if (res < 0) - LOGE("sctp sendv error %d %s", errno, strerror(errno)); + if (res < 0) { + LOGE("sctp sendv error %d: %s", errno, strerror(errno)); + } return res; #else size_t padding_len = 0; @@ -201,6 +193,8 @@ void sctp_parse_data_channel_open(Sctp* sctp, uint16_t sid, char* data, size_t l // Add stream mapping sctp_add_stream_mapping(sctp, label_str, sid); + char ack = DATA_CHANNEL_ACK; + sctp_outgoing_data(sctp, &ack, 1, DATA_CHANNEL_PPID_CONTROL, sid); } } @@ -229,8 +223,6 @@ void sctp_incoming_data(Sctp* sctp, char* buf, size_t len) { size_t length = 0; size_t pos = sizeof(SctpHeader); SctpChunkCommon* chunk_common; - SctpDataChunk* data_chunk; - SctpSackChunk* sack; SctpPacket* in_packet = (SctpPacket*)buf; SctpPacket* out_packet = (SctpPacket*)sctp->buf; @@ -252,39 +244,42 @@ void sctp_incoming_data(Sctp* sctp, char* buf, size_t len) { // prepare outgoing packet memset(sctp->buf, 0, sizeof(sctp->buf)); - - // chunks while ((4 * (pos + 3) / 4) < len) { chunk_common = (SctpChunkCommon*)(buf + pos); switch (chunk_common->type) { - case SCTP_DATA: - - data_chunk = (SctpDataChunk*)(buf + pos); - LOGD("SCTP_DATA. ppid = %ld", ntohl(data_chunk->ppid)); - -// XXX: not check DATA_CHANNEL_OPEN? -#if 0 + case SCTP_DATA: { + SctpDataChunk* data_chunk = (SctpDataChunk*)(buf + pos); + SctpSackChunk* sack_chunk = (SctpSackChunk*)out_packet->chunks; + + sack_chunk->common.type = SCTP_SACK; + sack_chunk->common.flags = 0x00; + sack_chunk->common.length = htons(16); + sack_chunk->cumulative_tsn_ack = data_chunk->tsn; + sack_chunk->a_rwnd = htonl(0x02); + length = ntohs(sack_chunk->common.length) + sizeof(SctpHeader); + + LOGD("SCTP_DATA. ppid = %ld, data = %d", ntohl(data_chunk->ppid), data_chunk->data[0]); if (ntohl(data_chunk->ppid) == DATA_CHANNEL_PPID_CONTROL && data_chunk->data[0] == DATA_CHANNEL_OPEN) { - + data_chunk = (SctpDataChunk*)sack_chunk->blocks; + data_chunk->type = SCTP_DATA; + data_chunk->iube = 0x03; + data_chunk->tsn = htonl(sctp->tsn++); + data_chunk->sid = htons(0); + data_chunk->sqn = htons(0); + data_chunk->ppid = htonl(DATA_CHANNEL_PPID_CONTROL); + data_chunk->length = htons(1 + sizeof(SctpDataChunk)); + data_chunk->data[0] = DATA_CHANNEL_ACK; + length += ntohs(data_chunk->length); } else if (ntohl(data_chunk->ppid) == DATA_CHANNEL_PPID_DOMSTRING) { -#endif - if (ntohl(data_chunk->ppid) == DATA_CHANNEL_PPID_DOMSTRING) { if (sctp->onmessage) { - sctp->onmessage((char*)data_chunk->data, ntohs(data_chunk->length) - sizeof(SctpDataChunk), sctp->userdata, ntohs(data_chunk->sid)); + sctp->onmessage((char*)data_chunk->data, ntohs(data_chunk->length) - sizeof(SctpDataChunk), + sctp->userdata, ntohs(data_chunk->sid)); } } - - sack = (SctpSackChunk*)out_packet->chunks; - sack->common.type = SCTP_SACK; - sack->common.flags = 0x00; - sack->common.length = htons(16); - sack->cumulative_tsn_ack = data_chunk->tsn; - sack->a_rwnd = htonl(0x02); - length = ntohs(sack->common.length) + sizeof(SctpHeader); pos = len; // Do not handle other msg - break; - case SCTP_INIT: + } break; + case SCTP_INIT: { LOGD("SCTP_INIT"); SctpInitChunk* init_chunk; @@ -304,11 +299,36 @@ void sctp_incoming_data(Sctp* sctp, char* buf, size_t len) { SctpChunkParam* param = init_ack->param; param->type = htons(SCTP_PARAM_STATE_COOKIE); - param->length = htons(0x08); - uint32_t value = htonl(0x02); - memcpy(¶m->value, &value, 4); + param->length = htons(8); + *(uint32_t*)¶m->value = htonl(0x02); length = ntohs(init_ack->common.length) + sizeof(SctpHeader); - break; + } break; + case SCTP_INIT_ACK: { + SctpInitChunk* init_ack = (SctpInitChunk*)in_packet->chunks; + SctpCookieEchoChunk* cookie_echo = (SctpCookieEchoChunk*)out_packet->chunks; + SctpChunkParam* param; + sctp->verification_tag = init_ack->initiate_tag; + int type; + // find cookie + uint8_t* cookie = NULL; + cookie = (uint8_t*)&init_ack->param[0]; + for (int i = 0; i < init_ack->common.length - 20; i += 2) { + type = ntohs(*(uint16_t*)&cookie[i]); + // find cookie param + if (type == 0x07) { + param = (SctpChunkParam*)&cookie[i]; + break; + } + } + + cookie_echo->common.type = SCTP_COOKIE_ECHO; + cookie_echo->common.flags = 0x00; + // cookie echo: type + flag + length (4 bytes) + cookie + cookie_echo->common.length = htons(ntohs(param->length)); + // param: type + length (4 bytes) + cookie + memcpy(cookie_echo->cookie, param->value, ntohs(param->length) - 4); + length = ntohs(cookie_echo->common.length) + sizeof(SctpHeader); + } break; case SCTP_SACK: #if 0 LOGD("SCTP_SACK"); @@ -341,7 +361,7 @@ void sctp_incoming_data(Sctp* sctp, char* buf, size_t len) { } #endif break; - case SCTP_COOKIE_ECHO: + case SCTP_COOKIE_ECHO: { LOGD("SCTP_COOKIE_ECHO"); SctpChunkCommon* common = (SctpChunkCommon*)out_packet->chunks; common->type = SCTP_COOKIE_ACK; @@ -356,7 +376,7 @@ void sctp_incoming_data(Sctp* sctp, char* buf, size_t len) { sctp->onopen(sctp->userdata); } } - break; + } break; case SCTP_ABORT: sctp->connected = 0; if (sctp->onclose) { @@ -383,7 +403,6 @@ void sctp_incoming_data(Sctp* sctp, char* buf, size_t len) { } pos += ntohs(chunk_common->length); } - #endif } @@ -464,7 +483,7 @@ static int sctp_incoming_data_cb(struct socket* sock, union sctp_sockstore addr, } #endif -int sctp_create_socket(Sctp* sctp, DtlsSrtp* dtls_srtp) { +int sctp_create_association(Sctp* sctp, DtlsSrtp* dtls_srtp) { sctp->dtls_srtp = dtls_srtp; sctp->local_port = 5000; sctp->remote_port = 5000; @@ -561,17 +580,41 @@ int sctp_create_socket(Sctp* sctp, DtlsSrtp* dtls_srtp) { } while (0); if (ret < 0) { - sctp_destroy_socket(sctp); + sctp_destroy_association(sctp); return -1; } sctp->sock = sock; +#else + // send SCTP_INIT + int length = 0; + SctpInitChunk* init_chunk; + SctpHeader* header; + SctpPacket* out_packet = (SctpPacket*)sctp->buf; + header = &out_packet->header; + init_chunk = (SctpInitChunk*)out_packet->chunks; + + header->source_port = htons(sctp->local_port); + header->destination_port = htons(sctp->remote_port); + header->verification_tag = 0x0; + init_chunk->common.type = SCTP_INIT; + init_chunk->common.flags = 0x00; + init_chunk->common.length = htons(20); + init_chunk->initiate_tag = htonl(0x12345678); + init_chunk->a_rwnd = htonl(0x100000); + init_chunk->number_of_outbound_streams = 0xffff; + init_chunk->number_of_inbound_streams = 0xffff; + init_chunk->initial_tsn = htonl(sctp->tsn); + length = ntohs(init_chunk->common.length) + sizeof(SctpHeader); + length = (4 * ((length + 3) / 4)); + header->checksum = sctp_get_checksum(sctp, sctp->buf, length); + dtls_srtp_write(sctp->dtls_srtp, sctp->buf, length); #endif return 0; } -void sctp_destroy_socket(Sctp* sctp) { +void sctp_destroy_association(Sctp* sctp) { #if CONFIG_USE_USRSCTP if (sctp && sctp->sock) { usrsctp_shutdown(sctp->sock, SHUT_RDWR); diff --git a/src/sctp.h b/src/sctp.h index c7f70c7..50e6464 100644 --- a/src/sctp.h +++ b/src/sctp.h @@ -6,8 +6,6 @@ #include "dtls_srtp.h" #include "utils.h" -#if !CONFIG_USE_USRSCTP - typedef enum DecpMsgType { DATA_CHANNEL_OPEN = 0x03, @@ -15,6 +13,18 @@ typedef enum DecpMsgType { } DecpMsgType; +typedef enum DataChannelPpid { + + DATA_CHANNEL_PPID_CONTROL = 50, + DATA_CHANNEL_PPID_DOMSTRING = 51, + DATA_CHANNEL_PPID_BINARY_PARTIAL = 52, + DATA_CHANNEL_PPID_BINARY = 53, + DATA_CHANNEL_PPID_DOMSTRING_PARTIAL = 54 + +} DataChannelPpid; + +#if !CONFIG_USE_USRSCTP + typedef struct SctpChunkParam { uint16_t type; uint16_t length; @@ -114,6 +124,11 @@ typedef struct SctpInitChunk { } SctpInitChunk; +typedef struct SctpCookieEchoChunk { + SctpChunkCommon common; + uint8_t cookie[0]; +} SctpCookieEchoChunk; + #endif typedef enum SctpDataPpid { @@ -155,9 +170,9 @@ typedef struct Sctp { uint8_t buf[CONFIG_MTU]; } Sctp; -int sctp_create_socket(Sctp* sctp, DtlsSrtp* dtls_srtp); +int sctp_create_association(Sctp* sctp, DtlsSrtp* dtls_srtp); -void sctp_destroy_socket(Sctp* sctp); +void sctp_destroy_association(Sctp* sctp); int sctp_is_connected(Sctp* sctp); From 60ef03f6813896e12089455b32b11dfaf36924ff Mon Sep 17 00:00:00 2001 From: sepfy Date: Sun, 8 Dec 2024 21:47:27 +0800 Subject: [PATCH 5/8] Simplify signaling configuration --- examples/generic/main.c | 50 +++++-- src/ice.c | 2 +- src/peer_signaling.c | 284 ++++++++++++++++++++++------------------ src/peer_signaling.h | 8 +- 4 files changed, 201 insertions(+), 143 deletions(-) diff --git a/examples/generic/main.c b/examples/generic/main.c index eae1fc8..ef2c88a 100644 --- a/examples/generic/main.c +++ b/examples/generic/main.c @@ -60,18 +60,43 @@ static uint64_t get_timestamp() { return tv.tv_sec * 1000 + tv.tv_usec / 1000; } +void print_usage(const char *prog_name) { + printf("Usage: %s -u [-t ]\n", prog_name); +} + +void parse_arguments(int argc, char *argv[], const char **url, const char **token) { + + *token = NULL; + *url = NULL; + + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "-u") == 0 && (i + 1) < argc) { + *url = argv[++i]; + } else if (strcmp(argv[i], "-t") == 0 && (i + 1) < argc) { + *token = argv[++i]; + } else { + print_usage(argv[0]); + exit(1); + } + } + + if (*url == NULL) { + print_usage(argv[0]); + exit(1); + } +} + int main(int argc, char* argv[]) { uint64_t curr_time, video_time, audio_time; uint8_t buf[102400]; + const char *url = NULL; + const char *token = NULL; int size; pthread_t peer_singaling_thread; pthread_t peer_connection_thread; - if (argc < 2) { - printf("Usage: %s \n", argv[0]); - return -1; - } + parse_arguments(argc, argv, &url, &token); signal(SIGINT, signal_handler); @@ -81,21 +106,20 @@ int main(int argc, char* argv[]) { }, .datachannel = DATA_CHANNEL_STRING, .video_codec = CODEC_H264, - .audio_codec = CODEC_PCMA}; - - ServiceConfiguration service_config = SERVICE_CONFIG_DEFAULT(); + .audio_codec = CODEC_PCMA + }; - printf("open https://sepfy.github.io/webrtc?deviceId=%s\n", argv[1]); + printf("=========== Parsed Arguments ===========\n"); + printf(" %-5s : %s\n", "URL", url); + printf(" %-5s : %s\n", "Token", token ? token : ""); + printf("========================================\n"); peer_init(); g_pc = peer_connection_create(&config); peer_connection_oniceconnectionstatechange(g_pc, onconnectionstatechange); peer_connection_ondatachannel(g_pc, onmessage, onopen, onclose); - service_config.client_id = argv[1]; - service_config.pc = g_pc; - peer_signaling_set_config(&service_config); - peer_signaling_join_channel(); + peer_signaling_connect(url, token, g_pc); pthread_create(&peer_connection_thread, NULL, peer_connection_task, NULL); pthread_create(&peer_singaling_thread, NULL, peer_singaling_task, NULL); @@ -130,7 +154,7 @@ int main(int argc, char* argv[]) { reader_deinit(); - peer_signaling_leave_channel(); + peer_signaling_disconnect(); peer_connection_destroy(g_pc); peer_deinit(); diff --git a/src/ice.c b/src/ice.c index 5a6ba98..c22911f 100644 --- a/src/ice.c +++ b/src/ice.c @@ -67,7 +67,7 @@ void ice_candidate_to_description(IceCandidate* candidate, char* description, in } addr_to_string(&candidate->addr, addr_string, sizeof(addr_string)); - snprintf(description, length, "a=candidate:%d %d %s %" PRIu32 " %s %d typ %s\r\n", + snprintf(description, length, "a=candidate:%s %d %s %" PRIu32 " %s %d typ %s\r\n", candidate->foundation, candidate->component, candidate->transport, diff --git a/src/peer_signaling.c b/src/peer_signaling.c index bd0fa53..12cbd26 100644 --- a/src/peer_signaling.c +++ b/src/peer_signaling.c @@ -18,12 +18,11 @@ #define KEEP_ALIVE_TIMEOUT_SECONDS 60 #define CONNACK_RECV_TIMEOUT_MS 1000 -#define TOPIC_SIZE 128 -#define HOST_LEN 64 -#define CRED_LEN 128 +#define URL_MAX_LEN 256 +#define TOPIC_MAX_LEN 128 +#define TOKEN_MAX_LEN 256 #define RPC_VERSION "2.0" - #define RPC_METHOD_STATE "state" #define RPC_METHOD_OFFER "offer" #define RPC_METHOD_ANSWER "answer" @@ -45,25 +44,95 @@ typedef struct PeerSignaling { uint8_t mqtt_buf[CONFIG_MQTT_BUFFER_SIZE]; uint8_t http_buf[CONFIG_HTTP_BUFFER_SIZE]; - char subtopic[TOPIC_SIZE]; - char pubtopic[TOPIC_SIZE]; + char subtopic[TOPIC_MAX_LEN]; + char pubtopic[TOPIC_MAX_LEN]; uint16_t packet_id; int id; - int mqtt_port; - int http_port; - char mqtt_host[HOST_LEN]; - char http_host[HOST_LEN]; - char http_path[HOST_LEN]; - char username[CRED_LEN]; - char password[CRED_LEN]; - char client_id[CRED_LEN]; + int proto; // 0: MQTT, 1: HTTP + int port; + char host[URL_MAX_LEN]; + char path[URL_MAX_LEN]; + char token[TOKEN_MAX_LEN]; + char client_id[32]; + PeerConnection* pc; } PeerSignaling; -static PeerSignaling g_ps; +static PeerSignaling g_ps = {0}; + +static int peer_signaling_resolve_token(const char* token, char* username, char* password) { + char plaintext[TOKEN_MAX_LEN] = {0}; + char* colon; + + if (token == NULL || strlen(token) == 0) { + LOGW("Invalid token"); + return -1; + } + base64_decode(token, strlen(token), (unsigned char*)plaintext, sizeof(plaintext)); + colon = strchr(plaintext, ':'); + if (colon == NULL) { + LOGW("Invalid token: %s", token); + return -1; + } + + strncpy(username, plaintext, colon - plaintext); + strncpy(password, colon + 1, strlen(colon + 1)); + LOGD("Username: %s, Password: %s", username, password); + return 0; +} + +static int peer_signaling_resolve_url(const char* url, char* host, int *port, char* path) { + + char* port_start, *path_start; + int proto = 0; + + if (url == NULL || strlen(url) == 0) { + LOGW("Invalid URL"); + return -1; + } + + if (strncmp(url, "mqtts://", 8) == 0) { + *port = 8883; + url += 8; + } else if (strncmp(url, "https://", 8) == 0) { + *port = 443; + url += 8; + proto = 1; + } else if (strncmp(url, "mqtt://", 7) == 0) { + *port = 1883; + url += 7; + } else if (strncmp(url, "http://", 7) == 0) { + *port = 80; + url += 7; + proto = 1; + } else { + LOGW("Invalid URL: %s", url); + return -1; + } + + port_start = strchr(url, ':'); + path_start = strchr(url, '/'); + + if (port_start != NULL && path_start != NULL && port_start < path_start) { + strncpy(host, url, port_start - url); + strncpy(path, path_start, strlen(path_start)); + *port = atoi(port_start + 1); + } else if (port_start == NULL && path_start != NULL) { + strncpy(host, url, path_start - url); + strncpy(path, path_start, strlen(path_start)); + } else if (port_start != NULL && path_start == NULL) { + strncpy(host, url, port_start - url); + *port = atoi(port_start + 1); + } else { + strncpy(host, url, strlen(url)); + } + + LOGI("Host: %s, Port: %d, Path: %s", host, *port, path); + return proto; +} static void peer_signaling_mqtt_publish(MQTTContext_t* mqtt_ctx, const char* message) { MQTTStatus_t status; @@ -244,10 +313,7 @@ static int peer_signaling_http_post(const char* hostname, const char* path, int trans_if.send = ssl_transport_send; trans_if.pNetworkContext = &net_ctx; - if (port <= 0) { - LOGE("Invalid port number: %d", port); - return -1; - } + assert(port > 0); ret = ssl_transport_connect(&net_ctx, hostname, port, NULL); @@ -285,17 +351,36 @@ static int peer_signaling_http_post(const char* hostname, const char* path, int static void peer_signaling_mqtt_event_cb(MQTTContext_t* mqtt_ctx, MQTTPacketInfo_t* packet_info, MQTTDeserializedInfo_t* deserialized_info) { + MQTTStatus_t status = MQTTSuccess; switch (packet_info->type) { - case MQTT_PACKET_TYPE_CONNACK: - LOGI("MQTT_PACKET_TYPE_CONNACK"); - break; case MQTT_PACKET_TYPE_PUBLISH: - LOGI("MQTT_PACKET_TYPE_PUBLISH"); + LOGD("MQTT received message: %s"); peer_signaling_on_pub_event(deserialized_info->pPublishInfo->pPayload, deserialized_info->pPublishInfo->payloadLength); break; - case MQTT_PACKET_TYPE_SUBACK: - LOGD("MQTT_PACKET_TYPE_SUBACK"); + case MQTT_PACKET_TYPE_SUBACK: { + size_t ncodes = 0; + int i = 0; + uint8_t* codes = NULL; + status = MQTT_GetSubAckStatusCodes(packet_info, &codes, &ncodes); + + assert(status == MQTTSuccess); + + assert(ncodes == 1); + + for (i = 0; i < ncodes; i++) { + if (codes[0] == MQTTSubAckFailure) { + LOGE("MQTT Subscription failed. Please check authorization"); + break; + } + } + + if (i == ncodes) { + LOGI("MQTT Subscribe succeeded."); + } + } break; + case MQTT_PACKET_TYPE_UNSUBACK: + LOGI("MQTT Unsubscribe succeeded."); break; default: break; @@ -306,6 +391,8 @@ static int peer_signaling_mqtt_connect(const char* hostname, int port) { MQTTStatus_t status; MQTTConnectInfo_t conn_info; bool session_present; + char username[TOKEN_MAX_LEN] = {0}; + char password[TOKEN_MAX_LEN] = {0}; if (ssl_transport_connect(&g_ps.net_ctx, hostname, port, NULL) < 0) { LOGE("ssl transport connect failed"); @@ -323,14 +410,13 @@ static int peer_signaling_mqtt_connect(const char* hostname, int port) { memset(&conn_info, 0, sizeof(conn_info)); conn_info.cleanSession = false; - if (strlen(g_ps.username) > 0) { - conn_info.pUserName = g_ps.username; - conn_info.userNameLength = strlen(g_ps.username); - } - if (strlen(g_ps.password) > 0) { - conn_info.pPassword = g_ps.password; - conn_info.passwordLength = strlen(g_ps.password); + if (strlen(g_ps.token) > 0) { + peer_signaling_resolve_token(g_ps.token, username, password); + conn_info.pUserName = username; + conn_info.userNameLength = strlen(username); + conn_info.pPassword = password; + conn_info.passwordLength = strlen(password); } if (strlen(g_ps.client_id) > 0) { @@ -364,6 +450,7 @@ static int peer_signaling_mqtt_subscribe(int subscribed) { sub_info.topicFilterLength = strlen(g_ps.subtopic); if (subscribed) { + LOGI("Subscribing topic %s", g_ps.subtopic); status = MQTT_Subscribe(&g_ps.mqtt_ctx, &sub_info, 1, packet_id); } else { status = MQTT_Unsubscribe(&g_ps.mqtt_ctx, &sub_info, 1, packet_id); @@ -380,16 +467,12 @@ static int peer_signaling_mqtt_subscribe(int subscribed) { return -1; } - LOGD("MQTT Subscribe/Unsubscribe succeeded."); return 0; } static void peer_signaling_onicecandidate(char* description, void* userdata) { cJSON* res; char* payload; - char cred_plaintext[2 * CRED_LEN + 1]; - char cred_base64[2 * CRED_LEN + 10]; - if (g_ps.id > 0) { res = cJSON_CreateObject(); cJSON_AddStringToObject(res, "jsonrpc", RPC_VERSION); @@ -403,123 +486,68 @@ static void peer_signaling_onicecandidate(char* description, void* userdata) { cJSON_Delete(res); g_ps.id = 0; } else { - // enable authentication - if (strlen(g_ps.username) > 0 && strlen(g_ps.password) > 0) { - snprintf(cred_plaintext, sizeof(cred_plaintext), "%s:%s", g_ps.username, g_ps.password); - snprintf(cred_base64, sizeof(cred_base64), "Basic "); - base64_encode((unsigned char*)cred_plaintext, strlen(cred_plaintext), - cred_base64 + strlen(cred_base64), sizeof(cred_base64) - strlen(cred_base64)); - LOGD("Basic Auth: %s", cred_base64); - peer_signaling_http_post(g_ps.http_host, g_ps.http_path, g_ps.http_port, cred_base64, description); + if (g_ps.token != NULL && strlen(g_ps.token) > 0) { + char cred[TOKEN_MAX_LEN]; + memset(cred, 0, sizeof(cred)); + snprintf(cred, sizeof(cred), "Basic %s", g_ps.token); + peer_signaling_http_post(g_ps.host, g_ps.path, g_ps.port, cred, description); } else { - peer_signaling_http_post(g_ps.http_host, g_ps.http_path, g_ps.http_port, "", description); + peer_signaling_http_post(g_ps.host, g_ps.path, g_ps.port, "", description); } } } -int peer_signaling_whip_connect() { - if (g_ps.pc == NULL) { - LOGW("PeerConnection is NULL"); - return -1; - } else if (g_ps.http_port <= 0) { - LOGW("Invalid HTTP port number: %d", g_ps.http_port); - return -1; - } - - peer_connection_create_offer(g_ps.pc); - return 0; -} +int peer_signaling_connect(const char* url, const char* token, PeerConnection* pc) { -void peer_signaling_whip_disconnect() { - // TODO: implement -} + char* client_id; -int peer_signaling_join_channel() { - if (g_ps.pc == NULL) { - LOGW("PeerConnection is NULL"); - return -1; - } else if (g_ps.mqtt_port <= 0) { - LOGW("Invalid MQTT port number: %d", g_ps.mqtt_port); - if (peer_signaling_whip_connect() < 0) { - LOGW("Tried MQTT and WHIP, connect failed"); - return -1; - } - return 0; + if ((g_ps.proto = peer_signaling_resolve_url(url, g_ps.host, &g_ps.port, g_ps.path)) < 0) { + LOGE("Resolve URL failed"); } - if (peer_signaling_mqtt_connect(g_ps.mqtt_host, g_ps.mqtt_port) < 0) { - LOGW("Connect MQTT server failed"); - return -1; + if (token) { + strncpy(g_ps.token, token, sizeof(g_ps.token)); } - peer_signaling_mqtt_subscribe(1); - return 0; -} + g_ps.pc = pc; + peer_connection_onicecandidate(g_ps.pc, peer_signaling_onicecandidate); + + switch (g_ps.proto) { + case 0: { // MQTT + client_id = strrchr(g_ps.path, '/'); + snprintf(g_ps.client_id, sizeof(g_ps.client_id), "%s", client_id + 1); + snprintf(g_ps.subtopic, sizeof(g_ps.subtopic), "%s/invoke", g_ps.path); + snprintf(g_ps.pubtopic, sizeof(g_ps.pubtopic), "%s/result", g_ps.path); + if (peer_signaling_mqtt_connect(g_ps.host, g_ps.port) == 0) { + peer_signaling_mqtt_subscribe(1); + } + } break; + case 1: { // HTTP + peer_connection_create_offer(g_ps.pc); + } break; + default: { + } break; -int peer_signaling_loop() { - if (g_ps.mqtt_port > 0) { - MQTT_ProcessLoop(&g_ps.mqtt_ctx); } + return 0; } -void peer_signaling_leave_channel() { +void peer_signaling_disconnect() { MQTTStatus_t status = MQTTSuccess; - if (g_ps.mqtt_port > 0 && peer_signaling_mqtt_subscribe(0) == 0) { + if (!g_ps.proto && !peer_signaling_mqtt_subscribe(0)) { status = MQTT_Disconnect(&g_ps.mqtt_ctx); if (status != MQTTSuccess) { LOGE("Failed to disconnect with broker: %s", MQTT_Status_strerror(status)); } } + LOGI("Disconnected"); } -void peer_signaling_set_config(ServiceConfiguration* service_config) { - char* pos; - - memset(&g_ps, 0, sizeof(g_ps)); - do { - if (service_config->http_url == NULL || strlen(service_config->http_url) == 0) { - break; - } - - if ((pos = strstr(service_config->http_url, "/")) != NULL) { - strncpy(g_ps.http_host, service_config->http_url, pos - service_config->http_url); - strncpy(g_ps.http_path, pos, HOST_LEN); - } else { - strncpy(g_ps.http_host, service_config->http_url, HOST_LEN); - } - - g_ps.http_port = service_config->http_port; - LOGI("HTTP Host: %s, Port: %d, Path: %s", g_ps.http_host, g_ps.http_port, g_ps.http_path); - } while (0); - - do { - if (service_config->mqtt_url == NULL || strlen(service_config->mqtt_url) == 0) { - break; - } - - strncpy(g_ps.mqtt_host, service_config->mqtt_url, HOST_LEN); - g_ps.mqtt_port = service_config->mqtt_port; - LOGD("MQTT Host: %s, Port: %d", g_ps.mqtt_host, g_ps.mqtt_port); - } while (0); - - if (service_config->client_id != NULL && strlen(service_config->client_id) > 0) { - strncpy(g_ps.client_id, service_config->client_id, CRED_LEN); - snprintf(g_ps.subtopic, sizeof(g_ps.subtopic), "webrtc/%s/jsonrpc", service_config->client_id); - snprintf(g_ps.pubtopic, sizeof(g_ps.pubtopic), "webrtc/%s/jsonrpc-reply", service_config->client_id); - } - - if (service_config->username != NULL && strlen(service_config->username) > 0) { - strncpy(g_ps.username, service_config->username, CRED_LEN); - } - - if (service_config->password != NULL && strlen(service_config->password) > 0) { - strncpy(g_ps.password, service_config->password, CRED_LEN); - } - - g_ps.pc = service_config->pc; - peer_connection_onicecandidate(g_ps.pc, peer_signaling_onicecandidate); +int peer_signaling_loop() { + MQTT_ProcessLoop(&g_ps.mqtt_ctx); + return 0; } #endif // DISABLE_PEER_SIGNALING diff --git a/src/peer_signaling.h b/src/peer_signaling.h index 679cc1f..ab61c8a 100644 --- a/src/peer_signaling.h +++ b/src/peer_signaling.h @@ -22,7 +22,7 @@ typedef struct ServiceConfiguration { #define SERVICE_CONFIG_DEFAULT() \ { \ - .mqtt_url = "broker.emqx.io", \ + .mqtt_url = "libpeer.com", \ .mqtt_port = 8883, \ .client_id = "peer", \ .http_url = "", \ @@ -32,6 +32,12 @@ typedef struct ServiceConfiguration { .pc = NULL \ } +#define DEFAULT_PEER_SIGNALING "mqtts://libpeer.com/public" + +int peer_signaling_connect(const char* url, const char* token, PeerConnection* pc); + +void peer_signaling_disconnect(); + void peer_signaling_set_config(ServiceConfiguration* config); int peer_signaling_whip_connect(); From 055b6dea5d84754afab659dcec110b0401fceeb3 Mon Sep 17 00:00:00 2001 From: sepfy Date: Sun, 8 Dec 2024 23:17:25 +0800 Subject: [PATCH 6/8] [esp32] Fix compile error --- examples/esp32/main/Kconfig.projbuild | 12 +++++++- examples/esp32/main/app_main.c | 21 ++++---------- src/peer_signaling.c | 40 +++++++++++++-------------- src/peer_signaling.h | 35 ----------------------- src/sctp.c | 2 +- src/stun.c | 4 +-- 6 files changed, 39 insertions(+), 75 deletions(-) diff --git a/examples/esp32/main/Kconfig.projbuild b/examples/esp32/main/Kconfig.projbuild index 684c936..cdf2b09 100644 --- a/examples/esp32/main/Kconfig.projbuild +++ b/examples/esp32/main/Kconfig.projbuild @@ -1,4 +1,4 @@ -menu "ESP32 Hardware Configuration" +menu "Peer Example Configuration" choice ESP32_CAMERA_MODEL prompt "Camera model" @@ -15,4 +15,14 @@ menu "ESP32 Hardware Configuration" bool "ESP32S3-XIAO-SENSE" endchoice + config SIGNALING_URL + string "Signaling URL" + default "https://libpeer.com/public/test" + help + Enter the URL of the Peer library to use. + config SIGNALING_TOKEN + string "Signaling Token" + default "" + help + Enter the token of the Peer library to use. endmenu diff --git a/examples/esp32/main/app_main.c b/examples/esp32/main/app_main.c index da2685a..1f162e5 100644 --- a/examples/esp32/main/app_main.c +++ b/examples/esp32/main/app_main.c @@ -77,9 +77,6 @@ void peer_connection_task(void* arg) { } void app_main(void) { - static char deviceid[32] = {0}; - uint8_t mac[8] = {0}; - PeerConfiguration config = { .ice_servers = { {.urls = "stun:stun.l.google.com:19302"}}, @@ -104,11 +101,6 @@ void app_main(void) { ESP_ERROR_CHECK(esp_event_loop_create_default()); ESP_ERROR_CHECK(example_connect()); - if (esp_read_mac(mac, ESP_MAC_WIFI_STA) == ESP_OK) { - sprintf(deviceid, "esp32-%02x%02x%02x%02x%02x%02x", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); - ESP_LOGI(TAG, "Device ID: %s", deviceid); - } - xSemaphore = xSemaphoreCreateMutex(); peer_init(); @@ -122,13 +114,7 @@ void app_main(void) { g_pc = peer_connection_create(&config); peer_connection_oniceconnectionstatechange(g_pc, oniceconnectionstatechange); peer_connection_ondatachannel(g_pc, onmessage, onopen, onclose); - - ServiceConfiguration service_config = SERVICE_CONFIG_DEFAULT(); - service_config.client_id = deviceid; - service_config.pc = g_pc; - service_config.mqtt_url = "broker.emqx.io"; - peer_signaling_set_config(&service_config); - peer_signaling_join_channel(); + peer_signaling_connect(CONFIG_SIGNALING_URL, CONFIG_SIGNALING_TOKEN, g_pc); #if defined(CONFIG_ESP32S3_XIAO_SENSE) StackType_t* stack_memory = (StackType_t*)heap_caps_malloc(8192 * sizeof(StackType_t), MALLOC_CAP_SPIRAM); @@ -143,7 +129,10 @@ void app_main(void) { xTaskCreatePinnedToCore(peer_connection_task, "peer_connection", 8192, NULL, 5, &xPcTaskHandle, 1); ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); - ESP_LOGI(TAG, "open https://sepfy.github.io/webrtc?deviceId=%s", deviceid); + printf("============= Configuration =============\n"); + printf(" %-5s : %s\n", "URL", CONFIG_SIGNALING_URL); + printf(" %-5s : %s\n", "Token", CONFIG_SIGNALING_TOKEN); + printf("=========================================\n"); while (1) { peer_signaling_loop(); diff --git a/src/peer_signaling.c b/src/peer_signaling.c index 12cbd26..b01b12d 100644 --- a/src/peer_signaling.c +++ b/src/peer_signaling.c @@ -18,8 +18,10 @@ #define KEEP_ALIVE_TIMEOUT_SECONDS 60 #define CONNACK_RECV_TIMEOUT_MS 1000 -#define URL_MAX_LEN 256 -#define TOPIC_MAX_LEN 128 +#define PATH_MAX_LEN 128 +#define HOST_MAX_LEN 64 +#define URL_MAX_LEN (PATH_MAX_LEN + HOST_MAX_LEN + 8) +#define TOPIC_MAX_LEN URL_MAX_LEN #define TOKEN_MAX_LEN 256 #define RPC_VERSION "2.0" @@ -50,10 +52,10 @@ typedef struct PeerSignaling { uint16_t packet_id; int id; - int proto; // 0: MQTT, 1: HTTP + int proto; // 0: MQTT, 1: HTTP int port; - char host[URL_MAX_LEN]; - char path[URL_MAX_LEN]; + char host[HOST_MAX_LEN]; + char path[PATH_MAX_LEN]; char token[TOKEN_MAX_LEN]; char client_id[32]; @@ -76,7 +78,7 @@ static int peer_signaling_resolve_token(const char* token, char* username, char* if (colon == NULL) { LOGW("Invalid token: %s", token); return -1; - } + } strncpy(username, plaintext, colon - plaintext); strncpy(password, colon + 1, strlen(colon + 1)); @@ -84,9 +86,8 @@ static int peer_signaling_resolve_token(const char* token, char* username, char* return 0; } -static int peer_signaling_resolve_url(const char* url, char* host, int *port, char* path) { - - char* port_start, *path_start; +static int peer_signaling_resolve_url(const char* url, char* host, int* port, char* path) { + char *port_start, *path_start; int proto = 0; if (url == NULL || strlen(url) == 0) { @@ -354,7 +355,9 @@ static void peer_signaling_mqtt_event_cb(MQTTContext_t* mqtt_ctx, MQTTStatus_t status = MQTTSuccess; switch (packet_info->type) { case MQTT_PACKET_TYPE_PUBLISH: - LOGD("MQTT received message: %s"); + LOGD("MQTT received message: %.*s", + deserialized_info->pPublishInfo->payloadLength, + (char*)deserialized_info->pPublishInfo->pPayload); peer_signaling_on_pub_event(deserialized_info->pPublishInfo->pPayload, deserialized_info->pPublishInfo->payloadLength); break; @@ -371,8 +374,8 @@ static void peer_signaling_mqtt_event_cb(MQTTContext_t* mqtt_ctx, for (i = 0; i < ncodes; i++) { if (codes[0] == MQTTSubAckFailure) { LOGE("MQTT Subscription failed. Please check authorization"); - break; - } + break; + } } if (i == ncodes) { @@ -486,8 +489,8 @@ static void peer_signaling_onicecandidate(char* description, void* userdata) { cJSON_Delete(res); g_ps.id = 0; } else { - if (g_ps.token != NULL && strlen(g_ps.token) > 0) { - char cred[TOKEN_MAX_LEN]; + if (strlen(g_ps.token) > 0) { + char cred[TOKEN_MAX_LEN + 10]; memset(cred, 0, sizeof(cred)); snprintf(cred, sizeof(cred), "Basic %s", g_ps.token); peer_signaling_http_post(g_ps.host, g_ps.path, g_ps.port, cred, description); @@ -498,14 +501,13 @@ static void peer_signaling_onicecandidate(char* description, void* userdata) { } int peer_signaling_connect(const char* url, const char* token, PeerConnection* pc) { - char* client_id; if ((g_ps.proto = peer_signaling_resolve_url(url, g_ps.host, &g_ps.port, g_ps.path)) < 0) { LOGE("Resolve URL failed"); } - if (token) { + if (token && strlen(token) > 0) { strncpy(g_ps.token, token, sizeof(g_ps.token)); } @@ -513,7 +515,7 @@ int peer_signaling_connect(const char* url, const char* token, PeerConnection* p peer_connection_onicecandidate(g_ps.pc, peer_signaling_onicecandidate); switch (g_ps.proto) { - case 0: { // MQTT + case 0: { // MQTT client_id = strrchr(g_ps.path, '/'); snprintf(g_ps.client_id, sizeof(g_ps.client_id), "%s", client_id + 1); snprintf(g_ps.subtopic, sizeof(g_ps.subtopic), "%s/invoke", g_ps.path); @@ -522,12 +524,11 @@ int peer_signaling_connect(const char* url, const char* token, PeerConnection* p peer_signaling_mqtt_subscribe(1); } } break; - case 1: { // HTTP + case 1: { // HTTP peer_connection_create_offer(g_ps.pc); } break; default: { } break; - } return 0; @@ -545,7 +546,6 @@ void peer_signaling_disconnect() { LOGI("Disconnected"); } - int peer_signaling_loop() { MQTT_ProcessLoop(&g_ps.mqtt_ctx); return 0; diff --git a/src/peer_signaling.h b/src/peer_signaling.h index ab61c8a..e73c0a3 100644 --- a/src/peer_signaling.h +++ b/src/peer_signaling.h @@ -9,45 +9,10 @@ extern "C" { #ifndef DISABLE_PEER_SIGNALING -typedef struct ServiceConfiguration { - const char* mqtt_url; - int mqtt_port; - const char* client_id; - const char* http_url; - int http_port; - const char* username; - const char* password; - PeerConnection* pc; -} ServiceConfiguration; - -#define SERVICE_CONFIG_DEFAULT() \ - { \ - .mqtt_url = "libpeer.com", \ - .mqtt_port = 8883, \ - .client_id = "peer", \ - .http_url = "", \ - .http_port = 443, \ - .username = "", \ - .password = "", \ - .pc = NULL \ - } - -#define DEFAULT_PEER_SIGNALING "mqtts://libpeer.com/public" - int peer_signaling_connect(const char* url, const char* token, PeerConnection* pc); void peer_signaling_disconnect(); -void peer_signaling_set_config(ServiceConfiguration* config); - -int peer_signaling_whip_connect(); - -void peer_signaling_whip_disconnect(); - -int peer_signaling_join_channel(); - -void peer_signaling_leave_channel(); - int peer_signaling_loop(); #endif // DISABLE_PEER_SIGNALING diff --git a/src/sctp.c b/src/sctp.c index a30c364..e06bf36 100644 --- a/src/sctp.c +++ b/src/sctp.c @@ -306,7 +306,7 @@ void sctp_incoming_data(Sctp* sctp, char* buf, size_t len) { case SCTP_INIT_ACK: { SctpInitChunk* init_ack = (SctpInitChunk*)in_packet->chunks; SctpCookieEchoChunk* cookie_echo = (SctpCookieEchoChunk*)out_packet->chunks; - SctpChunkParam* param; + SctpChunkParam* param = NULL; sctp->verification_tag = init_ack->initiate_tag; int type; // find cookie diff --git a/src/stun.c b/src/stun.c index d32f6cd..0b17202 100644 --- a/src/stun.c +++ b/src/stun.c @@ -87,7 +87,7 @@ int stun_set_mapped_address(char* value, uint8_t* mask, Address* addr) { LOGD("XOR Mapped Address Family: %d", *family); LOGD("XOR Mapped Address Port: %d (Port XOR: %04x)", addr->port, *port); - LOGD("XOR Mapped Address IP: %s (IP XOR: %08x)", addr_string, *addr32); + LOGD("XOR Mapped Address IP: %s (IP XOR: %08" PRIu32 ")", addr_string, *addr32); return ret; } @@ -119,7 +119,7 @@ void stun_get_mapped_address(char* value, uint8_t* mask, Address* addr) { LOGD("XOR Mapped Address Family: %d", family); LOGD("XOR Mapped Address Port: %d (Port XOR: %04x)", addr->port, port); - LOGD("XOR Mapped Address IP: %s (IP XOR: %08x)", addr_string, *addr32); + LOGD("XOR Mapped Address IP: %s (IP XOR: %08" PRIu32 ")", addr_string, *addr32); } void stun_parse_msg_buf(StunMessage* msg) { From 9135cde2a444464619c9e5f9fc29b218a904c20c Mon Sep 17 00:00:00 2001 From: sepfy Date: Mon, 16 Dec 2024 21:20:08 +0800 Subject: [PATCH 7/8] Fix incorrect rtp package header for esp32 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4933e1f..69b7fe7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,7 +10,7 @@ if (DEFINED ENV{IDF_PATH}) INCLUDE_DIRS "./src" ${HTTP_INCLUDE_PUBLIC_DIRS} ${MQTT_INCLUDE_PUBLIC_DIRS} REQUIRES mbedtls srtp json esp_netif ) - add_definitions("-DCONFIG_USE_LWIP=1" "-DCONFIG_USE_USRSCTP=0" "-DCONFIG_AUDIO_BUFFER_SIZE=8096" "-DCONFIG_DATA_BUFFER_SIZE=102400" "-DHTTP_DO_NOT_USE_CUSTOM_CONFIG" "-DMQTT_DO_NOT_USE_CUSTOM_CONFIG") + add_definitions("-DCONFIG_USE_LWIP=1" "-DCONFIG_USE_USRSCTP=0" "-DCONFIG_AUDIO_BUFFER_SIZE=8096" "-DCONFIG_DATA_BUFFER_SIZE=102400" "-D__BYTE_ORDER=__LITTLE_ENDIAN" "-DHTTP_DO_NOT_USE_CUSTOM_CONFIG" "-DMQTT_DO_NOT_USE_CUSTOM_CONFIG") return() endif() From c52c9e20e6dd640c16e11a352bbcbaee4f6f3bda Mon Sep 17 00:00:00 2001 From: sepfy Date: Mon, 16 Dec 2024 21:30:25 +0800 Subject: [PATCH 8/8] Update README.md --- README.md | 6 ++++-- examples/generic/main.c | 16 +++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 9dd0bc9..0d815eb 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # libpeer - Portable WebRTC Library for IoT/Embedded Device ![build](https://github.com/sepfy/pear/actions/workflows/build.yml/badge.svg) -[![Discord](https://img.shields.io/discord/1265486725967253667?logo=discord)](https://discord.gg/AsPgabeS67) libpeer is a WebRTC implementation written in C, developed with BSD socket. The library aims to integrate IoT/Embedded device video/audio streaming with WebRTC, such as ESP32 and Raspberry Pi @@ -29,6 +28,8 @@ libpeer is a WebRTC implementation written in C, developed with BSD socket. The * [coreMQTT](https://github.com/FreeRTOS/coreMQTT) ### Getting Started with Generic Example +- Copy URL from the test [website](https://sepfy.github.io/libpeer) +- Build and run the example ```bash $ sudo apt -y install git cmake $ git clone --recursive https://github.com/sepfy/libpeer @@ -36,8 +37,9 @@ $ cd libpeer $ cmake -S . -B build && cmake --build build $ wget http://www.live555.com/liveMedia/public/264/test.264 # Download test video file $ wget https://mauvecloud.net/sounds/alaw08m.wav # Download test audio file -$ ./generic/sample/sample +$ ./examples/generic/sample -u ``` +- Click Connect button on the website ### Examples for Platforms - [ESP32](https://github.com/sepfy/libpeer/tree/main/examples/esp32): MJPEG over datachannel diff --git a/examples/generic/main.c b/examples/generic/main.c index ef2c88a..2e3e7ff 100644 --- a/examples/generic/main.c +++ b/examples/generic/main.c @@ -60,12 +60,11 @@ static uint64_t get_timestamp() { return tv.tv_sec * 1000 + tv.tv_usec / 1000; } -void print_usage(const char *prog_name) { +void print_usage(const char* prog_name) { printf("Usage: %s -u [-t ]\n", prog_name); } -void parse_arguments(int argc, char *argv[], const char **url, const char **token) { - +void parse_arguments(int argc, char* argv[], const char** url, const char** token) { *token = NULL; *url = NULL; @@ -75,8 +74,8 @@ void parse_arguments(int argc, char *argv[], const char **url, const char **toke } else if (strcmp(argv[i], "-t") == 0 && (i + 1) < argc) { *token = argv[++i]; } else { - print_usage(argv[0]); - exit(1); + print_usage(argv[0]); + exit(1); } } @@ -89,8 +88,8 @@ void parse_arguments(int argc, char *argv[], const char **url, const char **toke int main(int argc, char* argv[]) { uint64_t curr_time, video_time, audio_time; uint8_t buf[102400]; - const char *url = NULL; - const char *token = NULL; + const char* url = NULL; + const char* token = NULL; int size; pthread_t peer_singaling_thread; @@ -106,8 +105,7 @@ int main(int argc, char* argv[]) { }, .datachannel = DATA_CHANNEL_STRING, .video_codec = CODEC_H264, - .audio_codec = CODEC_PCMA - }; + .audio_codec = CODEC_PCMA}; printf("=========== Parsed Arguments ===========\n"); printf(" %-5s : %s\n", "URL", url);