forked from pravega/pravega-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHelloWorldReader.java
127 lines (112 loc) · 5.53 KB
/
HelloWorldReader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*/
package io.pravega.example.gettingstarted;
import java.net.URI;
import java.util.UUID;
import io.pravega.client.ClientConfig;
import io.pravega.client.stream.Stream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReinitializationRequiredException;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.JavaSerializer;
/**
* A simple example app that uses a Pravega Reader to read from a given scope and stream.
*/
public class HelloWorldReader {
private static final int READER_TIMEOUT_MS = 2000;
public final String scope;
public final String streamName;
public final URI controllerURI;
public HelloWorldReader(String scope, String streamName, URI controllerURI) {
this.scope = scope;
this.streamName = streamName;
this.controllerURI = controllerURI;
}
public void run() {
StreamManager streamManager = StreamManager.create(controllerURI);
final boolean scopeIsNew = streamManager.createScope(scope);
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build();
final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig);
final String readerGroup = UUID.randomUUID().toString().replace("-", "");
final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream(Stream.of(scope, streamName))
.build();
try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) {
readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
}
try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope,
ClientConfig.builder().controllerURI(controllerURI).build());
EventStreamReader<String> reader = clientFactory.createReader("reader",
readerGroup,
new JavaSerializer<String>(),
ReaderConfig.builder().build())) {
System.out.format("Reading all the events from %s/%s%n", scope, streamName);
EventRead<String> event = null;
do {
try {
event = reader.readNextEvent(READER_TIMEOUT_MS);
if (event.getEvent() != null) {
System.out.format("Read event '%s'%n", event.getEvent());
}
} catch (ReinitializationRequiredException e) {
//There are certain circumstances where the reader needs to be reinitialized
e.printStackTrace();
}
} while (event.getEvent() != null);
System.out.format("No more events from %s/%s%n", scope, streamName);
}
}
public static void main(String[] args) {
Options options = getOptions();
CommandLine cmd = null;
try {
cmd = parseCommandLineArgs(options, args);
} catch (ParseException e) {
System.out.format("%s.%n", e.getMessage());
final HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("HelloWorldReader", options);
System.exit(1);
}
final String scope = cmd.getOptionValue("scope") == null ? Constants.DEFAULT_SCOPE : cmd.getOptionValue("scope");
final String streamName = cmd.getOptionValue("name") == null ? Constants.DEFAULT_STREAM_NAME : cmd.getOptionValue("name");
final String uriString = cmd.getOptionValue("uri") == null ? Constants.DEFAULT_CONTROLLER_URI : cmd.getOptionValue("uri");
final URI controllerURI = URI.create(uriString);
HelloWorldReader hwr = new HelloWorldReader(scope, streamName, controllerURI);
hwr.run();
}
private static Options getOptions() {
final Options options = new Options();
options.addOption("s", "scope", true, "The scope name of the stream to read from.");
options.addOption("n", "name", true, "The name of the stream to read from.");
options.addOption("u", "uri", true, "The URI to the controller in the form tcp://host:port");
return options;
}
private static CommandLine parseCommandLineArgs(Options options, String[] args) throws ParseException {
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
return cmd;
}
}