Skip to content

Commit

Permalink
swift: support pluggable decompression and validate chunk CRCs (foxgl…
Browse files Browse the repository at this point in the history
…ove#561)

Adds `decompressHandlers` to MCAPStreamedReader (similar to TS library).
Calls the decompress handler for a chunk and validates the CRC before trying to read chunk data.
  • Loading branch information
jtbandes authored Aug 31, 2022
1 parent 4860c49 commit f274a4c
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 11 deletions.
1 change: 1 addition & 0 deletions .swiftformat
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
--swiftversion 5.2
--indent 2
--maxwidth 120
1 change: 1 addition & 0 deletions cspell.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ words:
- nanos
- nsec
- nsecs
- parseable
- pipenv
- pointcloud
- proto
Expand Down
2 changes: 1 addition & 1 deletion docs/support-matrix.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ The Swift MCAP library is experimental, and not actively developed. This means t
| Indexed attachment reading | Yes | Yes [^2] | Yes [^2] | Yes [^2] | No |
| Non-materialized attachment reading | No | Yes [^3] | No | No | No |
| Non-indexed reading | Yes | Yes | Yes | Yes | Yes |
| CRC validation | No | No | Yes | Yes | No |
| CRC validation | No | No | Yes | Yes | Yes |
| ROS1 wrapper | Yes | No | No | No | No |
| ROS2 wrapper | Yes [^4] | Yes [^4] | No | No | No |
| Protobuf wrapper | Yes | No | No | No | No |
Expand Down
65 changes: 61 additions & 4 deletions swift/mcap/MCAPStreamedReader.swift
Original file line number Diff line number Diff line change
@@ -1,11 +1,46 @@
import crc
import struct Foundation.Data

/**
A table of decompression functions, keyed by the name found in a chunk's `compression` field.

Each function receives the chunk's compressed record bytes together with the chunk's declared
uncompressed size, and returns the decompressed bytes. A function may throw to report a
decompression failure.
*/
public typealias DecompressHandlers =
[String: (_ compressedData: Data, _ decompressedSize: UInt64) throws -> Data]

/**
A reader that parses MCAP data from a stream. Rather than expecting the entire MCAP file to be
available up front, this reader emits records as they are encountered. This means it does not use
the summary or index data to read the file, and can be used when only some of the data is available
(such as when streaming over the network).

Call ``append(_:)`` when new data is available to add it to the reader's internal buffer. Then,
call ``nextRecord()`` repeatedly to consume records that are fully parseable.

```
let reader = MCAPStreamedReader()
while let data = readSomeData() {
reader.append(data)
while let record = try reader.nextRecord() {
// process a record...
}
}
```
*/
public class MCAPStreamedReader {
private let recordReader = RecordReader()
private var chunkReader: RecordReader?
private var readHeaderMagic = false
private var decompressHandlers: DecompressHandlers

public init() {}
/**
Create a streamed reader.

- Parameter decompressHandlers: A user-specified collection of functions to be used to decompress
chunks in the MCAP file. When a chunk is encountered, its `compression` field is used as the
key to select one of the functions in `decompressHandlers`. If a decompress handler is not
available for the chunk's `compression`, an `MCAPReadError.unsupportedCompression` will be
thrown. Chunks whose `compression` is empty are read as-is and never need a handler, so the
default (empty) collection is sufficient for files that contain only uncompressed chunks.
*/
public init(decompressHandlers: DecompressHandlers = [:]) {
self.decompressHandlers = decompressHandlers
}

public func append(_ data: Data) {
recordReader.append(data)
Expand All @@ -23,7 +58,7 @@ public class MCAPStreamedReader {
let record = try recordReader.nextRecord()
switch record {
case let chunk as Chunk:
chunkReader = RecordReader(chunk.records)
chunkReader = RecordReader(try _decompress(chunk))
default:
return record
}
Expand All @@ -43,6 +78,27 @@ public class MCAPStreamedReader {

return nil
}

/**
Produce the uncompressed record bytes of `chunk` and check them against the chunk's CRC.

- Parameter chunk: The chunk whose `records` should be decompressed.
- Returns: `chunk.records` unchanged when `compression` is empty; otherwise the output of the
  decompress handler registered under the chunk's `compression`.
- Throws: `MCAPReadError.unsupportedCompression` when no handler is registered for the chunk's
  `compression`, `MCAPReadError.invalidCRC` when the computed CRC differs from a non-zero
  `uncompressedCRC`, or any error thrown by the handler itself.
*/
private func _decompress(_ chunk: Chunk) throws -> Data {
  let codec = chunk.compression
  let recordBytes: Data
  if codec.isEmpty {
    // No compression declared: the stored bytes already are the records.
    recordBytes = chunk.records
  } else {
    guard let handler = decompressHandlers[codec] else {
      throw MCAPReadError.unsupportedCompression(codec)
    }
    recordBytes = try handler(chunk.records, chunk.uncompressedSize)
  }

  // A stored CRC of zero skips validation entirely.
  let expectedCRC = chunk.uncompressedCRC
  guard expectedCRC != 0 else {
    return recordBytes
  }

  var checksum = CRC32()
  checksum.update(recordBytes)
  let actualCRC = checksum.final
  guard actualCRC == expectedCRC else {
    throw MCAPReadError.invalidCRC(expected: expectedCRC, actual: actualCRC)
  }
  return recordBytes
}
}

private class RecordReader {
Expand All @@ -69,8 +125,9 @@ private class RecordReader {

public func readMagic() throws -> Bool {
if offset + 8 < buffer.count {
if !MCAP0_MAGIC.elementsEqual(buffer[offset ..< offset + 8]) {
throw MCAPReadError.invalidMagic
let prefix = buffer[offset ..< offset + 8]
if !MCAP0_MAGIC.elementsEqual(prefix) {
throw MCAPReadError.invalidMagic(actual: Array(prefix))
}
offset += 8
return true
Expand Down
11 changes: 6 additions & 5 deletions swift/mcap/Records.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ public typealias Timestamp = UInt64
// swiftlint:disable:next identifier_name
public let MCAP0_MAGIC = Data([137, 77, 67, 65, 80, 48, 13, 10])

public enum MCAPReadError: Error {
case invalidMagic
public enum MCAPReadError: Error, Equatable {
case invalidMagic(actual: [UInt8])
case readBeyondBounds
case stringLengthBeyondBounds
case dataLengthBeyondBounds
case invalidCRC
case invalidCRC(expected: UInt32, actual: UInt32)
case extraneousDataInChunk
case unsupportedCompression(String)
}

public enum Opcode: UInt8 {
Expand All @@ -41,7 +42,7 @@ public protocol Record {
func serializeFields(to data: inout Data)
}

extension Record {
public extension Record {
func serialize(to data: inout Data) {
data.append(Self.opcode.rawValue)
data.append(littleEndian: UInt64(0)) // placeholder
Expand Down Expand Up @@ -547,7 +548,7 @@ public struct Attachment: Record {
var crc = CRC32()
crc.update(buffer[..<crcEndOffset])
if expectedCRC != crc.final {
throw MCAPReadError.invalidCRC
throw MCAPReadError.invalidCRC(expected: expectedCRC, actual: crc.final)
}
}
}
Expand Down
30 changes: 29 additions & 1 deletion swift/test/MCAPTests.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// swiftlint:disable force_cast

import XCTest

import crc
import mcap

class Buffer: IWritable {
Expand All @@ -15,11 +18,36 @@ class Buffer: IWritable {
}

final class MCAPTests: XCTestCase {
func testExample() async throws {
/// Writing a file with `start` immediately followed by `end` must emit exactly 286 bytes.
func testEmpty() async throws {
  let sink = Buffer()
  let mcapWriter = MCAPWriter(sink)

  await mcapWriter.start(library: "", profile: "")
  await mcapWriter.end()

  let writtenByteCount = sink.data.count
  XCTAssertEqual(writtenByteCount, 286)
}

/// The streamed reader must reject a chunk whose stored `uncompressedCRC` does not match the
/// CRC computed over its record bytes.
func testValidatesChunkCRC() async throws {
  // Hand-assemble a stream: magic, header, one uncompressed chunk carrying a deliberately
  // wrong CRC (12345), then data-end and footer records.
  var buffer = Data()
  buffer.append(MCAP0_MAGIC)
  Header(profile: "", library: "").serialize(to: &buffer)
  Chunk(
    messageStartTime: 0,
    messageEndTime: 0,
    uncompressedSize: 0,
    uncompressedCRC: 12345,
    compression: "",
    records: Data([1, 2, 3])
  ).serialize(to: &buffer)
  DataEnd(dataSectionCRC: 0).serialize(to: &buffer)
  Footer(summaryStart: 0, summaryOffsetStart: 0, summaryCRC: 0).serialize(to: &buffer)

  let reader = MCAPStreamedReader()
  reader.append(buffer)

  // Use XCTUnwrap / `as?` instead of `as!` so that an unexpected record or error type is
  // reported as a test failure rather than crashing the whole test process.
  let header = try XCTUnwrap(reader.nextRecord() as? Header)
  XCTAssertEqual(header.profile, "")
  XCTAssertEqual(header.library, "")

  // The next record is the chunk; reading it must fail and report both CRC values.
  XCTAssertThrowsError(try reader.nextRecord()) { error in
    XCTAssertEqual(
      error as? MCAPReadError,
      MCAPReadError.invalidCRC(expected: 12345, actual: 1_438_416_925)
    )
  }
}
}

0 comments on commit f274a4c

Please sign in to comment.