-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
entrance suport priority queue #679
Conversation
* 优先级队列,优先级相同时先进先出 | ||
* @param group | ||
*/ | ||
class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need change done
consumerQueue = new LoopArrayQueue( | ||
getSchedulerContext.getOrCreateGroupFactory.getOrCreateGroup(null) | ||
) | ||
val fifoQueueStrategy: String = FIFO_QUEUE_STRATEGY.toLowerCase() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
应该基于任务参数来判断new哪个队列
val PFIFO = "pfifo" | ||
val fifoStrategy: String = FIFO_QUEUE_STRATEGY | ||
if (PFIFO.equalsIgnoreCase(fifoStrategy) && properties != null && !properties.isEmpty) { | ||
val priorityValue: AnyRef = properties.get(ENGINE_PRIORITY_RUNTIME_KEY) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
key为啥和wtss没关系
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wtss just use "priority" as key
Utils.tryAndWarn{ | ||
// 如果是使用优先级队列,设置下优先级 | ||
val properties: util.Map[String, AnyRef] = TaskUtils.getRuntimeMap(params) | ||
val PFIFO = "pfifo" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
定义为常量
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
event = fixedSizeCollection.get(index).asInstanceOf[SchedulerEvent] | ||
if (event == null) { | ||
val eventSeq = toIndexedSeq.filter(x => x.getTimestamp.equals(index)).seq |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
应该先取缓存,再取缓存队列
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
*/ | ||
private def getAndRemoveTop: PriorityQueueElement = { | ||
// 根节点为最大值 | ||
val top: PriorityQueueElement = priorityEventQueue.get(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
要判断list是否为空
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
override def clearAll(): Unit = priorityEventQueue synchronized { | ||
realSize = 0 | ||
priorityEventQueue.clear() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
其他两个集合也需要clear
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
entrance suport priority queue