Skip to content

Commit 708f1c8

Browse files
drow724fmbenhassine
authored andcommitted
Add default component name in constructor of AvroItemReader
Issue #4285
1 parent bd43ca1 commit 708f1c8

File tree

1 file changed

+186
-180
lines changed
  • spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro

1 file changed

+186
-180
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,180 +1,186 @@
1-
/*
2-
* Copyright 2019 the original author or authors.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* https://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
package org.springframework.batch.item.avro;
17-
18-
import java.io.IOException;
19-
import java.io.InputStream;
20-
21-
import org.apache.avro.Schema;
22-
import org.apache.avro.file.DataFileStream;
23-
import org.apache.avro.generic.GenericDatumReader;
24-
import org.apache.avro.generic.GenericRecord;
25-
import org.apache.avro.io.BinaryDecoder;
26-
import org.apache.avro.io.DatumReader;
27-
import org.apache.avro.io.DecoderFactory;
28-
import org.apache.avro.reflect.ReflectDatumReader;
29-
import org.apache.avro.specific.SpecificDatumReader;
30-
import org.apache.avro.specific.SpecificRecordBase;
31-
32-
import org.springframework.batch.item.ItemReader;
33-
import org.springframework.batch.item.ItemStreamException;
34-
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
35-
import org.springframework.core.io.Resource;
36-
import org.springframework.lang.Nullable;
37-
import org.springframework.util.Assert;
38-
39-
/**
40-
* An {@link ItemReader} that deserializes data from a {@link Resource} containing serialized Avro objects.
41-
*
42-
* @author David Turanski
43-
* @author Mahmoud Ben Hassine
44-
* @since 4.2
45-
*/
46-
public class AvroItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> {
47-
48-
private boolean embeddedSchema = true;
49-
50-
private InputStreamReader<T> inputStreamReader;
51-
52-
private DataFileStream<T> dataFileReader;
53-
54-
private InputStream inputStream;
55-
56-
private DatumReader<T> datumReader;
57-
58-
/**
59-
*
60-
* @param resource the {@link Resource} containing objects serialized with Avro.
61-
* @param clazz the data type to be deserialized.
62-
*/
63-
public AvroItemReader(Resource resource, Class<T> clazz) {
64-
Assert.notNull(resource, "'resource' is required.");
65-
Assert.notNull(clazz, "'class' is required.");
66-
67-
try {
68-
this.inputStream = resource.getInputStream();
69-
this.datumReader = datumReaderForClass(clazz);
70-
}
71-
catch (IOException e) {
72-
throw new IllegalArgumentException(e.getMessage(), e);
73-
}
74-
}
75-
76-
/**
77-
*
78-
* @param data the {@link Resource} containing the data to be read.
79-
* @param schema the {@link Resource} containing the Avro schema.
80-
*/
81-
public AvroItemReader(Resource data, Resource schema) {
82-
Assert.notNull(data, "'data' is required.");
83-
Assert.state(data.exists(), "'data' " + data.getFilename() +" does not exist.");
84-
Assert.notNull(schema, "'schema' is required");
85-
Assert.state(schema.exists(), "'schema' " + schema.getFilename() +" does not exist.");
86-
try {
87-
this.inputStream = data.getInputStream();
88-
Schema avroSchema = new Schema.Parser().parse(schema.getInputStream());
89-
this.datumReader = new GenericDatumReader<>(avroSchema);
90-
}
91-
catch (IOException e) {
92-
throw new IllegalArgumentException(e.getMessage(), e);
93-
}
94-
}
95-
96-
/**
97-
* Disable or enable reading an embedded Avro schema. True by default.
98-
* @param embeddedSchema set to false to if the input does not embed an Avro schema.
99-
*/
100-
public void setEmbeddedSchema(boolean embeddedSchema) {
101-
this.embeddedSchema = embeddedSchema;
102-
}
103-
104-
105-
@Nullable
106-
@Override
107-
protected T doRead() throws Exception {
108-
if (this.inputStreamReader != null) {
109-
return this.inputStreamReader.read();
110-
}
111-
return this.dataFileReader.hasNext()? this.dataFileReader.next(): null;
112-
}
113-
114-
@Override
115-
protected void doOpen() throws Exception {
116-
initializeReader();
117-
}
118-
119-
@Override
120-
protected void doClose() throws Exception {
121-
if (this.inputStreamReader != null) {
122-
this.inputStreamReader.close();
123-
return;
124-
}
125-
this.dataFileReader.close();
126-
}
127-
128-
private void initializeReader() throws IOException {
129-
if (this.embeddedSchema) {
130-
this.dataFileReader = new DataFileStream<>(this.inputStream, this.datumReader);
131-
} else {
132-
this.inputStreamReader = createInputStreamReader(this.inputStream, this.datumReader);
133-
}
134-
135-
}
136-
137-
private InputStreamReader<T> createInputStreamReader(InputStream inputStream, DatumReader<T> datumReader) {
138-
return new InputStreamReader<>(inputStream, datumReader);
139-
}
140-
141-
private static <T> DatumReader<T> datumReaderForClass(Class<T> clazz) {
142-
if (SpecificRecordBase.class.isAssignableFrom(clazz)){
143-
return new SpecificDatumReader<>(clazz);
144-
}
145-
if (GenericRecord.class.isAssignableFrom(clazz)) {
146-
return new GenericDatumReader<>();
147-
}
148-
return new ReflectDatumReader<>(clazz);
149-
}
150-
151-
152-
private static class InputStreamReader<T> {
153-
private final DatumReader<T> datumReader;
154-
155-
private final BinaryDecoder binaryDecoder;
156-
157-
private final InputStream inputStream;
158-
159-
private InputStreamReader(InputStream inputStream, DatumReader<T> datumReader) {
160-
this.inputStream = inputStream;
161-
this.datumReader = datumReader;
162-
this.binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
163-
}
164-
165-
private T read() throws Exception {
166-
if (!this.binaryDecoder.isEnd()) {
167-
return this.datumReader.read(null, this.binaryDecoder);
168-
}
169-
return null;
170-
}
171-
172-
private void close() {
173-
try {
174-
this.inputStream.close();
175-
} catch (IOException e) {
176-
throw new ItemStreamException(e.getMessage(), e);
177-
}
178-
}
179-
}
180-
}
1+
/*
2+
* Copyright 2019-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.avro;
17+
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
21+
import org.apache.avro.Schema;
22+
import org.apache.avro.file.DataFileStream;
23+
import org.apache.avro.generic.GenericDatumReader;
24+
import org.apache.avro.generic.GenericRecord;
25+
import org.apache.avro.io.BinaryDecoder;
26+
import org.apache.avro.io.DatumReader;
27+
import org.apache.avro.io.DecoderFactory;
28+
import org.apache.avro.reflect.ReflectDatumReader;
29+
import org.apache.avro.specific.SpecificDatumReader;
30+
import org.apache.avro.specific.SpecificRecordBase;
31+
32+
import org.springframework.batch.item.ItemReader;
33+
import org.springframework.batch.item.ItemStreamException;
34+
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
35+
import org.springframework.core.io.Resource;
36+
import org.springframework.lang.Nullable;
37+
import org.springframework.util.Assert;
38+
import org.springframework.util.ClassUtils;
39+
40+
/**
41+
* An {@link ItemReader} that deserializes data from a {@link Resource} containing serialized Avro objects.
42+
*
43+
* @author David Turanski
44+
* @author Mahmoud Ben Hassine
45+
* @author Song JaeGeun
46+
* @since 4.2
47+
*/
48+
public class AvroItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> {
49+
50+
private boolean embeddedSchema = true;
51+
52+
private InputStreamReader<T> inputStreamReader;
53+
54+
private DataFileStream<T> dataFileReader;
55+
56+
private InputStream inputStream;
57+
58+
private DatumReader<T> datumReader;
59+
60+
/**
61+
*
62+
* @param resource the {@link Resource} containing objects serialized with Avro.
63+
* @param clazz the data type to be deserialized.
64+
*/
65+
public AvroItemReader(Resource resource, Class<T> clazz) {
66+
setName(ClassUtils.getShortName(AvroItemReader.class));
67+
68+
Assert.notNull(resource, "'resource' is required.");
69+
Assert.notNull(clazz, "'class' is required.");
70+
71+
try {
72+
this.inputStream = resource.getInputStream();
73+
this.datumReader = datumReaderForClass(clazz);
74+
}
75+
catch (IOException e) {
76+
throw new IllegalArgumentException(e.getMessage(), e);
77+
}
78+
}
79+
80+
/**
81+
*
82+
* @param data the {@link Resource} containing the data to be read.
83+
* @param schema the {@link Resource} containing the Avro schema.
84+
*/
85+
public AvroItemReader(Resource data, Resource schema) {
86+
setName(ClassUtils.getShortName(AvroItemReader.class));
87+
88+
Assert.notNull(data, "'data' is required.");
89+
Assert.state(data.exists(), "'data' " + data.getFilename() +" does not exist.");
90+
Assert.notNull(schema, "'schema' is required");
91+
Assert.state(schema.exists(), "'schema' " + schema.getFilename() +" does not exist.");
92+
try {
93+
this.inputStream = data.getInputStream();
94+
Schema avroSchema = new Schema.Parser().parse(schema.getInputStream());
95+
this.datumReader = new GenericDatumReader<>(avroSchema);
96+
}
97+
catch (IOException e) {
98+
throw new IllegalArgumentException(e.getMessage(), e);
99+
}
100+
}
101+
102+
/**
103+
* Disable or enable reading an embedded Avro schema. True by default.
104+
* @param embeddedSchema set to false to if the input does not embed an Avro schema.
105+
*/
106+
public void setEmbeddedSchema(boolean embeddedSchema) {
107+
this.embeddedSchema = embeddedSchema;
108+
}
109+
110+
111+
@Nullable
112+
@Override
113+
protected T doRead() throws Exception {
114+
if (this.inputStreamReader != null) {
115+
return this.inputStreamReader.read();
116+
}
117+
return this.dataFileReader.hasNext()? this.dataFileReader.next(): null;
118+
}
119+
120+
@Override
121+
protected void doOpen() throws Exception {
122+
initializeReader();
123+
}
124+
125+
@Override
126+
protected void doClose() throws Exception {
127+
if (this.inputStreamReader != null) {
128+
this.inputStreamReader.close();
129+
return;
130+
}
131+
this.dataFileReader.close();
132+
}
133+
134+
private void initializeReader() throws IOException {
135+
if (this.embeddedSchema) {
136+
this.dataFileReader = new DataFileStream<>(this.inputStream, this.datumReader);
137+
} else {
138+
this.inputStreamReader = createInputStreamReader(this.inputStream, this.datumReader);
139+
}
140+
141+
}
142+
143+
private InputStreamReader<T> createInputStreamReader(InputStream inputStream, DatumReader<T> datumReader) {
144+
return new InputStreamReader<>(inputStream, datumReader);
145+
}
146+
147+
private static <T> DatumReader<T> datumReaderForClass(Class<T> clazz) {
148+
if (SpecificRecordBase.class.isAssignableFrom(clazz)){
149+
return new SpecificDatumReader<>(clazz);
150+
}
151+
if (GenericRecord.class.isAssignableFrom(clazz)) {
152+
return new GenericDatumReader<>();
153+
}
154+
return new ReflectDatumReader<>(clazz);
155+
}
156+
157+
158+
private static class InputStreamReader<T> {
159+
private final DatumReader<T> datumReader;
160+
161+
private final BinaryDecoder binaryDecoder;
162+
163+
private final InputStream inputStream;
164+
165+
private InputStreamReader(InputStream inputStream, DatumReader<T> datumReader) {
166+
this.inputStream = inputStream;
167+
this.datumReader = datumReader;
168+
this.binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
169+
}
170+
171+
private T read() throws Exception {
172+
if (!this.binaryDecoder.isEnd()) {
173+
return this.datumReader.read(null, this.binaryDecoder);
174+
}
175+
return null;
176+
}
177+
178+
private void close() {
179+
try {
180+
this.inputStream.close();
181+
} catch (IOException e) {
182+
throw new ItemStreamException(e.getMessage(), e);
183+
}
184+
}
185+
}
186+
}

0 commit comments

Comments
 (0)