Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

entrance suport priority queue #679

Merged
merged 11 commits into from
Dec 19, 2024

Conversation

aiceflower
Copy link

entrance suport priority queue

* 优先级队列,优先级相同时先进先出
* @param group
*/
class PriorityLoopArrayQueue(var group: Group) extends ConsumeQueue with Logging {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need change done

consumerQueue = new LoopArrayQueue(
getSchedulerContext.getOrCreateGroupFactory.getOrCreateGroup(null)
)
val fifoQueueStrategy: String = FIFO_QUEUE_STRATEGY.toLowerCase()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该基于任务参数来判断new哪个队列

val PFIFO = "pfifo"
val fifoStrategy: String = FIFO_QUEUE_STRATEGY
if (PFIFO.equalsIgnoreCase(fifoStrategy) && properties != null && !properties.isEmpty) {
val priorityValue: AnyRef = properties.get(ENGINE_PRIORITY_RUNTIME_KEY)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key为啥和wtss没关系

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wtss just use "priority" as key

Utils.tryAndWarn{
// 如果是使用优先级队列,设置下优先级
val properties: util.Map[String, AnyRef] = TaskUtils.getRuntimeMap(params)
val PFIFO = "pfifo"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

定义为常量

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
event = fixedSizeCollection.get(index).asInstanceOf[SchedulerEvent]
if (event == null) {
val eventSeq = toIndexedSeq.filter(x => x.getTimestamp.equals(index)).seq

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该先取缓存,再取缓存队列

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*/
private def getAndRemoveTop: PriorityQueueElement = {
// 根节点为最大值
val top: PriorityQueueElement = priorityEventQueue.get(0)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要判断list是否为空

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


override def clearAll(): Unit = priorityEventQueue synchronized {
realSize = 0
priorityEventQueue.clear()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

其他两个集合也需要clear

Copy link
Author

@aiceflower aiceflower Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@casionone casionone merged commit 0d5d9f5 into dev-1.10.0-webank-merge Dec 19, 2024
8 of 16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants