Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.nio.channels.ScatteringByteChannel;
import java.util.List;
import java.util.Locale;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -91,7 +92,15 @@ final class GapicUnbufferedReadableByteChannel
this.result = result;
this.read = read;
this.req = req;
this.hasher = hasher;
this.hasher =
(req.getReadOffset() == 0 && !(hasher instanceof Hasher.NoOpHasher))
? new CumulativeHasher(
hasher,
0,
req.getReadLimit() <= 0
? OptionalLong.empty()
: OptionalLong.of(req.getReadLimit()))
: hasher;
this.fetchOffset = new AtomicLong(req.getReadOffset());
this.blobOffset = req.getReadOffset();
this.retrier = retrier;
Expand Down Expand Up @@ -154,7 +163,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
if (take instanceof IOException) {
IOException ioe = (IOException) take;
if (alg.shouldRetry(ioe, null)) {
readObjectObserver = null;
cancelAndDrainCurrentObserver();
continue;
} else {
ioe.addSuppressed(new AsyncStorageTaskException());
Expand All @@ -165,7 +174,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
Throwable throwable = (Throwable) take;
BaseServiceException coalesce = StorageException.coalesce(throwable);
if (alg.shouldRetry(coalesce, null)) {
readObjectObserver = null;
cancelAndDrainCurrentObserver();
continue;
} else {
close();
Expand All @@ -174,6 +183,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
}
if (take == EOF_MARKER) {
complete = true;
validateCumulativeChecksum();
break;
}

Expand Down Expand Up @@ -240,7 +250,9 @@ private void drainQueue() throws IOException {
while (queue.nonEmpty()) {
try {
java.lang.Object queueValue = queue.poll();
if (queueValue instanceof ReadObjectResponse) {
if (queueValue instanceof java.io.Closeable) {
((java.io.Closeable) queueValue).close();
} else if (queueValue instanceof ReadObjectResponse) {
ReadObjectResponse resp = (ReadObjectResponse) queueValue;
ResponseContentLifecycleHandle<ReadObjectResponse> handle =
read.getResponseContentLifecycleManager().get(resp);
Expand Down Expand Up @@ -273,6 +285,19 @@ private void drainQueue() throws IOException {
}
}

private void cancelAndDrainCurrentObserver() {
if (readObjectObserver != null) {
readObjectObserver.cancel();
try {
drainQueue();
} catch (IOException e) {
// drainQueue() in this context can be ignored because we are resetting the
// stream.
}
readObjectObserver = null;
}
}

ApiFuture<Object> getResult() {
return result;
}
Expand Down Expand Up @@ -311,14 +336,27 @@ private IOException createError(String message) throws IOException {
return new IOException(message, cause);
}

private void validateCumulativeChecksum() throws IOException {
if (hasher instanceof CumulativeHasher) {
CumulativeHasher cumulativeHasher = (CumulativeHasher) hasher;
try {
cumulativeHasher.validateCumulativeChecksum(metadata);
} catch (UncheckedCumulativeChecksumMismatchException exception) {
throw new IOException(StorageException.coalesce(exception));
}
}
}

private final class ReadObjectObserver extends StateCheckingResponseObserver<ReadObjectResponse> {

private final SettableApiFuture<Void> open = SettableApiFuture.create();
private final SettableApiFuture<Throwable> cancellation = SettableApiFuture.create();

private volatile StreamController controller;
private volatile boolean cancelled = false;

void cancel() {
cancelled = true;
controller.cancel();
}

Expand All @@ -331,10 +369,13 @@ protected void onStartImpl(StreamController controller) {

@Override
protected void onResponseImpl(ReadObjectResponse response) {
controller.request(1);
open.set(null);
try (ResponseContentLifecycleHandle<ReadObjectResponse> handle =
read.getResponseContentLifecycleManager().get(response)) {
if (cancelled) {
return;
}
controller.request(1);
open.set(null);
ChecksummedData checksummedData = response.getChecksummedData();
ByteString content = checksummedData.getContent();
int contentSize = content.size();
Expand All @@ -348,6 +389,8 @@ protected void onResponseImpl(ReadObjectResponse response) {
queue.offer(e);
return;
}
} else if (hasher instanceof CumulativeHasher) {
hasher.validateUnchecked(null, content);
}
if (response.hasMetadata()) {
Object respMetadata = response.getMetadata();
Expand Down Expand Up @@ -380,6 +423,12 @@ protected void onResponseImpl(ReadObjectResponse response) {

@Override
protected void onErrorImpl(Throwable t) {
if (t instanceof CancellationException) {
cancellation.set(t);
}
if (cancelled) {
return;
}
if (t instanceof OutOfRangeException) {
try {
queue.offer(EOF_MARKER);
Expand All @@ -389,17 +438,15 @@ protected void onErrorImpl(Throwable t) {
throw Code.ABORTED.toStatus().withCause(e).asRuntimeException();
}
}
if (t instanceof CancellationException) {
cancellation.set(t);
}
if (!open.isDone()) {
open.setException(t);
}
try {
queue.offer(t);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Code.ABORTED.toStatus().withCause(e).asRuntimeException();
} else {
try {
queue.offer(t);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Code.ABORTED.toStatus().withCause(e).asRuntimeException();
}
}
}

Expand Down
Loading
Loading