diff --git a/.gitignore b/.gitignore index eb3204e6..172e5420 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,12 @@ pkg/modbus/modbus pkg/bluetooth/bluetooth .idea vendor/ +.lightly +.c_bin +.classes +logs +*.log +node_modules +dist +dist-ssr +*.local diff --git a/mappers/device-v1beta1/mqtt-mapper/Dockerfile_nostream b/mappers/device-v1beta1/mqtt-mapper/Dockerfile_nostream new file mode 100644 index 00000000..46acf9b0 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/Dockerfile_nostream @@ -0,0 +1,20 @@ +FROM golang:1.21.11-alpine3.19 AS builder + +WORKDIR /build + +ENV GO111MODULE=on \ + GOPROXY=https://goproxy.cn,direct + +COPY . . + +RUN CGO_ENABLED=0 GOOS=linux go build -o main cmd/main.go + + +FROM ubuntu:18.04 + +RUN mkdir -p kubeedge + +COPY --from=builder /build/main kubeedge/ +COPY ./config.yaml kubeedge/ + +WORKDIR kubeedge \ No newline at end of file diff --git a/mappers/device-v1beta1/mqtt-mapper/Dockerfile_stream b/mappers/device-v1beta1/mqtt-mapper/Dockerfile_stream new file mode 100644 index 00000000..e1956a1a --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/Dockerfile_stream @@ -0,0 +1,35 @@ +FROM golang:1.21.11-bullseye AS builder + +WORKDIR /build + +ENV GO111MODULE=on \ + GOPROXY=https://goproxy.cn,direct + +COPY . . + +RUN apt-get update && \ + apt-get install -y bzip2 curl upx-ucl gcc-aarch64-linux-gnu libc6-dev-arm64-cross gcc-arm-linux-gnueabi libc6-dev-armel-cross libva-dev libva-drm2 libx11-dev libvdpau-dev libxext-dev libsdl1.2-dev libxcb1-dev libxau-dev libxdmcp-dev yasm + +RUN curl -sLO https://ffmpeg.org/releases/ffmpeg-4.1.6.tar.bz2 && \ + tar -jx --strip-components=1 -f ffmpeg-4.1.6.tar.bz2 && \ + ./configure && make && \ + make install + +RUN GOOS=linux go build -o main cmd/main.go + +FROM ubuntu:18.04 + +RUN mkdir -p kubeedge + +RUN apt-get update && \ + apt-get install -y bzip2 curl upx-ucl gcc-aarch64-linux-gnu libc6-dev-arm64-cross gcc-arm-linux-gnueabi libc6-dev-armel-cross libva-dev libva-drm2 libx11-dev libvdpau-dev libxext-dev libsdl1.2-dev libxcb1-dev libxau-dev libxdmcp-dev yasm + +RUN curl -sLO https://ffmpeg.org/releases/ffmpeg-4.1.6.tar.bz2 && \ + tar -jx --strip-components=1 -f ffmpeg-4.1.6.tar.bz2 && \ + ./configure && make && \ + make install + +COPY --from=builder /build/main kubeedge/ +COPY ./config.yaml kubeedge/ + +WORKDIR kubeedge \ No newline at end of file diff --git a/mappers/device-v1beta1/mqtt-mapper/cmd/main.go b/mappers/device-v1beta1/mqtt-mapper/cmd/main.go new file mode 100644 index 00000000..b539e0a5 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/cmd/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "errors" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mqtt/device" + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/config" + "github.com/kubeedge/mapper-framework/pkg/grpcclient" + "github.com/kubeedge/mapper-framework/pkg/grpcserver" + "github.com/kubeedge/mapper-framework/pkg/httpserver" +) + +func main() { + var err error + var c *config.Config + + klog.InitFlags(nil) + defer klog.Flush() + + if c, err = config.Parse(); err != nil { + klog.Fatal(err) + } + klog.Infof("config: %+v", c) + + klog.Infoln("Mapper will register to edgecore") + deviceList, deviceModelList, err := grpcclient.RegisterMapper(true) + if err != nil { + klog.Fatal(err) + } + klog.Infoln("Mapper register finished") + + panel := device.NewDevPanel() + err = panel.DevInit(deviceList, deviceModelList) + if err != nil && !errors.Is(err, device.ErrEmptyData) { + klog.Fatal(err) + } + klog.Infoln("devInit finished") + go panel.DevStart() + + // start http server + httpServer := httpserver.NewRestServer(panel, c.Common.HTTPPort) + go httpServer.StartServer() + + // start grpc server + grpcServer := grpcserver.NewServer( + grpcserver.Config{ + SockPath: c.GrpcServer.SocketPath, + Protocol: common.ProtocolCustomized, + }, + panel, + ) + defer grpcServer.Stop() + if err = grpcServer.Start(); err != nil { + klog.Fatal(err) + } + +} diff --git a/mappers/device-v1beta1/mqtt-mapper/config.yaml b/mappers/device-v1beta1/mqtt-mapper/config.yaml new file mode 100644 index 00000000..0522f7e1 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/config.yaml @@ -0,0 +1,9 @@ +grpc_server: + socket_path: /etc/kubeedge/mqtt.sock +common: + name: Mqtt-mapper + version: v1.13.0 + api_version: v1.0.0 + protocol: # TODO add your protocol name + address: 127.0.0.1 + edgecore_sock: /etc/kubeedge/dmi.sock diff --git a/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/influxdb2/client.go b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/influxdb2/client.go new file mode 100644 index 00000000..ce909eb7 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/influxdb2/client.go @@ -0,0 +1,76 @@ +package influxdb2 + +import ( + "context" + "encoding/json" + "os" + "time" + + "k8s.io/klog/v2" + + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +type DataBaseConfig struct { + Influxdb2ClientConfig *Influxdb2ClientConfig `json:"influxdb2ClientConfig,omitempty"` + Influxdb2DataConfig *Influxdb2DataConfig `json:"influxdb2DataConfig,omitempty"` +} + +type Influxdb2ClientConfig struct { + Url string `json:"url,omitempty"` + Org string `json:"org,omitempty"` + Bucket string `json:"bucket,omitempty"` +} + +type Influxdb2DataConfig struct { + Measurement string `json:"measurement,omitempty"` + Tag map[string]string `json:"tag,omitempty"` + FieldKey string `json:"fieldKey,omitempty"` +} + +func NewDataBaseClient(clientConfig json.RawMessage, dataConfig json.RawMessage) (*DataBaseConfig, error) { + // parse influx database config data + influxdb2ClientConfig := new(Influxdb2ClientConfig) + influxdb2DataConfig := new(Influxdb2DataConfig) + err := json.Unmarshal(clientConfig, influxdb2ClientConfig) + if err != nil { + return nil, err + } + err = json.Unmarshal(dataConfig, influxdb2DataConfig) + if err != nil { + return nil, err + } + return &DataBaseConfig{ + Influxdb2ClientConfig: influxdb2ClientConfig, + Influxdb2DataConfig: influxdb2DataConfig, + }, nil +} + +func (d *DataBaseConfig) InitDbClient() influxdb2.Client { + var usrtoken string + usrtoken = os.Getenv("TOKEN") + client := influxdb2.NewClient(d.Influxdb2ClientConfig.Url, usrtoken) + + return client +} + +func (d *DataBaseConfig) CloseSession(client influxdb2.Client) { + client.Close() +} + +func (d *DataBaseConfig) AddData(data *common.DataModel, client influxdb2.Client) error { + // write device data to influx database + writeAPI := client.WriteAPIBlocking(d.Influxdb2ClientConfig.Org, d.Influxdb2ClientConfig.Bucket) + p := influxdb2.NewPoint(d.Influxdb2DataConfig.Measurement, + d.Influxdb2DataConfig.Tag, + map[string]interface{}{d.Influxdb2DataConfig.FieldKey: data.Value}, + time.Now()) + // write point immediately + err := writeAPI.WritePoint(context.Background(), p) + if err != nil { + klog.V(4).Info("Exit AddData") + return err + } + return nil +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/influxdb2/handler.go b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/influxdb2/handler.go new file mode 100644 index 00000000..8c97b06e --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/influxdb2/handler.go @@ -0,0 +1,73 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package influxdb2 + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mqtt/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2ClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2DataConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + dbClient := dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init database client err: %v", err) + return + } + reportCycle := time.Millisecond * time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel, dbClient) + if err != nil { + klog.Errorf("influx database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession(dbClient) + return + } + } + }() +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/mysql/client.go b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/mysql/client.go new file mode 100644 index 00000000..0c75d7c6 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/mysql/client.go @@ -0,0 +1,108 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysql + +import ( + "database/sql" + "encoding/json" + "fmt" + "os" + "time" + + _ "github.com/go-sql-driver/mysql" + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" +) + +var ( + DB *sql.DB +) + +type DataBaseConfig struct { + MySQLClientConfig *MySQLClientConfig `json:"mysqlClientConfig"` +} + +type MySQLClientConfig struct { + Addr string `json:"addr,omitempty"` + Database string `json:"database,omitempty"` + UserName string `json:"userName,omitempty"` +} + +func NewDataBaseClient(config json.RawMessage) (*DataBaseConfig, error) { + configdata := new(MySQLClientConfig) + err := json.Unmarshal(config, configdata) + if err != nil { + return nil, err + } + return &DataBaseConfig{ + MySQLClientConfig: configdata, + }, nil +} + +func (d *DataBaseConfig) InitDbClient() error { + password := os.Getenv("PASSWORD") + usrName := d.MySQLClientConfig.UserName + addr := d.MySQLClientConfig.Addr + dataBase := d.MySQLClientConfig.Database + dataSourceName := fmt.Sprintf("%s:%s@tcp(%s)/%s", usrName, password, addr, dataBase) + var err error + DB, err = sql.Open("mysql", dataSourceName) + if err != nil { + return fmt.Errorf("connection to %s of mysql faild with err:%v", dataBase, err) + } + + return nil +} + +func (d *DataBaseConfig) CloseSession() { + err := DB.Close() + if err != nil { + klog.Errorf("close mysql failed with err:%v", err) + } +} + +func (d *DataBaseConfig) AddData(data *common.DataModel) error { + deviceName := data.DeviceName + propertyName := data.PropertyName + namespace := data.Namespace + tableName := namespace + "/" + deviceName + "/" + propertyName + datatime := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05") + + createTable := fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` (id INT AUTO_INCREMENT PRIMARY KEY, ts DATETIME NOT NULL,field TEXT)", tableName) + _, err := DB.Exec(createTable) + if err != nil { + return fmt.Errorf("create tabe into mysql failed with err:%v", err) + } + + stmt, err := DB.Prepare(fmt.Sprintf("INSERT INTO `%s` (ts,field) VALUES (?,?)", tableName)) + if err != nil { + return fmt.Errorf("prepare parament failed with err:%v", err) + } + defer func(stmt *sql.Stmt) { + err := stmt.Close() + if err != nil { + klog.Errorf("close mysql's statement failed with err:%v", err) + } + }(stmt) + _, err = stmt.Exec(datatime, data.Value) + if err != nil { + return fmt.Errorf("insert data into msyql failed with err:%v", err) + } + + return nil +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/mysql/handler.go b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/mysql/handler.go new file mode 100644 index 00000000..51489238 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/mysql/handler.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysql + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mqtt/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.MySQLClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init redis database client err: %v", err) + return + } + reportCycle := time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("mysql database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession() + return + } + } + }() +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/redis/client.go b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/redis/client.go new file mode 100644 index 00000000..bd036a5f --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/redis/client.go @@ -0,0 +1,119 @@ +package redis + +import ( + "context" + "encoding/json" + "errors" + "os" + "strconv" + + "github.com/go-redis/redis/v8" + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" +) + +var ( + RedisCli *redis.Client +) + +type DataBaseConfig struct { + RedisClientConfig *RedisClientConfig +} + +type RedisClientConfig struct { + Addr string `json:"addr,omitempty"` + DB int `json:"db,omitempty"` + PoolSize int `json:"poolSize,omitempty"` + MinIdleConns int `json:"minIdleConns,omitempty"` +} + +func NewDataBaseClient(config json.RawMessage) (*DataBaseConfig, error) { + configdata := new(RedisClientConfig) + err := json.Unmarshal(config, configdata) + if err != nil { + return nil, err + } + return &DataBaseConfig{RedisClientConfig: configdata}, nil +} + +func (d *DataBaseConfig) InitDbClient() error { + var password string + password = os.Getenv("PASSWORD") + RedisCli = redis.NewClient(&redis.Options{ + Addr: d.RedisClientConfig.Addr, + Password: password, + DB: d.RedisClientConfig.DB, + PoolSize: d.RedisClientConfig.PoolSize, + MinIdleConns: d.RedisClientConfig.MinIdleConns, + }) + pong, err := RedisCli.Ping(context.Background()).Result() + if err != nil { + klog.Errorf("init redis database failed, err = %v", err) + return err + } + klog.V(1).Infof("init redis database successfully, with return cmd %s", pong) + return nil +} + +func (d *DataBaseConfig) CloseSession() { + err := RedisCli.Close() + if err != nil { + klog.V(4).Info("close database failed") + } +} + +func (d *DataBaseConfig) AddData(data *common.DataModel) error { + ctx := context.Background() + // The key to construct the ordered set, here DeviceName is used as the key + klog.V(1).Infof("deviceName:%s", data.DeviceName) + // Check if the current ordered set exists + deviceData := "TimeStamp: " + strconv.FormatInt(data.TimeStamp, 10) + " PropertyName: " + data.PropertyName + " data: " + data.Value + // Add data to ordered set. If the ordered set does not exist, it will be created. + _, err := RedisCli.ZAdd(ctx, data.DeviceName, &redis.Z{ + Score: float64(data.TimeStamp), + Member: deviceData, + }).Result() + if err != nil { + klog.V(4).Info("Exit AddData") + return err + } + return nil +} + +func (d *DataBaseConfig) GetDataByDeviceName(deviceName string) ([]*common.DataModel, error) { + ctx := context.Background() + + dataJSON, err := RedisCli.ZRevRange(ctx, deviceName, 0, -1).Result() + if err != nil { + klog.V(4).Infof("fail query data for deviceName,err:%v", err) + } + + var dataModels []*common.DataModel + + for _, jsonStr := range dataJSON { + var data common.DataModel + if err := json.Unmarshal([]byte(jsonStr), &data); err != nil { + klog.V(4).Infof("Error unMarshaling data: %v\n", err) + continue + } + + dataModels = append(dataModels, &data) + } + return dataModels, nil +} + +func (d *DataBaseConfig) GetPropertyDataByDeviceName(deviceName string, propertyData string) ([]*common.DataModel, error) { + //TODO implement me + return nil, errors.New("implement me") +} + +func (d *DataBaseConfig) GetDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) { + //TODO implement me + return nil, errors.New("implement me") +} + +func (d *DataBaseConfig) DeleteDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) { + //TODO implement me + return nil, errors.New("implement me") +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/redis/handler.go b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/redis/handler.go new file mode 100644 index 00000000..383fc052 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/redis/handler.go @@ -0,0 +1,74 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package redis + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mqtt/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.RedisClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init redis database client err: %v", err) + return + } + reportCycle := time.Millisecond * time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("redis database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSession() + return + } + } + }() + +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/tdengine/client.go b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/tdengine/client.go new file mode 100644 index 00000000..3ab157cc --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/tdengine/client.go @@ -0,0 +1,156 @@ +package tdengine + +import ( + "database/sql" + "encoding/json" + "errors" + "fmt" + "os" + "strings" + "time" + + _ "github.com/taosdata/driver-go/v3/taosRestful" + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" +) + +var ( + DB *sql.DB +) + +type DataBaseConfig struct { + TDEngineClientConfig *TDEngineClientConfig `json:"config,omitempty"` +} +type TDEngineClientConfig struct { + Addr string `json:"addr,omitempty"` + DBName string `json:"dbName,omitempty"` +} + +func NewDataBaseClient(config json.RawMessage) (*DataBaseConfig, error) { + configdata := new(TDEngineClientConfig) + err := json.Unmarshal(config, configdata) + if err != nil { + return nil, err + } + return &DataBaseConfig{ + TDEngineClientConfig: configdata, + }, nil +} +func (d *DataBaseConfig) InitDbClient() error { + username := os.Getenv("USERNAME") + password := os.Getenv("PASSWORD") + dsn := fmt.Sprintf("%s:%s@http(%s)/%s", username, password, d.TDEngineClientConfig.Addr, d.TDEngineClientConfig.DBName) + var err error + DB, err = sql.Open("taosRestful", dsn) + if err != nil { + klog.Errorf("init TDEngine db fail, err= %v:", err) + } + klog.V(1).Infof("init TDEngine database successfully") + return nil +} + +func (d *DataBaseConfig) CloseSessio() { + err := DB.Close() + if err != nil { + klog.Errorf("close TDEngine failed") + } +} + +func (d *DataBaseConfig) AddData(data *common.DataModel) error { + + legal_table := strings.Replace(data.DeviceName, "-", "_", -1) + legal_tag := strings.Replace(data.PropertyName, "-", "_", -1) + + stable_name := fmt.Sprintf("SHOW STABLES LIKE '%s'", legal_table) + stabel := fmt.Sprintf("CREATE STABLE %s (ts timestamp, devicename binary(64), propertyname binary(64), data binary(64),type binary(64)) TAGS (localtion binary(64));", legal_table) + + datatime := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05") + insertSQL := fmt.Sprintf("INSERT INTO %s USING %s TAGS ('%s') VALUES('%v','%s', '%s', '%s', '%s');", + legal_tag, legal_table, legal_tag, datatime, data.DeviceName, data.PropertyName, data.Value, data.Type) + + rows, _ := DB.Query(stable_name) + defer rows.Close() + + if err := rows.Err(); err != nil { + klog.Errorf("query stable failed:%v", err) + } + + switch rows.Next() { + case false: + _, err := DB.Exec(stabel) + if err != nil { + klog.Errorf("create stable failed %v\n", err) + } + _, err = DB.Exec(insertSQL) + if err != nil { + klog.Errorf("failed add data to TdEngine:%v", err) + } + case true: + _, err := DB.Exec(insertSQL) + if err != nil { + klog.Errorf("failed add data to TdEngine:%v", err) + } + default: + klog.Infoln("failed add data to TdEngine") + } + + return nil +} +func (d *DataBaseConfig) GetDataByDeviceName(deviceName string) ([]*common.DataModel, error) { + querySql := fmt.Sprintf("SELECT ts, devicename, propertyname, data, type FROM %s", deviceName) + rows, err := DB.Query(querySql) + if err != nil { + return nil, err + } + defer rows.Close() + var dataModel []*common.DataModel + for rows.Next() { + var data common.DataModel + var ts time.Time + err := rows.Scan(&ts, &data.DeviceName, &data.PropertyName, &data.Value, &data.Type) + if err != nil { + klog.Errorf(" data scan error: %v\n", err) + //fmt.Printf("scan error:\n", err) + return nil, err + } + data.TimeStamp = ts.Unix() + dataModel = append(dataModel, &data) + } + return dataModel, nil +} +func (d *DataBaseConfig) GetPropertyDataByDeviceName(deviceName string, propertyData string) ([]*common.DataModel, error) { + //TODO implement me + panic("implement me") +} +func (d *DataBaseConfig) GetDataByTimeRange(deviceName string, start int64, end int64) ([]*common.DataModel, error) { + + legal_table := strings.Replace(deviceName, "-", "_", -1) + startTime := time.Unix(start, 0).UTC().Format("2006-01-02 15:04:05") + endTime := time.Unix(end, 0).UTC().Format("2006-01-02 15:04:05") + //Query data within a specified time range + querySQL := fmt.Sprintf("SELECT ts, devicename, propertyname, data, type FROM %s WHERE ts >= '%s' AND ts <= '%s'", legal_table, startTime, endTime) + fmt.Println(querySQL) + rows, err := DB.Query(querySQL) + if err != nil { + return nil, err + } + defer rows.Close() + + var dataModels []*common.DataModel + for rows.Next() { + var data common.DataModel + var ts time.Time + err := rows.Scan(&ts, &data.DeviceName, &data.PropertyName, &data.Value, &data.Type) + if err != nil { + klog.V(4).Infof("data scan failed:%v", err) + continue + } + dataModels = append(dataModels, &data) + } + return dataModels, nil +} +func (d *DataBaseConfig) DeleteDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) { + //TODO implement me + return nil, errors.New("implement me") +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/tdengine/handler.go b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/tdengine/handler.go new file mode 100644 index 00000000..9385082e --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/dbmethod/tdengine/handler.go @@ -0,0 +1,74 @@ +/* +Copyright 2023 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tdengine + +import ( + "context" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mqtt/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func DataHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + dbConfig, err := NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.TDEngineClientConfig) + if err != nil { + klog.Errorf("new database client error: %v", err) + return + } + err = dbConfig.InitDbClient() + if err != nil { + klog.Errorf("init database client err: %v", err) + return + } + reportCycle := time.Millisecond * time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + + err = dbConfig.AddData(dataModel) + if err != nil { + klog.Errorf("tdengine database add data error: %v", err) + return + } + case <-ctx.Done(): + dbConfig.CloseSessio() + return + } + } + }() + +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/publish/http/client.go b/mappers/device-v1beta1/mqtt-mapper/data/publish/http/client.go new file mode 100644 index 00000000..b0491219 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/publish/http/client.go @@ -0,0 +1,73 @@ +package http + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/global" +) + +type PushMethod struct { + HTTP *HTTPConfig `json:"http"` +} + +type HTTPConfig struct { + HostName string `json:"hostName,omitempty"` + Port int `json:"port,omitempty"` + RequestPath string `json:"requestPath,omitempty"` + Timeout int `json:"timeout,omitempty"` +} + +func NewDataPanel(config json.RawMessage) (global.DataPanel, error) { + httpConfig := new(HTTPConfig) + err := json.Unmarshal(config, httpConfig) + if err != nil { + return nil, err + } + return &PushMethod{ + HTTP: httpConfig, + }, nil +} + +func (pm *PushMethod) InitPushMethod() error { + klog.V(1).Info("Init HTTP") + return nil +} + +func (pm *PushMethod) Push(data *common.DataModel) { + klog.V(2).Info("Publish device data by HTTP") + + targetUrl := pm.HTTP.HostName + ":" + strconv.Itoa(pm.HTTP.Port) + pm.HTTP.RequestPath + payload := data.PropertyName + "=" + data.Value + formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05") + currentTime := "&time" + "=" + formatTimeStr + payload += currentTime + + klog.V(3).Infof("Publish %v to %s", payload, targetUrl) + + resp, err := http.Post(targetUrl, + "application/x-www-form-urlencoded", + strings.NewReader(payload)) + + if err != nil { + fmt.Println(err) + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + // handle error + klog.Errorf("Publish device data by HTTP failed, err = %v", err) + return + } + klog.V(1).Info("############### Message published. ###############") + klog.V(3).Infof("HTTP reviced %s", string(body)) + +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/publish/mqtt/client.go b/mappers/device-v1beta1/mqtt-mapper/data/publish/mqtt/client.go new file mode 100644 index 00000000..522e38fa --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/publish/mqtt/client.go @@ -0,0 +1,63 @@ +package mqtt + +import ( + "encoding/json" + "fmt" + "os" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "k8s.io/klog/v2" + + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/global" +) + +type PushMethod struct { + MQTT *MQTTConfig `json:"http"` +} + +type MQTTConfig struct { + Address string `json:"address,omitempty"` + Topic string `json:"topic,omitempty"` + QoS int `json:"qos,omitempty"` + Retained bool `json:"retained,omitempty"` +} + +func NewDataPanel(config json.RawMessage) (global.DataPanel, error) { + mqttConfig := new(MQTTConfig) + err := json.Unmarshal(config, mqttConfig) + if err != nil { + return nil, err + } + return &PushMethod{ + MQTT: mqttConfig, + }, nil +} + +func (pm *PushMethod) InitPushMethod() error { + klog.V(1).Info("Init MQTT") + return nil +} + +func (pm *PushMethod) Push(data *common.DataModel) { + klog.V(1).Infof("Publish %v to %s on topic: %s, Qos: %d, Retained: %v", + data.Value, pm.MQTT.Address, pm.MQTT.Topic, pm.MQTT.QoS, pm.MQTT.Retained) + + opts := mqtt.NewClientOptions().AddBroker(pm.MQTT.Address) + client := mqtt.NewClient(opts) + + if token := client.Connect(); token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + os.Exit(1) + } + formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05") + str_time := "time is " + formatTimeStr + " " + str_publish := str_time + pm.MQTT.Topic + ": " + data.Value + + token := client.Publish(pm.MQTT.Topic, byte(pm.MQTT.QoS), pm.MQTT.Retained, str_publish) + token.Wait() + + client.Disconnect(250) + klog.V(2).Info("############### Message published. ###############") +} diff --git a/mappers/device-v1beta1/mqtt-mapper/data/stream/handler.go b/mappers/device-v1beta1/mqtt-mapper/data/stream/handler.go new file mode 100644 index 00000000..b92e3698 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/data/stream/handler.go @@ -0,0 +1,33 @@ +/* +Copyright 2024 The KubeEdge Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stream + +import ( + "errors" + + "github.com/kubeedge/mqtt/driver" + "github.com/kubeedge/mapper-framework/pkg/common" +) + +type StreamConfig struct { + Format string `json:"format"` + OutputDir string `json:"outputDir"` + FrameCount int `json:"frameCount"` + FrameInterval int `json:"frameInterval"` + VideoNum int `json:"videoNum"` +} + +func StreamHandler(twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig) error { + return errors.New("need to add the stream flag when make generate if you want to enable stream data processing.") +} diff --git a/mappers/device-v1beta1/mqtt-mapper/device/device.go b/mappers/device-v1beta1/mqtt-mapper/device/device.go new file mode 100644 index 00000000..455af06d --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/device/device.go @@ -0,0 +1,493 @@ +package device + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "os/signal" + "strings" + "sync" + "time" + + "k8s.io/klog/v2" + + dbInflux "github.com/kubeedge/mqtt/data/dbmethod/influxdb2" + dbMysql "github.com/kubeedge/mqtt/data/dbmethod/mysql" + dbRedis "github.com/kubeedge/mqtt/data/dbmethod/redis" + dbTdengine "github.com/kubeedge/mqtt/data/dbmethod/tdengine" + httpMethod "github.com/kubeedge/mqtt/data/publish/http" + mqttMethod "github.com/kubeedge/mqtt/data/publish/mqtt" + "github.com/kubeedge/mqtt/data/stream" + "github.com/kubeedge/mqtt/driver" + dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1beta1" + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/global" + "github.com/kubeedge/mapper-framework/pkg/util/parse" +) + +type DevPanel struct { + deviceMuxs map[string]context.CancelFunc + devices map[string]*driver.CustomizedDev + models map[string]common.DeviceModel + wg sync.WaitGroup + serviceMutex sync.Mutex + quitChan chan os.Signal +} + +var ( + devPanel *DevPanel + once sync.Once +) + +var ErrEmptyData = errors.New("device or device model list is empty") + +// NewDevPanel init and return devPanel +func NewDevPanel() *DevPanel { + once.Do(func() { + devPanel = &DevPanel{ + deviceMuxs: make(map[string]context.CancelFunc), + devices: make(map[string]*driver.CustomizedDev), + models: make(map[string]common.DeviceModel), + wg: sync.WaitGroup{}, + serviceMutex: sync.Mutex{}, + quitChan: make(chan os.Signal), + } + }) + return devPanel +} + +// DevStart start all devices. +func (d *DevPanel) DevStart() { + for id, dev := range d.devices { + klog.V(4).Info("Dev: ", id, dev) + ctx, cancel := context.WithCancel(context.Background()) + d.deviceMuxs[id] = cancel + d.wg.Add(1) + go d.start(ctx, dev) + } + signal.Notify(d.quitChan, os.Interrupt) + go func() { + <-d.quitChan + for id, device := range d.devices { + err := device.CustomizedClient.StopDevice() + if err != nil { + klog.Errorf("Service has stopped but failed to stop %s:%v", id, err) + } + } + klog.V(1).Info("Exit mapper") + os.Exit(1) + }() + d.wg.Wait() +} + +// start the device +func (d *DevPanel) start(ctx context.Context, dev *driver.CustomizedDev) { + defer d.wg.Done() + + var protocolConfig driver.ProtocolConfig + if err := json.Unmarshal(dev.Instance.PProtocol.ConfigData, &protocolConfig); err != nil { + klog.Errorf("Unmarshal ProtocolConfigs error: %v", err) + return + } + client, err := driver.NewClient(protocolConfig) + if err != nil { + klog.Errorf("Init dev %s error: %v", dev.Instance.Name, err) + return + } + dev.CustomizedClient = client + err = dev.CustomizedClient.InitDevice() + if err != nil { + klog.Errorf("Init device %s error: %v", dev.Instance.ID, err) + return + } + go dataHandler(ctx, dev) + <-ctx.Done() +} + +// dataHandler initialize the timer to handle data plane and devicetwin. +func dataHandler(ctx context.Context, dev *driver.CustomizedDev) { + for _, twin := range dev.Instance.Twins { + twin.Property.PProperty.DataType = strings.ToLower(twin.Property.PProperty.DataType) + var visitorConfig driver.VisitorConfig + + err := json.Unmarshal(twin.Property.Visitors, &visitorConfig) + visitorConfig.VisitorConfigData.DataType = strings.ToLower(visitorConfig.VisitorConfigData.DataType) + if err != nil { + klog.Errorf("Unmarshal VisitorConfig error: %v", err) + continue + } + err = setVisitor(&visitorConfig, &twin, dev) + if err != nil { + klog.Error(err) + continue + } + + // If the device property type is streaming, it will directly enter the streaming data processing function, + // such as saving frames or saving videos, and will no longer push it to the user database and application. + // If there are other needs for stream data processing, users can add functions in the mapper/data/stream directory. + if twin.Property.PProperty.DataType == "stream" { + err = stream.StreamHandler(&twin, dev.CustomizedClient, &visitorConfig) + if err != nil { + klog.Errorf("processed streaming data by %s Error: %v", twin.PropertyName, err) + } + continue + } + + // handle twin + twinData := &TwinData{ + DeviceName: dev.Instance.Name, + DeviceNamespace: dev.Instance.Namespace, + Client: dev.CustomizedClient, + Name: twin.PropertyName, + Type: twin.ObservedDesired.Metadata.Type, + ObservedDesired: twin.ObservedDesired, + VisitorConfig: &visitorConfig, + Topic: fmt.Sprintf(common.TopicTwinUpdate, dev.Instance.ID), + CollectCycle: time.Millisecond * time.Duration(twin.Property.CollectCycle), + ReportToCloud: twin.Property.ReportToCloud, + } + go twinData.Run(ctx) + + //handle status + getStates := &DeviceStates{Client: dev.CustomizedClient, DeviceName: dev.Instance.Name, + DeviceNamespace: dev.Instance.Namespace} + go getStates.Run(ctx) + + // handle push method + if twin.Property.PushMethod.MethodConfig != nil && twin.Property.PushMethod.MethodName != "" { + dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, dev.Instance.Namespace, common.WithType(twin.ObservedDesired.Metadata.Type)) + pushHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + } + // handle database + if twin.Property.PushMethod.DBMethod.DBMethodName != "" { + dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, dev.Instance.Namespace, common.WithType(twin.ObservedDesired.Metadata.Type)) + dbHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + switch twin.Property.PushMethod.DBMethod.DBMethodName { + // TODO add more database + case "influx": + dbInflux.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "redis": + dbRedis.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "tdengine": + dbTdengine.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + case "mysql": + dbMysql.DataHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel) + } + } + } +} + +// pushHandler start data panel work +func pushHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + var dataPanel global.DataPanel + var err error + // initialization dataPanel + switch twin.Property.PushMethod.MethodName { + case "http": + dataPanel, err = httpMethod.NewDataPanel(twin.Property.PushMethod.MethodConfig) + case "mqtt": + dataPanel, err = mqttMethod.NewDataPanel(twin.Property.PushMethod.MethodConfig) + default: + err = errors.New("custom protocols are not currently supported when push data") + } + if err != nil { + klog.Errorf("new data panel error: %v", err) + return + } + // initialization PushMethod + err = dataPanel.InitPushMethod() + if err != nil { + klog.Errorf("init publish method err: %v", err) + return + } + reportCycle := time.Millisecond * time.Duration(twin.Property.ReportCycle) + if reportCycle == 0 { + reportCycle = common.DefaultReportCycle + } + ticker := time.NewTicker(reportCycle) + go func() { + for { + select { + case <-ticker.C: + deviceData, err := client.GetDeviceData(visitorConfig) + if err != nil { + klog.Errorf("publish error: %v", err) + continue + } + sData, err := common.ConvertToString(deviceData) + if err != nil { + klog.Errorf("Failed to convert publish method data : %v", err) + continue + } + dataModel.SetValue(sData) + dataModel.SetTimeStamp() + dataPanel.Push(dataModel) + case <-ctx.Done(): + return + } + } + }() +} + +// dbHandler start db client to save data +func dbHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) { + switch twin.Property.PushMethod.DBMethod.DBMethodName { + // TODO add more database + case "influx": + dbInflux.DataHandler(ctx, twin, client, visitorConfig, dataModel) + + case "redis": + dbRedis.DataHandler(ctx, twin, client, visitorConfig, dataModel) + + case "tdengine": + dbTdengine.DataHandler(ctx, twin, client, visitorConfig, dataModel) + + case "mysql": + dbMysql.DataHandler(ctx, twin, client, visitorConfig, dataModel) + } +} + +// setVisitor check if visitor property is readonly, if not then set it. +func setVisitor(visitorConfig *driver.VisitorConfig, twin *common.Twin, dev *driver.CustomizedDev) error { + if twin.Property.PProperty.AccessMode == "ReadOnly" { + klog.V(3).Infof("%s twin readonly property: %s", dev.Instance.Name, twin.PropertyName) + return nil + } + klog.V(2).Infof("Convert type: %s, value: %s ", twin.Property.PProperty.DataType, twin.ObservedDesired.Value) + value, err := common.Convert(twin.Property.PProperty.DataType, twin.ObservedDesired.Value) + if err != nil { + klog.Errorf("Failed to convert value as %s : %v", twin.Property.PProperty.DataType, err) + return err + } + err = dev.CustomizedClient.SetDeviceData(value, visitorConfig) + if err != nil { + return fmt.Errorf("%s set device data error: %v", twin.PropertyName, err) + } + return nil +} + +// DevInit initialize the device +func (d *DevPanel) DevInit(deviceList []*dmiapi.Device, deviceModelList []*dmiapi.DeviceModel) error { + if len(deviceList) == 0 || len(deviceModelList) == 0 { + return ErrEmptyData + } + + for i := range deviceModelList { + model := deviceModelList[i] + cur := parse.GetDeviceModelFromGrpc(model) + d.models[model.Name] = cur + } + + for i := range deviceList { + device := deviceList[i] + commonModel := d.models[device.Spec.DeviceModelReference] + protocol, err := parse.BuildProtocolFromGrpc(device) + if err != nil { + return err + } + instance, err := parse.GetDeviceFromGrpc(device, &commonModel) + if err != nil { + return err + } + instance.PProtocol = protocol + + cur := new(driver.CustomizedDev) + cur.Instance = *instance + d.devices[instance.ID] = cur + } + + return nil +} + +// UpdateDev stop old device, then update and start new device +func (d *DevPanel) UpdateDev(model *common.DeviceModel, device *common.DeviceInstance) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + + if oldDevice, ok := d.devices[device.ID]; ok { + err := d.stopDev(oldDevice, device.ID) + if err != nil { + klog.Error(err) + } + } + // start new device + d.devices[device.ID] = new(driver.CustomizedDev) + d.devices[device.ID].Instance = *device + d.models[model.ID] = *model + + ctx, cancelFunc := context.WithCancel(context.Background()) + d.deviceMuxs[device.ID] = cancelFunc + d.wg.Add(1) + go d.start(ctx, d.devices[device.ID]) +} + +// UpdateDevTwins update device's twins +func (d *DevPanel) UpdateDevTwins(deviceID string, twins []common.Twin) error { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return fmt.Errorf("device %s not found", deviceID) + } + dev.Instance.Twins = twins + model := d.models[dev.Instance.Model] + d.UpdateDev(&model, &dev.Instance) + + return nil +} + +// DealDeviceTwinGet get device's twin data +func (d *DevPanel) DealDeviceTwinGet(deviceID string, twinName string) (interface{}, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return nil, fmt.Errorf("not found device %s", deviceID) + } + var res []parse.TwinResultResponse + for _, twin := range dev.Instance.Twins { + if twinName != "" && twin.PropertyName != twinName { + continue + } + payload, err := getTwinData(deviceID, twin, d.devices[deviceID]) + if err != nil { + return nil, err + } + item := parse.TwinResultResponse{ + PropertyName: twinName, + Payload: payload, + } + res = append(res, item) + } + return json.Marshal(res) +} + +// getTwinData get twin +func getTwinData(deviceID string, twin common.Twin, dev *driver.CustomizedDev) ([]byte, error) { + var visitorConfig driver.VisitorConfig + err := json.Unmarshal(twin.Property.Visitors, &visitorConfig) + if err != nil { + return nil, err + } + err = setVisitor(&visitorConfig, &twin, dev) + if err != nil { + return nil, err + } + twinData := &TwinData{ + DeviceName: deviceID, + Client: dev.CustomizedClient, + Name: twin.PropertyName, + Type: twin.ObservedDesired.Metadata.Type, + VisitorConfig: &visitorConfig, + Topic: fmt.Sprintf(common.TopicTwinUpdate, deviceID), + } + return twinData.GetPayLoad() +} + +// GetDevice get device instance +func (d *DevPanel) GetDevice(deviceID string) (interface{}, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + found, ok := d.devices[deviceID] + if !ok || found == nil { + return nil, fmt.Errorf("device %s not found", deviceID) + } + + // get the latest reported twin value + for i, twin := range found.Instance.Twins { + payload, err := getTwinData(deviceID, twin, found) + if err != nil { + return nil, err + } + found.Instance.Twins[i].Reported.Value = string(payload) + } + return found, nil +} + +// RemoveDevice remove device instance +func (d *DevPanel) RemoveDevice(deviceID string) error { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev := d.devices[deviceID] + delete(d.devices, deviceID) + err := d.stopDev(dev, deviceID) + if err != nil { + return err + } + return nil +} + +// stopDev stop device and goroutine +func (d *DevPanel) stopDev(dev *driver.CustomizedDev, id string) error { + cancelFunc, ok := d.deviceMuxs[id] + if !ok { + return fmt.Errorf("can not find device %s from device muxs", id) + } + + err := dev.CustomizedClient.StopDevice() + if err != nil { + klog.Errorf("stop device %s error: %v", id, err) + } + cancelFunc() + return nil +} + +// GetModel if the model exists, return device model +func (d *DevPanel) GetModel(modelID string) (common.DeviceModel, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + if model, ok := d.models[modelID]; ok { + return model, nil + } + return common.DeviceModel{}, fmt.Errorf("deviceModel %s not found", modelID) +} + +// UpdateModel update device model +func (d *DevPanel) UpdateModel(model *common.DeviceModel) { + d.serviceMutex.Lock() + d.models[model.ID] = *model + d.serviceMutex.Unlock() +} + +// RemoveModel remove device model +func (d *DevPanel) RemoveModel(modelID string) { + d.serviceMutex.Lock() + delete(d.models, modelID) + d.serviceMutex.Unlock() +} + +// GetTwinResult Get twin's value and data type +func (d *DevPanel) GetTwinResult(deviceID string, twinName string) (string, string, error) { + d.serviceMutex.Lock() + defer d.serviceMutex.Unlock() + dev, ok := d.devices[deviceID] + if !ok { + return "", "", fmt.Errorf("not found device %s", deviceID) + } + var res string + var dataType string + for _, twin := range dev.Instance.Twins { + if twinName != "" && twin.PropertyName != twinName { + continue + } + var visitorConfig driver.VisitorConfig + err := json.Unmarshal(twin.Property.Visitors, &visitorConfig) + if err != nil { + return "", "", err + } + err = setVisitor(&visitorConfig, &twin, dev) + + data, err := dev.CustomizedClient.GetDeviceData(&visitorConfig) + if err != nil { + return "", "", fmt.Errorf("get device data failed: %v", err) + } + res, err = common.ConvertToString(data) + if err != nil { + return "", "", err + } + dataType = twin.Property.PProperty.DataType + } + return res, dataType, nil +} diff --git a/mappers/device-v1beta1/mqtt-mapper/device/devicestatus.go b/mappers/device-v1beta1/mqtt-mapper/device/devicestatus.go new file mode 100644 index 00000000..57d809e9 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/device/devicestatus.go @@ -0,0 +1,69 @@ +/* +Copyright 2024 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package device + +import ( + "context" + "log" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mqtt/driver" + dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1beta1" + "github.com/kubeedge/mapper-framework/pkg/grpcclient" +) + +// DeviceStates is structure for getting device states. +type DeviceStates struct { + Client *driver.CustomizedClient + DeviceName string + DeviceNamespace string +} + +// Run timer function. +func (deviceStates *DeviceStates) PushStatesToEdgeCore() { + states, error := deviceStates.Client.GetDeviceStates() + if error != nil { + klog.Errorf("GetDeviceStates failed: %v", error) + return + } + + statesRequest := &dmiapi.ReportDeviceStatesRequest{ + DeviceName: deviceStates.DeviceName, + State: states, + DeviceNamespace: deviceStates.DeviceNamespace, + } + + log.Printf("send statesRequest", statesRequest.DeviceName, statesRequest.State) + if err := grpcclient.ReportDeviceStates(statesRequest); err != nil { + klog.Errorf("fail to report device states of %s with err: %+v", deviceStates.DeviceName, err) + } +} + +func (deviceStates *DeviceStates) Run(ctx context.Context) { + // TODO setting states reportCycle + ticker := time.NewTicker(2 * time.Second) + for { + select { + case <-ticker.C: + deviceStates.PushStatesToEdgeCore() + case <-ctx.Done(): + return + } + } +} diff --git a/mappers/device-v1beta1/mqtt-mapper/device/devicetwin.go b/mappers/device-v1beta1/mqtt-mapper/device/devicetwin.go new file mode 100644 index 00000000..f725ec8b --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/device/devicetwin.go @@ -0,0 +1,108 @@ +package device + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "k8s.io/klog/v2" + + "github.com/kubeedge/mqtt/driver" + dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1beta1" + "github.com/kubeedge/mapper-framework/pkg/common" + "github.com/kubeedge/mapper-framework/pkg/grpcclient" + "github.com/kubeedge/mapper-framework/pkg/util/parse" +) + +type TwinData struct { + DeviceName string + DeviceNamespace string + Client *driver.CustomizedClient + Name string + Type string + ObservedDesired common.TwinProperty + VisitorConfig *driver.VisitorConfig + Topic string + Results interface{} + CollectCycle time.Duration + ReportToCloud bool +} + +func (td *TwinData) GetPayLoad() ([]byte, error) { + var err error + td.VisitorConfig.VisitorConfigData.DataType = strings.ToLower(td.VisitorConfig.VisitorConfigData.DataType) + td.Results, err = td.Client.GetDeviceData(td.VisitorConfig) + if err != nil { + return nil, fmt.Errorf("get device data failed: %v", err) + } + sData, err := common.ConvertToString(td.Results) + if err != nil { + klog.Errorf("Failed to convert %s %s value as string : %v", td.DeviceName, td.Name, err) + return nil, err + } + if len(sData) > 30 { + klog.V(4).Infof("Get %s : %s ,value is %s......", td.DeviceName, td.Name, sData[:30]) + } else { + klog.V(4).Infof("Get %s : %s ,value is %s", td.DeviceName, td.Name, sData) + } + var payload []byte + if strings.Contains(td.Topic, "$hw") { + if payload, err = common.CreateMessageTwinUpdate(td.Name, td.Type, sData, td.ObservedDesired.Value); err != nil { + return nil, fmt.Errorf("create message twin update failed: %v", err) + } + } else { + if payload, err = common.CreateMessageData(td.Name, td.Type, sData); err != nil { + return nil, fmt.Errorf("create message data failed: %v", err) + } + } + return payload, nil +} + +func (td *TwinData) PushToEdgeCore() { + payload, err := td.GetPayLoad() + if err != nil { + klog.Errorf("twindata %s unmarshal failed, err: %s", td.Name, err) + return + } + + var msg common.DeviceTwinUpdate + if err = json.Unmarshal(payload, &msg); err != nil { + klog.Errorf("twindata %s unmarshal failed, err: %s", td.Name, err) + return + } + + twins := parse.ConvMsgTwinToGrpc(msg.Twin) + + var rdsr = &dmiapi.ReportDeviceStatusRequest{ + DeviceName: td.DeviceName, + DeviceNamespace: td.DeviceNamespace, + ReportedDevice: &dmiapi.DeviceStatus{ + Twins: twins, + //State: "OK", + }, + } + + if err := grpcclient.ReportDeviceStatus(rdsr); err != nil { + klog.Errorf("fail to report device status of %s with err: %+v", rdsr.DeviceName, err) + } +} + +func (td *TwinData) Run(ctx context.Context) { + if !td.ReportToCloud { + return + } + if td.CollectCycle == 0 { + td.CollectCycle = common.DefaultCollectCycle + } + ticker := time.NewTicker(td.CollectCycle) + for { + select { + case <-ticker.C: + td.PushToEdgeCore() + case <-ctx.Done(): + return + } + } +} diff --git a/mappers/device-v1beta1/mqtt-mapper/driver/devicetype.go b/mappers/device-v1beta1/mqtt-mapper/driver/devicetype.go new file mode 100644 index 00000000..9925b5d3 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/driver/devicetype.go @@ -0,0 +1,73 @@ +package driver + +import ( + "sync" + "time" + + "github.com/kubeedge/mapper-framework/pkg/common" +) + +// CustomizedDev is the customized device configuration and client information. +type CustomizedDev struct { + CustomizedClient *CustomizedClient + Instance common.DeviceInstance +} + +type CustomizedClient struct { + // TODO add some variables to help you better implement device drivers + deviceMutex sync.Mutex + ProtocolConfig + TempMessage string `json:"tempMessage"` + DeviceConfigData interface{} `json:"deviceConfigData"` +} + +type ProtocolConfig struct { + ProtocolName string `json:"protocolName"` + ConfigData `json:"configData"` +} + +type ConfigData struct { + // MQTT protocol config data + ClientID string `json:"clientID"` // MQTT Client ID + BrokerURL string `json:"brokerURL"` // MQTT Broker URL + Topic string `json:"topic"` // Topic for publishing or subscribing + Message string `json:"message"` // Content of the message + Username string `json:"username"` // Username for MQTT broker authentication + Password string `json:"password"` // Password for MQTT broker authentication + ConnectionTTL time.Duration `json:"connectionTTL"` // Connection timeout duration + LastMessage time.Time `json:"lastMessage"` // Timestamp of the last received message +} + +type VisitorConfig struct { + ProtocolName string `json:"protocolName"` + VisitorConfigData `json:"configData"` +} + +type VisitorConfigData struct { + DataType string `json:"dataType"` + + ClientID string `json:"clientID"` // MQTT Client ID + DeviceInfo string `json:"deviceInfo"` // Device information, such as device identification or other important information. + OperationInfo OperationInfoType `json:"operationInfo"` // Operation information, such as adding, deleting, modifying and so on. + SerializedFormat SerializedFormatType `json:"fileType"` // Supported formats: json, xml and yaml. + ParsedMessage map[string]interface{} `json:"parsedMessage"` // The parsed message +} + +// OperationInfoType defines the enumeration values for device operation. +type OperationInfoType uint + +const ( + DEVICEINfO OperationInfoType = iota // set global device config data + UPDATE // update the device config data + SETSINGLEVALUE // find the most related setting value and update + GETSINGLEVALUE // find the most related setting value +) + +// SerializedFormatType defines the enumeration values for serialized types. +type SerializedFormatType uint + +const ( + JSON SerializedFormatType = iota // json + YAML // yaml + XML // xml +) diff --git a/mappers/device-v1beta1/mqtt-mapper/driver/driver.go b/mappers/device-v1beta1/mqtt-mapper/driver/driver.go new file mode 100644 index 00000000..9187cc7b --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/driver/driver.go @@ -0,0 +1,620 @@ +package driver + +import ( + "encoding/json" + "encoding/xml" + "errors" + "fmt" + "gopkg.in/yaml.v3" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/kubeedge/mapper-framework/pkg/common" +) + +func NewClient(protocol ProtocolConfig) (*CustomizedClient, error) { + client := &CustomizedClient{ + ProtocolConfig: protocol, + deviceMutex: sync.Mutex{}, + TempMessage: "", + DeviceConfigData: nil, + } + return client, nil +} + +func (c *CustomizedClient) InitDevice() error { + configData := &c.ProtocolConfig.ConfigData + _, operationInfo, _, err := configData.SplitTopic() + if operationInfo != DEVICEINfO { + return errors.New("This is not a device config.") + } + if err != nil { + return err + } + c.TempMessage = configData.Message + return nil +} + +func (c *CustomizedClient) GetDeviceData(visitor *VisitorConfig) (interface{}, error) { + configData := &c.ProtocolConfig.ConfigData + _, operationInfo, _, err := configData.SplitTopic() + if operationInfo != DEVICEINfO { + return nil, errors.New("This is not a device config.") + } + if err != nil { + return nil, err + } + visitor.ProcessOperation(c.DeviceConfigData) + return c.DeviceConfigData, nil +} + +func (c *CustomizedClient) SetDeviceData(visitor *VisitorConfig) error { + configData := &c.ProtocolConfig.ConfigData + _, operationInfo, _, err := configData.SplitTopic() + if operationInfo == DEVICEINfO { + return errors.New("This is a device config, not to set device data.") + } + if err != nil { + return err + } + visitor.ProcessOperation(c.DeviceConfigData) + return nil +} + +func (c *CustomizedClient) StopDevice() error { + updateFieldsByTag(c.DeviceConfigData, map[string]interface{}{ + "status": common.DeviceStatusDisCONN, + "Status": common.DeviceStatusDisCONN, + }, "json") + updateFieldsByTag(c.DeviceConfigData, map[string]interface{}{ + "status": common.DeviceStatusDisCONN, + "Status": common.DeviceStatusDisCONN, + }, "yaml") + updateFieldsByTag(c.DeviceConfigData, map[string]interface{}{ + "status": common.DeviceStatusDisCONN, + "Status": common.DeviceStatusDisCONN, + }, "xml") + return nil +} + +func (c *CustomizedClient) GetDeviceStates(visitor *VisitorConfig) (string, error) { + res, err := visitor.getFieldByTag(c.DeviceConfigData) + if err != nil { + return common.DeviceStatusOK, nil + } + return res, nil + +} + +/* --------------------------------------------------------------------------------------- */ +// The function NewConfigData is a constructor for ConfigData to initialize the structure. +// It returns the ConfigData instance and an error value to handle the validity of the passed parameters. +func NewConfigData(clientID, brokerURL, topic, message, username, password string, connectionTTL time.Duration) (*ConfigData, error) { + if clientID == "" { + return nil, errors.New("clientID cannot be empty") + } + if brokerURL == "" { + return nil, errors.New("borkerURL cannot be empty") + } + if topic == "" { + return nil, errors.New("topic cannot be empty") + } + if message == "" { + return nil, errors.New("message cannot be empty") + } + if username == "" { + username = "defaultUser" + } + if password == "" { + password = "defaultPass" + } + if connectionTTL == 0 { + connectionTTL = 30 * time.Second // default timeout of 30 seconds + } + + return &ConfigData{ + ClientID: clientID, + BrokerURL: brokerURL, + Topic: topic, + Message: message, + Username: username, + Password: password, + ConnectionTTL: connectionTTL, + LastMessage: time.Now(), // set last message time to current time + }, nil +} + +// The function GetClientID returns the value of the ClientID field and error. +func (c *ConfigData) GetClientID() (string, error) { + if c.ClientID == "" { + return "", errors.New("clientID is empty") + } + return c.ClientID, nil +} + +// The function GetTopic returns the value of the Topic field and error. +func (c *ConfigData) GetTopic() (string, error) { + if c.Topic == "" { + return "", errors.New("topic is empty") + } + return c.Topic, nil +} + +// GetMessage returns the value of the Message field and error. +func (c *ConfigData) GetMessage() (string, error) { + if c.Message == "" { + return "", errors.New("message is empty") + } + return c.Message, nil +} + +// OperationInfoType and SerializedFormatType mappings +var operationTypeMap = map[string]OperationInfoType{ + "update": UPDATE, + "deviceinfo": DEVICEINfO, + "setsinglevalue" : SETSINGLEVALUE, + "getsinglevalue" : GETSINGLEVALUE, +} + +var serializedFormatMap = map[string]SerializedFormatType{ + "json": JSON, + "yaml": YAML, + "xml": XML, +} + +// The function SplitTopic splits the Topic into three parts and returns each. +// OperationInfoType(fulltextmodify: 0, pathmodify: 1, valuemodify: 2) +// SerializedFormatType(json: 0, yaml: 1, xml: 2) +func (c *ConfigData) SplitTopic() (string, OperationInfoType, SerializedFormatType, error) { + if c.Topic == "" { + return "", 0, 0, errors.New("topic is empty") + } + + parts := strings.Split(c.Topic, "/") + + if len(parts) < 3 { + return "", 0, 0, errors.New("topic format is invalid, must have at least three parts") + } + + deviceInfo := strings.Join(parts[:len(parts)-2], "/") + + // Get operation type from map + operationType, exists := operationTypeMap[parts[len(parts)-2]] + if !exists { + return "", 0, 0, errors.New("invalid operation type") + } + + // Get serialized format from map + serializedFormat, exists := serializedFormatMap[parts[len(parts)-1]] + if !exists { + return "", 0, 0, errors.New("invalid serialized format") + } + + return deviceInfo, operationType, serializedFormat, nil +} + +// The function ParseMessage parses the Message field according to the incoming type. +// parseType(0: json, 1: yaml, 2: xml) +// The value interface{} represents the parsed structure. +func (c *ConfigData) ParseMessage(parseType SerializedFormatType) (map[string]interface{}, error) { + if c.Message == "" { + return nil, errors.New("message is empty") + } + + switch parseType { + case JSON: // json + return c.jsonParse() + + case YAML: // yaml + return c.yamlParse() + + case XML: // xml + return c.xmlParse() + + default: + return nil, errors.New("unsupported parse type") + } +} + +// The function parseJSON parses the Message field of the ConfigData (assumed to be a JSON string). +func (c *ConfigData) jsonParse() (map[string]interface{}, error) { + if c.Message == "" { + return nil, errors.New("message is empty") + } + + var jsonMsg map[string]interface{} + err := json.Unmarshal([]byte(c.Message), &jsonMsg) + if err != nil { + return nil, err + } + return jsonMsg, nil +} + +// The function parseYAML parses the Message field of the ConfigData (assumed to be a YAML string). +func (c *ConfigData)yamlParse() (map[string]interface{}, error) { + if c.Message == "" { + return nil, errors.New("message is empty") + } + + var yamlMsg map[string]interface{} + err := yaml.Unmarshal([]byte(c.Message), &yamlMsg) + if err != nil { + return nil, err + } + return yamlMsg, nil +} + +// The function xmlParse parses the Message field of the ConfigData (assumed to be a XML string). +func (c *ConfigData)xmlParse() (map[string]interface{}, error) { + msg := c.Message + if strings.HasPrefix(msg, "") + if end != -1 { + msg = msg[end+2:] + } + } + + var node Node + err := xml.Unmarshal([]byte(msg), &node) + if err != nil { + return nil, err + } + + xmlMsg := nodeToMap(node) + var mp map[string]interface{} + for _, value := range xmlMsg { + if nestedMap, ok := value.(map[string]interface{}); ok { + mp = nestedMap + break + } + } + return mp, err +} + +// NewVisitorConfig creates a new instance of VisitorConfig using ConfigData pointer and the result of SplitTopic. +func (c *ConfigData) NewVisitorConfig() (*VisitorConfig, error) { + // get ClientID + clientID, err := c.GetClientID() + if err != nil { + return nil, err + } + + // get DeviceInfo, OperationInfo and SerializedFormat + deviceInfo, operationInfo, serializedFormat, err := c.SplitTopic() + if err != nil { + return nil, err + } + + // get ParsedMessage + parsedMessage, err := c.ParseMessage(serializedFormat) + if err != nil { + return nil, err + } + + // create + return &VisitorConfig{ + ProtocolName: "mqtt", + VisitorConfigData: VisitorConfigData{ + DataType: "DefaultDataType", + ClientID: clientID, + DeviceInfo: deviceInfo, + OperationInfo: operationInfo, + SerializedFormat: serializedFormat, + ParsedMessage: parsedMessage, + }, + }, nil +} + +/* --------------------------------------------------------------------------------------- */ +// The function ParseMessage parses the Message field according to the incoming type. +// parseType(0: json, 1: yaml, 2: xml) +// The value interface{} represents the parsed structure. +func (v *VisitorConfig) ProcessOperation(deviceConfigData interface{}) error { + if v.VisitorConfigData.ParsedMessage == nil { + return errors.New("visitor message is empty") + } + + if deviceConfigData == nil { + return errors.New("device message is empty") + } + + switch v.VisitorConfigData.OperationInfo { + case DEVICEINfO: // device config data + v.updateFullConfig(deviceConfigData) + return nil + case UPDATE: // update the full text according the visitor config and the tag (json, yaml, xml) + v.updateFullConfig(deviceConfigData) + return nil + case SETSINGLEVALUE: // update the single value according the visitor config and the tag (json, yaml, xml) + v.updateFieldsByTag(deviceConfigData) + return nil + default: + return errors.New("unsupported operation type") + } +} + +func (v *VisitorConfig) updateFullConfig(destDataConfig interface{}) error { + destValue := reflect.ValueOf(destDataConfig) + if destValue.Kind() != reflect.Ptr || destValue.Elem().Kind() != reflect.Struct { + return errors.New("destDataConfig must be a pointer to a struct") + } + + destValue = destValue.Elem() + + var tagName string + switch v.VisitorConfigData.SerializedFormat { + case JSON: + tagName = "json" + case YAML: + tagName = "yaml" + case XML: + tagName = "xml" + default: + return errors.New("unknown serialized format") + } + + // Update the destination struct using JSON tag + if err := updateStructFields(destValue, v.VisitorConfigData.ParsedMessage, tagName); err != nil { + return err + } + + return nil +} + +func (v *VisitorConfig)updateFieldsByTag(destDataConfig interface{}) error { + vv := reflect.ValueOf(destDataConfig).Elem() + + var tagName string + switch v.VisitorConfigData.SerializedFormat { + case JSON: + tagName = "json" + case YAML: + tagName = "yaml" + case XML: + tagName = "xml" + default: + return errors.New("unknown serialized format") + } + + for key, value := range v.VisitorConfigData.ParsedMessage { + if err := setFieldByTag(vv, key, value, tagName); err != nil { + return err + } + } + return nil +} + +/* --------------------------------------------------------------------------------------- */ +// updateStructFields recursively updates struct fields from the given map using specified tag type +func updateStructFields(structValue reflect.Value, data map[string]interface{}, tagName string) error { + structType := structValue.Type() + + for i := 0; i < structValue.NumField(); i++ { + field := structValue.Field(i) + fieldType := structType.Field(i) + tagValue := fieldType.Tag.Get(tagName) + + var value interface{} + var exists bool + + if tagValue != "" { + // Attempt to get value using tag + value, exists = data[tagValue] + } + + if !exists { + // Fallback to field name if tag is not found + tagValue = fieldType.Name + value, exists = data[tagValue] + } + + if !exists { + continue + } + + // Update the field based on its kind + if field.Kind() == reflect.Struct { + nestedData, ok := value.(map[string]interface{}) + if !ok { + return fmt.Errorf("type mismatch for nested field %s", tagValue) + } + if err := updateStructFields(field, nestedData, tagName); err != nil { + return err + } + } else if field.Kind() == reflect.Slice { + sliceData, ok := value.([]interface{}) + if !ok { + return fmt.Errorf("type mismatch for slice field %s", tagValue) + } + newSlice := reflect.MakeSlice(field.Type(), len(sliceData), len(sliceData)) + for j, item := range sliceData { + itemValue := reflect.ValueOf(item) + if newSlice.Index(j).Kind() == itemValue.Kind() { + newSlice.Index(j).Set(itemValue) + } else { + return fmt.Errorf("type mismatch for slice item in field %s", tagValue) + } + } + field.Set(newSlice) + } else { + fieldValue := reflect.ValueOf(value) + if field.Type() == fieldValue.Type() { + field.Set(fieldValue) + } else { + return fmt.Errorf("type mismatch for field %s", tagValue) + } + } + } + return nil +} + +// Node structure +type Node struct { + XMLName xml.Name + Content string `xml:",chardata"` + Nodes []Node `xml:",any"` + Attr []xml.Attr `xml:"-"` +} + +// convertValue attempts to convert string content to appropriate type. +func convertValue(content string) interface{} { + if f, err := strconv.ParseFloat(content, 64); err == nil { + return f + } else if i, err := strconv.Atoi(content); err == nil { + return i + } else if b, err := strconv.ParseBool(content); err == nil { + return b + } else { + return content + } +} + +// Convert XML attributes to map entries +func attrsToMap(attrs []xml.Attr) map[string]interface{} { + attrMap := make(map[string]interface{}) + for _, attr := range attrs { + attrMap[attr.Name.Local] = attr.Value + } + return attrMap +} + +// The function nodeToMap recursively converts XML nodes to map[string]interface{}. +func nodeToMap(node Node) map[string]interface{} { + xmlMsg := make(map[string]interface{}) + + // Process attributes + if len(node.Attr) > 0 { + xmlMsg["attributes"] = attrsToMap(node.Attr) + } + + // If the node has no children, it is a leaf node, apply type conversion. + if len(node.Nodes) == 0 { + xmlMsg[node.XMLName.Local] = convertValue(strings.TrimSpace(node.Content)) + return xmlMsg + } + + // Process child nodes recursively. + children := make(map[string]interface{}) + for _, child := range node.Nodes { + childMap := nodeToMap(child) + if existing, found := children[child.XMLName.Local]; found { + switch v := existing.(type) { + case []interface{}: + children[child.XMLName.Local] = append(v, childMap[child.XMLName.Local]) + default: + children[child.XMLName.Local] = []interface{}{v, childMap[child.XMLName.Local]} + } + } else { + children[child.XMLName.Local] = childMap[child.XMLName.Local] + } + } + + xmlMsg[node.XMLName.Local] = children + return xmlMsg +} + +func setFieldByTag(v reflect.Value, key string, value interface{}, tagName string) error { + if v.Kind() == reflect.Pointer { + v = v.Elem() + } + for i := 0; i < v.NumField(); i++ { + field := v.Type().Field(i) + fieldVal := v.Field(i) + + if field.Tag.Get(tagName) == key { + val := reflect.ValueOf(value) + if fieldVal.Type() != val.Type() { + return fmt.Errorf("type mismatch: cannot assign %s to %s", val.Type(), fieldVal.Type()) + } + fieldVal.Set(val) + return nil + } + + if fieldVal.Kind() == reflect.Struct { + if err := setFieldByTag(fieldVal, key, value, tagName); err == nil { + return nil + } + } + } + return fmt.Errorf("no such field with tag: %s", key) +} + +// The function MapToJSON converts map[string]interface{} to JSON string. +func mapToJSON(data map[string]interface{}) (string, error) { + jsonData, err := json.Marshal(data) + if err != nil { + return "", err + } + return string(jsonData), nil +} + +func StructToJSON(v interface{}) (string, error) { + jsonData, err := json.MarshalIndent(v, "", " ") + if err != nil { + return "", err + } + return string(jsonData), nil +} + +func updateFieldsByTag(s interface{}, updates map[string]interface{}, tagName string) error { + v := reflect.ValueOf(s).Elem() + for key, value := range updates { + if err := setFieldByTag(v, key, value, tagName); err != nil { + return err + } + } + return nil +} + +func (v * VisitorConfig)getFieldByTag(s interface{}) (string, error) { + vv := reflect.ValueOf(s).Elem() + + var tagName string + switch v.VisitorConfigData.SerializedFormat { + case JSON: + tagName = "json" + case YAML: + tagName = "yaml" + case XML: + tagName = "xml" + default: + return "", errors.New("unknown serialized format") + } + + res, err := findFieldByTag(vv, "status", tagName) + if err != nil { + res, err = findFieldByTag(vv, "Status", tagName) + if err != nil { + return "", err + } else { + return res, nil + } + } else { + return res, nil + } +} + +func findFieldByTag(v reflect.Value, key string, tagName string) (string, error) { + if v.Kind() == reflect.Pointer { + v = v.Elem() + } + for i := 0; i < v.NumField(); i++ { + field := v.Type().Field(i) + fieldVal := v.Field(i) + + if field.Tag.Get(tagName) == key { + return fieldVal.String(), nil + } + + if fieldVal.Kind() == reflect.Struct { + if value, err := findFieldByTag(fieldVal, key, tagName); err == nil { + return value, nil + } + } + } + return "", fmt.Errorf("no such field with tag: %s", key) +} +/* --------------------------------------------------------------------------------------- */ \ No newline at end of file diff --git a/mappers/device-v1beta1/mqtt-mapper/go.mod b/mappers/device-v1beta1/mqtt-mapper/go.mod new file mode 100644 index 00000000..50753052 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/go.mod @@ -0,0 +1,40 @@ +module github.com/kubeedge/mqtt + +go 1.21 + +require ( + github.com/eclipse/paho.mqtt.golang v1.2.0 + github.com/go-redis/redis/v8 v8.11.5 + github.com/go-sql-driver/mysql v1.7.1 + github.com/influxdata/influxdb-client-go/v2 v2.13.0 + github.com/kubeedge/kubeedge v1.18.0 + github.com/kubeedge/mapper-framework v1.17.1-0.20240727071908-23ae39c11809 + github.com/taosdata/driver-go/v3 v3.5.1 + k8s.io/klog/v2 v2.110.1 +) + +require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/avast/retry-go v3.0.0+incompatible // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/mux v1.8.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect + github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.19.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect + google.golang.org/grpc v1.63.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/mappers/device-v1beta1/mqtt-mapper/go.sum b/mappers/device-v1beta1/mqtt-mapper/go.sum new file mode 100644 index 00000000..4b4d05c7 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/go.sum @@ -0,0 +1,101 @@ +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= +github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= +github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/influxdata/influxdb-client-go/v2 v2.13.0 h1:ioBbLmR5NMbAjP4UVA5r9b5xGjpABD7j65pI8kFphDM= +github.com/influxdata/influxdb-client-go/v2 v2.13.0/go.mod h1:k+spCbt9hcvqvUiz0sr5D8LolXHqAAOfPw9v/RIRHl4= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kubeedge/kubeedge v1.18.0 h1:v2RFM9c/iGSkQ0Jd4TIco1GnKFZPmdinI9P7v87VaH8= +github.com/kubeedge/kubeedge v1.18.0/go.mod h1:lw/MuITfLLSzWa5OTrzSckayd0frVuj/wbdGyvPM7xw= +github.com/kubeedge/mapper-framework v1.17.1-0.20240727071908-23ae39c11809 h1:LnToqAGc9C58zP6AW60uq+1eDrE3tUVIlpASmO/0Q94= +github.com/kubeedge/mapper-framework v1.17.1-0.20240727071908-23ae39c11809/go.mod h1:opmle2heQfoWs1HM6TtEqCrSDSQ8pstMCFsEvIl4EFY= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= +github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace h1:9PNP1jnUjRhfmGMlkXHjYPishpcw4jpSt/V/xYY3FMA= +github.com/spf13/pflag v1.0.6-0.20210604193023-d5e0c0615ace/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/taosdata/driver-go/v3 v3.5.1 h1:ln8gLJ6HR6gHU6dodmOa9utUjPUpAcdIplh6arFO26Q= +github.com/taosdata/driver-go/v3 v3.5.1/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= +google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8= +google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= +k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= diff --git a/mappers/device-v1beta1/mqtt-mapper/hack/make-rules/mapper.sh b/mappers/device-v1beta1/mqtt-mapper/hack/make-rules/mapper.sh new file mode 100755 index 00000000..f30afd37 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/hack/make-rules/mapper.sh @@ -0,0 +1,159 @@ +#!/usr/bin/env bash + +set -o errexit +set -o nounset +set -o pipefail + +CURR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd -P)" +ROOT_DIR="$(cd "${CURR_DIR}/../.." && pwd -P)" +source "${ROOT_DIR}/hack/lib/init.sh" + +mkdir -p "${CURR_DIR}/bin" +mkdir -p "${CURR_DIR}/dist" + +function mod() { + [[ "${2:-}" != "only" ]] + local mapper="${1}" + + # the mapper is sharing the vendor with root + pushd "${ROOT_DIR}" >/dev/null || exist 1 + echo "downloading dependencies for mapper ${mapper}..." + + if [[ "$(go env GO111MODULE)" == "off" ]]; then + echo "go mod has been disabled by GO111MODULE=off" + else + echo "tidying" + go mod tidy + echo "vending" + go mod vendor + fi + + echo "...done" + popd >/dev/null || return +} + +function lint() { + [[ "${2:-}" != "only" ]] && mod "$@" + local mapper="${1}" + + echo "fmt and linting mapper ${mapper}..." + + gofmt -s -w "${CURR_DIR}/" + golangci-lint run "${CURR_DIR}/..." + + echo "...done" +} + +function build() { + [[ "${2:-}" != "only" ]] && lint "$@" + local mapper="${1}" + + local flags=" -w -s " + local ext_flags=" -extldflags '-static' " + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + + local platform + if [[ "${ARM:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm" + platform=("linux/arm") + elif [[ "${ARM64:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm64" + platform=("linux/arm64") + else + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + platform=("${os}/${arch}") + fi + + echo "building ${platform}" + + local os_arch + IFS="/" read -r -a os_arch <<<"${platform}" + local os=${os_arch[0]} + local arch=${os_arch[1]} + GOOS=${os} GOARCH=${arch} CGO_ENABLED=0 go build \ + -ldflags "${flags} ${ext_flags}" \ + -o "${CURR_DIR}/bin/${mapper}_${os}_${arch}" \ + "${CURR_DIR}/cmd/main.go" + + cp ${CURR_DIR}/bin/${mapper}_${os}_${arch} ${CURR_DIR}/bin/${mapper} + echo "...done" +} + +function package() { + [[ "${2:-}" != "only" ]] && build "$@" + local mapper="${1}" + + echo "packaging mapper ${mapper}..." + + local image_name="${mapper}-mapper" + local tag=v1.0 + + local platform + if [[ "${ARM:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm" + platform=("linux/arm") + elif [[ "${ARM64:-false}" == "true" ]]; then + echo "crossed packaging for linux/arm64" + platform=("linux/arm64") + else + local os="${OS:-$(go env GOOS)}" + local arch="${ARCH:-$(go env GOARCH)}" + platform=("${os}/${arch}") + fi + + pushd "${CURR_DIR}" >/dev/null 2>&1 + if [[ "${platform}" =~ darwin/* ]]; then + echo "package into Darwin OS image is unavailable, please use CROSS=true env to containerize multiple arch images or use OS=linux ARCH=amd64 env to containerize linux/amd64 image" + fi + + local image_tag="${image_name}:${tag}-${platform////-}" + echo "packaging ${image_tag}" + sudo docker build \ + --platform "${platform}" \ + -t "${image_tag}" . + popd >/dev/null 2>&1 + + echo "...done" +} + +function clean() { + local mapper="${1}" + + echo "cleanup mapper ${mapper}..." + + rm -rf "${CURR_DIR}/bin/*" + + echo "...done" +} + +function entry() { + local mapper="${1:-}" + shift 1 + + local stages="${1:-build}" + shift $(($# > 0 ? 1 : 0)) + + IFS="," read -r -a stages <<<"${stages}" + local commands=$* + if [[ ${#stages[@]} -ne 1 ]]; then + commands="only" + fi + + for stage in "${stages[@]}"; do + echo "# make mapper ${mapper} ${stage} ${commands}" + case ${stage} in + m | mod) mod "${mapper}" "${commands}" ;; + l | lint) lint "${mapper}" "${commands}" ;; + b | build) build "${mapper}" "${commands}" ;; + p | pkg | package) package "${mapper}" "${commands}" ;; + t | test) test "${mapper}" "${commands}" ;; + c | clean) clean "${mapper}" "${commands}" ;; + *) echo "unknown action '${stage}', select from mod,lint,build,test,clean" ;; + esac + done +} + +echo $@ +entry "$@" \ No newline at end of file diff --git a/mappers/device-v1beta1/mqtt-mapper/resource/configmap.yaml b/mappers/device-v1beta1/mqtt-mapper/resource/configmap.yaml new file mode 100644 index 00000000..a74aa1d1 --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/resource/configmap.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: cm-mapper +data: + configData: | + grpc_server: + socket_path: /etc/kubeedge/mqtt.sock + common: + name: Mqtt-mapper + version: v1.13.0 + api_version: v1.0.0 + protocol: # TODO add your protocol name + address: 127.0.0.1 + edgecore_sock: /etc/kubeedge/dmi.sock diff --git a/mappers/device-v1beta1/mqtt-mapper/resource/deployment.yaml b/mappers/device-v1beta1/mqtt-mapper/resource/deployment.yaml new file mode 100644 index 00000000..1531245e --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/resource/deployment.yaml @@ -0,0 +1,51 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mapper-test + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: demo + template: + metadata: + labels: + app: demo + spec: + nodeName: # replace with your edge node name + containers: + - name: demo + volumeMounts: # Required, mapper need to communicate with grpcclient and get the config + - name: test-volume + mountPath: /etc/kubeedge + - name: config + mountPath: /tmp + env: # Not Required, this field is used to mount the user database key + - name: TOKEN + valueFrom: + secretKeyRef: + name: mysecret + key: token + image: # Replace with your mapper image name + imagePullPolicy: IfNotPresent + resources: + limits: + cpu: 300m + memory: 500Mi + requests: + cpu: 100m + memory: 100Mi + command: [ "/bin/sh","-c" ] + args: [ "/kubeedge/main --config-file /tmp/config.yaml --v 4" ] + volumes: + - name: test-volume + hostPath: + path: /etc/kubeedge + type: Directory + - name: config + configMap: + name: cm-mapper + items: + - key: configData + path: config.yaml diff --git a/mappers/device-v1beta1/mqtt-mapper/resource/mqttdevice-instance.yaml b/mappers/device-v1beta1/mqtt-mapper/resource/mqttdevice-instance.yaml new file mode 100644 index 00000000..8bd91b8e --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/resource/mqttdevice-instance.yaml @@ -0,0 +1,49 @@ +apiVersion: devices.kubeedge.io/v1beta1 + kind: Device + metadata: + name: beta1-device + spec: + deviceModelRef: + name: temperture-model + nodeName: k8s-worker1 + properties: + - name: temperature + collectCycle: 10000000000 # The frequency of reporting data to the cloud, once every 10 seconds + reportCycle: 10000000000 # The frequency of data push to user applications or databases, once every 10 seconds + reportToCloud: true + desired: + value: "30" + pushMethod: + mqtt: + address: tcp://101.133.150.110:1883 + topic: temperture/update/json + qos: 0 + retained: false + dbMethod: + influxdb2: + influxdb2ClientConfig: + url: http://127.0.0.1:8086 + org: test-org + bucket: test-bucket + influxdb2DataConfig: + measurement: temperture_stats + tag: + unit: temperature + fieldKey: temperture_value + visitors: + protocolName: mqtt + configData: + topic: "sensor/data" + qos: 1 + retain: false + clientId: "temperture_client" + username: "user" + password: "pass" + cleanSession: true + keepAlive: 60 + + protocol: + protocolName: mqtt + configData: + ip: 101.133.150.110 + port: 1883 \ No newline at end of file diff --git a/mappers/device-v1beta1/mqtt-mapper/resource/mqttdevice-model.yaml b/mappers/device-v1beta1/mqtt-mapper/resource/mqttdevice-model.yaml new file mode 100644 index 00000000..5a601acf --- /dev/null +++ b/mappers/device-v1beta1/mqtt-mapper/resource/mqttdevice-model.yaml @@ -0,0 +1,15 @@ + apiVersion: devices.kubeedge.io/v1beta1 + kind: DeviceModel + metadata: + name: temperture-model + namespace: default + spec: + properties: + - name: temperture + description: Temperture sensor model + type: INT + accessMode: ReadWrite + maximum: "100" + minimum: "1" + unit: "Celsius" + protocol: mqtt \ No newline at end of file