L4 Python 建模操作系统

 

状态机

提出了大胆的想法:无论是软件还是硬件,都是状态机 而状态和状态的迁移是可以 “画” 出来的! 理论上说,只需要两个 API:

  • dump_state() - 获取当前程序状态
  • single_step() - 执行一步

由此可以简化操作系统: 把复杂的东西分解成简单的东西,我们真正关心的概念:

  • 应用程序 (高级语言状态机)
  • 系统调用 (操作系统 API)
  • 操作系统内部实现

没有人规定上面三者如何实现,通常的思路:真实的操作系统 + QEMU/NEMU 模拟器

我们的思路:

  • 应用程序 = 纯粹计算的 Python 代码 + 系统调用
  • 操作系统 = Python 系统调用实现,有 “假想” 的 I/O 设备
def main():
    sys_write('Hello, OS World')

玩具的意义

并没有脱离真实的操作系统

  • “简化” 了操作系统的 API
    • 在暂时不要过度关注细节的时候理解操作系统
  • 细节也会有的,但不是现在
    • 学习路线:先 100% 理解玩具,再理解真实系统和玩具的差异

比如用 C 实现如下:

void sys_write(const char *s) { printf("%s", s); }
void sys_sched() { usleep(rand() % 10000); }
int sys_choose(int x) { return rand() % x; }

void sys_spawn(void *(*fn)(void *), void *args) {
    pthread_create(&threads[nthreads++], NULL, fn, args);
}

操作系统模型:简化实现

使用 Python 实现

#!/usr/bin/env python3

import sys
import random
from pathlib import Path

class OperatingSystem():
    """A minimal executable operating system model."""

    SYSCALLS = ['choose', 'write', 'spawn', 'sched']

    class Thread:
        """A "freezed" thread state."""

        def __init__(self, func, *args):
            self._func = func(*args)
            self.retval = None

        def step(self):
            """Proceed with the thread until its next trap."""
            syscall, args, *_ = self._func.send(self.retval)
            self.retval = None
            return syscall, args

    def __init__(self, src):
        variables = {}
        exec(src, variables)
        self._main = variables['main']

    def run(self):
        threads = [OperatingSystem.Thread(self._main)]
        while threads:  # Any thread lives
            try:
                match (t := threads[0]).step():
                    case 'choose', xs:  # Return a random choice
                        t.retval = random.choice(xs)
                    case 'write', xs:  # Write to debug console
                        print(xs, end='')
                    case 'spawn', (fn, args):  # Spawn a new thread
                        threads += [OperatingSystem.Thread(fn, *args)]
                    case 'sched', _:  # Non-deterministic schedule
                        random.shuffle(threads)
            except StopIteration:  # A thread terminates
                threads.remove(t)
                random.shuffle(threads)  # sys_sched()

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print(f'Usage: {sys.argv[0]} file')
        exit(1)

    src = Path(sys.argv[1]).read_text()
    for syscall in OperatingSystem.SYSCALLS:
        src = src.replace(f'sys_{syscall}',        # sys_write(...)
                          f'yield "{syscall}", ')  #  -> yield 'write', (...)

    OperatingSystem(src).run()

测试代码:

count = 0

def Tprint(name):
    global count
    for i in range(3):
        count += 1
        sys_write(f'#{count:02} Hello from {name}{i+1}\n')
        sys_sched()

def main():
    n = sys_choose([3, 4, 5])
    sys_write(f'#Thread = {n}\n')
    for name in 'ABCDE'[:n]:
        sys_spawn(Tprint, name)
    sys_sched()

使用 C 语言实现

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

static pthread_t threads[64];
static int nthreads = 0;

static inline void
sys_write(const char *s) {
  printf("%s", s);
  fflush(stdout);
}

static inline void
sys_sched() {
  usleep(rand() % 10000);
}

static inline void
sys_spawn(void *(*fn)(void *), void *args) {
    pthread_create(&threads[nthreads++], NULL, fn, args);
}

static inline int
sys_choose(int x) {
  return rand() % x;
}

// Constructor called before main()
static inline void __attribute__((constructor))
srand_init() {
  srand(time(0));
}

// Destructor called after main()
static inline void __attribute__((destructor))
thread_join() {
  for (int i = 0; i < nthreads; i++) {
    pthread_join(threads[i], NULL);  // Wait for thread terminations
  }
}

测试代码:

#include "os-real.h"

int count = 0;

void *Tprint(void *s) {
  char buf[64];
  for (int i = 0; i < 3; i++) {
    sprintf(buf, "#%02d Hello from %c%d\n", ++count, *(const char *)s, i);
    sys_write(buf);
    sys_sched();
  }
  return NULL;
}

int main() {
  int n = sys_choose(3) + 3;
  char buf[64];
  sprintf(buf, "#Thread = %d\n", n);
  sys_write(buf);
  for (int i = 0; i < n; i++) {
    sys_spawn(Tprint, &"ABCDE"[i]);
  }
}

在我们的操作系统模型中,应用程序 (状态机) 被分为两部分:确定性 (deterministic) 的本地计算,和可能产生非确定性的系统调用 (以 sys 开头的函数)。操作系统提供以下 API:

系统调用/Linux 对应 行为
sys_spawn(fn)/pthread_create 创建共享内存的线程,并且从 fn 开始执行
sys_fork()/fork 创建当前状态机的完整复制
sys_sched()/定时被动调用 切换到随机的线程/进程执行
sys_choose(xs)/rand 返回一个 xs 中的随机的选择
sys_write(s)/printf 向调试终端输出字符串 s
sys_bread(k)/read 读取虚拟设磁盘块 k 的数据
sys_bwrite(k, v)/write 向虚拟磁盘块 k 写入数据 v
sys_sync()/sync 将所有向虚拟磁盘的数据写入落盘
sys_crash()/长按电源按键 模拟系统崩溃

参考

4. Python 建模操作系统