Skip to content

Commit

Permalink
Experimental support for accessing ziti components over the mgmt/ctrl…
Browse files Browse the repository at this point in the history
… channels. Fixes #2439
  • Loading branch information
plorenz committed Sep 25, 2024
1 parent 3b56d73 commit b953349
Show file tree
Hide file tree
Showing 34 changed files with 3,281 additions and 641 deletions.
177 changes: 177 additions & 0 deletions common/datapipe/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datapipe

import (
"fmt"
"github.com/gliderlabs/ssh"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/identity"
gossh "golang.org/x/crypto/ssh"
"os"
"path"
"strconv"
"strings"
)

type LocalAccessType string

const (
LocalAccessTypeNone LocalAccessType = ""
LocalAccessTypePort LocalAccessType = "local-port"
LocalAccessTypeEmbeddedSshServer LocalAccessType = "embedded-ssh-server"
)

type Config struct {
Enabled bool
LocalAccessType LocalAccessType // values: 'none', 'localhost:port', 'embedded'
DestinationPort uint16
AuthorizedKeysFile string
HostKey gossh.Signer
ShellPath string
}

func (self *Config) IsLocalAccessAllowed() bool {
return self.Enabled && self.LocalAccessType != LocalAccessTypeNone
}

func (self *Config) IsLocalPort() bool {
return self.LocalAccessType == LocalAccessTypePort
}

func (self *Config) IsEmbedded() bool {
return self.LocalAccessType == LocalAccessTypeEmbeddedSshServer
}

func (self *Config) LoadConfig(m map[interface{}]interface{}) error {
log := pfxlog.Logger()
if v, ok := m["enabled"]; ok {
if enabled, ok := v.(bool); ok {
self.Enabled = enabled
} else {
self.Enabled = strings.EqualFold("true", fmt.Sprintf("%v", v))
}
}
if v, ok := m["enableExperimentalFeature"]; ok {
if enabled, ok := v.(bool); ok {
if !enabled {
self.Enabled = false
}
} else if !strings.EqualFold("true", fmt.Sprintf("%v", v)) {
self.Enabled = false
}
} else {
self.Enabled = false
}

if self.Enabled {
log.Infof("mgmt.pipe enabled")
if v, ok := m["destination"]; ok {
if destination, ok := v.(string); ok {
if strings.HasPrefix(destination, "127.0.0.1:") {
self.LocalAccessType = LocalAccessTypePort
portStr := strings.TrimPrefix(destination, "127.0.0.1:")
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
log.WithError(err).Warn("mgmt.pipe is enabled, but destination not valid. Must be '127.0.0.1:<port>' or 'embedded'")
self.Enabled = false
return nil
}
self.DestinationPort = uint16(port)
} else if destination == "embedded-ssh-server" {
self.LocalAccessType = LocalAccessTypeEmbeddedSshServer

if v, ok = m["authorizedKeysFile"]; ok {
if keysFile, ok := v.(string); ok {
self.AuthorizedKeysFile = keysFile
} else {
log.Warnf("mgmt.pipe is enabled, but 'embedded' destination configured and authorizedKeysFile configuration is not type string, but %T", v)
self.Enabled = false
return nil
}
}

if v, ok = m["shell"]; ok {
if s, ok := v.(string); ok {
self.ShellPath = s
} else {
log.Warnf("mgmt.pipe is enabled, but 'embedded' destination configured and shell configuration is not type string, but %T", v)
}
}
} else {
log.Warn("mgmt.pipe is enabled, but destination not valid. Must be 'localhost:port' or 'embedded'")
self.Enabled = false
return nil
}
}
} else {
self.Enabled = false
log.Warn("mgmt.pipe is enabled, but destination not specified. mgmt.pipe disabled.")
return nil
}
} else {
log.Infof("mgmt.pipe disabled")
}
return nil
}

func (self *Config) NewSshRequestHandler(identity *identity.TokenId) (*SshRequestHandler, error) {
if self.HostKey == nil {
signer, err := gossh.NewSignerFromKey(identity.Cert().PrivateKey)
if err != nil {
return nil, err
}
self.HostKey = signer
}

keysFile := self.AuthorizedKeysFile
if keysFile == "" {
homeDir, err := os.UserHomeDir()
if err != nil {
return nil, fmt.Errorf("could not set up ssh request handler, failing get home dir, trying to load default authorized keys (%w)", err)
}
keysFile = path.Join(homeDir, ".ssh", "authorized_keys")
}

keysFileContents, err := os.ReadFile(keysFile)
if err != nil {
return nil, fmt.Errorf("could not set up ssh request handler, failed to load authorized keys from '%s' (%w)", keysFile, err)
}

authorizedKeys := map[string]struct{}{}
entryIdx := 0
for len(keysFileContents) > 0 {
pubKey, _, _, rest, err := gossh.ParseAuthorizedKey(keysFileContents)
if err != nil {
return nil, fmt.Errorf("could not set up ssh request handler, failed to load authorized key at index %d from '%s' (%w)", entryIdx, keysFile, err)
}

authorizedKeys[string(pubKey.Marshal())] = struct{}{}
keysFileContents = rest
entryIdx++
}

publicKeyOption := ssh.PublicKeyAuth(func(ctx ssh.Context, key ssh.PublicKey) bool {
_, found := authorizedKeys[string(key.Marshal())]
return found
})

return &SshRequestHandler{
config: self,
options: []ssh.Option{publicKeyOption},
}, nil
}
133 changes: 133 additions & 0 deletions common/datapipe/embedded_ssh_server_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package datapipe

import (
"errors"
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v3"
"github.com/openziti/foundation/v2/concurrenz"
"io"
"net"
"sync/atomic"
"time"
)

type MessageTypes struct {
DataMessageType int32
PipeIdHeaderType int32
CloseMessageType int32
}

func NewEmbeddedSshConn(ch channel.Channel, id uint32, msgTypes *MessageTypes) *EmbeddedSshConn {
return &EmbeddedSshConn{
id: id,
ch: ch,
ReadAdapter: channel.NewReadAdapter(fmt.Sprintf("pipe-%d", id), 4),
msgTypes: msgTypes,
}
}

type EmbeddedSshConn struct {
msgTypes *MessageTypes
id uint32
ch channel.Channel
closed atomic.Bool
*channel.ReadAdapter
sshConn concurrenz.AtomicValue[io.Closer]
deadline atomic.Pointer[time.Time]
}

func (self *EmbeddedSshConn) Id() uint32 {
return self.id
}

func (self *EmbeddedSshConn) SetSshConn(conn io.Closer) {
self.sshConn.Store(conn)
}

func (self *EmbeddedSshConn) WriteToServer(data []byte) error {
return self.ReadAdapter.PushData(data)
}

func (self *EmbeddedSshConn) Write(data []byte) (n int, err error) {
msg := channel.NewMessage(self.msgTypes.DataMessageType, data)
msg.PutUint32Header(self.msgTypes.PipeIdHeaderType, self.id)
deadline := time.Second
if val := self.deadline.Load(); val != nil && !val.IsZero() {
deadline = time.Until(*val)
}
return len(data), msg.WithTimeout(deadline).SendAndWaitForWire(self.ch)
}

func (self *EmbeddedSshConn) Close() error {
self.CloseWithErr(errors.New("close called"))
return nil
}

func (self *EmbeddedSshConn) CloseWithErr(err error) {
if self.closed.CompareAndSwap(false, true) {
self.ReadAdapter.Close()
log := pfxlog.ContextLogger(self.ch.Label()).WithField("connId", self.id)

log.WithError(err).Info("closing mgmt pipe connection")

if sshConn := self.sshConn.Load(); sshConn != nil {
if closeErr := sshConn.Close(); closeErr != nil {
log.WithError(closeErr).Error("failed closing mgmt pipe embedded ssh connection")
}
}

if !self.ch.IsClosed() && err != io.EOF && err != nil {
msg := channel.NewMessage(self.msgTypes.CloseMessageType, []byte(err.Error()))
msg.PutUint32Header(self.msgTypes.PipeIdHeaderType, self.id)
if sendErr := self.ch.Send(msg); sendErr != nil {
log.WithError(sendErr).Error("failed sending mgmt pipe close message")
}
}

if closeErr := self.ch.Close(); closeErr != nil {
log.WithError(closeErr).Error("failed closing mgmt pipe client channel")
}
}
}

func (self *EmbeddedSshConn) LocalAddr() net.Addr {
return embeddedSshPipeAddr{
id: self.id,
}
}

func (self *EmbeddedSshConn) RemoteAddr() net.Addr {
return embeddedSshPipeAddr{
id: self.id,
}
}

func (self *EmbeddedSshConn) SetDeadline(t time.Time) error {
if err := self.ReadAdapter.SetReadDeadline(t); err != nil {
return err
}
return self.SetWriteDeadline(t)
}

func (self *EmbeddedSshConn) SetWriteDeadline(t time.Time) error {
self.deadline.Store(&t)
return nil
}

func (self *EmbeddedSshConn) WriteToClient(data []byte) error {
_, err := self.Write(data)
return err
}

type embeddedSshPipeAddr struct {
id uint32
}

func (self embeddedSshPipeAddr) Network() string {
return "ziti"
}

func (self embeddedSshPipeAddr) String() string {
return fmt.Sprintf("ssh-pipe-%d", self.id)
}
87 changes: 87 additions & 0 deletions common/datapipe/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datapipe

import (
"errors"
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
"sync"
"sync/atomic"
)

type Pipe interface {
Id() uint32
WriteToServer(data []byte) error
WriteToClient(data []byte) error
CloseWithErr(err error)
}

func NewRegistry(config *Config) *Registry {
return &Registry{
config: config,
}
}

type Registry struct {
lock sync.Mutex
nextId atomic.Uint32
connections concurrenz.CopyOnWriteMap[uint32, Pipe]
config *Config
}

func (self *Registry) GetConfig() *Config {
return self.config
}

func (self *Registry) GetNextId() (uint32, error) {
self.lock.Lock()
defer self.lock.Unlock()

limit := 0
for {
nextId := self.nextId.Add(1)
if val := self.connections.Get(nextId); val == nil {
return nextId, nil
}
if limit++; limit >= 1000 {
return 0, errors.New("pipe pool in bad state, bailing out after 1000 attempts to get next id")
}
}
}

func (self *Registry) Register(pipe Pipe) error {
self.lock.Lock()
defer self.lock.Unlock()

if self.connections.Get(pipe.Id()) != nil {
pfxlog.Logger().Errorf("pipe already registered (id=%d)", pipe.Id())
return fmt.Errorf("pipe already registered")
}

self.connections.Put(pipe.Id(), pipe)
return nil
}

func (self *Registry) Unregister(id uint32) {
self.connections.Delete(id)
}

func (self *Registry) Get(id uint32) Pipe {
return self.connections.Get(id)
}
Loading

0 comments on commit b953349

Please sign in to comment.