Recording: https://www.youtube.com/watch?v=_ayPdIVs_SI
About Kubernetes Scalability Thresholds.
PS: There is a lot of debate on whether to choose one large cluster or many small clusters.
In fact, both methods have their limitations and advantages.
For ByteDance, the maintenance complexity and user-isolation issues of large clusters can be overcome; for details, please refer to the KubeWharf open source projects.
We are more concerned about reducing the maintenance burden and achieving higher resource utilization through large clusters.
One more thing, large-scale training workloads will inevitably require large clusters…
Currently, the largest single cluster manages more than 70,000 GPUs and achieves 95%+ GPU utilization.
The Kubernetes Scheduler has a monolithic architecture, which constrains its performance ceiling.
In terms of implementation details, the Scheduler consists of at least four core modules:
It should be noted that hotspots may not be apparent in small clusters, but in large-scale clusters they are magnified hundreds or even thousands of times, necessitating optimization.
We will explain how to optimize performance and throughput based on the experience gained in building the Gödel scheduling system.
Gödel is a unified scheduling system for both online and offline workloads.
It has been deployed on a large scale in ByteDance’s global data centers, supporting all kinds of workloads including MicroService/BigData/ML, etc.
In terms of high-level architecture, Gödel is a distributed scheduling system built on the idea of optimistic concurrency, which is different from the upstream Kube Scheduler. This means that it can break the performance ceiling of the single scheduler.
Specifically, concurrency is reflected in two aspects:
Multi-shard concurrency.
TODO: Link to Gödel Architecture
Concurrent scheduling at GT/BE QoS + SubCluster granularity within a single shard instance, which completely avoids interference between workloads of different resource types or different resource pools and improves scheduling efficiency in large-scale heterogeneous clusters.
Multi-shard parallelism is the most intuitive optimization method.
Next, let’s talk about how we can use various data structures and algorithms to optimize the performance of a single shard to the utmost.
Cache/Snapshot has been mentioned before. At the beginning of each scheduling process, the Cache contents need to be fully synchronized to the Snapshot. The larger the data scale, the more significant the data synchronization overhead.
We hope to reduce unnecessary data copies as much as possible. In other words, we hope to transform full copies into incremental updates.
Those familiar with the Kubernetes source code may know that the Kube Scheduler uses an LRU-like mechanism to maintain NodeInfo, tracking objects with a time-sensitive generation.
For example, in the figure, assume that at a certain point in time (Timestamp=X) the Snapshot already contains all the Node objects in the Cache. Then node1 and node3 are updated, node5 is deleted, and node6 is added.
Each time we operate on an object, we update its timestamp (generation).
Then, by traversing the list in descending generation order, we find the objects that need to be added or updated and clone them. Finally, the objects that need to be deleted are found by traversing the Snapshot.
Thinking: How to reduce unnecessary object synchronization?
=> Only incrementally update the parts that have changed
Then, how to quickly identify the objects that have changed?
=> Maintain a time-sensitive generation, increment it whenever an update occurs, and keep objects ordered by generation.
When the Snapshot is very large, we use a divide-and-conquer algorithm to reduce the time complexity of traversal and deletion.
```go
// UpdateRawStore incrementally synchronizes the Cache (ListStore) into the
// Snapshot (RawStore): walk the list from the most recently updated element
// and stop as soon as we reach data the Snapshot has already seen.
func (s *ListStoreImpl) UpdateRawStore(store RawStore, cloneFunc CloneFunc, cleanFunc CleanFunc) {
	storedGeneration := store.GetGeneration()
	for e := s.Front(); e != nil; e = e.Next() {
		if e.GetGeneration() <= storedGeneration {
			// Everything from here on is no newer than the Snapshot; stop early.
			break
		}
		cloneFunc(e.key, e.StoredObj)
	}
	store.SetGeneration(s.generation)
	cleanFunc()
}

// DefaultCleanFunc removes Snapshot entries that no longer exist in the Cache.
// It only scans when the lengths differ and stops once all stale entries are gone.
func DefaultCleanFunc(cache ListStore, snapshot RawStore) CleanFunc {
	return func() {
		if cache.Len() != snapshot.Len() {
			diff := snapshot.Len() - cache.Len()
			snapshot.ConditionRange(func(key string, _ StoredObj) bool {
				if diff <= 0 {
					// Quick break the range loop.
					return false
				}
				if cache.Get(key) == nil {
					snapshot.Delete(key)
					diff--
				}
				return true
			})
		}
	}
}
```
In fact, the objects we need to maintain are not just NodeInfo, but also various custom data structures. Therefore, we abstract and enhance the entire incremental update mechanism.
Specifically, we abstract a basic storage interface, GenerationStore, that provides read/write (Get/Set) operations and has two different implementations:
ListStore: Doubly-linked list + HashMap[key]ListNodeItem
Used in the Cache. It maintains time order through the doubly-linked list and quickly indexes specific list elements through the HashMap, achieving $O(1)$ addition/deletion while preserving time order.
The data object we care about is stored as a field of the list element, and the object's position in the time order is updated on every Set.
RawStore: HashMap[key]Object
Used in Snapshot, a pure data object storage.
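As a rough illustration of this abstraction, the two flavors might be declared roughly as follows. The interface shapes below are inferred from the snippet and description above and are only a sketch, not the actual Gödel code:

```go
// Illustrative shapes of the two GenerationStore flavors; not the actual
// Gödel implementation.

// StoredObj is any object tracked by the stores (NodeInfo, custom structures, ...).
type StoredObj interface{}

// RawStore is the Snapshot-side store: a plain key -> object map plus the
// generation it was last synchronized to.
type RawStore interface {
	Get(key string) StoredObj
	Set(key string, obj StoredObj)
	Delete(key string)
	Len() int
	GetGeneration() uint64
	SetGeneration(generation uint64)
	// ConditionRange iterates over entries until the callback returns false.
	ConditionRange(fn func(key string, obj StoredObj) bool)
}

// ListStore is the Cache-side store: everything RawStore offers, plus a
// doubly-linked list kept in descending generation order so that incremental
// synchronization can stop early.
type ListStore interface {
	RawStore
	// Front returns the most recently updated element.
	Front() *ListNodeItem
}

// ListNodeItem is one element of the doubly-linked list; the object we care
// about is a field of the element, and its generation is bumped on every Set.
type ListNodeItem struct {
	key        string
	StoredObj  StoredObj
	generation uint64
	prev, next *ListNodeItem
}

func (e *ListNodeItem) GetGeneration() uint64 { return e.generation }
func (e *ListNodeItem) Next() *ListNodeItem   { return e.next }
```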
We refactored all storage and reimplemented it on top of GenerationStore. The E2E latency has been reduced from minutes to milliseconds and has remained stable over a long period.
PS: Note that the term E2E here refers to the time taken for a Pod within the entire scheduling cycle (from being dispatched by the Dispatcher to the completion of the final Binder binding).
Next is another step of data synchronization.
For scheduling efficiency, a Pod does NOT traverse all nodes in the cluster when being scheduled; it stops as soon as a specific number or ratio of feasible nodes has been found, so the scheduling of each Pod has a certain spatial locality.
To achieve a natural spread across zones during scheduling, the scheduler Cache maintains a NodeTree organized by topology zone. During data synchronization (UpdateSnapshot), the NodeTree is flattened into a one-dimensional list stored in the Snapshot, which is then accessed in a round-robin (modulo) manner in each scheduling cycle.
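For reference, the round-robin access itself is straightforward: keep a moving start index and wrap around with a modulo. A minimal illustrative sketch (not the upstream implementation):

```go
// nextNodesToScan returns up to n node names starting from a moving offset,
// wrapping around modulo the list length, so consecutive scheduling cycles
// start from different positions in the flattened NodeList. Illustrative only.
func nextNodesToScan(nodeList []string, startIndex, n int) ([]string, int) {
	if len(nodeList) == 0 || n <= 0 {
		return nil, startIndex
	}
	out := make([]string, 0, n)
	for i := 0; i < n && i < len(nodeList); i++ {
		out = append(out, nodeList[(startIndex+i)%len(nodeList)])
	}
	// Return the new offset for the next scheduling cycle.
	return out, (startIndex + len(out)) % len(nodeList)
}
```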
However, this mechanism has obvious problems:
The generated one-dimensional list is not well spread out: only the front part is evenly interleaved by zone, while the tail is often concentrated in a single zone
The entire one-dimensional list is frequently rebuilt (on adds, deletes, and similar events), which brings huge computational overhead
Looking back at our needs, how can we achieve a truly even spread?
=> It is equivalent to every node having a random index in the NodeList.
So how to avoid frequent reconstructions and instead reuse existing information to support randomness?
Let’s transform the existing linear list data structure. We aim for insertion and deletion of individual elements to be completed in $O(1)$ time (a short sketch follows this list). Specifically:
Insertion: Append directly to the end of the linear list.
Deletion: Swap the element to be deleted with the last element in the list, then remove the last element (this requires a HashMap for fast indexing to support element swapping).
Update: Perform a deletion followed by an insertion.
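A minimal sketch of such a list, with hypothetical names rather than the actual Gödel NodeList code:

```go
// NodeList is an order-randomized node list with O(1) insert/delete.
// Names and structure are illustrative only.
type NodeList struct {
	nodes []string       // node names; order ends up effectively random
	index map[string]int // node name -> position in nodes
}

func NewNodeList() *NodeList {
	return &NodeList{index: make(map[string]int)}
}

// Add appends the node at the end of the slice: O(1).
func (l *NodeList) Add(name string) {
	if _, ok := l.index[name]; ok {
		return
	}
	l.index[name] = len(l.nodes)
	l.nodes = append(l.nodes, name)
}

// Delete swaps the target with the last element and truncates: O(1),
// thanks to the HashMap index that locates the element to swap.
func (l *NodeList) Delete(name string) {
	i, ok := l.index[name]
	if !ok {
		return
	}
	last := len(l.nodes) - 1
	l.nodes[i] = l.nodes[last]
	l.index[l.nodes[i]] = i
	l.nodes = l.nodes[:last]
	delete(l.index, name)
}

// Update is a deletion followed by an insertion, which also moves the node
// to a new position in the list.
func (l *NodeList) Update(name string) {
	l.Delete(name)
	l.Add(name)
}
```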
Interestingly, because Add/Delete/Update events for all nodes in the cluster arrive in effectively random order, the index of each node also ends up random.
PS: In any interval of the list, the proportion of nodes from each zone is consistent with that zone's overall proportion in the cluster.
The net effect is a better spread of nodes and, at the same time, greatly improved efficiency of NodeList maintenance.
As shown in the figure on the left, in a large-scale cluster with 20K+ Nodes, 1M+ Running Pods and a load of 1K Pods/s, the main distribution of E2E scheduling latency has shifted from minutes to milliseconds.
Of course, a small tail of longer E2E latencies can still be seen in the upper right corner. This is mainly because the scheduling requirements of some individual Pods cannot be met (for example, the cluster does not have enough resources). Pods whose requirements can be met complete the entire scheduling process much faster.
Let’s talk about the scheduling phase, the issues we encountered, and the optimizations we’ve made. The first issue is scheduling under high resource watermarks (highly utilized clusters).
Under the current Filter & Score mechanism, we need to find a specific number or proportion of feasible nodes in the Filter phase before proceeding to the Score process (e.g., nodes node0,…,node3).
In scenarios where cluster resource utilization is high, meeting the numOfFeasibleNodes requirement often requires scanning a large number of nodes (e.g., the second row in the figure: node4,…,node12,…).
The upstream community has provided some configuration parameters (scoring thresholds) to fine-tune the scheduler. However, static configurations are not suitable for large-scale production clusters with frequent resource fluctuations.
We aim to develop a more intelligent self-tuning mechanism to address such issues and achieve the best balance between scheduling throughput and scheduling quality.
Specifically, the number of feasible nodes to find is no longer a fixed value; it adapts to the current cluster state and adjusts accordingly.
A simple case: as the number of nodes filtered out increases, we can lower the threshold on the number of feasible nodes so that the Filter process finishes earlier (e.g., as shown in the lower right corner of the figure).
This is because when many nodes fail to pass the filter, it means the number of feasible solutions is likely very small.
In such cases, sacrificing a large amount of throughput to find the optimal solution within a very small set of feasible solutions is not worthwhile.
Especially since cluster conditions change rapidly, what is considered “optimal” is often a false notion, making early termination a reasonable choice.
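As a purely illustrative sketch of this decay idea (the formula and names below are made up for the example, not Gödel's actual logic):

```go
// adaptiveFeasibleNodes illustrates decaying the feasible-node threshold:
// the more scanned nodes have already failed the Filter phase, the fewer
// feasible nodes we insist on finding before moving on to Score.
func adaptiveFeasibleNodes(baseThreshold, nodesScanned, nodesFilteredOut int) int {
	if nodesScanned == 0 {
		return baseThreshold
	}
	// Fraction of scanned nodes that failed the Filter phase.
	failureRatio := float64(nodesFilteredOut) / float64(nodesScanned)
	// Decay the threshold as the failure ratio grows: when almost everything
	// is infeasible, exhaustively hunting for the "optimal" node costs a lot
	// of throughput for a questionable gain.
	threshold := int(float64(baseThreshold) * (1 - failureRatio))
	if threshold < 1 {
		threshold = 1
	}
	return threshold
}
```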
In addition, we can also take more complex factors into account, such as the number of pending pods, the current incoming rate and throughput, etc.
Even in some scenarios that focus on scheduling quality (such as ML workloads), we can also change the decay mechanism to a growth mechanism.
Leaving aside the implementation details, the key point is that all these adjustments will be done adaptively within the scheduler rather than by external intervention.
Except for some unschedulable Pods in the P99 tail, the Algorithm Latency of most Pods is reduced by more than 50%.
The second issue is the unschedulable scenario.
Unschedulable Pods may go through multiple rounds of scheduling during their life cycle, as shown in the figure:
In the first round, scheduling failed and no node was available
In the second round, quite a few nodes have not changed, so there is no need to repeat the calculations on them
PS: For simplicity, only the case without cross-nodes constraints is considered.
How to achieve this? We added a SchedulingContext to Pod & Unit. The simplest example: maintain in the Pod the maximum node generation observed when it was last found unschedulable. If the generation of a node encountered in the next round is not greater than the value recorded in the previous round, that node can be skipped directly.
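A minimal sketch of this generation-based skip, with illustrative names rather than the actual SchedulingContext fields:

```go
// SchedulingContext sketches the idea: a previously unschedulable Pod
// remembers the highest node generation it observed when it last failed.
type SchedulingContext struct {
	LastObservedNodeGeneration uint64
}

// shouldSkipNode reports whether a node can be skipped in the next round:
// if the node has not changed since the last failed attempt (and there are
// no cross-node constraints), re-running the filters on it cannot succeed.
func shouldSkipNode(ctx *SchedulingContext, nodeGeneration uint64) bool {
	return ctx != nil && nodeGeneration <= ctx.LastObservedNodeGeneration
}
```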
We plan more fine-grained judgments and a more intelligent queueing-hint mechanism in the future.
We can look at the data and results. The indicator in the lower right corner is the percentage of nodes that have not changed between two rounds for unschedulable pods. We can see that many of them are above 70%.
On the left side, there’s a trace of a non-schedulable Pod. The SchedulingContext mechanism reduced its filter processing time from 27ms to 7ms.
At the same time, it effectively reduces the interference with, and blocking of, other tasks caused by unschedulable tasks.
Next, the preemption process.
Preemption is essentially a tree search: computations run concurrently across multiple nodes to identify all potential victims on each node, select the actual victims, and finally determine the candidate nodes.
The entire computation process is quite heavyweight. Assuming the process remains unchanged, how can we reduce the scale of the data involved in the computation?
=> Considering that priority is the basic principle of preemption, we can classify and sort the Pods on each node by priority in advance (note that this must be maintained dynamically along with events).
For a specific incoming Pod, the set of Pods it can preempt is also fixed, which narrows down the data scale.
PS: The GT/BE here refers to Gödel’s QoS levels, which can be ignored.
[Top-left] The essence of preemption is to free up resources. When priorities are clearly defined, the total amount of resources a Pod can free up through preemption is known.
In fact, some Pods may not be preempted due to PDB restrictions.
[Top-right] However, if the resources released through preemption, combined with the node’s available resources, are still insufficient for the current Pod’s needs, preemption will inevitably fail. The problem then becomes whether we can quickly determine the maximum amount of resources that can be freed up through preemption for a specific priority value.
=> Essentially, this is about calculating the resource prefix sum based on priority order.
[Bottom-left] The challenge is that Pods are added and removed very frequently. How can we maintain the priority order while also managing the resource prefix sums? We introduced a Splay Tree (a self-adjusting BST) that maintains interval aggregates in its subtree structure. By rotating the tree, we can convert prefix sums into subtree (interval) sums.
[Bottom-right] The Splay Tree allows us to perform insertions, deletions, and prefix-sum queries in $O(\log n)$ time.
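The pruning predicate itself can be written down independently of the tree. The sketch below uses a single scalar resource and a plain loop for clarity; in the real mechanism the sum over lower-priority Pods is a prefix sum read from the Splay Tree rather than recomputed:

```go
// podEntry is a simplified view of a Pod on a node for the pruning check.
type podEntry struct {
	priority int32
	request  int64 // a single scalar resource (e.g. milli-CPU) for simplicity
}

// canPossiblyPreempt: if even preempting every lower-priority Pod cannot free
// enough resources (together with what is already free on the node), the node
// is skipped before the expensive preemption search. In practice this sum is
// a priority-ordered prefix sum maintained in the Splay Tree and queried in
// O(log n), not recomputed with a loop as shown here.
func canPossiblyPreempt(incomingPriority int32, incomingRequest, nodeFree int64, podsOnNode []podEntry) bool {
	releasable := nodeFree
	for _, p := range podsOnNode {
		if p.priority < incomingPriority {
			releasable += p.request
		}
	}
	return releasable >= incomingRequest
}
```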
The final result is that we achieve highly efficient heuristic pruning. Not all nodes and pods will enter the calculation process.
Under heuristic pruning, we can ensure that no feasible solutions are lost.
The preemption throughput of large-scale co-location clusters in tidal scenarios has been increased by more than 10 times, and cases that cannot be scheduled by preemption can be quickly filtered out within 2ms (in the past, such cases were the most time-consuming).
In addition, Gödel has some innovative designs that help achieve performance optimization while providing better scalability.
The first is Unit Semantic and Unit Framework.
The specific details will not be expanded here. We only need to know that, compared with the Pod Framework, the Unit Framework focuses on scheduling-stage division and scheduling-data preprocessing at the Unit level.
For example, for a DaemonSet Pod, the candidate nodes are preprocessed in the Locating stage and narrowed down to a single node before entering the subsequent scheduling and preemption process. In large-scale clusters, the scheduling latency of DaemonSet Pods has dropped from 30ms to 0.3ms.
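As an illustration of the kind of preprocessing a Locating stage can do (a sketch, not the actual Gödel plugin): a DaemonSet Pod carries a required node affinity on metadata.name set by the DaemonSet controller, so its candidate set can be narrowed to that single node up front.

```go
import v1 "k8s.io/api/core/v1"

// locateDaemonSetNode extracts the single target node from a DaemonSet Pod's
// required node affinity (matchFields on metadata.name). Illustrative helper,
// not the actual Gödel Locating implementation.
func locateDaemonSetNode(pod *v1.Pod) (string, bool) {
	aff := pod.Spec.Affinity
	if aff == nil || aff.NodeAffinity == nil ||
		aff.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		return "", false
	}
	for _, term := range aff.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms {
		for _, f := range term.MatchFields {
			if f.Key == "metadata.name" && f.Operator == v1.NodeSelectorOpIn && len(f.Values) == 1 {
				return f.Values[0], true
			}
		}
	}
	return "", false
}
```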
In addition, under the Unit Framework, it is easier to reuse calculation results through Unit aggregation, further improving scheduling efficiency.
The ApplicationUnit Distribution shows that we have effectively aggregated thousands of Pods and scheduled them as a unit.
In an ultra-large-scale cluster with 25K+ nodes, we have kept SLO fluctuations within 1 second.
It can be seen that the overall SLO curve has been very stable since the upgrade on 2024-04-09.
The last one is the CommonStore Data Flow mechanism.
Our approach differs from the upstream Kubernetes Scheduler: we try our best to avoid centralized, on-the-fly calculations in the serial scheduling process. Instead, we prefer to maintain information in real time based on events, reducing the computing burden at plugin runtime.
Specifically, we strictly divide the data source of Cache into two categories:
Respond to “exogenous” events to maintain the state of the cluster (such as Add/Update/Delete of a ResourceObject)
Respond to “endogenous” scheduling decisions and maintain temporary data (such as AssumePod/ForgetPod); see the sketch below
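A sketch of how this split might look at the interface level (the method set is illustrative; AssumePod/ForgetPod mirror the examples above):

```go
import v1 "k8s.io/api/core/v1"

// commonStore sketches the two categories of events a store responds to.
type commonStore interface {
	// "Exogenous" events keep the cluster state in sync with the API server.
	AddNode(node *v1.Node)
	UpdateNode(oldNode, newNode *v1.Node)
	DeleteNode(node *v1.Node)
	AddPod(pod *v1.Pod)
	UpdatePod(oldPod, newPod *v1.Pod)
	DeletePod(pod *v1.Pod)

	// "Endogenous" events reflect the scheduler's own in-flight decisions.
	AssumePod(pod *v1.Pod) error
	ForgetPod(pod *v1.Pod) error
}
```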
On this basis, we perform various kinds of data preprocessing in the Cache's event-handling path and expose the data-access interface to plugins from the Snapshot in the form of a StoreHandle, to accelerate plugin calculations.
The specific layered design and data links are complex, so we will not expand on them here. The important point is that this mechanism lets us keep developing new features without creating new computing hotspots.
A typical example in the open source code is the PDB Store, which avoids the cost of listing PDBs and matching them on the fly by maintaining the relevant matching relationships in advance.
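As an illustration of the idea (a sketch, not the actual open-source PDB Store code), such a store can parse each PDB's selector once on its Add/Update event and record pod-to-PDB matches as Pod events arrive, so preemption plugins can read the matches from the Snapshot with a simple lookup:

```go
import (
	v1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// pdbStore reacts to "exogenous" PDB/Pod events and maintains the
// pod -> matching-PDB relationships ahead of time. Illustrative only.
type pdbStore struct {
	selectors map[string]labels.Selector // PDB key -> parsed selector
	matches   map[string][]string        // Pod key -> matching PDB keys
}

// AddPDB parses the selector once per PDB event instead of once per Pod
// per scheduling attempt.
func (s *pdbStore) AddPDB(key string, pdb *policyv1.PodDisruptionBudget) error {
	sel, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
	if err != nil {
		return err
	}
	s.selectors[key] = sel
	return nil
}

// AddPod records which PDBs this Pod falls under; preemption plugins can then
// check PDB constraints for a victim with a single map lookup.
func (s *pdbStore) AddPod(key string, pod *v1.Pod) {
	var matched []string
	for pdbKey, sel := range s.selectors {
		if sel.Matches(labels.Set(pod.Labels)) {
			matched = append(matched, pdbKey)
		}
	}
	s.matches[key] = matched
}

// MatchedPDBs is the read-side access exposed to plugins via the Snapshot.
func (s *pdbStore) MatchedPDBs(podKey string) []string {
	return s.matches[podKey]
}
```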
Optimization of inter-component communication mechanisms
Currently, communication across the entire system goes through a centralized API Server. Once multi-shard throughput reaches 5K+ Pods/s, it is difficult to increase further. We will address this by splitting the communication link.
More general batch scheduling
More rational separation of plugin and storage implementation
More reasonable mapping logic between plugins and storage, and continuous optimization of the plugin calculation process
Intelligent queueing
Reduce invalid scheduling attempts and ensure better workload fairness
…