forked from omec-project/pfcp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
transaction.go
147 lines (121 loc) · 4.07 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// SPDX-FileCopyrightText: 2021 Open Networking Foundation <[email protected]>
// Copyright 2019 free5GC.org
//
// SPDX-License-Identifier: Apache-2.0
//
package pfcp
import (
"net"
"sync"
"time"
"github.com/free5gc/pfcp/logger"
"github.com/pkg/errors"
)
type TransactionType uint8
type TxTable struct {
m sync.Map // map[uint32]*Transaction
}
func (t *TxTable) Store(sequenceNumber uint32, tx *Transaction) {
t.m.Store(sequenceNumber, tx)
}
func (t *TxTable) Load(sequenceNumber uint32) (*Transaction, bool) {
tx, ok := t.m.Load(sequenceNumber)
if ok {
return tx.(*Transaction), ok
}
return nil, false
}
func (t *TxTable) Delete(sequenceNumber uint32) {
t.m.Delete(sequenceNumber)
}
const (
SendingRequest TransactionType = iota
SendingResponse
)
const (
NumOfResend = 3
ResendRequestTimeOutPeriod = 3
ResendResponseTimeOutPeriod = 15
)
// Transaction - represent the transaction state of pfcp message
type Transaction struct {
SendMsg []byte
SequenceNumber uint32
MessageType MessageType
TxType TransactionType
EventChannel chan EventType
Conn *net.UDPConn
DestAddr *net.UDPAddr
ConsumerAddr string
ErrHandler func(*Message, error)
EventData interface{}
}
// NewTransaction - create pfcp transaction object
func NewTransaction(pfcpMSG Message, binaryMSG []byte, Conn *net.UDPConn, DestAddr *net.UDPAddr, eventData interface{}) (tx *Transaction) {
tx = &Transaction{
SendMsg: binaryMSG,
SequenceNumber: pfcpMSG.Header.SequenceNumber,
MessageType: pfcpMSG.Header.MessageType,
EventChannel: make(chan EventType, 1),
Conn: Conn,
DestAddr: DestAddr,
EventData: eventData,
}
if pfcpMSG.IsRequest() {
tx.TxType = SendingRequest
tx.ConsumerAddr = Conn.LocalAddr().String()
} else if pfcpMSG.IsResponse() {
tx.TxType = SendingResponse
tx.ConsumerAddr = DestAddr.String()
}
logger.PFCPLog.Tracef("New Transaction SEQ[%d] DestAddr[%s]", tx.SequenceNumber, DestAddr.String())
return
}
func (transaction *Transaction) Start() error {
logger.PFCPLog.Tracef("Start Transaction [%d]\n", transaction.SequenceNumber)
if transaction.TxType == SendingRequest {
for iter := 0; iter < NumOfResend; iter++ {
timer := time.NewTimer(ResendRequestTimeOutPeriod * time.Second)
_, err := transaction.Conn.WriteToUDP(transaction.SendMsg, transaction.DestAddr)
if err != nil {
logger.PFCPLog.Warnf("Request Transaction [%d]: %s\n", transaction.SequenceNumber, err)
return err
}
select {
case event := <-transaction.EventChannel:
if event == ReceiveValidResponse {
logger.PFCPLog.Tracef("Request Transaction [%d]: receive valid response\n", transaction.SequenceNumber)
return nil
}
case <-timer.C:
logger.PFCPLog.Tracef("Request Transaction [%d]: timeout expire\n", transaction.SequenceNumber)
logger.PFCPLog.Tracef("Request Transaction [%d]: Resend packet\n", transaction.SequenceNumber)
continue
}
}
//Num of retries exhausted, send failure back to app
return errors.Errorf("request timeout, seq [%d]", transaction.SequenceNumber)
} else if transaction.TxType == SendingResponse {
//Todo :Implement SendingResponse type of reliable delivery
timer := time.NewTimer(ResendResponseTimeOutPeriod * time.Second)
for iter := 0; iter < NumOfResend; iter++ {
_, err := transaction.Conn.WriteToUDP(transaction.SendMsg, transaction.DestAddr)
if err != nil {
logger.PFCPLog.Warnf("Response Transaction [%d]: sending error\n", transaction.SequenceNumber)
return err
}
select {
case event := <-transaction.EventChannel:
if event == ReceiveResendRequest {
logger.PFCPLog.Tracef("Response Transaction [%d]: receive resend request\n", transaction.SequenceNumber)
logger.PFCPLog.Tracef("Response Transaction [%d]: Resend packet\n", transaction.SequenceNumber)
continue
}
case <-timer.C:
logger.PFCPLog.Tracef("Response Transaction [%d]: timeout expire\n", transaction.SequenceNumber)
return errors.Errorf("response timeout, seq [%d]", transaction.SequenceNumber)
}
}
}
return nil
}