@@ -281,6 +281,296 @@ printADLSMessage <- function(fileName, functionName, message, error = NULL) {
281
281
print(msg )
282
282
}
283
283
284
# ADLS Ingress - AdlFileOutputStream ----

#' Create an adlFileOutputStream.
#'
#' Create a container (`adlFileOutputStream`) for holding variables used by
#' the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param accountName the account name
#' @param relativePath Relative path of a file/directory
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlFileOutputStream` object
#'
#' @family Azure Data Lake Store functions
adls.fileoutputstream.create <- function(azureActiveContext, accountName, relativePath, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlFileOutputStream(azEnv)
  list2env(
    list(azureActiveContext = "", accountName = "", relativePath = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  if (!missing(accountName)) azEnv$accountName <- accountName
  if (!missing(relativePath)) azEnv$relativePath <- relativePath
  azEnv$leaseId <- uuid()
  azEnv$blockSize <- getAzureDataLakeDefaultBufferSize()
  azEnv$buffer <- raw(0)
  # cursors/indices/offsets in R should start from 1 and NOT 0.
  # Because of this there are many adjustments that need to be done throughout the code!
  azEnv$cursor <- 1L
  # seed the remote cursor from the file's current length on the server
  res <- adls.file.info(azureActiveContext, accountName, relativePath, verbose)
  azEnv$remoteCursor <- as.integer(res$FileStatus.length) # this remote cursor starts from 0
  azEnv$streamClosed <- FALSE
  azEnv$lastFlushUpdatedMetadata <- FALSE

  # additional param required to implement bad offset handling
  azEnv$numRetries <- 0

  return(azEnv)
}
#' Copy `len` bytes of `contents` (starting at `off`) into the stream buffer.
#'
#' @param adlFileOutputStream The adlFileOutputStream object to operate with.
#' @param contents raw vector to copy bytes from.
#' @param off 1-based offset in `contents` at which to start copying.
#' @param len number of bytes to copy.
#' @return the updated cursor, invisibly; errors if the copy does not fit.
adls.fileoutputstream.addtobuffer <- function(adlFileOutputStream, contents, off, len) {
  bufferlen <- getContentSize(adlFileOutputStream$buffer)
  cursor <- adlFileOutputStream$cursor
  if (len > bufferlen - (cursor - 1)) { # if requesting to copy more than remaining space in buffer
    stop("IllegalArgumentException: invalid buffer copy requested in adls.fileoutputstream.addtobuffer")
  }
  # guard zero-length copies: cursor:(cursor + len - 1) would otherwise count
  # DOWNWARDS (e.g. 5:4) and corrupt the buffer instead of copying nothing
  if (len <= 0) return(invisible(adlFileOutputStream$cursor))
  # optimized arraycopy
  adlFileOutputStream$buffer[cursor:(cursor + len - 1)] <- contents[off:(off + len - 1)]
  adlFileOutputStream$cursor <- as.integer(cursor + len)
}
#' Perform a zero-length append to force a metadata sync on the file.
#'
#' @param adlFileOutputStream The adlFileOutputStream object to operate with.
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file.
#' @param offset offset of the append.
#'     NOTE(review): `offset` is currently NOT forwarded - `offsetToAppendTo`
#'     is hard-coded to 0 below; confirm whether `offset` was meant to be
#'     passed through.
#' @param verbose Print tracing information (default FALSE).
#' @return TRUE (callers treat this as void; failures stop via stopWithAzureError).
adls.fileoutputstream.dozerolengthappend <- function(adlFileOutputStream, azureDataLakeAccount, relativePath, offset, verbose = FALSE) {
  resHttp <- adls.append.core(adlFileOutputStream$azureActiveContext, adlFileOutputStream,
                              azureDataLakeAccount, relativePath,
                              4194304L, contents = raw(0), contentSize = 0L,
                              leaseId = adlFileOutputStream$leaseId, sessionId = adlFileOutputStream$leaseId,
                              syncFlag = syncFlagEnum$METADATA, offsetToAppendTo = 0, verbose = verbose)
  stopWithAzureError(resHttp)
  # return a TRUE (treated as void)
  return(TRUE)
}
#' The Core Append API.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param adlFileOutputStream The adlFileOutputStream object to operate with.
#' @param relativePath Relative path of a file.
#' @param bufferSize Size of the buffer to be used.
#' @param contents raw contents to be written to the file.
#' @param contentSize size of `contents` to be written to the file.
#' @param leaseId a String containing the lease ID (generated by client). Can be null.
#' @param sessionId a String containing the session ID (generated by client). Can be null.
#' @param syncFlag
#'     Use `DATA` when writing more bytes to same file path. Most performant operation.
#'     Use `METADATA` when metadata for the
#'     file also needs to be updated especially file length
#'     retrieved from `adls.file.info` or `adls.ls` API call.
#'     Has an overhead of updating metadata operation.
#'     Use `CLOSE` when no more data is
#'     expected to be written in this path. Adl backend would
#'     update metadata, close the stream handle and
#'     release the lease on the
#'     path if valid leaseId is passed.
#'     Expensive operation and should be used only when last
#'     bytes are written.
#' @param offsetToAppendTo offset at which to append to the file.
#'     To let the server choose offset, pass `-1`.
#' @param verbose Print tracing information (default FALSE).
#' @return response object
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#upload-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Append_to_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#append-org.apache.hadoop.fs.Path-int-org.apache.hadoop.util.Progressable-}
adls.append.core <- function(azureActiveContext, adlFileOutputStream = NULL,
                             azureDataLakeAccount, relativePath, bufferSize,
                             contents, contentSize = -1L,
                             leaseId = NULL, sessionId = NULL, syncFlag = NULL,
                             offsetToAppendTo = -1,
                             verbose = FALSE) {
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  assert_that(is_bufferSize(bufferSize))
  assert_that(is_content(contents))
  assert_that(is_contentSize(contentSize))
  if (contentSize == -1) {
    contentSize <- getContentSize(contents)
  }
  # allow a zero byte append
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=APPEND", "&append=true",
    getAzureDataLakeApiVersion()
  )
  if (!missing(bufferSize) && !is.null(bufferSize)) URL <- paste0(URL, "&buffersize=", bufferSize)
  if (!is.null(leaseId)) URL <- paste0(URL, "&leaseid=", leaseId)
  if (!is.null(sessionId)) URL <- paste0(URL, "&filesessionid=", sessionId)
  if (!is.null(syncFlag)) URL <- paste0(URL, "&syncFlag=", syncFlag)
  if (offsetToAppendTo >= 0) URL <- paste0(URL, "&offset=", offsetToAppendTo)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  # fix: use seq_len(contentSize) instead of 1:contentSize - when contentSize
  # is 0, 1:0 indexes out of bounds on a raw vector and yields a spurious 00
  # byte, which would break the zero-byte append allowed above
  resHttp <- callAzureDataLakeApi(URL, verb = "POST",
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  content = contents[seq_len(contentSize)],
                                  verbose = verbose)
  # update retry count - required for bad offset handling
  if (!is.null(adlFileOutputStream)) {
    adlFileOutputStream$numRetries <- retryPolicy$retryCount
  }
  return(resHttp)
}
# ADLS Egress - AdlFileInputStream ----

#' Create an adlFileInputStream.
#'
#' Create a container (`adlFileInputStream`) for holding variables used by
#' the Azure Data Lake Store data functions.
#'
#' @inheritParams setAzureContext
#' @param accountName the account name
#' @param relativePath Relative path of a file/directory
#' @param verbose Print tracing information (default FALSE).
#' @return An `adlFileInputStream` object
#'
#' @family Azure Data Lake Store functions
adls.fileinputstream.create <- function(azureActiveContext, accountName, relativePath, verbose = FALSE) {
  azEnv <- new.env(parent = emptyenv())
  azEnv <- as.adlFileInputStream(azEnv)
  list2env(
    list(azureActiveContext = "", accountName = "", relativePath = ""),
    envir = azEnv
  )
  if (!missing(azureActiveContext)) azEnv$azureActiveContext <- azureActiveContext
  if (!missing(accountName)) azEnv$accountName <- accountName
  if (!missing(relativePath)) azEnv$relativePath <- relativePath
  # reading requires a file, not a directory
  azEnv$directoryEntry <- adls.file.info(azureActiveContext, accountName, relativePath, verbose)
  if (azEnv$directoryEntry$FileStatus.type == "DIRECTORY") {
    msg <- paste0("ADLException: relativePath is not a file: ", relativePath)
    stop(msg)
  }
  azEnv$sessionId <- uuid()
  azEnv$blockSize <- getAzureDataLakeDefaultBufferSize()
  azEnv$buffer <- raw(0)
  # cursors/indices/offsets in R should start from 1 and NOT 0.
  # Because of this there are many adjustments that need to be done throughout the code!
  azEnv$fCursor <- 0L # cursor of buffer within file - offset of next byte to read from remote server
  azEnv$bCursor <- 1L # cursor of read within buffer - offset of next byte to be returned from buffer
  azEnv$limit <- 1L   # offset of next byte to be read into buffer from service (i.e., upper marker+1 of valid bytes in buffer)
  azEnv$streamClosed <- FALSE

  return(azEnv)
}
#' Core function to open and read a file.
#'
#' @inheritParams setAzureContext
#' @param azureDataLakeAccount Name of the Azure Data Lake account.
#' @param relativePath Relative path of a file/directory.
#' @param offset Provide the offset to read from.
#' @param length Provide length of data to read.
#' @param bufferSize Size of the buffer to be used. (not honoured).
#' @param verbose Print tracing information (default FALSE).
#' @return raw contents of the file.
#' @details Exceptions - IOException
#'
#' @family Azure Data Lake Store functions
#'
#' @references \url{https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-data-operations-rest-api#read-data}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Open_and_Read_a_File}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Offset}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Length}
#' @seealso \url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Buffer_Size}
#' @seealso \url{https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#open-org.apache.hadoop.fs.Path-int-}
adls.read.core <- function(azureActiveContext,
                           azureDataLakeAccount, relativePath,
                           offset, length, bufferSize = 4194304L,
                           verbose = FALSE) {
  # validate inputs; the active context (and its token) only when supplied
  if (!missing(azureActiveContext) && !is.null(azureActiveContext)) {
    assert_that(is.azureActiveContext(azureActiveContext))
    azureCheckToken(azureActiveContext)
  }
  assert_that(is_adls_account(azureDataLakeAccount))
  assert_that(is_relativePath(relativePath))
  if (!missing(offset) && !is.null(offset)) assert_that(is_offset(offset))
  if (!missing(length) && !is.null(length)) assert_that(is_length(length))
  if (!missing(bufferSize) && !is.null(bufferSize)) assert_that(is_bufferSize(bufferSize))
  # build the OPEN request URL, appending only the query params provided
  URL <- paste0(
    getAzureDataLakeBasePath(azureDataLakeAccount),
    getAzureDataLakeURLEncodedString(relativePath),
    "?op=OPEN", "&read=true",
    getAzureDataLakeApiVersion()
  )
  if (!missing(offset) && !is.null(offset)) URL <- paste0(URL, "&offset=", offset)
  if (!missing(length) && !is.null(length)) URL <- paste0(URL, "&length=", length)
  if (!missing(bufferSize) && !is.null(bufferSize)) URL <- paste0(URL, "&buffersize=", bufferSize)
  retryPolicy <- createAdlRetryPolicy(azureActiveContext, verbose = verbose)
  resHttp <- callAzureDataLakeApi(URL,
                                  azureActiveContext = azureActiveContext,
                                  adlRetryPolicy = retryPolicy,
                                  verbose = verbose)
  return(resHttp)
}
#' Read from service attempts to read `blocksize` bytes from service.
#' Returns how many bytes are actually read, could be less than blocksize.
#'
#' @param adlFileInputStream the `adlFileInputStream` object to read from
#' @param verbose Print tracing information (default FALSE)
#' @return number of bytes actually read, or -1 at end of file
#'
#' @family Azure Data Lake Store functions
adls.fileinputstream.readfromservice <- function(adlFileInputStream, verbose = FALSE) {
  # if there's still unread data in the buffer then don't overwrite it
  if (adlFileInputStream$bCursor < adlFileInputStream$limit) return(0)
  # at or past end of file
  if (adlFileInputStream$fCursor >= adlFileInputStream$directoryEntry$FileStatus.length) return(-1)
  # small file: read it whole in one go
  if (adlFileInputStream$directoryEntry$FileStatus.length <= adlFileInputStream$blockSize)
    return(adls.fileinputstream.slurpfullfile(adlFileInputStream))

  # reset buffer to initial state - i.e., throw away existing data
  adlFileInputStream$bCursor <- 1L
  adlFileInputStream$limit <- 1L
  if (is.null(adlFileInputStream$buffer)) adlFileInputStream$buffer <- raw(getAzureDataLakeDefaultBufferSize())

  resHttp <- adls.read.core(adlFileInputStream$azureActiveContext,
                            adlFileInputStream$accountName, adlFileInputStream$relativePath,
                            adlFileInputStream$fCursor, adlFileInputStream$blockSize,
                            verbose = verbose)
  stopWithAzureError(resHttp)
  data <- content(resHttp, "raw", encoding = "UTF-8")
  bytesRead <- getContentSize(data)
  # fix: guard the copy - when bytesRead is 0, buffer[1:0] would index
  # elements c(1, 0) and corrupt the buffer instead of copying nothing
  if (bytesRead > 0) {
    adlFileInputStream$buffer[seq_len(bytesRead)] <- data[seq_len(bytesRead)]
    adlFileInputStream$limit <- adlFileInputStream$limit + bytesRead
    adlFileInputStream$fCursor <- adlFileInputStream$fCursor + bytesRead
  }
  return(bytesRead)
}
#' Reads the whole file into buffer. Useful when reading small files.
#'
#' @param adlFileInputStream the adlFileInputStream object to read from
#' @param verbose Print tracing information (default FALSE)
#' @return number of bytes actually read
adls.fileinputstream.slurpfullfile <- function(adlFileInputStream, verbose = FALSE) {
  if (is.null(adlFileInputStream$buffer)) {
    # fix: was `blocksize` (all lower case), which created a NEW field rather
    # than updating the `blockSize` field used everywhere else in this file
    adlFileInputStream$blockSize <- adlFileInputStream$directoryEntry$FileStatus.length
    adlFileInputStream$buffer <- raw(adlFileInputStream$directoryEntry$FileStatus.length)
  }

  # reset buffer to initial state - i.e., throw away existing data
  # preserve current file offset (may not be 0 if app did a seek before first read)
  adlFileInputStream$bCursor <- adls.fileinputstream.getpos(adlFileInputStream) + 1L
  adlFileInputStream$limit <- 1L
  adlFileInputStream$fCursor <- 0L # read from beginning

  resHttp <- adls.read.core(adlFileInputStream$azureActiveContext,
                            adlFileInputStream$accountName, adlFileInputStream$relativePath,
                            adlFileInputStream$fCursor, adlFileInputStream$directoryEntry$FileStatus.length,
                            verbose = verbose)
  stopWithAzureError(resHttp)
  data <- content(resHttp, "raw", encoding = "UTF-8")
  bytesRead <- getContentSize(data)
  # fix: guard the copy - when bytesRead is 0, buffer[1:0] would index
  # elements c(1, 0) and corrupt the buffer instead of copying nothing
  if (bytesRead > 0) {
    adlFileInputStream$buffer[seq_len(bytesRead)] <- data[seq_len(bytesRead)]
    adlFileInputStream$limit <- adlFileInputStream$limit + bytesRead
    adlFileInputStream$fCursor <- adlFileInputStream$fCursor + bytesRead
  }
  return(bytesRead)
}
573
+
284
574
# ADLS Retry Policies ----
285
575
286
576
#' NOTE: Following points on ADLS AdlsRetryPolicy: