diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index c6bc4a0069..c7b779556b 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -795,21 +795,13 @@ SrsGoApiSdp::~SrsGoApiSdp() srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { srs_error_t err = srs_success; - - SrsStatistic* stat = SrsStatistic::instance(); - - // path: {pattern}{stream_id} - // e.g. /api/v1/streams/100 pattern= /api/v1/streams/, stream_id=100 - int sid = r->parse_rest_id(entry->pattern); - - SrsStatisticStream* stream = NULL; - if (sid >= 0 && (stream = stat->find_stream(sid)) == NULL) { - return srs_api_response_code(w, r, ERROR_RTMP_STREAM_NOT_FOUND); - } + // path: {pattern} + // method: POST + // e.g. /api/v1/sdp/ args = json:{"sdp":"sdp...", "app":"webrtc", "stream":"test"} + string req_json; r->body_read_all(req_json); - srs_trace("req_json=%s", req_json.c_str()); SrsJsonAny* json = SrsJsonAny::loads(req_json); SrsJsonObject* req_obj = json->to_object(); @@ -826,18 +818,17 @@ srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* string app = app_obj->to_str(); string stream_name = stream_name_obj->to_str(); - srs_trace("remote_sdp_str=%s", remote_sdp_str.c_str()); - srs_trace("app=%s, stream=%s", app.c_str(), stream_name.c_str()); - SrsSdp remote_sdp; err = remote_sdp.decode(remote_sdp_str); if (err != srs_success) { return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest); } + SrsRequest request; + request.app = app; + request.stream = stream_name; SrsSdp local_sdp; - SrsRtcSession* rtc_session = rtc_server->create_rtc_session(remote_sdp, local_sdp); - rtc_session->set_app_stream(app, stream_name); + SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp); string local_sdp_str = ""; err = local_sdp.encode(local_sdp_str); @@ -849,22 +840,12 @@ srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* SrsAutoFree(SrsJsonObject, obj); obj->set("code", SrsJsonAny::integer(ERROR_SUCCESS)); - obj->set("server", SrsJsonAny::integer(stat->server_id())); - - // XXX: ice candidate - //string candidate_str = "candidate:1 1 udp 2115783679 192.168.170.129:8000 typ host generation 0 ufrag " - // + local_sdp.get_ice_ufrag() + "netwrok-cost 50"; - - //SrsJsonObject* candidate_obj = SrsJsonAny::object(); - //SrsAutoFree(SrsJsonObject, candidate_obj); + obj->set("server", SrsJsonAny::integer(SrsStatistic::instance()->server_id())); - //candidate_obj->set("candidate", SrsJsonAny::str(candidate_str.c_str())); - //candidate_obj->set("sdpMid", SrsJsonAny::str("0")); - //candidate_obj->set("sdpMLineIndex", SrsJsonAny::str("0")); + // TODO: add candidates in response json? if (r->is_http_post()) { obj->set("sdp", SrsJsonAny::str(local_sdp_str.c_str())); - // obj->set("candidate", candidate_obj); } else { return srs_go_http_error(w, SRS_CONSTS_HTTP_MethodNotAllowed); } diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index f743a056c6..4d73ebf8e2 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -37,7 +37,7 @@ #include // the max size of a line of log. -#define LOG_MAX_SIZE 4096000 +#define LOG_MAX_SIZE 4096 // the tail append to each log. #define LOG_TAIL '\n' diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 802e911c8c..62ed7bf03e 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -51,24 +51,24 @@ using namespace std; #include #include -static bool is_stun(const char* data, const int size) +static bool is_stun(const uint8_t* data, const int size) { return data != NULL && size > 0 && (data[0] == 0 || data[0] == 1); } -static bool is_dtls(const char* data, size_t len) +static bool is_dtls(const uint8_t* data, size_t len) { return (len >= 13 && (data[0] > 19 && data[0] < 64)); } -static bool is_rtp_or_rtcp(const char* data, size_t len) +static bool is_rtp_or_rtcp(const uint8_t* data, size_t len) { return (len >= 12 && (data[0] & 0xC0) == 0x80); } -static bool is_rtcp(const char* data, size_t len) +static bool is_rtcp(const uint8_t* data, size_t len) { - return (len >=12) && (data[0] & 0x80) && (data[1] >= 200 && data[1] <= 209); + return (len >= 12) && (data[0] & 0x80) && (data[1] >= 200 && data[1] <= 209); } static string gen_random_str(int len) @@ -141,7 +141,7 @@ srs_error_t SrsSdp::decode(const string& sdp_str) string line; istringstream is(sdp_str); while (getline(is, line)) { - srs_trace("line=%s", line.c_str()); + srs_verbose("line=%s", line.c_str()); if (line.size() < 2 || line[1] != '=') { return srs_error_wrap(err, "invalid sdp line=%s", line.c_str()); @@ -267,7 +267,7 @@ srs_error_t SrsSdp::parse_attr(const string& line) } } - srs_trace("sdp attribute key=%s, val=%s", key.c_str(), val.c_str()); + srs_verbose("sdp attribute key=%s, val=%s", key.c_str(), val.c_str()); if (key == "ice-ufrag") { ice_ufrag = val; @@ -345,7 +345,6 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt) } if (out_bio_len) { - srs_trace("send dtls handshake data"); udp_mux_skt->sendto(out_bio_data, out_bio_len, 0); } @@ -405,7 +404,7 @@ srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int void SrsDtlsSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt) { if (dtls == NULL) { - srs_trace("send client hello"); + srs_verbose("send client hello"); dtls = SSL_new(SrsDtls::instance()->get_dtls_ctx()); SSL_set_connect_state(dtls); @@ -631,21 +630,17 @@ srs_error_t SrsRtcSenderThread::cycle() srs_error_t err = srs_success; SrsSource* source = NULL; - SrsRequest req; - req.app = rtc_session->app; - req.stream = rtc_session->stream; - if (_srs_sources->fetch_or_create(&req, rtc_session->server, &source) != srs_success) { - srs_error("rtc fetch source failed"); + if (_srs_sources->fetch_or_create(&rtc_session->request, rtc_session->server, &source) != srs_success) { return srs_error_wrap(err, "rtc fetch source failed"); } - srs_trace("rtc fetch source success, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + srs_trace("source url=%s, source_id=%d[%d]", + rtc_session->request.get_stream_url().c_str(), source->source_id(), source->source_id()); SrsConsumer* consumer = NULL; if (source->create_consumer(NULL, consumer) != srs_success) { - srs_trace("rtc create consumer, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); - return srs_error_wrap(err, "rtc create consumer, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + return srs_error_wrap(err, "rtc create consumer, source url=%s", rtc_session->request.get_stream_url().c_str()); } SrsAutoFree(SrsConsumer, consumer); @@ -689,6 +684,7 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int int nb_protected_buf = msg->rtp_packets[i]->size; rtc_session->dtls_session->protect_rtp(protected_buf, msg->rtp_packets[i]->payload, nb_protected_buf); + // TODO: use sendmmsg to send multi packet one system call udp_mux_skt->sendto(protected_buf, nb_protected_buf, 0); } } @@ -697,7 +693,7 @@ void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int } } -SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const string& un) +SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un) { server = svr; rtc_server = rtc_svr; @@ -708,6 +704,9 @@ SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const string username = un; last_stun_time = srs_get_system_time(); + + request = req; + source = NULL; } SrsRtcSession::~SrsRtcSession() @@ -735,6 +734,13 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* return err; } +void SrsRtcSession::check_source() +{ + if (source == NULL) { + _srs_sources->fetch_or_create(&request, server, &source); + } +} + srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req) { srs_error_t err = srs_success; @@ -771,6 +777,210 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS return err; } +srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + if (nb_buf < 12) { + return srs_error_wrap(err, "invalid rtp feedback packet, nb_buf=%d", nb_buf); + } + + SrsBuffer* stream = new SrsBuffer(buf, nb_buf); + SrsAutoFree(SrsBuffer, stream); + + // @see: https://tools.ietf.org/html/rfc4585#section-6.1 + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P| FMT | PT | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of packet sender | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of media source | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + : Feedback Control Information (FCI) : + : : + */ + uint8_t first = stream->read_1bytes(); + uint8_t version = first & 0xC0; + uint8_t padding = first & 0x20; + uint8_t fmt = first & 0x1F; + + uint8_t payload_type = stream->read_1bytes(); + uint16_t length = stream->read_2bytes(); + uint32_t ssrc_of_sender = stream->read_4bytes(); + uint32_t ssrc_of_media_source = stream->read_4bytes(); + + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | PID | BLP | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + + uint16_t pid = stream->read_2bytes(); + int blp = stream->read_2bytes(); + + srs_verbose("pid=%u, blp=%d", pid, blp); + + check_source(); + if (! source) { + return srs_error_wrap(err, "can not found source"); + } + + vector resend_pkts; + SrsRtpSharedPacket* pkt = source->find_rtp_packet(pid); + if (pkt) { + resend_pkts.push_back(pkt); + } + + uint16_t mask = 0x01; + for (int i = 0; i < 16 && blp; ++i, mask <<= 1) { + if (! (blp & mask)) { + continue; + } + + uint32_t loss_seq = pid + i; + + SrsRtpSharedPacket* pkt = source->find_rtp_packet(loss_seq); + if (! pkt) { + continue; + } + + resend_pkts.push_back(pkt); + } + + for (int i = 0; i < resend_pkts.size(); ++i) { + if (dtls_session) { + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = resend_pkts[i]->size; + + srs_verbose("resend pkt sequence=%u", resend_pkts[i]->sequence); + + dtls_session->protect_rtp(protected_buf, resend_pkts[i]->payload, nb_protected_buf); + udp_mux_skt->sendto(protected_buf, nb_protected_buf, 0); + } + } + + return err; +} + +srs_error_t SrsRtcSession::on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + if (nb_buf < 12) { + return srs_error_wrap(err, "invalid rtp feedback packet, nb_buf=%d", nb_buf); + } + + SrsBuffer* stream = new SrsBuffer(buf, nb_buf); + SrsAutoFree(SrsBuffer, stream); + + uint8_t first = stream->read_1bytes(); + uint8_t version = first & 0xC0; + uint8_t padding = first & 0x20; + uint8_t fmt = first & 0x1F; + + uint8_t payload_type = stream->read_1bytes(); + uint16_t length = stream->read_2bytes(); + uint32_t ssrc_of_sender = stream->read_4bytes(); + uint32_t ssrc_of_media_source = stream->read_4bytes(); + + switch (fmt) { + case kPLI: { + srs_verbose("pli"); + break; + } + case kSLI: { + srs_verbose("sli"); + break; + } + case kRPSI: { + srs_verbose("rpsi"); + break; + } + case kAFB: { + srs_verbose("afb"); + break; + } + default: { + return srs_error_wrap(err, "unknown payload specific feedback=%u", fmt); + } + } + + return err; +} + +srs_error_t SrsRtcSession::on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + if (nb_buf < 8) { + return srs_error_wrap(err, "invalid rtp receiver report packet, nb_buf=%d", nb_buf); + } + + SrsBuffer* stream = new SrsBuffer(buf, nb_buf); + SrsAutoFree(SrsBuffer, stream); + + // @see: https://tools.ietf.org/html/rfc3550#section-6.4.2 + /* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +header |V=2|P| RC | PT=RR=201 | length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SSRC of packet sender | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +report | SSRC_1 (SSRC of first source) | +block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 1 | fraction lost | cumulative number of packets lost | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | extended highest sequence number received | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | interarrival jitter | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | last SR (LSR) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | delay since last SR (DLSR) | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ +report | SSRC_2 (SSRC of second source) | +block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 2 : ... : + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | profile-specific extensions | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + uint8_t first = stream->read_1bytes(); + uint8_t version = first & 0xC0; + uint8_t padding = first & 0x20; + uint8_t rc = first & 0x1F; + + uint8_t payload_type = stream->read_1bytes(); + uint16_t length = stream->read_2bytes(); + uint32_t ssrc_of_sender = stream->read_4bytes(); + + if (((length + 1) * 4) != (rc * 24 + 8)) { + return srs_error_wrap(err, "invalid rtcp receiver packet, length=%u, rc=%u", length, rc); + } + + for (int i = 0; i < rc; ++i) { + uint32_t ssrc = stream->read_4bytes(); + uint8_t fraction_lost = stream->read_1bytes(); + uint32_t cumulative_number_of_packets_lost = stream->read_3bytes(); + uint32_t highest_seq = stream->read_4bytes(); + uint32_t jitter = stream->read_4bytes(); + uint32_t lst = stream->read_4bytes(); + uint32_t dlsr = stream->read_4bytes(); + + srs_verbose("ssrc=%u, fraction_lost=%u, cumulative_number_of_packets_lost=%u, highest_seq=%u, jitter=%u, lst=%u, dlst=%u", + ssrc, fraction_lost, cumulative_number_of_packets_lost, highest_seq, jitter, lst, dlsr); + } + + return err; +} + srs_error_t SrsRtcSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt) { if (dtls_session == NULL) { @@ -854,6 +1064,7 @@ srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* udp_mux_skt) srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; + if (dtls_session == NULL) { return srs_error_wrap(err, "recv unexpect rtcp packet before dtls done"); } @@ -864,19 +1075,57 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) return srs_error_wrap(err, "rtcp unprotect failed"); } - // FIXME: use SrsRtpPacket - SrsBuffer* stream = new SrsBuffer(unprotected_buf, nb_unprotected_buf); - SrsAutoFree(SrsBuffer, stream); - uint8_t first = stream->read_1bytes(); - uint8_t payload_type = stream->read_1bytes(); + char* ph = unprotected_buf; + int nb_left = nb_unprotected_buf; + while (nb_left) { + uint8_t payload_type = ph[1]; + uint16_t length_4bytes = (((uint16_t)ph[2]) << 8) | ph[3]; - if (payload_type == kSR) { - } else if (payload_type == kRR) { - } else if (kSDES) { - } else if (kBye) { - } else if (kApp) { - } else { - return srs_error_wrap(err, "unknown rtcp type=%u", payload_type); + int length = (length_4bytes + 1) * 4; + + if (length > nb_unprotected_buf) { + return srs_error_wrap(err, "invalid rtcp packet, length=%u", length); + } + + srs_verbose("on rtcp, payload_type=%u", payload_type); + + switch (payload_type) { + case kSR: { + break; + } + case kRR: { + err = on_rtcp_receiver_report(ph, length, udp_mux_skt); + break; + } + case kSDES: { + break; + } + case kBye: { + break; + } + case kApp: { + break; + } + case kRtpFb: { + err = on_rtcp_feedback(ph, length, udp_mux_skt); + break; + } + case kPsFb: { + err = on_rtcp_ps_feedback(ph, length, udp_mux_skt); + break; + } + default:{ + return srs_error_wrap(err, "unknown rtcp type=%u", payload_type); + break; + } + } + + if (err != srs_success) { + return err; + } + + ph += length; + nb_left -= length; } return err; @@ -909,18 +1158,18 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - if (is_stun(udp_mux_skt->data(), udp_mux_skt->size())) { + if (is_stun(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { return on_stun(udp_mux_skt); - } else if (is_dtls(udp_mux_skt->data(), udp_mux_skt->size())) { + } else if (is_dtls(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { return on_dtls(udp_mux_skt); - } else if (is_rtp_or_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) { + } else if (is_rtp_or_rtcp(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { return on_rtp_or_rtcp(udp_mux_skt); } return srs_error_wrap(err, "unknown udp packet type"); } -SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp) +SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp) { std::string local_pwd = gen_random_str(32); std::string local_ufrag = ""; @@ -933,7 +1182,7 @@ SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp break; } - SrsRtcSession* session = new SrsRtcSession(server, this, username); + SrsRtcSession* session = new SrsRtcSession(server, this, req, username); map_username_session.insert(make_pair(username, session)); local_sdp.set_ice_ufrag(local_ufrag); @@ -961,7 +1210,7 @@ srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - srs_trace("recv stun packet from %s", udp_mux_skt->get_peer_id().c_str()); + srs_verbose("recv stun packet from %s", udp_mux_skt->get_peer_id().c_str()); SrsStunPacket stun_req; if (stun_req.decode(udp_mux_skt->data(), udp_mux_skt->size()) != srs_success) { @@ -980,7 +1229,6 @@ srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt) srs_error_t SrsRtcServer::on_dtls(SrsUdpMuxSocket* udp_mux_skt) { srs_error_t err = srs_success; - srs_trace("on dtls"); SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id()); @@ -1003,7 +1251,7 @@ srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt) return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str()); } - if (is_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) { + if (is_rtcp(reinterpret_cast(udp_mux_skt->data()), udp_mux_skt->size())) { rtc_session->on_rtcp(udp_mux_skt); } else { rtc_session->on_rtp(udp_mux_skt); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index fc67ba4311..d0db07e36f 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -43,12 +44,23 @@ class SrsStunPacket; class SrsRtcServer; class SrsRtcSession; class SrsSharedPtrMessage; +class SrsSource; -const uint8_t kSR = 200; -const uint8_t kRR = 201; +const uint8_t kSR = 200; +const uint8_t kRR = 201; const uint8_t kSDES = 202; -const uint8_t kBye = 203; -const uint8_t kApp = 204; +const uint8_t kBye = 203; +const uint8_t kApp = 204; + +// @see: https://tools.ietf.org/html/rfc4585#section-6.1 +const uint8_t kRtpFb = 205; +const uint8_t kPsFb = 206; + +// @see: https://tools.ietf.org/html/rfc4585#section-6.3 +const uint8_t kPLI = 1; +const uint8_t kSLI = 2; +const uint8_t kRPSI = 3; +const uint8_t kAFB = 15; const srs_utime_t kSrsRtcSessionStunTimeoutUs = 10*1000*1000LL; @@ -153,8 +165,6 @@ class SrsRtcSenderThread : public ISrsCoroutineHandler SrsRtcSession* rtc_session; SrsUdpMuxSocket ukt; public: - // Constructor. - // @param tm The receive timeout in srs_utime_t. SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid); virtual ~SrsRtcSenderThread(); public: @@ -184,10 +194,10 @@ class SrsRtcSession std::string peer_id; srs_utime_t last_stun_time; public: - std::string app; - std::string stream; + SrsRequest request; + SrsSource* source; public: - SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const std::string& un); + SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un); virtual ~SrsRtcSession(); public: SrsSdp* get_local_sdp() { return &local_sdp; } @@ -201,8 +211,6 @@ class SrsRtcSession std::string id() const { return peer_id + "_" + username; } - void set_app_stream(const std::string& a, const std::string& s) { app = a; stream = s; } - std::string get_peer_id() const { return peer_id; } void set_peer_id(const std::string& id) { peer_id = id; } public: @@ -216,10 +224,14 @@ class SrsRtcSession srs_error_t start_play(SrsUdpMuxSocket* udp_mux_skt); public: bool is_stun_timeout() { return last_stun_time + kSrsRtcSessionStunTimeoutUs < srs_get_system_time(); } +private: + void check_source(); private: srs_error_t on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req); private: - srs_error_t do_playing(SrsConsumer* consumer, SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtcp_ps_feedback(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtcp_receiver_report(char* buf, int nb_buf, SrsUdpMuxSocket* udp_mux_skt); }; // XXX: is there any other timer thread? @@ -231,8 +243,6 @@ class SrsRtcTimerThread : public ISrsCoroutineHandler private: SrsRtcServer* rtc_server; public: - // Constructor. - // @param tm The receive timeout in srs_utime_t. SrsRtcTimerThread(SrsRtcServer* rtc_svr, int parent_cid); virtual ~SrsRtcTimerThread(); public: @@ -261,7 +271,7 @@ class SrsRtcServer : public ISrsUdpMuxHandler virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt); - SrsRtcSession* create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp); + SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp); bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); void check_and_clean_timeout_session(); private: diff --git a/trunk/src/app/srs_app_rtp.cpp b/trunk/src/app/srs_app_rtp.cpp index ee88f419d1..0101e30c52 100644 --- a/trunk/src/app/srs_app_rtp.cpp +++ b/trunk/src/app/srs_app_rtp.cpp @@ -77,8 +77,8 @@ srs_error_t SrsRtpMuxer::frame_to_packet(SrsSharedPtrMessage* shared_frame, SrsF uint8_t header = sample.bytes[0]; uint8_t nal_type = header & kNalTypeMask; - if (nal_type == 0x06) { - srs_trace("ignore SEI"); + // ignore SEI nal + if (nal_type == 0x06 || nal_type == 0x09) { continue; } @@ -87,13 +87,6 @@ srs_error_t SrsRtpMuxer::frame_to_packet(SrsSharedPtrMessage* shared_frame, SrsF } else { packet_fu_a(shared_frame, format, &sample, rtp_packet_vec); } - -#if 0 - srs_trace("nal size=%d, nal=%s", sample.size, dump_string_hex(sample.bytes, sample.size, sample.size).c_str()); - for (int i = 0; i < shared_frame->nb_rtp_fragments; ++i) { - srs_trace("rtp=%s", dump_string_hex(shared_frame->rtp_fragments[i].bytes, shared_frame->rtp_fragments[i].size, kRtpPacketSize).c_str()); - } -#endif } shared_frame->set_rtp_packets(rtp_packet_vec); diff --git a/trunk/src/app/srs_app_rtp.hpp b/trunk/src/app/srs_app_rtp.hpp index 2a23440cd5..badfea4435 100644 --- a/trunk/src/app/srs_app_rtp.hpp +++ b/trunk/src/app/srs_app_rtp.hpp @@ -52,6 +52,7 @@ const uint8_t kFuA = 28; const uint8_t kStart = 0x80; const uint8_t kEnd = 0x40; +// FIXME: ssrc can relate to source const uint32_t kVideoSSRC = 3233846889; class SrsRtpMuxer diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index a7a3297bcb..3c3c07c67d 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -31,6 +31,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -326,6 +327,7 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p return err; } + srs_error_t SrsMessageQueue::dump_packets(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag) { srs_error_t err = srs_success; @@ -815,6 +817,55 @@ SrsSharedPtrMessage* SrsMixQueue::pop() return msg; } +SrsRtpPacketQueue::SrsRtpPacketQueue() +{ +} + +SrsRtpPacketQueue::~SrsRtpPacketQueue() +{ + clear(); +} + +void SrsRtpPacketQueue::clear() +{ + map::iterator iter = pkt_queue.begin(); + while (iter != pkt_queue.end()) { + srs_freep(iter->second); + pkt_queue.erase(iter++); + } +} + +void SrsRtpPacketQueue::push(std::vector& pkts) +{ + for (int i = 0; i < pkts.size(); ++i) { + insert(pkts[i]->sequence, pkts[i]); + } +} + +void SrsRtpPacketQueue::insert(const uint16_t& sequence, SrsRtpSharedPacket* pkt) +{ + pkt_queue.insert(make_pair(sequence, pkt->copy())); + if (pkt_queue.size() >= 3000) { + srs_freep(pkt_queue.begin()->second); + pkt_queue.erase(pkt_queue.begin()); + } +} + +SrsRtpSharedPacket* SrsRtpPacketQueue::find(const uint16_t& sequence) +{ + if (pkt_queue.empty()) { + return NULL; + } + + SrsRtpSharedPacket* pkt = NULL; + map::iterator iter = pkt_queue.find(sequence); + if (iter != pkt_queue.end()) { + pkt = iter->second->copy(); + } + + return pkt; +} + SrsOriginHub::SrsOriginHub() { source = NULL; @@ -1076,6 +1127,8 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se srs_error_reset(err); rtp->on_unpublish(); } + + source->rtp_queue->push(msg->rtp_packets); if ((err = hls->on_video(msg, format)) != srs_success) { // apply the error strategy for hls. @@ -1728,7 +1781,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* SrsSource* source = NULL; if ((source = fetch(r)) != NULL) { - srs_trace("found source"); *pps = source; return err; } @@ -1848,6 +1900,7 @@ SrsSource::SrsSource() jitter_algorithm = SrsRtmpJitterAlgorithmOFF; mix_correct = false; mix_queue = new SrsMixQueue(); + rtp_queue = new SrsRtpPacketQueue(); _can_publish = true; _pre_source_id = _source_id = -1; @@ -1877,6 +1930,7 @@ SrsSource::~SrsSource() srs_freep(hub); srs_freep(meta); srs_freep(mix_queue); + srs_freep(rtp_queue); srs_freep(play_edge); srs_freep(publish_edge); @@ -2634,3 +2688,7 @@ string SrsSource::get_curr_origin() return play_edge->get_curr_origin(); } +SrsRtpSharedPacket* SrsSource::find_rtp_packet(const uint16_t& seq) +{ + return rtp_queue->find(seq); +} diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index d28c37913c..8141195562 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -62,6 +62,7 @@ class SrsBuffer; #ifdef SRS_AUTO_HDS class SrsHds; #endif +class SrsRtpSharedPacket; // The time jitter algorithm: // 1. full, to ensure stream start at zero, and ensure stream monotonically increasing. @@ -324,6 +325,28 @@ class SrsMixQueue virtual SrsSharedPtrMessage* pop(); }; +class SrsRtpPacketQueue +{ +private: + struct SeqComp + { + bool operator()(const uint16_t& l, const uint16_t& r) const + { + return ((int16_t)(r - l)) > 0; + } + }; +private: + std::map pkt_queue; +public: + SrsRtpPacketQueue(); + virtual ~SrsRtpPacketQueue(); +public: + void clear(); + void push(std::vector& pkts); + void insert(const uint16_t& sequence, SrsRtpSharedPacket* pkt); + SrsRtpSharedPacket* find(const uint16_t& sequence); +}; + // The hub for origin is a collection of utilities for origin only, // For example, DVR, HLS, Forward and Transcode are only available for origin, // they are meanless for edge server. @@ -510,6 +533,8 @@ class SrsSource : public ISrsReloadHandler bool mix_correct; // The mix queue to implements the mix correct algorithm. SrsMixQueue* mix_queue; + // rtp packet queue + SrsRtpPacketQueue* rtp_queue; // For play, whether enabled atc. // The atc(use absolute time and donot adjust time), // directly use msg time and donot adjust if atc is true, @@ -598,6 +623,9 @@ class SrsSource : public ISrsReloadHandler virtual void on_edge_proxy_unpublish(); public: virtual std::string get_curr_origin(); +public: + // Find rtp packet by sequence + SrsRtpSharedPacket* find_rtp_packet(const uint16_t& seq); }; #endif diff --git a/trunk/src/protocol/srs_stun_stack.cpp b/trunk/src/protocol/srs_stun_stack.cpp index a057715d6f..ae0f43dffe 100644 --- a/trunk/src/protocol/srs_stun_stack.cpp +++ b/trunk/src/protocol/srs_stun_stack.cpp @@ -12,23 +12,6 @@ using namespace std; #include #include -static string dump_string_hex(const std::string& str, const int& max_len = 128) -{ - char buf[1024*16]; - int len = 0; - - for (int i = 0; i < str.size() && i < max_len; ++i) { - int nb = snprintf(buf + len, sizeof(buf) - len - 1, "%02X ", (uint8_t)str[i]); - if (nb <= 0) - break; - - len += nb; - } - buf[len] = '\0'; - - return string(buf, len); -} - static srs_error_t hmac_encode(const std::string& algo, const char* key, const int& key_length, const char* input, const int input_length, char* output, unsigned int& output_length) { @@ -95,16 +78,11 @@ srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf) return srs_error_wrap(err, "invalid stun packet, size=%d", stream->size()); } - srs_trace("stun packet, nb_buf=%d", nb_buf); - message_type = stream->read_2bytes(); uint16_t message_len = stream->read_2bytes(); string magic_cookie = stream->read_string(4); transcation_id = stream->read_string(12); - srs_trace("message_type=%u, message_len=%u, magic_cookie=%s, transcation_id=%s", - message_type, message_len, magic_cookie.c_str(), transcation_id.c_str()); - if (nb_buf != 20 + message_len) { return srs_error_wrap(err, "invalid stun packet, message_len=%d, nb_buf=%d", message_len, nb_buf); } @@ -113,8 +91,6 @@ srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf) uint16_t type = stream->read_2bytes(); uint16_t len = stream->read_2bytes(); - srs_trace("type=%u, len=%u", type, len); - if (stream->left() < len) { return srs_error_wrap(err, "invalid stun packet"); } @@ -124,17 +100,15 @@ srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf) if (len % 4 != 0) { stream->read_string(4 - (len % 4)); } - //srs_trace("val=%s", val.c_str()); switch (type) { - // FIXME: enum - case 6: { + case Username: { username = val; size_t p = val.find(":"); if (p != string::npos) { local_ufrag = val.substr(0, p); remote_ufrag = val.substr(p + 1); - srs_trace("stun packet local_ufrag=%s, remote_ufrag=%s", local_ufrag.c_str(), remote_ufrag.c_str()); + srs_verbose("stun packet local_ufrag=%s, remote_ufrag=%s", local_ufrag.c_str(), remote_ufrag.c_str()); } break; } @@ -168,7 +142,7 @@ srs_error_t SrsStunPacket::encode_binding_response(const string& pwd, SrsBuffer* stream->write_2bytes(BindingResponse); stream->write_2bytes(property_username.size() + mapped_address.size()); - stream->write_4bytes(0x2112A442); + stream->write_4bytes(kStunMagicCookie); stream->write_string(transcation_id); stream->write_string(property_username); stream->write_string(mapped_address); @@ -226,22 +200,12 @@ string SrsStunPacket::encode_mapped_address() SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); SrsAutoFree(SrsBuffer, stream); - uint32_t magic_cookie = 0x2112A442; -#if 1 stream->write_2bytes(XorMappedAddress); stream->write_2bytes(8); stream->write_1bytes(0); // ignore this bytes stream->write_1bytes(1); // ipv4 family - stream->write_2bytes(mapped_port ^ (magic_cookie >> 16)); - stream->write_4bytes(mapped_address ^ magic_cookie); -#else - stream->write_2bytes(MappedAddress); - stream->write_2bytes(8); - stream->write_1bytes(0); // ignore this bytes - stream->write_1bytes(1); // ipv4 family - stream->write_2bytes(mapped_port); - stream->write_4bytes(mapped_address); -#endif + stream->write_2bytes(mapped_port ^ (kStunMagicCookie >> 16)); + stream->write_4bytes(mapped_address ^ kStunMagicCookie); return string(stream->data(), stream->pos()); } diff --git a/trunk/src/protocol/srs_stun_stack.hpp b/trunk/src/protocol/srs_stun_stack.hpp index aa2bc21925..db74c90b3e 100644 --- a/trunk/src/protocol/srs_stun_stack.hpp +++ b/trunk/src/protocol/srs_stun_stack.hpp @@ -31,6 +31,10 @@ class SrsBuffer; +// @see: https://tools.ietf.org/html/rfc5389 +// The magic cookie field MUST contain the fixed value 0x2112A442 in network byte order +const uint32_t kStunMagicCookie = 0x2112A442; + enum SrsStunMessageType { // see @ https://tools.ietf.org/html/rfc3489#section-11.1