Skip to content

Commit

Permalink
linkedingo/realtime: run heartbeats loop alongside realtime connectio…
Browse files Browse the repository at this point in the history
…n loop

Signed-off-by: Sumner Evans <[email protected]>
  • Loading branch information
sumnerevans committed Feb 7, 2025
1 parent f577380 commit ecbcb5d
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 83 deletions.
30 changes: 21 additions & 9 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,21 @@ func NewLinkedInClient(ctx context.Context, lc *LinkedInConnector, login *bridge
userID: userID,
userLogin: login,
}
client.client = linkedingo.NewClient(ctx, login.Metadata.(*UserLoginMetadata).Cookies, linkedingo.Handlers{
Heartbeat: func(ctx context.Context) {
login.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
client.client = linkedingo.NewClient(
ctx,
login.Metadata.(*UserLoginMetadata).EntityURN,
login.Metadata.(*UserLoginMetadata).Cookies,
linkedingo.Handlers{
Heartbeat: func(ctx context.Context) {
login.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
},
ClientConnection: func(context.Context, *types.ClientConnection) {
login.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
},
RealtimeConnectError: client.onRealtimeConnectError,
DecoratedEvent: client.onDecoratedEvent,
},
ClientConnection: func(context.Context, *types.ClientConnection) {
login.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
},
RealtimeConnectError: client.onRealtimeConnectError,
DecoratedEvent: client.onDecoratedEvent,
})
)
return client
}

Expand All @@ -101,6 +106,13 @@ func (l *LinkedInClient) Connect(ctx context.Context) {

func (l *LinkedInClient) onRealtimeConnectError(ctx context.Context, err error) {
zerolog.Ctx(ctx).Err(err).Msg("failed to read from event stream")
// TODO probably don't do this unconditionally
l.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateBadCredentials,
Error: "linkedin-no-auth",
Message: err.Error(),
})
l.Disconnect()
}

func (l *LinkedInClient) onDecoratedEvent(ctx context.Context, decoratedEvent *types.DecoratedEvent) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/connector/dbmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package connector
import (
"maunium.net/go/mautrix/bridgev2/database"

"go.mau.fi/mautrix-linkedin/pkg/linkedingo/types"
"go.mau.fi/mautrix-linkedin/pkg/stringcookiejar"
)

Expand All @@ -33,5 +34,6 @@ func (lc *LinkedInConnector) GetDBMetaTypes() database.MetaTypes {
}

type UserLoginMetadata struct {
Cookies *stringcookiejar.Jar `json:"cookies,omitempty"`
Cookies *stringcookiejar.Jar `json:"cookies,omitempty"`
EntityURN types.URN `json:"urn,omitempty"`
}
6 changes: 4 additions & 2 deletions pkg/connector/logincookie.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"maunium.net/go/mautrix/bridgev2/networkid"

"go.mau.fi/mautrix-linkedin/pkg/linkedingo"
"go.mau.fi/mautrix-linkedin/pkg/linkedingo/types"
"go.mau.fi/mautrix-linkedin/pkg/stringcookiejar"
)

Expand Down Expand Up @@ -76,7 +77,7 @@ func (c *CookieLogin) SubmitCookies(ctx context.Context, cookies map[string]stri
return nil, err
}

loginClient := linkedingo.NewClient(ctx, jar, linkedingo.Handlers{})
loginClient := linkedingo.NewClient(ctx, types.NewURN(""), jar, linkedingo.Handlers{})
profile, err := loginClient.GetCurrentUserProfile(ctx)
if err != nil {
return nil, err
Expand All @@ -88,7 +89,8 @@ func (c *CookieLogin) SubmitCookies(ctx context.Context, cookies map[string]stri
&database.UserLogin{
ID: networkid.UserLoginID(profile.MiniProfile.ObjectURN.ID()),
Metadata: &UserLoginMetadata{
Cookies: jar,
Cookies: jar,
EntityURN: profile.MiniProfile.EntityURN,
},
RemoteName: remoteName,
RemoteProfile: status.RemoteProfile{
Expand Down
9 changes: 6 additions & 3 deletions pkg/linkedingo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
)

type Client struct {
http *http.Client
jar *stringcookiejar.Jar
http *http.Client
jar *stringcookiejar.Jar
userEntityURN types.URN

realtimeSessionID uuid.UUID
realtimeCtx context.Context
Expand All @@ -38,12 +39,14 @@ type Client struct {
handlers Handlers

clientPageInstanceID string
serviceVersion string
xLITrack string
i18nLocale string
}

func NewClient(ctx context.Context, jar *stringcookiejar.Jar, handlers Handlers) *Client {
func NewClient(ctx context.Context, userEntityURN types.URN, jar *stringcookiejar.Jar, handlers Handlers) *Client {
return &Client{
userEntityURN: userEntityURN,
http: &http.Client{
Jar: jar,

Expand Down
9 changes: 5 additions & 4 deletions pkg/linkedingo/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package linkedingo

const (
linkedInMessagingBaseURL = "https://www.linkedin.com/messaging"
linkedInVoyagerCommonMeURL = "https://www.linkedin.com/voyager/api/me"
linkedInRealtimeConnectURL = "https://www.linkedin.com/realtime/connect?rc=1"
linkedInLogoutURL = "https://www.linkedin.com/uas/logout"
linkedInMessagingBaseURL = "https://www.linkedin.com/messaging"
linkedInVoyagerCommonMeURL = "https://www.linkedin.com/voyager/api/me"
linkedInRealtimeConnectURL = "https://www.linkedin.com/realtime/connect?rc=1"
linkedInRealtimeHeartbeatURL = "https://www.linkedin.com/realtime/realtimeFrontendClientConnectivityTracking?action=sendHeartbeat"
linkedInLogoutURL = "https://www.linkedin.com/uas/logout"
)

const LinkedInCookieJSESSIONID = "JSESSIONID"
Expand Down
96 changes: 78 additions & 18 deletions pkg/linkedingo/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"net/http"
"time"

"github.com/rs/zerolog"
"go.mau.fi/util/exerrors"
Expand Down Expand Up @@ -77,6 +78,7 @@ func (c *Client) cacheMetaValues(ctx context.Context) error {
case "clientPageInstanceId":
c.clientPageInstanceID = content
case "serviceVersion":
c.serviceVersion = content
xLITrack, err := json.Marshal(map[string]any{
"clientVersion": content,
"mpVersion": content,
Expand Down Expand Up @@ -117,28 +119,85 @@ func (c *Client) RealtimeConnect(ctx context.Context) error {
Logger()
ctx = log.WithContext(ctx)

log.Info().Msg("Starting realtime connection loop")

c.realtimeCtx, c.realtimeCancelFn = context.WithCancel(ctx)
// TODO run sendHeartbeat loop
go c.realtimeConnectLoop()
go c.runHeartbeatsLoop(c.realtimeCtx)
go c.realtimeConnectLoop(c.realtimeCtx)
return nil
}

func (c *Client) realtimeConnectLoop() {
log := zerolog.Ctx(c.realtimeCtx)
func (c *Client) runHeartbeatsLoop(ctx context.Context) {
isFirst := true
userURN := c.userEntityURN.WithPrefix("urn", "li", "fsd_profile").String()

log := zerolog.Ctx(ctx).With().Str("usr_urn", userURN).Logger()
log.Info().Msg("Starting heartbeats loop")
for {
log.Debug().Stringer("realtime_session_id", c.realtimeSessionID).Msg("Sending heartbeat")

body, err := json.Marshal(map[string]any{
"isFirstHeartbeat": !isFirst,
"isLastHeartbeat": false,
"realtimeSessionId": c.realtimeSessionID.String(),
"mpName": "voyager-web",
"mpVersion": c.serviceVersion,
"clientId": "voyager-web",
"actorUrn": userURN,
"contextUrns": []string{userURN},
})
if err != nil {
log.Err(err).Msg("Failed to create heartbeat request body")
return
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, linkedInRealtimeHeartbeatURL, bytes.NewReader(body))
if err != nil {
log.Err(err).Msg("Failed to create heartbeat request")
return
}
req.Header.Add("csrf-token", c.getCSRFToken())
req.Header.Add("referer", linkedInMessagingBaseURL+"/")
req.Header.Add("x-li-accept", contentTypeJSONLinkedInNormalized)
req.Header.Add("x-li-page-instance", "urn:li:page:messaging_index;"+c.clientPageInstanceID)
req.Header.Add("x-li-query-accept", contentTypeGraphQL)
req.Header.Add("x-li-query-map", realtimeQueryMap)
req.Header.Add("x-li-realtime-session", c.realtimeSessionID.String())
req.Header.Add("x-li-recipe-accept", contentTypeJSONLinkedInNormalized)
req.Header.Add("x-li-recipe-map", realtimeRecipeMap)
req.Header.Add("x-li-track", c.xLITrack)
req.Header.Add("x-restli-protocol-version", "2.0.0")

_, err = c.http.Do(req)
if err != nil {
log.Err(err).Msg("Failed to send heartbeat")
return
}

isFirst = false
select {
case <-ctx.Done():
log.Info().Msg("Heartbeats loop canceled")
return
case <-time.After(time.Minute):
}
}
}

func (c *Client) realtimeConnectLoop(ctx context.Context) {
log := zerolog.Ctx(ctx)
log.Info().Msg("Starting realtime connection loop")
// Continually reconnect to the realtime connection endpoint until the
// context is done.
for {
select {
case <-c.realtimeCtx.Done():
case <-ctx.Done():
log.Info().Msg("Realtime connection loop canceled")
return
default:
}

req, err := http.NewRequestWithContext(c.realtimeCtx, http.MethodGet, linkedInRealtimeConnectURL, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, linkedInRealtimeConnectURL, nil)
if err != nil {
c.handlers.onRealtimeConnectError(c.realtimeCtx, err)
c.handlers.onRealtimeConnectError(ctx, err)
return
}
req.Header.Add("Accept", contentTypeTextEventStream)
Expand All @@ -156,15 +215,15 @@ func (c *Client) realtimeConnectLoop() {

c.realtimeResp, err = c.http.Do(req)
if err != nil {
c.handlers.onRealtimeConnectError(c.realtimeCtx, err)
c.handlers.onRealtimeConnectError(ctx, err)
return
}
if c.realtimeResp.StatusCode != http.StatusOK {
c.handlers.onRealtimeConnectError(c.realtimeCtx, fmt.Errorf("failed to connect due to status code %d", c.realtimeResp.StatusCode))
c.handlers.onRealtimeConnectError(ctx, fmt.Errorf("failed to connect due to status code %d", c.realtimeResp.StatusCode))
return
}

log.Info().Msg("Reading realtime stream")
log.Info().Stringer("realtime_session_id", c.realtimeSessionID).Msg("Reading realtime stream")
reader := bufio.NewReader(c.realtimeResp.Body)
for {
line, err := reader.ReadBytes('\n')
Expand All @@ -175,7 +234,7 @@ func (c *Client) realtimeConnectLoop() {
if errors.Is(err, io.EOF) {
break
}
c.handlers.onRealtimeConnectError(c.realtimeCtx, err)
c.handlers.onRealtimeConnectError(ctx, err)
continue
}

Expand All @@ -185,23 +244,24 @@ func (c *Client) realtimeConnectLoop() {

var realtimeEvent types.RealtimeEvent
if err = json.Unmarshal(line[6:], &realtimeEvent); err != nil {
c.handlers.onRealtimeConnectError(c.realtimeCtx, err)
c.handlers.onRealtimeConnectError(ctx, err)
continue
}

switch {
case realtimeEvent.Heartbeat != nil:
c.handlers.onHeartbeat(c.realtimeCtx)
c.handlers.onHeartbeat(ctx)
case realtimeEvent.ClientConnection != nil:
c.handlers.onClientConnection(c.realtimeCtx, realtimeEvent.ClientConnection)
c.realtimeSessionID = realtimeEvent.ClientConnection.ID
log.Debug().Stringer("realtime_session_id", c.realtimeSessionID).Msg("Got new realtime session ID")
c.handlers.onClientConnection(ctx, realtimeEvent.ClientConnection)
case realtimeEvent.DecoratedEvent != nil:
log.Debug().
Stringer("topic", realtimeEvent.DecoratedEvent.Topic).
Str("payload_type", realtimeEvent.DecoratedEvent.Payload.Data.Type).
Msg("Received decorated event")
fmt.Printf("%s\n", line)
fmt.Printf("decoratedEvent %+v\n", realtimeEvent.DecoratedEvent)
c.handlers.onDecoratedEvent(c.realtimeCtx, realtimeEvent.DecoratedEvent)
c.handlers.onDecoratedEvent(ctx, realtimeEvent.DecoratedEvent)
}
}
}
Expand Down
53 changes: 8 additions & 45 deletions pkg/linkedingo/types/realtime.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package types

import (
"github.com/google/uuid"
"go.mau.fi/util/jsontime"

"go.mau.fi/mautrix-linkedin/pkg/linkedingoold/routingold/responseold"
)

type RealtimeEvent struct {
Expand All @@ -15,7 +14,7 @@ type RealtimeEvent struct {
type Heartbeat struct{}

type ClientConnection struct {
ID string `json:"id"`
ID uuid.UUID `json:"id"`
}

type DecoratedEvent struct {
Expand All @@ -32,14 +31,8 @@ type DecoratedEventPayload struct {
}

type DecoratedEventData struct {
RecipeType string `json:"_recipeType,omitempty"`
Type string `json:"_type,omitempty"`
DecoratedMessage *DecoratedMessageRealtime `json:"doDecorateMessageMessengerRealtimeDecoration,omitempty"`
DecoratedSeenReceipt *DecoratedSeenReceipt `json:"doDecorateSeenReceiptMessengerRealtimeDecoration,omitempty"`
DecoratedTypingIndicator *DecoratedTypingIndiciator `json:"doDecorateTypingIndicatorMessengerRealtimeDecoration,omitempty"`
DecoratedMessageReaction *DecoratedMessageReaction `json:"doDecorateRealtimeReactionSummaryMessengerRealtimeDecoration,omitempty"`
DecoratedDeletedConversation *DecoratedDeletedConversation `json:"doDecorateConversationDeleteMessengerRealtimeDecoration,omitempty"`
DecoratedUpdatedConversation *DecoratedUpdatedConversation `json:"doDecorateConversationMessengerRealtimeDecoration,omitempty"`
Type string `json:"_type,omitempty"`
DecoratedMessage *ActionResponse `json:"doDecorateMessageMessengerRealtimeDecoration,omitempty"`
}

// Conversation represents a com.linkedin.messenger.Conversation object
Expand Down Expand Up @@ -100,38 +93,8 @@ type Message struct {
Conversation Conversation `json:"conversation,omitempty"`
}

type DecoratedMessageRealtime struct {
Result Message `json:"result,omitempty"`
RecipeType string `json:"_recipeType,omitempty"`
Type string `json:"_type,omitempty"`
}

type DecoratedSeenReceipt struct {
Result responseold.MessageSeenReceipt `json:"result,omitempty"`
RecipeType string `json:"_recipeType,omitempty"`
Type string `json:"_type,omitempty"`
}

type DecoratedTypingIndiciator struct {
Result responseold.TypingIndicator `json:"result,omitempty"`
RecipeType string `json:"_recipeType,omitempty"`
Type string `json:"_type,omitempty"`
}

type DecoratedMessageReaction struct {
Result responseold.MessageReaction `json:"result,omitempty"`
RecipeType string `json:"_recipeType,omitempty"`
Type string `json:"_type,omitempty"`
}

type DecoratedDeletedConversation struct {
Result responseold.Conversation `json:"result,omitempty"`
RecipeType string `json:"_recipeType,omitempty"`
Type string `json:"_type,omitempty"`
}

type DecoratedUpdatedConversation struct {
Result responseold.ThreadElement `json:"result,omitempty"`
RecipeType string `json:"_recipeType,omitempty"`
Type string `json:"_type,omitempty"`
// ActionResponse represents a com.linkedin.restli.common.ActionResponse
// object.
type ActionResponse struct {
Result Message `json:"result,omitempty"`
}
9 changes: 8 additions & 1 deletion pkg/linkedingo/types/urn.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ func (u URN) MarshalJSON() ([]byte, error) {
return json.Marshal(u.String())
}

func (u *URN) NthPart(n int) string {
func (u URN) NthPart(n int) string {
return u.parts[n]
}

// WithPrefix returns a URN with the given prefix but the same ID (last part)
func (u URN) WithPrefix(prefix ...string) (n URN) {
n.parts = append(prefix, u.parts[len(u.parts)-1])
n.idParts = u.idParts
return
}

0 comments on commit ecbcb5d

Please sign in to comment.