Skip to content

Commit 403fa71

Browse files
authored
TransL Bulk API enhancement (#364)
Modified transl's Bulk processing API to support new translib Bulk API
1 parent 558cda6 commit 403fa71

File tree

1 file changed

+82
-87
lines changed

1 file changed

+82
-87
lines changed

transl_utils/transl_utils.go

+82-87
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,18 @@ var (
2323
Writer *syslog.Writer
2424
)
2525

26+
var (
27+
transLibOpMap map[int]string
28+
)
29+
30+
func init() {
31+
transLibOpMap = map[int]string{
32+
translib.REPLACE: "REPLACE",
33+
translib.UPDATE: "UPDATE",
34+
translib.DELETE: "DELETE",
35+
}
36+
}
37+
2638
func __log_audit_msg(ctx context.Context, reqType string, uriPath string, err error) {
2739
var err1 error
2840
username := "invalid"
@@ -293,120 +305,103 @@ func TranslProcessUpdate(prefix *gnmipb.Path, entry *gnmipb.Update, ctx context.
293305
return nil
294306
}
295307

308+
// TranslProcessBulk - Process Bulk Set request
296309
func TranslProcessBulk(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update, prefix *gnmipb.Path, ctx context.Context) error {
297-
var br translib.BulkRequest
298-
var uri string
299-
300-
var deleteUri []string
301-
var replaceUri []string
302-
var updateUri []string
303310

304-
rc, ctx := common_utils.GetContext(ctx)
305-
log.V(2).Info("TranslProcessBulk Called")
306-
var nver translib.Version
311+
var uri string
307312
var err error
313+
var payload []byte
314+
var resp translib.BulkResponse
315+
var errors []string
316+
rc, ctx := common_utils.GetContext(ctx)
317+
br := translib.BulkRequest{}
318+
319+
//set ClientVersion
308320
if rc.BundleVersion != nil {
309-
nver, err = translib.NewVersion(*rc.BundleVersion)
321+
nver, err := translib.NewVersion(*rc.BundleVersion)
310322
if err != nil {
311-
log.V(2).Infof("Bundle Version Check failed with error =%v", err.Error())
323+
log.V(2).Infof("Bulk Set operation failed with error =%v", err.Error())
312324
return err
313325
}
326+
br.ClientVersion = nver
327+
}
328+
//set User roles
329+
br.User = translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}
330+
331+
//set Auth setting
332+
if rc.Auth.AuthEnabled {
333+
br.AuthEnabled = true
314334
}
335+
log.V(2).Info("TranslProcessBulk Called")
315336
for _, d := range delete {
316-
if uri, err = ConvertToURI(prefix, d); err != nil {
337+
fullPath := GnmiTranslFullPath(prefix, d)
338+
if uri, err = ConvertToURI(nil, fullPath); err != nil {
317339
return err
318340
}
319-
req := translib.SetRequest{
320-
Path: uri,
321-
User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles},
322-
}
323-
if rc.BundleVersion != nil {
324-
req.ClientVersion = nver
325-
}
326-
if rc.Auth.AuthEnabled {
327-
req.AuthEnabled = true
328-
}
329-
br.DeleteRequest = append(br.DeleteRequest, req)
330-
deleteUri = append(deleteUri, uri)
341+
342+
bulkReqEntry := translib.BulkRequestEntry{}
343+
bulkReqEntry.Entry = translib.SetRequest{
344+
Path: uri,
345+
Payload: nil}
346+
bulkReqEntry.Operation = translib.DELETE
347+
br.Request = append(br.Request, bulkReqEntry)
331348
}
349+
332350
for _, r := range replace {
333-
if uri, err = ConvertToURI(prefix, r.GetPath()); err != nil {
351+
uri, err = ConvertToURI(prefix, r.GetPath())
352+
if err != nil {
334353
return err
335354
}
336-
payload := r.GetVal().GetJsonIetfVal()
337-
req := translib.SetRequest{
338-
Path: uri,
339-
Payload: payload,
340-
User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles},
341-
}
342-
if rc.BundleVersion != nil {
343-
req.ClientVersion = nver
344-
}
345-
if rc.Auth.AuthEnabled {
346-
req.AuthEnabled = true
355+
switch v := r.GetVal().GetValue().(type) {
356+
case *gnmipb.TypedValue_JsonIetfVal:
357+
payload = v.JsonIetfVal
358+
default:
359+
return status.Errorf(codes.InvalidArgument, "unsupported value type %T for path %s", v, uri)
347360
}
348-
br.ReplaceRequest = append(br.ReplaceRequest, req)
349-
replaceUri = append(replaceUri, uri)
361+
log.V(5).Infof("Replace path = '%s', payload = %s", uri, payload)
362+
bulkReqEntry := translib.BulkRequestEntry{}
363+
bulkReqEntry.Entry = translib.SetRequest{
364+
Path: uri,
365+
Payload: payload}
366+
bulkReqEntry.Operation = translib.REPLACE
367+
br.Request = append(br.Request, bulkReqEntry)
350368
}
351369
for _, u := range update {
352-
if uri, err = ConvertToURI(prefix, u.GetPath()); err != nil {
370+
uri, err = ConvertToURI(prefix, u.GetPath())
371+
if err != nil {
353372
return err
354373
}
355-
payload := u.GetVal().GetJsonIetfVal()
356-
req := translib.SetRequest{
357-
Path: uri,
358-
Payload: payload,
359-
User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles},
360-
}
361-
if rc.BundleVersion != nil {
362-
req.ClientVersion = nver
363-
}
364-
if rc.Auth.AuthEnabled {
365-
req.AuthEnabled = true
374+
switch v := u.GetVal().GetValue().(type) {
375+
case *gnmipb.TypedValue_JsonIetfVal:
376+
payload = v.JsonIetfVal
377+
default:
378+
return status.Errorf(codes.InvalidArgument, "unsupported value type %T for path %s", v, uri)
366379
}
367-
br.UpdateRequest = append(br.UpdateRequest, req)
368-
updateUri = append(updateUri, uri)
380+
log.V(5).Infof("Update path = '%s', payload = %s", uri, payload)
381+
bulkReqEntry := translib.BulkRequestEntry{}
382+
bulkReqEntry.Entry = translib.SetRequest{
383+
Path: uri,
384+
Payload: payload}
385+
bulkReqEntry.Operation = translib.UPDATE
386+
br.Request = append(br.Request, bulkReqEntry)
369387
}
370388

371-
resp, err := translib.Bulk(br)
389+
resp, err = translib.Bulk(br)
372390

373-
i := 0
374-
for _, d := range resp.DeleteResponse {
375-
__log_audit_msg(ctx, "DELETE", deleteUri[i], d.Err)
376-
i++
377-
}
378-
i = 0
379-
for _, r := range resp.ReplaceResponse {
380-
__log_audit_msg(ctx, "REPLACE", replaceUri[i], r.Err)
381-
i++
391+
for k := range resp.Response {
392+
__log_audit_msg(ctx, transLibOpMap[resp.Response[k].Operation], br.Request[k].Entry.Path, resp.Response[k].Entry.Err)
393+
if resp.Response[k].Entry.Err != nil {
394+
log.Warningf("%s=%v", resp.Response[k].Entry.Err.Error(), resp.Response[k].Entry.ErrSrc)
395+
errors = append(errors, resp.Response[k].Entry.Err.Error())
396+
}
382397
}
383-
i = 0
384-
for _, u := range resp.UpdateResponse {
385-
__log_audit_msg(ctx, "UPDATE", updateUri[i], u.Err)
386-
i++
398+
399+
if err != nil && len(errors) == 0 { //Global error
400+
log.Errorf("Bulk Operation failed with Error: %v", err.Error())
401+
errors = append(errors, err.Error())
387402
}
388403

389-
var errors []string
390-
if err != nil {
391-
log.V(2).Info("BULK SET operation failed with error(s):")
392-
for _, d := range resp.DeleteResponse {
393-
if d.Err != nil {
394-
log.V(2).Infof("%s=%v", d.Err.Error(), d.ErrSrc)
395-
errors = append(errors, d.Err.Error())
396-
}
397-
}
398-
for _, r := range resp.ReplaceResponse {
399-
if r.Err != nil {
400-
log.V(2).Infof("%s=%v", r.Err.Error(), r.ErrSrc)
401-
errors = append(errors, r.Err.Error())
402-
}
403-
}
404-
for _, u := range resp.UpdateResponse {
405-
if u.Err != nil {
406-
log.V(2).Infof("%s=%v", u.Err.Error(), u.ErrSrc)
407-
errors = append(errors, u.Err.Error())
408-
}
409-
}
404+
if len(errors) > 0 {
410405
return fmt.Errorf("SET failed: %s", strings.Join(errors, "; "))
411406
}
412407

0 commit comments

Comments
 (0)