Skip to content

Commit 34fa4eb

Browse files
committed
feat: sync streams
1 parent 082ccc1 commit 34fa4eb

27 files changed

+776
-545
lines changed

api/streams/advance.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package streams
22

33
import (
4+
"net/http"
5+
46
"github.com/0xJacky/Nginx-UI/api"
57
"github.com/0xJacky/Nginx-UI/internal/nginx"
68
"github.com/0xJacky/Nginx-UI/query"
79
"github.com/gin-gonic/gin"
810
"github.com/uozi-tech/cosy"
9-
"net/http"
1011
)
1112

1213
func AdvancedEdit(c *gin.Context) {
@@ -21,7 +22,7 @@ func AdvancedEdit(c *gin.Context) {
2122
name := c.Param("name")
2223
path := nginx.GetConfPath("streams-available", name)
2324

24-
s := query.Site
25+
s := query.Stream
2526

2627
_, err := s.Where(s.Path.Eq(path)).FirstOrCreate()
2728
if err != nil {
@@ -39,5 +40,4 @@ func AdvancedEdit(c *gin.Context) {
3940
c.JSON(http.StatusOK, gin.H{
4041
"message": "ok",
4142
})
42-
4343
}

api/streams/router.go

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ func InitRouter(r *gin.RouterGroup) {
66
r.GET("streams", GetStreams)
77
r.GET("streams/:name", GetStream)
88
r.POST("streams/:name", SaveStream)
9+
r.POST("streams/:name/rename", RenameStream)
910
r.POST("streams/:name/enable", EnableStream)
1011
r.POST("streams/:name/disable", DisableStream)
1112
r.POST("streams/:name/advance", AdvancedEdit)

api/streams/streams.go

+32-160
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package streams
22

33
import (
4+
"net/http"
5+
"os"
6+
"strings"
7+
"time"
8+
49
"github.com/0xJacky/Nginx-UI/api"
510
"github.com/0xJacky/Nginx-UI/internal/config"
6-
"github.com/0xJacky/Nginx-UI/internal/helper"
711
"github.com/0xJacky/Nginx-UI/internal/nginx"
12+
"github.com/0xJacky/Nginx-UI/internal/stream"
813
"github.com/0xJacky/Nginx-UI/query"
914
"github.com/gin-gonic/gin"
1015
"github.com/sashabaranov/go-openai"
1116
"github.com/uozi-tech/cosy"
12-
"net/http"
13-
"os"
14-
"strings"
15-
"time"
1617
)
1718

1819
type Stream struct {
@@ -24,6 +25,7 @@ type Stream struct {
2425
ChatGPTMessages []openai.ChatCompletionMessage `json:"chatgpt_messages,omitempty"`
2526
Tokenized *nginx.NgxConfig `json:"tokenized,omitempty"`
2627
Filepath string `json:"filepath"`
28+
SyncNodeIDs []uint64 `json:"sync_node_ids" gorm:"serializer:json"`
2729
}
2830

2931
func GetStreams(c *gin.Context) {
@@ -32,14 +34,12 @@ func GetStreams(c *gin.Context) {
3234
sort := c.DefaultQuery("sort", "desc")
3335

3436
configFiles, err := os.ReadDir(nginx.GetConfPath("streams-available"))
35-
3637
if err != nil {
3738
api.ErrHandler(c, err)
3839
return
3940
}
4041

4142
enabledConfig, err := os.ReadDir(nginx.GetConfPath("streams-enabled"))
42-
4343
if err != nil {
4444
api.ErrHandler(c, err)
4545
return
@@ -77,15 +77,8 @@ func GetStreams(c *gin.Context) {
7777
}
7878

7979
func GetStream(c *gin.Context) {
80-
rewriteName, ok := c.Get("rewriteConfigFileName")
81-
8280
name := c.Param("name")
8381

84-
// for modify filename
85-
if ok {
86-
name = rewriteName.(string)
87-
}
88-
8982
path := nginx.GetConfPath("streams-available", name)
9083
file, err := os.Stat(path)
9184
if os.IsNotExist(err) {
@@ -114,14 +107,13 @@ func GetStream(c *gin.Context) {
114107
}
115108

116109
s := query.Stream
117-
stream, err := s.Where(s.Path.Eq(path)).FirstOrInit()
118-
110+
streamModel, err := s.Where(s.Path.Eq(path)).FirstOrCreate()
119111
if err != nil {
120112
api.ErrHandler(c, err)
121113
return
122114
}
123115

124-
if stream.Advanced {
116+
if streamModel.Advanced {
125117
origContent, err := os.ReadFile(path)
126118
if err != nil {
127119
api.ErrHandler(c, err)
@@ -130,12 +122,13 @@ func GetStream(c *gin.Context) {
130122

131123
c.JSON(http.StatusOK, Stream{
132124
ModifiedAt: file.ModTime(),
133-
Advanced: stream.Advanced,
125+
Advanced: streamModel.Advanced,
134126
Enabled: enabled,
135127
Name: name,
136128
Config: string(origContent),
137129
ChatGPTMessages: chatgpt.Content,
138130
Filepath: path,
131+
SyncNodeIDs: streamModel.SyncNodeIDs,
139132
})
140133
return
141134
}
@@ -149,207 +142,86 @@ func GetStream(c *gin.Context) {
149142

150143
c.JSON(http.StatusOK, Stream{
151144
ModifiedAt: file.ModTime(),
152-
Advanced: stream.Advanced,
145+
Advanced: streamModel.Advanced,
153146
Enabled: enabled,
154147
Name: name,
155148
Config: nginxConfig.FmtCode(),
156149
Tokenized: nginxConfig,
157150
ChatGPTMessages: chatgpt.Content,
158151
Filepath: path,
152+
SyncNodeIDs: streamModel.SyncNodeIDs,
159153
})
160154
}
161155

162156
func SaveStream(c *gin.Context) {
163157
name := c.Param("name")
164158

165-
if name == "" {
166-
c.JSON(http.StatusNotAcceptable, gin.H{
167-
"message": "param name is empty",
168-
})
169-
return
170-
}
171-
172159
var json struct {
173-
Name string `json:"name" binding:"required"`
174-
Content string `json:"content" binding:"required"`
175-
Overwrite bool `json:"overwrite"`
160+
Content string `json:"content" binding:"required"`
161+
SyncNodeIDs []uint64 `json:"sync_node_ids"`
162+
Overwrite bool `json:"overwrite"`
176163
}
177164

178165
if !cosy.BindAndValid(c, &json) {
179166
return
180167
}
181168

182-
path := nginx.GetConfPath("streams-available", name)
183-
184-
if !json.Overwrite && helper.FileExists(path) {
185-
c.JSON(http.StatusNotAcceptable, gin.H{
186-
"message": "File exists",
187-
})
188-
return
189-
}
190-
191-
err := os.WriteFile(path, []byte(json.Content), 0644)
169+
err := stream.Save(name, json.Content, json.Overwrite, json.SyncNodeIDs)
192170
if err != nil {
193171
api.ErrHandler(c, err)
194172
return
195173
}
196-
enabledConfigFilePath := nginx.GetConfPath("streams-enabled", name)
197-
// rename the config file if needed
198-
if name != json.Name {
199-
newPath := nginx.GetConfPath("streams-available", json.Name)
200-
s := query.Stream
201-
_, err = s.Where(s.Path.Eq(path)).Update(s.Path, newPath)
202-
203-
// check if dst file exists, do not rename
204-
if helper.FileExists(newPath) {
205-
c.JSON(http.StatusNotAcceptable, gin.H{
206-
"message": "File exists",
207-
})
208-
return
209-
}
210-
// recreate a soft link
211-
if helper.FileExists(enabledConfigFilePath) {
212-
_ = os.Remove(enabledConfigFilePath)
213-
enabledConfigFilePath = nginx.GetConfPath("streams-enabled", json.Name)
214-
err = os.Symlink(newPath, enabledConfigFilePath)
215-
216-
if err != nil {
217-
api.ErrHandler(c, err)
218-
return
219-
}
220-
}
221-
222-
err = os.Rename(path, newPath)
223-
if err != nil {
224-
api.ErrHandler(c, err)
225-
return
226-
}
227-
228-
name = json.Name
229-
c.Set("rewriteConfigFileName", name)
230-
}
231-
232-
enabledConfigFilePath = nginx.GetConfPath("streams-enabled", name)
233-
if helper.FileExists(enabledConfigFilePath) {
234-
// Test nginx configuration
235-
output := nginx.TestConf()
236-
237-
if nginx.GetLogLevel(output) > nginx.Warn {
238-
c.JSON(http.StatusInternalServerError, gin.H{
239-
"message": output,
240-
})
241-
return
242-
}
243-
244-
output = nginx.Reload()
245-
246-
if nginx.GetLogLevel(output) > nginx.Warn {
247-
c.JSON(http.StatusInternalServerError, gin.H{
248-
"message": output,
249-
})
250-
return
251-
}
252-
}
253174

254175
GetStream(c)
255176
}
256177

257178
func EnableStream(c *gin.Context) {
258-
configFilePath := nginx.GetConfPath("streams-available", c.Param("name"))
259-
enabledConfigFilePath := nginx.GetConfPath("streams-enabled", c.Param("name"))
260-
261-
_, err := os.Stat(configFilePath)
262-
179+
err := stream.Enable(c.Param("name"))
263180
if err != nil {
264181
api.ErrHandler(c, err)
265182
return
266183
}
267184

268-
if _, err = os.Stat(enabledConfigFilePath); os.IsNotExist(err) {
269-
err = os.Symlink(configFilePath, enabledConfigFilePath)
270-
271-
if err != nil {
272-
api.ErrHandler(c, err)
273-
return
274-
}
275-
}
276-
277-
// Test nginx config, if not pass, then disable the stream.
278-
output := nginx.TestConf()
279-
280-
if nginx.GetLogLevel(output) > nginx.Warn {
281-
_ = os.Remove(enabledConfigFilePath)
282-
c.JSON(http.StatusInternalServerError, gin.H{
283-
"message": output,
284-
})
285-
return
286-
}
287-
288-
output = nginx.Reload()
289-
290-
if nginx.GetLogLevel(output) > nginx.Warn {
291-
c.JSON(http.StatusInternalServerError, gin.H{
292-
"message": output,
293-
})
294-
return
295-
}
296-
297185
c.JSON(http.StatusOK, gin.H{
298186
"message": "ok",
299187
})
300188
}
301189

302190
func DisableStream(c *gin.Context) {
303-
enabledConfigFilePath := nginx.GetConfPath("streams-enabled", c.Param("name"))
304-
305-
_, err := os.Stat(enabledConfigFilePath)
306-
191+
err := stream.Disable(c.Param("name"))
307192
if err != nil {
308193
api.ErrHandler(c, err)
309194
return
310195
}
311196

312-
err = os.Remove(enabledConfigFilePath)
197+
c.JSON(http.StatusOK, gin.H{
198+
"message": "ok",
199+
})
200+
}
313201

202+
func DeleteStream(c *gin.Context) {
203+
err := stream.Delete(c.Param("name"))
314204
if err != nil {
315205
api.ErrHandler(c, err)
316206
return
317207
}
318-
output := nginx.Reload()
319-
320-
if nginx.GetLogLevel(output) > nginx.Warn {
321-
c.JSON(http.StatusInternalServerError, gin.H{
322-
"message": output,
323-
})
324-
return
325-
}
326208

327209
c.JSON(http.StatusOK, gin.H{
328210
"message": "ok",
329211
})
330212
}
331213

332-
func DeleteStream(c *gin.Context) {
333-
var err error
334-
name := c.Param("name")
335-
availablePath := nginx.GetConfPath("streams-available", name)
336-
enabledPath := nginx.GetConfPath("streams-enabled", name)
337-
338-
if _, err = os.Stat(availablePath); os.IsNotExist(err) {
339-
c.JSON(http.StatusNotFound, gin.H{
340-
"message": "stream not found",
341-
})
342-
return
214+
func RenameStream(c *gin.Context) {
215+
oldName := c.Param("name")
216+
var json struct {
217+
NewName string `json:"new_name"`
343218
}
344-
345-
if _, err = os.Stat(enabledPath); err == nil {
346-
c.JSON(http.StatusNotAcceptable, gin.H{
347-
"message": "stream is enabled",
348-
})
219+
if !cosy.BindAndValid(c, &json) {
349220
return
350221
}
351222

352-
if err = os.Remove(availablePath); err != nil {
223+
err := stream.Rename(oldName, json.NewName)
224+
if err != nil {
353225
api.ErrHandler(c, err)
354226
return
355227
}

app/src/api/stream.ts

+5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export interface Stream {
1212
config: string
1313
chatgpt_messages: ChatComplicationMessage[]
1414
tokenized?: NgxConfig
15+
sync_node_ids: number[]
1516
}
1617

1718
class StreamCurd extends Curd<Stream> {
@@ -31,6 +32,10 @@ class StreamCurd extends Curd<Stream> {
3132
advance_mode(name: string, data: { advanced: boolean }) {
3233
return http.post(`${this.baseUrl}/${name}/advance`, data)
3334
}
35+
36+
rename(name: string, newName: string) {
37+
return http.post(`${this.baseUrl}/${name}/rename`, { new_name: newName })
38+
}
3439
}
3540

3641
const stream = new StreamCurd('/streams')

app/src/components/NodeSelector/NodeSelector.vue

+6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ function newSSE() {
3232
3333
s.onmessage = (e: SSEvent) => {
3434
data.value = JSON.parse(e.data)
35+
nextTick(() => {
36+
data_map.value = data.value.reduce((acc, node) => {
37+
acc[node.id] = node
38+
return acc
39+
}, {} as Record<number, Environment>)
40+
})
3541
}
3642
3743
// reconnect

0 commit comments

Comments
 (0)