12
12
13
13
package com .rawlabs .das .server .grpc
14
14
15
- import scala .concurrent .ExecutionContext
16
- import scala .concurrent .duration .{DurationInt , FiniteDuration }
17
- import scala .jdk .CollectionConverters ._
18
- import scala .jdk .OptionConverters ._
19
- import scala .util .{Failure , Success }
20
-
21
15
import org .apache .pekko .NotUsed
22
16
import org .apache .pekko .stream .scaladsl .{Keep , Sink , Source }
23
17
import org .apache .pekko .stream .{KillSwitches , Materializer , UniqueKillSwitch }
24
18
25
- import com .rawlabs .das .sdk .{ DASExecuteResult , DASSdk , DASSdkUnsupportedException , DASTable }
19
+ import com .rawlabs .das .sdk ._
26
20
import com .rawlabs .das .server .cache .{QueryCacheKey , QueryResultCache }
27
21
import com .rawlabs .das .server .manager .DASSdkManager
28
22
import com .rawlabs .protocol .das .v1 .common .DASId
29
23
import com .rawlabs .protocol .das .v1 .services ._
30
24
import com .rawlabs .protocol .das .v1 .tables ._
31
25
import com .typesafe .scalalogging .StrictLogging
32
26
27
+ import _root_ .scala .concurrent .ExecutionContext
28
+ import _root_ .scala .concurrent .duration .{DurationInt , FiniteDuration }
29
+ import _root_ .scala .jdk .CollectionConverters ._
30
+ import _root_ .scala .jdk .OptionConverters ._
31
+ import _root_ .scala .util .{Failure , Success }
33
32
import io .grpc .stub .{ServerCallStreamObserver , StreamObserver }
34
33
import io .grpc .{Status , StatusRuntimeException }
35
34
@@ -136,23 +135,16 @@ class TableServiceGrpcImpl(
136
135
responseObserver : StreamObserver [ExplainTableResponse ]): Unit = {
137
136
logger.debug(s " Explaining query for Table ID: ${request.getTableId.getName}" )
138
137
withTable(request.getDasId, request.getTableId, responseObserver) { table =>
139
- try {
140
- val explanation =
141
- table.explain(
142
- request.getQuery.getQualsList,
143
- request.getQuery.getColumnsList,
144
- request.getQuery.getSortKeysList,
145
- if (request.getQuery.hasLimit) java.lang.Long .valueOf(request.getQuery.getLimit) else null )
146
- val response = ExplainTableResponse .newBuilder().addAllStmts(explanation).build()
147
- responseObserver.onNext(response)
148
- responseObserver.onCompleted()
149
- logger.debug(" Query explanation sent successfully." )
150
- } catch {
151
- case t : Throwable =>
152
- logger.error(" Error explaining query" , t)
153
- responseObserver.onError(
154
- Status .INVALID_ARGUMENT .withDescription(" Error explaining query" ).withCause(t).asRuntimeException())
155
- }
138
+ val explanation =
139
+ table.explain(
140
+ request.getQuery.getQualsList,
141
+ request.getQuery.getColumnsList,
142
+ request.getQuery.getSortKeysList,
143
+ if (request.getQuery.hasLimit) java.lang.Long .valueOf(request.getQuery.getLimit) else null )
144
+ val response = ExplainTableResponse .newBuilder().addAllStmts(explanation).build()
145
+ responseObserver.onNext(response)
146
+ responseObserver.onCompleted()
147
+ logger.debug(" Query explanation sent successfully." )
156
148
}
157
149
}
158
150
@@ -224,48 +216,40 @@ class TableServiceGrpcImpl(
224
216
}
225
217
}
226
218
227
- try {
228
- val key = QueryCacheKey (request)
229
- // Check if we have a cached result for this query
230
- val source : Source [Rows , NotUsed ] = resultCache.get(key) match {
231
- case Some (iterator) =>
232
- // We do. Use the iterator to build the Source.
233
- logger.debug(s " Using cached result for $request. " )
234
- Source .fromIterator(() => iterator)
235
- case None =>
236
- // We don't. Run the query and build a Source that populates a new cache entry.
237
- // We tap the source to cache the results as they are streamed to the client.
238
- // A callback is added to the source to mark the cache entry as done when the stream completes.
239
- logger.debug(s " Cache miss for $request. " )
240
- val source = runQuery()
241
- val cachedResult = resultCache.newBuffer(key)
242
- val tappingSource : Source [Rows , NotUsed ] = source.map { chunk =>
243
- cachedResult.addChunk(chunk) // This is NOP if the internal buffer is full.
244
- chunk
245
- }
246
- val withCallBack = tappingSource.watchTermination() { (_, doneF) =>
247
- doneF.onComplete {
248
- case Success (_) =>
249
- // Registers the entry, making it available for future queries. Unless the buffer was full. Then it's a NOP.
250
- cachedResult.register()
251
- case Failure (ex) =>
252
- // If the stream fails, we don't cache the result.
253
- logger.warn(s " Failed streaming for $request" , ex)
254
- }(ec)
255
- }
256
- withCallBack.mapMaterializedValue(_ => NotUsed )
257
- }
258
- // Run the final streaming result: pipe the source through a kill switch and to the gRPC response observer.
259
- val ks = runStreamedResult(source, request, responseObserver, maybeServerCallObs)
260
- // Store the kill switch so that we can cancel the stream if needed.
261
- killSwitchRef.set(Some (ks))
262
- } catch {
263
- case t : Throwable =>
264
- logger.error(" Error executing query" , t)
265
- responseObserver.onError(
266
- Status .INVALID_ARGUMENT .withDescription(" Error executing query" ).withCause(t).asRuntimeException())
219
+ val key = QueryCacheKey (request)
220
+ // Check if we have a cached result for this query
221
+ val source : Source [Rows , NotUsed ] = resultCache.get(key) match {
222
+ case Some (iterator) =>
223
+ // We do. Use the iterator to build the Source.
224
+ logger.debug(s " Using cached result for $request. " )
225
+ Source .fromIterator(() => iterator)
226
+ case None =>
227
+ // We don't. Run the query and build a Source that populates a new cache entry.
228
+ // We tap the source to cache the results as they are streamed to the client.
229
+ // A callback is added to the source to mark the cache entry as done when the stream completes.
230
+ logger.debug(s " Cache miss for $request. " )
231
+ val source = runQuery()
232
+ val cachedResult = resultCache.newBuffer(key)
233
+ val tappingSource : Source [Rows , NotUsed ] = source.map { chunk =>
234
+ cachedResult.addChunk(chunk) // This is NOP if the internal buffer is full.
235
+ chunk
236
+ }
237
+ val withCallBack = tappingSource.watchTermination() { (_, doneF) =>
238
+ doneF.onComplete {
239
+ case Success (_) =>
240
+ // Registers the entry, making it available for future queries. Unless the buffer was full. Then it's a NOP.
241
+ cachedResult.register()
242
+ case Failure (ex) =>
243
+ // If the stream fails, we don't cache the result.
244
+ logger.warn(s " Failed streaming for $request" , ex)
245
+ }(ec)
246
+ }
247
+ withCallBack.mapMaterializedValue(_ => NotUsed )
267
248
}
268
-
249
+ // Run the final streaming result: pipe the source through a kill switch and to the gRPC response observer.
250
+ val ks = runStreamedResult(source, request, responseObserver, maybeServerCallObs)
251
+ // Store the kill switch so that we can cancel the stream if needed.
252
+ killSwitchRef.set(Some (ks))
269
253
}
270
254
}
271
255
@@ -315,11 +299,9 @@ class TableServiceGrpcImpl(
315
299
logger.error(s " Error during streaming for planID= ${request.getPlanId}. " , ex)
316
300
maybeServerCallObs match {
317
301
case Some (sco) if ! sco.isCancelled =>
318
- sco.onError(
319
- new StatusRuntimeException (Status .INTERNAL .withDescription(s " Error during streaming: ${ex.getMessage}" )))
302
+ sco.onError(new StatusRuntimeException (Status .INTERNAL .withCause(ex)))
320
303
case _ =>
321
- responseObserver.onError(
322
- new StatusRuntimeException (Status .INTERNAL .withDescription(s " Error during streaming: ${ex.getMessage}" )))
304
+ responseObserver.onError(new StatusRuntimeException (Status .INTERNAL .withCause(ex)))
323
305
// If cancelled, no need to call onError (client is gone).
324
306
}
325
307
}(ec)
@@ -339,20 +321,10 @@ class TableServiceGrpcImpl(
339
321
responseObserver : StreamObserver [GetTableUniqueColumnResponse ]): Unit = {
340
322
logger.debug(s " Fetching unique columns for Table ID: ${request.getTableId.getName}" )
341
323
withTable(request.getDasId, request.getTableId, responseObserver) { table =>
342
- try {
343
- val response = GetTableUniqueColumnResponse .newBuilder().setColumn(table.uniqueColumn).build()
344
- responseObserver.onNext(response)
345
- responseObserver.onCompleted()
346
- logger.debug(" Unique column information sent successfully." )
347
- } catch {
348
- case t : DASSdkUnsupportedException =>
349
- responseObserver.onError(
350
- Status .INVALID_ARGUMENT .withDescription(" Unsupported operation" ).withCause(t).asRuntimeException())
351
- case t : Throwable =>
352
- logger.error(" Error fetching unique column" , t)
353
- responseObserver.onError(
354
- Status .INTERNAL .withDescription(" Error fetching unique column" ).withCause(t).asRuntimeException())
355
- }
324
+ val response = GetTableUniqueColumnResponse .newBuilder().setColumn(table.uniqueColumn).build()
325
+ responseObserver.onNext(response)
326
+ responseObserver.onCompleted()
327
+ logger.debug(" Unique column information sent successfully." )
356
328
}
357
329
}
358
330
@@ -384,20 +356,10 @@ class TableServiceGrpcImpl(
384
356
/** Inserts one row into the table identified by the request and replies with the stored row.
  *
  * NOTE(review): exception-to-gRPC-status translation appears to be centralized in the
  * `withTable`/`withDas` wrapper (per the surrounding diff) — confirm before relying on it.
  */
override def insertTable(request: InsertTableRequest, responseObserver: StreamObserver[InsertTableResponse]): Unit = {
  logger.debug(s"Inserting row into Table ID: ${request.getTableId.getName}")
  withTable(request.getDasId, request.getTableId, responseObserver) { table =>
    // Perform the insert, then echo the (possibly server-augmented) row back to the client.
    val insertedRow = table.insert(request.getRow)
    val reply = InsertTableResponse.newBuilder().setRow(insertedRow).build()
    responseObserver.onNext(reply)
    responseObserver.onCompleted()
    logger.debug("Row inserted successfully.")
  }
}
403
365
@@ -412,20 +374,10 @@ class TableServiceGrpcImpl(
412
374
responseObserver : StreamObserver [BulkInsertTableResponse ]): Unit = {
413
375
logger.debug(s " Performing bulk insert into Table ID: ${request.getTableId.getName}" )
414
376
withTable(request.getDasId, request.getTableId, responseObserver) { table =>
415
- try {
416
- val rows = table.bulkInsert(request.getRowsList)
417
- responseObserver.onNext(BulkInsertTableResponse .newBuilder().addAllRows(rows).build())
418
- responseObserver.onCompleted()
419
- logger.debug(" Bulk insert completed successfully." )
420
- } catch {
421
- case t : DASSdkUnsupportedException =>
422
- responseObserver.onError(
423
- Status .UNIMPLEMENTED .withDescription(" Unsupported operation" ).withCause(t).asRuntimeException())
424
- case t : Throwable =>
425
- logger.error(" Error inserting rows" , t)
426
- responseObserver.onError(
427
- Status .INVALID_ARGUMENT .withDescription(" Error inserting rows" ).withCause(t).asRuntimeException())
428
- }
377
+ val rows = table.bulkInsert(request.getRowsList)
378
+ responseObserver.onNext(BulkInsertTableResponse .newBuilder().addAllRows(rows).build())
379
+ responseObserver.onCompleted()
380
+ logger.debug(" Bulk insert completed successfully." )
429
381
}
430
382
}
431
383
@@ -438,20 +390,10 @@ class TableServiceGrpcImpl(
438
390
/** Applies an update to the row addressed by `rowId` and replies with the resulting row.
  *
  * NOTE(review): error handling is delegated to the `withTable` wrapper per the
  * surrounding diff — confirm it maps SDK exceptions to gRPC statuses.
  */
override def updateTable(request: UpdateTableRequest, responseObserver: StreamObserver[UpdateTableResponse]): Unit = {
  logger.debug(s"Updating rows in Table ID: ${request.getTableId.getName}")
  withTable(request.getDasId, request.getTableId, responseObserver) { table =>
    // The SDK returns the post-update row, which we forward as the single response.
    val updatedRow = table.update(request.getRowId, request.getNewRow)
    val reply = UpdateTableResponse.newBuilder().setRow(updatedRow).build()
    responseObserver.onNext(reply)
    responseObserver.onCompleted()
    logger.debug("Rows updated successfully.")
  }
}
457
399
@@ -464,20 +406,10 @@ class TableServiceGrpcImpl(
464
406
/** Deletes the row addressed by `rowId` and replies with an empty acknowledgement.
  *
  * NOTE(review): like the other mutation RPCs here, failures are expected to be turned
  * into gRPC statuses by the `withTable` wrapper — confirm against its implementation.
  */
override def deleteTable(request: DeleteTableRequest, responseObserver: StreamObserver[DeleteTableResponse]): Unit = {
  logger.debug(s"Deleting rows from Table ID: ${request.getTableId.getName}")
  withTable(request.getDasId, request.getTableId, responseObserver) { table =>
    table.delete(request.getRowId)
    // Nothing to return on delete: send the default (empty) response as the ack.
    responseObserver.onNext(DeleteTableResponse.getDefaultInstance)
    responseObserver.onCompleted()
    logger.debug("Rows deleted successfully.")
  }
}
483
415
@@ -486,7 +418,21 @@ class TableServiceGrpcImpl(
486
418
case None =>
487
419
// We use 'NOT_FOUND' so that the client doesn't confuse that error with a user-visible error.
488
420
responseObserver.onError(Status .NOT_FOUND .withDescription(" DAS not found" ).asRuntimeException())
489
- case Some (das) => f(das)
421
+ case Some (das) =>
422
+ try {
423
+ f(das)
424
+ } catch {
425
+ case ex : DASSdkInvalidArgumentException =>
426
+ logger.error(" DASSdk invalid argument error" , ex)
427
+ responseObserver.onError(Status .INVALID_ARGUMENT .withDescription(ex.getMessage).asRuntimeException())
428
+ case ex : DASSdkUnsupportedException =>
429
+ logger.error(" DASSdk unsupported feature" , ex)
430
+ responseObserver.onError(
431
+ Status .UNIMPLEMENTED .withDescription(" Unsupported operation" ).withCause(ex).asRuntimeException())
432
+ case t : Throwable =>
433
+ logger.error(" DASSdk unexpected error" , t)
434
+ responseObserver.onError(Status .INTERNAL .withCause(t).asRuntimeException())
435
+ }
490
436
}
491
437
}
492
438
0 commit comments