Skip to content

Commit

Permalink
[Exporter] Improving reliability of Emit function
Browse files Browse the repository at this point in the history
I found during large-scale testing that we sometimes don't handle emitting of the same
resource reliably, and this may lead to the generation of duplicate resources (a very small
number, but still) — I found this in a very specific case where notebooks were listed without
their parent directories.

This PR fixes this problem:

- by tracking whether a resource is already in the importing queue
- by detecting duplicates during code generation

It also may improve performance a bit (2-3%).
  • Loading branch information
alexott committed Oct 28, 2024
1 parent 8b2a735 commit 5d34165
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 23 deletions.
25 changes: 15 additions & 10 deletions exporter/codegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,22 +904,27 @@ func (ic *importContext) handleResourceWrite(generatedFile string, ch dataWriteC
return
}

//
newResources := make(map[string]struct{}, 100)
log.Printf("[DEBUG] started processing new writes for %s", generatedFile)
for f := range ch {
if f != nil {
log.Printf("[DEBUG] started writing resource body for %s", f.BlockName)
_, err = tf.WriteString(f.ResourceBody)
if err == nil {
newResources[f.BlockName] = struct{}{}
if f.ImportCommand != "" {
ic.waitGroup.Add(1)
importChan <- f.ImportCommand
// check if we have the same blockname already written. To avoid duplicates
_, exists := newResources[f.BlockName]
if !exists {
log.Printf("[DEBUG] started writing resource body for %s", f.BlockName)
_, err = tf.WriteString(f.ResourceBody)
if err == nil {
newResources[f.BlockName] = struct{}{}
if f.ImportCommand != "" {
ic.waitGroup.Add(1)
importChan <- f.ImportCommand
}
log.Printf("[DEBUG] finished writing resource body for %s", f.BlockName)
} else {
log.Printf("[ERROR] Error when writing to %s: %v", generatedFile, err)
}
log.Printf("[DEBUG] finished writing resource body for %s", f.BlockName)
} else {
log.Printf("[ERROR] Error when writing to %s: %v", generatedFile, err)
log.Printf("[WARN] Found duplicate resource: '%s'", f.BlockName)
}
} else {
log.Print("[WARN] got nil as resourceWriteData!")
Expand Down
41 changes: 28 additions & 13 deletions exporter/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ var goroutinesNumber = map[string]int{
"databricks_sql_dashboard": 3,
"databricks_sql_widget": 4,
"databricks_sql_visualization": 4,
"databricks_query": 4,
"databricks_query": 6,
"databricks_alert": 2,
"databricks_permissions": 11,
}
Expand Down Expand Up @@ -615,17 +615,20 @@ func (ic *importContext) HasInState(r *resource, onlyAdded bool) bool {
return ic.State.Has(r)
}

// setImportingState records the importing status for the given resource key
// in ic.importing, holding ic.importingMutex for the duration of the write.
func (ic *importContext) setImportingState(key string, state bool) {
	ic.importingMutex.Lock()
	ic.importing[key] = state
	ic.importingMutex.Unlock()
}

func (ic *importContext) Add(r *resource) {
if ic.HasInState(r, true) { // resource must exist and already marked as added
return
}
ic.setImportingState(r.String(), true) // mark resource as added
rString := r.String()
ic.importingMutex.Lock()
_, ok := ic.importing[rString]
if ok {
ic.importingMutex.Unlock()
log.Printf("[DEBUG] %s already being added", rString)
return
}
ic.importing[rString] = true // mark resource as added
ic.importingMutex.Unlock()
state := r.Data.State()
if state == nil {
log.Printf("[ERROR] state is nil for %s", r)
Expand All @@ -648,7 +651,6 @@ func (ic *importContext) Add(r *resource) {
Instances: []instanceApproximation{inst},
Resource: r,
})
// in single-threaded scenario scope is toposorted
ic.Scope.Append(r)
}

Expand Down Expand Up @@ -727,14 +729,25 @@ func (ic *importContext) Emit(r *resource) {
log.Printf("[DEBUG] %s already imported", r)
return
}
rString := r.String()
if ic.testEmits != nil {
log.Printf("[INFO] %s is emitted in test mode", r)
ic.testEmitsMutex.Lock()
ic.testEmits[r.String()] = true
ic.testEmits[rString] = true
ic.testEmitsMutex.Unlock()
return
}
ic.setImportingState(r.String(), false) // we're starting to add a new resource
// we need to check that we're not importing the same resource twice - this may happen under high concurrency
// for specific resources, for example, directories when they aren't part of the listing
ic.importingMutex.Lock()
res, ok := ic.importing[rString]
if ok {
ic.importingMutex.Unlock()
log.Printf("[DEBUG] %s already being imported: %v", rString, res)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

Sensitive data returned by an access to Ai21labsApiKey
flows to a logging call.
Sensitive data returned by an access to AnthropicApiKey
flows to a logging call.
Sensitive data returned by an access to CohereApiKey
flows to a logging call.
Sensitive data returned by an access to OpenaiApiKey
flows to a logging call.
Sensitive data returned by an access to PalmApiKey
flows to a logging call.
return
}
ic.importing[rString] = false // we're starting to add a new resource
ic.importingMutex.Unlock()
_, ok = ic.Resources[r.Resource]
if !ok {
log.Printf("[ERROR] %s is not available in provider", r)
Expand All @@ -745,8 +758,10 @@ func (ic *importContext) Emit(r *resource) {
log.Printf("[DEBUG] %s (%s service) is not part of the account level export", r.Resource, ir.Service)
return
}
// TODO: add similar condition for checking workspace-level objects only. After new ACLs import is merged

if !ic.accountLevel && !ir.WorkspaceLevel {
log.Printf("[DEBUG] %s (%s service) is not part of the workspace level export", r.Resource, ir.Service)
return
}
// from here, it should be done by the goroutine... send resource into the channel
ch, exists := ic.channels[r.Resource]
if exists {
Expand Down

0 comments on commit 5d34165

Please sign in to comment.