寒夏摸鱼站

Rust 下的 Shell 管道调用

2023年02月12日(已修改)

什么是 Rust

一门新的系统级编程语言,C++ 的上位替代品,同时也是优良的低血压治疗器。

什么是 Shell

是内核与用户之间进行交互的一个系统程序,常见的 cmd、sh、bash 等都可以称之为 Shell。

什么是管道

管道是 Shell 提供的一种功能,可以将多个需要执行的程序按顺序将各自的标准输入输出连接在一起,达到一种复合的作用。

在 Rust 下执行 Shell 命令

Rust 中执行命令需要使用标准库 std::process 中的 Command 结构体:

use std::process;
use std::str;

fn main() {
  let output = process::Command::new("ls")
    .arg("/")
    .output()    // Get a Result<Output>
    .unwrap()    // Get an Output, panic when error
    .stdout;     // Get a Vec<u8>

  println!("{}", str::from_utf8(&output).unwrap());
}

关联函数 new 传入的是执行的程序名,通过链式调用 arg 方法顺序插入执行参数,然后调用 output 执行并获取输出结果,带上一个 unwrap 表示我们不关心执行出错的情况,如果出错了直接崩溃。

如果不想一个一个一个地插入执行参数,那就把 arg 换成 args,然后传入一个可迭代的类型,一般是一个字符串数组:

use std::process;
use std::str;

fn main() {
  let output = process::Command::new("ls")
    .args(["-l", "-a"])
    .output()            // Get a Result<process::Output>
    .unwrap()            // Get an process::Output, panic when error
    .stdout;             // Get a Vec<u8>

  println!("{}", str::from_utf8(&output).unwrap());
}

需要注意的是,不管 arg 还是 args,都是添加参数的意思,不会对之前的参数进行覆盖。

output 方法会阻塞程序的运行,等待 Shell 命令执行完毕,如果不想等待那就换成 spawn,然后在程序的其他地方等:

use std::process;
use std::str;

fn main() {
  let child = process::Command::new("ls")
    .args(["-l", "-a"])
    .spawn()            // Get a Result<process::Child>
    .unwrap();          // Get a process::Child, panic when error

  let output = child
    .wait_with_output()    // Get a Result<process::Output>
    .unwrap()              // Get a process::Output, panic whene error
    .stdout;               // Get a Vec<u8>

  println!("{}", str::from_utf8(&output).unwrap());
}

output 方法产生的是 process::Output 类型结构体,spawn 产生的是 process::Child 类型结构体,一个表示程序的输出,一个表示程序实体,所以我们想获得程序实体的输出,就需要用到 wait_with_output 方法,等待执行结束并获得 process::Output 结构体。同样,我们对程序的执行失败不感兴趣,用 unwrap 方法指示出错直接崩溃。

加入管道

管道本质上是一类特殊的输入输出,为了能够在 Rust 使用管道,我们需要对执行的命令进行输入输出重定向:

use std::process::{ Command, Stdio };
use std::str;

fn main() {
  let child1 = Command::new("ls")
    .args(["-l", "-a"])
    .stdout(Stdio::piped())    // Redirect stdout to pipe
    .spawn()                   // Get a Result<process::Child>
    .unwrap();                 // Get a process::Child, panic when error

  let child2 = Command::new("grep")
    .args(["-v", "\\."])
    .stdin(Stdio::from(child1.stdout.unwrap()))    // Redirect stdin to child1's stdout(pipe)
    .spawn()                                       // Get a Result<process::Child>
    .unwrap();                                     // Get a process::Child, panic when error

  let output = child2
    .wait_with_output()    // Get a Result<process::Output>
    .unwrap()              // Get a process::Output, panic when error
    .stdout;               // Get a Vec<u8>

  println!("{}", str::from_utf8(&output).unwrap());
}

Commandstdinstdout 方法使得我们可以重定向命令的标准输入输出,他们需要一个 Stdio 类型的结构。

Stdio::piped() 建立了一个管道,我们需要把第一个命令的标准输出重定向到管道里,这样子我们就不需要等待第一个命令执行完成,就能把它的标准输出给到第二个命令的标准输入使用。

最后我们等待第二个命令执行并返回输出即可。

似乎有点不太对劲

如果你执行了“加入管道”篇的代码,你会发现在最后的 println! 中实际上没有输出任何东西!

这是因为通过 Command 结构体调用 output 方法获得输出与通过 Child 结构体调用 wait_with_output 方法两者之间有本质不同。

output 方法默认会捕获命令的标准输出和标准错误,所以你可以通过得到的 Output 结构体获得 stdoutstderr,但是 wait_with_output 方法默认不捕获任何东西,命令的标准输出和标准错误全部继承自父进程,因此在命令执行后,所有的输出会被直接打印出来,而不会放入获得的 Output 结构体中。

解决方法其实很简单,因为我们需要让父进程捕获到命令的输出,所以把最后的命令输出重定向到管道里就行了:

use std::process::{ Command, Stdio };
use std::str;

fn main() {
  let child1 = Command::new("ls")
    .args(["-l", "-a"])
    .stdout(Stdio::piped())    // Redirect stdout to pipe
    .spawn()                   // Get a Result<process::Child>
    .unwrap();                 // Get a process::Child, panic when error

  let child2 = Command::new("grep")
    .args(["-v", "\\."])
    .stdin(Stdio::from(child1.stdout.unwrap()))    // Redirect stdin to child1's stdout(pipe)
    .stdout(Stdio::piped())                        // Redirect stdout to pipe
    .spawn()                                       // Get a Result<process::Child>
    .unwrap();                                     // Get a process::Child, panic when error

  let output = child2
    .wait_with_output()    // Get a Result<process::Output>
    .unwrap()              // Get a process::Output, panic when error
    .stdout;               // Get a Vec<u8>

  println!("{}", str::from_utf8(&output).unwrap());
}

执行一连串的管道调用

我们解决了管道的问题,但是这种调用方式实在是太难看了,如果我们要用管道连接 10 个甚至 9 个命令,那么我们需要实例化 10 个甚至 9 个 Child 结构体,然后重定向标准输入输出把它们连起来……

我们需要一种通用的解决方案,给一个函数传入我们需要管道执行的命令列表,然后让它自动帮我们完成任务,就像这样:

// Something...

fn main() {
  let output = exec(&[
    &("ls",   &["-l"]),
    &("awk",  &["NR!=1 && /^d/"]),
    &("grep", &["Aug"]),
    &("wc",   &["-l"])
  ]);

  println!("{}", str::from_utf8(&output.stdout).unwrap());
}

以上命令等同于在 Shell 中执行(统计工作目录下修改月份为八月的目录数量):

ls -l | awk "NR!=1 && /^d/" | grep Aug | wc -l

首先,我们让这个函数能够接收这么一串参数,可以看出参数传入的是 元组引用数组的引用,元组里头套了字符串字面量和另一个字符串字面量数组的引用,所以我们的函数定义这样写:

fn exec(cmds: &[&(&str, &[&str])]) -> Output {
  // Do something...
}

从上面我们对于管道的调用分析可知:对于第一个命令,我们需要重定向它的标准输出到管道;对于剩下的其他命令,我们需要重定向它的标准输入到上一个命令的标准输出,重定向它的标准输出到管道。

你可能会认为我们需要保存所有调用过程中产生的 Child 结构体,实际上并不需要,因为 Child 结构体在执行了 spawn 方法后,实际上已经 fork 出了一个新进程,再对它进行什么设置已经完全没用了,它和父进程之间唯一的关系只剩下你在执行前提供的管道。

因此我们实际上可以复用 Child 结构体的声明,为了标识一个命令是否是第一次执行,我们还可以用 Option<T> 来包装它,通过判断是否为 None 来得知是否是第一个命令,只需要 if let 语句就能完成:

fn exec(cmds: &[&(&str, &[&str])]) -> Output {
  let mut child: Option<Child> = None;

  for i in cmds {
    if let None = child {                // Is first command
      child = Some(
        Command::new(i.0)
          .args(i.1)
          .stdout(Stdio::piped())
          .spawn()
          .unwrap()
      );
    } else {                             // Not the first command
      child = Some(
        Command::new(i.0)
          .args(i.1)
          .stdin(Stdio::from(child.unwrap().stdout.unwrap()))
          .stdout(Stdio::piped())
          .spawn()
          .unwrap()
      );
    }
  }

  // Do something...
}

最后我们从 child 等待输出并返回即可:

fn exec(cmds: &[&(&str, &[&str])]) -> Output {
  let mut child: Option<Child> = None;

  for i in cmds {
    if let None = child {
      child = Some(
        Command::new(i.0)
          .args(i.1)
          .stdout(Stdio::piped())
          .spawn()
          .unwrap()
      );
    } else {
      child = Some(
        Command::new(i.0)
          .args(i.1)
          .stdin(Stdio::from(child.unwrap().stdout.unwrap()))
          .stdout(Stdio::piped())
          .spawn()
          .unwrap()
      );
    }
  }

  child
    .unwrap()
    .wait_with_output()
    .unwrap()
}

现在,你可以拿着这个函数随意执行各种管道调用了!

或许还可以看情况处理那些被 unwrap 掉的异常,让程序不那么容易崩溃出去。

新的问题

虽然上面的代码已经可以使用了,且不会产生逻辑错误,但是有一个难以察觉的问题会发生。

当你运行一个普通程序的时候,我们的执行函数不会产生任何的副作用,所有的资源(特别是进程资源)都得到了释放。

而当执行函数运行在一个守护进程(服务)上时,如果你查看正在运行的进程列表,你会发现守护进程产生了大量的 <defunct> 进程 —— 僵尸进程。

这是因为 我们并没有回收除了最后一个 Child 以外的其他进程资源。当一个命令被以 spawn 的形式执行时,需要显式地使用 waitwait_with_output 来回收其资源,或等待父进程结束被 init 进程自动回收。

一般情况下我们的程序不怎么会执行长期任务,所以回不回收其实无所谓 (错误的,所有获取到的资源在不需要使用的时候必须及时回收),当我们的程序需要作为守护进程留驻后台时,我们必须考虑资源回收。

因此,我们需要保存每一步生成使用的 Child,然后在最后统一回收。这种方式使用队列完成是最好的,因为管道操作你也可以想象成是队列执行:

fn exec(cmds: &[&(&str, &[&str])]) -> Output {
  let mut children = VecDeque::<Child>::new();

  for i in cmds {
    // The first process
    if children.is_empty() {
      children.push_back(
        Command::new(i.0)
          .args(i.1)
          .stdout(Stdio::piped())
          .spawn()
          .unwrap()
      );
    // Other processes
    } else {
      // Take ownership of last child stdout
      let input = children
        .back_mut()
        .unwrap()
        .stdout
        .take()
        .unwrap();

      children.push_back(
        Command::new(i.0)
          .args(i.1)
          .stdin(input)
          .stdout(Stdio::piped())
          .spawn()
          .unwrap()
      );
    }
  }

  // Wait other processes
  while children.len() > 1 {
    children
      .pop_front()
      .unwrap()
      .wait()
      .unwrap();
  }

  // Get output of target process
  children
    .pop_front()
    .unwrap()
    .wait_with_output()
    .unwrap()
}

这里可能需要解释一个技术性问题,即上面代码中 获得上一个 Childstdout 所有权

由于我们使用了 back_mut 来获得上一个 Child 的可变引用,而可变引用不允许 局部移动 的发生,我们获得 stdout 的过程本质上就是寻求进行一个局部移动获得所有权。

因为在 ChildStdout 转换成 Stdio 的过程中需要保证 ChildStdout 不是引用,所以这又导致了我们必须使得 stdout 是有所有权的。这似乎产生了一个死锁。

虽然我们不能进行局部移动,但是由于 stdout 是一个 Option 枚举,而 Option 枚举提供了 take 方法,来将枚举中的数据移动出来并使用 None 来替代,这样一来我们实际上没有发生任何局部移动(Option 枚举的所有权仍然属于 Child),同时又达到了移动的效果(Option 枚举所拥有的 ChildStdout 被移动了出来)。

这时,我们就可以在各种守护进程中使用 exec 来执行 Shell 命令了,而且不会产生僵尸进程。

另一种方法

我们知道,僵尸进程的产生本质上是由于父进程没有及时回收退出的子进程,而在守护进程中我们不可能直接通过结束守护进程的方式来回收僵尸进程,所以我们需要用 waitwait_with_output 来显式回收。

又因为父进程的退出会使得 init 进程回收所有其产生的僵尸进程,所以我们是否也可以参照这种思路,让 init 进程来替我们回收僵尸进程呢?

答案当然是可以,但是就会变得有亿点麻烦了。

我们的实现思路很简单 —— 把要执行的命令变成守护进程,通过管道连接彼此。当守护进程退出时,由于守护进程都是被 init 进程托管的,所以 init 会自动回收它们。

然而在实际过程中几乎不会使用这种方法,因为过于的麻烦,且如果一个进程出错,我们无法得知其到底发生了什么。

最简单的方法

Rust 中已经有个叫 Subprocess 的 Crate 实现了这种管道调用,嫌麻烦的话直接用就可以了。

文章标题:Rust 下的 Shell 管道调用

文章链接:https://blog.rainiar.top/posts/18/

最近修改:2023年03月13日

分享协议:CC BY-NC-SA 4.0 | 署名-非商业使用-相同方式共享