Skip to content

Commit

Permalink
storage
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunsk committed Feb 24, 2024
1 parent ca53e76 commit 64c36a8
Show file tree
Hide file tree
Showing 19 changed files with 974 additions and 16 deletions.
96 changes: 83 additions & 13 deletions cmd/group1/b_storageserv/a_app/b_storage_mgr.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package app

import (
"fmt"
"github.com/golang/glog"
config "junodb_lite/cmd/group1/b_storageserv/b_config"
initmgr "junodb_lite/pkg/e_initmgr"
"net"
"os"
)

type (
Manager struct {
CmdStorageCommon
Expand All @@ -9,55 +18,116 @@ type (
}
)

func (s *Manager) GetName() string {
func (c *Manager) GetName() string {
//TODO implement me
panic("implement me")
}

func (s *Manager) GetDesc() string {
func (c *Manager) GetDesc() string {
//TODO implement me
panic("implement me")
}

func (s *Manager) GetSynopsis() string {
func (c *Manager) GetSynopsis() string {
//TODO implement me
panic("implement me")
}

func (s *Manager) GetDetails() string {
func (c *Manager) GetDetails() string {
//TODO implement me
panic("implement me")
}

func (s *Manager) GetOptionDesc() string {
func (c *Manager) GetOptionDesc() string {
//TODO implement me
panic("implement me")
}

func (s *Manager) GetExample() string {
func (c *Manager) GetExample() string {
//TODO implement me
panic("implement me")
}

func (s *Manager) AddExample(cmdExample string, desc string) {
func (c *Manager) AddExample(cmdExample string, desc string) {
//TODO implement me
panic("implement me")
}

func (s *Manager) AddDetails(txt string) {
func (c *Manager) AddDetails(txt string) {
//TODO implement me
panic("implement me")
}

func (s *Manager) Exec() {
//TODO implement me
panic("implement me")
func (c *Manager) Exec() {
initmgr.Register(config.Initializer, c.optConfigFile)
initmgr.Init() //initalize config first as others depend on it

cfg := config.ServerConfig()

var connInfo []ConnectInfo
if c.optIpAddress == "" {
for row := 0; row < len(cfg.ClusterInfo.ConnInfo); row++ {
for col := 0; col < len(cfg.ClusterInfo.ConnInfo[row]); col++ {
ipport := cfg.ClusterInfo.ConnInfo[row][col]
if ip, _, err := net.SplitHostPort(ipport); err == nil {
{
// for K8s storage pod initialization check in GKE
k8sPodName, ok := os.LookupEnv("POD_NAME")
if ok {
k8sPodFqdn := k8sPodName
k8sPodDomain, ok := os.LookupEnv("POD_DOMAIN")
if ok {
k8sPodFqdn = k8sPodName + "." + k8sPodDomain
}
if ip == k8sPodFqdn {
connInfo = append(connInfo, ConnectInfo{Listener: ipport, ZoneId: row, MachineIndex: col})
} else {
glog.Errorf("[K8s] ConnInfo Ip of pod (%c) doesn't match pod fqdn (%c)", ip, k8sPodFqdn)
}
}
}
} else {
glog.Errorf("wrong connect info string %c [%d][%d]", ipport, row, col)
}

}
}
} else {
for row := 0; row < len(cfg.ClusterInfo.ConnInfo); row++ {
for col := 0; col < len(cfg.ClusterInfo.ConnInfo[row]); col++ {
ipport := cfg.ClusterInfo.ConnInfo[row][col]
if ip, _, err := net.SplitHostPort(ipport); err == nil {
if ip == c.optIpAddress {
connInfo = append(connInfo, ConnectInfo{Listener: ipport, ZoneId: row, MachineIndex: col})
}
} else {
glog.Errorf("wrong connect info string %c [%d][%d]", ipport, row, col)
}
}
}
}
if len(connInfo) == 0 {
glog.Errorf("No cluster info found for this host, exit")
return
}

initmgr.Init()

// Parent process
var cmdArgs []string

if c.optConfigFile != "" {
cmdArgs = append(cmdArgs, fmt.Sprintf("-config=%c", c.optConfigFile))
}
servermgr := NewServerManager(len(connInfo), cfg.PidFileName, os.Args[0], cmdArgs, connInfo,
cfg.HttpMonAddr, int(8080), cfg.CloudEnabled)
servermgr.Run()
}

func (s *Manager) PrintUsage() {
func (c *Manager) PrintUsage() {
//TODO implement me
panic("implement me")
}

func (s *Manager) Init(name string, desc string) {
func (c *Manager) Init(name string, desc string) {
}
181 changes: 181 additions & 0 deletions cmd/group1/b_storageserv/a_app/b_storage_mgr_ext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package app

import (
"errors"
"fmt"
"github.com/golang/glog"
"io/fs"
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"syscall"
)

type (
ConnectInfo struct {
Listener string
ZoneId int
MachineIndex int
}
ChildInfo struct {
Id int
Cmd *exec.Cmd
}
ServerManager struct {
//monitoring *httpMonitoringT

connectInfo []ConnectInfo
pidFileName string
cmdPath string
cmdArgs []string
numChildren int
pidMap map[int]ChildInfo
doneCh chan struct{}
deadCh chan int
stopping bool
procCreateItvlBase int
procCreateItvlMax int
lruCacheSizeInMB int
dbScanPort int
cloudEnabled bool
}
)

func NewServerManager(num int, pidFileName string, path string, args []string,
connInfo []ConnectInfo, httpMonAddr string, dbScanPort int, cloudEnabled bool) *ServerManager {
s := &ServerManager{
connectInfo: connInfo,
pidFileName: pidFileName,
cmdPath: path,
cmdArgs: args,
numChildren: num,
pidMap: make(map[int]ChildInfo),
doneCh: make(chan struct{}),
deadCh: make(chan int, num),
stopping: false,
procCreateItvlBase: 100,
procCreateItvlMax: 20000,
// For rocksdb. min(10% * mem / num_of_db_instances, 3072)
//lruCacheSizeInMB: int(math.Min(float64(util.GetTotalMemMB()/(10*num)), 3072)),
dbScanPort: dbScanPort,
cloudEnabled: cloudEnabled,
}
if len(httpMonAddr) != 0 { ///TODO validate addr?
for i := 0; i < s.numChildren; i++ {
if ln, err := net.Listen("tcp", "127.0.0.1:0"); err == nil {
//s.monitoring.listeners[i] = ln
//s.monitoring.lsnrFiles[i], _ = ln.(*net.TCPListener).File()
glog.Infof("monitoring listener (%s) created for worker %d", ln.Addr(), i)

} else {
glog.Errorf("fail to create listeners for monitoring")
}
}
}

return s
}

func (s *ServerManager) Run() {
pidFile := s.pidFileName

if data, err := os.ReadFile(pidFile); err == nil {
if pid, err := strconv.Atoi(strings.TrimSpace(string(data))); err == nil {
if process, err := os.FindProcess(pid); err == nil {
if err := process.Signal(syscall.Signal(0)); err == nil {
fmt.Fprintf(os.Stderr, "process pid: %d in %s is still running\n", pid, pidFile)
///TODO check if it is storageserv process
os.Exit(-1)
}
}
}
}

if s.dbScanPort > 0 {
cmdPath := fmt.Sprintf("%s/%s", filepath.Dir(s.cmdPath), "dbscanserv")
_, err := os.Stat(cmdPath)
if errors.Is(err, fs.ErrNotExist) {
glog.Exitf("missing executable file: dbscanserv.")
}
}

os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0644)
defer os.Remove(pidFile)
//defer shmstats.Finalize()

//if err := shmstats.InitForManager(s.numChildren); err != nil {
// glog.Error(err.Error())
// return
//}

//s.handleSignals()

hostName, err := os.Hostname()
spawn := true
if err == nil && s.cloudEnabled {
shutdownList := fmt.Sprintf(" %s ", os.Getenv("SHUTDOWN_LIST"))
name := fmt.Sprintf(" %s ", hostName)
glog.Infof("host=%s shutdownList=%s", hostName, shutdownList)
if strings.Contains(shutdownList, " all ") || strings.Contains(shutdownList, name) {
glog.Infof("Skip starting workers on:%s", shutdownList)
spawn = false
}
}
if spawn {
s.spawnChildren()
}

Loop:
for {
select {
case pid := <-s.deadCh:
s.handleDeadChild(pid)
case <-s.doneCh:
//s.shutdown()
break Loop
}
}
}
func (s *ServerManager) spawnChildren() {
for i := 0; i < s.numChildren; i++ {
s.spawnOneChild(i)
}
if s.dbScanPort > 0 {
s.spawnDbScanChild()
}
}

func (s *ServerManager) spawnDbScanChild() {

cmdPath := fmt.Sprintf("%s/%s", filepath.Dir(s.cmdPath), "dbscanserv")

var argConfig string
for _, val := range s.cmdArgs {
if strings.Index(val, "-config") == 0 || strings.Index(val, "-c") == 0 {
argConfig = val
break
}
}
glog.Infof("%s %s", s.cmdPath, argConfig)
cmd := exec.Command(cmdPath, argConfig)

cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

err := cmd.Start()
if err != nil {
glog.Fatalf("Failed to launch child process, error: %v", err)
}

// save the cmd for later
s.pidMap[cmd.Process.Pid] = ChildInfo{-2, cmd}

}
func (s *ServerManager) spawnOneChild(i int) {

}
func (s *ServerManager) handleDeadChild(pid int) {
}
Loading

0 comments on commit 64c36a8

Please sign in to comment.