From 033800267ca5364c5f2016f76f66ae808361c61b Mon Sep 17 00:00:00 2001 From: Jerry Ling Date: Thu, 8 Jul 2021 17:38:16 +0200 Subject: [PATCH] Add Iterator interface fix #6 (#28) * Fix entry size bug Co-authored-by: Nick Amin * Indexing interface for `LazyBranch` * Indexing interface's test * Support map and broadcasting over LazyBranch via iteration * Bump version fix #6 --- Project.toml | 4 +- README.md | 41 ++++++++++++---- src/UnROOT.jl | 6 ++- src/arrayapi.jl | 122 +++++++++++++++++++++++++++++++++++++++++++++++ src/bootstrap.jl | 9 +++- src/itr.jl | 37 ++++++++++++++ src/root.jl | 68 +++++++------------------- src/streamers.jl | 23 +++++---- src/types.jl | 16 ++++--- src/utils.jl | 13 +++++ test/runtests.jl | 11 +++++ 11 files changed, 268 insertions(+), 82 deletions(-) create mode 100644 src/arrayapi.jl create mode 100644 src/itr.jl diff --git a/Project.toml b/Project.toml index 874f2416..5606fd85 100644 --- a/Project.toml +++ b/Project.toml @@ -1,12 +1,13 @@ name = "UnROOT" uuid = "3cd96dde-e98d-4713-81e9-a4a1b0235ce9" authors = ["Tamas Gal", "Jerry Ling"] -version = "0.1.9" +version = "0.2.0" [deps] CodecLz4 = "5ba52731-8f18-5e0d-9241-30f10d1ec561" CodecXz = "ba30903b-d9e8-5048-a5ec-d1f5b0d4b47b" CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" +CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2" LRUCache = "8ac3fa9e-de4c-5943-b1dc-09c6b5f20637" MD5 = "6ac74813-4b46-53a4-afec-0b5dc9d7885c" Memoization = "6fafb56a-5788-4b4e-91ca-c0cea6611c73" @@ -18,6 +19,7 @@ StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" CodecLz4 = "^0.3.0, ^0.4.0" CodecXz = "^0.6.0, ^0.7.0" CodecZlib = "^0.6.0, ^0.7.0" +CodecZstd = "^0.6.0, ^0.7.0" LRUCache = "^1.3.0" MD5 = "^0.2.1" Memoization = "^0.1.10" diff --git a/README.md b/README.md index 46be54b8..8b0cd092 100644 --- a/README.md +++ b/README.md @@ -25,11 +25,9 @@ Three's also a [discussion](https://github.com/scikit-hep/uproot/issues/401) rea documentation on uproot's issue page. ## Status -The project is in early prototyping phase and contributions are very -welcome. - -We support reading all scalar branch and jagged branch of "basic" types, as -a metric, UnROOT can already read all branches of CMS' NanoAOD: +We support reading all scalar branch and jagged branch of "basic" types, provide +indexing interface (thus iteration too) with basket-cache. As +a metric, UnROOT can read all branches of CMS NanoAOD: ``` julia using UnROOT @@ -37,7 +35,28 @@ using UnROOT julia> t = ROOTFile("test/samples/NanoAODv5_sample.root") ROOTFile("test/samples/NanoAODv5_sample.root") with 2 entries and 21 streamers. -# reading a flat branch +julia< b = rf["Events/Electron_dxy"]; + +julia> BA = LazyBranch(rf, b); + +# you can access a branch by index, this is fairly fast, memory footprint ~ single basket +julia> for i = 5:8 + @show BA[i] + end +ab[i] = Float32[] +ab[i] = Float32[-0.0012559891] +ab[i] = Float32[0.06121826, 0.00064229965] +ab[i] = Float32[0.005870819, 0.00054883957, -0.00617218] + +# or a range +julia> BA[5:8] +4-element Vector{Vector{Float32}}: + [] + [-0.0012559891] + [0.06121826, 0.00064229965] + [0.005870819, 0.00054883957, -0.00617218] + +# or dump a branch julia> array(t, "Events/HLT_Mu3_PFJet40") 1000-element BitVector: 0 @@ -47,7 +66,7 @@ julia> array(t, "Events/HLT_Mu3_PFJet40") 0 ... -# reading a jagged branch +# a jagged branch julia> array(t, "Events/Electron_dxy") 1000-element Vector{Vector{Float32}}: [0.00037050247] @@ -56,7 +75,7 @@ julia> array(t, "Events/Electron_dxy") [-0.0015697479] ... -# reading branch is also thread-safe, although may not be faster depending to disk I/O and cache +# reading branch is also thread-safe, although may not be much faster depending to disk I/O and cache julia> using ThreadsX julia> branch_names = keys(t["Events"]) @@ -95,7 +114,9 @@ julia> UnROOT.splitup(data, offsets, UnROOT.KM3NETDAQHit) [UnROOT.KM3NETDAQHit(1073742790, 0x00, 9, 0x60)...... ``` -This is what happens behind the scenes with some additional debug output: +
This is what happens behind the scenes with some additional debug output: +

+ ``` julia julia> using UnROOT @@ -173,6 +194,8 @@ Compressed datastream of 1317 bytes at 6180 (TKey 't1' (TTree)) 10 10 ``` +

+
## Main challenges diff --git a/src/UnROOT.jl b/src/UnROOT.jl index 2590bc19..00d1c368 100644 --- a/src/UnROOT.jl +++ b/src/UnROOT.jl @@ -1,13 +1,13 @@ module UnROOT -export ROOTFile, array +export ROOTFile, array, LazyBranch import Base: keys, get, getindex, show, length, iterate, position, ntoh, lock, unlock using Base.Threads: SpinLock using Memoization, LRUCache ntoh(b::Bool) = b -using CodecZlib, CodecLz4, CodecXz +using CodecZlib, CodecLz4, CodecXz, CodecZstd using Mixers using Parameters using StaticArrays @@ -19,6 +19,8 @@ include("utils.jl") include("streamers.jl") include("bootstrap.jl") include("root.jl") +include("arrayapi.jl") +# include("itr.jl") include("custom.jl") @static if VERSION < v"1.1" diff --git a/src/arrayapi.jl b/src/arrayapi.jl new file mode 100644 index 00000000..3b21e566 --- /dev/null +++ b/src/arrayapi.jl @@ -0,0 +1,122 @@ +""" + arrays(f::ROOTFile, treename) + +Reads all branches from a tree. +""" +function arrays(f::ROOTFile, treename) + names = keys(f[treename]) + res = Vector{Any}(undef, length(names)) + Threads.@threads for i in eachindex(names) + res[i] = array(f, "$treename/$(names[i])") + end + res +end + + +""" + array(f::ROOTFile, path; raw=false) + +Reads an array from a branch. Set `raw=true` to return raw data and correct offsets. +""" +array(f::ROOTFile, path::AbstractString; raw=false) = array(f::ROOTFile, f[path]; raw=raw) + +function array(f::ROOTFile, branch; raw=false) + ismissing(branch) && error("No branch found at $path") + (!raw && length(branch.fLeaves.elements) > 1) && error( + "Branches with multiple leaves are not supported yet. Try reading with `array(...; raw=true)`.") + + rawdata, rawoffsets = readbranchraw(f, branch) + if raw + return rawdata, rawoffsets + end + leaf = first(branch.fLeaves.elements) + jagt = JaggType(leaf) + T = eltype(branch) + interped_data(rawdata, rawoffsets, branch, jagt, T) +end + +""" + basketarray(f::ROOTFile, path, ith; raw=false) +Reads an array from ith basket of a branch. Set `raw=true` to return raw data and correct offsets. +""" +basketarray(f::ROOTFile, path::AbstractString, ithbasket) = basketarray(f, f[path], ithbasket) + +function basketarray(f::ROOTFile, branch, ithbasket) + ismissing(branch) && error("No branch found at $path") + length(branch.fLeaves.elements) > 1 && error( + "Branches with multiple leaves are not supported yet. Try reading with `array(...; raw=true)`.") + + rawdata, rawoffsets = readbasket(f, branch, ithbasket) + leaf = first(branch.fLeaves.elements) + jagt = JaggType(leaf) + T = eltype(branch) + interped_data(rawdata, rawoffsets, branch, jagt, T) +end + +# function barrior to make getting individual index faster +# TODO upstream some types into parametric types for Branch/BranchElement +# +""" + LazyBranch(f::ROOTFile, branch) + +Construct an accessor for a given branch such that `BA[idx]` and or `BA[1:20]` is almost +type-stable. And memory footprint is a single basket (<20MB usually). + +# Example +```julia +julia> rf = ROOTFile("./test/samples/tree_with_large_array.root"); + +julia> b = rf["t1/int32_array"]; + +julia> ab = UnROOT.LazyBranch(rf, b); + +julia> ab[1] +0 + +julia> ab[begin:end] +0 +1 +... +``` +""" +mutable struct LazyBranch{T, J} + f::ROOTFile + b::Union{TBranch, TBranchElement} + L::Int64 + fEntry::Vector{Int64} + buffer_seek::Int64 + buffer::Vector{T} + + function LazyBranch(f::ROOTFile, b::Union{TBranch, TBranchElement}) + T = eltype(b) + J = JaggType(first(b.fLeaves.elements)) + max_len = maximum(diff(b.fBasketEntry)) + # we don't know how to deal with multiple leaves yet + new{T, J}(f, b, length(b), b.fBasketEntry, -1, T[]) + end +end +Base.firstindex(ba::LazyBranch) = 1 +Base.lastindex(ba::LazyBranch) = ba.L +Base.length(ba::LazyBranch) = ba.L +Base.eltype(ba::LazyBranch{T,J}) where {T,J} = T + +function Base.getindex(ba::LazyBranch{T, J}, idx::Integer) where {T, J} + # I hate 1-based indexing + seek_idx = findfirst(x -> x>(idx-1), ba.fEntry) - 1 #support 1.0 syntax + localidx = idx - ba.fEntry[seek_idx] + if seek_idx != ba.buffer_seek # update buffer + ba.buffer = basketarray(ba.f, ba.b, seek_idx) + ba.buffer_seek = seek_idx + end + return ba.buffer[localidx] +end + +function Base.iterate(ba::LazyBranch{T, J}, idx=1) where {T, J} + idx>ba.L && return nothing + return (ba[idx], idx+1) +end + +# TODO this is not terribly slow, but we can get faster implementation still ;) +function Base.getindex(ba::LazyBranch{T, J}, rang::UnitRange) where {T, J} + [ba[i] for i in rang] +end diff --git a/src/bootstrap.jl b/src/bootstrap.jl index 58122c21..90b253e7 100644 --- a/src/bootstrap.jl +++ b/src/bootstrap.jl @@ -369,6 +369,14 @@ function unpack(io, tkey::TKey, refs::Dict{Int32, Any}, ::Type{T}) where {T<:TLe end abstract type TBranch <: ROOTStreamedObject end +abstract type TBranchElement <: ROOTStreamedObject end +Base.length(b::Union{TBranch, TBranchElement}) = b.fEntries +Base.eachindex(b::Union{TBranch, TBranchElement}) = Base.OneTo(b.fEntries) +function Base.eltype(b::Union{TBranch, TBranchElement}) + leaf = first(b.fLeaves.elements) + JaggType(leaf)===Nojagg ? primitivetype(leaf) : Vector{interp_jaggT(b, leaf)} +end + @with_kw struct TBranch_12 <: TBranch cursor::Cursor # from TNamed @@ -522,7 +530,6 @@ function readfields!(cursor::Cursor, fields, ::Type{T}) where {T<:TBranch_13} fields[:fFileName] = readtype(io, String) end -abstract type TBranchElement <: ROOTStreamedObject end @with_kw struct TBranchElement_9 <: TBranchElement cursor::Cursor # from TNamed diff --git a/src/itr.jl b/src/itr.jl new file mode 100644 index 00000000..8083e208 --- /dev/null +++ b/src/itr.jl @@ -0,0 +1,37 @@ +# TODO this doesn't look too useful to deserve it's own thing in light of the LazyBranch +# struct BranchItr{T, J} +# f::ROOTFile +# b::Union{TBranch, TBranchElement} +# current::Int64 +# total_entry::Int64 + +# function BranchItr(f::ROOTFile, b::Union{TBranch, TBranchElement}) +# T = eltype(b) +# J = JaggType(only(b.fLeaves.elements)) +# # we don't know how to deal with multiple leaves yet +# new{T, J}(f, b, 0, b.fEntries) +# end +# end +# Base.length(itr::BranchItr) = itr.total_entry +# Base.eltype(itr::BranchItr{T,J}) where {T,J} = T + +# function Base.iterate(itr::BranchItr{T, J}, state=(itr.current, 1, T[], 0)) where {T, J} +# current, ithbasket, entries, remaining = state + +# current >= itr.total_entry && return nothing + +# if iszero(remaining) +# rawdata, rawoffsets = readbasket(itr.f, itr.b, ithbasket) +# entries = interped_data(rawdata, rawoffsets, itr.b, J, T) +# remaining = length(entries) +# ithbasket += 1 +# end +# return (popfirst!(entries), (current+1, ithbasket, entries, remaining-1)) +# end + +# function Base.show(io::IO, itr::BranchItr) +# summary(io, itr) +# println() +# println(io, "Branch: $(itr.b.fName)") +# print(io, "Total entry: $(itr.total_entry)") +# end diff --git a/src/root.jl b/src/root.jl index c38b1322..4cc2d79b 100644 --- a/src/root.jl +++ b/src/root.jl @@ -130,53 +130,16 @@ function Base.getindex(t::TTree, s::Vector{T}) where {T<:AbstractString} [t[n] for n in s] end -""" - arrays(f::ROOTFile, treename) - -Reads all branches from a tree. -""" -function arrays(f::ROOTFile, treename) - names = keys(f[treename]) - res = Vector{Any}(undef, length(names)) - Threads.@threads for i in eachindex(names) - res[i] = array(f, "$treename/$(names[i])") - end - res -end - - -""" - function array(f::ROOTFile, path) - -Reads an array from a branch. -""" -function array(f::ROOTFile, path; raw=false) - branch = f[path] - ismissing(branch) && error("No branch found at $path") - - (!raw && length(branch.fLeaves.elements) > 1) && error( - "Branches with multiple leaves are not supported yet. Try reading with `array(...; raw=true)`.") - - rawdata, rawoffsets = readbranchraw(f, branch) - if raw - return rawdata, rawoffsets - end - leaf = first(branch.fLeaves.elements) - interped_data(rawdata, rawoffsets, branch, leaf) -end - -function interped_data(rawdata, rawoffsets, branch, leaf) - # https://github.com/scikit-hep/uproot3/blob/54f5151fb7c686c3a161fbe44b9f299e482f346b/uproot3/interp/auto.py#L144 - isjagged = (match(r"\[.*\]", leaf.fTitle) !== nothing) - +function interped_data(rawdata, rawoffsets, branch, ::Type{J}, ::Type{T}) where {J<:JaggType, T} # there are two possibility, one is the leaf is just normal leaf but the title has "[...]" in it # magic offsets, seems to be common for a lot of types, see auto.py in uproot3 # only needs when the jaggedness comes from TLeafElements, not needed when # the jaggedness comes from having "[]" in TLeaf's title - jagg_offset = leaf isa TLeafElement ? 10 : 0 # the other is where we need to auto detector T bsaed on class name - if isjagged || !iszero(jagg_offset) # non-primitive jagged leaf - T = autointerp_T(branch, leaf) + # we want the fundamental type as `reinterpret` will create vector + elT = eltype(T) + if J !== Nojagg + jagg_offset = J===Offsetjagg ? 10 : 0 # for each "event", the index range is `offsets[i] + jagg_offset + 1` to `offsets[i+1]` # this is why we need to append `rawoffsets` in the `readbranchraw()` call @@ -184,16 +147,16 @@ function interped_data(rawdata, rawoffsets, branch, leaf) # Say your real data is Int32 and you see 8 bytes after indexing, then this event has [num1, num2] as real data @views [ ntoh.(reinterpret( - T, rawdata[ (rawoffsets[i]+jagg_offset+1):rawoffsets[i+1] ] + elT, rawdata[ (rawoffsets[i]+jagg_offset+1):rawoffsets[i+1] ] )) for i in 1:(length(rawoffsets) - 1) ] else # the branch is not jagged - return ntoh.(reinterpret(primitivetype(leaf), rawdata)) + return ntoh.(reinterpret(T, rawdata)) end end -@memoize LRU(;maxsize=10^3) function autointerp_T(branch, leaf) +@memoize LRU(;maxsize=10^3) function interp_jaggT(branch, leaf) if hasproperty(branch, :fClassName) classname = branch.fClassName # the C++ class name, such as "vector" m = match(r"vector<(.*)>", classname) @@ -217,9 +180,9 @@ function readbranchraw(f::ROOTFile, branch) nbytes = branch.fBasketBytes datas = sizehint!(Vector{UInt8}(), sum(nbytes)) # maximum length if all data are UInt8 offsets = sizehint!(Vector{Int32}(), branch.fEntries+1) # this is always Int32 - for (i, pos) in enumerate(branch.fBasketSeek) - pos == 0 && break - data, offset = readbasket(f, branch, i) + foreach(branch.fBasketSeek) do seek + seek==0 && return + data, offset = readbasketseek(f, branch, seek) append!(datas, data) append!(offsets, offset) end @@ -237,9 +200,11 @@ end # │ │ # │← fObjlen →│ # 3GB cache for baskets -@memoize LRU(;maxsize=3*1024^3, by=x->sum(length,x)) function readbasket(f::ROOTFile, branch, ith) - seek_pos = branch.fBasketSeek[ith] +readbasket(f::ROOTFile, branch, ith) = readbasketseek(f, branch, branch.fBasketSeek[ith]) +@memoize LRU(; maxsize=3 * 1024^3, by=x -> sum(length, x)) function readbasketseek( + f::ROOTFile, branch, seek_pos +)::Tuple{Vector{UInt8},Vector{Int32},Int32} # just being extra careful lock(f) seek(f.fobj, seek_pos) basketkey = unpack(f.fobj, TBasketKey) @@ -248,7 +213,6 @@ end basketrawbytes = decompress_datastreambytes(compressedbytes, basketkey) - Keylen = basketkey.fKeylen contentsize = Int32(basketkey.fLast - Keylen) @@ -258,7 +222,7 @@ end if offsetbytesize > 0 #indexing is inclusive on both ends - offbytes = @view basketrawbytes[contentsize+4+1:end-4] + offbytes = @view basketrawbytes[(contentsize + 4 + 1):(end - 4)] # offsets starts at -fKeylen, same as the `local_offset` we pass in in the loop offset = ntoh.(reinterpret(Int32, offbytes)) .- Keylen diff --git a/src/streamers.jl b/src/streamers.jl index 91a40848..a793d9c4 100644 --- a/src/streamers.jl +++ b/src/streamers.jl @@ -80,20 +80,23 @@ function Streamers(io) # notice our `TKey` size is not the same as official TKey, can't use sizeof() skipped = position(io) - start compressedbytes = read(io, tkey.fNbytes - skipped) - - if String(compression_header.algo) == "ZL" - stream = IOBuffer(transcode(ZlibDecompressor, compressedbytes)) - elseif String(compression_header.algo) == "XZ" - stream = IOBuffer(transcode(XzDecompressor, compressedbytes)) - elseif String(compression_header.algo) == "L4" - stream = IOBuffer(lz4_decompress(compressedbytes[9:end], tkey.fObjlen)) + cname = String(compression_header.algo) + + stream = if cname == "ZL" + IOBuffer(transcode(ZlibDecompressor, compressedbytes)) + elseif cname == "XZ" + IOBuffer(transcode(XzDecompressor, compressedbytes)) + elseif cname == "ZS" + IOBuffer(transcode(ZstdDecompressor, io)) + elseif cname == "L4" + IOBuffer(lz4_decompress(compressedbytes[9:end], tkey.fObjlen)) else error("Unsupported compression type '$(String(compression_header.algo))'") end else @debug "Unompressed stream at $(start)" - stream = io + io end preamble = Preamble(stream, Streamers) skiptobj(stream) @@ -129,7 +132,7 @@ function Streamers(io) streamer_infos = topological_sort(streamer_infos) - # FIXME not implemented + # TODO not implemented # for streamer_info in streamer_infos # initialise_streamer(streamer_info) # end @@ -363,7 +366,7 @@ function parsefields!(io, fields, T::Type{TStreamerElement}) endcheck(io, preamble) end -# FIXME really not used? +# TODO really not used? # function unpack(io, tkey::TKey, refs::Dict{Int32, Any}, T::Type{TStreamerElement}) # @initparse # parsefields!(io, fields, T) diff --git a/src/types.jl b/src/types.jl index 3515ad69..b54e8192 100644 --- a/src/types.jl +++ b/src/types.jl @@ -103,28 +103,30 @@ function compressed_datastream(io, tkey) seekstart(io, tkey) return read(io, tkey.fNbytes - tkey.fKeylen) end + function decompress_datastreambytes(compbytes, tkey) # not compressed iscompressed(tkey) || return compbytes # compressed - io = IOBuffer(compbytes) + io = compbytes fufilled = 0 uncomp_data = Vector{UInt8}(undef, tkey.fObjlen) while fufilled < tkey.fObjlen # careful with 0/1-based index when thinking about offsets - compression_header = unpack(io, CompressionHeader) + compression_header = unpack(IOBuffer(io[1:9]), CompressionHeader) cname, _, compbytes, uncompbytes = unpack(compression_header) - - iobytes = read(io, compbytes) + io = io[10:end] # indexing `0+1 to 0+2` are two bytes, no need to +1 in the second term @view(uncomp_data[fufilled+1:fufilled+uncompbytes]) .= if cname == "ZL" - transcode(ZlibDecompressor, iobytes) + transcode(ZlibDecompressor, io) elseif cname == "XZ" - transcode(XzDecompressor, iobytes) + transcode(XzDecompressor, io) + elseif cname == "ZS" + transcode(ZstdDecompressor, io) elseif cname == "L4" # skip checksum which is 8 bytes - lz4_decompress(iobytes[9:end], uncompbytes) + lz4_decompress(io[9:end], uncompbytes) else error("Unsupported compression type '$(String(compression_header.algo))'") end diff --git a/src/utils.jl b/src/utils.jl index 2019077b..b44aced3 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -48,3 +48,16 @@ function unpack(x::CompressionHeader) return algname, x.method, compressedbytes, uncompressedbytes end + + +abstract type JaggType end +struct Nojagg <:JaggType end +struct Nooffsetjagg<:JaggType end +struct Offsetjagg <:JaggType end + +function JaggType(leaf) + leaf isa TLeafElement && return Offsetjagg + # https://github.com/scikit-hep/uproot3/blob/54f5151fb7c686c3a161fbe44b9f299e482f346b/uproot3/interp/auto.py#L144 + (match(r"\[.*\]", leaf.fTitle) !== nothing) && return Nooffsetjagg + return Nojagg +end diff --git a/test/runtests.jl b/test/runtests.jl index 081bcb49..e12ace73 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -184,6 +184,17 @@ end # @test 1619244 == header.fSeekKeys end +@testset "getindex() of LazyBranch" begin + rootfile = ROOTFile(joinpath(SAMPLES_DIR, "tree_with_large_array.root")) + branch = rootfile["t1/int32_array"] + arr = array(rootfile, branch) + BA = LazyBranch(rootfile, branch) + @test length(arr) == length(BA) + @test BA[1] == arr[1] + @test BA[end] == arr[end] + @test BA[20:30] == arr[20:30] + @test BA[1:end] == arr +end @testset "array()" begin rootfile = ROOTFile(joinpath(SAMPLES_DIR, "tree_with_histos.root"))