I wrote a method to read the contents of an S3 object. It looks fine to me.¹
There are multiple records in the S3 object; what is the best way to read all of the lines?
Your code should read all of the lines.
Does it only read the first line of the object?
No. It should read all of the lines.² The while loop reads until readLine() returns null, and that only happens when you reach the end of the stream.
How to make sure all the lines are read?
If you are getting fewer lines than you expect, then either the S3 object contains fewer lines than you think, or something is causing the object stream to close prematurely.
For the former, count the lines as you read them and compare the total with the expected line count.
The latter could be due to a timeout when reading a very large file. See How to read file chunk by chunk from S3 using aws-java-sdk for some ideas on how to deal with that problem.
¹ Actually, it would be better if you used try-with-resources to ensure that the S3 stream is always closed. But that won't cause you to "lose" lines.
² This assumes that the S3 service doesn't time out the connection, and that you are not requesting a part (chunk) or a range in the URI request parameters; see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html .
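To make the pattern concrete, here is a minimal, runnable sketch of the read-all-lines loop described above, with try-with-resources to guarantee the stream is closed and a counter you can compare against the expected line count. A ByteArrayInputStream stands in for the real S3 object stream; with the AWS SDK you would pass s3Object.getObjectContent() instead.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class ReadAllLines {
    // Reads every line from the stream and returns the number of lines read.
    static int countLines(InputStream in) throws IOException {
        int count = 0;
        // try-with-resources ensures the (S3) stream is always closed.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            // readLine() returns null only at the end of the stream.
            while ((line = reader.readLine()) != null) {
                count++;
                // process 'line' here
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for s3Object.getObjectContent().
        InputStream fake = new ByteArrayInputStream(
                "first\nsecond\nthird\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(countLines(fake)); // prints 3
    }
}
```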
My usual approach (InputStream -> BufferedReader.lines() -> batches of lines -> CompletableFuture) won't work here because the underlying S3ObjectInputStream eventually times out for huge files.
So I created a new class, S3InputStream, which doesn't care how long it stays open and reads byte blocks on demand using short-lived AWS SDK calls. You provide a byte[] buffer that will be reused; new byte[1 << 24] (16 MiB) appears to work well.
package org.harrison;

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;

/**
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();
            if (length <= 0) {
                return -1;
            }
            next = 0;
        }
        // Mask to 0..255 so bytes >= 0x80 are not returned as negative values,
        // which a caller would mistake for end-of-stream.
        return buffer[next++] & 0xFF;
    }

    private void fill() throws IOException {
        // Use > rather than >=: lastByteOffset is the index of the last byte,
        // so there is still data left to read while offset == lastByteOffset.
        if (offset > lastByteOffset) {
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int count;
                // Bulk-read the ranged response into the reusable buffer.
                while (length < buffer.length
                        && (count = inputStream.read(buffer, length, buffer.length - length)) != -1) {
                    length += count;
                }
                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        // Request only the next chunk: bytes offset .. offset + buffer.length - 1.
        final GetObjectRequest request = new GetObjectRequest(bucket, file)
                .withRange(offset, offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}
The aws-java-sdk already provides streaming functionality for your S3 objects. Call getObject and obtain an InputStream from the result:
1) AmazonS3Client.getObject(GetObjectRequest getObjectRequest) -> S3Object
2) S3Object.getObjectContent()
Note: The method is a simple getter and does not actually create a stream. If you retrieve an S3Object, you should close this input stream as soon as possible, because the object contents aren't buffered in memory and stream directly from Amazon S3. Further, failure to close this stream can cause the request pool to become blocked.
(from the AWS Java SDK docs)
Got the answer via other medium. Sharing it here:
The warning indicates that you called close() without reading the whole file. This is problematic because S3 is still trying to send the data and you're leaving the connection in a sad state.
There are two options here:
- Read the rest of the data from the input stream so the connection can be reused.
- Call s3ObjectInputStream.abort() to close the connection without reading the data. The connection won't be reused, so you take some performance hit with the next request to re-create the connection. This may be worth it if it's going to take a long time to read the rest of the file.
Following option #1 of Chirag Sejpal's answer, I used the statement below to drain the S3AbortableInputStream and ensure that the connection can be reused:
com.amazonaws.util.IOUtils.drainInputStream(s3ObjectInputStream);
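Draining simply means reading and discarding whatever is left on the stream so the underlying HTTP connection can go back to the pool. A stdlib-only sketch of what IOUtils.drainInputStream does (a ByteArrayInputStream stands in for the S3AbortableInputStream):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class Drain {
    // Reads and discards everything remaining on the stream and returns
    // the number of bytes drained.
    static long drain(InputStream in) throws IOException {
        byte[] scratch = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(scratch)) != -1) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[100_000]);
        in.read(new byte[1000]);       // pretend we read only part of the object
        System.out.println(drain(in)); // prints 99000
    }
}
```

Whether draining beats abort() depends on how much is left: draining a few kilobytes is cheaper than re-establishing a connection, but draining gigabytes is not.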
Here's a solution which actually streams the data line by line:

from io import TextIOWrapper
from gzip import GzipFile
...

# get StreamingBody from botocore.response
response = s3.get_object(Bucket=bucket, Key=key)

# if gzipped
gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
data = TextIOWrapper(gzipped)

for line in data:
    # process line
You may find https://pypi.python.org/pypi/smart_open useful for your task.
From documentation:
for line in smart_open.smart_open('s3://mybucket/mykey.txt'):
    print line