@@ -105,7 +105,7 @@ def instance(cls):
105
105
(host , port ) = (parse .hostname , parse .port )
106
106
fixture = ExternalService (host , port )
107
107
else :
108
- (host , port ) = ("127.0.0.1" , get_open_port () )
108
+ (host , port ) = ("127.0.0.1" , None )
109
109
fixture = cls (host , port )
110
110
111
111
fixture .open ()
@@ -124,21 +124,18 @@ def kafka_run_class_env(self):
124
124
return env
125
125
126
126
def out (self , message ):
127
- log .info ("*** Zookeeper [%s:%d ]: %s" , self .host , self .port , message )
127
+ log .info ("*** Zookeeper [%s:%s ]: %s" , self .host , self .port or '(auto)' , message )
128
128
129
129
def open (self ):
130
130
self .tmp_dir = tempfile .mkdtemp ()
131
131
self .out ("Running local instance..." )
132
132
log .info (" host = %s" , self .host )
133
- log .info (" port = %s" , self .port )
133
+ log .info (" port = %s" , self .port or '(auto)' )
134
134
log .info (" tmp_dir = %s" , self .tmp_dir )
135
135
136
- # Generate configs
136
+ # Configure Zookeeper child process
137
137
template = self .test_resource ("zookeeper.properties" )
138
138
properties = os .path .join (self .tmp_dir , "zookeeper.properties" )
139
- self .render_template (template , properties , vars (self ))
140
-
141
- # Configure Zookeeper child process
142
139
args = self .kafka_run_class_args ("org.apache.zookeeper.server.quorum.QuorumPeerMain" , properties )
143
140
env = self .kafka_run_class_env ()
144
141
@@ -148,13 +145,12 @@ def open(self):
148
145
backoff = 1
149
146
end_at = time .time () + max_timeout
150
147
tries = 1
148
+ auto_port = (self .port is None )
151
149
while time .time () < end_at :
152
- self .out ('Attempting to start (try #%d)' % tries )
153
- try :
154
- os .stat (properties )
155
- except :
156
- log .warning ('Config %s not found -- re-rendering' , properties )
157
- self .render_template (template , properties , vars (self ))
150
+ if auto_port :
151
+ self .port = get_open_port ()
152
+ self .out ('Attempting to start on port %d (try #%d)' % (self .port , tries ))
153
+ self .render_template (template , properties , vars (self ))
158
154
self .child = SpawnedService (args , env )
159
155
self .child .start ()
160
156
timeout = min (timeout , max (end_at - time .time (), 0 ))
@@ -194,8 +190,6 @@ def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None,
194
190
(host , port ) = (parse .hostname , parse .port )
195
191
fixture = ExternalService (host , port )
196
192
else :
197
- if port is None :
198
- port = get_open_port ()
199
193
# force IPv6 here because of a confusing point:
200
194
#
201
195
# - if the string "localhost" is passed, Kafka will *only* bind to the IPv4 address of localhost
@@ -245,7 +239,7 @@ def kafka_run_class_env(self):
245
239
return env
246
240
247
241
def out (self , message ):
248
- log .info ("*** Kafka [%s:%d ]: %s" , self .host , self .port , message )
242
+ log .info ("*** Kafka [%s:%s ]: %s" , self .host , self .port or '(auto)' , message )
249
243
250
244
def open (self ):
251
245
if self .running :
@@ -255,7 +249,7 @@ def open(self):
255
249
self .tmp_dir = tempfile .mkdtemp ()
256
250
self .out ("Running local instance..." )
257
251
log .info (" host = %s" , self .host )
258
- log .info (" port = %s" , self .port )
252
+ log .info (" port = %s" , self .port or '(auto)' )
259
253
log .info (" transport = %s" , self .transport )
260
254
log .info (" broker_id = %s" , self .broker_id )
261
255
log .info (" zk_host = %s" , self .zk_host )
@@ -269,12 +263,6 @@ def open(self):
269
263
os .mkdir (os .path .join (self .tmp_dir , "logs" ))
270
264
os .mkdir (os .path .join (self .tmp_dir , "data" ))
271
265
272
- # Generate configs
273
- template = self .test_resource ("kafka.properties" )
274
- properties = os .path .join (self .tmp_dir , "kafka.properties" )
275
- self .render_template (template , properties , vars (self ))
276
-
277
- # Party!
278
266
self .out ("Creating Zookeeper chroot node..." )
279
267
args = self .kafka_run_class_args ("org.apache.zookeeper.ZooKeeperMain" ,
280
268
"-server" , "%s:%d" % (self .zk_host , self .zk_port ),
@@ -292,6 +280,8 @@ def open(self):
292
280
self .out ("Done!" )
293
281
294
282
# Configure Kafka child process
283
+ properties = os .path .join (self .tmp_dir , "kafka.properties" )
284
+ template = self .test_resource ("kafka.properties" )
295
285
args = self .kafka_run_class_args ("kafka.Kafka" , properties )
296
286
env = self .kafka_run_class_env ()
297
287
@@ -300,13 +290,15 @@ def open(self):
300
290
backoff = 1
301
291
end_at = time .time () + max_timeout
302
292
tries = 1
293
+ auto_port = (self .port is None )
303
294
while time .time () < end_at :
304
- self .out ('Attempting to start (try #%d)' % tries )
305
- try :
306
- os .stat (properties )
307
- except :
308
- log .warning ('Config %s not found -- re-rendering' , properties )
309
- self .render_template (template , properties , vars (self ))
295
+ # We have had problems with port conflicts on travis
296
+ # so we will try a different port on each retry
297
+ # unless the fixture was passed a specific port
298
+ if auto_port :
299
+ self .port = get_open_port ()
300
+ self .out ('Attempting to start on port %d (try #%d)' % (self .port , tries ))
301
+ self .render_template (template , properties , vars (self ))
310
302
self .child = SpawnedService (args , env )
311
303
self .child .start ()
312
304
timeout = min (timeout , max (end_at - time .time (), 0 ))
0 commit comments